Kafka, a Dive into the Java Client API


In my previous post “The Apache Kafka Basics”, Kafka’s essential components (Producer, Broker & Consumer) were introduced and a simple producer/consumer setup was explained using shell programs. In this post we get our hands deeper into coding by creating and configuring Kafka producers and consumers using Kafka’s Java API.
  • Creating Topic
  • Producer
    • Properties Description
    • Create and Dispatch
  • Consumer
    • Properties Description
    • Create and Receive

Creating Topic

  • 3 partitions
  • 3 brokers
  • replication factor of 3
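
As a sketch, assuming ZooKeeper and the brokers run locally (as in the previous post) and the topic is named biology, such a topic can be created with the shell tool shipped with Kafka:

kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic biology --partitions 3 --replication-factor 3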

Producer

Properties Description

A Kafka producer is instantiated using a set of properties, a few of which are required. The producer owns an instance of the ProducerConfig class, which contains the property keys used to configure a producer.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092, localhost:9093");
props.put(
        "key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
props.put(
        "value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "1"); // valid values: "0", "1", "all" (see below)
props.put("retry.backoff.ms", 5);
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 0);
props.put("buffer.memory", 33554432);
props.put("max.block.ms", 60000);
props.put("max.in.flight.requests.per.connection", 5);
props.put(
        "partitioner.class",
        "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
props.put("client.id", ""); // empty: let the client generate an id
props.put("max.request.size", 1048576);
props.put("request.timeout.ms", 30000);
props.put("timeout.ms", 30000);
props.put("compression.type", "none");

bootstrap.servers:
The list of brokers; the producer connects to the first available one in order to receive metadata about partitions, partition leaders, and so on.

key.serializer: The message-key serializer.

value.serializer: The message-value serializer.

acks:
This is the level of acknowledgement demanded by the producer for the messages it sends. It can take one of the following values:

  • 0: fire and forget
  • 1: acknowledged by the partition leader
  • all (-1): quorum acknowledged, i.e. acknowledged by all in-sync replicas

retries:
How many times the producer shall retry sending a message before aborting.

retry.backoff.ms:
The wait period, in milliseconds, between retries.

batch.size:
The number of bytes that can be buffered per RecordBatch; effectively a limit on the amount of record data buffered per partition. If this size is reached, the batch is dispatched immediately.

Batching Classes
  • RecordBatch: a batch of ProducerRecords to be sent to the broker that owns the assigned partition.
  • RecordAccumulator: an in-memory, queue-like structure that allows micro-batching of high-volume dispatches; it contains a collection of RecordBatches.

linger.ms:
The amount of waiting time, in milliseconds, after which the batch content is dispatched even if batch.size has not been reached.

buffer.memory:
The total number of bytes of memory used to buffer records waiting to be sent to the brokers.

max.block.ms:
The amount of time, in milliseconds, for which the send() method may block (for example when the buffer is full); it is used to enforce back pressure on the producer.

max.in.flight.requests.per.connection:
The maximum number of simultaneous (unacknowledged) requests that can be made on a connection before a confirmation is received.

Create and Dispatch

// create the producer with the properties defined above; topic and messageText are assumed to be defined elsewhere
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, messageText);
try {
    producer.send(record);
} catch (Exception e) {
    e.printStackTrace();
} finally {
    producer.close();
}

When the send() method is invoked, the producer reaches out to the cluster in order to receive metadata about partitions, topics and brokers. When the metadata is received, the send process goes through the following phases (a sketch follows this list):

  1. Serialization: the key and value are serialized using the configured serializers.
  2. Targeting Partition: determining to which partition the record shall be sent.
  3. Record Accumulation: putting the ProducerRecord into the RecordAccumulator.
  4. Transmission: the batches are sent over the wire, and a RecordMetadata object with the send statistics (topic, partition, offset) is received in return.
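
The transmission result can be observed by passing a Callback to send(). A minimal sketch, reusing the props, topic and messageText variables from the listings above:

// send a record and inspect the RecordMetadata returned by the broker
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
try {
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, messageText);
    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            // the dispatch failed after the configured retries
            exception.printStackTrace();
        } else {
            System.out.printf("sent to %s-%d at offset %d%n",
                    metadata.topic(), metadata.partition(), metadata.offset());
        }
    });
} finally {
    producer.close();
}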

Consumer

A consumer is typically a long-running application:

  • it continuously looks for and processes new messages by polling the Kafka cluster
  • this poll loop shall be interrupted only for valid reasons
  • a consumer has only one thread and one poll loop, which affects how much work you can reasonably expect a single consumer to do
  • spending too much time processing records between polls has big implications, since the cluster may consider the consumer dead
  • a slow consumer has no impact on the cluster, the producers or other consumers
  • the single-threaded model forces you to parallelize message consumption in another, more scalable way: by running more consumer instances

Why polling

  • diversity of consumers: different needs and different capabilities
  • scalability in case of a large number of consumers, with less performance degradation
  • support for different consumption models: real-time processing and batch-mode processing

Because Kafka consumers pull data from the topic, different consumers can consume the messages at a different pace: you can have one consumer processing the messages in real time and another consumer processing the messages in batch mode.

Creating a consumer is similar to creating a producer: it is instantiated with a set of properties, a few of which are required (a sketch follows).
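
A minimal configuration sketch; the property names are the standard consumer keys, while the group id and server list are illustrative assumptions:

// consumer configuration, mirroring the producer properties above
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092, localhost:9093");
consumerProps.put(
        "key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put(
        "value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("group.id", "my-consumer-group"); // illustrative group id

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);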

The consumer’s subscribe() method is used to subscribe to topics:

  • it takes a collection of strings naming the topics
  • an overload accepts a regular expression for batch subscription (subscribing to a series of topics such as “my-*“)
  • the subscription shall happen in a single action: any subsequent call to subscribe() will overwrite the previous subscription (see the sketch below)
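
A short sketch, assuming the consumer created above (topic names are illustrative):

// subscribe to an explicit list of topics
consumer.subscribe(Arrays.asList("biology", "chemistry"));

// a later call overwrites the previous subscription entirely;
// another overload accepts a java.util.regex.Pattern for batch subscription
consumer.subscribe(Arrays.asList("physics"));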

unsubscribe() method

Calling unsubscribe() causes the consumer to unsubscribe from all topics; the same effect can be achieved by subscribing to an empty list.

Automatic (dynamic) partition assignment happens as a result of invoking subscribe(): the consumer is enlisted to poll from all partitions of the subscribed topic, and for multiple topics it will poll all partitions within those topics, which can be a heavy action. The partition management is done behind the scenes: when a new partition is added to an existing topic, the metadata about the cluster changes and is sent to the consumer; the consumer realizes whether any of its subscriptions is affected and automatically adds the new partition.

SubscriptionState: the consumer maintains an internal object that manages its subscriptions. It contains all the details related to the topics and partitions this consumer instance is subscribed or assigned to, and it also plays an important role, together with the ConsumerCoordinator, in managing the offsets.

assign() method

In contrast, a consumer may want to have full control over the partitions from which it will poll messages.

  • it is possible to assign the consumer to individual partitions using the assign() method, in other words assigning specific partitions to a single consumer instance
  • the consumer will start polling from those partitions regardless of the topics to which they belong
  • it takes a list of TopicPartitions, and invoking it multiple times has the same overwrite effect as subscribe()
  • a TopicPartition is a type-safe data structure representing an individual partition within a topic (a short sketch follows at the end of this section)

If a partition is added, the consumer will be notified as a result of the protocol for retrieving metadata from the cluster.

The list which is passed to the assign()/subscribe() method is used to create a SubscriptionState object.
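
A minimal sketch of static assignment; the topic name and partition numbers are illustrative:

// pin the consumer to two specific partitions of the biology topic
TopicPartition partition0 = new TopicPartition("biology", 0);
TopicPartition partition1 = new TopicPartition("biology", 1);
consumer.assign(Arrays.asList(partition0, partition1)); // overwrites any previous assignment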

consumer-topic interaction

poll(timeout)

timeout: the number of milliseconds the client will spend polling the cluster for messages before returning; it specifies the amount of time the message-retrieval cycle will take. When the timeout expires, the batch of records is returned after being:

  • added to an in-memory buffer
  • parsed
  • deserialized
  • grouped into ConsumerRecords, by topic and partition

This method, or the polling action in general, is the most critical process in the entire consumer component: the poll loop continuously polls the brokers for data in a reliable fashion.

  • it returns an object of type ConsumerRecords containing all the records it could retrieve from the brokers
  • as a result of the poll operation, the metadata about the cluster is also requested
  • the communication toward the cluster happens when the consumer’s Fetcher object talks to the cluster through the ConsumerNetworkClient over TCP; in particular, the Fetcher is responsible for most of the communication between the consumer and the cluster
  • with the network client open, the consumer starts sending heartbeats, to let the cluster know which consumers are connected
  • the Fetcher knows which topics or individual partitions it should be asking for from the SubscriptionState object
  • when the timeout expires, the Fetcher returns the grouped ConsumerRecords
  • as a poll operation opens network resources, it is a good idea to enclose it with structured exception handling and make sure the consumer is closed in the end (see the sketch below)
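
A minimal sketch of such a poll loop, assuming the consumer and subscription created above; the 100 ms timeout and the processing step are illustrative:

try {
    while (true) {
        // poll blocks for up to 100 ms and returns the records fetched since the previous poll
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            // processing is application specific; here we only print the record coordinates and value
            System.out.printf("%s-%d@%d: %s%n",
                    record.topic(), record.partition(), record.offset(), record.value());
        }
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    consumer.close(); // releases the network resources opened by the poll loop
}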

Options for developing and configuring consumer applications at scale

enable.auto.commit: Kafka accepts the responsibility to decide when current position offsets are upgraded to fully committed offsets (a blind approach, which might not be a good choice in case of failures). Kafka does this with a frequency set via auto.commit.interval.ms (see the sketch below).
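
A small configuration sketch, extending the consumerProps object from the consumer-creation sketch above; the 5000 ms interval is an illustrative value:

consumerProps.put("enable.auto.commit", "true");
// how often, in milliseconds, the consumer's current position is committed on its behalf
consumerProps.put("auto.commit.interval.ms", 5000);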

A sample setup to experiment with: a single broker, two topics, three partitions per topic.

Adding a new partition:
kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic biology --partitions 4
