This is the last part of the blog Realtime Processing using Storm and Kafka. You can find the previous parts here - Part 1, Part 2. In this section we will develop the Kafka spout and the Storm topology, and execute the project.
The source code for this project is available on my GitHub.
Creating Kafka Spout
The Kafka spout reads from the Kafka topic we created. So it has to know how to connect to the Kafka broker, the name of the topic from which it has to read, the Zookeeper root, and the consumer group id. The Zookeeper root and group id are used by the spout to store the offset marking how far it has read from the topic. In case of failure, the spout can use this information to resume reading from where it left off. If zkRoot is ‘kafka’ and the consumer group id is ‘sample_group’, then the path /kafka/sample_group will be created in Zookeeper.
The Java method below creates a KafkaSpout. It first creates a SpoutConfig using the values from the default_config.properties file and then passes it on to the KafkaSpout class. This method is written inside the class SpoutBuilder.java.
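A minimal sketch of what this method might look like, assuming Storm 1.x's storm-kafka module; the property names (zookeeper.hosts, kafka.topic, kafka.zkRoot, kafka.consumer.group) are illustrative stand-ins for whatever default_config.properties actually uses:

```java
import java.util.Properties;

import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;

public class SpoutBuilder {

    public KafkaSpout buildKafkaSpout(Properties props) {
        // Zookeeper connection string, e.g. "localhost:2181"
        BrokerHosts hosts = new ZkHosts(props.getProperty("zookeeper.hosts"));
        String topic = props.getProperty("kafka.topic");
        String zkRoot = props.getProperty("kafka.zkRoot");          // e.g. "kafka"
        String groupId = props.getProperty("kafka.consumer.group"); // e.g. "sample_group"

        SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, groupId);
        // Deserialize each Kafka message into a single string field
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        return new KafkaSpout(spoutConfig);
    }
}
```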
Building the Topology
Topology.java is the main class which wires all the spouts and bolts together. The diagram below shows how they are connected. The Kafka spout picks up messages from the topic. SinkTypeBolt listens to the KafkaSpout and emits tuples on three streams. The SOLR bolt listens to the solr stream of SinkTypeBolt, and similarly the HDFS bolt and the MongoDB bolt listen to the hdfs stream and the mongodb stream of SinkTypeBolt respectively.
The Topology class uses SpoutBuilder and BoltBuilder to build all the spouts and bolts.
These spouts and bolts are linked together by the TopologyBuilder class. Each bolt should define from which stream it receives its input. For example, if bolt ‘B’ wants to receive its input from bolt ‘A’, then we should call
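Assuming a TopologyBuilder instance named builder, with 'A' and 'B' as the component ids, that call would be:

```java
// Bolt B subscribes to bolt A's default stream; tuples are
// distributed randomly across B's tasks
builder.setBolt("B", boltB).shuffleGrouping("A");
```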
If bolt ‘A’ is emitting multiple streams, x and y, then bolt ‘B’ should also specify which stream of bolt ‘A’ it subscribes to. It would look something like
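For example, subscribing bolt 'B' to stream x of bolt 'A' (builder is the TopologyBuilder instance):

```java
// Bolt B receives only the tuples that bolt A emits on stream "x"
builder.setBolt("B", boltB).shuffleGrouping("A", "x");
```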
kafkaSpoutCount: parallelism hint for the KafkaSpout - defines the number of executors/threads to be spawned per container.
Note: shuffleGrouping is one of the eight stream grouping methods available in Storm (it distributes tuples to the bolt's tasks at random). Another type of grouping is fieldsGrouping - in fields grouping, the tuples are partitioned by a specified field, and tuples having the same value for that field are always sent to the same task. We can also implement a custom grouping by implementing the CustomStreamGrouping interface.
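A hypothetical fields-grouping example (the component ids and field name are illustrative, not from this project):

```java
import org.apache.storm.tuple.Fields;

// Every tuple whose "word" field carries the same value is
// always routed to the same task of the count bolt
builder.setBolt("count", countBolt).fieldsGrouping("split", new Fields("word"));
```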
Finally the topology can be submitted by
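A minimal sketch of the submission; the topology name and worker count are illustrative:

```java
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    // ... setSpout / setBolt wiring as described above ...
    Config config = new Config();
    config.setNumWorkers(1); // illustrative for a single-node setup
    StormSubmitter.submitTopology("storm-kafka-topology", config, builder.createTopology());
}
```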
For execution, we need to start the following servers:
- Hadoop servers
- Solr server
- Kafka broker
- Mongod server
- Storm nimbus
- Storm supervisor
- Storm UI (optional)
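The exact start commands depend on your installation; on a typical single-node setup they might look like this (paths are illustrative, run each from the corresponding install folder):

```shell
# Note: Zookeeper must also be running, since Kafka and Storm depend on it
start-dfs.sh                                          # Hadoop (HDFS) daemons
bin/solr start                                        # Solr, from the Solr folder
bin/kafka-server-start.sh config/server.properties    # Kafka broker, from the Kafka folder
mongod --dbpath /data/db                              # MongoDB daemon
storm nimbus &                                        # Storm nimbus
storm supervisor &                                    # Storm supervisor
storm ui &                                            # Storm UI (optional)
```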
Build the jar using the command mvn clean install. The command will create your topology jar with all the dependencies -
Run the jar using the command
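The command would look something like this; the jar file name is illustrative, so use whatever mvn clean install actually produced in your target folder:

```shell
# Jar name is illustrative; substitute the one built by Maven
storm jar target/storm-kafka-topology-jar-with-dependencies.jar com.vishnu.storm.Topology
```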
where com.vishnu.storm is the package name and Topology is the class containing the main method.
Open your Storm UI at http://localhost:8080/ and verify that the job has been deployed correctly. The Storm UI provides a very good visualization of the topology, which you can view by clicking on the topology name.
Now let us insert some sample messages for each of the sinks - MongoDB, SOLR and HDFS - and check if those messages make their way to their destinations. To do that, start your kafka-console-producer. If you have forgotten the name of the Kafka topic we created earlier (I know I did!), you can use the following command from the Kafka base folder.
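Assuming a local Zookeeper at localhost:2181 and a broker at localhost:9092, listing the topics and starting the producer might look like this (the topic name shown is illustrative):

```shell
# List all topics to recall the one we created earlier
bin/kafka-topics.sh --list --zookeeper localhost:2181

# Start a console producer on that topic
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic storm-topic
```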
We now verify each of the sinks
1) MongoDB - from your mongodb folder, you can run
2) SOLR - You can see the Solr message by accessing the SOLR UI url.
3) HDFS - You can either run hadoop fs -ls /from_storm or access the Namenode UI URL.
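On the command line, the MongoDB and HDFS checks might look like this; the MongoDB database and collection names are assumptions, and only the HDFS path /from_storm comes from the project:

```shell
# MongoDB: open the shell from your MongoDB folder and query the sink
# collection (database/collection names are illustrative)
bin/mongo
> use storm
> db.collection.find().pretty()

# HDFS: list the files written by the HDFS bolt
hadoop fs -ls /from_storm
```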
I hope you got a fair idea of how to integrate Storm, Kafka, MongoDB, SOLR and HDFS for realtime analysis. Although this was implemented on a single-node cluster for learning purposes, it can be extended to multi-node scenarios as well. If you have further doubts or need clarifications, please comment below and I will respond as soon as possible.