Apache Kafka: Case of mysterious rebalances

We (Dinesh Kumar Ashokkumar and I) have recently debugged another issue related to Apache Kafka v0.8.2 (also exists in prior versions).  The issue is not an issue per se but learning things hard way which is a side effects of a Kafka design choice.

I am assuming you know about Apache Kafka.  If not, you may want to read my other post on Kafka which has short brief on it.

Problem: Clients of a topic rebalance every now and then, even if there are no connections or disconnections

Before we get into the details, let me lay down the context and conditions.

Apache Kafka uses Apache Zoo Keeper in different ways.  Brokers use it for state management, and partition leader uses it for election and Consumer Clients use it for detecting the connects and disconnects of other Consumer Clients.  It is Consumer Client usage that I am going to discuss here.

In Apache Kafka v0.8.2 (and prior versions), Consumer Clients are “thick” and “smart” clients in the sense that they coordinate between themselves for partition allocation (or assignment) among all the consumer connectors.

Apache Kafka High Level Consumer API, supports a single consumer connector to receive data of a given consumer group across multiple topics.  That means, if you have completely different topics but have same consumer group name, you can use one connector to receive the data from all the topics.  While this is a powerful feature when data has to be retrieved from multiple similar topics, it became a deadly feature for us.

Here are Zoo Keeper paths used by Kafka Consumers (kafka.consumer.ZookeeperConsumerConnector is the class that deals with zookeeper primarily):

Purpose Zoo Keeper Path Value
Consumer Partition Owner Registry /consumers/[group_id]/owner/[topic]/[broker_id-partition_id] consumer_connector_id
Consumer Commit Offsets Registry /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] offset_counter_value
Consumer Identifiers Registry /consumers/[group_id]/ids/[consumer_connector_id]  

Every consumer creates an ephemeral node under “Consumer Identifiers Registry” tree and registers itself as a watcher on the tree root.  This helps a consumer client know about any new consumer client connects and any old consumer client disconnects.

As you can see this registry path is different from other registry paths of the consumer.  Especially, Identifiers Registry is not tied to a topic but others are.  This difference comes from the feature discussed above.  If a consumer client connects with a consumer group name plus a set of topics, then that consumer client needs to be informed of client connects and disconnects in that consumer group across any topic.  To facilitate that, Kafka team has chosen to not have topic in the Identifiers Registry Path.

Now, let’s dig little deep into the bug

Our topics are named aptly and we have put an effort to make sure the topic name clearly identifies its place in our architecture.

For a given topic, we have roughly 120+ partitions, and we chose to have separate consumer client (either a thread or a process) for each partition.  That means, for a given topic and a given consumer group, we have about 120+ consumer clients.

For a given topic, we have a main consumer and it connects to that topic with consumer group name as “main”.  Of course, there are other consumers of the same topic that connect to it with a different consumer group name.  Like this we have completely different topics and each has different consumer clients connecting to it.

Now, in this situation if any topic has a new client or an existing client has disconnected, it is perfectly fine to have a rebalance of consumer clients of that particular topic.  Instead what we have observed is that consumer clients of all topics have started rebalancing.  While it is easy to say now (post finding the issue), it was not the same when the rebalances are happening, esp. since we did not know that all are nodes are rebalancing.   While we were debugging the issue, we focused only on one topic and consumer group combo and tried to find a node is going into rebalance and never looked a macro picture.  But after few days of debugging, we observed that whole gamut of consumer clients are rebalancing not just of one topic.  We first suspected something seriously wrong with cluster (just a hunch, no real proof), so we have shutdown the Kafka cluster and brought it up.  The problem disappeared!, hooray!

Well the problem did not disappear for long, it started again when one client of a topic disconnected, all clients across all topics started rebalancing again.  We have understood that it is beyond a topic plus consumer combo logic that is tricking us in.

Feature or Trap?

This happens because of the Consumer Identifier Registry design.  We have 10+ topics, with 120+ consumer clients using the consumer group name “main” for each topic, tallies to 1200+ consumers using consumer group name “main”.  These many consumer clients get registered under Zoo Keeper path “/consumers/main/ids/” and every consumer client registers itself as a watcher on this whole tree.

If any consumer client of any topic disconnects, an ephemeral node of that consumer client under this Zoo Keeper path gets deleted and Zoo Keeper triggers all watchers of the tree path.  That means, all 1200+ consumer clients (minus just disconnected consumer client) get notified by Zoo Keeper.  Apache Kafka Client triggers rebalance as a response to this Watch event trigger.

As you can see, this is a well meaning feature for simplifying consumers to receive data from similar topics with same consumer group name has turned out to be disastrous for us.

What is the way out?

Solution is simple, just use a unique consumer group name.  Avoid generic consumer group names by all means.

We had put a very good time in naming topics but not in consumer group names.  This experience has led us to put good time in naming the consumer groups not only well, but unique.

Thanks,

Laxmi Narsimha Rao Oruganti

 

Updates:

Grammar corrections (September 22, 2015)

Advertisements

Apache Kafka: Case of Large Messages, Many Partitions, Few Consumers

We (Dinesh Kumar Ashokkumar, Rushiraj Chavan, and I) have recently debugged an issue related to Apache Kafka v0.8.2 (also exists in prior versions).  The issue is not peculiar and yet it is very interesting to find that we are ones to use Kafka with that configuration and have hit it.

Problem: Consumer does not receive data even when there is lot of lag in the partitions of a topic

Before we go into the bug details, let me lay down the basic design of Kafka.

Kafka is a distributed, fault-tolerant, persistent message queue (or a key-value store).  Kafka supports named queues namely topics.  Like other key-value distributed systems, key space is divided into partitions and incoming data of a topic is stored in different partitions.  Most important point to note in Kafka is that no. of partitions are configured by user explicitly and not dynamic, that is Kafka does not automatically split or merge partitions.  However, user can increase the partition count of a topic, but that might require a downtime for that topic.

From Broker side of view: A partition is owned by a broker (dubbed as: leader of a partition) and it takes the responsibility to replicate the partition with other brokers who act as replicas.  If you want to understand different replication models in distributed systems, take a detour.  In Kafka, there is no fixed synchronous and asynchronous replication.  User makes a choice and decides to go with synchronous and asynchronous replication.  Producer can enforce the broker behavior on broker replication using request.require.acks and producer.type = sync. 

From Producer side of view: A partition for an incoming message is identified by a key derived either from metadata if available or from message.  For now, you can trust that in-built Kafka key derivation is good enough to achieve event distribution of data among the partitions.

From Consumer side of view: A partition can only be assigned to a single consumer.  That is, data from a partition can not be retrieved by multiple consumers.  While this seems highly restrictive for a distributed system, taking a macro view where in we hide ‘partition’ concept and look at distributed queue level, there are still multiple consumers who would be able to retrieve in parallel.  There is a reason for this restriction from broker side of view, that broker does not have deal with synchronization with in a partition and can avoid all the over complications.  Because of this association of a single consumer for a partition, the no. of consumers is limited by the no. of partitions.  Note that, if the no. of consumers are less than the no. of partitions, then a single consumer would be assigned multiple partitions.  If no. of consumers are more than no. of partitions, then extra consumers would be just idling and do not receive any data.

In Kafka, maximum message size (= Size of Message + Size of Metadata) has to be configured and bounded.  It is controlled by different configuration parameters in different layers.  In Broker, it is message.max.bytes; In Consumer, it is fetch.message.max.bytes.

Now, let’s go little deep into details that lead to this bug.

For every Consumer Connector, client library internally forks a fetcher thread for each Broker.  Library also forks other threads such as fetcher manager threads, and a leader thread, but those are not relevant for this discussion.   A fetcher thread for each Broker is responsible to retrieve data from that Broker. Consumer knows which partitions are assigned to it, it then divides the partitions into a separate set for each broker based on which partition is lead by which broker.  Fetcher thread then takes the partition set of this consumer and the broker it is attached to and makes a data request.  Upon receiving a data request from a fetcher, Broker tries to package  one chunk for each partition (#BrokerResponsePacking).  Chunk size is nothing but message size.  Kafka Broker is limited to pack maximum of 2 GB data (Maximum value of Signed 32-bit integer) (#BrokerMaxResponseSize) in a single response to fetcher.

In this very scenario, we have hit the bug: https://issues.apache.org/jira/browse/KAFKA-1196

To understand the bug, let’s revisit some points:

– Each consumer is assigned a set of partitions on connecting to the cluster (assuming even distribution

No. of partitions assigned to a consumer f(p, consumer) = Total Partition Count of a Topic f (p, topic) / Total Consumer Count f(c, total)

– In the worst case, all partitions assigned to a consumer are lead by one single broker.  That means, one fetcher thread of that consumer connector requests all partitions from a single broker.  Broker then tries to respond to fetcher by packing one chunk for each partition

Broker Response Size f(b, response_size) = f(p, consumer) * Maximum Message Size f(m, max_size)

– As mentioned above (#BrokerMaxResponseSize)

f(b, response_size) <= 2 GB

Implies, f(p, consumer) * f(m, max_size) < 2 GB

Implies, f(m, max_size)  * f(p, consumer)  < 2 GB

Implies, f(m, max_size)  * f (p, topic)  /  f(c, total) < 2 GB

If one does not follow the above equation, he/she will into trouble.  Broker does not check 2GB, integer overflow happens.  Check the above bug for more details.

Why did we land into this trouble?

In our case, we have a topic that is consumed by different types of processes and hosts big messages (Max Message Size 300 MB).  One is the actual message processing system and so has to be highly scalable.  Hence, higher partition count has been chosen.  However, there are other consumers to the same topic (of course, different consumer group) which just do very light weight processing (ex: book keeping).  Book keeping takes very less CPU, so was hoping to run just one consumer for this group and for the whole topic.

Max Message Size = f(m, max_size) = 300 MB

Total Partition Count of a Topic = f(p, topic) = 360

Total Consumer Count  = 1 (for book keeping consumer group)

No. of partitions assigned to a consumer = f(p, consumer) = 360 / 1 = 360

Broker Response Size = f(b, response_size) = f(m, max_size)  * f(p, consumer)  = 300 MB * 360 = 108000 MB = 105 GB (approximately)

As you can see, broker response size is way beyond the max response size of 2 GB.  As a result, integer overflow happened in broker and that lead to in deterministic behavior in broker resulting in broker not sending any response.  Fetcher threads keep sending data requests but never get any response.  So, even though there are messages fetcher threads do not get any message.

What is the solution?

Thankfully, Kafka support message compression.  Thankfully again, max message size in all the above equations corresponds to compressed size.  Luckily, in our case the messages are text messages and the compression ratio was superb.  Our 300 MB message came to 50 MB after GZIP compression.   So, we have enabled compression.

We have also observed that most of the messages are not really big.  The typically 80-20 rules applied to message sizes in our case.  80% of messages are less than 10 MB (uncompressed), 20% messages are more than 10 MB (uncompressed).  So, we have split the data to go into two separate topics such as small-messages and big-messages.  Topic small-messages is configured to have many partitions (i.e. 360) and Topic big-messages is configured to have few partitions (i.e. 72).

Well the story is not over yet.  We then got into issues with Java Max Heap and Kafka Buffers.

As you know that Sun’s (now Oracle’s) JVM implementation requires Max Heap Size to be specified up front and heap can’t go beyond this.  Application has to account for the heap requirements of not just their application code but all the libraries it is using.

In case of Kafka Client, Consumer Connector allocates buffers for partitions assigned to it.   No. of buffers allocated for each partition is configurable using queued.max.message.chunks.

No. of buffers allocated for each Partition = f(buf, partition)

No. of buffers allocated for each Consumer Connector = f(p, consumer) * f(buf, partition)

Unfortunately, Kafka Client library does not take into account multi-threaded applications.  That is, if there is a Kafka application that has multiple threads each with its own Consumer Connector, then Kafka Client allocates buffers for each Consumer connector separately though they are all in same JVM and buffers are protected for thread-safety due to parallel access by fetcher threads and application thread.

No. of threads (and so consumers) per JVM = f(c, jvm)

No. of buffers allocated per JVM = f(c, jvm) * f(p, consumer) * f(buf, partition)

Total heap memory required by Kafka = No. of buffers allocated per JVM * Maximum Buffer Size = f(c, jvm) * f(p, consumer) * f(buf, partition) * f(m, max_size)

If you don’t account for this much of memory, you will keep getting into OutOfMemoryError even though your application memory requirements are quite less.

In our case of light weight book keeping application,

f(c, jvm) = 8, f(p, consumer) = 45, f(buf, partition) = 1, f(m, max_size) = 50 MB

Total heap memory required by Kafka = 8 * 45 * 1 * 50 MB = 18 GB (approximately)

Yes, that’s a shocking size, but that’s what we have to keep aside for Kafka to operate.  We just required 2 GB for application and so totally we have configured the Max Heap as 24 GB.

Thanks,

Laxmi Narsimha Rao Oruganti