A 'Kafka > Storm > Kafka' Topology gotcha
If you're trying to make a Kafka > Storm > Kafka topology work, and are baffled because your recipient topic isn't receiving a damn thing, here's the secret:
The default org.apache.storm.kafka.bolt.KafkaBolt implementation expects only a single key field (a named tuple field) from the upstream component (Bolt/Spout).
If you're tying your KafkaBolt to a KafkaSpout, you've got to use the spout's internal field name: str.
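For instance, a minimal sketch of that spout-to-bolt hookup could look like this (old storm-kafka API, Storm 1.x); ZK_HOSTS is a placeholder, and newProps(), BROKER_URL and OUTPUT_TOPIC are the same helper/constants used later in this post:

SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(ZK_HOSTS), "raw_records", "/raw_records", "raw-records-reader");
// The old storm-kafka StringScheme emits a single field named "str",
// so the KafkaBolt's mapper has to read the message value from "str".
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout spout = new KafkaSpout(spoutConfig);

KafkaBolt bolt = new KafkaBolt()
        .withProducerProperties(newProps(BROKER_URL, OUTPUT_TOPIC))
        .withTopicSelector(new DefaultTopicSelector(OUTPUT_TOPIC))
        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "str"));

Since the spout emits no "key" field, the mapper simply falls back to a null Kafka record key, which is fine for most cases.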
However, if you have an upstream Bolt doing some filtering, then make sure that you tie the name of its ONLY output field (the value) to the KafkaBolt.
Let me break it down a little more, for the greater good.
Consider a very basic Storm topology where we read raw messages from a Kafka topic (say, raw_records), enrich/cleanse them (in a Bolt), and publish these enriched/filtered records to another Kafka topic (say, filtered_records).
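Wired up, the topology is just spout -> bolt -> bolt. A rough sketch (the component IDs, the EnrichmentBolt class and the kafkaSpout/kafkaBolt variables are stand-ins for what's built in the rest of this post):

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("raw-records-spout", kafkaSpout);             // reads from raw_records
builder.setBolt("enrichment-bolt", new EnrichmentBolt())       // enrich/cleanse/filter
       .shuffleGrouping("raw-records-spout");
builder.setBolt("filtered-records-bolt", kafkaBolt)            // publishes to filtered_records
       .shuffleGrouping("enrichment-bolt");
StormSubmitter.submitTopology("kafka-storm-kafka", new Config(), builder.createTopology());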
Given that the final publisher (the one that talks to filtered_records) is a KafkaBolt, it needs a way to find out which tuple field the values are available under. And that field name is what you need to specify, matching the upstream bolt or spout.
So, the declared output field of the upstream Bolt
would be something like:
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    outputFieldsDeclarer.declare(new Fields("output"));
}
Note the single output field named "output": that's the field name the KafkaBolt will use to look up the message value.
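And the same bolt's execute() emits the enriched record as that single field. A sketch, assuming the bolt extends BaseBasicBolt and enrich() is a hypothetical helper:

@Override
public void execute(Tuple input, BasicOutputCollector collector) {
    // Whatever is emitted here travels under the "output" field declared above.
    String enriched = enrich(input.getString(0));
    collector.emit(new Values(enriched));
}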
Now, on the KafkaBolt side, the only thing to take care of is using this field name in the mapper configuration, like so:
KafkaBolt bolt = new KafkaBolt()
        .withProducerProperties(newProps(BROKER_URL, OUTPUT_TOPIC))
        .withTopicSelector(new DefaultTopicSelector(OUTPUT_TOPIC))
        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "output"));
The default message field name is "message" (and the default key field name is "key"), so you could just as well use the no-arg constructor of FieldNameBasedTupleToKafkaMapper by naming the upstream output field "message".
If, however, you have a scenario where you want to pass both the key and the value from the upstream, for example:
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    outputFieldsDeclarer.declare(new Fields("word", "count"));
}
Note that we've specified the key field here as "word".
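The corresponding execute() then emits a pair: the first value travels under the "word" field (which becomes the Kafka record key) and the second under "count" (the Kafka message). A sketch, where counts is a hypothetical Map<String, Long> kept by the bolt:

@Override
public void execute(Tuple input, BasicOutputCollector collector) {
    String word = input.getString(0);
    long count = counts.merge(word, 1L, Long::sum);  // hypothetical running tally
    collector.emit(new Values(word, String.valueOf(count)));
}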
Then, obviously, we need to use this (modified) field name downstream, like so:
KafkaBolt bolt = new KafkaBolt()
        .withProducerProperties(newProps(BROKER_URL, OUTPUT_TOPIC))
        .withTopicSelector(new DefaultTopicSelector(OUTPUT_TOPIC))
        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
Update (2017-08-23): Added the scenario where a modified key name can be used.