Realtime Processing using Storm-Kafka - Part 1

This is a three-part series of a POC on how to build a near-realtime processing system using Apache Storm and Kafka in Java. To give a brief overview of how the system works: messages arrive on a Kafka topic, Storm picks them up using a Kafka Spout and passes them to a Bolt, which parses each message and identifies its type based on the header. Once the message type is identified, the content of the message is extracted and sent to a different bolt for persistence - a SOLR bolt, a MongoDB bolt, or an HDFS bolt.
In this first part, we will set up the environment. If you already have the environment set up, you can jump to Part 2, which covers how to set up the project in Eclipse and how to write the Bolts. Creating the Spout and running the project are covered in Part 3.
The source code for this project is available on my GitHub.
Setup
To build this system, we will need:
- Hadoop 2.6.0
- Zookeeper 3.4.6
- Apache Kafka 0.8.2.1 (Scala 2.9.1)
- Apache Solr 5.3.1
- MongoDB 3.0.7
- Apache Storm 0.10.0
Note: all components use single-node setups.
The versions I used are listed above. It is not necessary to use the same versions, but some changes may be needed if your versions differ.
Update (07/05/2016): The source code for Storm 1.0 is available in the branch storm1.0-kafka-poc
Hadoop
I am assuming that Hadoop is already installed, so I am not going through the installation steps here. If it is not, you can install it easily by following the instructions here. After installation, start the Hadoop daemons and verify that all of them are running using the jps command.
$ jps
12768 QuorumPeerMain
14848 SecondaryNameNode
15024 NodeManager
14949 ResourceManager
14758 DataNode
15067 Jps
14687 NameNode
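You can also confirm that HDFS itself is reachable with a simple filesystem command; any HDFS command that returns without errors will do.
$hdfs dfs -ls /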
Zookeeper
ZooKeeper setup is also pretty straightforward. You can download ZooKeeper from here. Once downloaded, extract the archive and look for zoo_sample.cfg inside the conf folder. Copy it and rename the copy to zoo.cfg.
$cp zoo_sample.cfg zoo.cfg
For a single-node setup, most of the configuration in zoo_sample.cfg can be used as is; the only setting I usually change is dataDir=/tmp/zookeeper. Create a new directory of your choice and point dataDir to it, e.g., dataDir=/Users/vishnu/zookeeper_data
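For reference, a minimal single-node zoo.cfg needs only a few settings, something like the below (the dataDir path is just an example; tickTime and clientPort are the defaults from zoo_sample.cfg):
tickTime=2000
dataDir=/Users/vishnu/zookeeper_data
clientPort=2181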
Start the ZooKeeper server by running the command below from the ZooKeeper base directory.
$bin/zkServer.sh start
Starting zookeeper ... STARTED
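You can optionally confirm that the server is up by checking its status; for a single-node setup it should report standalone mode.
$bin/zkServer.sh status
Mode: standalone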
Apache Kafka
Download the Kafka binary from here and unpack it. Now start the Kafka broker by running the command below from the Kafka base directory.
$bin/kafka-server-start.sh config/server.properties
[2015-11-06 10:00:54,412] INFO Registered broker 0 at path /brokers/ids/0 with address 10.0.0.8:9092. (kafka.utils.ZkUtils$)
[2015-11-06 10:00:54,418] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
config/server.properties holds the configuration of the Kafka broker. Take note of:
broker.id=0
port=9092
zookeeper.connect=localhost:2181
Now we need to create a Kafka topic to which the messages will be posted. Let’s name this topic ‘incoming’. To create a topic, we need to specify the topic name with --topic, the ZooKeeper URL with --zookeeper, the number of partitions, and the replication factor. Run the command below to create the topic.
$bin/kafka-topics.sh --create --topic incoming --zookeeper localhost:2181 --partitions 1 --replication-factor 1
Created topic "incoming"
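Optionally, you can describe the topic to confirm the partition count and replication factor we specified:
$bin/kafka-topics.sh --describe --topic incoming --zookeeper localhost:2181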
Let’s now test whether the topic has been created successfully by posting and retrieving some messages. Open a new terminal and start a console consumer with the command below.
$bin/kafka-console-consumer.sh --topic incoming --zookeeper localhost:2181
In another terminal, start a Kafka console producer and send a sample message.
$bin/kafka-console-producer.sh --topic incoming --broker-list localhost:9092
hdfs testmessage
Check the terminal running the consumer to verify that the message (‘hdfs testmessage’) has been received.
Apache Solr
Download the Solr distribution from here and unpack it. Start the Solr server by running:
$bin/solr start
Waiting up to 30 seconds to see Solr running on port 8983 [/]
Started Solr server on port 8983 (pid=15373). Happy searching!
You can now access the Solr UI via http://localhost:8983/solr/#/
Solr home page:

Now we need to create a collection in Solr. This collection will be used by our Storm topology to store messages of type solr. Creating a collection requires a set of configuration files; Solr ships with basic configuration files that can be used for this. They are available in
SOLR_BASE/server/solr/configsets/basic_configs/conf
Let’s first create a new folder and copy the basic configs to it.
$mkdir server/solr/collection1
$cp -r server/solr/configsets/basic_configs/conf/ server/solr/collection1/conf
We need to modify the default schema provided by the basic configuration. To do that, open the file schema.xml in server/solr/collection1/conf and add the line below after <field name="id" . . ./>. This adds a required field named value of type string that is stored (stored="true" makes the field retrievable during a search), while indexed="false" indicates that we will not search on this field.
<field name="value" type="string" indexed="false" stored="true" required="true"/>
Now that we have modified the schema as per our requirements, we can go ahead and create the collection, named collection1. To create it, run:
$bin/solr create -c collection1
Creating new core 'collection1' using command:
http://localhost:8983/solr/admin/cores?action=CREATE&name=collection1&instanceDir=collection1
{
  "responseHeader":{
    "status":0,
    "QTime":539},
  "core":"collection1"}
You can view the collection at http://localhost:8983/solr/#/collection1. Also, make sure that the fields id and value have been created correctly using the dropdown in the schema browser.
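If you want a quick sanity check of the schema, you can index a test document and query it back with curl; the id and value below are just example data and can be deleted later.
$curl 'http://localhost:8983/solr/collection1/update?commit=true' -H 'Content-Type: application/json' -d '[{"id":"1","value":"test message"}]'
$curl 'http://localhost:8983/solr/collection1/select?q=*:*&wt=json'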
MongoDB
Download and unpack MongoDB from here. We need to create a folder that MongoDB will use as its data directory.
mkdir mongodb_data
Start the mongod daemon by running the command below from the MongoDB installation folder. We need to pass the path of the data directory to the mongod script.
bin/mongod --dbpath /Users/vishnu/mongodb_data
...
2015-11-07T17:51:05.223-0600 I STORAGE [FileAllocator] done allocating datafile /Users/vishnu/mongodb_data/local.0, size: 64MB, took 0.039 secs
2015-11-07T17:51:05.238-0600 I NETWORK [initandlisten] waiting for connections on port 27017
Now we will create a database and a collection to store our messages. Open another terminal and run the commands below.
$ bin/mongo
MongoDB shell version: 3.0.7
connecting to: test
> use storm
switched to db storm
> db.createCollection("collection1");
{ "ok" : 1 }
use storm creates a new database called storm and switches to it, and createCollection("collection1") creates a new collection named ‘collection1’.
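As a quick check, you can insert and read back a test document from the mongo shell (the document below is just an example):
> db.collection1.insert({ "value" : "test message" })
WriteResult({ "nInserted" : 1 })
> db.collection1.find()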
Apache Storm
Download the Storm distribution from here and unpack it. It is better to add STORM_HOME/bin to your PATH; you can do so by editing your bash_profile. Note: this step might vary based on your OS - see how to set the PATH variable for Linux or Mac.
export STORM_HOME=/Users/vishnu/apache-storm-0.10.0
export PATH=$PATH:$STORM_HOME/bin
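After reloading your profile (for example with source ~/.bash_profile), you can confirm that the storm command is on your PATH:
$storm version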
We need to start the Storm master (nimbus) and the worker daemon (supervisor). Along with these, we will also start the Storm UI server and the logviewer, which lets us view logs from the Storm UI.
$bin/storm nimbus
$bin/storm supervisor
$bin/storm ui
$bin/storm logviewer
Check http://localhost:8080/index.html and make sure that the supervisor and nimbus servers have been started.

We have completed the environment setup. In the next part, we will see how to set up the Eclipse project and start writing the Storm topology.