In my previous post, “The Apache Kafka Basics”, Kafka’s essential components (Producer, Broker and Consumer) were introduced, and a simple producer/consumer setup was explained using shell programs. In this post we get more hands-on with code by creating and configuring a Kafka producer using Kafka’s Java API.
- Creating Topic
- Producer
  - Properties Description
  - Create and Dispatch
- Consumer
  - Properties Description
  - Create and Receive
Creating Topic
- 3 partitions
- 3 brokers
- replication factor of 3
Producer
Properties Description
A Kafka producer is instantiated from a set of properties, a few of which are required. The producer holds an instance of the ProducerConfig class, which defines the property keys used to configure a producer.
- bootstrap.servers: the list of brokers. The producer connects to the first available one to retrieve cluster metadata such as partitions and their leaders.
- key.serializer: the message-key serializer.
- value.serializer: the message-value serializer.
- acks: the level of acknowledgement the producer demands for its sent messages. It takes one of the following values:
  - 0: fire and forget
  - 1: leader acknowledged
  - all (or -1): quorum acknowledged, that is, acknowledged by all in-sync replicas
- retries: how many times the producer retries sending a message before giving up.
- retry.backoff.ms: the wait period in milliseconds between retries.
- batch.size: the maximum number of bytes that can be buffered per RecordBatch. It limits the amount of records buffered per batch; once this size is reached, the batch is dispatched immediately.
- Batching classes:
  - RecordBatch: a batch of ProducerRecords to be sent to the broker that owns the assigned partition.
  - RecordAccumulator: an in-memory, queue-like structure that allows micro-batching of high-volume dispatches. It contains a collection of RecordBatches.
- linger.ms: the time in milliseconds the producer waits before dispatching a batch that is not yet full.
- buffer.memory: the total number of bytes of memory available to buffer records waiting to be sent to the brokers.
- max.block.ms: the maximum time in milliseconds the send() method may block, for example while the buffer is full. It is used to apply back pressure on the producer.
- max.in.flight.requests.per.connection: the number of requests that can be sent on a connection before a confirmation is received.
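The properties described above can be collected in a plain java.util.Properties object. The following is a minimal sketch that only uses the JDK, so it runs without the Kafka client on the classpath. The broker addresses and all numeric values are assumptions chosen for illustration, not recommendations.

```java
import java.util.Properties;

public class ProducerProps {
    /** Builds producer settings keyed by the property names described above. */
    static Properties build() {
        Properties props = new Properties();
        // Assumed broker addresses; replace with the brokers of your own cluster.
        props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        // Serializer classes shipped with the Kafka client library.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Wait for all in-sync replicas to acknowledge each record.
        props.put("acks", "all");
        // Illustrative values only.
        props.put("retries", "3");
        props.put("retry.backoff.ms", "100");
        props.put("batch.size", "16384");
        props.put("linger.ms", "5");
        props.put("buffer.memory", "33554432");
        props.put("max.block.ms", "60000");
        props.put("max.in.flight.requests.per.connection", "5");
        return props;
    }

    public static void main(String[] args) {
        // Print every configured key/value pair.
        build().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

With the Kafka client library available, an object like this can be passed to the KafkaProducer constructor.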
Dispatch
When the send() method is invoked, the producer reaches out to the cluster to retrieve metadata about topics, partitions and brokers. Once the metadata is received, the send process goes through the following phases:
- Serialization: the key and value are serialized using the configured serializers.
- Targeting Partition: determining to which partition the record shall be sent.
- Record Accumulation: putting ProducerRecords into the RecordAccumulator.
- Transmission: sending the batches to the broker, as well as receiving a RecordMetadata object with the receive statistics.
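The first three phases can be modelled with a toy class that uses only the JDK. This is a sketch for intuition, not the client's implementation: the class and method names are hypothetical, and Arrays.hashCode stands in for the murmur2 hash that Kafka's default partitioner applies to the serialized key.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DispatchSketch {
    // Toy accumulator: one list of pending serialized values per partition.
    private final Map<Integer, List<byte[]>> batches = new HashMap<>();
    private final int numPartitions;

    DispatchSketch(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    /** Serialization: turn a String into bytes, as a String serializer would. */
    static byte[] serialize(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    /** Targeting: hash the serialized key and map it onto a partition index. */
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        // Stand-in hash; the sign bit is cleared so the result is never negative.
        int hash = Arrays.hashCode(keyBytes) & 0x7fffffff;
        return hash % numPartitions;
    }

    /** Accumulation: append the serialized value to its partition's batch. */
    int send(String key, String value) {
        int partition = partitionFor(serialize(key), numPartitions);
        batches.computeIfAbsent(partition, p -> new ArrayList<>()).add(serialize(value));
        return partition;
    }

    /** Number of records waiting in the batch of the given partition. */
    int pending(int partition) {
        List<byte[]> batch = batches.get(partition);
        return batch == null ? 0 : batch.size();
    }

    public static void main(String[] args) {
        DispatchSketch producer = new DispatchSketch(3);
        int first = producer.send("user-42", "first");
        int second = producer.send("user-42", "second");
        System.out.println(first == second);          // prints true: same key, same partition
        System.out.println(producer.pending(first));  // prints 2
    }
}
```

The point of the sketch is that records with the same key always land in the same partition's batch, which is what allows them to be transmitted together.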
Consumer
A consumer is a long-running application:
- It continuously looks for and processes new messages by polling the Kafka cluster.
- This loop should be interrupted only for valid reasons.
- A consumer can have only one thread and one poll loop.
  - This limits how much work you can reasonably expect a single Kafka consumer to do.
  - Spending too much time on processing the records has big implications.
- A slow consumer has no impact on the cluster, the producer or other consumers.
- This design forces message consumption to be parallelized in another, more scalable way.
Why polling
- Diversity of consumers
  - different needs
  - different capabilities
- Scalability in case of a large number of consumers
  - less performance degradation
- Different consumption models
  - real-time processing
  - batch-mode processing
Because Kafka consumers pull data from the topic, different consumers can consume the messages at different paces. You can have one consumer processing the messages in real time and another consumer processing them in batch mode.
Creating a consumer is similar to creating a producer.
- The consumer’s subscribe() method is used to subscribe to topics.
  - It takes a collection of strings as the list of topics.
  - An overload accepts a regular expression for batch subscription, that is, subscribing to a series of topics at once (for example, all topics matching my-.*).
  - A subscription must be made in a single action: any subsequent call to subscribe() overwrites the previous subscription.
unsubscribe() method
- Calling it unsubscribes the consumer from all topics.
- The same effect can be achieved by subscribing to an empty list.
- Automatic (dynamic) partition assignment happens as a result of invoking subscribe().
  - It enlists the consumer to poll from all partitions of the topic.
  - For multiple topics, the consumer polls all partitions within those topics, which can be a heavy operation.
- Partition management is done behind the scenes. When a new partition is added to an existing topic:
  - the cluster metadata changes and is sent to the consumer
  - the consumer checks whether any of its subscriptions is affected
  - the consumer automatically adds the new partition
SubscriptionState
- The consumer maintains an internal object that manages its subscriptions.
- It contains all details about the topics and partitions this consumer instance is subscribed or assigned to.
- Together with the ConsumerCoordinator, it also plays an important role in managing the offsets.
assign() method
- In contrast, a consumer may want full control over the partitions from which it polls messages.
- The assign() method assigns specific partitions to a single consumer instance.
  - The consumer starts polling from those partitions regardless of the topics to which they belong.
  - It takes a list of TopicPartitions, and invoking it multiple times has the same overwrite effect.
  - A TopicPartition is a type-safe data structure that represents an individual partition within a topic.
- If a partition is added, the consumer is notified through the protocol for retrieving metadata from the cluster.
- The list passed to the assign() or subscribe() method is used to create a SubscriptionState object.
consumer-topic interaction
poll(timeout)
- timeout: the number of milliseconds the client spends polling the cluster for messages before returning. It bounds how long one message retrieval cycle takes.
- poll() returns once records are available or, at the latest, when the timeout expires. Before being returned, the records are:
  - added to an in-memory buffer
  - parsed
  - deserialized
  - grouped into ConsumerRecords by topic and partition
- The poll loop is the most critical process in the entire consumer: it continuously polls the brokers for data in a reliable fashion.
- poll() returns an object of type ConsumerRecords containing all the records it could retrieve from the broker.
- As part of the poll operation, metadata about the cluster is also requested.
- Communication with the cluster is handled by the consumer’s Fetcher object, which talks to the cluster through the consumer network client over TCP. The Fetcher is responsible for most of the communication between the consumer and the cluster.
- With the client open, the consumer starts sending heartbeats to let the cluster know which consumers are connected.
- The Fetcher needs to know which topics or individual partitions to ask for; this is defined in the SubscriptionState object.
- When the timeout expires, the Fetcher returns the grouped ConsumerRecords.
- Because a poll operation opens network resources, it is a good idea to enclose it in structured exception handling and make sure the consumer is closed in the end.
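The shape of such a poll loop can be sketched without a broker. In the example below, RecordSource and InMemorySource are hypothetical stand-ins for the two consumer calls the loop relies on (the real KafkaConsumer offers poll(Duration) and close(), and returns ConsumerRecords rather than a list of strings). The part worth noting is the try/finally structure, which is an assumption about how one might organize the loop, not the only way to do it.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

public class PollLoopSketch {
    /** Hypothetical stand-in for the consumer calls used by the loop. */
    interface RecordSource extends AutoCloseable {
        List<String> poll(Duration timeout);

        @Override
        void close();
    }

    /** In-memory source: hands out queued batches, then empty results. */
    static class InMemorySource implements RecordSource {
        private final Queue<List<String>> batches = new ArrayDeque<>();
        boolean closed = false;

        InMemorySource(List<List<String>> data) {
            batches.addAll(data);
        }

        @Override
        public List<String> poll(Duration timeout) {
            List<String> next = batches.poll();
            if (next == null) {
                return new ArrayList<>(); // nothing available within the timeout
            }
            return next;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Polls until an empty batch arrives and always closes the source. */
    static List<String> consume(RecordSource source) {
        List<String> processed = new ArrayList<>();
        try {
            while (true) {
                List<String> records = source.poll(Duration.ofMillis(100));
                if (records.isEmpty()) {
                    break; // demo only: a real consumer would normally keep looping
                }
                for (String record : records) {
                    processed.add(record.toUpperCase()); // placeholder processing step
                }
            }
        } finally {
            source.close(); // release resources even if processing throws
        }
        return processed;
    }

    public static void main(String[] args) {
        InMemorySource source = new InMemorySource(
                Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c")));
        System.out.println(consume(source)); // prints [A, B, C]
        System.out.println(source.closed);   // prints true
    }
}
```

Keeping the per-record work inside the loop short matters here, since the single thread that processes records is also the one that has to come back and poll again.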
options for developing and configuring consumer applications at scale
- enable.auto.commit: Kafka takes responsibility for deciding when current position offsets are upgraded to committed offsets. This is a blind approach and might not be a good choice in case of failures. Kafka commits at a frequency set via auto.commit.interval.ms.
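As a configuration sketch, the auto-commit choice can be expressed in a java.util.Properties object like the one below. The broker address, the group name demo-group and the 5000 ms interval are assumptions for illustration; the interval property is named auto.commit.interval.ms in the Java client.

```java
import java.util.Properties;

public class ConsumerProps {
    /** Builds consumer settings with auto-commit switched on or off. */
    static Properties build(boolean autoCommit) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "demo-group");              // hypothetical group name
        // Deserializer classes shipped with the Kafka client library.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", String.valueOf(autoCommit));
        if (autoCommit) {
            // How often, in milliseconds, offsets are committed automatically.
            props.put("auto.commit.interval.ms", "5000");
        }
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build(true).getProperty("auto.commit.interval.ms"));   // prints 5000
        System.out.println(build(false).getProperty("enable.auto.commit"));       // prints false
    }
}
```

With auto-commit disabled, the application has to commit offsets itself, for example through the consumer's commitSync() or commitAsync() methods after a batch has been processed.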
- single broker
- two topics
- three partitions per topic
Add a new partition:
kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic biology --partitions 4