We (Dinesh Kumar Ashokkumar, Rushiraj Chavan, and I) recently debugged an issue in Apache Kafka v0.8.2 (it also exists in prior versions). The issue itself is not peculiar, yet it is interesting that we seem to be among the few to run Kafka with this configuration and hit it.
Problem: A consumer does not receive data even when there is a lot of lag in the partitions of a topic.
Before we go into the bug details, let me lay down the basic design of Kafka.
Kafka is a distributed, fault-tolerant, persistent message queue (or a key-value store). Kafka supports named queues, namely topics. Like other distributed key-value systems, the key space is divided into partitions, and incoming data for a topic is stored in different partitions. The most important point to note is that the number of partitions is configured explicitly by the user and is not dynamic; that is, Kafka does not automatically split or merge partitions. A user can increase the partition count of a topic, but that might require downtime for that topic.
From the broker's point of view: A partition is owned by a broker (dubbed the leader of that partition), which takes the responsibility of replicating the partition to the other brokers that act as replicas. If you want to understand the different replication models in distributed systems, take a detour. In Kafka, replication is not fixed as either synchronous or asynchronous; the user makes that choice. A producer can enforce the broker's replication behavior using request.required.acks and producer.type = sync.
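For example, with the 0.8-era producer these settings are plain configuration properties (the values below are illustrative; -1 asks the leader to wait for all in-sync replicas before acknowledging):

```properties
# Old (0.8.x) producer configuration: synchronous send path
producer.type=sync
# -1 = wait for all in-sync replicas; 1 = leader only; 0 = no ack
request.required.acks=-1
```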
From the producer's point of view: The partition for an incoming message is identified by a key derived either from metadata, if available, or from the message itself. For now, you can trust that the built-in Kafka key derivation is good enough to achieve an even distribution of data among the partitions.
From the consumer's point of view: A partition can only be assigned to a single consumer (within a consumer group). That is, data from a partition cannot be retrieved by multiple consumers of the same group. While this seems highly restrictive for a distributed system, if we take a macro view, hide the 'partition' concept, and look at the level of the distributed queue, there are still multiple consumers able to retrieve data in parallel. There is a reason for this restriction on the broker side: the broker does not have to deal with synchronization within a partition and can avoid all the complications that come with it. Because of this one-consumer-per-partition association, the number of consumers is limited by the number of partitions. Note that if there are fewer consumers than partitions, a single consumer is assigned multiple partitions. If there are more consumers than partitions, the extra consumers just idle and do not receive any data.
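The assignment arithmetic above can be sketched as follows (a simplified round-robin model for illustration only; the real client's rebalancing algorithm is more involved):

```python
def assign_partitions(num_partitions, consumer_ids):
    """Distribute partitions across consumers; extra consumers stay idle."""
    assignment = {c: [] for c in consumer_ids}
    for p in range(num_partitions):
        owner = consumer_ids[p % len(consumer_ids)]
        assignment[owner].append(p)
    return assignment

# 6 partitions, 3 consumers -> 2 partitions each
print(assign_partitions(6, ["c0", "c1", "c2"]))  # {'c0': [0, 3], 'c1': [1, 4], 'c2': [2, 5]}
# 2 partitions, 3 consumers -> consumer "c2" idles with no partitions
print(assign_partitions(2, ["c0", "c1", "c2"]))  # {'c0': [0], 'c1': [1], 'c2': []}
```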
In Kafka, the maximum message size (= size of message + size of metadata) has to be configured and bounded. It is controlled by different configuration parameters in different layers: on the broker it is message.max.bytes; on the consumer it is fetch.message.max.bytes.
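Illustrative settings (the 300 MB figure matches the use case described later; note that the consumer's fetch.message.max.bytes must be at least as large as the broker's message.max.bytes, or the largest messages can never be fetched):

```properties
# Broker: largest message (size + metadata) the broker will accept, here 300 MB
message.max.bytes=314572800
# Consumer: per-partition fetch chunk; must be at least message.max.bytes
fetch.message.max.bytes=314572800
```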
Now, let's go a little deeper into the details that lead to this bug.
For every consumer connector, the client library internally forks a fetcher thread for each broker. The library also forks other threads, such as fetcher manager threads and a leader thread, but those are not relevant to this discussion. The fetcher thread for a broker is responsible for retrieving data from that broker. The consumer knows which partitions are assigned to it; it divides those partitions into a separate set per broker, based on which broker leads which partition. Each fetcher thread then takes the partition set for its broker and makes a data request. Upon receiving a data request from a fetcher, the broker tries to pack one chunk for each partition (#BrokerResponsePacking). The chunk size is nothing but the maximum message size. The Kafka broker is limited to packing a maximum of 2 GB of data (the maximum value of a signed 32-bit integer) in a single response to a fetcher (#BrokerMaxResponseSize).
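The per-broker fan-out can be sketched as follows (a simplified model; in the real client the partition-to-leader mapping comes from cluster metadata):

```python
def partitions_per_fetcher(my_partitions, leader_of):
    """Group this consumer's partitions by their leader broker.
    One fetcher thread per broker requests exactly one such group."""
    by_broker = {}
    for partition in my_partitions:
        by_broker.setdefault(leader_of[partition], []).append(partition)
    return by_broker

# Hypothetical leadership map: broker 1 leads partitions 0 and 2, broker 2 leads 1 and 3
leaders = {0: 1, 1: 2, 2: 1, 3: 2}
print(partitions_per_fetcher([0, 1, 2, 3], leaders))  # {1: [0, 2], 2: [1, 3]}
```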
In this very scenario, we hit the bug: https://issues.apache.org/jira/browse/KAFKA-1196
To understand the bug, let’s revisit some points:
– Each consumer is assigned a set of partitions on connecting to the cluster (assuming even distribution):
No. of partitions assigned to a consumer = Total partition count of the topic / Total consumer count
f(p, consumer) = f(p, topic) / f(c, total)
– In the worst case, all partitions assigned to a consumer are led by one single broker. That means one fetcher thread of that consumer connector requests all of its partitions from a single broker. The broker then tries to respond to the fetcher by packing one chunk for each partition:
Broker response size = No. of partitions assigned to the consumer * Maximum message size
f(b, response_size) = f(p, consumer) * f(m, max_size)
– As mentioned above (#BrokerMaxResponseSize):
f(b, response_size) <= 2 GB
Implies, f(p, consumer) * f(m, max_size) <= 2 GB
Implies, f(m, max_size) * f(p, topic) / f(c, total) <= 2 GB
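The final inequality can be turned into a quick sanity check (a sketch; sizes in bytes, assuming the worst case where one broker leads all of a consumer's partitions):

```python
MAX_RESPONSE_BYTES = 2**31 - 1  # maximum value of a signed 32-bit integer

def worst_case_response_bytes(max_message_bytes, topic_partitions, consumer_count):
    # Worst case: a single broker leads every partition assigned to this consumer.
    partitions_per_consumer = topic_partitions // consumer_count
    return max_message_bytes * partitions_per_consumer

def is_safe(max_message_bytes, topic_partitions, consumer_count):
    return worst_case_response_bytes(max_message_bytes, topic_partitions,
                                     consumer_count) <= MAX_RESPONSE_BYTES

print(is_safe(300 * 1024**2, 360, 1))    # False: 300 MB messages, 360 partitions, 1 consumer
print(is_safe(300 * 1024**2, 360, 360))  # True: one partition per consumer
```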
If this inequality is not respected, you will run into trouble: the broker does not check against the 2 GB limit, and an integer overflow happens. Check the bug above for more details.
Why did we land in this trouble?
In our case, we have a topic that is consumed by different types of processes and hosts big messages (maximum message size: 300 MB). One consumer is the actual message-processing system, which has to be highly scalable; hence a high partition count was chosen. However, there are other consumers of the same topic (in a different consumer group, of course) that do only very lightweight processing (e.g., bookkeeping). Bookkeeping takes very little CPU, so we hoped to run just one consumer for this group for the whole topic.
Max Message Size = f(m, max_size) = 300 MB
Total Partition Count of a Topic = f(p, topic) = 360
Total Consumer Count = 1 (for book keeping consumer group)
No. of partitions assigned to a consumer = f(p, consumer) = 360 / 1 = 360
Broker Response Size = f(b, response_size) = f(m, max_size) * f(p, consumer) = 300 MB * 360 = 108000 MB = 105 GB (approximately)
As you can see, the broker response size is way beyond the maximum response size of 2 GB. As a result, an integer overflow happened in the broker, which led to indeterministic behavior resulting in the broker not sending any response. The fetcher threads keep sending data requests but never get any response. So even though there are messages, the fetcher threads never receive any.
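To see the overflow concretely: the intended response size does not fit in a signed 32-bit integer, so the value the broker's size arithmetic actually sees is the wrapped remainder, which bears no relation to the real payload (this sketches the arithmetic only; the precise failure path inside the broker is described in KAFKA-1196):

```python
def to_int32(n):
    """Wrap an arbitrary integer to a signed 32-bit value, as Java int arithmetic would."""
    return ((n + 2**31) % 2**32) - 2**31

intended = 300 * 1024**2 * 360  # 113,246,208,000 bytes, ~105 GB
print(to_int32(intended))       # 1577058304 -> ~1.47 GB, unrelated to the real size
```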
What is the solution?
Thankfully, Kafka supports message compression. Thankfully again, the maximum message size in all the above equations corresponds to the compressed size. Luckily, in our case the messages are text and the compression ratio was superb: our 300 MB messages came down to 50 MB after GZIP compression. So, we enabled compression.
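The ratio is entirely data-dependent, but repetitive text compresses very well under GZIP; a quick sketch with Python's standard library (the payload below is made up):

```python
import gzip

# Hypothetical log-like text payload: highly repetitive, so it compresses well
payload = ("ts=2015-06-01T12:00:00 level=INFO event=order_processed\n" * 50000).encode()
packed = gzip.compress(payload)
print(len(payload), len(packed), round(len(payload) / len(packed), 1))
```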
We also observed that most of the messages are not really big. The typical 80-20 rule applied to message sizes in our case: 80% of messages are less than 10 MB (uncompressed) and 20% of messages are more than 10 MB (uncompressed). So, we split the data into two separate topics, small-messages and big-messages. The topic small-messages is configured with many partitions (360) and the topic big-messages with few partitions (72).
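On the producer side, the split can be as simple as picking the topic by uncompressed size (a sketch; the threshold and topic names follow the scheme above):

```python
SMALL_TOPIC = "small-messages"   # 360 partitions
BIG_TOPIC = "big-messages"       # 72 partitions
THRESHOLD_BYTES = 10 * 1024**2   # 10 MB, uncompressed

def pick_topic(message: bytes) -> str:
    """Route a message to the right topic based on its uncompressed size."""
    return SMALL_TOPIC if len(message) <= THRESHOLD_BYTES else BIG_TOPIC

print(pick_topic(b"x" * 1024))            # small-messages
print(pick_topic(b"x" * (20 * 1024**2)))  # big-messages
```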
Well, the story is not over yet. We then ran into issues with the Java max heap and Kafka buffers.
As you know, Sun's (now Oracle's) JVM implementation requires the maximum heap size to be specified up front, and the heap can't grow beyond it. An application has to account for the heap requirements of not just its own code but of all the libraries it uses.
In the case of the Kafka client, a consumer connector allocates buffers for the partitions assigned to it. The number of buffers allocated per partition is configurable using queued.max.message.chunks.
No. of buffers allocated for each Partition = f(buf, partition)
No. of buffers allocated for each Consumer Connector = f(p, consumer) * f(buf, partition)
Unfortunately, the Kafka client library does not take multi-threaded applications into account. That is, if a Kafka application has multiple threads, each with its own consumer connector, then the Kafka client allocates buffers for each consumer connector separately, even though they are all in the same JVM and the buffers are already protected for thread safety due to parallel access by fetcher threads and the application thread.
No. of threads (and so consumers) per JVM = f(c, jvm)
No. of buffers allocated per JVM = f(c, jvm) * f(p, consumer) * f(buf, partition)
Total heap memory required by Kafka = No. of buffers allocated per JVM * Maximum Buffer Size = f(c, jvm) * f(p, consumer) * f(buf, partition) * f(m, max_size)
If you don't account for this much memory, you will keep running into OutOfMemoryError even though your application's own memory requirements are quite small.
In our case of light weight book keeping application,
f(c, jvm) = 8, f(p, consumer) = 45, f(buf, partition) = 1, f(m, max_size) = 50 MB
Total heap memory required by Kafka = 8 * 45 * 1 * 50 MB = 18 GB (approximately)
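Plugging our numbers into the buffer formula:

```python
MB = 1024**2
consumers_per_jvm = 8          # f(c, jvm): one connector per thread
partitions_per_consumer = 45   # f(p, consumer): 360 partitions / 8 connectors
buffers_per_partition = 1      # f(buf, partition): queued.max.message.chunks
max_message = 50 * MB          # f(m, max_size): compressed size

kafka_heap = (consumers_per_jvm * partitions_per_consumer
              * buffers_per_partition * max_message)
print(kafka_heap // MB, "MB")  # 18000 MB, i.e. the ~18 GB figure above
```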
Yes, that's a shocking size, but that's what we have to set aside for Kafka to operate. We needed just 2 GB for the application itself, so in total we configured the max heap as 24 GB.
Laxmi Narsimha Rao Oruganti