Apache Kafka: A Real-Time Streamer

Gobalakrishnan Viswanathan
10 min read · Feb 27, 2021

From its official website:

Apache Kafka is an open-source distributed event streaming platform.

Kafka is a distributed platform consisting of clients and servers. It runs as a cluster of one or more servers that can span multiple data centers or cloud regions.

Some servers act as the storage layer (brokers), while others run the Kafka Connect tool to continuously import and export data, integrating Kafka with outside sources.

Clients allow us to write distributed applications and microservices that read, write, and process streams of events in parallel, at scale, and in a fault-tolerant manner.

Kafka is highly scalable and fault-tolerant: if any of its servers fails, the other servers take over its work to make sure no data is lost. More of the basic concepts can be read in the official documentation here.

Before getting into the command line, let's learn some basic Kafka components, which are equally important.

Kafka Topic: A topic is a particular stream of data, similar to a table in a database (but without any constraints) where the same type of data is stored. A topic is identified by its name, and we can create as many as we want. Kafka topics do not store data forever; the default retention time for messages in a topic is 7 days.

Partitions: Topics are split into partitions. All the messages we send to a topic are stored in its partitions. Every message in a partition gets a unique, incremental ID called an offset. Offsets are specific to a partition, which means a particular message can be addressed as topic_name.partition_number.offset.
We can specify how many partitions we need when creating the topic. Once data is written to a partition, it cannot be updated (it is immutable).
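For example, once Kafka is running (the setup is covered later in this article), a single message can be pinpointed by topic, partition, and offset through the console consumer. A minimal sketch, assuming a local broker on localhost:9092, a topic named topic1, and at least six messages in partition 0:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic1 --partition 0 --offset 5 --max-messages 1

This reads exactly one message: offset 5 of partition 0 in topic1.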

Brokers: A Kafka cluster consists of multiple brokers (servers), and the data is spread across the cluster. If you connect to one broker, you are connected to the entire cluster. If we create a topic with three partitions, those partitions can land on any brokers in the cluster. When we read data from the topic, Kafka reads the topic's messages across the brokers and returns the data to us.

Replication: The replication factor tells Kafka how many copies of the data to keep, so that a broker crash does not cause data loss. The default replication factor for a topic is 1, which means no extra copy is kept. If the replication factor is 2, every partition of the topic is available on two brokers; when one broker fails, the partition's data can still be retrieved from another broker. The replication factor is what ensures that data won't be lost.

Leader of the partition: The leader is specific to a partition's replicas. When there are two or more replicas, one replica acts as the leader and the others are just copies (ISRs, in-sync replicas) of the leader. All read/write operations go through the leader. When the leader goes down, one of the other replicas of the partition takes over as leader.

Producer: Producers write data to topics. We never need to specify which broker or partition to write to; Kafka automatically does the load-balancing magic for us.

Producer Keys: A producer can send a key along with a message. The key can be anything, such as a string or a number. When a key is sent with the messages, all messages with the same key go to the same partition. For example, say we are sending student data to Kafka: for student1 we send messages with the key student_1, and for student2 the key is student_2. We cannot predict which partition these messages will land in, but Kafka guarantees that student_1 messages always go to the same partition, and the same holds for student_2.
Keys are given when the order of the written messages matters. Kafka ensures the order of messages within a partition, but not across the partitions of a topic. When no key is given, the messages of the topic are written across the partitions/brokers in round-robin fashion, so there is no ordering guarantee.

Consumers: Consumers read data from a topic. If a message key was given when writing the data, the consumer reads those messages in order, because they were all written to the same partition. If no key was given, there is no guarantee on the order of the messages, because they are spread across the partitions.

Consumer Groups:
A consumer group is a group of consumer applications. Each consumer in the group reads from its own exclusive set of partitions. If there are more consumers than partitions, some consumers stay in an inactive state. If we lose an active consumer in the group, an inactive one can take over and become active to read the data.
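A quick way to see this with the console tools (the group name my-group and topic topic1 are just example names): run the same command below in two terminals, and Kafka splits the topic's partitions between the two consumers in the group.

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic1 --group my-group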

Consumer Offsets: Once a consumer from a consumer group reads data from a topic, it commits the offset of the last message it processed to an internal topic named __consumer_offsets. Next time, the consumer reads not from the beginning but from the committed point; even if the consumer dies, it can resume from the committed point the next time. Committing an offset is like the bookmark a reader uses while reading a book or a novel. Message delivery and offset handling can follow three semantics.

1. At most Once:
In this semantic, the producer sends the data only once and forgets about it. Losing a message is more acceptable than delivering it twice by retrying. This method gives higher throughput and lower latency.
2. At least Once:
In this semantic, the message should not be lost. The producer retries sending the message until it gets an acknowledgment from Kafka. This is the most commonly preferred semantic. Applications adopting at-least-once semantics get moderate throughput and moderate latency.
3. Exactly Once:
In this semantic, the message should not be lost and should be delivered only once. This is the most complicated semantic, and it comes with lower throughput and higher latency.
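To see committed offsets in practice, the kafka-consumer-groups.sh tool that ships with Kafka can describe a consumer group once it has read something (my-group is just an example name, and a broker must be running):

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

The output lists, per partition, columns such as CURRENT-OFFSET (the committed bookmark), LOG-END-OFFSET, and LAG.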

Broker Discovery: When we send messages to or read messages from Kafka, the producer or consumer automatically figures out which broker to connect to; we never need to specify this explicitly. Connecting to one broker means we are connected to all the brokers in the cluster, because every broker knows every other broker, its topics, and its partitions (this is the cluster metadata). So we can connect to any bootstrap server in the Kafka cluster, and reads and writes can go through any of the bootstrap servers.
For example, suppose we are connected to one bootstrap server in the cluster and ask Kafka to list all the available topics. Since all the brokers know about each other, we get back every topic across all the brokers.
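As a sketch of this, on Kafka versions that support the --bootstrap-server option of kafka-topics.sh, pointing the tool at any single broker is enough to list every topic in the cluster:

kafka-topics.sh --bootstrap-server localhost:9092 --list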

Zookeeper: Zookeeper manages the Kafka brokers. It keeps the status and the list of all the servers, topics, and partitions in the cluster, performs leader election for partitions when a broker goes down, and notifies Kafka when topics or brokers change. Kafka can't work without Zookeeper. Zookeeper also runs as a cluster, with as many Zookeeper servers as we need.
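For instance, once Zookeeper is running (see the setup below), the zookeeper-shell.sh tool bundled with Kafka can be used to peek at the broker ids registered under the /brokers/ids node; the exact output depends on your setup:

zookeeper-shell.sh localhost:2181 ls /brokers/ids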

Kafka Installation:

Let's see how to install Kafka on Ubuntu.

  1. First thing we need is Java. We can check whether Java is available by running java -version . If it says Java is not available, install it with the command below.
    sudo apt install openjdk-8-jdk
    After the installation, the version command should give you the output.
  2. Now, download our actual interest.
Download the Kafka binaries from here and extract them:
    tar -xzf kafka_2.13-2.7.0.tgz
    cd kafka_2.13-2.7.0
Add the Kafka path to ~/.bashrc so that the Kafka commands can be run from anywhere in the terminal. Note the Kafka directory name kafka_2.13-2.7.0, open the ~/.bashrc file with any editor, go to the very bottom of the file, and add the line below as the last line:
    export PATH=/<path_to_the_directory>/kafka_2.13-2.7.0/bin:$PATH
Close the file and run source ~/.bashrc to apply the change. Now we no longer need to go to the Kafka folder every time; the commands can be run from anywhere on the command line.

Start the required Services:

Go to the Kafka directory and start Zookeeper:
zookeeper-server-start.sh config/zookeeper.properties
Start the Kafka server (in a new terminal):
kafka-server-start.sh config/server.properties
Now we have the services we need to start with the Kafka commands.

Kafka Command Line:

kafka-topics:
kafka-topics.sh is the command-line tool to create, delete, describe, or change topics. Type kafka-topics.sh with no arguments to print the tool's documentation.


Let's start by creating a topic. Use the command below to create a topic named "topic1".

kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic topic1 --create --partitions 3 --replication-factor 1

In the above command,
** --zookeeper points to the Zookeeper instance we already started locally; the default port for Zookeeper is 2181.
** --topic defines the name of the topic to be created.
** --create says to create the topic.
** --partitions sets how many partitions the topic should have.
** --replication-factor says how many times the partitions of the topic should be replicated.
One important thing to notice with --replication-factor is that we cannot create a topic with a replication factor greater than the number of brokers we have. For example, in our local setup we have only one broker, and if we ask for a replication factor of 2, Kafka gives an error saying Replication factor: 2, larger than available brokers: 1.
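For example, on this single-broker local setup, a create command like the following (test2 is just an example topic name) is rejected with exactly that error:

kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic test2 --create --partitions 3 --replication-factor 2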

Now let's check that our Kafka topic was created. We can list all the topics available in the cluster with the command below.

kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

If we need more information about the topic, we can use the --describe option.

kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic topic1 --describe
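On this single-broker local setup, the describe output looks roughly like the following (exact spacing and columns vary slightly by Kafka version):

Topic: topic1   PartitionCount: 3   ReplicationFactor: 1   Configs:
    Topic: topic1   Partition: 0   Leader: 0   Replicas: 0   Isr: 0
    Topic: topic1   Partition: 1   Leader: 0   Replicas: 0   Isr: 0
    Topic: topic1   Partition: 2   Leader: 0   Replicas: 0   Isr: 0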

Our topic has 3 partitions and a replication factor of 1; the empty Configs: field means no additional configuration was given when creating the topic.
Leader: 0 is the id of the broker acting as leader for the partition. Replicas and ISR (in-sync replicas) also list broker ids. They are all 0 here because we have only one broker. When we create a topic in a cluster with a replication factor greater than 1, we will see a completely different output.

To delete the topic, use the --delete parameter instead of --create.

kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic <topic_name> --delete

These are some commonly used commands of the kafka-topics.sh tool. Now let's jump to the producer (sending data to a topic).

kafka-console-producer.sh:

This command-line tool is used to send data to a topic, i.e., to write messages into it. To send a message to a Kafka topic, two parameters are mandatory: the topic name and the broker list to connect to.

kafka-console-producer.sh --broker-list localhost:9092 --topic <topic_name>
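The tool then shows a > prompt; every line typed is sent to the topic as one message, for example:

>hello kafka
>this is the second message

Press Ctrl+C to stop the producer.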

Now we have written data to the topic. But how do we validate it? By reading the messages back from the topic, and we can use the console consumer to do that.

kafka-console-consumer.sh:

This tool is used to read messages from a topic. Use the command below to read the messages available in the topic.

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic1 --from-beginning

The consumer is intended to read streaming messages. Here, --from-beginning tells it to read all the existing messages and then continue with the real-time ones. Once you start the consumer, it prints all the available messages and then waits for new messages.


With both the producer and consumer tools, we can test the real-time write/read process in Kafka. Open the producer and the consumer in separate terminals: every new message entered in the producer is read by the consumer in real time.


As we saw earlier, we can send a message with a producer key to make sure Kafka stores all messages with the same key in the same partition. If no key is given when writing data, the messages are spread across the partitions in the cluster in round-robin fashion with key=null. We can send a producer key like below.

kafka-console-producer.sh --broker-list localhost:9092 --topic <topic_name> --property parse.key=true --property key.separator=,
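With parse.key=true, each line must be typed as key,value using the separator configured above (for example student_1,mark=90). To verify the keys on the reading side, the console consumer can print them too; print.key and key.separator are standard console-consumer properties:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <topic_name> --from-beginning --property print.key=true --property key.separator=,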

All the default values for topics can be changed in the server.properties file in the config folder.
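For instance, two of the defaults mentioned earlier live in config/server.properties; the values below are the usual out-of-the-box settings and are shown only as an illustration:

num.partitions=1
log.retention.hours=168

num.partitions is the default partition count for automatically created topics, and log.retention.hours=168 is the 7-day retention mentioned at the beginning.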

From the above examples, we have seen how to use Kafka's basic command-line tools. But in real-world cases it does not work this way: the command line is not used to produce and consume messages. Instead, programming languages are used to send and receive data from Kafka topics to make life easy. There are a lot of languages that do this effectively, such as Python, Java, etc.

For now, that's it from my side about this Kafka introduction. I will try to come up with a real-time use case using Python and Kafka. We will definitely meet again. Ta-Ta.
