Apache Spark & Kafka, Streaming Partners

Gobalakrishnan Viswanathan
9 min readAug 17, 2020

--

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

what is event streaming?
Capturing data in real-time from multiple sources in the form of streams of events. Storing these streamings can be used for later retrieval, manipulating, and processing. Even It is possible to react to the events in real-time.

Kafka is event streaming, how?
1. Kafka can read and write stream of events
2. Kafka can store data as long as needed.
3. To process streams of events as they occur or retrospectively.

Architecture of Kafka:

Kafka is a distributed system consisting of servers and clients that communicate via a high-performance TCP protocol.

Kafka Servers:

  • Kafka is run as a cluster of one or more servers that can span multiple data centers or cloud regions. Some of these servers form the storage layer, called the brokers. Other servers run to continuously import or export data as events to connect Kafka with other sources.

Kafka Clients:

  • They allow you to write distributed applications and microservices that read, write, and process streams of events in parallel, at scale, and in a fault-tolerant manner.
  • Kafka Clients are available for Java, Scala, Python, C, and many other languages.

What is “event” in Kafka?

  • When you read or write to Kafka, It is in the form of Events. Conceptually, an event has a key, value, timestamp, and optional metadata headers.

Producers, Consumers:

  • Producers are those client applications that publish (write) events to Kafka, and consumers are those that subscribe to (read and process) these events. In Kafka, Producers and Consumers are fully decoupled.

Topics:

  • Events are organized and durably stored in topics. Topics can be considered as folders and events are files in the folder. Topics are always multi-producer and multi-consumer one, which means that any number of sources can send data to single topic, and in the same manner, read also can be done from any number of sources. Events can be configured such as how long it should be stored in the Topic.
  • Events stored in the Topic is distributed across different Kafka brokers. When a new event is added to the topic, it will be appended to any one of the Partitions of the Topic. But Kafka guarantees sources can read the events in the same order as they were written.
Partitions in Topic
  • As we see in the image, there are two sources that send data to a Kafka topic. Events with the same key (denoted here by color) will be added to the same partition. Here both producers can send the data to the same partition also.
  • Every topic in the Kafka can be replicated number of times across different geo-regions to make sure backup is available in case something goes wrong. Common replication setting is 3 means there are always three copies of data will be available somewhere in the network cluster.

Kafka APIs:
In addition to Kafka's command-line tool, Kafka has 5 Core APIs.

  1. Admin API: To manage Topics, Brokers, and other Kafka objects.
    Official document link: Admin API
  2. Producer API: To write a stream of events to Kafka Topics.
    Official document link: Producer API
  3. Consumer API: to (read) one or more topics and to process the stream of events produced to them.
    Official document link: Consumer API
  4. Streams API: to implement stream processing applications and microservices.
    Official document link: Streams API
  5. Connect API: to build and run reusable data import/export connectors that consume (read) or produce (write) streams of events.
    Official document link: Connect API

Test Kafka with Producer, Consumer using Command Line:

  • Download Kafka using this link: Download Kafka. Once downloaded, Extract it. Now we need to start Zookeeper and Kafka server .
  • Kafka uses Zookeeper to do all the services for Kafka Brokers. Zookeeper sends changes of the topology to Kafka, so each node in the cluster knows when a new broker joined, a Broker died, a topic was removed or a topic was added, etc Start the zookeeper using below command.
  • For windows, I am using the following commands to start zoo-keeper and Kafka-Server. Run these commands separately in each command window.
bin\windows\zookeeper-server-start.bat config\zookeeper.propertiesbin\windows\kafka-server-start.bat config\server.properties
  • Now we need to create the topic from where we can read/write data. Open the new window and use the command to create a new topic. This command creates the topic name “quickstart-events”.
bin\windows\kafka-topics.bat --create --topic quickstart-events --bootstrap-server localhost:9092
  • Now we need to Start Producer by which we can send the data to the Kafka topic. Use the below command in a new window. Once you Enter, the Command line will be ready to get data from you. You guessed it right. We created our local source by which we can send data to the topic.
bin\windows\kafka-console-producer.bat --topic quickstart-events --broker-list localhost:9092
  • But wait. As I said, we sent data to the Topic. How to check that? Yes, using consumer. Now we can subscribe to the same topic using consumer tools available in Kafka, We can see the realtime data transfer from producer to consumer. Command to create consumers for the same topic.
bin\windows\kafka-console-consumer.bat --topic quickstart-events --bootstrap-server localhost:9092
Kafka-Producer-Consumer

As we see in the image, When I enter the message in the producer window, the data have been transferred to consumers in real-time which is perfectly done 🙌. This is how I tested the Kafka with some real-time fun. Now let's get serious and head into our actual topic, Apache Spark with Kafka.

Kafka with Apache Spark & Scala:
Until now we have seen how to interact with Kafka using the command line. Now, Let's use Kafka with Spark and Scala to get some real-time implementations. Head into this to get Introduction about Apache if needed.

Spark, Kafka with Scala (Photo credits to intellipaat)

The below code explains how the scala interacts with Spark.

import org.apache.spark.sql.SparkSession

object SimpleApp {
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.master", "local")
.getOrCreate()
val xx = spark.read.textFile("E:\\GV\\SparkKafkaScala\\SparkScala\\sample.txt")
for (element <- xx)
{
println(element)
}
}
}

Using the spark session, we build the session. Using the session variable, reading the textfile from local, and printing line by line using for loop. Yes, it is very basic for loop, But it was taking lots of effects to debugging the IntelliJ editor issues, Scala compatibility issues with Spark, and more. Finally, I am happy that I can contact Spark now and use it.

Creating our first RDD:
Using SparkContext, we can create RDD. The below example shows how to initialize SparkContext and create RDD. This Scala program can be reduced even inside the main function itself, But to get hands-on in scala, I intentionally using the classes and functions.

package SparkHandsOn
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

class CreatingSparkObjects{

def getSparkConf(): SparkConf ={
val sparkConf: SparkConf = new SparkConf()
sparkConf.setAppName("First Spark Application")
sparkConf.setMaster("local")
sparkConf
}
def getSparkContext(sparkConf: SparkConf): SparkContext = {
val sparkContext = new SparkContext(sparkConf)
sparkContext
}
}

object CreatingSparkContextObj{
def main(args: Array[String]): Unit = {
val sparkObjs = new CreatingSparkObjects()
val sparkConf = sparkObjs.getSparkConf()
val sparkContext = sparkObjs.getSparkContext(sparkConf)

val arrayRDD = sparkContext.parallelize(Array(1,2,3,4,5,6,7,8))
println(arrayRDD.getClass)
arrayRDD.foreach(println)
}
}
  • Here, we are initializing the Spark Configuration object with the basic information. Then, We passing this configuration object to SparkContext to initialize it. Then using parallelize function of Spark Context to create RDD using the Array. This is a legacy way of creating SparkContext. There is one more new way to get SparkContext that is using SparkSession Object like given below.
import org.apache.spark.sql.SparkSession


object CreatingSparkContextObj{
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder().appName("Spark Session").master("local").getOrCreate()
val sparkContext = sparkSession.sparkContext
val arrayRDD = sparkContext.parallelize(Array(1,2,3,4,5,6,7,8))
println(arrayRDD.getClass, arrayRDD.count())
}
}

RDD with CSV:
Reading CSV file as RDD and write it back as a text file.

package SparkHandsOn
import org.apache.spark.sql.SparkSession

class RDDwithCSV {
def getSparkSession(): SparkSession = {
new SparkSession.Builder().appName("RDD with CSV").master("local").getOrCreate()
}
}

object mainObj {
def main(args: Array[String]): Unit = {
val sparkSession = new RDDwithCSV().getSparkSession()
val csvRDD = sparkSession.sparkContext.textFile("C:\\Users\\support\\Downloads\\spark-3.0.0-bin-hadoop3.2\\python\\test_support\\sql\\ages.csv")
csvRDD.foreach(item => if(item!="") println(item))
}
}

We have lots of options and operations in Spark. Now, we will move to Spark with Kafka on Scala which is our main topic of this post. Though Apache Spark has four different modules, we are heading to The Apache Spark Streaming module to get the taste of live streaming processing.

Apache Streaming:

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Kinesis, or TCP sockets and can be processed using functions given by SparkCore. Finally, the processed data can be pushed to filesystems, databases, and live dashboards.

How Spark Streaming works
Spark Stream receives the input data continuously, It splits the data into batches. Then Spark Engine process the data Finally results in the stream of batches.

Spark Streaming Flow

Spark Streaming provides an abstraction on the name of DStream which is a continuous stream of data. DStreams can be created using input sources or applying functions on existing DStreasms. Internally, DStreams are represented as a sequence of RDDs. We can write the Spark Streaming Program using Scala, Java, Python. We are continuing with Scala here.

Spark Streaming Hands-on:
Spark Streaming Context entry point for Spark Streaming processing. The below code explains, how to create a Spark Streaming context.

import org.apache.spark._
import org.apache.spark.streaming._

class SparkStreamingExample() {

def GetSparkConf(): SparkConf ={
val conf = new SparkConf()
conf.setMaster("local[2]")
conf.setAppName("NetworkWordCount")
conf
}

def GetStreamingContext(): StreamingContext ={
new StreamingContext(GetSparkConf(), Seconds(1))
}
}

object SparkStreamingExample {
def main(args: Array[String]): Unit = {
val ssc = new SparkStreamingExample().GetStreamingContext()
print(ssc)
}
}

Let's discuss some sample use cases using Spark Streaming.

  1. Monitor new files in the Directory
    We have a directory that should be monitored 24/7. If any new files created in the directory, The file content should be processed in realtime and send it to some other resources. This is a very simple use case yet knowledgable one to understand Spark Streamings basic. Let's dive into the implementation.

import org.apache.spark._
import org.apache.spark.streaming._

class SparkStreamingExample() {
def GetSparkConf(): SparkConf ={
val conf = new SparkConf()
conf.setMaster("local[2]")
conf.setAppName("NetworkWordCount")
conf
}
def GetStreamingContext(): StreamingContext ={
new StreamingContext(GetSparkConf(), Seconds(10))
}
}

object SparkStreamingExample {
def main(args: Array[String]): Unit = {
val ssc = new SparkStreamingExample().GetStreamingContext()
val lines = ssc.textFileStream(<dir_path>)
val words = lines.flatMap(_.split(" "))
words.print()

ssc.start()
ssc.awaitTermination()
}
}

We already discussed how we can create SparkStreamingContext. one I got the context, I used textFileStream function to monitor the directory. It means that, if a new file created in the directory, ssc will read the file. I converted the lines to flatted array consists of all the words in the lines and printed it. The last two lines are very important, The first line ssc.start() starts the Streaming process, ssc.awaitTermination() says that doesn't stop the monitoring until we stopping it manually. There are many streaming related functions out there in the Spark library for different kinds of requirements. Now let’s see how to connect Kafka with Spark Streaming.

Since I am using scala with sbt for my development environment. I had some issues to make dependencies work for Kafka. I had to downgrade my scala version. So the below build.sbt file will be very useful for the development.

name := "SparkKafkaHandsOn"

version := "0.1"

scalaVersion := "2.11.11"

libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.2.0"

libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.1.0"

Spark Kafka Integration was not much difficult as I was expecting. The below code pulls all the data coming to the Kafka topic “test”. To make this test, I opened the Kafka Producer to send the data to Kafka Topic which can be read by the Spark Streaming Real-time. No need for any explanation to the below code snippet since it is self-explanatory.

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka


class SparkObjects() {

def GetSparkConf(): SparkConf ={
val conf = new SparkConf()
conf.setMaster("local[*]")
conf.setAppName("NetworkWordCount")
conf
}

def GetStreamingContext(): StreamingContext ={
new StreamingContext(GetSparkConf(), Seconds(10))
}
}


object SparkKafkaConnection {
def main(args: Array[String]): Unit = {
val ssc = new SparkObjects().GetStreamingContext()
val kafkaStream = kafka.KafkaUtils.createStream(ssc, "localhost:2181","spark-streaming-consumer-group", Map("test" -> 5) )
kafkaStream.print()
ssc.start()
ssc.awaitTermination()
}
}

Yes, This is a very simple example for Spark Streaming — Kafka integration. Still, It serves good knowledge to go further deep in the learning. From this, I am signing off this post. Encourage me by giving comments and claps. Thanks.

Hope we meet again in another post soon. ta ta !!!

--

--

Gobalakrishnan Viswanathan
Gobalakrishnan Viswanathan

No responses yet