This is a three-part series describing a POC on how to build a near real-time processing system using Apache Storm and Kafka in Java. Briefly, the system works as follows: messages arrive on a Kafka topic, Storm picks them up using a Kafka Spout and hands them to a Bolt, which parses each message and identifies its type based on the header. Once the message type is identified, the content of the message is extracted and sent to a different bolt for persistence - a SOLR bolt, a MongoDB bolt or an HDFS bolt.
In this first part, we will set up the environment. If you already have the environment set up, you can jump to Part 2, which covers how to set up the project in Eclipse and how to write the Bolts. Creating the Spout and executing the project are discussed in Part 3.
The source code for this project is available on my GitHub.
For building this system, we will need:
- Hadoop 2.6.0
- Zookeeper 3.4.6
- Apache Kafka 0.8.2.1 Scala2.9.1
- Apache Solr 5.3.1
- MongoDB 3.0.7
- Apache Storm 0.10.0
Note: All are single node setup
Versions I used are given in italics. It is not necessary to use the same versions, but some changes might be needed if your versions are different.
Update (07/05/2016) : The source code for Storm-1.0 is available in the branch storm1.0-kafka-poc
I am assuming that Hadoop is already installed, so I am not going through the installation steps here. If it isn't, you can install it easily by following the instructions here. After installation, start the Hadoop daemons and verify that all of them are running using the jps command.
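On a single-node setup, the jps output should look roughly like the sketch below; the process IDs will differ and the exact daemon list depends on how your Hadoop 2.6.0 is configured.

```shell
jps
# Expected daemons on a typical single-node Hadoop 2.x install:
# 2401 NameNode
# 2514 DataNode
# 2707 SecondaryNameNode
# 2873 ResourceManager
# 2982 NodeManager
# 3105 Jps
```

If any daemon is missing, check the corresponding log file under HADOOP_HOME/logs before proceeding.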
Zookeeper setup is also pretty straightforward: download Zookeeper from here, extract the archive, and look for zoo_sample.cfg inside the conf folder. Copy it and rename the copy to zoo.cfg.
For a single-node setup, most of the configurations in zoo_sample.cfg can be used as is; the only one I usually change is dataDir. Create a new directory of your convenience and point dataDir to it. e.g.,
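A minimal zoo.cfg fragment might look like this; the directory path below is just an example, use whatever location you created.

```
# conf/zoo.cfg
tickTime=2000
clientPort=2181
dataDir=/home/user/zookeeper-data   # example path - change to your own directory
```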
Start the Zookeeper server by running the below command from the Zookeeper base directory.
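Assuming you are in the Zookeeper base directory, the server is started (and optionally checked) with:

```shell
bin/zkServer.sh start
# verify it is running:
bin/zkServer.sh status
```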
Download the Kafka binary from here and unpack it. Now start the Kafka broker by running the below command from the Kafka base directory.
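For Kafka 0.8.2.x the broker is started by pointing the startup script at the bundled properties file:

```shell
bin/kafka-server-start.sh config/server.properties
```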
config/server.properties holds the configuration of the Kafka broker. Take note of the broker port (9092 by default) and the zookeeper.connect setting - we will need them later.
Now we need to create a Kafka topic to which the messages will be posted. Let's name this topic 'incoming'. When creating a topic, we specify the topic name with --topic, the Zookeeper URL with --zookeeper, the number of partitions, and the replication factor. Run the below command to create the topic.
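A sketch of the create command, assuming Zookeeper runs locally on its default port 2181 and a single-node setup (so one partition and replication factor 1):

```shell
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic incoming
```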
Let's now test whether the topic has been created successfully by posting and retrieving some messages. Open a new terminal and run the below command to start a console consumer.
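With the 0.8.x console consumer, which reads via Zookeeper, the command would be:

```shell
bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
  --topic incoming --from-beginning
```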
In another terminal, start a Kafka console producer and send a sample message.
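The console producer connects to the broker directly (default port 9092); anything you type afterwards is sent as a message:

```shell
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic incoming
# then type the sample message and press Enter:
hdfs testmessage
```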
Check the terminal running the consumer to confirm that the message ('hdfs testmessage') has been received.
Download the Solr distribution from here and unpack it. Start the Solr server by running the below command from the Solr base directory.
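Solr 5.x ships with a start script, so from the Solr base directory:

```shell
bin/solr start
# check that it came up:
bin/solr status
```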
You can now access the Solr UI via http://localhost:8983/solr/#/
Solr home page:
Now we need to create a collection in Solr. This will be used by our Storm topology to store messages of the type solr. Creating a collection requires a set of configuration files; Solr ships with basic configuration files that can be used for this. These files are available in
Let's first create a new folder and copy the basic configs into it.
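A sketch of the copy step, assuming Solr 5.x where the sample configset lives under server/solr/configsets/basic_configs; the destination folder name is just an example:

```shell
mkdir -p ~/storm-poc/solr-conf
cp -r server/solr/configsets/basic_configs/conf/* ~/storm-poc/solr-conf/
```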
We need to change the default schema given by the basic configuration. To do that, open schema.xml in the conf folder we just copied and add the below line after the <field name="id" . . ./> entry. This adds a field named value of type string; it is a required attribute and is stored (stored="true" makes the field retrievable while doing search), while indexed="false" indicates that we are not going to search on this field.
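Based on the description above, the field declaration would look like this (a sketch - adjust the attributes if your schema conventions differ):

```xml
<field name="value" type="string" indexed="false" stored="true" required="true"/>
```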
Now that we have modified the schema as per our requirement, we will go ahead and create a collection named collection1. To create the collection, run
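With Solr 5.x the create command can point -d at the folder holding our modified configs; the path below assumes the example folder from the copy step:

```shell
bin/solr create -c collection1 -d ~/storm-poc/solr-conf
```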
Download and unpack MongoDB from here. We need to create a folder that MongoDB will use as its data directory, then start the mongod daemon by running the below command from the MongoDB installation folder, passing the path of the data directory to the mongod script.
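The data directory can be any writable path; the one below is just an example:

```shell
mkdir -p ~/mongodb-data          # example data directory
bin/mongod --dbpath ~/mongodb-data
```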
Now we will create a database and a collection to store our messages. Open another terminal and run the below commands.
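A sketch of the session: open the mongo shell from the MongoDB installation folder, then run the two commands explained below.

```shell
bin/mongo
# inside the mongo shell:
#   use storm
#   db.createCollection("collection1")
```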
use storm creates a new database called storm and switches to it.
db.createCollection("collection1") creates a new collection named 'collection1'.
Download the Storm distribution from here and unpack it. It is better to add STORM_HOME/bin to your PATH; you can do so by editing your .bash_profile. Note: this step might vary based on your OS. See how to set the PATH variable for Linux or Mac.
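The .bash_profile entries might look like this; the install path below is an example, so point STORM_HOME at wherever you unpacked Storm:

```shell
# in ~/.bash_profile
export STORM_HOME=/opt/apache-storm-0.10.0   # example path
export PATH="$PATH:$STORM_HOME/bin"
```

Remember to open a new terminal (or source the file) so the change takes effect.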
We need to start the Storm master - nimbus - and the slave - supervisor. Along with these, we will also start the Storm UI server and the logviewer, which lets us view worker logs from the Storm UI.
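Assuming storm is on your PATH, each daemon is started with its own command; run each in a separate terminal or background them as below:

```shell
storm nimbus &
storm supervisor &
storm ui &
storm logviewer &
```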
Check http://localhost:8080/index.html and make sure that the supervisor and nimbus servers have been started.
We have completed the environment setup. In the next part, we will see how to set up the Eclipse project and start writing the Storm topology.