HBase Capacity Planning – Part 1

HBase is a distributed, fault-tolerant, persistent NoSQL store in the Hadoop stack.  Hadoop has been in the market for a long time now, but I could not find any simple guide to HBase capacity planning for our usage.  I am sure there are a great many articles on the internet that discuss Hadoop capacity planning (including HBase), but somehow they did not cover the details I was looking for.  Here is my version of it.

This is going to be a lengthy blog post, as I have tried to explain what’s happening inside HBase with each knob and the side effects of changing it.  I would like to first convey my sincere thanks to all those who have blogged about different parameters and their experiences playing with them for their requirements and their deployments.  Without them, this blog post is nothing.  Where possible, I have given a reference URL.  If I have missed any reference, please pardon me.

HBase is not just a storage node, but also a compute node.  Hence, HBase capacity planning has to consider not just storage requirements but compute requirements too.  That means all the standard resources: disk, CPU cores, memory, and network.  In fact, as you go through this blog you will realize that it is memory that demands more capacity than disk size (at least in our case).  But Java’s G1 GC has come to the rescue!

I am assuming you know how HBase works (esp. Regions, Region Servers, Column Family, HFiles, MemStore, HMaster, etc.).

Nevertheless, here are some basic details you should know.

– Each Column Family’s data is stored in separate HFiles (also called store files).  Each Region has its own HFiles (at least one per Column Family) and one MemStore per Column Family.

– Within a Region, data gets reorganized from time to time by a process called ‘Compaction’ (like checkpoints in the relational database world).  Minor Compaction is a process of merging multiple small HFiles into one bigger HFile.  Major Compaction merges all HFiles of a Region into one big HFile.

Read this if you can: http://hbase.apache.org/book.html#_compaction

I will first discuss different HBase parameters and the design considerations that go into setting a proper value for each.

For each configuration knob, I give the config ID, the configuration name (property name), the description and design considerations, the value we found optimal, and a reference URL where available.

C1. HBase: Max Region Size (hbase.hregion.max.filesize)

If we have a very large value for Max Region Size (and hence Max HFile Size), a major compaction can result in a huge disk write that could be as much as the Max Region Size.

Optimal value: 15 GB.  Reference: Click this

C2. HBase: Heap Percentage (hbase.regionserver.global.memstore.lowerLimit)
(see also: hbase.regionserver.global.memstore.upperLimit, hbase.hstore.blockingStoreFiles)

This knob (the lower limit) limits the memory to be used by MemStores.  The flush thread starts flushing MemStores when memory usage goes above the lower limit and keeps flushing until usage falls back below it.  However, there is a catch: the flusher thread also has to consider the number of HFiles on disk (hbase.hstore.blockingStoreFiles).

The upper limit controls when writes to a region are blocked.

Optimal value: 0.4.  Reference: Click this and this

C3. HBase: MemStore Cache Size before flush (in a way, Max MemStore Size) (hbase.hregion.memstore.flush.size)

As the number of regions grows, the number of MemStores grows in equal proportion.  That means the memory requirement grows too, driven by the number of MemStores times the Max MemStore Size.  If you keep this low with memory in mind, many HFiles get created, resulting in many compactions.  Not only that, the amount of HLog to keep also increases with the Max MemStore Size.

Optimal value: 128 MB

C4. HBase: Compact Threshold HFiles (hbase.hstore.compactionThreshold)

This knob controls when to run Minor Compaction (which indirectly controls Major Compactions).

Each HFile (or store file) consumes resources like file handles, heap for holding metadata, bloom filters, etc.

Not just that, the number of bloom filter lookups increases (though they are in-memory lookups), and so reads become slower.

Optimal value: 3

C5. HBase: Max HFiles on disk for a region (hbase.hstore.blockingStoreFiles)

If a region already has this many store files on disk, the flusher thread is blocked.  If the flusher thread is blocked, eventually the MemStore for the region fills up.  If the MemStore fills up, writes to that region are blocked.

Optimal value: 15

C6. HBase: Minor Compaction Max HFiles (hbase.hstore.compaction.max)

Maximum number of files for a minor compaction; it can be fewer but not more.

Optimal value: 10
C7. HBase: Major Compaction Interval (hbase.hregion.majorcompaction)

Time between automatic major compactions, expressed in milliseconds.  (A major compaction merges all HFiles of a region into one big HFile, as described above.)

Reference: Click this
NA. Combination Check

hbase.regionserver.global.memstore.lowerLimit * HBase heap size <= (hbase.regionserver.hlog.blocksize * hbase.regionserver.logroll.multiplier * hbase.regionserver.maxlogs)

In other words, the total WAL capacity should cover the memstore lower-limit share of the heap, so that flushes are driven by memstore pressure and not by running out of log files.  (A quick way to verify both checks is sketched right after this table.)  Reference: Click this

NA. Combination Check

hbase.hstore.blockingStoreFiles >= (2 * hbase.hstore.compactionThreshold)
C8. HBase: Block Cache Percentage (hfile.block.cache.size)

The block cache is an LRU cache of blocks and is used by reads.  If your reads require more than the block cache can hold, the LRU policy kicks in and old cache blocks are evicted.  A high value of block cache reduces cache evictions and mostly improves read performance, but increases GC time.  A low value increases cache evictions and degrades read performance, but reduces GC time.  (Similar to the page cache in RDBMS systems.)

Optimal value: 0.25.  Reference: Click this

C9. HBase: Client side Write Buffer Size (hbase.client.write.buffer)

HBase client-side buffer size for write caching.  Until this buffer size is reached (or a flush is called), the HBase client does not send the writes to the HBase server.  A high value increases the risk of data loss but reduces load on the HBase cluster; a low value reduces the risk of data loss but increases load on the HBase cluster.  This size also affects the HBase Region Server, since that much data is sent by the client in a single go and has to be processed by the server.  Additional memory requirement on the HBase server = hbase.client.write.buffer * hbase.regionserver.handler.count

Optimal value: 2 MB.  Reference: Click this

NA. HBase: Column Family Names, Qualifier Names

Keep them very short (1 to 3 characters).  People often refer to row compression (FAST_DIFF) as a savior that lets you not worry about the length of these names.  That is definitely true for the on-disk representation (i.e., HFiles).  However, when the data resides in memory (MemStores, etc.), rows are stored in full form without any compression.  That means lengthy names waste a lot of memory, resulting in fewer rows held in memory.

Reference: Click this

C10. HDFS: Replication Factor

Optimal value: 3
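To tie the knobs together, here is a minimal Python sketch that records the values discussed above and verifies the two combination checks plus the client write-buffer overhead on the region server.  The heap size, hlog.blocksize, logroll.multiplier, maxlogs, and handler.count used here are assumptions for illustration only; substitute your own values.

```python
# Sanity-check sketch for the HBase settings discussed above.
# Heap size, hlog.blocksize, logroll.multiplier, maxlogs, and handler.count are assumed values.

conf = {
    "hbase.hregion.max.filesize": 15 * 1024**3,               # C1: 15 GB
    "hbase.regionserver.global.memstore.lowerLimit": 0.38,    # C2 (assumed)
    "hbase.hregion.memstore.flush.size": 128 * 1024**2,       # C3: 128 MB
    "hbase.hstore.compactionThreshold": 3,                    # C4
    "hbase.hstore.blockingStoreFiles": 15,                    # C5
    "hbase.hstore.compaction.max": 10,                        # C6
    "hfile.block.cache.size": 0.25,                           # C8
    "hbase.client.write.buffer": 2 * 1024**2,                 # C9: 2 MB
    "hbase.regionserver.hlog.blocksize": 128 * 1024**2,       # assumed
    "hbase.regionserver.logroll.multiplier": 0.95,            # assumed
    "hbase.regionserver.maxlogs": 96,                         # assumed
    "hbase.regionserver.handler.count": 30,                   # assumed
}
heap_bytes = 24 * 1024**3  # assumed 24 GB region server heap

# Combination check 1: WAL capacity should cover the memstore lower-limit share of the heap.
wal_capacity = (conf["hbase.regionserver.hlog.blocksize"]
                * conf["hbase.regionserver.logroll.multiplier"]
                * conf["hbase.regionserver.maxlogs"])
memstore_limit = conf["hbase.regionserver.global.memstore.lowerLimit"] * heap_bytes
print("WAL check:", "OK" if memstore_limit <= wal_capacity else "tune maxlogs/blocksize")

# Combination check 2: blocking store files should leave room for compactions to catch up.
print("Store-file check:",
      "OK" if conf["hbase.hstore.blockingStoreFiles"] >= 2 * conf["hbase.hstore.compactionThreshold"]
      else "raise blockingStoreFiles")

# C9 side effect: extra region-server memory just to hold in-flight client write buffers.
print("Write-buffer memory (MB):",
      conf["hbase.client.write.buffer"] * conf["hbase.regionserver.handler.count"] / 1024**2)
```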

 

Now that we have discussed the different configuration parameters, let’s get into data points.  Data point values depend completely on your business scenario, HBase table schema, etc.

For each data point, I give the data point ID, name, description, and the value we used.

D1. Seed Days – Number of days of data HBase is pre-populated with offline (seed data) before we bring it online.  Value: 365 days

D2. Daily Row Count – Number of HBase rows that are inserted daily into the table.  Value: 100 Million

D3. Row Size – Rough size of a row, measured from your own trials (it heavily depends on your schema, data value sizes, etc.).  Insert about 10K records into the table, issue flush and major compact from the HBase shell, then get the HFile size from HDFS (see the small estimation sketch right after this list).  Enable all the options such as row compression, bloom filters, prefix encoding, Snappy compression of data, etc. for this row-size measurement exercise.  Value: 1024 bytes

D4. Column Family Count – Number of Column Families in your schema definition.  Value: 2

D5. Data Write Throughput per Core – Volume of row data that can be inserted into the HBase table per core per second; our observation is given in the value column.  There is also another reference: ~500 KB/sec per core (93.5 MB/sec with 16 nodes * 2 processors per node * 6 cores per processor) (http://hypertable.com/why_hypertable/hypertable_vs_hbase_2/).  As you can see, this is highly dependent on your schema; we could observe only 250 KB per core.  Value: 250 KB/sec per core
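As a concrete illustration of the D3 procedure, here is a tiny Python sketch that turns the measured HFile size into an average row size.  The HFile size shown is a made-up example value, not a measurement.

```python
# Row-size estimate from the D3 measurement procedure: insert ~10K rows,
# run flush + major_compact from the HBase shell, then read the resulting
# HFile size from HDFS (e.g., via `hdfs dfs -du`).  The size below is hypothetical.

rows_inserted = 10_000
hfile_bytes_on_hdfs = 10_240_000   # example value only

row_size_bytes = hfile_bytes_on_hdfs / rows_inserted
print(f"Approximate row size: {row_size_bytes:.0f} bytes")   # ~1024 bytes here
```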

 

Let’s define the hardware specification per node (example: Sun X42L server).

 

For each resource, I give the actual resource specification, the usable resource percentage, and the effective available amount (= actual resource * usable percentage / 100).

Disk – No. of disks * disk size = 10 * 24 TB = 240 TB; usable: 60%; effective available: 131481 GB (approx.)

CPU Cores – No. of processors * no. of cores per processor = 2 * 4 = 8 cores; usable: 60%; effective available: 4 cores

Physical Memory (RAM) – 240 GB; usable: 60%; effective available: 148 GB

JVM Memory – The Hadoop stack is a Java code base, so all services are JVMs, and a JVM is limited by its max heap size.  The max heap size is heavily dependent on how well garbage collection works.  Until Java 6, the most reliable GC was the Concurrent Mark Sweep (CMS) GC, and the maximum heap CMS GC could handle well was only about 24 GB.  With Java 7, the G1 GC was introduced; G1 GC has been shown to work well with a 100 GB max heap.
(http://blog.cloudera.com/blog/2014/12/tuning-java-garbage-collection-for-hbase/)
Usable: 80%; effective available: 19.2 GB (CMS), 80 GB (G1)

 

Let’s calculate the resource requirements

Disk:

If we have to plan for 1 year ahead, Total No. of Days = Seed Days (D1) + 365 Upcoming Days = 730 Days

No. of Rows = No. of Days *  Daily Row Count (D2) = 730 * 100 Million = 73000 Million

Disk Size w/o Replication = No. of Rows * Row Size (D3) = 73000 Million * 1 KB = 73000 GB

Disk Size w/ Replication = Replication Factor (C10) * Disk Size w/o Replication = 3 * 73000 GB = 219000 GB
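Here is the same disk arithmetic as a small Python sketch, so you can plug in your own data points:

```python
# Disk sizing for one year ahead, using D1-D3 and C10 from above.
seed_days, upcoming_days = 365, 365        # D1 plus the next year
daily_rows = 100_000_000                   # D2
row_size_kb = 1                            # D3 (~1 KB per row)
replication_factor = 3                     # C10

total_rows = (seed_days + upcoming_days) * daily_rows       # 73,000 million rows
disk_gb_no_repl = total_rows * row_size_kb / 1_000_000      # ~73,000 GB
disk_gb_with_repl = replication_factor * disk_gb_no_repl    # ~219,000 GB
print(disk_gb_no_repl, disk_gb_with_repl)
```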

Memory:

No. of Regions = Disk Size w/o Replication / Max Region Size (C1) = 73000 GB / 15 GB = 4867

No. of MemStores (one MemStore per Region per Column Family) = No. of Regions * No. of Column Families (D4) = 4867 * 2 = 9734

Memory Size of MemStores = No. of MemStores * Max MemStore Size (C3) = 9734 * 128 MB = 1217 GB

Memory Size = Memory Size of MemStores / Heap Percentage (C2) = 1217 GB / 0.4 = 3043 GB
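And the memory side of the calculation, driven by the number of MemStores:

```python
import math

# Memory sizing driven by the number of regions and MemStores (C1-C3, D4).
disk_gb_no_repl = 73_000           # from the disk calculation above
max_region_size_gb = 15            # C1
column_families = 2                # D4
memstore_flush_size_mb = 128       # C3
memstore_heap_fraction = 0.4       # C2

regions = math.ceil(disk_gb_no_repl / max_region_size_gb)          # ~4,867 regions
memstores = regions * column_families                              # ~9,734 MemStores
memstore_gb = round(memstores * memstore_flush_size_mb / 1024)     # ~1,217 GB
total_memory_gb = math.ceil(memstore_gb / memstore_heap_fraction)  # ~3,043 GB
print(regions, memstores, memstore_gb, total_memory_gb)
```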

CPU:

Row Count per Second = Daily Row Count (D2) / No. of Seconds in a Day = 100 Million / (24 * 60 * 60) = ~1200 Rows

Incoming Volume per Second = Row Count per Second * Row Size (D3) = 1200 * 1 KB = 1200 KB

CPU Core Count = Incoming Volume per Second / Data Write Throughput per Core (D5) = 1200 KB / 250 KB = ~5 Cores (approx.)
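The CPU requirement, in the same style:

```python
import math

# CPU sizing from the daily ingest rate (D2, D3) and measured write throughput (D5).
daily_rows = 100_000_000                 # D2
row_size_kb = 1                          # D3
write_kb_per_sec_per_core = 250          # D5 (measured for our schema)

rows_per_second = daily_rows / (24 * 60 * 60)                 # ~1,200 rows/sec
ingest_kb_per_second = rows_per_second * row_size_kb          # ~1,200 KB/sec
cores_needed = math.ceil(ingest_kb_per_second / write_kb_per_sec_per_core)   # ~5 cores
print(round(rows_per_second), cores_needed)
```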

 

Let’s find no. of nodes …

Node Count w.r.t. Disk = Total Disk Size (Required) / Usable Disk Size per Node = 219000 GB / 131481 GB = 2 (Ceiled)

Node Count w.r.t.  CPU = Total CPU Cores (Required) / Usable Core Count per Node = 5 Cores / 4 Cores = 2 (Ceiled)

IMPORTANT: Node count w.r.t. memory is limited by JVM memory.  Please note that we cannot run multiple HBase Region Servers on a single node; that is, one Region Server JVM per node.  That means the memory usable on a node is the minimum of JVM memory and physical memory.

Node Count w.r.t. Memory (CMS GC) = Total Memory Size (Required) / Min (JVM Memory, Physical Memory) = 3043 GB / Min(19.2 GB, 148 GB) = 159 (Ceiled)

Node Count w.r.t. Memory (G1 GC) = Total Memory Size (Required) / Min (JVM Memory, Physical Memory) = 3043 GB / Min(80 GB, 148 GB) = 39 (Ceiled)

Node Count w.r.t. all resources = Max (Node Count w/ each Resource) = 39 (G1 GC), 159 (CMS GC)
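Putting the three resources together as one Python sketch:

```python
import math

# Node counts per resource; the cluster size is the maximum of the three.
disk_nodes = math.ceil(219_000 / 131_481)           # required GB / usable GB per node = 2
cpu_nodes = math.ceil(5 / 4)                        # required cores / usable cores per node = 2
mem_nodes_cms = math.ceil(3_043 / min(19.2, 148))   # CMS GC heap limit -> 159
mem_nodes_g1 = math.ceil(3_043 / min(80, 148))      # G1 GC heap limit  -> 39

print("Nodes with CMS GC:", max(disk_nodes, cpu_nodes, mem_nodes_cms))   # 159
print("Nodes with G1 GC: ", max(disk_nodes, cpu_nodes, mem_nodes_g1))    # 39
```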

As you can see, it is memory, not disk, that demands more nodes.  Note also that, had G1 GC not increased the ability to run a big JVM heap, a lot of memory on each node would have been lying idle.

 

Does virtualization (VMs) help?

HBase uses Log Structured Merge (LSM) trees for its indexes in HFiles.  LSM demands that the disk is not shared with any other process.  As a result, Hadoop experts do not recommend VMs, because the disk gets shared between VMs and performance degrades very badly.  But wait: if we have multiple disks on the same node, that can help.  By multiple disks, I am not talking about virtual disks but physical spindles.  If your hardware (bare-metal node) has multiple disk spindles, and you can create VMs in such a way that no two VMs share the same physical spindle/disk, then by all means one can go for virtualization.

 

Further work

This article does not discuss reads.  Reads are affected by the C8 and C9 configurations.  But in our case, the read workload did not change much of our capacity planning.

 

Thanks,

Laxmi Narsimha Rao Oruganti

 

Update-1: Memory Size calculation was wrong.  Should consider only Disk Size w/o Replication, but was considering Disk Size w/ Replication.  Fixed it now.

Apache Kafka: Case of mysterious rebalances

We (Dinesh Kumar Ashokkumar and I) have recently debugged another issue related to Apache Kafka v0.8.2 (it also exists in prior versions).  The issue is not an issue per se, but a case of learning things the hard way as a side effect of a Kafka design choice.

I am assuming you know about Apache Kafka.  If not, you may want to read my other post on Kafka which has short brief on it.

Problem: Clients of a topic rebalance every now and then, even if there are no connections or disconnections

Before we get into the details, let me lay down the context and conditions.

Apache Kafka uses Apache Zoo Keeper in different ways.  Brokers use it for state management, partition leaders use it for election, and Consumer Clients use it for detecting the connects and disconnects of other Consumer Clients.  It is the Consumer Client usage that I am going to discuss here.

In Apache Kafka v0.8.2 (and prior versions), Consumer Clients are “thick” and “smart” clients in the sense that they coordinate between themselves for partition allocation (or assignment) among all the consumer connectors.

The Apache Kafka High Level Consumer API supports a single consumer connector receiving data for a given consumer group across multiple topics.  That means, if you have completely different topics but use the same consumer group name, you can use one connector to receive the data from all those topics.  While this is a powerful feature when data has to be retrieved from multiple similar topics, it became a deadly feature for us.

Here are the Zoo Keeper paths used by Kafka consumers (kafka.consumer.ZookeeperConsumerConnector is the class that primarily deals with Zoo Keeper):

Consumer Partition Owner Registry: /consumers/[group_id]/owner/[topic]/[broker_id-partition_id]  (value: consumer_connector_id)
Consumer Commit Offsets Registry: /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]  (value: offset_counter_value)
Consumer Identifiers Registry: /consumers/[group_id]/ids/[consumer_connector_id]

Every consumer creates an ephemeral node under “Consumer Identifiers Registry” tree and registers itself as a watcher on the tree root.  This helps a consumer client know about any new consumer client connects and any old consumer client disconnects.

As you can see, this registry path is different from the consumer’s other registry paths.  In particular, the Identifiers Registry is not tied to a topic, while the others are.  This difference comes from the feature discussed above.  If a consumer client connects with a consumer group name plus a set of topics, then that consumer client needs to be informed of client connects and disconnects in that consumer group across any topic.  To facilitate that, the Kafka team chose not to include the topic in the Identifiers Registry path.

Now, let’s dig a little deeper into the issue.

Our topics are named aptly and we have put an effort to make sure the topic name clearly identifies its place in our architecture.

For a given topic, we have roughly 120+ partitions, and we chose to have separate consumer client (either a thread or a process) for each partition.  That means, for a given topic and a given consumer group, we have about 120+ consumer clients.

For a given topic, we have a main consumer and it connects to that topic with consumer group name as “main”.  Of course, there are other consumers of the same topic that connect to it with a different consumer group name.  Like this we have completely different topics and each has different consumer clients connecting to it.

Now, in this situation, if any topic has a new client or an existing client disconnects, it is perfectly fine for the consumer clients of that particular topic to rebalance.  Instead, what we observed is that the consumer clients of all topics started rebalancing.  While it is easy to say this now (after finding the issue), it was not so obvious while the rebalances were happening, especially since we did not know that clients of all topics were rebalancing.  While debugging, we focused on only one topic and consumer group combination, tried to find out why a node was going into rebalance, and never looked at the macro picture.  After a few days of debugging, we observed that the whole gamut of consumer clients was rebalancing, not just those of one topic.  We first suspected something seriously wrong with the cluster (just a hunch, no real proof), so we shut down the Kafka cluster and brought it back up.  The problem disappeared, hooray!

Well, the problem did not stay away for long.  It started again: when one client of one topic disconnected, all clients across all topics started rebalancing.  We then understood that something beyond the topic-plus-consumer-group logic was tricking us.

Feature or Trap?

This happens because of the Consumer Identifiers Registry design.  We have 10+ topics, each with 120+ consumer clients using the consumer group name “main”, which tallies to 1200+ consumers using the consumer group name “main”.  All of these consumer clients get registered under the Zoo Keeper path “/consumers/main/ids/”, and every consumer client registers itself as a watcher on this whole tree.

If any consumer client of any topic disconnects, the ephemeral node of that consumer client under this Zoo Keeper path gets deleted, and Zoo Keeper triggers all watchers of that tree path.  That means all 1200+ consumer clients (minus the just-disconnected one) get notified by Zoo Keeper.  The Apache Kafka client triggers a rebalance in response to this watch event.

As you can see, a well-meaning feature that simplifies consuming data from similar topics with the same consumer group name turned out to be disastrous for us.

What is the way out?

The solution is simple: just use unique consumer group names.  Avoid generic consumer group names by all means.

We had put good time into naming topics, but not into naming consumer groups.  This experience has led us to put good time into naming consumer groups not only well, but uniquely.
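For what it’s worth, here is a trivial Python sketch of the naming convention we converged on; the topic names are hypothetical and the helper is only illustrative, not part of any Kafka API.

```python
# Hypothetical helper: derive a consumer group name that is unique per topic,
# so the ZooKeeper identifiers tree /consumers/<group_id>/ids/ is never shared
# across unrelated topics.

def consumer_group_name(topic: str, purpose: str) -> str:
    """Return a group name unique to this topic + purpose combination."""
    return f"{topic}-{purpose}"

# The "main" consumers of two different topics no longer share a watch tree:
print(consumer_group_name("orders-events", "main"))    # orders-events-main
print(consumer_group_name("billing-events", "main"))   # billing-events-main
```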

Thanks,

Laxmi Narsimha Rao Oruganti

 

Updates:

Grammar corrections (September 22, 2015)

Apache Kafka: Case of Large Messages, Many Partitions, Few Consumers

We (Dinesh Kumar Ashokkumar, Rushiraj Chavan, and I) have recently debugged an issue related to Apache Kafka v0.8.2 (it also exists in prior versions).  The issue is not peculiar, and yet it is very interesting to find that we were the ones to use Kafka with that configuration and hit it.

Problem: Consumer does not receive data even when there is lot of lag in the partitions of a topic

Before we go into the bug details, let me lay down the basic design of Kafka.

Kafka is a distributed, fault-tolerant, persistent message queue (or a key-value store).  Kafka supports named queues, namely topics.  Like other distributed key-value systems, the key space is divided into partitions, and incoming data of a topic is stored in different partitions.  The most important point to note about Kafka is that the number of partitions is configured explicitly by the user and is not dynamic; that is, Kafka does not automatically split or merge partitions.  However, the user can increase the partition count of a topic, though that might require downtime for that topic.

From the broker’s point of view: a partition is owned by a broker (dubbed the leader of the partition), and the leader takes the responsibility of replicating the partition to other brokers, which act as replicas.  If you want to understand different replication models in distributed systems, take a detour.  In Kafka, there is no fixed synchronous or asynchronous replication; the user makes the choice between synchronous and asynchronous replication.  The producer can enforce the broker’s replication behavior using request.required.acks and producer.type = sync.

From the producer’s point of view: the partition for an incoming message is identified by a key derived either from metadata, if available, or from the message.  For now, you can trust that the in-built Kafka key derivation is good enough to achieve an even distribution of data among the partitions.

From the consumer’s point of view: a partition can only be assigned to a single consumer.  That is, data from a partition cannot be retrieved by multiple consumers of the same group.  While this seems highly restrictive for a distributed system, if we take a macro view, hide the ‘partition’ concept, and look at the distributed queue level, there are still multiple consumers able to retrieve in parallel.  There is a reason for this restriction from the broker’s point of view: the broker does not have to deal with synchronization within a partition and can avoid all the complications that come with it.  Because of this association of a single consumer with a partition, the number of consumers is limited by the number of partitions.  Note that if the number of consumers is less than the number of partitions, then a single consumer is assigned multiple partitions.  If the number of consumers is more than the number of partitions, the extra consumers just idle and do not receive any data.

In Kafka, maximum message size (= Size of Message + Size of Metadata) has to be configured and bounded.  It is controlled by different configuration parameters in different layers.  In Broker, it is message.max.bytes; In Consumer, it is fetch.message.max.bytes.

Now, let’s go a little deeper into the details that led to this bug.

For every Consumer Connector, the client library internally forks a fetcher thread for each broker.  The library also forks other threads, such as fetcher manager threads and a leader thread, but those are not relevant for this discussion.  A fetcher thread for a broker is responsible for retrieving data from that broker.  The consumer knows which partitions are assigned to it; it then divides the partitions into a separate set per broker, based on which partition is led by which broker.  A fetcher thread then takes the partition set of this consumer for the broker it is attached to and makes a data request.  Upon receiving a data request from a fetcher, the broker tries to package one chunk for each partition (#BrokerResponsePacking).  The chunk size is essentially the maximum message (fetch) size.  The Kafka broker is limited to packing a maximum of 2 GB of data (the maximum value of a signed 32-bit integer) in a single response to a fetcher (#BrokerMaxResponseSize).

In this very scenario, we have hit the bug: https://issues.apache.org/jira/browse/KAFKA-1196

To understand the bug, let’s revisit some points:

– Each consumer is assigned a set of partitions on connecting to the cluster (assuming even distribution):

No. of partitions assigned to a consumer f(p, consumer) = Total Partition Count of a Topic f(p, topic) / Total Consumer Count f(c, total)

– In the worst case, all partitions assigned to a consumer are led by one single broker.  That means one fetcher thread of that consumer connector requests all of its partitions from a single broker.  The broker then tries to respond to the fetcher by packing one chunk for each partition:

Broker Response Size f(b, response_size) = f(p, consumer) * Maximum Message Size f(m, max_size)

– As mentioned above (#BrokerMaxResponseSize):

f(b, response_size) <= 2 GB

which implies f(p, consumer) * f(m, max_size) <= 2 GB

which implies f(m, max_size) * f(p, topic) / f(c, total) <= 2 GB

If one does not respect the above inequality, he/she will run into trouble.  The broker does not check the 2 GB limit, so an integer overflow happens.  Check the above bug for more details.

Why did we land into this trouble?

In our case, we have a topic that is consumed by different types of processes and hosts big messages (max message size 300 MB).  One consumer is the actual message processing system and so has to be highly scalable; hence, a high partition count was chosen.  However, there are other consumers of the same topic (of course, with a different consumer group) which do very lightweight processing (e.g., bookkeeping).  Bookkeeping takes very little CPU, so we were hoping to run just one consumer for this group for the whole topic.

Max Message Size = f(m, max_size) = 300 MB

Total Partition Count of a Topic = f(p, topic) = 360

Total Consumer Count  = 1 (for book keeping consumer group)

No. of partitions assigned to a consumer = f(p, consumer) = 360 / 1 = 360

Broker Response Size = f(b, response_size) = f(m, max_size)  * f(p, consumer)  = 300 MB * 360 = 108000 MB = 105 GB (approximately)

As you can see, the broker response size is way beyond the max response size of 2 GB.  As a result, an integer overflow happened in the broker, which led to non-deterministic behavior, resulting in the broker not sending any response.  The fetcher threads keep sending data requests but never get any response.  So, even though there are messages, the fetcher threads do not receive any.
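The whole constraint can be checked with a few lines of Python before choosing partition and consumer counts; the numbers below are the ones from our incident:

```python
# Check the 2 GB broker-response constraint for the worst case where one
# consumer in the bookkeeping group owns every partition of the topic.
MAX_RESPONSE_BYTES = 2**31 - 1            # signed 32-bit limit (#BrokerMaxResponseSize)

max_message_mb = 300                      # f(m, max_size)
partitions_in_topic = 360                 # f(p, topic)
consumers_in_group = 1                    # f(c, total), the bookkeeping group

partitions_per_consumer = partitions_in_topic // consumers_in_group
response_bytes = max_message_mb * 1024**2 * partitions_per_consumer

print(f"Worst-case response: {response_bytes / 1024**3:.0f} GB")   # ~105 GB
print("OK" if response_bytes <= MAX_RESPONSE_BYTES else "Overflow risk (KAFKA-1196)")
```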

What is the solution?

Thankfully, Kafka supports message compression.  Thankfully again, the max message size in all the above equations corresponds to the compressed size.  Luckily, in our case the messages are text messages and the compression ratio was superb: our 300 MB message came down to 50 MB after GZIP compression.  So, we enabled compression.

We also observed that most of the messages are not really big.  The typical 80-20 rule applied to message sizes in our case: 80% of messages are less than 10 MB (uncompressed) and 20% are more than 10 MB (uncompressed).  So, we split the data into two separate topics, small-messages and big-messages.  Topic small-messages is configured to have many partitions (i.e. 360) and topic big-messages is configured to have few partitions (i.e. 72).

Well the story is not over yet.  We then got into issues with Java Max Heap and Kafka Buffers.

As you know, Sun’s (now Oracle’s) JVM implementation requires the max heap size to be specified up front, and the heap can’t grow beyond it.  An application has to account for the heap requirements of not just its own code but of all the libraries it uses.

In case of Kafka Client, Consumer Connector allocates buffers for partitions assigned to it.   No. of buffers allocated for each partition is configurable using queued.max.message.chunks.

No. of buffers allocated for each Partition = f(buf, partition)

No. of buffers allocated for each Consumer Connector = f(p, consumer) * f(buf, partition)

Unfortunately, the Kafka client library does not take multi-threaded applications into account.  That is, if a Kafka application has multiple threads, each with its own Consumer Connector, then the Kafka client allocates buffers for each Consumer Connector separately, even though they are all in the same JVM and the buffers are already protected for thread safety due to the parallel access by fetcher threads and the application thread.

No. of threads (and so consumers) per JVM = f(c, jvm)

No. of buffers allocated per JVM = f(c, jvm) * f(p, consumer) * f(buf, partition)

Total heap memory required by Kafka = No. of buffers allocated per JVM * Maximum Buffer Size = f(c, jvm) * f(p, consumer) * f(buf, partition) * f(m, max_size)

If you don’t account for this much memory, you will keep getting OutOfMemoryError even though your application’s own memory requirements are quite small.

In our case of light weight book keeping application,

f(c, jvm) = 8, f(p, consumer) = 45, f(buf, partition) = 1, f(m, max_size) = 50 MB

Total heap memory required by Kafka = 8 * 45 * 1 * 50 MB = 18 GB (approximately)

Yes, that’s a shocking size, but that’s what we have to set aside for Kafka to operate.  We required just 2 GB for the application itself, so in total we configured the max heap as 24 GB.
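The same buffer accounting as a quick Python sketch, with our bookkeeping application’s numbers:

```python
# Heap needed just for the Kafka consumer buffers in our bookkeeping JVM.
consumers_per_jvm = 8            # f(c, jvm): threads, each with its own Consumer Connector
partitions_per_consumer = 45     # f(p, consumer) = 360 partitions / 8 consumers
buffers_per_partition = 1        # queued.max.message.chunks
max_message_mb = 50              # f(m, max_size), after GZIP compression

kafka_buffer_gb = (consumers_per_jvm * partitions_per_consumer
                   * buffers_per_partition * max_message_mb) / 1024    # ~17.6 GB
application_gb = 2
print(f"Kafka buffers: ~{kafka_buffer_gb:.1f} GB; configured max heap: 24 GB")
```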

Thanks,

Laxmi Narsimha Rao Oruganti