Kafka is Apache’s open-source platform for distributed stream processing in high-throughput environments. In the following tutorial we will configure Apache Kafka and Zookeeper and demonstrate both a basic and a fault-tolerant messaging setup.
This post covers the following areas on a Linux machine.
- Kafka Introduction
- Setting up Kafka
- Basic Messaging
  - Running a Zookeeper Server Instance
  - Running a Kafka Server Instance
  - Creating and Checking Topics
  - Creating Producer and Consumer
- Scaled Messaging
  - Creating Zookeeper Ensemble
  - Setting up a Kafka Cluster
  - Creating Replicated Topics
  - Testing Resiliency
Kafka Introduction
Kafka is a distributed, fault-tolerant publish/subscribe messaging system with partitioning and replication abilities that is based on distributed commit logs. It can handle high volumes of message streams (more than 1.5 million messages per second). Kafka depends heavily on Apache Zookeeper to maintain its internal state.
In a nutshell, Zookeeper is a distributed key-value store used by various distributed applications for maintaining configuration information, distributed synchronization, group services and so on.
However, before diving into the installation and configuration, let’s go through the basic terminology and key facts about Apache Kafka.
- Kafka Message: is an entity with an arrival timestamp, a unique id and a binary data payload.
- Topic: is a logical concept; a named feed of messages to be exchanged.
- Broker: is a software process or node by which Kafka manages topics.
  - They can be scaled out to achieve higher load distribution, throughput and fault tolerance.
- Cluster: is a group of brokers (a leader and followers) on one or more hosts.
- Partition: is an immutable sequence of topic messages in a physical log file.
  - A broker could have one or more partitions.
  - One partition cannot be split across machines.
- Message Offset: is a non-global, zero-based, immutable sequence number of a message within a partition that is maintained by the consumer.
- Retention Policy: is Kafka’s configurable period during which the messages are retained by Kafka cluster for a topic.
- Replication Factor: is the data redundancy factor by which Kafka replicates messages among brokers.
- In-Sync Replicas (ISR): is the set of brokers in a topic’s replica-set or quorum that are currently in sync with the leader.
- Producer & consumer: entities interested in sending and receiving messages to and from topics.
  - Within a consumer group, Kafka won’t allow more than one consumer to read from the same partition simultaneously.
- Controller: is an elected broker within a cluster for carrying out administrative tasks (e.g. managing the states of partitions and replicas).
- Partition Leader: is a broker responsible for recruiting workers for replicating and propagating messages.
- Follower/Worker: is a broker that only replicates the leader.
  - A broker can simultaneously be the leader for one topic and a worker for another topic.
Setting up Kafka
Installing Kafka is as easy as downloading and unzipping the archive into a folder.
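A minimal sketch of such a script (the release version and mirror URL are assumptions; substitute the latest release from the Kafka downloads page):

```shell
cd ~
# Download a Kafka release (version/URL are examples; pick the latest)
wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
tar -xzf kafka_2.11-1.1.0.tgz
# Rename to drop the version from the folder name
mv kafka_2.11-1.1.0 kafka
rm kafka_2.11-1.1.0.tgz
```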
The above script switches to the home directory, downloads and unpacks Kafka, and finally removes the version from the folder name to simplify later invocations.
Inside the kafka/ directory, bin/ is where all the Kafka and Zookeeper executables live. The config/ folder, as the name suggests, contains the configuration files for both Zookeeper and Kafka.
Basic Messaging
The following section demonstrates how to set up producer/consumer messaging using a single Zookeeper node and a single Kafka broker.
Running a Zookeeper Server Instance
In order to start a Kafka server instance, an instance of the Zookeeper server needs to be running. This is done using the following executable and the instance’s configuration.
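Assuming the installation directory from the previous section, a typical invocation looks like:

```shell
cd ~/kafka
# Starts Zookeeper with the default configuration (client port 2181)
bin/zookeeper-server-start.sh config/zookeeper.properties
```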
The instance can be shut down using ctrl+c or by running the binary below:
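The stop script shipped in bin/:

```shell
bin/zookeeper-server-stop.sh
```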
Check the Zookeeper server by connecting to its client port, then issue a Zookeeper four-letter command such as ‘stat‘ to print the server’s status and connection statistics.
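One way to do this (assuming nc is available; note that newer Zookeeper versions require four-letter commands to be whitelisted via the 4lw.commands.whitelist property):

```shell
# Send the 'stat' four-letter command to the Zookeeper client port
echo stat | nc localhost 2181
```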
Running a Kafka Server Instance
Now that a Zookeeper server is up and running, a Kafka server can start up in the same fashion.
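With the default configuration (the broker listens on port 9092):

```shell
cd ~/kafka
# Starts a single Kafka broker
bin/kafka-server-start.sh config/server.properties
```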
Again, a ctrl+c or the following binary stops the Kafka server.
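The corresponding stop script:

```shell
bin/kafka-server-stop.sh
```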
Creating and Checking Topics
Now let’s create a topic (named ‘TestTopic’) by passing the address of our running Zookeeper instance. Let’s leave the discussion of the ‘--partitions’ and ‘--replication-factor’ switches for later.
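A sketch of the create command (older Kafka releases address topics through Zookeeper with the --zookeeper switch, as here; newer ones use --bootstrap-server instead):

```shell
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --partitions 1 --replication-factor 1 --topic TestTopic
```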
Use the following commands in order to get the list and description of the created topics.
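For example, assuming the same Zookeeper address as before:

```shell
# List all existing topics
bin/kafka-topics.sh --list --zookeeper localhost:2181
# Show partitions, leader, replicas and ISRs for a single topic
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic TestTopic
```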
Creating Producer and Consumer
The only missing pieces of the basic messaging setup are a producer and a consumer that can publish/receive messages on the topic that was created earlier. Let’s create a producer first.
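A typical console-producer invocation, pointed at the local broker:

```shell
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TestTopic
```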
The above command creates a producer and points it to where it can find one or more brokers, from which it determines the leader for the given topic. After running the producer console program, messages can be queued up (type and press ‘Enter’) to be fetched later by the consumer.
Now let’s create a consumer for the ‘TestTopic’ topic that receives all the messages previously published on the topic (--from-beginning).
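A console-consumer invocation (older releases used ‘--zookeeper localhost:2181’ here instead of --bootstrap-server):

```shell
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic TestTopic --from-beginning
```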
This will show previously sent messages as well as any messages sent now by the producer.
Scaled Messaging
In the basic producer-consumer communication, if the Zookeeper node or the Kafka broker crashes, the whole messaging setup fails. However, it can be made fault tolerant by introducing a Zookeeper ensemble (a cluster of Zookeeper nodes) together with a Kafka cluster of multiple brokers (here, 3 Zookeeper nodes and 3 Kafka brokers).
Creating Zookeeper Ensemble
If the Zookeeper and Kafka servers are still running from the previous section, stop them and head into the ‘config/‘ folder in the Kafka installation directory. Create 3 files called ‘zoo1.properties‘, ‘zoo2.properties‘ and ‘zoo3.properties‘. Use the following content as a template, copy it to zoo2 and zoo3, and increment the corresponding values of ‘dataDir‘ and ‘clientPort‘.
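A plausible template for zoo1.properties (the directory layout and port numbers are assumptions; any free ports work, as long as each server.x line uses distinct peer/election ports on a single host):

```
tickTime=2000
dataDir=/tmp/zookeeper/1
clientPort=2181
maxClientCnxns=60
initLimit=10
syncLimit=5
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
```

For zoo2.properties and zoo3.properties, change dataDir to /tmp/zookeeper/2 and /tmp/zookeeper/3 and clientPort to 2182 and 2183; the server.x lines stay identical across all three files.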
Essential Properties
- tickTime: The number of milliseconds to be considered as one tick.
- dataDir: The server’s data directory.
  - Notice that the /tmp/zookeeper directory might be cleared on reboot.
- clientPort: The port on which this server listens to client connections.
- maxClientCnxns: Maximum amount of client connections allowed for this server.
- initLimit: Maximum number of ticks for followers to connect and sync up with the leader.
- syncLimit: Maximum number of ticks for followers to stay in sync with the leader.
- server.x: The list of servers that create the Zookeeper ensemble.
Also, for each of the servers that will run, we need to create a file called ‘myid’ in its data directory. This file consists of a single line containing that server’s id.
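With the dataDir layout assumed above, the files can be created in one go:

```shell
# Write each server's id into a 'myid' file in its data directory
for i in 1 2 3; do
  mkdir -p /tmp/zookeeper/$i
  echo $i > /tmp/zookeeper/$i/myid
done
```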
Now let’s run the first Zookeeper server in our ensemble.
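Using the first of the configuration files created above:

```shell
bin/zookeeper-server-start.sh config/zoo1.properties
```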
After starting the first server, a couple of ‘java.net.ConnectException: Connection refused‘ exceptions will be thrown, which imply that this server instance cannot connect to the other two (since they are not running yet). Ignore the warnings and start the rest of them.
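Each in its own terminal (or backgrounded):

```shell
bin/zookeeper-server-start.sh config/zoo2.properties
bin/zookeeper-server-start.sh config/zoo3.properties
```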
At this point, our Zookeeper ensemble with 3 server nodes should be up and running.
Setting up a Kafka Cluster
Let’s now create a Kafka cluster with 3 brokers, which is as easy as running 3 server instances with different configurations. So let’s create 3 properties files, kafka-0.properties, kafka-1.properties and kafka-2.properties, that differ only in the values of ‘broker.id‘, ‘port‘ and ‘log.dirs‘. Accordingly, increment those values for kafka-1.properties and kafka-2.properties.
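A plausible kafka-0.properties, assuming the Zookeeper client ports from the previous section (a full server.properties contains many more settings; these are the ones that must differ per broker):

```
broker.id=0
port=9092
log.dirs=/tmp/kafka-logs-0
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
```

For kafka-1.properties and kafka-2.properties, use broker.id 1 and 2, ports 9093 and 9094, and log.dirs /tmp/kafka-logs-1 and /tmp/kafka-logs-2.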
Essential Properties
- broker.id: The unique broker id for this server instance.
- port: A (since deprecated) property denoting the port on which this server listens for connections.
- log.dirs: The list of directories to maintain data logs.
- zookeeper.connect: The list of nodes within the Zookeeper ensemble.
After creating the properties files with proper content, it’s time to run the Kafka server instances.
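Each in its own terminal (or backgrounded):

```shell
bin/kafka-server-start.sh config/kafka-0.properties
bin/kafka-server-start.sh config/kafka-1.properties
bin/kafka-server-start.sh config/kafka-2.properties
```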
Creating Replicated Topics
After having the Zookeeper ensemble and Kafka brokers up and running, it’s now time to create a couple of replicated topics and inspect how they are distributed across the brokers.
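A sketch of the commands (the partition counts and replication factors are assumptions inferred from the description that follows):

```shell
# 'chat': one partition replicated on all three brokers
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --partitions 1 --replication-factor 3 --topic chat
# 'event': replicated on two of the three brokers
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --partitions 1 --replication-factor 2 --topic event
# Describe both topics to see leaders, replicas and in-sync replicas
bin/kafka-topics.sh --describe --zookeeper localhost:2181
```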
The output is the description of the topics and their relation to the existing brokers. The first two lines of output state that the topic ‘chat’ has one partition that is replicated among brokers 0, 1 and 2, and that all three of them are currently in sync (Isr: 1,2,0). The rest of the output shows similar information for the ‘event’ topic. To get a better understanding, compare it with the diagram shown at the beginning of the post.
Testing Resiliency
It’s now time to create one consumer and one producer for our ‘chat’ topic (the same holds for the ‘event’ topic as well) and test how resilient our setup is.
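Both clients are given all three broker addresses, so losing a single broker does not cut them off:

```shell
# Producer (terminal 1)
bin/kafka-console-producer.sh \
  --broker-list localhost:9092,localhost:9093,localhost:9094 --topic chat
# Consumer (terminal 2)
bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092,localhost:9093,localhost:9094 \
  --topic chat --from-beginning
```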
Try to send a few messages to ensure the setup works in the first place. According to the ‘chat’ topic description, broker1 is the topic leader. However, if that broker is killed, Kafka will elect another broker as the topic’s leader. Test this by killing broker 1 and issuing the topic description command again.
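Re-running the describe command shows the re-elected leaders and the shrunken ISR sets:

```shell
bin/kafka-topics.sh --describe --zookeeper localhost:2181
```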
As can be seen, the producer and consumer are not affected and messages can still be passed; the topics, however, are affected. The ‘chat’ topic now has broker2 as its new leader and only 2 in-sync replicas, while the ‘event’ topic is down to a single in-sync replica.
To further test the setup’s resiliency, try killing a Zookeeper node and observe that messaging will not fail as long as a majority of the ensemble (2 of the 3 nodes) remains up.