Getting Started with Kafka


Kafka is Apache’s open-source platform for distributed stream processing in high-throughput environments. In the following tutorial we will configure Apache Kafka and Zookeeper and demonstrate both a basic and a fault-tolerant messaging setup.

This post covers the following on a Linux machine: a brief introduction to Kafka, setting it up, basic messaging with a single broker, and scaled, fault-tolerant messaging with a Zookeeper ensemble and a multi-broker Kafka cluster.

Kafka Introduction

Kafka is a distributed publish/subscribe fault-tolerant messaging system with partitioning and replication abilities, built on distributed commit logs. It can handle high volumes of message streams (more than 1.5 million messages per second). Kafka heavily depends on Apache Zookeeper to maintain its internal state.

In a nutshell, Zookeeper is a distributed key-value store used by various distributed applications for maintaining configuration information, distributed synchronization, group services and so on.

[Figure: a general view of Zookeeper and Kafka internals as well as their relationship]

However, before diving into the installation and configuration, let’s go through the basic terminology and key facts about Apache Kafka.

  • Kafka Message: is an entity with an arrival timestamp, a unique id and a binary data payload.
  • Topic: is a logical concept; a named feed of messages to be exchanged.
  • Broker: is a software process or node by which Kafka manages topics.
    • They can be scaled out to achieve a higher load distribution, throughput and fault tolerance.
  • Cluster: is a group of brokers (a leader and followers) running on one or more hosts.
  • Partition: is an immutable sequence of topic messages in a physical log file.
    • A broker could have one or more partitions.
    • One partition cannot be split across machines.
  • Message Offset: is a non-global, zero-based, immutable sequence number of a message within a partition, tracked by the consumer (a way to inspect offsets is sketched right after this list).
  • Retention Policy: is Kafka’s configurable period during which the messages are retained by Kafka cluster for a topic.
  • Replication Factor: is the data redundancy factor by which Kafka replicates messages among brokers.
  • In-Sync Replicas (ISR): is the set of replicas in a topic’s replica set (quorum) that are currently in sync with the leader.
  • Producer & consumer: entities interested in sending and receiving messages to and from topics.
    • Within a consumer group, Kafka won’t allow more than one consumer to read from the same partition simultaneously.
  • Controller: is an elected broker within a cluster for carrying out administrative tasks (e.g. managing the states of partitions and replicas).
  • Partition Leader: is the broker that handles all reads and writes for a partition and propagates its messages to follower brokers for replication.
  • Follower/Worker: is a broker that only replicates the leader.
    • A broker can simultaneously be the leader for one topic and a worker for another topic.
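
The terminology around partitions and offsets can be made concrete with a small tool that ships with Kafka and prints the latest offset of each partition of a topic. A minimal sketch, assuming a broker on localhost:9092 and a topic named ‘TestTopic’ (both are set up in the sections below); each output line has the form topic:partition:offset:

$ sh kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
> --broker-list localhost:9092 \
> --topic TestTopic \
> --time -1
TestTopic:0:3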

Setting up Kafka

Installing Kafka is as easy as downloading and unzipping the archive into a folder.

$ cd
$ wget http://www-eu.apache.org/dist/kafka/0.10.2.0/kafka_2.12-0.10.2.0.tgz
$ tar -xzvf kafka_2.12-0.10.2.0.tgz
$ mv kafka_2.12-0.10.2.0 kafka

The above script switches to the home directory, downloads and extracts Kafka, and finally renames the folder to drop the version suffix and simplify later invocations.

Inside the kafka/ directory, bin/ is where all the Kafka and Zookeeper executables live. The config/ folder, as the name suggests, contains the configuration files for both Zookeeper and Kafka.
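
For instance, the scripts used throughout this post can be spotted right away (file names from the 0.10.2.0 distribution):

$ ls kafka/bin | grep -E 'zookeeper-server|kafka-server|kafka-topics|kafka-console'
kafka-console-consumer.sh
kafka-console-producer.sh
kafka-server-start.sh
kafka-server-stop.sh
kafka-topics.sh
zookeeper-server-start.sh
zookeeper-server-stop.sh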

Basic Messaging

The following section demonstrates how to set up producer/consumer messaging using a single Zookeeper node and a single Kafka broker.

Running a Zookeeper Server Instance

In order to start a Kafka server instance, an instance of the Zookeeper server needs to be running. This is done using the following executable and the instance’s configuration.

$ sh kafka/bin/zookeeper-server-start.sh kafka/config/zookeeper.properties

The instance can be shut down using ctrl+c or by running the script below:

$ sh kafka/bin/zookeeper-server-stop.sh

Check the Zookeeper server using:

$ telnet localhost 2181

followed by a Zookeeper four-letter command like ‘stat‘, which produces output similar to the following:

Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
stat
Zookeeper version: 3.4.9-1757313, built on 08/23/2016 06:50 GMT
Clients:
/127.0.0.1:48134[1](queued=0,recved=1107,sent=1109)
/127.0.0.1:41180[0](queued=0,recved=1,sent=0)
Latency min/avg/max: 0/0/115
Received: 1259
Sent: 1264
Connections: 2
Outstanding: 0
Zxid: 0x22
Mode: standalone
Node count: 21
Connection closed by foreign host.
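
Alternatively, for a quick scripted health check, the four-letter ‘ruok‘ command can be sent with netcat (assuming nc is installed); a healthy server answers ‘imok‘:

$ echo ruok | nc localhost 2181
imok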

Running a Kafka Server Instance

Now that a Zookeeper server is up and running, a Kafka server can startup in the same fashion.

$ sh kafka/bin/kafka-server-start.sh kafka/config/server.properties

Again, ctrl+c or the following script stops the Kafka server.

$ sh kafka/bin/kafka-server-stop.sh

Creating and Checking Topics

Now let’s create a topic (named ‘TestTopic’) by passing the address of our running Zookeeper instance. Let’s leave the explanation of the ‘--partitions’ and ‘--replication-factor’ switches for later.

$ sh kafka/bin/kafka-topics.sh \
> --create \
> --topic TestTopic \
> --zookeeper localhost:2181 \
> --partitions 1 \
> --replication-factor 1

Use the following commands to list and describe the created topics.

$ sh kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
$ sh kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181
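
For the single-broker setup above, the describe command should print something like the following (illustrative output; the single partition lives on broker 0, which is its leader and only replica):

Topic:TestTopic PartitionCount:1 ReplicationFactor:1 Configs:
Topic: TestTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0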

Creating Producer and Consumer

The only missing pieces of the basic messaging setup are a producer and a consumer that can publish and receive messages on the topic created earlier. Let’s create a producer first.

$ sh kafka/bin/kafka-console-producer.sh \
> --broker-list localhost:9092 \
> --topic TestTopic

The above command creates a producer, pointing it at one or more brokers from which it can determine the leader for the given topic. Once the console producer is running, messages can be queued up (type and press ‘Enter’) to be fetched later by a consumer.

Now let’s create a consumer for the ‘TestTopic’ topic that receives all the messages previously published on it (‘--from-beginning’).

$ sh kafka/bin/kafka-console-consumer.sh \
> --bootstrap-server localhost:9092 \
> --from-beginning \
> --topic TestTopic

This will show previously sent messages as well as any new messages sent by the producer.
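
Since the console producer reads from standard input, messages can also be piped in non-interactively, which is handy for quick scripted tests:

$ printf 'one\ntwo\nthree\n' | sh kafka/bin/kafka-console-producer.sh \
> --broker-list localhost:9092 \
> --topic TestTopic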

Scaled Messaging

In the basic producer-consumer setup, if the single Zookeeper node or Kafka broker crashes, messaging fails altogether. However, it’s possible to make the setup fault tolerant by introducing a Zookeeper ensemble (a cluster of Zookeeper nodes) together with a Kafka cluster of multiple brokers (here, 3 Zookeeper nodes and 3 Kafka brokers).

Creating Zookeeper Ensemble

If the Zookeeper and Kafka servers are still running from the previous section, stop them and head into the ‘config/‘ folder of the Kafka installation directory. Create 3 files called ‘zoo1.properties‘, ‘zoo2.properties‘ and ‘zoo3.properties‘. Use the following content as a template, copy it into zoo2 and zoo3, and increment the corresponding values of ‘dataDir‘ and ‘clientPort‘.

# zoo1.properties
tickTime=2000
dataDir=/tmp/zookeeper/1
clientPort=2181
maxClientCnxns=0
initLimit=5
syncLimit=2
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890

Essential Properties
  • tickTime: The number of milliseconds to be considered as one tick.
  • dataDir: The server’s data directory.
    • Notice that the /tmp/zookeeper directory may be wiped on each system restart.
  • clientPort: The port on which this server listens to client connections.
  • maxClientCnxns: Maximum number of concurrent connections a single client may make to this server (0 disables the limit).
  • initLimit: Maximum number of ticks allowed for followers to connect and sync up with the leader.
  • syncLimit: Maximum number of ticks a follower may lag behind the leader before it is dropped.
  • server.x: The list of servers that form the Zookeeper ensemble; the two ports are used for follower connections to the leader and for leader election, respectively.

Also, for each of the servers that will run, we need to create a file called ‘myid’ in its data directory. This file consists of a single line containing that server’s id.

$ mkdir -p /tmp/zookeeper/1
$ touch /tmp/zookeeper/1/myid
$ echo '1' >> /tmp/zookeeper/1/myid
$ mkdir -p /tmp/zookeeper/2
$ touch /tmp/zookeeper/2/myid
$ echo '2' >> /tmp/zookeeper/2/myid
$ mkdir -p /tmp/zookeeper/3
$ touch /tmp/zookeeper/3/myid
$ echo '3' >> /tmp/zookeeper/3/myid
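
A quick sanity check that all three id files are in place:

$ for i in 1 2 3; do cat /tmp/zookeeper/$i/myid; done
1
2
3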

Now let’s run the first Zookeeper server in our ensemble.

$ sh kafka/bin/zookeeper-server-start.sh kafka/config/zoo1.properties

After starting the first server, a couple of ‘java.net.ConnectException: Connection refused‘ exceptions will be thrown, indicating that this instance cannot reach the other two (since they are not running yet). Ignore the warnings and start the remaining servers.

$ sh kafka/bin/zookeeper-server-start.sh kafka/config/zoo2.properties
$ sh kafka/bin/zookeeper-server-start.sh kafka/config/zoo3.properties

At this point, our Zookeeper ensemble of 3 server nodes should be up and running.
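
To verify, the four-letter ‘srvr‘ command reports each node’s role; one node should report itself as leader and the other two as followers (assuming nc is available; which node wins the election may vary):

$ for port in 2181 2182 2183; do echo srvr | nc localhost $port | grep Mode; done
Mode: follower
Mode: leader
Mode: follower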

Setting up a Kafka Cluster

Let’s now create a Kafka cluster with 3 brokers, which is as easy as running 3 server instances with different configurations. Create 3 properties files, kafka-0, kafka-1 and kafka-2, that differ only in the values of ‘broker.id‘, ‘port‘ and ‘log.dirs‘; that is, increment those values for kafka-1.properties and kafka-2.properties.

# kafka-0.properties
broker.id=0
port=9090
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs/0
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
zookeeper.connection.timeout.ms=6000

Essential Properties
  • broker.id: The unique broker id for this server instance.
  • port: A deprecated property denoting the port on which this broker listens for connections.
  • log.dirs: The list of directories to maintain data logs.
  • zookeeper.connect: The list of nodes within the Zookeeper ensemble.
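
Rather than editing the two remaining files by hand, they can also be derived from the first one with sed; a minimal sketch using the file names and values defined above:

$ for i in 1 2; do
> sed -e "s/^broker.id=0/broker.id=$i/" \
> -e "s/^port=9090/port=909$i/" \
> -e "s#^log.dirs=/tmp/kafka-logs/0#log.dirs=/tmp/kafka-logs/$i#" \
> kafka/config/kafka-0.properties > kafka/config/kafka-$i.properties
> done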

After creating the properties files with proper content, it’s time to run the Kafka server instances.

$ sh kafka/bin/kafka-server-start.sh kafka/config/kafka-0.properties
$ sh kafka/bin/kafka-server-start.sh kafka/config/kafka-1.properties
$ sh kafka/bin/kafka-server-start.sh kafka/config/kafka-2.properties
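
Once all three brokers are up, their registration can be confirmed through Zookeeper, since each broker creates an ephemeral node under /brokers/ids. After some connection log output, the shell prints the registered broker ids:

$ sh kafka/bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids
[0, 1, 2]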

Creating Replicated Topics

With the Zookeeper ensemble and Kafka brokers up and running, it’s now time to create a couple of replicated topics.

$ sh kafka/bin/kafka-topics.sh \
> --create \
> --topic chat \
> --zookeeper localhost:2181,localhost:2182,localhost:2183 \
> --replication-factor 3 \
> --partitions 1
$ sh kafka/bin/kafka-topics.sh \
> --create \
> --topic event \
> --zookeeper localhost:2181,localhost:2182,localhost:2183 \
> --replication-factor 2 \
> --partitions 1
$ sh kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181
Topic:chat PartitionCount:1 ReplicationFactor:3 Configs:
Topic: chat Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic:event PartitionCount:1 ReplicationFactor:2 Configs:
Topic: event Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1

The output is the description of the topics and their relation to the existing brokers. The first two lines state that the topic ‘chat’ has one partition replicated among brokers 1, 2 and 0, and that all three replicas are currently in sync (Isr: 1,2,0). The rest of the output shows similar information for the ‘event’ topic. For a better understanding, compare it with the diagram shown at the beginning of the post.

Testing Resiliency

It’s now time to create one consumer and one producer for our ‘chat’ topic (the same applies to the ‘event’ topic as well) and test how resilient our setup is.

$ sh kafka/bin/kafka-console-producer.sh \
> --broker-list localhost:9090,localhost:9091,localhost:9092 \
> --topic chat
$ sh kafka/bin/kafka-console-consumer.sh \
> --bootstrap-server localhost:9090,localhost:9091,localhost:9092 \
> --from-beginning --topic chat

Try to send a few messages to ensure the setup works in the first place. According to the ‘chat’ topic description, broker 1 is the leader of the topic’s single partition. If that broker is killed, Kafka will elect another broker as the partition’s leader.
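
One way to kill broker 1 is to match its process by the properties file it was started with (assuming pkill is available):

$ pkill -f kafka-1.properties

Issuing the topic description command again should now produce output similar to the following: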

Topic:chat PartitionCount:1 ReplicationFactor:3 Configs:
Topic: chat Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
Topic:event PartitionCount:1 ReplicationFactor:2 Configs:
Topic: event Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0

As can be seen, the producer and consumer are not affected and messages can still be passed; however, the topics are. The ‘chat’ topic now has broker 2 as its new leader and only 2 in-sync replicas, while the ‘event’ topic is down to a single ISR.

To further test the setup’s resiliency, try killing a Zookeeper node: messaging keeps working as long as a majority of the ensemble (2 of the 3 nodes) is still running.
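
For example, stop one Zookeeper node and confirm the surviving two still form a quorum (assuming pkill and nc are available; which survivor is the leader may vary):

$ pkill -f zoo1.properties
$ for port in 2182 2183; do echo srvr | nc localhost $port | grep Mode; done
Mode: follower
Mode: leader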
