This is a three-part series on a POC of how to build a near real-time processing system using Apache Storm and Kafka in Java. To give a brief introduction to how the system works: messages come into a Kafka topic, Storm picks them up using a Kafka Spout and passes them to a Bolt, which parses and identifies the message type based on the header. Once the message type is identified, the content of the message is extracted and sent to a different bolt for persistence - a Solr bolt, a MongoDB bolt or an HDFS bolt.
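Just to make that flow concrete, below is a rough sketch (not the actual code from the repo, which is covered in Parts 2 and 3) of how such a topology could be wired up with the storm-kafka spout. ParserBolt here is only a stand-in for the real parser bolt, the persistence bolts are omitted, and the package names assume Storm 0.10 (backtype.storm); for Storm 1.0 they become org.apache.storm.

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class TopologySketch {

    // Stand-in for the parser bolt built in Part 2: the real one inspects the
    // header of each message and routes the payload to the Solr/MongoDB/HDFS bolts.
    public static class ParserBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            String message = input.getString(0);
            collector.emit(new Values(message));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("value"));
        }
    }

    public static void main(String[] args) {
        // Kafka spout reads String messages from the 'incoming' topic via Zookeeper
        BrokerHosts zkHosts = new ZkHosts("localhost:2181");
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, "incoming", "/kafka_spout", "storm-poc");
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig));
        builder.setBolt("parser", new ParserBolt()).shuffleGrouping("kafka-spout");
        // the Solr, MongoDB and HDFS bolts would subscribe to the parser's output here

        new LocalCluster().submitTopology("storm-kafka-poc", new Config(), builder.createTopology());
    }
}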

In this first part, we will be dealing with setting up the environment. If you already have the environment set up, you can jump to Part 2, which talks about how to set up the project in Eclipse and how to write the Bolts. Creation of the Spout and execution of the project are discussed in Part 3.

The source code for this project is available on my GitHub.

Setup

For building this system, we will need:

  1. Hadoop 2.6.0
  2. Zookeeper 3.4.6
  3. Apache Kafka 0.8.2.1 (Scala 2.9.1)
  4. Apache Solr 5.3.1
  5. MongoDB 3.0.7
  6. Apache Storm 0.10.0

Note: all components are single-node setups.

The versions I used are given above. It is not necessary to use the same versions, but some changes might be needed if your versions differ.
Update (07/05/2016): The source code for Storm 1.0 is available in the branch storm1.0-kafka-poc

Hadoop

I am assuming that Hadoop is already installed, so I am not going through the installation steps here. If it is not, you can install it easily by following the instructions here. After installation, start the Hadoop daemons and verify that all of them are running with the jps command.

$ jps
12768 QuorumPeerMain
14848 SecondaryNameNode
15024 NodeManager
14949 ResourceManager
14758 DataNode
15067 Jps
14687 NameNode

Zookeeper

Zookeeper setup is also pretty straightforward. You can download Zookeeper from here. Once downloaded, extract the archive and look for zoo_sample.cfg inside the conf folder. Copy it and rename the copy to zoo.cfg.

$cp zoo_sample.cfg zoo.cfg

For a single-node setup, most of the configurations in zoo_sample.cfg can be used as-is; the only one I usually change is dataDir=/tmp/zookeeper. Create a new directory wherever convenient and point dataDir to it, e.g., dataDir=/Users/vishnu/zookeeper_data

Start the Zookeeper server by running the command below from the Zookeeper base directory.

$bin/zkServer.sh start
Starting zookeeper ... STARTED

Apache Kafka

Download the Kafka binary from here and unpack it. Now start the Kafka broker by running the command below from the Kafka base directory.

$bin/kafka-server-start.sh config/server.properties
[2015-11-06 10:00:54,412] INFO Registered broker 0 at path /brokers/ids/0 with address 10.0.0.8:9092. (kafka.utils.ZkUtils$)
[2015-11-06 10:00:54,418] INFO [Kafka Server 0], started (kafka.server.KafkaServer)

config/server.properties holds the configuration of the Kafka broker. Take note of:

broker.id=0
port=9092
zookeeper.connect=localhost:2181

Now we need to create a Kafka topic to which the messages will be posted. Let’s name this topic ‘incoming’. To create a topic, we need to specify the topic name with --topic, the Zookeeper URL with --zookeeper, the number of partitions and the replication factor. Run the command below to create the topic.

$bin/kafka-topics.sh --create --topic incoming --zookeeper localhost:2181 --partitions 1 --replication-factor 1
Created topic "incoming"

Let’s now test whether the topic has been created successfully by posting and retrieving some messages. Open a new terminal and start a console consumer with the command below.

$bin/kafka-console-consumer.sh --topic incoming --zookeeper localhost:2181 

In another terminal, start a Kafka console producer and send a sample message.

$bin/kafka-console-producer.sh --topic incoming --broker-list localhost:9092
hdfs testmessage

Check the terminal running the consumer to confirm that the message (‘hdfs testmessage’) has been received.
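The console producer is enough for this test, but messages can also be posted to the ‘incoming’ topic from Java. Here is a minimal sketch using the producer API that ships with Kafka 0.8.2; the class name and the message are purely illustrative.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class IncomingProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // broker address from config/server.properties
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // same format as the console test: message type header followed by the payload
        producer.send(new ProducerRecord<>("incoming", "hdfs testmessage"));
        producer.close();
    }
}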

Apache Solr

Download the Solr distribution from here and unpack it. Start the Solr server by running:

$bin/solr start
Waiting up to 30 seconds to see Solr running on port 8983 [/]  
Started Solr server on port 8983 (pid=15373). Happy searching! 

You can now access the Solr UI via http://localhost:8983/solr/#/

Solr home page:

Now we need to create a collection in Solr. This will be used by our Storm topology to store messages of the type solr. Creating a collection requires a set of configuration files; Solr provides basic configuration files that can be used for this. These files are available in

SOLR_BASE/server/solr/configsets/basic_configs/conf

Let’s first create a new folder and copy the basic configs to it.

$mkdir server/solr/collection1
$cp -r server/solr/configsets/basic_configs/conf/ server/solr/collection1/conf

We need to change the default schema provided by the basic configuration. To do that, open the file schema.xml in server/solr/collection1/conf.

Add the line below after <field name="id" . . ./>. This adds a required field named value of type string that is stored (stored="true" makes the field retrievable during a search), while indexed="false" indicates that we are not going to search on this field.

<field name="value" type="string" indexed="false" stored="true" required="true"/>

Now that we have modified the schema as per our requirement, we will go ahead and create a collection named collection1. To create the collection, run

$bin/solr create -c collection1
Creating new core 'collection1' using command:
http://localhost:8983/solr/admin/cores?action=CREATE&name=collection1&instanceDir=collection1

{
  "responseHeader":{
    "status":0,
    "QTime":539},
  "core":"collection1"}

You can view the collection via http://localhost:8983/solr/#/collection1. Also, make sure that the fields id and value have been created correctly by checking the dropdown in the schema browser.
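If you want to verify the schema from Java as well, a small SolrJ sketch like the one below can index a document with the id and value fields into collection1 (the class name and field values are only for illustration).

import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.common.SolrInputDocument;

public class SolrCheck {
    public static void main(String[] args) throws Exception {
        // point the client at the collection created above
        HttpSolrClient client = new HttpSolrClient("http://localhost:8983/solr/collection1");

        SolrInputDocument doc = new SolrInputDocument();
        doc.addField("id", "1");
        doc.addField("value", "solr testmessage");

        client.add(doc);
        client.commit(); // make the document visible to searches
        client.close();
    }
}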

MongoDB

Download and unpack MongoDB from here. We need to create a folder that MongoDB will use as its data directory.

mkdir mongodb_data

Start the mongod daemon by running the command below from the MongoDB installation folder. We need to pass the path of the data directory to the mongod script.

bin/mongod --dbpath /Users/vishnu/mongodb_data
...
2015-11-07T17:51:05.223-0600 I STORAGE  [FileAllocator] done allocating datafile /Users/vishnu/mongodb_data/local.0, size: 64MB,  took 0.039 secs
2015-11-07T17:51:05.238-0600 I NETWORK  [initandlisten] waiting for connections on port 27017

Now we will create a database and a collection to store our messages. Open another terminal and run the commands below.

$ bin/mongo
MongoDB shell version: 3.0.7
connecting to: test
> use storm
switched to db storm
> db.createCollection("collection1");
{ "ok" : 1 }

use storm switches to a new database called storm (it is actually created once the first data is written to it), and db.createCollection("collection1") creates a new collection named ‘collection1’.
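To sanity-check the database from Java too, here is a minimal sketch using the MongoDB 3.x Java driver that inserts a test document into storm.collection1 (the class name and test value are only for illustration).

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;

public class MongoCheck {
    public static void main(String[] args) {
        MongoClient client = new MongoClient("localhost", 27017);
        MongoDatabase db = client.getDatabase("storm");
        MongoCollection<Document> collection = db.getCollection("collection1");

        collection.insertOne(new Document("value", "mongo testmessage"));
        System.out.println("documents in collection1: " + collection.count());

        client.close();
    }
}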

Apache Storm

Download the Storm distribution from here and unpack it. It is better to add STORM_HOME/bin to your PATH. You can do so by editing your bash_profile. Note: this step might vary based on your OS. See how to set the path variable for Linux or Mac.

export STORM_HOME=/Users/vishnu/apache-storm-0.10.0
export PATH=$PATH:$STORM_HOME/bin

We need to start the Storm master (nimbus) and the slave (supervisor). Along with these, we will also start the Storm UI server and the logviewer, which enables us to view the logs from the Storm UI.

$bin/storm nimbus
$bin/storm supervisor
$bin/storm ui
$bin/storm logviewer

Check http://localhost:8080/index.html and make sure that the supervisor and nimbus servers have been started.

We have completed the environment setup, and in the next part we will see how to set up the Eclipse project and start writing the Storm topology.

Next