Technical Story Telling: Oracle Data Cloud

 

I was used to regular reorganizations (reorgs) during my Microsoft stint.  Fifteen months into Oracle, just when I was wondering why there were no reorgs at Oracle Corporation, I got a mail within a week, in May 2014, saying that we had been reorganized into a new division named Oracle Data Cloud (ODC), headed by Omar Tawakol, the head of Oracle’s latest acquisition, Blue Kai.  Telepathy was at work, I suppose :)

Omar had a clear business vision of how ODC should evolve, and he worked tirelessly to acquire another company, Datalogix, which augmented the product scope very well.  With three companies, namely Blue Kai, Datalogix, and Collective Intellect, Oracle Data Cloud has a solid product scope and a huge market share, and to this day it sits in the leaders quadrant.

Vijay Amrit Agrawal was recruited back into Oracle after just a year away and was tasked with setting up a team in India under ODC, namely the ODC India Team.  Here is a technical story that I have used while recruiting developers into Oracle Data Cloud.  I have given the long version below, but I usually shorten it based on the candidate’s experience and exposure.

 

Story – Blue Kai:

Google was a well-known brand ~15 years ago.  Everyone who wanted to research anything went straight to google.com and searched.  It is this behavior that made Google a leader in the digital ad domain.  As people visited sites via Google search results, Google knew exactly what the user was looking for and served high-quality advertisements in the ad panels of those web sites.  Google was respected for its non-intrusive, simple-text ads because their quality was so good.

About 5 years back, FlipKart, Amazon, and SnapDeal became well-known brands.  Users who wanted to buy a product went directly to these sites and searched there, leaving Google (or any other search engine) clueless about what the user was looking for.  Slowly, Google’s ad quality dropped as more and more sites became well known.  Google became even more clueless as browsers advanced and helped users reach sites directly from their history or bookmarks.  Google realized this soon and came up with a new browser, Google Chrome, so that it could capture every site visited directly from the browser without a Google search.  While this worked for logged-in users, it did not work for others.

Many sites feared sharing their search data with Google, as they could be eaten by Google the giant.  Blue Kai realized that this lack of trust in the digital world was a huge opportunity to tap into.  Blue Kai came up with a smart, win-win data-sharing proposition and quietly worked with many online properties (sites) to get them to buy into this business model.  Before we get into that business model, let’s ask a question so that we can appreciate what is unique about it.

Would Amazon, FlipKart, and Google share their search data with each other?

Your answer would most probably be “No”.  But with Blue Kai in the game, the answer is “Yes”!  How, you may ask?

Blue Kai’s pitch: you share your data with me, and I will give you everyone else’s data.  In this data sharing, the source site of the data is not retained.  That is, when I share others’ data with you, I can’t tell you where I got it.  Similarly, when I share your data with the world, I can’t tell them where it came from.  But I can tell you which user machine the search data is for.

Soon, Blue Kai became the online data-sharing hub for many online sites (guess what that “many” means in numbers: millions of sites!).  OK, how does it work?

You go to Amazon.in and search for a TV.  You open another tab and hit FlipKart.  How would it be if FlipKart displayed a set of TV offers?  You open another tab and browse a technical blog on Apache Kafka, and in a side panel Google displays TV ads.  How would that be?

That’s the power that Blue Kai brings to all these sites.  It is not some offline data sharing, but real-time, web-scale sharing that is only a few microseconds away.  Google has to process only its own search data, and Bing has to process only its own search data; we at Blue Kai have to process every search on every internet site in real time and share that data within a few microseconds with everyone else who asks.
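The sharing rule described above (keep the user machine, forget the source site) can be sketched in a few lines of Python.  This is only a toy illustration under my own assumptions: the class name `DataSharingHub`, its methods, and the site and machine names are hypothetical and say nothing about how Blue Kai is actually built.

```python
from collections import defaultdict


class DataSharingHub:
    """Toy hub: remembers what each user machine searched for, never where."""

    def __init__(self):
        # user machine id -> list of search terms, in arrival order
        self._interests = defaultdict(list)

    def contribute(self, source_site, user_machine_id, search_term):
        # The source site is deliberately discarded: the hub does not retain it.
        del source_site
        self._interests[user_machine_id].append(search_term)

    def lookup(self, user_machine_id):
        # Any participating site may ask; the answer carries no source site.
        return list(self._interests.get(user_machine_id, []))


hub = DataSharingHub()
hub.contribute("amazon.in", "machine-42", "tv")
hub.contribute("techblog.example", "machine-42", "apache kafka")

# What another site (say FlipKart) would see when the same machine visits it.
flipkart_view = hub.lookup("machine-42")
print(flipkart_view)
```

The interesting design point is that `contribute` takes the source site but never stores it, which is what makes the proposition safe for competing sites.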

Thanks to Blue Kai, Google’s ad quality problem has been solved!  Once we have so much data, we surely know how to make money out of it.  We charge the sites in this data-sharing model based on different business scenarios.

 

Story – Datalogix:

Chief Marketing Officers (CMOs) around the world started doubting the whole digital advertisement spend and its yield.  With TV ads, the user’s attention is guaranteed as long as the user stays on the channel.  With non-intrusive ads in the online world, on the other hand, it is not clear to a CMO whether the ad caught the users’ attention.  Google countered this with the pay-per-click pricing model, where the advertiser pays only when the ad is clicked and not when it is merely displayed.  While click stats prove that the user did see the ad, it is still not clear whether there is any increase in sales, especially if the advertisement relates to the offline world.  Take an ad like “Reliance Digital offers 35% discount on all Sony TVs”: it is very hard to assess the offline store sales of Sony TVs.  CMOs started asking: why should we put in so much money if there is no increase in sales?  If there is indeed an increase in sales, what is the volume?  Is it worth it?  How can one compute the net increase in sales as a result of a digital ad campaign?

Welcome to Datalogix; we at Datalogix solve this problem.  Datalogix is an interesting company in that it acts as a bridge between the online and offline worlds.  Datalogix buys sales data from offline stores in aggregated form, without any identity of the buyer.  Here are example records from a Reliance Digital store:

Store Location, Company Name, Product Name, Model, Week Number, Sales Volume

Kondapur, Sony, LED TV, Bravia 1234, 1, 10

Kondapur, Sony, Smart TV, Bravia 5678, 1, 5

Which store would not be happy to make money by selling such data, which reveals no buyer identity?  Datalogix got this offline sales data from every offline store possible.

Next, it started pitching to advertisement platforms such as Google, Bing, etc.: share your advertisement footprint with me, and I will prove (or disprove) whether your advertisement footprint translates into online + offline sales.  Of course, advertisement platforms would be happy to have proof that digital advertisement works, so that they can take this proof to CMOs.

Suppose a user browses a site where advertisements are displayed, and the ads are served by Google.  Google would share the footprint of the ad, such as:

Location of Browsing Computer, Advertiser Company, Advertised Product, Advertisement Served Time

Kondapur, Reliance Digital, LED TV, 2016-01-01 01:01

Kondapur, Reliance Digital, LED TV, 2016-01-01 02:02

Datalogix does a big-data join between the offline sales data and the online advertisement footprint, and proves (or disproves) whether there is any correlation between advertisement footprint volume and sales volume (by region).  This big-data join, as you can see, involves aggregating data by region, product, week, etc.
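To make the join concrete, here is a small in-memory Python sketch that uses the example records from this post.  It is only an illustration: the join key (region, product, week) is my reading of the description, and the `week_number` helper assumes week 1 starts on January 1st, which is a hypothetical convention rather than anything Datalogix documents.

```python
from collections import defaultdict
from datetime import datetime

# (store location, company, product, model, week number, sales volume)
sales = [
    ("Kondapur", "Sony", "LED TV", "Bravia 1234", 1, 10),
    ("Kondapur", "Sony", "Smart TV", "Bravia 5678", 1, 5),
]

# (location of browsing computer, advertiser, product, served time)
ad_footprints = [
    ("Kondapur", "Reliance Digital", "LED TV", "2016-01-01 01:01"),
    ("Kondapur", "Reliance Digital", "LED TV", "2016-01-01 02:02"),
]


def week_number(served_time):
    # Assumption: week 1 starts on January 1st (not ISO week numbering).
    day_of_year = datetime.strptime(served_time, "%Y-%m-%d %H:%M").timetuple().tm_yday
    return (day_of_year - 1) // 7 + 1


# Aggregate both sides by the same key: (region, product, week).
sales_by_key = defaultdict(int)
for location, _company, product, _model, week, volume in sales:
    sales_by_key[(location, product, week)] += volume

ads_by_key = defaultdict(int)
for location, _advertiser, product, served_time in ad_footprints:
    ads_by_key[(location, product, week_number(served_time))] += 1

# Full outer join: key -> (ads served, units sold).
joined = {
    key: (ads_by_key.get(key, 0), sales_by_key.get(key, 0))
    for key in sorted(set(sales_by_key) | set(ads_by_key))
}
for key, (ads, units) in joined.items():
    print(key, "ads served:", ads, "units sold:", units)
```

With many regions and weeks in `joined`, one could then compare ad volume against sales volume per region, which is the correlation the story talks about.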

The advertiser can verify the Datalogix findings by talking to offline stores in the areas where Datalogix claims an increase in sales volume.  Datalogix is an independent company, not associated with any advertisement platform, so it is well regarded for its impartiality; together with the advertiser’s ability to verify the findings, this makes Datalogix trustworthy.

We at Oracle Data Cloud are smart about making money.  We make money pre-advertisement by sharing ad-targeting data (Blue Kai), and we make money post-advertisement by helping prove sales yield, and hence advertisement quality (Datalogix).

 

Story – Collective Intellect:

Blue Kai and Datalogix work very well as long as things are searched for.  However, that whole system fails if no searching is involved, only textual discussion in the online world, be it discussion boards, forums, social sites, etc.  That gap is filled by Collective Intellect, which I have covered in my previous blog post.

 

(Disclaimer: Brands used in this post are just an example)

Technical Story Telling: Collective Intellect

 

Collective Intellect is a Boulder, CO, US start-up that Oracle acquired in June 2012.  I quit Microsoft and joined Oracle to work for this team in January 2013.  Here is a technical story that I have used while recruiting developers into the team.

Story:

Bhargavi wanted to buy a television (TV), so she did her research: searching, reading articles, browsing different sites, and comparing features and technologies.  Being a money-conscious person, she compared prices across e-commerce sites such as FlipKart.com, Amazon.in, SnapDeal.com, etc.  Being a thorough researcher, she also researched which e-commerce site is better, which seller is better, etc.  Finally, she settled on a TV model (Samsung Smart LED TV 1357), an e-commerce site, and a seller, and purchased it.  Everyone around her appreciated her decision.

Bhavani also wants to buy a TV, but she is not a researcher and trusts friends and family.  She reached out to her friends on Facebook: “Hi Friends, I want to buy a TV.  Any recommendations?”  Bhargavi, a friend of Bhavani, saw the post and responded with details of her recent research and a recommendation of the Samsung Smart LED TV 1357.

Saraswathi, another friend of Bhavani, saw this conversation and chimed in about her ordeal with the TV she had recently purchased: “Hi Bhavani – I recently bought Sony Smart LED TV 2468 and I strictly recommend that you *not* go for this model”.

 

Problem(s):

Businesses around the world want to improve their sales.  To improve sales, they need to improve their products.  To improve their products, businesses need feedback on them: what’s good and what’s bad about the product.  Businesses also want to reduce the damage done by the “bad” side of the product (of a previous version).

In the pre-internet age, businesses learned what was not going well with their product through feedback forms, etc.  In the early internet age, businesses sent e-mails some time after the purchase (typically a month or two) asking the customer to fill in an online form.  The problem with this feedback is that it captures only the instant at which the form was filled in, and opinions may vary with ongoing usage of the product and its performance.  For example, a customer may be very happy after 1 month of usage, but after 6 months the same customer might be completely upset about the purchase because of other issues that have cropped up.  Knowing these issues and addressing them is very important for a business to succeed.  Reaching out periodically over e-mail may not work, as it may be regarded as overreaching or even spamming!

Unhappy customers are vocal: not only has the business lost them as customers, but because of their vocal nature potential future customers also stay away from its products.  In the current internet age, customers share their feedback in a variety of ways (blogs, discussion boards and forums, review sites, ranking sites, social sites, etc.) at the very instant they are unhappy.  If a business can address the unhappy customer at the right time, the damage can be reduced by a great margin.  Imagine if United Airlines had had a way to get notified, before it went viral, about the video posted by their customer whose guitar was mishandled and who met staff indifference towards the issue.  How powerful would that be?

The problem with the online world is that there is too much data for any company to handle.  A lot of that data is not relevant to a given business.  Extracting the relevant data (signal) from so much data (noise) is a software problem and not a business problem (unless the business is itself about software).

Adding to this, the issues of a product may not be global but local, perhaps due to the local environment, the manufacturing site, or something else.  Being able to aggregate and drill into this information with ease would be a huge plus for any business.

 

Solution:

Collective Intellect collects textual data from all online media such as blogs, news, forums, boards, review sites, social sites, etc.  It analyzes all the data (mostly noise) using Natural Language Processing (NLP) algorithms to extract the important information (signal).  In this process, the product drops most of the data, as the online world is full of conversations that are not related to businesses.

In the above story, the discussion is around the Entity “TV”.  Bhavani’s post contains “Purchase” language, Saraswathi’s response contains “Support” language, and Bhargavi’s response contains “Promotion” language.  It is these meanings that Collective Intellect identifies, and it then shares the information with the TV companies if they are our customers.  Customers can then route this information to the appropriate departments within their company: “Support” language events become a Support ticket, “Purchase” language becomes a “Sales” lead, and “Promotion” language feeds a “Loyalty” program.
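Here is a deliberately naive Python sketch of that classify-and-route step.  It uses hand-picked keyword rules instead of real NLP, so it is only a stand-in to show the shape of the idea: the rule lists, the function names, and the wording of Bhargavi’s reply are all my own hypothetical choices, not how Collective Intellect works.

```python
# Ordered keyword rules: the first language whose keyword appears wins.
RULES = [
    ("Purchase", ("want to buy",)),
    ("Support", ("not go for", "fed-up", "does not work")),
    ("Promotion", ("recommend",)),
]

# Where each kind of language is routed, as described in the story.
ROUTES = {
    "Purchase": "Sales lead",
    "Support": "Support ticket",
    "Promotion": "Loyalty program",
}


def classify(post):
    # Normalize: lower-case and drop the emphasis asterisks around *not*.
    text = post.lower().replace("*", "")
    for language, keywords in RULES:
        if any(keyword in text for keyword in keywords):
            return language
    return None  # noise: nothing a business would act on


def route(post):
    return ROUTES.get(classify(post))


bhavani = "Hi Friends, I want to buy a TV.  Any recommendations?"
saraswathi = ("Hi Bhavani - I recently bought Sony Smart LED TV 2468 and "
              "I strictly recommend that you *not* go for this model")
bhargavi = "I recommend the Samsung Smart LED TV 1357"  # hypothetical wording

for post in (bhavani, saraswathi, bhargavi, "Nice weather today"):
    print(route(post), "<-", post)
```

Note how fragile the rule order is: both Bhavani’s and Saraswathi’s posts contain the word “recommend”, which is exactly why a real product needs proper language understanding rather than keywords.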

To give you an idea of how this works in the real world, have you observed something like this on Facebook?

Laxmi: I am completely fed-up with my Vodafone connection.  While my SIM Card works well, my wife’s SIM Card does not work in the same house.

Vodafone Customer Care:  Dear Laxmi, We are really sorry for the inconvenience caused.  Can you please share more details of the problem such as which SIM Card is working and which is not,  the locality, etc. so that we can dig deep.

Laxmi: Here are the details.  Working SIM: 1234567890, Failing SIM: 9876543210, Location: Hyderabad

Vodafone Customer Care:  Based on our backend analysis, we think that SIM Card is corrupted.  We dispatched a new SIM Card to you, please try and let us know. 

Laxmi: I received the SIM Card and it works well.  Thanks for resolving the issue.

Ever wondered how Vodafone Customer Care could know that, of all Facebook users, Laxmi had posted about them, and which of all Laxmi’s posts was the one targeting them?  The magic behind that is products like Collective Intellect.

Technical Story Telling

I have been part of recruiting engineers into the different engineering teams I have worked for or been associated with.  During interviews, we have to explain what a company and/or a product does or offers, and being technical people we tend to use many concepts of the functional domain.  While that works in some cases, it does not work all the time.  Especially if the candidate is not from the same domain, this job sales pitch turns into a flop show rather than a hit show.  We are all humans first before we become engineers, so I have found that the storytelling technique works very well for technical stuff.  Creating a story for all our technical stuff helps not only in recruitment but also in explaining our work, company, or product to family, relatives, and friends.  This is an introductory article for my blog posts on Technical Story Telling.  The actual stories will come soon.

Thanks,

Laxmi Narsimha Rao Oruganti

గణేశ పంచరత్నం

 

ముదాకరాత మోదకం సదావిముక్తిసాధకం, కలాధరావతంసకం విలాసిలోకరక్షకం,

అనాయకైకనాయకం వినాసితేభదైత్యకం, నతాసుభాసునాశకం నమామితం వినాయకం ||

 

నతేతరాతిభీకరం నవోదితార్క భాస్వరం, నమత్సురారినిర్జనం నతాధికాపదుద్ధరం,

సురేశ్వరం నిధీశ్వరం గజేశ్వరం గణేశ్వరం, మహేశ్వరం సమాశ్రయే పరాత్పరం నిరంతరం ||

 

సమస్తలోకశంకరం నిరస్తదైత్యకుంజరం, దరేదరోదరంవరం వరే భవక్త్రమక్షరం,

కృపాకరం క్షమాకరం ముదాకరం యశస్కరం, మనస్కరం నమస్కృతాం నమస్కరోమి భాస్వరం ||

 

అకించనార్తిమర్జనం చిరంతనోక్తిభాజనం, పురారి పూర్వనందనం సురారిగర్వ చర్వణం,

ప్రపంచనాశభీషణం ధనంజయాదిభూషణం, కపోలదానవారణం భజేపురాణవారణం ||

 

నితాంతికాంతదంతకాంతిమంతకాంతకాత్మజం, అచింత్యరూపమంతహీనమంతరాయక్రింతనం,

హృదంతరేనిరంతరం వసంతమేవయోగినాం, తమేకదంతమేవ తం విచింతయామి సంతతం ||

 

మహాగణేశ పంచరత్న మాదరేన  యోన్వహం, ప్రజల్పతి  ప్రభాతకే  హృదిస్మరణ్ గణేశ్వరం,

అరోగతామదోషతాం  సుసాహితీం సుపుత్రతాం, సమాహితాయురష్ట భూతి  మభ్యు పైతి శోచిరాత్ ||

Word Bullets: Janatha Garage – Children, Property

 

Junior NTR, to the GHMC officer:
    If your children are good, they will not want your property
    If your children are bad, they will not leave any of your property behind

HBase Capacity Planning – Part 1

HBase is a distributed, fault-tolerant, persistent NoSQL store in the Hadoop stack.  Hadoop has been in the market for a long time now, but I could not find any simple guide to HBase capacity planning for our usage.  I am sure there are a great many articles on the internet that discuss Hadoop capacity planning (including HBase), but somehow they did not cover the details I was looking for.  Here is my version of it.

This is going to be a lengthy blog post, as I have tried to explain what happens inside HBase with each knob and the side effects of changing it.  I would first like to convey my sincere thanks to all those who have blogged about different parameters and their experiences playing with them for the requirements of their deployments.  Without them, this blog post would be nothing.  Where possible, I have given a reference URL.  If I have missed any reference, please pardon me.

HBase is not just a storage node, but also a compute node.  Hence, HBase capacity planning has to consider not just storage requirements but compute requirements too.  That means all the standard resources: disk, CPU cores, memory, and network.  In fact, as you go through this blog post you will realize that it is memory that demands more capacity than disk (at least, in our case).  But Java G1 GC has come to the rescue!

I am assuming you know how HBase works (esp. Regions, Region Servers, Column Family, HFiles, MemStore, HMaster, etc.).

Nevertheless, here are some basic details you should know.

– Each Column Family’s data is stored in separate HFiles (also called store files).  Each Region has its own HFiles (at least one) and one MemStore per Column Family.

– Within a Region, data gets reorganized from time to time by a process called ‘Compaction’ (like Checkpoints in the relational database world).  Minor Compaction merges multiple small HFiles into one bigger HFile.  Major Compaction merges all HFiles of a Region into one big HFile.

Read this if you can: http://hbase.apache.org/book.html#_compaction

I will first discuss the different HBase parameters and the design considerations for setting a proper value for each.

Each entry below gives the Config ID, the configuration name (property name), the description and design considerations, the optimal value, and the reference URL.

C1. HBase: Max Region Size (hbase.hregion.max.filesize)
Description: If we have a very large value for Max Region Size (and so Max HFile Size), a major compaction can result in a huge disk write that could be as much as Max Region Size.
Optimal Value: 15 GB
Reference URL: (link in the original post)

C2. HBase: Heap Percentage (hbase.regionserver.global.memstore.lowerLimit)
(see also: hbase.regionserver.global.memstore.upperLimit, hbase.hstore.blockingStoreFiles)
Description: This knob (lower limit) limits the memory to be used by MemStores.  The flusher thread starts flushing MemStores when memory usage rises above the lower limit, and keeps flushing until memory usage falls below the lower limit.  However, there is a catch: the flusher thread also has to consider the no. of HFiles on disk (hbase.hstore.blockingStoreFiles).  The upper limit controls when to block writes to a region.
Optimal Value: 0.4
Reference URL: (two links in the original post)

C3. HBase: MemStore Cache Size before flush (in a way, Max MemStore Size) (hbase.hregion.memstore.flush.size)
Description: As the no. of regions grows, the no. of MemStores grows in equal proportion.  That means the memory requirement grows too, as the no. of MemStores times the Max Size of a MemStore.  If you keep this low with memory in mind, many HFiles get created, resulting in many compactions.  Not only that, the amount of HLog to keep also increases with Max MemStore Size.
Optimal Value: 128 MB

C4. HBase: Compact Threshold HFiles (hbase.hstore.compactionThreshold)
Description: This knob controls when to run Minor Compaction (which indirectly controls Major Compactions).  Each HFile (or store file) consumes resources like file handles, heap for holding metadata, bloom filters, etc.  Not just that, the no. of lookups in bloom filters increases (though in memory), and so reads become slow.
Optimal Value: 3

C5. HBase: Max HFiles on disk for a region (hbase.hstore.blockingStoreFiles)
Description: If there are already this many store files of a region on disk, the flusher thread is blocked.  If we block the flusher thread, the MemStore for the region eventually fills up.  If the MemStore fills up, writes to that region are blocked.
Optimal Value: 15

C6. HBase: Minor Compaction Max HFiles (hbase.hstore.compaction.max)
Description: Max no. of files for a minor compaction; it can be less but not more.
Optimal Value: 10

C7. HBase: Major Compaction Max HFiles (hbase.hregion.majorcompaction)
Description: Max no. of files for a major compaction; it can be less but not more.  (Note: in the HBase documentation, hbase.hregion.majorcompaction is the time between automatic major compactions in milliseconds, not a file count, so double-check this knob before setting it.)
Optimal Value: (not given)
Reference URL: (link in the original post)

Combination Check:
hbase.regionserver.global.memstore.lowerLimit <= (hbase.regionserver.hlog.blocksize * hbase.regionserver.logroll.multiplier * hbase.regionserver.maxlogs)
Reference URL: (link in the original post)

Combination Check:
hbase.hstore.blockingStoreFiles >= (2 * hbase.hstore.compactionThreshold)

C8. HBase: Block Cache Percentage (hfile.block.cache.size)
Description: Block cache is an LRU cache of blocks and is used by reads.  If your reads require more than the Block Cache holds, the LRU policy is applied and old cache blocks are evicted.  A high value of Block Cache reduces cache evictions and mostly improves read performance, but increases GC time.  A low value of Block Cache increases cache evictions and degrades read performance, but reduces GC time.  (Similar to the Page Cache in RDBMS systems.)
Optimal Value: 0.25
Reference URL: (link in the original post)

C9. HBase: Client side Write Buffer Size (hbase.client.write.buffer)
Description: HBase client-side buffer size for write caching.  Until this memory size is reached (or a flush is called), the HBase client does not send the writes to the HBase server.  A high value increases the risk of data loss, but reduces load on the HBase cluster.  A low value reduces the risk of data loss, but increases load on the HBase cluster.  This size also affects the HBase Region Server, as that much data is sent by the client in a single go and has to be processed by the server.  Total memory requirement on the HBase server = hbase.client.write.buffer * hbase.regionserver.handler.count
Optimal Value: 2 MB
Reference URL: (link in the original post)

HBase: Column Family Names, Qualifier Names
Description: Keep them very short (1 to 3 characters).  People often refer to Row Compression (FAST_DIFF) as a savior that lets them not worry about the length of the names.  That is definitely true for the on-disk representation (i.e. HFile).  However, when the data resides in memory (MemStores, etc.), rows are stored in full form without any compression.  That means lengthy names are going to waste a lot of memory, resulting in fewer rows held in memory.
Optimal Value: NA
Reference URL: (link in the original post)

C10. HDFS: Replication Factor
Description: None
Optimal Value: 3
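The second combination check in the table above is easy to automate.  Below is a minimal Python sketch that checks it against the C4 and C5 values; the dictionary layout and the function name `check_blocking_store_files` are my own illustration, not an HBase API.

```python
# Values from the table above (C4 and C5); the dict itself is only an illustration.
config = {
    "hbase.hstore.compactionThreshold": 3,
    "hbase.hstore.blockingStoreFiles": 15,
}


def check_blocking_store_files(cfg):
    """Return a list of human-readable problems (empty when the check passes)."""
    threshold = cfg["hbase.hstore.compactionThreshold"]
    blocking = cfg["hbase.hstore.blockingStoreFiles"]
    problems = []
    if blocking < 2 * threshold:
        problems.append(
            f"hbase.hstore.blockingStoreFiles ({blocking}) should be >= "
            f"2 * hbase.hstore.compactionThreshold ({2 * threshold})"
        )
    return problems


problems = check_blocking_store_files(config)
print(problems or "combination check passed")
```

With the values used in this post (15 >= 2 * 3) the check passes; lowering blockingStoreFiles to 5 would flag a problem.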

 

Now that we have discussed the different configuration parameters, let’s get into data points.  Data point values depend completely on your business scenario, HBase table schema, etc.

Each entry below gives the Data Point ID, name, description, and value.

D1. Seed Days
Description: No. of days of data pre-populated offline in HBase (seed data) before we bring it online
Value: 365 days

D2. Daily Row Count
Description: No. of HBase rows that are inserted daily into the table
Value: 100 Million

D3. Row Size
Description: Rough size of a row measured from your trials (depends heavily on your schema, data value size, etc.).  Insert about 10K records into the table, issue a flush and a major compact from the HBase Shell, then get the HFile size from HDFS.  Enable all the options like Row Compression, Bloom Filters, Prefix Encoding, Snappy Compression of data, etc. for this exercise of measuring the row size.
Value: 1024 Bytes

D4. Column Family Count
Description: No. of Column Families in your schema definition
Value: 2

D5. Data Write Throughput per Core
Description: Volume of data that can be inserted into the HBase table per second per core.  Our observation is given as the value.  There is also another reference: ~500 KB (93.5 MB/sec with 16 Nodes * 2 Processors / Node * 6 Cores / Processor) (http://hypertable.com/why_hypertable/hypertable_vs_hbase_2/).  As you can see, this is highly dependent on your schema.  For us, we could observe only 250 KB per core.
Value: 250 KB (per second, per core)
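As a sanity check on the ~500 KB reference figure quoted for D5, here is the arithmetic as a tiny Python snippet.  It assumes 1 MB = 1024 KB and that all cores in the referenced cluster contribute equally, which is my simplification.

```python
# Reference cluster quoted above: 93.5 MB/sec across 16 nodes * 2 processors * 6 cores.
cluster_throughput_mb_per_sec = 93.5
cores = 16 * 2 * 6

# Assumption: 1 MB = 1024 KB, and every core contributes equally.
per_core_kb_per_sec = cluster_throughput_mb_per_sec * 1024 / cores

our_per_core_kb_per_sec = 250  # D5, our own observation
print(f"reference: ~{per_core_kb_per_sec:.0f} KB/sec per core over {cores} cores")
print(f"ours is about {our_per_core_kb_per_sec / per_core_kb_per_sec:.0%} of that")
```

So the reference works out to just under 500 KB per second per core, and our observed 250 KB is roughly half of it.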

 

Let’s define hardware specification (per Node) (Example: Sun X42L Server)

 

Each entry below gives the resource name, actual resource specification, usable resource percentage, and effective available resource (= Actual Resource * Usable Resource / 100).

Disk
Actual Resource Specification: No. of Disks * Disk Size = 10 * 24 TB = 240 TB
Usable Resource Percentage: 60 %
Effective Available: 131481 GB (approx.)

CPU Cores
Actual Resource Specification: No. of Processors * No. of Cores Per Processor = 2 * 4 = 8 Cores
Usable Resource Percentage: 60 %
Effective Available: 4 Cores

Physical Memory (RAM)
Actual Resource Specification: 240 GB
Usable Resource Percentage: 60 %
Effective Available: 148 GB

JVM Memory
Actual Resource Specification: The Hadoop stack is a Java code base, so all services are JVMs.  A JVM is limited to its Max Heap Size, and the workable Max Heap Size depends heavily on how well Garbage Collection works.  Till Java 6, the most reliable GC was Concurrent Mark Sweep (CMS) GC, and the max heap that CMS GC could handle well was only 24 GB.  With Java 7, G1 GC was introduced, and G1 GC has been shown to work well with a 100 GB max heap (http://blog.cloudera.com/blog/2014/12/tuning-java-garbage-collection-for-hbase/).
Usable Resource Percentage: 80 %
Effective Available: 19.2 GB (CMS), 80 GB (G1)

 

Let’s calculate the resource requirements

Disk:

If we have to plan for 1 year ahead, Total No. of Days = Seed Days (D1) + 365 Upcoming Days = 730 Days

No. of Rows = No. of Days *  Daily Row Count (D2) = 730 * 100 Million = 73000 Million

Disk Size w/o Replication = No. of Rows * Row Size (D3) = 73000 Million * 1 KB = 73000 GB

Disk Size w/ Replication = Replication Factor (C10) * Disk Size w/o Replication = 3 * 73000 GB = 219000 GB
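The disk arithmetic above, written out as a small Python sketch so that you can plug in your own data points.  The variable names are mine; the rounding of 1 million KB to 1 GB follows the calculation in this post.

```python
# Data points and configuration from this post.
seed_days = 365              # D1
upcoming_days = 365          # planning 1 year ahead
daily_row_count = 100_000_000  # D2
row_size_kb = 1              # D3: 1024 bytes
replication_factor = 3       # C10

# The post rounds 1 million KB to 1 GB; the same rounding is kept here.
KB_PER_GB = 1_000_000

total_days = seed_days + upcoming_days
total_rows = total_days * daily_row_count
disk_gb = total_rows * row_size_kb // KB_PER_GB
disk_with_replication_gb = replication_factor * disk_gb

print(total_days, "days,", total_rows, "rows")
print(disk_gb, "GB w/o replication,", disk_with_replication_gb, "GB w/ replication")
```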

Memory:

No. of Regions = Disk Size w/o Replication / Max Region Size (C1) = 73000 GB / 15 GB = 4867

No. of MemStores (one MemStore per Region per Column Family) = No. of Regions * No. of Column Families (D4) = 4867 * 2 = 9734

Memory Size of MemStores = No. of MemStores * Max MemStore Size (C3) = 9734 * 128 MB = 1217 GB

Memory Size = Memory Size of MemStores / Heap Percentage (C2) = 1217 GB / 0.4 = 3043 GB
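The same memory calculation as a Python sketch, in case you want to try other region sizes or column family counts.  The variable names are mine.  Note that the figures above round each intermediate step up, so the unrounded result here comes out about 1 GB lower than 3043 GB.

```python
import math

disk_gb = 73_000              # Disk Size w/o Replication
max_region_size_gb = 15       # C1
column_family_count = 2       # D4
max_memstore_size_mb = 128    # C3
memstore_heap_fraction = 0.4  # C2

regions = math.ceil(disk_gb / max_region_size_gb)
memstores = regions * column_family_count
memstore_memory_gb = memstores * max_memstore_size_mb / 1024
heap_needed_gb = memstore_memory_gb / memstore_heap_fraction

print(regions, "regions,", memstores, "MemStores")
print(f"{memstore_memory_gb:.2f} GB for MemStores, {heap_needed_gb:.2f} GB of heap overall")
```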

CPU:

Row Count per Second = Daily Row Count (D2) / No. of Seconds in a Day = 100 Million / (24 * 60 * 60) = ~1200 Rows

Incoming Volume per Second = Row Count per Second * Row Size (D3) = 1200 * 1 KB = 1200 KB

CPU Core Count = Incoming Volume per Second / Data Write Throughput per Core (D5) = 1200 KB / 250 KB = ~5 Cores (approx.)
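And the CPU estimate as a Python sketch, again with my own variable names.  The post rounds the row rate up to ~1200 rows per second; the unrounded rate gives the same 5 cores.

```python
import math

daily_row_count = 100_000_000       # D2
row_size_kb = 1                     # D3
write_throughput_per_core_kb = 250  # D5, per second

seconds_per_day = 24 * 60 * 60
rows_per_second = daily_row_count / seconds_per_day
incoming_kb_per_second = rows_per_second * row_size_kb
cpu_cores = math.ceil(incoming_kb_per_second / write_throughput_per_core_kb)

print(f"{rows_per_second:.0f} rows/sec, {incoming_kb_per_second:.0f} KB/sec, {cpu_cores} cores")
```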

 

Let’s find no. of nodes …

Node Count w.r.t. Disk = Total Disk Size (Required) / Usable Disk Size per Node = 219000 GB / 131481 GB = 2 (Ceiled)

Node Count w.r.t.  CPU = Total CPU Cores (Required) / Usable Core Count per Node = 5 Cores / 4 Cores = 2 (Ceiled)

IMPORTANT: Node Count w.r.t. Memory is limited by JVM Memory.  Please note that we cannot run multiple HBase Region Servers on a single node.  That is, one Region Server JVM per Node.  That means the memory usable on a node is the minimum of JVM Memory and Physical Memory.

Node Count w.r.t. Memory (CMS GC) = Total Memory Size (Required) / Min (JVM Memory, Physical Memory) = 3043 GB / Min (19.2 GB, 148 GB) = 159 (Ceiled)

Node Count w.r.t. Memory (G1 GC) = Total Memory Size (Required) / Min (JVM Memory, Physical Memory) = 3043 GB / Min (80 GB, 148 GB) = 39 (Ceiled)

Node Count w.r.t. all resources = Max (Node Count w/ each Resource) = 39 (G1 GC), 159 (CMS GC)
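Putting the node count rules together, here is a Python sketch that takes the required totals and the per-node usable figures from this post and returns the node count for each GC.  The function and dictionary names are hypothetical; only the numbers come from the post.

```python
import math

# Required totals computed earlier in this post.
required = {"disk_gb": 219_000, "cpu_cores": 5, "memory_gb": 3_043}

# Usable resources per node, as listed in the hardware specification.
usable_per_node = {"disk_gb": 131_481, "cpu_cores": 4, "physical_memory_gb": 148}
usable_jvm_heap_gb = {"CMS": 19.2, "G1": 80}


def node_counts(gc):
    """Return (overall node count, per-resource node counts) for the given GC."""
    # One Region Server JVM per node: usable memory is capped by the JVM heap.
    usable_memory_gb = min(usable_jvm_heap_gb[gc], usable_per_node["physical_memory_gb"])
    per_resource = {
        "disk": math.ceil(required["disk_gb"] / usable_per_node["disk_gb"]),
        "cpu": math.ceil(required["cpu_cores"] / usable_per_node["cpu_cores"]),
        "memory": math.ceil(required["memory_gb"] / usable_memory_gb),
    }
    return max(per_resource.values()), per_resource


for gc in ("CMS", "G1"):
    total, breakdown = node_counts(gc)
    print(gc, total, breakdown)
```

The breakdown makes it obvious which resource drives the cluster size: disk and CPU need 2 nodes each, while memory needs 159 (CMS) or 39 (G1).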

As you can see, it is memory that demands more nodes than disk.  Note also that, had G1 GC not made a big JVM heap workable, a lot of memory on each node would have been lying idle.

 

Does virtualization (VMs) help?

HBase uses Log Structured Merge (LSM) Trees for its indexes in HFiles.  LSM demands that the disk is not shared with any other process.  As a result, Hadoop experts do not recommend VMs, as the disk gets shared between VMs and performance degrades very badly.  But wait, having multiple disks on the same node can help.  By multiple disks, I do not mean virtual disks but physical spindles.  If your hardware (bare-metal node) has multiple disk spindles, and you can create VMs in such a way that no two VMs share the same physical spindle/disk, then by all means go for virtualization.

 

Further work

This article does not discuss reads.  Reads are affected by the C8 and C9 configurations.  But in our case, the read workload did not change our capacity planning much.

 

Thanks,

Laxmi Narsimha Rao Oruganti

 

Update-1: The Memory Size calculation was wrong.  It should consider only Disk Size w/o Replication, but it was considering Disk Size w/ Replication.  Fixed now.

Apache Kafka: Case of mysterious rebalances

We (Dinesh Kumar Ashokkumar and I) recently debugged another issue related to Apache Kafka v0.8.2 (it also exists in prior versions).  It is not an issue per se, but a lesson learned the hard way about a side effect of a Kafka design choice.

I am assuming you know about Apache Kafka.  If not, you may want to read my other post on Kafka, which has a short brief on it.

Problem: Clients of a topic rebalance every now and then, even if there are no connections or disconnections

Before we get into the details, let me lay down the context and conditions.

Apache Kafka uses Apache Zoo Keeper in different ways.  Brokers use it for state management, partition leader election relies on it, and Consumer Clients use it to detect the connects and disconnects of other Consumer Clients.  It is the Consumer Client usage that I am going to discuss here.

In Apache Kafka v0.8.2 (and prior versions), Consumer Clients are “thick” and “smart” clients in the sense that they coordinate between themselves for partition allocation (or assignment) among all the consumer connectors.

The Apache Kafka High Level Consumer API supports a single consumer connector receiving data for a given consumer group across multiple topics.  That means, if you have completely different topics but the same consumer group name, you can use one connector to receive the data from all the topics.  While this is a powerful feature when data has to be retrieved from multiple similar topics, it became a deadly feature for us.

Here are Zoo Keeper paths used by Kafka Consumers (kafka.consumer.ZookeeperConsumerConnector is the class that deals with zookeeper primarily):

Purpose | Zoo Keeper Path | Value
Consumer Partition Owner Registry | /consumers/[group_id]/owner/[topic]/[broker_id-partition_id] | consumer_connector_id
Consumer Commit Offsets Registry | /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] | offset_counter_value
Consumer Identifiers Registry | /consumers/[group_id]/ids/[consumer_connector_id] | (none)

Every consumer creates an ephemeral node under the “Consumer Identifiers Registry” tree and registers itself as a watcher on the tree root.  This lets a consumer client know about any new consumer client connecting and any old consumer client disconnecting.

As you can see, this registry path is different from the other registry paths of the consumer.  In particular, the Identifiers Registry is not tied to a topic, but the others are.  This difference comes from the feature discussed above.  If a consumer client connects with a consumer group name plus a set of topics, then that consumer client needs to be informed of client connects and disconnects in that consumer group across any topic.  To facilitate that, the Kafka team chose not to have the topic in the Identifiers Registry path.
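To see the difference between the registries, here is a small Python sketch that builds the three paths from the templates listed above.  The helper functions and the topic names “orders” and “clicks” are hypothetical; only the path templates come from this post.

```python
def owner_path(group_id, topic, broker_id, partition_id):
    return f"/consumers/{group_id}/owner/{topic}/{broker_id}-{partition_id}"


def offset_path(group_id, topic, broker_id, partition_id):
    return f"/consumers/{group_id}/offsets/{topic}/{broker_id}-{partition_id}"


def ids_path(group_id, consumer_connector_id):
    # No topic in this path: it is shared by the whole consumer group.
    return f"/consumers/{group_id}/ids/{consumer_connector_id}"


def parent(path):
    return path.rsplit("/", 1)[0]


# Two consumers in group "main" reading two unrelated (hypothetical) topics.
owner_a = owner_path("main", "orders", 0, 0)
owner_b = owner_path("main", "clicks", 0, 0)
ids_a = ids_path("main", "consumer-a")
ids_b = ids_path("main", "consumer-b")

print(parent(owner_a), "vs", parent(owner_b))  # different parents, split by topic
print(parent(ids_a), "vs", parent(ids_b))      # same parent, same watch root
```

Both consumers end up under the same parent node in the Identifiers Registry even though they read unrelated topics, and that shared parent is the tree every consumer in the group watches.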

Now, let’s dig a little deeper into the bug

Our topics are named aptly and we have put an effort to make sure the topic name clearly identifies its place in our architecture.

For a given topic, we have roughly 120+ partitions, and we chose to have a separate consumer client (either a thread or a process) for each partition.  That means, for a given topic and a given consumer group, we have about 120+ consumer clients.

For a given topic, we have a main consumer and it connects to that topic with consumer group name as “main”.  Of course, there are other consumers of the same topic that connect to it with a different consumer group name.  Like this we have completely different topics and each has different consumer clients connecting to it.

Now, in this situation, if any topic gains a new client or an existing client disconnects, it is perfectly fine for the consumer clients of that particular topic to rebalance.  Instead, what we observed was that the consumer clients of all topics started rebalancing.  While it is easy to say this now (after finding the issue), it was not obvious while the rebalances were happening, especially since we did not know that all our nodes were rebalancing.  While debugging, we focused on only one topic and consumer group combination, tried to find why a node was going into rebalance, and never looked at the macro picture.  After a few days of debugging, we observed that the whole gamut of consumer clients was rebalancing, not just those of one topic.  We first suspected something seriously wrong with the cluster (just a hunch, no real proof), so we shut down the Kafka cluster and brought it back up.  The problem disappeared, hooray!

Well, the problem did not stay away for long.  It started again: when one client of a topic disconnected, all clients across all topics started rebalancing.  We understood then that something beyond the logic of a single topic and consumer group combination was tripping us up.

Feature or Trap?

This happens because of the Consumer Identifiers Registry design.  We have 10+ topics, each with 120+ consumer clients using the consumer group name “main”, which adds up to 1200+ consumers using the consumer group name “main”.  All of these consumer clients get registered under the ZooKeeper path “/consumers/main/ids/”, and every consumer client registers itself as a watcher on this whole tree.

If any consumer client of any topic disconnects, the ephemeral node of that consumer client under this ZooKeeper path gets deleted and ZooKeeper triggers all watchers of the tree path.  That means all 1200+ consumer clients (minus the one that just disconnected) get notified by ZooKeeper.  The Apache Kafka client triggers a rebalance in response to this watch event.
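The fan-out described above can be sketched with a small in-memory model.  This is only an illustration under stated assumptions, not Kafka or ZooKeeper code: the registry is a plain dictionary, and the topic and client names (topic-0, topic-0-client-0, and so on) are hypothetical.

```python
from collections import defaultdict


def ids_path(group_id):
    """Identifiers Registry root for a consumer group (no topic in the path)."""
    return f"/consumers/{group_id}/ids"


def build_registry(topics, clients_per_topic, group_for_topic):
    """Map each registry root to the consumer ids registered (and watching) under it."""
    registry = defaultdict(set)
    for topic in topics:
        group_id = group_for_topic(topic)
        for n in range(clients_per_topic):
            # Hypothetical consumer id; real ids are generated by the client library.
            registry[ids_path(group_id)].add(f"{topic}-client-{n}")
    return registry


def notified_on_disconnect(registry, group_id, consumer_id):
    """Count the watchers that remain under the root after one node is deleted."""
    watchers = registry[ids_path(group_id)]
    return len(watchers - {consumer_id})


# 10 topics x 120 clients, all sharing the group name "main".
topics = [f"topic-{i}" for i in range(10)]
shared = build_registry(topics, 120, lambda topic: "main")
print(notified_on_disconnect(shared, "main", "topic-0-client-0"))
```

Every client of every topic sits under the same root, so a single disconnect reaches all of the remaining clients, regardless of which topic they consume.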

As you can see, a well-meaning feature, meant to simplify consuming data from similar topics under the same consumer group name, turned out to be disastrous for us.

What is the way out?

The solution is simple: just use a unique consumer group name.  Avoid generic consumer group names by all means.

We had put a good amount of time into naming topics but not into naming consumer groups.  This experience has led us to spend time naming consumer groups so that they are not only good but also unique.
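One possible convention, shown here purely as an assumption (the post does not say which naming scheme we adopted), is to scope the group name to the topic.  The helper names and the "topic.role" format below are hypothetical.

```python
def consumer_group_name(topic, role):
    """Hypothetical convention: prefix the role with the topic it consumes."""
    return f"{topic}.{role}"


def ids_registry_path(topic, role):
    """Identifiers Registry root that a client using this group name would watch."""
    return f"/consumers/{consumer_group_name(topic, role)}/ids"


# Ten topics, each with its own "main" consumer group.
topics = [f"topic-{i}" for i in range(10)]
paths = {ids_registry_path(topic, "main") for topic in topics}
print(len(paths))
```

With one registry root per topic, a disconnect is only seen by clients of the same topic and group.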

Thanks,

Laxmi Narsimha Rao Oruganti

 

Updates:

Grammar corrections (September 22, 2015)

Consistency != Correctness

We had a healthy lunch table discussion on Consistency vs. Correctness in the context of data stores (ex: relational databases and NoSQL stores).  Many times, people assume that if a data store guarantees Consistency, the solution is correct.  Coming from a database engine development background, I can clearly distinguish the two.  But I learnt today, over the lunch table discussion, how a non-database-engine developer can interpret an article discussing this domain differently.  Thanks to Mustafa Hussain, who made me understand the other way of looking at things.

Coming to the main topic, there is a good article on CAP, titled CAP Twelve Years Later: How the “Rules” Have Changed.

Why do banks have their own Correctness system when Relational Databases provide Consistency?

Banking solution correctness involves many non-database parts, and not just non-database parts but non-software parts as well.  A simple ATM transaction is not the same as a database transaction.  A database transaction is about making sure that the ACID properties hold for data within the database, whereas an ATM transaction involves databases, the teller machine, physical currency notes, the dispenser, unpicked-money pull-back, currency loading, physical book-keeping, etc.  As you can see, a banking solution does not become correct just by using a database that guarantees Consistency.  Much more needs to be done, and hence Consistency != Correctness.

Well, you may then ask:

Why should Financial Banks use costly Relational Databases over cheap NoSQL stores?

I am talking about the case where brand new financial banking software is to be developed: why should it use Relational Databases over NoSQL stores?

Note that I am talking about NoSQL stores that chose to support Availability and Partition Tolerance and forfeited Consistency, per the famous CAP theorem that the above cited article revisits.

While Availability is important in the ATM scenario that the author chose to describe in the above cited article, he did not mean to forfeit ‘C’ completely.  We still need consistency within a database at a particular site (ex: ATM room) across rows, tables, etc.  For example, a money transfer between two accounts using an ATM requires a consistency guarantee from the data store.  The NoSQL stores discussed here do not provide it.  Hence, the author cited it as “Another aspect of CAP confusion is the hidden cost of forfeiting consistency”.
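To make the money transfer example concrete, here is a minimal sketch using Python's built-in sqlite3 module.  The accounts table, the account names and the non-negative balance rule are assumptions made for illustration; a real banking schema would look very different.

```python
import sqlite3


def transfer(conn, src, dst, amount):
    """Move amount from src to dst atomically; undo everything on failure."""
    try:
        with conn:  # commits on success, rolls back if an exception escapes
            conn.execute(
                "UPDATE accounts SET balance = balance + ? WHERE id = ?", (amount, dst)
            )
            # If this debit violates the CHECK constraint, the credit above is undone too.
            conn.execute(
                "UPDATE accounts SET balance = balance - ? WHERE id = ?", (amount, src)
            )
        return True
    except sqlite3.IntegrityError:
        return False


def balances(conn):
    """Return the current balances as a dictionary keyed by account id."""
    return dict(conn.execute("SELECT id, balance FROM accounts"))


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE accounts ("
    "id TEXT PRIMARY KEY, balance INTEGER NOT NULL CHECK (balance >= 0))"
)
conn.executemany("INSERT INTO accounts VALUES (?, ?)", [("alice", 100), ("bob", 50)])
conn.commit()

print(transfer(conn, "alice", "bob", 30))   # enough funds
print(transfer(conn, "alice", "bob", 500))  # would overdraw alice
print(balances(conn))
```

The failed transfer leaves both rows untouched, which is the cross-row guarantee the paragraph above refers to.  It still says nothing about whether the notes were actually dispensed, which is the Correctness part.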

So, NoSQL stores have their own strong scenarios, but they do not necessarily fit all scenarios.  Banking is one scenario where they do not fit.

As the author of the cited article discussed, when we have NoSQL stores that support C and A, we can then think of moving banking-like workloads over to such stores.

 

Thanks,

Laxmi Narsimha Rao Oruganti

Apache Kafka: Case of Large Messages, Many Partitions, Few Consumers

We (Dinesh Kumar Ashokkumar, Rushiraj Chavan, and I) have recently debugged an issue related to Apache Kafka v0.8.2 (it also exists in prior versions).  The issue is not exotic, yet it was interesting to find that we happened to be the ones using Kafka with this configuration and hit it.

Problem: Consumer does not receive data even when there is a lot of lag in the partitions of a topic

Before we go into the bug details, let me lay down the basic design of Kafka.

Kafka is a distributed, fault-tolerant, persistent message queue (or a key-value store).  Kafka supports named queues, called topics.  As in other distributed key-value systems, the key space is divided into partitions, and the incoming data of a topic is stored across different partitions.  The most important point to note is that the number of partitions is configured explicitly by the user and is not dynamic; that is, Kafka does not automatically split or merge partitions.  The user can increase the partition count of a topic, but that might require downtime for that topic.

From the Broker’s point of view: A partition is owned by a broker (dubbed the leader of the partition), which takes responsibility for replicating the partition to other brokers that act as replicas.  If you want to understand different replication models in distributed systems, take a detour.  In Kafka, replication is not fixed as synchronous or asynchronous.  The user makes the choice between synchronous and asynchronous replication.  The producer can control the broker replication behavior using request.required.acks and producer.type = sync.

From the Producer’s point of view: The partition for an incoming message is identified by a key derived either from metadata, if available, or from the message itself.  For now, you can trust that the in-built Kafka key derivation is good enough to achieve an even distribution of data among the partitions.

From the Consumer’s point of view: Within a consumer group, a partition can only be assigned to a single consumer.  That is, data from a partition cannot be retrieved by multiple consumers of the same group.  While this seems highly restrictive for a distributed system, if we take a macro view, hide the ‘partition’ concept and look at the distributed queue level, there are still multiple consumers that can retrieve data in parallel.  There is a reason for this restriction from the broker’s point of view: the broker does not have to deal with synchronization within a partition and can avoid all the related complications.  Because of this association of a single consumer with a partition, the number of useful consumers is limited by the number of partitions.  Note that if the number of consumers is less than the number of partitions, a single consumer is assigned multiple partitions.  If the number of consumers is more than the number of partitions, the extra consumers just sit idle and do not receive any data.
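The two cases at the end of the paragraph above (fewer consumers than partitions, and more consumers than partitions) can be sketched with a simple contiguous-range split.  This is a simplified model written for illustration, assuming partitions numbered from 0 and consumers ordered by id; it is not the client library's actual rebalance code.

```python
def assign_partitions(partition_count, consumer_ids):
    """Split partitions 0..partition_count-1 into contiguous ranges, one per consumer."""
    consumers = sorted(consumer_ids)
    per_consumer, extra = divmod(partition_count, len(consumers))
    assignment = {}
    start = 0
    for index, consumer in enumerate(consumers):
        # The first `extra` consumers take one additional partition each.
        count = per_consumer + (1 if index < extra else 0)
        assignment[consumer] = list(range(start, start + count))
        start += count
    return assignment


# Fewer consumers than partitions: each consumer owns several partitions.
print(assign_partitions(5, ["c1", "c2"]))
# More consumers than partitions: the extra consumer gets nothing and idles.
print(assign_partitions(2, ["c1", "c2", "c3"]))
```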

In Kafka, the maximum message size (= Size of Message + Size of Metadata) has to be configured and bounded.  It is controlled by different configuration parameters in different layers.  In the Broker, it is message.max.bytes; in the Consumer, it is fetch.message.max.bytes.

Now, let’s go a little deeper into the details that lead to this bug.

For every Consumer Connector, the client library internally forks a fetcher thread for each Broker.  The library also forks other threads, such as fetcher manager threads and a leader thread, but those are not relevant for this discussion.  The fetcher thread for a Broker is responsible for retrieving data from that Broker.  The consumer knows which partitions are assigned to it, and it divides those partitions into a separate set for each broker, based on which partition is led by which broker.  Each fetcher thread then takes the partition set for the broker it is attached to and makes a data request.  Upon receiving a data request from a fetcher, the Broker tries to package one chunk for each partition (#BrokerResponsePacking).  The chunk size is nothing but the maximum message size.  The Kafka Broker is limited to packing a maximum of 2 GB of data (the maximum value of a signed 32-bit integer) (#BrokerMaxResponseSize) in a single response to a fetcher.
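The grouping step described above can be sketched as follows.  This is a simplified model, not the client library's code; the function name and the partition-to-leader map are hypothetical inputs chosen for illustration.

```python
from collections import defaultdict


def fetch_plan(assigned_partitions, leader_of):
    """Group a consumer's partitions by leader broker: one fetch request per broker."""
    plan = defaultdict(list)
    for partition in assigned_partitions:
        plan[leader_of[partition]].append(partition)
    return dict(plan)


# Hypothetical layout: four partitions spread over two brokers.
leaders = {0: "broker-1", 1: "broker-2", 2: "broker-1", 3: "broker-1"}
print(fetch_plan([0, 1, 2, 3], leaders))
```

The more partitions of one consumer that share a leader, the more chunks that broker tries to pack into a single response.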

In this very scenario, we have hit the bug: https://issues.apache.org/jira/browse/KAFKA-1196

To understand the bug, let’s revisit some points:

– Each consumer is assigned a set of partitions on connecting to the cluster (assuming even distribution)

No. of partitions assigned to a consumer f(p, consumer) = Total Partition Count of a Topic f(p, topic) / Total Consumer Count f(c, total)

– In the worst case, all partitions assigned to a consumer are led by one single broker.  That means one fetcher thread of that consumer connector requests all partitions from a single broker.  The broker then tries to respond to the fetcher by packing one chunk for each partition

Broker Response Size f(b, response_size) = f(p, consumer) * Maximum Message Size f(m, max_size)

– As mentioned above (#BrokerMaxResponseSize)

f(b, response_size) <= 2 GB

Implies, f(m, max_size) * f(p, consumer) <= 2 GB

Implies, f(m, max_size) * f(p, topic) / f(c, total) <= 2 GB

If one does not respect the above inequality, he/she will run into trouble.  The broker does not check against the 2 GB limit, and an integer overflow happens.  Check the above bug for more details.
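The inequality can be turned into a quick sanity check to run before picking a partition count and consumer count.  This is a minimal sketch: the function names are made up, the limit is taken as the maximum signed 32-bit value, and the 100 MB and 64 partition figures are illustrative only.

```python
import math

INT32_MAX = 2**31 - 1  # the "2 GB" limit: maximum value of a signed 32-bit integer
MB = 1024 * 1024


def worst_case_response_bytes(max_message_bytes, topic_partitions, consumers):
    """Worst case: every partition of one consumer is led by the same broker."""
    partitions_per_consumer = math.ceil(topic_partitions / consumers)
    return max_message_bytes * partitions_per_consumer


def fits_in_response(max_message_bytes, topic_partitions, consumers):
    """True if the worst-case response stays within the signed 32-bit limit."""
    size = worst_case_response_bytes(max_message_bytes, topic_partitions, consumers)
    return size <= INT32_MAX


def min_consumers(max_message_bytes, topic_partitions):
    """Smallest consumer count that keeps the worst case within the limit."""
    max_partitions_per_consumer = INT32_MAX // max_message_bytes
    if max_partitions_per_consumer == 0:
        raise ValueError("a single maximum-size message already exceeds the limit")
    return math.ceil(topic_partitions / max_partitions_per_consumer)


# Illustrative numbers: 100 MB maximum message size, 64 partitions.
print(min_consumers(100 * MB, 64))
print(fits_in_response(100 * MB, 64, 3))
```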

Why did we land into this trouble?

In our case, we have a topic that hosts big messages (Max Message Size 300 MB) and is consumed by different types of processes.  One is the actual message processing system, which has to be highly scalable.  Hence, a higher partition count was chosen.  However, there are other consumers of the same topic (in a different consumer group, of course) that do only very lightweight processing (ex: book keeping).  Book keeping takes very little CPU, so we were hoping to run just one consumer for this group and for the whole topic.

Max Message Size = f(m, max_size) = 300 MB

Total Partition Count of a Topic = f(p, topic) = 360

Total Consumer Count  = 1 (for book keeping consumer group)

No. of partitions assigned to a consumer = f(p, consumer) = 360 / 1 = 360

Broker Response Size = f(b, response_size) = f(m, max_size)  * f(p, consumer)  = 300 MB * 360 = 108000 MB = 105 GB (approximately)

As you can see, the broker response size is way beyond the max response size of 2 GB.  As a result, an integer overflow happened in the broker, and that led to nondeterministic behavior in which the broker did not send any response.  Fetcher threads keep sending data requests but never get any response.  So, even though there are messages, the fetcher threads do not get any of them.
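To see what a signed 32-bit overflow does to a number of this size, here is a small sketch.  It only illustrates two's complement wraparound with the figures from our case; it does not reproduce the broker's actual code path, and the helper name is made up.

```python
def to_int32(value):
    """Wrap an integer the way signed 32-bit (two's complement) arithmetic would."""
    value &= 0xFFFFFFFF
    return value - 0x100000000 if value >= 0x80000000 else value


MB = 1024 * 1024
true_size = 300 * MB * 360      # max message size * partitions of the single consumer
wrapped = to_int32(true_size)   # what a 32-bit signed field would end up holding

print(true_size)
print(wrapped)
```

The wrapped value bears no relation to the real size, so any logic that relies on it is working with garbage.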

What is the solution?

Thankfully, Kafka supports message compression.  Thankfully again, the max message size in all the above equations corresponds to the compressed size.  Luckily, in our case the messages are text messages and the compression ratio was superb.  Our 300 MB message came down to 50 MB after GZIP compression.  So, we have enabled compression.
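A quick way to estimate what compression might buy is to GZIP a representative payload offline.  The sketch below uses Python's gzip module on a synthetic, highly repetitive log-like line, which is an assumption made for illustration; it compresses far better than real messages would, so measure with your own data.

```python
import gzip


def compressed_size(payload: bytes) -> int:
    """Size in bytes of the payload after GZIP compression."""
    return len(gzip.compress(payload))


# Synthetic text payload (hypothetical content, repeated many times).
sample = ("2015-09-22 INFO user=1234 action=view item=5678\n" * 20000).encode("utf-8")
ratio = len(sample) / compressed_size(sample)
print(len(sample), compressed_size(sample), round(ratio, 1))
```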

We have also observed that most of the messages are not really big.  The typical 80-20 rule applied to message sizes in our case: 80% of messages are less than 10 MB (uncompressed), and 20% of messages are more than 10 MB (uncompressed).  So, we have split the data to go into two separate topics, small-messages and big-messages.  Topic small-messages is configured to have many partitions (i.e. 360) and topic big-messages is configured to have few partitions (i.e. 72).
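The split can be expressed as a small routing function on the producer side.  This is a sketch under assumptions: the function name is made up, and sending a message of exactly 10 MB to big-messages is an arbitrary choice that the description above does not settle.

```python
MB = 1024 * 1024
SIZE_THRESHOLD_BYTES = 10 * MB  # uncompressed size boundary between the two topics


def choose_topic(payload: bytes) -> str:
    """Pick the destination topic from the uncompressed payload size."""
    if len(payload) < SIZE_THRESHOLD_BYTES:
        return "small-messages"
    return "big-messages"  # assumption: exactly 10 MB counts as big


print(choose_topic(b"x" * 1024))
print(choose_topic(bytes(12 * MB)))
```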

Well the story is not over yet.  We then got into issues with Java Max Heap and Kafka Buffers.

As you know, Sun’s (now Oracle’s) JVM implementation requires the Max Heap Size to be specified up front, and the heap cannot grow beyond it.  An application has to account for the heap requirements of not just its own code but also all the libraries it uses.

In the case of the Kafka Client, the Consumer Connector allocates buffers for the partitions assigned to it.  The number of buffers allocated for each partition is configurable using queued.max.message.chunks.

No. of buffers allocated for each Partition = f(buf, partition)

No. of buffers allocated for each Consumer Connector = f(p, consumer) * f(buf, partition)

Unfortunately, the Kafka Client library does not take multi-threaded applications into account.  That is, if a Kafka application has multiple threads, each with its own Consumer Connector, then the Kafka Client allocates buffers for each Consumer Connector separately, even though they are all in the same JVM and the buffers are already protected for thread-safety because of parallel access by fetcher threads and the application thread.

No. of threads (and so consumers) per JVM = f(c, jvm)

No. of buffers allocated per JVM = f(c, jvm) * f(p, consumer) * f(buf, partition)

Total heap memory required by Kafka = No. of buffers allocated per JVM * Maximum Buffer Size = f(c, jvm) * f(p, consumer) * f(buf, partition) * f(m, max_size)

If you don’t account for this much memory, you will keep running into OutOfMemoryError even though your application’s own memory requirements are quite small.

In our case of the lightweight book keeping application,

f(c, jvm) = 8, f(p, consumer) = 45, f(buf, partition) = 1, f(m, max_size) = 50 MB

Total heap memory required by Kafka = 8 * 45 * 1 * 50 MB = 18 GB (approximately)

Yes, that’s a shocking size, but that’s what we have to set aside for Kafka to operate.  We needed just 2 GB for the application itself, and in total we have configured the Max Heap as 24 GB.
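The heap estimate above is simple enough to script, so that it can be rechecked whenever the thread count, partition count or message size changes.  This is a minimal sketch; the function and parameter names are made up and simply mirror the f(...) terms used in the formulas.

```python
MB = 1024 * 1024
GB = 1024 * MB


def kafka_buffer_bytes(consumers_per_jvm, partitions_per_consumer,
                       buffers_per_partition, max_message_bytes):
    """f(c, jvm) * f(p, consumer) * f(buf, partition) * f(m, max_size)."""
    return (consumers_per_jvm * partitions_per_consumer
            * buffers_per_partition * max_message_bytes)


# Numbers from our book keeping application.
required = kafka_buffer_bytes(8, 45, 1, 50 * MB)
print(required // MB, "MB")
print(round(required / GB, 1), "GB")
```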

Thanks,

Laxmi Narsimha Rao Oruganti