Saturday, February 4, 2017

Simple Spark Streaming with Kafka in Docker

I have tried out a simple way of streaming data into Spark from Kafka using a Docker environment. For this test you need Docker installed and access to the Internet.

1. First start the Docker machine (I am using boot2docker in a Windows environment):
$ docker-machine start default
$ docker-machine ssh default   (or: ssh docker@localhost -p 23)
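
If you want to confirm the VM is up and find its IP address (handy later, since with boot2docker the mapped ports are exposed on the VM's IP rather than on the Windows host's localhost), docker-machine can show both:
$ docker-machine ls
$ docker-machine ip default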

2. Inside the Docker environment, pull the Spark Docker image (for this example I have used sequenceiq/spark):
$ docker pull sequenceiq/spark

3. This will pull the latest image into your Docker environment. Once the pull is complete, run the following command in Docker:
$ docker run -it -p 4040:4040 -p 2181:2181 -p 9092:9092 -v /<some_path_to_share>:/data sequenceiq/spark:1.6.0 /bin/bash

Following this command, a shell inside the Spark container appears. The port mappings expose the Spark UI (4040) as well as ZooKeeper (2181) and the Kafka broker (9092) that we will start inside the container in the next steps.

4. Now download Apache Kafka from the downloads page (https://kafka.apache.org/downloads). Choose a suitable Kafka version; for this example I downloaded kafka_2.10-0.10.1.0.tgz, built for Scala 2.10.
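
Inside the container the archive can be fetched with wget; the URL below points at the Apache archive for this particular release (adjust it if you picked a different version):
$ wget https://archive.apache.org/dist/kafka/0.10.1.0/kafka_2.10-0.10.1.0.tgz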

5. Unpack the archive and move it into place:
$ tar -xvf kafka_2.10-0.10.1.0.tgz
$ mv kafka_2.10-0.10.1.0 /usr/local/kafka
$ cd /usr/local/kafka/bin

6. With Kafka downloaded, unpacked, and moved to a suitable folder, start ZooKeeper and Kafka from the bin folder as below:
$ ./zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
$ ./kafka-server-start.sh /usr/local/kafka/config/server.properties
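
Both scripts run in the foreground. If you would rather keep a single shell free, the Kafka start scripts accept a -daemon flag (or you can simply append & to each command):
$ ./zookeeper-server-start.sh -daemon /usr/local/kafka/config/zookeeper.properties
$ ./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties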

7. After ZooKeeper and Kafka have successfully started, create the topic for streaming:
$ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic spark-topic
- this will create the spark-topic topic with a single partition and a replication factor of 1
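
To confirm the topic exists, you can list or describe it against the same ZooKeeper instance:
$ ./kafka-topics.sh --list --zookeeper localhost:2181
$ ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic spark-topic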

8. The next thing is to create the sample Spark application:
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaReceiver {

  def main(args: Array[String]): Unit = {

    // Run locally, using as many worker threads as there are cores
    val conf = new SparkConf().setMaster("local[*]").setAppName("Kafka Receiver")

    // Kafka broker started in the previous steps
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")

    // Topic created earlier
    val topics = Set("spark-topic")

    // 5-second micro-batches
    val ssc = new StreamingContext(conf, Seconds(5))

    // Direct (receiver-less) stream; map(_._2) keeps only the message value
    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)

    println("printing Streaming data.......")

    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

9. Package this with the required libraries, using the build.sbt below:

name := "SparkExample"

version := "1.0"

val sparkVersion = "1.6.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion
)
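
Assuming a standard sbt layout (the Scala file under src/main/scala), packaging from the project root produces the jar used in the next step; with the name and version above it should land in target/scala-2.10/sparkexample_2.10-1.0.jar:
$ sbt package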


10. After packaging, run the jar with spark-submit:
$ spark-submit --master local[*] --class KafkaReceiver --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0 /sparkexample_2.10-1.0.jar

Once it starts running, check the console output; while the application is running, the Spark UI should also be reachable on port 4040 (mapped in step 3).

11. Finally, run the Kafka console producer to produce some data for streaming:
$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic spark-topic
Hello Spark Streaming
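
If nothing shows up on the Spark side, a quick sanity check is to replay the topic with the console consumer, independently of Spark:
$ ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic spark-topic --from-beginning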

In the Spark console you will see something like:

17/02/04 21:55:50 INFO executor.Executor: Running task 0.0 in stage 11.0 (TID 11)
17/02/04 21:55:50 INFO kafka.KafkaRDD: Beginning offset 1000011 is the same as ending offset skipping spark-topic 0
17/02/04 21:55:50 INFO executor.Executor: Finished task 0.0 in stage 11.0 (TID 11). 915 bytes result sent to driver
17/02/04 21:55:50 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 11.0 (TID 11) in 13 ms on localhost (1/1)
17/02/04 21:55:50 INFO scheduler.DAGScheduler: ResultStage 11 (print at KafkaReceiver.scala:27) finished in 0.012 s
17/02/04 21:55:50 INFO scheduler.DAGScheduler: Job 11 finished: print at KafkaReceiver.scala:27, took 0.063248 s
-------------------------------------------
Time: 1486263350000 ms
-------------------------------------------
Hello Spark Streaming

17/02/04 21:55:50 INFO scheduler.JobScheduler: Finished job streaming job 1486263350000 ms.0 from job set of time 1486263350000 ms
17/02/04 21:55:50 INFO scheduler.JobScheduler: Total delay: 0.115 s for time 1486263350000 ms (execution: 0.079 s)
17/02/04 21:55:50 INFO rdd.MapPartitionsRDD: Removing RDD 21 from persistence list
17/02/04 21:55:50 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 11.0, whose tasks have all completed, from pool
17/02/04 21:55:50 INFO storage.BlockManager: Removing RDD 21
17/02/04 21:55:50 INFO kafka.KafkaRDD: Removing RDD 20 from persistence list
17/02/04 21:55:50 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer()
17/02/04 21:55:50 INFO scheduler.InputInfoTracker: remove old batch metadata: 1486263340000 ms
17/02/04 21:55:50 INFO storage.BlockManager: Removing RDD 20