Messaging with Kafka


In the previous post ‘Kafka: Absolute Basics‘ a few essential concepts in Kafka were discussed. In this post we will go through setting up and configuring Apache Kafka together with Zookeeper on a Linux machine in order to form a plain producer/consumer setup in the first step and a simple fault tolerant messaging system in the next one.

This post tries to cover the following areas on a Linux machine.

Setting up Kafka

A general view of Zookeeper and Kafka internals as well as their relationship

Installing Kafka is as easy as downloading and unzipping the archive into a folder.

1
2
3
4
$ cd
$ wget http://www-eu.apache.org/dist/kafka/0.10.2.0/kafka_2.12-0.10.2.0.tgz
$ tar -xzvf kafka_2.12-0.10.2.0.tgz
$ mv kafka_2.12-0.10.2.0 kafka

The above script switches to home directory, downloads and unzips kafka and finally removes version from folder name to simplify invocations.

Inside kafka/ directory, the bin/ is where all the Kafka and Zookeeper executables exist. Also, the config/ folder as the name suggests, contains the configuration files for both Zookeeper and Kafka.

Basic Messaging

The following section demonstrates how to setup a producer/consumer messaging using a single Zookeeper node and a single Kafka broker.

Running a Zookeeper Server Instance

In order to start a Kafka server instance, an instance of the Zookeeper server needs to be running. This is done using the following executable and the instance’s configuration.

1
$ sh kafka/bin/zookeeper-server-start.sh kafka/config/zookeeper.properties

The instance can be shutdown using ctrl+c or the running the binary below:

1
$ sh kafka/bin/zookeeper-server-stop.sh

Check the Zookeeper server using:

1
$ telnet localhost 2181

followed by a Zookeeper command like stat to produce a similar output as below:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Trying 127.0.0.1...
Connected to localhost.
Escape character is ^].
stat
Zookeeper version: 3.4.9-1757313, built on 08/23/2016 06:50 GMT
Clients:
/127.0.0.1:48134[1](queued=0,recved=1107,sent=1109)
/127.0.0.1:41180[0](queued=0,recved=1,sent=0)
Latency min/avg/max: 0/0/115
Received: 1259
Sent: 1264
Connections: 2
Outstanding: 0
Zxid: 0x22
Mode: standalone
Node count: 21
Connection closed by foreign host.

Running a Kafka Server Instance

Now that a Zookeeper server is up and running, a Kafka server can startup in the same fashion.

1
$ sh kafka/bin/kafka-server-start.sh kafka/config/server.properties

Again, a ctrl+c or the following binary stops the Kafka server.

1
$ sh kafka/bin/kafka-server-stop.sh

Creating and Checking Topics

Now let’s create a topic (named ‘TestTopic’) by passing the address to our running Zookeeper instance. Let’s also leave the description to --partitions and --replication-factor switches for later.

1
2
3
4
5
6
$ sh kafka/bin/kafka-topics.sh \
> --create \
> --topic TestTopic \
> --zookeeper localhost:2181 \
> --partitions 1 \
> --replication-factor 1

Use the following commands in order to get the list and description of the created topics.

1
2
$ sh kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
$ sh kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181

Creating Producer and Consumer

The only missing parts of the basic messaging is a producer and a consumer that can publish/receive messages on the topic that was created earlier. Let’s create a producer first.

1
2
3
$ sh kafka/bin/kafka-console-producer.sh \
> --broker-list localhost:9092 \
> --topic TestTopic

The above command creates a producer pointing it to where it can find one or more brokers to determine the leader for the given topic. After running the producer console program, it is possible to queue up messages (type and ‘Enter’) to be later fetched by consumer.

Now let’s create a consumer for the ‘TestTopic’ topic that receives all the messages published on the topic previously (--from-beginning).

1
2
3
4
$ sh kafka/bin/kafka-console-consumer.sh \
> --bootstrap-server localhost:9092 \
> --from-beginning \
> --topic TestTopic

This shall show previously sent messages as well as messages that are sent now by the producer.

Scaled Messaging

In basic producer-consumer communication, in case the Zookeeper node or Kafka broker is crashed, the whole messaging fails. However it’s possible to make it fault tolerant by introducing a Zookeeper ensemble (a cluster of Zookeeper nodes) together with a Kafka cluster with multiple brokers (3 Zookeeper nodes and 3 Kafka brokers).

Creating Zookeeper Ensemble

If the Zookeeper and Kafka servers are still running from the previous section, stop them and head into ‘config/‘ folder in kafka installation directory. Create 3 files called zoo1.properties, zoo2.properties and zoo3.properties. Use the following content as a template and copy it to zoo2 and zoo3 and increment the corresponding values of dataDir and clientPort.

1
2
3
4
5
6
7
8
9
10
11
# zoo1.properties
tickTime=2000
dataDir=/tmp/zookeeper/1
clientPort=2181
maxClientCnxns=0
initLimit=5
syncLimit=2
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890

tickTime: The number of milliseconds to be considered as one tick.

dataDir: The server’s data directory.
Notice that the /tmp/zookeeper directory might be removed after each restart.

clientPort: The port on which this server listens to client connections.

maxClientCnxns: Maximum amount of client connections allowed for this server.

initLimit: Maximum number of ticks for followers to connect and sync up with leader.

syncLimit: Maximum number of ticks for followers to sync up with Zookeeper.

server.x: The list of servers that create the Zookeeper ensemble.

Also, for each of our servers that will run, we need to create a file called ‘myid‘ in their data directory. This file basically consists of a single line and a single number of that machine’s id.

1
2
3
4
5
6
7
8
9
10
11
$ mkdir -p /tmp/zookeeper/1
$ touch /tmp/zookeeper/1/myid
$ echo '1' >> /tmp/zookeeper/1/myid
$ mkdir -p /tmp/zookeeper/2
$ touch /tmp/zookeeper/2/myid
$ echo '2' >> /tmp/zookeeper/2/myid
$ mkdir -p /tmp/zookeeper/3
$ touch /tmp/zookeeper/3/myid
$ echo '3' >> /tmp/zookeeper/3/myid

Now let’s run the first Zookeeper server in our ensemble.

1
$ sh kafka/bin/zookeeper-server-start.sh kafka/config/zoo1.properties

After running our server a couple of ‘java.net.ConnectException: Connection refused‘ will be thrown which imply that our server instance cannot connect to the other two (since they are not running yet). So ignore the warning and run the rest of them.

1
2
$ sh kafka/bin/zookeeper-server-start.sh kafka/config/zoo2.properties
$ sh kafka/bin/zookeeper-server-start.sh kafka/config/zoo3.properties

As of this point, our Zookeeper ensemble with 3 server nodes shall be up and running.

Setting up a Kafka Cluster

Let’s now create a Kafka cluster with 3 broker inside it which is as easy as running 3 server instances with different configurations. So let’s create 3 properties files kafka-0, kafka-1 and kafka-2 that are only different in values of broker.id, port and log.dirs. As a result, increment those values for kafka-1.properties and kafka-2.properties.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# kafka-0.properties
broker.id=0
port=9090
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs/0
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
zookeeper.connection.timeout.ms=6000

broker.id: The unique broker id for this server instance.

port: A deprecated property denoting the port to which this server listens and accepts connections.

log.dirs: The list of directories to maintain data logs.

zookeeper.connect: The list of nodes within the Zookeeper ensemble.

After creating the properties files with proper content, it’s time to run the Kafka server instances.

1
2
3
$ sh kafka/bin/kafka-server-start.sh kafka/config/kafka-0.properties
$ sh kafka/bin/kafka-server-start.sh kafka/config/kafka-1.properties
$ sh kafka/bin/kafka-server-start.sh kafka/config/kafka-2.properties

Creating Replicated Topics

After having Zookeeper and Kafka brokers up and running, it’s now time to create a couple of topics that

1
2
3
4
5
6
7
8
9
10
11
12
13
$ sh kafka/bin/kafka-topics.sh \
> --create \
> --topic chat \
> --zookeeper localhost:2181,localhost:2182,localhost:2183 \
> --replication-factor 3 \
> --partitions 3
$ sh kafka/bin/kafka-topics.sh
> --create \
> --topic event \
> --zookeeper localhost:2181,localhost:2182,localhost:2183 \
> --replication-factor 2 \
> --partitions 2
1
$ sh kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181
1
2
3
4
Topic:chat PartitionCount:1 ReplicationFactor:3 Configs:
Topic: chat Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic:event PartitionCount:1 ReplicationFactor:2 Configs:
Topic: event Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1

The output is the description of topics and their relation with existing brokers. The first two lines of output state that the topic ‘chat’ has one partition that is replicated among brokers 1,2 and 3 while all three of them are in sync at the moment (Isr: 1,2,0). The rest of the output demonstrate similar information for the ‘event’ topic. In order to get a better understanding, compare it with the diagram shown at the beginning of the post.

Testing Resiliency

It’s now time to create one consumer and one producer for our ‘chat’ topic (the same will stand for the ‘event’ topic as well) and test how resilient our setup is.

1
2
3
$ sh kafka/bin/kafka-console-producer.sh \
> --broker-list localhost:9092 \
> --topic chat
1
2
3
$ sh kafka/bin/kafka-console-consumer.sh \
> --bootstrap-server localhost:9092 \
> --from-beginning --topic chat

Try to send a few messages to ensure the setup works in the first place. According to the ‘chat’ topic description, broker1 is the topic leader. However, if that broker is killed, Kafka will elect another broker as the topic’s leader. Test this by killing broker 1 and issuing the topic description command which shall produce a similar output as below:

1
2
3
4
Topic:chat PartitionCount:1 ReplicationFactor:3 Configs:
Topic: chat Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
Topic:event PartitionCount:1 ReplicationFactor:2 Configs:
Topic: event Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0

As can be seen, the producer and consumer are not affected and messages can still be passed; however, the topics are. The ‘chat’ topic has now the broker2 as its new leader and has only 2 ISRs while topic ‘event’ now owns only 1 ISR.

To further test the setup resiliency, try to kill a Zookeeper node and see that messaging will not fail until all nodes are down.

Share Comments