Technical Story Telling: Oracle Data Cloud

 

I got used to regular reorganizations (reorgs) during my stint at Microsoft.  Fifteen months into Oracle, just when I was wondering why there were no reorgs at Oracle Corporation, I got a mail within a week, in May 2014, saying that we had been reorganized into a new division named Oracle Data Cloud (ODC), headed by Omar Tawakol, the head of Oracle’s latest acquisition, Blue Kai.  Telepathy was at work, I suppose :-)

Omar had a clear business vision of how ODC should evolve, and he worked tirelessly to acquire another company, Datalogix, which augmented the product scope very well.  With three companies, namely Blue Kai, Datalogix, and Collective Intellect, Oracle Data Cloud has a solid product scope and a huge market share, and to this day occupies the leader quadrant.

Vijay Amrit Agrawal was recruited back into Oracle after just a year and was tasked with setting up a team in India under ODC, namely the ODC India Team.  Here is a technical story that I used while recruiting developers into Oracle Data Cloud.  I have given the long version below; in practice I usually shortened it based on the candidate’s experience and exposure.

 

Story – Blue Kai:

Google was a well-known brand ~15 years ago.  Everyone who wanted to research anything went straight to google.com and searched.  It is this behavior that made Google a leader in the digital ad domain.  As people visited sites via Google search results, Google knew exactly what the user was looking for and served high-quality advertisements in the ad panels of those web sites.  Google was respected for its non-intrusive, simple-text ads because their quality was so good.

About 5 years back, FlipKart, Amazon, and SnapDeal became well-known brands.  Users who wanted to buy a product went directly to these sites and searched there, leaving Google (or any other search engine) clueless about what the user was looking for.  Slowly, Google’s ad quality dropped as more and more sites became well known.  Google also became increasingly clueless as browsers became more advanced and helped users reach sites directly, because they already had them in their history or bookmarked.  Google realized this soon and came up with a new browser, Google Chrome, so that it could capture every site visited directly from the browser without a Google search.  While this worked for logged-in users, it did not work for others.

Many sites feared sharing their search data with Google, as they could be eaten by Google the giant.  Blue Kai realized that this lack of trust within the digital world was a huge opportunity to tap into.  Blue Kai came up with a smart, win-win data-sharing proposition and silently worked with many online properties (sites) to get them to buy into this business model.  Before we get into that business model, let’s ask a question so that we can appreciate the unique business model that Blue Kai offered and that many bought into.

Would Amazon, FlipKart, and Google share their search data with each other?

Your answer would most probably be “No” to the above question.  But with Blue Kai in the game, the answer is “Yes”!  How, you may ask?

You share your data with me and I will give you everyone else’s data.  In this data sharing, the source site of the data is not retained.  That is, when I share others’ data with you, I can’t tell you where I got it from.  Similarly, when I share your data with the world, I can’t tell them where it came from.  But I can tell you which user machine the search data is for.

Soon, Blue Kai became the online data-sharing hub for many online sites (guess what that “many” means in numbers: millions of sites!).  OK, how does it work?

You go to Amazon.in and search for a TV.  You open another tab and hit FlipKart.  How would it be if FlipKart displayed a set of TV offers?  You open another tab and browse a technical blog on Apache Kafka, and on a side panel Google displays TV ads.  How would that be?

That’s the power that Blue Kai brings to all these sites.  It is not some offline data sharing, but real-time, web-scale sharing where the data is only a few microseconds away.  Google has to process only its own search data, and Bing has to process only its own search data; we at Blue Kai have to process every search on every internet site in real time and share the data within a few microseconds with all others when asked.

Thanks to Blue Kai, Google’s ad quality problem has been solved!  Once we have so much data, we surely know how to make money out of it.  We charge the sites in this data-sharing model based on different business scenarios.

 

Story – Datalogix:

Chief Marketing Officers (CMOs) around the world started doubting the whole digital advertisement spend and its yield.  In the case of TV ads, the user’s attention is guaranteed as long as the user stays on the channel.  With non-intrusive ads in the online world, however, it is not very clear to a CMO whether the ad was engaging for users.  Google countered this with the pay-per-click pricing model, where the advertiser pays only when the ad is clicked and not when it is merely displayed.  While click stats prove that the user did see the ad, it is still not clear whether there is any increase in sales, especially if the advertisement relates to the offline world.  Take an ad like “Reliance Digital offers 35% discount on all Sony TVs”: it is very hard to assess its effect on the offline store sales of Sony TVs.  CMOs started asking: why should we put in so much money if there is no increase in sales?  If there is indeed an increase in sales, what is the volume?  Is it worth it?  How can one compute the net increase in sales as a result of a digital ad campaign?

Welcome to Datalogix; we at Datalogix solve this problem.  Datalogix is an interesting company in that it acts as a bridge between the online and offline worlds.  Datalogix buys offline stores’ sales data in aggregated form, without any identity of the buyer.  Here is an example record from a Reliance Digital store:

Store Location, Company Name, Product Name, Model, Week Number, Sales Volume

Kondapur, Sony, LED TV, Bravia 1234, 1, 10

Kondapur, Sony, Smart TV, Bravia 5678, 1, 5

Which store would not be happy to make money by providing such data, which does not reveal any buyer identity?  Datalogix got this offline sales data from every offline store possible.

Next, it started pitching to advertisement platforms such as Google, Bing, etc.: share your advertisement footprint with me, and I will prove (or disprove) whether your advertisement footprint translates to online + offline sales.  Of course, advertisement platforms would be happy to have it proved that digital advertisement works, so that they can take this proof to CMOs.

When a user browses any site where advertisements are displayed, and assuming the ads are served by Google, Google would share the footprint of the ad, such as:

Location of Browsing Computer, Advertiser Company, Advertised Product, Advertisement Served Time

Kondapur, Reliance Digital, LED TV, 2016-01-01 01:01

Kondapur, Reliance Digital, LED TV, 2016-01-01 02:02

Datalogix does a big-data join between the offline sales data and the online advertisement footprint and proves (or disproves) whether there is any correlation between advertisement footprint volume and sales volume (by region).  This big-data join, as you can see, involves aggregating data by region, product, week, etc.
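To make the join concrete, here is a minimal sketch in Python using the example records shown above.  It only illustrates the idea and is not how Datalogix implements it: the week rule (`week_number`), the choice of (region, product, week) as the join key, and the assumption that product names match exactly on both sides are all mine.  The correlation step itself is left out.

```python
from collections import Counter
from datetime import datetime

# Aggregated offline sales records, in the layout shown earlier:
# (store location, company, product, model, week number, sales volume)
sales = [
    ("Kondapur", "Sony", "LED TV", "Bravia 1234", 1, 10),
    ("Kondapur", "Sony", "Smart TV", "Bravia 5678", 1, 5),
]

# Ad footprint records: (location, advertiser, advertised product, served time)
footprints = [
    ("Kondapur", "Reliance Digital", "LED TV", "2016-01-01 01:01"),
    ("Kondapur", "Reliance Digital", "LED TV", "2016-01-01 02:02"),
]


def week_number(timestamp: str) -> int:
    """Hypothetical week rule: days 1-7 of the year are week 1, and so on."""
    day_of_year = datetime.strptime(timestamp, "%Y-%m-%d %H:%M").timetuple().tm_yday
    return (day_of_year - 1) // 7 + 1


# Aggregate both sides to the same grain: (region, product, week).
ads_by_key = Counter()
for location, _advertiser, product, served_at in footprints:
    ads_by_key[(location, product, week_number(served_at))] += 1

sales_by_key = Counter()
for location, _company, product, _model, week, volume in sales:
    sales_by_key[(location, product, week)] += volume

# Full outer join on the shared key, so that keys with ads but no sales
# (or sales but no ads) still show up in the comparison.
joined = {
    key: (ads_by_key.get(key, 0), sales_by_key.get(key, 0))
    for key in sorted(set(ads_by_key) | set(sales_by_key))
}

for (region, product, week), (ads_served, units_sold) in joined.items():
    print(region, product, week, ads_served, units_sold)
```

At real scale the same aggregate-then-join shape would run on a distributed engine rather than in memory; the in-memory dictionaries here only keep the example self-contained.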

The advertiser can verify Datalogix’s findings by talking to offline stores in the area where Datalogix claims an increase in sales volume.  Datalogix, being a separate company not associated with any advertisement platform, is well regarded for its impartiality; that, together with the advertiser’s ability to verify the findings, makes Datalogix trustworthy.

We at Oracle Data Cloud are smart in making money.  We make money pre-advertisement by sharing the ad target data (Blue Kai) and we make money post-advertisement by helping prove sales yield and so advertisement quality (Datalogix). 

 

Story – Collective Intellect:

Blue Kai and Datalogix work very well as long as things are searched.  However, that whole system fails if there is no searching involved but just a textual discussion in the online world, be it discussion boards, forums, social sites, etc.  That gap is filled by Collective Intellect, which I have covered in my previous blog post.

 

(Disclaimer: Brands used in this post are just an example)

Technical Story Telling: Collective Intellect

 

Collective Intellect is a start-up from Boulder, CO, US that Oracle acquired in June 2012.  I quit Microsoft and joined Oracle to work for this team in January 2013.  Here is a technical story that I used while recruiting developers into the team.

Story:

Bhargavi wanted to buy a television (TV), so she did her research by searching, reading articles, browsing different sites, and comparing features and technologies.  Being a money-conscious person, she compared prices across e-commerce sites such as FlipKart.com, Amazon.in, SnapDeal.com, etc.  Being a thorough researcher, she also looked into which e-commerce site is better, which seller is better, etc.  Finally, she settled on a TV model (Samsung Smart LED TV 1357), an e-commerce site, and a seller, and purchased it.  Everyone around her appreciated her decision.

Bhavani also wants to buy a TV, but she is not a researcher and trusts friends and family.  She reached out to her friends on Facebook about her desire to buy a TV: “Hi Friends, I want to buy a TV.  Any recommendations?”.  Bhargavi, a friend of Bhavani, saw the post and responded with details of her recent research and her recommendation, the Samsung Smart LED TV 1357.

Saraswathi, another friend of Bhavani, saw this conversation, chimed in, and wrote about her ordeal with the TV she had recently purchased: “Hi Bhavani – I recently bought Sony Smart LED TV 2468 and I strictly recommend that you *not* go for this model”.

 

Problem(s):

Businesses around the world want to improve their sales.  To improve sales, they need to improve their products.  To improve the products, they need feedback on them: what’s good and what’s bad about the product.  Businesses also want to reduce the damage caused by the “bad” side of the product (of a previous version).  In the pre-internet age, learning what was not going well with a product happened through feedback forms, etc.  In the early internet age, businesses sent e-mails some time after the purchase (typically a month or two) asking the customer to fill in an online form.  The problem with this feedback is that it reflects only the instant at which the form was filled, and the opinion might vary with ongoing usage of the product and its performance.  For example, a customer may be very happy after 1 month of usage, but after 6 months the same customer might be completely upset about the purchase because of other issues that have cropped up.  Knowing these issues and addressing them is very important for a business to succeed.  Reaching out periodically over e-mail may not work, as it may be regarded as overreaching or even spamming!

Unhappy customers are vocal: not only has the business lost them as customers, but because of their vocal nature potential future customers also stay away from its products.  In the current internet age, customers share their feedback in a variety of ways (blogs, discussion boards and forums, review sites, ranking sites, social sites, etc.) at the very instant they are unhappy.  If a business can address the unhappy customer at the right time, the damage can be controlled by a great margin.  Imagine United Airlines had a way to get notified, before it went viral, about the video posted by their customer whose guitar was mishandled and who faced staff indifference towards the issue.  How powerful would that be?

The problem with the online world is that there is too much data for any company to handle.  A lot of that data is not relevant to a business.  Extracting the relevant data (signal) from so much data (noise) is a software problem and not a business problem (unless the business is itself about software).

Adding to this, issues with a product may not be global but local, perhaps due to the local environment, the manufacturing site, or something else.  Being able to aggregate and drill into this information with ease would be a huge plus for any business.

 

Solution:

Collective Intellect collects textual data from all online media such as blogs, news, forums, boards, review sites, social sites, etc.  It analyzes all the data (mostly noise) to extract the important information (signal) using algorithms from the Natural Language Processing (NLP) domain.  In this process, the product drops most of the data, as the online world is full of conversations that are not related to businesses.

In the above story, the discussion is around the entity “TV”.  Bhavani’s post contains “Purchase” language, Saraswathi’s response contains “Support” language, and Bhargavi’s response contains “Promotion” language.  It is these meanings that Collective Intellect identifies and then shares with TV companies if they are our customers.  Customers can then route this information to the appropriate departments within their company: a “Support” language event becomes a support ticket, “Purchase” language becomes a “Sales” lead, and “Promotion” language feeds a “Loyalty” program.
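The routing step described above can be sketched in a few lines of Python.  This is only an illustration: the `Signal` type and the `route` function are hypothetical names, and the language category is assumed to have already been assigned by the NLP step, which is not shown.

```python
from dataclasses import dataclass

# Routing table taken from the paragraph above: language category -> destination.
ROUTES = {
    "Support": "support ticket",
    "Purchase": "sales lead",
    "Promotion": "loyalty program",
}


@dataclass
class Signal:
    author: str
    entity: str    # what the conversation is about, e.g. "TV"
    language: str  # category assigned upstream by the NLP step (not shown here)


def route(signal: Signal) -> str:
    """Return the destination for a signal, or 'dropped' for unrelated noise."""
    return ROUTES.get(signal.language, "dropped")


signals = [
    Signal("Bhavani", "TV", "Purchase"),
    Signal("Bhargavi", "TV", "Promotion"),
    Signal("Saraswathi", "TV", "Support"),
]
for s in signals:
    print(f"{s.author}: {s.language} -> {route(s)}")
```

Keeping the routing as a plain lookup table means a customer can add or rename departments without touching the classification side.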

To give you an idea of how this works in the real world, have you observed this on Facebook?

Laxmi: I am completely fed-up with my Vodafone connection.  While my SIM Card works well, my wife’s SIM Card does not work in the same house.

Vodafone Customer Care:  Dear Laxmi, We are really sorry for the inconvenience caused.  Can you please share more details of the problem such as which SIM Card is working and which is not,  the locality, etc. so that we can dig deep.

Laxmi: Here are the details.  Working SIM: 1234567890, Failing SIM: 9876543210, Location: Hyderabad

Vodafone Customer Care:  Based on our backend analysis, we think that SIM Card is corrupted.  We dispatched a new SIM Card to you, please try and let us know. 

Laxmi: I received the SIM Card and it works well.  Thanks for resolving the issue.

Ever wondered how Vodafone Customer Care can know, of all the Facebook users, that Laxmi has posted about them, and, of all the posts Laxmi has made, which particular post is targeting them?  The magic behind that is products like Collective Intellect.

Technical Story Telling

I have been part of recruiting engineers into the different engineering teams I have worked for or been associated with.  During interviews, we have to explain what a company and/or a product does/offers, and being technical people we tend to use many concepts of the functional domain.  While that works in some cases, it does not work all the time.  Especially if the candidate is not from the same domain, this job sales pitch ends up a flop show rather than a hit show.  We are all humans first before we become engineers, so I have found that the storytelling technique works very well for technical stuff.  Trying to create a story for all our technical stuff helps not only in recruitment but also in explaining our work/company/product to family, relatives, and friends.  This is an introductory article for my blog posts on Technical Story Telling.  Actual stories will come soon.

Thanks,

Laxmi Narsimha Rao Oruganti

HBase Capacity Planning – Part 1

HBase is a distributed, fault-tolerant, persistent NoSQL store in the Hadoop stack.  Hadoop has been in the market for a long time now, but I could not find any simple guide to HBase Capacity Planning for our usage.  I am sure there are a great many articles on the internet that discuss Hadoop Capacity Planning (including HBase), but somehow they did not cover the details I was looking for.  Here is my version of it.

This is going to be a lengthy blog post, as I have tried to explain what happens inside HBase with each knob and the side effects of changing it.  I would first like to convey my sincere thanks to all those who have blogged about different parameters and their experiences playing with them for the requirements of their deployments.  Without them, this blog post would be nothing.  Where possible, I have given a reference URL.  If I have missed any reference, please pardon me.

HBase is not just a storage node, but also a compute node.  Hence, HBase Capacity Planning has to consider not just storage requirements but compute requirements too.  That means all the standard resources: disk, CPU cores, memory, and network.  In fact, as you go through this post you will realize that it is memory that demands more capacity than disk size (at least, in our case).  But Java G1 GC has come to the rescue!

I am assuming you know how HBase works (esp. Regions, Region Servers, Column Family, HFiles, MemStore, HMaster, etc.).

Nevertheless, here are some basic details you should know.

– Each Column Family’s data is stored in separate HFiles (also called store files).  Each Region will have its own HFiles (at least one) and one MemStore per Column Family.

– Within a Region, data gets reorganized from time to time by a process called ‘Compaction’ (like Checkpoints in the Relational Database world).  Minor Compaction is the process of merging multiple small HFiles into one bigger HFile.  Major Compaction merges all HFiles of a Region into one big HFile.

Read this if you can: http://hbase.apache.org/book.html#_compaction

I will first discuss the different parameters of HBase and the design considerations for setting a proper value for each.

Each entry below gives the Config ID, Configuration Name (Property Name), Description and Design Considerations, Optimal Value, and Reference URL (where available).

C1 HBase: Max Region Size (hbase.hregion.max.filesize)
Description: If we have a very large value for Max Region Size (and so Max HFile Size), a major compaction can result in a huge disk write that could be as much as Max Region Size.
Optimal Value: 15 GB
Reference URL: Click this

C2 HBase: Heap Percentage (hbase.regionserver.global.memstore.lowerLimit)
(see also: hbase.regionserver.global.memstore.upperLimit, hbase.hstore.blockingStoreFiles)
Description: This knob (lower limit) limits the memory to be used by MemStores.  The flusher thread starts flushing MemStores when memory usage is above the lower limit, and keeps flushing until memory usage falls below the lower limit.  However, there is a catch: the flusher thread also has to consider the no. of HFiles on disk (hbase.hstore.blockingStoreFiles).  The upper limit controls when to block writes to a region.
Optimal Value: 0.4
Reference URL: Click this and this

C3 HBase: MemStore Cache Size before flush (in a way, Max MemStore Size) (hbase.hregion.memstore.flush.size)
Description: As the no. of regions grows, the no. of MemStores grows in equal proportion.  That means the memory requirement grows too, driven by the no. of MemStores times the Max MemStore Size.  If you keep this low with memory in mind, many HFiles get created, resulting in many compactions.  Not only that, the amount of HLog to keep also increases with Max MemStore Size.
Optimal Value: 128 MB

C4 HBase: Compact Threshold HFiles (hbase.hstore.compactionThreshold)
Description: This knob controls when to run a Minor Compaction (which indirectly controls Major Compactions).  Each HFile (or store file) consumes resources like file handles, heap for holding metadata, bloom filters, etc.  Not just that, the no. of lookups in bloom filters increases (though in memory) and so reads become slow.
Optimal Value: 3

C5 HBase: Max HFiles on disk for a region (hbase.hstore.blockingStoreFiles)
Description: If there are already this many store files of a region on disk, the flusher thread is blocked.  If we block the flusher thread, the MemStore for the region eventually fills up.  If the MemStore fills up, writes to that region are blocked.
Optimal Value: 15

C6 HBase: Minor Compaction Max HFiles (hbase.hstore.compaction.max)
Description: Max no. of files for a minor compaction; it can be less but not more.
Optimal Value: 10

C7 HBase: Major Compaction Interval (hbase.hregion.majorcompaction)
Description: Time between automatic major compactions of a region, in milliseconds (this property is an interval, not a file count).  Setting it to 0 disables time-based major compactions.
Reference URL: Click this

NA Combination Check
Check: (Region Server heap size * hbase.regionserver.global.memstore.lowerLimit) <= (hbase.regionserver.hlog.blocksize * hbase.regionserver.logroll.multiplier * hbase.regionserver.maxlogs)
Optimal Value: NA
Reference URL: Click this

NA Combination Check
Check: hbase.hstore.blockingStoreFiles >= (2 * hbase.hstore.compactionThreshold)
Optimal Value: NA

C8 HBase: Block Cache Percentage (hfile.block.cache.size)
Description: Block cache is an LRU cache of blocks and is used by reads.  If your reads require more than the Block cache holds, the LRU policy is applied and old cache blocks are evicted.  A high value of Block Cache reduces cache evictions and mostly improves read performance, but increases GC time.  A low value of Block Cache increases cache evictions and degrades read performance, but reduces GC time.  (Similar to the Page Cache in RDBMS systems.)
Optimal Value: 0.25
Reference URL: Click this

C9 HBase: Client side Write Buffer Size (hbase.client.write.buffer)
Description: HBase client-side buffer size for write caching.  Until this memory size is reached (or a flush is called), the HBase Client does not send the writes to the HBase Server.  A high value increases the risk of data loss, but reduces load on the HBase Cluster.  A low value reduces the risk of data loss, but increases load on the HBase Cluster.  This size also affects the HBase Region Server, as that much data is sent by the client in a single go and has to be processed by the server.  Total memory requirement on the HBase Server = hbase.client.write.buffer * hbase.regionserver.handler.count
Optimal Value: 2 MB
Reference URL: Click this

NA HBase: Column Family Names, Qualifier Names
Description: Keep them very short (1 to 3 characters).  People often point to Row Compression (FAST_DIFF) as a savior that removes the need to worry about the length of the names.  That is definitely true for the on-disk representation (i.e. HFile).  However, when the data resides in memory (MemStores, etc.), rows are stored in full form w/o any compression.  That means lengthy names are going to waste a lot of memory, resulting in fewer rows held in memory.
Optimal Value: NA
Reference URL: Click this

C10 HDFS: Replication Factor
Description: None
Optimal Value: 3
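The two combination checks listed above are easy to automate.  Below is a minimal sketch in Python, under stated assumptions: the lower limit is a fraction of the Region Server heap, so the sketch multiplies it by a heap size to compare bytes with bytes; the HLog values (block size, roll multiplier, max logs) and the heap sizes are example numbers of mine, not recommendations; and `check_combinations` is a hypothetical helper, not an HBase API.

```python
MB = 1024 * 1024
GB = 1024 * MB


def check_combinations(conf: dict, heap_bytes: int) -> list:
    """Return a list of warnings for the combination checks that fail."""
    warnings = []

    # Check 1: MemStore memory at the lower limit should fit within the
    # amount of write-ahead log the Region Server is allowed to keep.
    memstore_floor = heap_bytes * conf["hbase.regionserver.global.memstore.lowerLimit"]
    hlog_capacity = (
        conf["hbase.regionserver.hlog.blocksize"]
        * conf["hbase.regionserver.logroll.multiplier"]
        * conf["hbase.regionserver.maxlogs"]
    )
    if memstore_floor > hlog_capacity:
        warnings.append("MemStore lower limit exceeds HLog capacity")

    # Check 2: blocking store files should be at least twice the compaction threshold.
    if conf["hbase.hstore.blockingStoreFiles"] < 2 * conf["hbase.hstore.compactionThreshold"]:
        warnings.append("blockingStoreFiles is less than 2 * compactionThreshold")

    return warnings


conf = {
    "hbase.regionserver.global.memstore.lowerLimit": 0.4,  # C2
    "hbase.hstore.compactionThreshold": 3,                 # C4
    "hbase.hstore.blockingStoreFiles": 15,                 # C5
    # The three values below are illustrative assumptions.
    "hbase.regionserver.hlog.blocksize": 128 * MB,
    "hbase.regionserver.logroll.multiplier": 0.95,
    "hbase.regionserver.maxlogs": 32,
}

print(check_combinations(conf, heap_bytes=8 * GB))   # small heap: both checks pass
print(check_combinations(conf, heap_bytes=80 * GB))  # large heap: HLog capacity is too small
```

With a large heap, the first check is the one that tends to fail, which suggests revisiting the HLog settings whenever the heap size changes.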

 

Now that we have discussed the different configuration parameters, let’s get into data points.  Data point values depend completely on your business scenario, HBase table schema, etc.

Each entry below gives the Data Point ID, Data Point Name, Data Point Description, and Data Point Value.

D1 Seed Days
Description: No. of days of data pre-populated into HBase offline (seed data) before we bring it online
Value: 365 days

D2 Daily Row Count
Description: No. of HBase rows that are inserted daily into the table
Value: 100 Million

D3 Row Size
Description: Rough size of a row measured from your trials (heavily depends on your schema, data value size, etc.).  Insert about 10K records into the table, issue a flush and a major compact from HBase Shell, then get the HFile size from HDFS.  Enable all the options like Row Compression, Bloom Filters, Prefix Encoding, Snappy Compression of Data, etc. for your exercise of measuring the row size.
Value: 1024 Bytes

D4 Column Family Count
Description: No. of Column Families in your schema definition
Value: 2

D5 Data Write Throughput per Core
Description: Volume of row data that can be inserted into the HBase table per core per second.  Our observation is given as the value.  There is also another reference: ~500 KB (93.5 MB/sec with 16 Nodes * 2 Processors / Node * 6 Cores / Processor) (http://hypertable.com/why_hypertable/hypertable_vs_hbase_2/).  As you can see, this is highly dependent on your schema.  For us, we could observe only 250 KB per core.
Value: 250 KB

 

Let’s define the hardware specification (per node) (example: Sun X42L Server).

 

Each entry below gives the Resource Name, Actual Resource Specification, Usable Resource Percentage, and Effective Available (= Actual Resource * Usable Resource / 100).

Disk
Actual Resource Specification: No. of Disks * Disk Size = 10 * 24 TB = 240 TB
Usable Resource Percentage: 60 %
Effective Available: 131481 GB (approx.)

CPU Cores
Actual Resource Specification: No. of Processors * No. of Cores Per Processor = 2 * 4 = 8 Cores
Usable Resource Percentage: 60 %
Effective Available: 4 Cores

Physical Memory (RAM)
Actual Resource Specification: 240 GB
Usable Resource Percentage: 60 %
Effective Available: 148 GB

JVM Memory
Actual Resource Specification: The Hadoop stack is a Java code base, and so all services are JVMs.  A JVM is limited by its Max Heap Size, and the usable Max Heap Size depends heavily on how well Garbage Collection works.  Till Java 6, the most reliable GC was the Concurrent Mark Sweep (CMS) GC.  The Max Heap that CMS GC could work well with was only 24 GB.  With Java 7, G1 GC has been introduced.  G1 GC has been shown to work well with a 100 GB Max Heap.  (http://blog.cloudera.com/blog/2014/12/tuning-java-garbage-collection-for-hbase/)
Usable Resource Percentage: 80%
Effective Available: 19.2 GB (CMS), 80 GB (G1)

 

Let’s calculate the resource requirements

Disk:

If we have to plan for 1 year ahead, Total No. of Days = Seed Days (D1) + 365 Upcoming Days = 730 Days

No. of Rows = No. of Days *  Daily Row Count (D2) = 730 * 100 Million = 73000 Million

Disk Size w/o Replication = No. of Rows * Row Size (D3) = 73000 Million * 1 KB = 73000 GB

Disk Size w/ Replication = Replication Factor (C10) * Disk Size w/o Replication = 3 * 73000 GB = 219000 GB

Memory:

No. of Regions = Disk Size w/o Replication / Max Region Size (C1) = 73000 GB / 15 GB = 4867

No. of MemStores (one MemStore per Region per Column Family) = No. of Regions * No. of Column Families (D4) = 4867 * 2 = 9734

Memory Size of MemStores = No. of MemStores * Max MemStore Size (C3) = 9734 * 128 MB = 1217 GB

Memory Size = Memory Size of MemStores / Heap Percentage (C2) = 1217 GB / 0.4 = 3043 GB

CPU:

Row Count per Second = Daily Row Count (D2) / No. of Seconds in a Day = 100 Million / (24 * 60 * 60) = ~1200 Rows

Incoming Volume per Second = Row Count per Second * Row Size (D3) = 1200 * 1 KB = 1200 KB

CPU Core Count = Incoming Volume per Second / Data Write Throughput per Core (D5) = 1200 KB / 250 KB = ~5 Cores (approx.)
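The disk, memory, and CPU arithmetic above can be reproduced with a short Python sketch, which makes it easy to plug in your own values.  The constant names are mine; the values are the C1 to C10 and D1 to D5 numbers from this post.  Like the text, it treats 1 million rows of 1 KB each as 1 GB.

```python
import math

# Inputs copied from the configuration and data point values above.
MAX_REGION_SIZE_GB = 15            # C1
HEAP_FRACTION_FOR_MEMSTORES = 0.4  # C2
MEMSTORE_FLUSH_SIZE_MB = 128       # C3
REPLICATION_FACTOR = 3             # C10
SEED_DAYS = 365                    # D1
DAILY_ROWS_MILLION = 100           # D2
ROW_SIZE_KB = 1                    # D3 (1024 bytes)
COLUMN_FAMILIES = 2                # D4
WRITE_KB_PER_CORE_PER_SEC = 250    # D5

# Disk: plan for the seed data plus one more year.
total_days = SEED_DAYS + 365
rows_million = total_days * DAILY_ROWS_MILLION
disk_gb = rows_million * ROW_SIZE_KB          # 1 million KB treated as 1 GB
disk_replicated_gb = disk_gb * REPLICATION_FACTOR

# Memory: one MemStore per region per column family.
regions = math.ceil(disk_gb / MAX_REGION_SIZE_GB)
memstores = regions * COLUMN_FAMILIES
memstore_gb = math.ceil(memstores * MEMSTORE_FLUSH_SIZE_MB / 1024)
memory_gb = math.ceil(memstore_gb / HEAP_FRACTION_FOR_MEMSTORES)

# CPU: the exact rate is ~1157 rows/sec; the text rounds it up to ~1200.
rows_per_sec = DAILY_ROWS_MILLION * 1_000_000 / (24 * 60 * 60)
cores = math.ceil(rows_per_sec * ROW_SIZE_KB / WRITE_KB_PER_CORE_PER_SEC)

print("Disk w/o replication (GB):", disk_gb)
print("Disk w/ replication (GB):", disk_replicated_gb)
print("Regions:", regions, "MemStores:", memstores)
print("Memory (GB):", memory_gb)
print("CPU cores:", cores)
```

Rounding up at each step mirrors the hand calculation, so the printed totals match the numbers above.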

 

Let’s find no. of nodes …

Node Count w.r.t. Disk = Total Disk Size (Required) / Usable Disk Size per Node = 219000 GB / 131481 GB = 2 (Ceiled)

Node Count w.r.t.  CPU = Total CPU Cores (Required) / Usable Core Count per Node = 5 Cores / 4 Cores = 2 (Ceiled)

IMPORTANT: Node Count w.r.t. Memory is limited by JVM Memory.  Please note that we cannot run multiple HBase Region Servers on a single node; that is, one Region Server JVM per node.  That means the memory usable on a node is the minimum of JVM Memory and Physical Memory.

Node Count w.r.t. Memory (CMS GC) = Total Memory Size (Required) / Min (JVM Memory, Physical Memory) = 3043 GB / Min (19.2 GB, 148 GB) = 159 (Ceiled)

Node Count w.r.t. Memory (G1 GC) = Total Memory Size (Required) / Min (JVM Memory, Physical Memory) = 3043 GB / Min (80 GB, 148 GB) = 39 (Ceiled)

Node Count w.r.t. all resources = Max (Node Count w/ each Resource) = 39 (G1 GC), 159 (CMS GC)
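The node count logic can be sketched the same way.  This Python snippet takes the required totals and the effective per-node resources from this post as constants; `node_count` is a hypothetical helper name, and the one-Region-Server-per-node rule shows up as the `min` of JVM heap and physical memory.

```python
import math

# Required totals from the calculations above.
REQUIRED_DISK_GB = 219000
REQUIRED_MEMORY_GB = 3043
REQUIRED_CORES = 5

# Effective per-node resources from the hardware specification.
NODE_DISK_GB = 131481
NODE_CORES = 4
NODE_RAM_GB = 148
JVM_HEAP_GB = {"CMS": 19.2, "G1": 80}


def node_count(gc: str) -> int:
    """Nodes needed so that disk, CPU, and memory are all satisfied."""
    # One Region Server JVM per node: usable memory is capped by the JVM heap.
    usable_memory_gb = min(JVM_HEAP_GB[gc], NODE_RAM_GB)
    by_disk = math.ceil(REQUIRED_DISK_GB / NODE_DISK_GB)
    by_cpu = math.ceil(REQUIRED_CORES / NODE_CORES)
    by_memory = math.ceil(REQUIRED_MEMORY_GB / usable_memory_gb)
    return max(by_disk, by_cpu, by_memory)


for gc in ("CMS", "G1"):
    print(gc, node_count(gc))
```

Because the result is the maximum across resources, it is easy to see which resource is the bottleneck by printing the three intermediate counts.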

As you can see, it is memory that demands more nodes than disk.  Note also that, had G1 GC not increased the ability to have a big JVM heap, a lot of memory on the node would have been lying idle.

 

Does virtualization (VMs) help?

HBase uses Log Structured Merge (LSM) Trees for its indexes in HFile.  LSM demands that the disk is not shared with any other process.  As a result, Hadoop experts do not recommend VMs, as the disk gets shared between VMs and performance degrades very badly.  But wait, multiple disks on the same node can help.  By multiple disks, I am not talking about virtual disks, but physical spindles.  If your hardware (bare metal node) has multiple disk spindles, and you can create VMs in such a way that no two VMs share the same physical spindle/disk, then by all means go for virtualization.

 

Further work

This article does not discuss reads.  Reads are affected by the C8 and C9 configurations.  But in our case, the read workload did not change much of our capacity planning.

 

Thanks,

Laxmi Narsimha Rao Oruganti

 

Update-1: Memory Size calculation was wrong.  Should consider only Disk Size w/o Replication, but was considering Disk Size w/ Replication.  Fixed it now.

Apache Kafka: Case of mysterious rebalances

We (Dinesh Kumar Ashokkumar and I) recently debugged another issue related to Apache Kafka v0.8.2 (it also exists in prior versions).  The issue is not a bug per se, but a lesson learnt the hard way about a side effect of a Kafka design choice.

I am assuming you know about Apache Kafka.  If not, you may want to read my other post on Kafka which has short brief on it.

Problem: Clients of a topic rebalance every now and then, even if there are no connections or disconnections

Before we get into the details, let me lay down the context and conditions.

Apache Kafka uses Apache ZooKeeper in different ways.  Brokers use it for state management and for partition leader election, and Consumer Clients use it for detecting the connects and disconnects of other Consumer Clients.  It is the Consumer Client usage that I am going to discuss here.

In Apache Kafka v0.8.2 (and prior versions), Consumer Clients are “thick” and “smart” clients in the sense that they coordinate between themselves for partition allocation (or assignment) among all the consumer connectors.

The Apache Kafka High Level Consumer API supports a single consumer connector receiving data for a given consumer group across multiple topics.  That means, if you have completely different topics but the same consumer group name, you can use one connector to receive the data from all the topics.  While this is a powerful feature when data has to be retrieved from multiple similar topics, it became a deadly feature for us.

Here are the ZooKeeper paths used by Kafka consumers (kafka.consumer.ZookeeperConsumerConnector is the class that primarily deals with ZooKeeper):

Consumer Partition Owner Registry
Path: /consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
Value: consumer_connector_id

Consumer Commit Offsets Registry
Path: /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
Value: offset_counter_value

Consumer Identifiers Registry
Path: /consumers/[group_id]/ids/[consumer_connector_id]

Every consumer creates an ephemeral node under “Consumer Identifiers Registry” tree and registers itself as a watcher on the tree root.  This helps a consumer client know about any new consumer client connects and any old consumer client disconnects.

As you can see this registry path is different from other registry paths of the consumer.  Especially, Identifiers Registry is not tied to a topic but others are.  This difference comes from the feature discussed above.  If a consumer client connects with a consumer group name plus a set of topics, then that consumer client needs to be informed of client connects and disconnects in that consumer group across any topic.  To facilitate that, Kafka team has chosen to not have topic in the Identifiers Registry Path.

Now, let’s dig a little deeper into the bug

Our topics are named aptly and we have put an effort to make sure the topic name clearly identifies its place in our architecture.

For a given topic, we have roughly 120+ partitions, and we chose to have a separate consumer client (either a thread or a process) for each partition.  That means, for a given topic and a given consumer group, we have about 120+ consumer clients.

For a given topic, we have a main consumer and it connects to that topic with consumer group name as “main”.  Of course, there are other consumers of the same topic that connect to it with a different consumer group name.  Like this we have completely different topics and each has different consumer clients connecting to it.

Now, in this situation, if any topic gets a new client or an existing client disconnects, it is perfectly fine to have a rebalance of the consumer clients of that particular topic.  Instead, what we observed is that consumer clients of all topics started rebalancing.  While it is easy to say so now (after finding the issue), it was not so clear while the rebalances were happening, especially since we did not know that all our nodes were rebalancing.  While debugging the issue, we focused on only one topic and consumer group combo, tried to find out why a node was going into rebalance, and never looked at the macro picture.  But after a few days of debugging, we observed that the whole gamut of consumer clients was rebalancing, not just those of one topic.  We first suspected something seriously wrong with the cluster (just a hunch, no real proof), so we shut down the Kafka cluster and brought it back up.  The problem disappeared!  Hooray!

Well, the problem did not stay away for long.  It started again: when one client of a topic disconnected, all clients across all topics started rebalancing.  We understood then that something beyond the topic and consumer group combination logic was tricking us.

Feature or Trap?

This happens because of the Consumer Identifier Registry design.  We have 10+ topics, each with 120+ consumer clients using the consumer group name “main”, which adds up to 1200+ consumers using the consumer group name “main”.  All of these consumer clients get registered under the ZooKeeper path “/consumers/main/ids/”, and every consumer client registers itself as a watcher on this whole tree.

If any consumer client of any topic disconnects, the ephemeral node of that consumer client under this ZooKeeper path gets deleted, and ZooKeeper triggers all watchers of the tree path.  That means all 1200+ consumer clients (minus the one that just disconnected) get notified by ZooKeeper.  The Apache Kafka client triggers a rebalance in response to this watch event.

As you can see, a well-meaning feature, meant to make it simple for consumers to receive data from similar topics under the same consumer group name, turned out to be disastrous for us.

What is the way out?

The solution is simple: use a unique consumer group name.  Avoid generic consumer group names by all means.
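
To get a feel for the fan-out, here is a small Python sketch.  It is only a toy model of the registry, not the Kafka or ZooKeeper client API; the class GroupRegistry, the helper build, and names like “topic0-main” are hypothetical and used for illustration only.

```python
from collections import defaultdict


class GroupRegistry:
    """Toy stand-in for the ZooKeeper path /consumers/<group>/ids (hypothetical)."""

    def __init__(self):
        self.members = defaultdict(set)  # group name -> set of consumer ids

    def register(self, group, consumer_id):
        self.members[group].add(consumer_id)

    def disconnect(self, group, consumer_id):
        """Drop the consumer's node and return every watcher that gets notified."""
        self.members[group].discard(consumer_id)
        return set(self.members[group])


def build(registry, topics, partitions, group_for_topic):
    """Register one consumer client per partition of every topic."""
    for topic in topics:
        for p in range(partitions):
            registry.register(group_for_topic(topic), f"{topic}-consumer-{p}")


topics = [f"topic{i}" for i in range(10)]

# Every topic uses the generic group name "main".
shared = GroupRegistry()
build(shared, topics, 120, lambda topic: "main")
notified_shared = shared.disconnect("main", "topic0-consumer-0")

# Every topic uses its own group name.
unique = GroupRegistry()
build(unique, topics, 120, lambda topic: f"{topic}-main")
notified_unique = unique.disconnect("topic0-main", "topic0-consumer-0")

print(len(notified_shared))  # 1199
print(len(notified_unique))  # 119
```

With the shared name, one disconnect wakes up every other consumer of every topic; with a group name per topic, only the consumers of that topic are notified.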

We had spent a good amount of time naming topics, but not consumer groups.  This experience has led us to spend time making consumer group names not only good, but unique.

Thanks,

Laxmi Narsimha Rao Oruganti

 

Updates:

Grammar corrections (September 22, 2015)

Consistency != Correctness

We had a healthy lunch-table discussion on Consistency vs. Correctness in the context of data stores (e.g., relational databases and NoSQL stores).  Many times, people assume that if a data store guarantees Consistency, the solution is correct.  Coming from a database engine development background, I can clearly distinguish the two.  But I learnt today, over that lunch-table discussion, how differently an article in this domain can be interpreted by someone who is not a database engine developer.  Thanks to Mustafa Hussain, who made me understand the other way of looking at things.

Coming to the main topic, there is a good article on the CAP theorem titled CAP Twelve Years Later: How the “Rules” Have Changed.

Why do banks have their own Correctness system when Relational Databases provide Consistency?

The correctness of a banking solution involves many non-database parts.  Not just non-database parts, but non-software parts too.  A simple ATM transaction is not the same as a database transaction.  A database transaction is about making sure that the ACID properties hold for data within the database, whereas an ATM transaction involves databases, the teller machine, physical currency notes, the dispenser, pull-back of unpicked money, currency loading, physical book-keeping, etc.  As you can see, a banking solution does not become correct just by using a database that guarantees Consistency.  Much more needs to be done, and hence Consistency != Correctness.

Well, you may then ask:

Why should Financial Banks use costly Relational Databases over cheap NoSQL stores?

I am talking about the case where brand new banking software is to be developed: why should it use Relational Databases over NoSQL stores?

Note that I am talking about NoSQL stores that chose to support Availability and Partition Tolerance and forfeited Consistency, per the famous CAP theorem that the above-cited article revisits. 

While Availability is important in the ATM scenario the author chose to describe in the above-cited article, he did not mean to forfeit ‘C’ completely.  We still need consistency within a database at a particular site (e.g., an ATM room) across rows, tables, etc.  For example, a money transfer between two accounts using an ATM requires a consistency guarantee from the data store.  The NoSQL stores in question (those that forfeited Consistency) do not provide it.  Hence, the author put it as “Another aspect of CAP confusion is the hidden cost of forfeiting consistency”. 
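
To make the money transfer example concrete, here is a minimal sketch using Python's built-in sqlite3 module.  The accounts table, the account names, and the transfer helper are hypothetical; the point is only that both updates commit together or not at all.

```python
import sqlite3


def transfer(conn, src, dst, amount):
    """Move `amount` from src to dst atomically; undo everything on failure."""
    try:
        with conn:  # commits on success, rolls back if an exception escapes
            conn.execute(
                "UPDATE accounts SET balance = balance - ? WHERE id = ?",
                (amount, src),
            )
            (balance,) = conn.execute(
                "SELECT balance FROM accounts WHERE id = ?", (src,)
            ).fetchone()
            if balance < 0:
                raise ValueError("insufficient funds")
            conn.execute(
                "UPDATE accounts SET balance = balance + ? WHERE id = ?",
                (amount, dst),
            )
        return True
    except ValueError:
        return False


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (id TEXT PRIMARY KEY, balance INTEGER NOT NULL)")
conn.executemany("INSERT INTO accounts VALUES (?, ?)", [("alice", 100), ("bob", 50)])
conn.commit()

ok = transfer(conn, "alice", "bob", 30)       # succeeds
failed = transfer(conn, "alice", "bob", 500)  # rolled back, nothing changes
balances = dict(conn.execute("SELECT id, balance FROM accounts"))
print(ok, failed, balances)
```

The failed transfer leaves no half-done debit behind.  That is the database doing its part; the rest of the ATM flow (notes, dispenser, pull-back) is still outside its reach.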

So, NoSQL stores have their own strong scenarios, but they do not necessarily fit all scenarios.  Banking is one scenario where they do not fit. 

As the cited article's author discussed, once we have NoSQL stores that support both C and A, we can think of moving banking-like workloads over to such stores. 

 

Thanks,

Laxmi Narsimha Rao Oruganti

Apache Kafka: Case of Large Messages, Many Partitions, Few Consumers

We (Dinesh Kumar Ashokkumar, Rushiraj Chavan, and I) have recently debugged an issue related to Apache Kafka v0.8.2 (it also exists in prior versions).  The issue is not peculiar, yet it was interesting to find that we were the ones using Kafka with that configuration and hitting it.

Problem: The consumer does not receive data even when there is a lot of lag in the partitions of a topic

Before we go into the bug details, let me lay down the basic design of Kafka.

Kafka is a distributed, fault-tolerant, persistent message queue (or a key-value store).  Kafka supports named queues, called topics.  Like other distributed key-value systems, the key space is divided into partitions, and the incoming data of a topic is stored in different partitions.  The most important point to note is that the no. of partitions is configured explicitly by the user and is not dynamic; that is, Kafka does not automatically split or merge partitions.  The user can increase the partition count of a topic, but that might require downtime for that topic.

From the broker's point of view: A partition is owned by a broker (dubbed the leader of the partition), which takes responsibility for replicating the partition to other brokers that act as replicas.  If you want to understand different replication models in distributed systems, take a detour.  In Kafka, replication is not fixed as either synchronous or asynchronous; the user chooses between the two.  The producer can control the broker's replication behavior using request.required.acks and producer.type = sync. 

From the producer's point of view: The partition for an incoming message is identified by a key derived either from metadata, if available, or from the message.  For now, you can trust that the in-built Kafka key derivation is good enough to achieve even distribution of data among the partitions.

From the consumer's point of view: Within a consumer group, a partition can only be assigned to a single consumer.  That is, data from a partition cannot be retrieved by multiple consumers of the same group.  While this seems highly restrictive for a distributed system, if we take a macro view, hide the ‘partition’ concept, and look at the distributed queue level, there are still multiple consumers retrieving in parallel.  There is a reason for this restriction from the broker's point of view: the broker does not have to deal with synchronization within a partition and can avoid all the resulting complications.  Because of this association of a single consumer with a partition, the no. of useful consumers is limited by the no. of partitions.  Note that if the no. of consumers is less than the no. of partitions, a single consumer is assigned multiple partitions.  If the no. of consumers is more than the no. of partitions, the extra consumers just idle and do not receive any data.
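
Here is a small Python sketch of that assignment rule.  It is not Kafka's actual assignment algorithm; assign_partitions is a hypothetical helper that uses a simple round-robin spread, just to show the two cases (fewer consumers than partitions, and more consumers than partitions).

```python
def assign_partitions(partition_count, consumers):
    """Give every partition exactly one owner, spreading them round-robin."""
    assignment = {consumer: [] for consumer in consumers}
    for partition in range(partition_count):
        owner = consumers[partition % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# Fewer consumers than partitions: each consumer owns several partitions.
fewer = assign_partitions(6, ["c1", "c2"])
print(fewer)  # {'c1': [0, 2, 4], 'c2': [1, 3, 5]}

# More consumers than partitions: the extra consumer sits idle.
more = assign_partitions(2, ["c1", "c2", "c3"])
print(more)  # {'c1': [0], 'c2': [1], 'c3': []}
```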

In Kafka, the maximum message size (= Size of Message + Size of Metadata) has to be configured and bounded.  It is controlled by different configuration parameters in different layers.  In the broker, it is message.max.bytes; in the consumer, it is fetch.message.max.bytes.

Now, let’s go a little deeper into the details that led to this bug.

For every Consumer Connector, the client library internally forks a fetcher thread for each broker.  The library also forks other threads, such as fetcher manager threads and a leader thread, but those are not relevant for this discussion.  The fetcher thread for a broker is responsible for retrieving data from that broker.  The consumer knows which partitions are assigned to it, and it divides them into a separate set for each broker based on which partition is led by which broker.  Each fetcher thread then takes the set of this consumer's partitions that belong to its broker and makes a data request.  Upon receiving a data request from a fetcher, the broker tries to package one chunk for each partition (#BrokerResponsePacking).  The chunk size is nothing but the maximum message size.  The Kafka broker is limited to packing a maximum of 2 GB of data (the maximum value of a signed 32-bit integer) (#BrokerMaxResponseSize) in a single response to a fetcher.

In this very scenario, we have hit the bug: https://issues.apache.org/jira/browse/KAFKA-1196

To understand the bug, let’s revisit some points:

– Each consumer is assigned a set of partitions on connecting to the cluster (assuming even distribution)

No. of partitions assigned to a consumer f(p, consumer) = Total Partition Count of a Topic f (p, topic) / Total Consumer Count f(c, total)

– In the worst case, all partitions assigned to a consumer are led by one single broker.  That means one fetcher thread of that consumer connector requests all partitions from a single broker.  The broker then tries to respond to the fetcher by packing one chunk for each partition

Broker Response Size f(b, response_size) = f(p, consumer) * Maximum Message Size f(m, max_size)

– As mentioned above (#BrokerMaxResponseSize)

f(b, response_size) < 2 GB

Implies, f(p, consumer) * f(m, max_size) < 2 GB

Implies, f(m, max_size)  * f(p, consumer)  < 2 GB

Implies, f(m, max_size)  * f (p, topic)  /  f(c, total) < 2 GB

If one does not respect the above inequality, he/she will run into trouble.  The broker does not check against the 2 GB limit, so an integer overflow happens.  Check the above bug for more details.
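
The inequality is easy to check up front.  Below is a minimal Python sketch of such a check; the function names and the sample numbers (10 MB messages, 300 partitions) are made up for illustration, and the check assumes the worst case described above, where one broker leads every partition of a consumer.

```python
MB = 1024 * 1024
MAX_RESPONSE_BYTES = 2**31 - 1  # largest value of a signed 32-bit integer


def partitions_per_consumer(topic_partitions, consumer_count):
    """f(p, consumer), rounded up when the split is uneven."""
    return -(-topic_partitions // consumer_count)


def worst_case_response_bytes(max_message_bytes, topic_partitions, consumer_count):
    """f(b, response_size): one full-size chunk for every assigned partition."""
    return max_message_bytes * partitions_per_consumer(topic_partitions, consumer_count)


def is_safe(max_message_bytes, topic_partitions, consumer_count):
    size = worst_case_response_bytes(max_message_bytes, topic_partitions, consumer_count)
    return size <= MAX_RESPONSE_BYTES


def min_consumers(max_message_bytes, topic_partitions):
    """Smallest consumer count that keeps the worst-case response within the limit."""
    max_partitions = MAX_RESPONSE_BYTES // max_message_bytes
    if max_partitions == 0:
        raise ValueError("a single message already exceeds the response limit")
    return -(-topic_partitions // max_partitions)


print(is_safe(10 * MB, 300, 1))      # False
print(min_consumers(10 * MB, 300))   # 2
print(is_safe(10 * MB, 300, 2))      # True
```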

Why did we land in this trouble?

In our case, we have a topic that hosts big messages (max message size 300 MB) and is consumed by different types of processes.  One is the actual message processing system, which has to be highly scalable; hence, a high partition count was chosen.  However, there are other consumers of the same topic (in different consumer groups, of course) that do only very lightweight processing (e.g., book keeping).  Book keeping takes very little CPU, so we were hoping to run just one consumer for this group and for the whole topic.

Max Message Size = f(m, max_size) = 300 MB

Total Partition Count of a Topic = f(p, topic) = 360

Total Consumer Count  = 1 (for book keeping consumer group)

No. of partitions assigned to a consumer = f(p, consumer) = 360 / 1 = 360

Broker Response Size = f(b, response_size) = f(m, max_size)  * f(p, consumer)  = 300 MB * 360 = 108000 MB = 105 GB (approximately)

As you can see, the broker response size is way beyond the max response size of 2 GB.  As a result, an integer overflow happened in the broker, and that led to nondeterministic behavior, with the broker not sending any response.  The fetcher threads keep sending data requests but never get a response.  So, even though there are messages, the fetcher threads do not get any of them.
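
To see what a 32-bit overflow does to this particular number, here is a small Python sketch.  It does not reproduce the broker's code; wrap_int32 is a hypothetical helper that only shows how a value is reinterpreted when it is squeezed into a signed 32-bit integer.

```python
def wrap_int32(value):
    """Interpret `value` as a two's-complement signed 32-bit integer."""
    return (value + 2**31) % 2**32 - 2**31


MB = 1024 * 1024
intended = 300 * MB * 360      # the response size we computed above
stored = wrap_int32(intended)  # what fits in a signed 32-bit field

print(intended)  # 113246208000
print(stored)    # 1577058304
```

The wrapped value has nothing to do with the intended size, and any logic built on top of it cannot be trusted.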

What is the solution?

Thankfully, Kafka supports message compression.  Thankfully again, the max message size in all the above equations corresponds to the compressed size.  Luckily, in our case the messages are text messages and the compression ratio was superb.  Our 300 MB message came down to 50 MB after GZIP compression.  So, we enabled compression.

We also observed that most of the messages are not really big.  The typical 80-20 rule applied to message sizes in our case: 80% of messages are less than 10 MB (uncompressed), and 20% are more than 10 MB (uncompressed).  So, we split the data into two separate topics, small-messages and big-messages.  Topic small-messages is configured to have many partitions (i.e. 360), and topic big-messages is configured to have few partitions (i.e. 72).

Well the story is not over yet.  We then got into issues with Java Max Heap and Kafka Buffers.

As you know, Sun’s (now Oracle’s) JVM implementation requires the max heap size to be specified up front, and the heap cannot grow beyond it.  An application has to account for the heap requirements of not just its own code but also all the libraries it uses.

In case of Kafka Client, Consumer Connector allocates buffers for partitions assigned to it.   No. of buffers allocated for each partition is configurable using queued.max.message.chunks.

No. of buffers allocated for each Partition = f(buf, partition)

No. of buffers allocated for each Consumer Connector = f(p, consumer) * f(buf, partition)

Unfortunately, the Kafka client library does not take multi-threaded applications into account.  That is, if a Kafka application has multiple threads, each with its own Consumer Connector, then the Kafka client allocates buffers for each Consumer Connector separately, even though they are all in the same JVM and the buffers are already protected for thread-safety (because fetcher threads and the application thread access them in parallel).

No. of threads (and so consumers) per JVM = f(c, jvm)

No. of buffers allocated per JVM = f(c, jvm) * f(p, consumer) * f(buf, partition)

Total heap memory required by Kafka = No. of buffers allocated per JVM * Maximum Buffer Size = f(c, jvm) * f(p, consumer) * f(buf, partition) * f(m, max_size)

If you don’t account for this much memory, you will keep running into OutOfMemoryError even though your application's own memory requirements are quite small.

In our case of light weight book keeping application,

f(c, jvm) = 8, f(p, consumer) = 45, f(buf, partition) = 1, f(m, max_size) = 50 MB

Total heap memory required by Kafka = 8 * 45 * 1 * 50 MB = 18 GB (approximately)
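
The same arithmetic as a small Python sketch, so it can be replayed with other numbers.  The function name and parameter names are mine, not Kafka configuration names.

```python
def kafka_buffer_heap_mb(consumers_per_jvm, partitions_per_consumer,
                         chunks_per_partition, max_message_mb):
    """f(c, jvm) * f(p, consumer) * f(buf, partition) * f(m, max_size), in MB."""
    return (consumers_per_jvm * partitions_per_consumer
            * chunks_per_partition * max_message_mb)


needed_mb = kafka_buffer_heap_mb(8, 45, 1, 50)
print(needed_mb)         # 18000
print(needed_mb / 1024)  # 17.578125
```

Note how quickly this grows: raising queued.max.message.chunks from 1 to 2 doubles the requirement.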

Yes, that’s a shocking size, but that’s what we have to set aside for Kafka to operate.  The application itself required just 2 GB, so in total we configured the max heap as 24 GB.

Thanks,

Laxmi Narsimha Rao Oruganti

Distributed File System – Part 1 (Retake)

This blog post series tries to take the reader from a traditional one-node file system to a distributed file system. This is the first post of the series.

File System (of a scale-up world or in a single node):

The basic responsibilities of file system are:

  1. Support user objects and operations (outwardly)
    1. Directories – Create, Delete, Rename
    2. Files – Create, Rename, Read, Write, Expand, Shrink, Truncate, Delete
  2. Manage Disk Space (inwardly)
    1. Structure and organization of data on disk
    2. Allocation and de-allocation of disk space

* There are many more responsibilities that file systems own, such as file permissions, symbolic links, etc.  They are all deliberately excluded to keep things short (and, need I say, sweet).

File System metadata typically contains:

  • Which data blocks constitute a file and their order
  • Which directories contain which files and the hierarchy

Let us take an example to get clarity.  In the case of Unix-like OS file systems:

  1. The very first block on the disk is usually Master Boot Record (MBR)
    1. LILO and GRUB are the popular boot loaders
  2. MBR contains the details of Disk Partitions (namely: drives)
  3. Disk Layout: MBR | Partition 1 | Partition 2 | …

Figure 1: Disk Layout

At disk level, VFS (virtual file system) operates.  It then mounts the drives (based on the file system each drive is set up for).

The file system divides the whole drive space into blocks. The block size is usually configurable and is part of the OS installation configuration. The typical block size has been 4 KB for a long time; recently it has moved up to 8 KB. Of these blocks, some are used for the file system's own metadata and the rest are left for use by applications (or data).

Figure 2: ext2 File System – Disk Partition (or Drive) Layout

Figure 3: ext2 File System – Block Group Layout

  1. Drive Layout: Super Block | Block Group 1 | Block Group 2 | …
    1. SuperBlock: Contains the file system info, and drive info as a whole (like what type of file system it is, how many blocks are in use, how many blocks are free)
  2. Block Group Layout: Group Descriptors | Block Bitmap | iNode Bitmap | iNode Table | Data Blocks
    1. Group Descriptors: number of free and used blocks in this block group, number of free and used iNodes, block numbers of the Block Bitmap and iNode Bitmap
    2. Block Bitmap: Each bit represents that particular block is free/used
    3. iNode Bitmap: Each bit represents that particular iNode is free/used
    4. Data Blocks

  1. Each file system object (directory, file, symbolic link) is represented in a metadata structure named iNode.
  2. Internally all iNodes are addressed by numbers (namely, iNode Number) – starting with 1
  3. iNode structure typically contains:
    1. Block Pointers – 12 Level 0, 1 Level 1, 1 Level 2, 1 Level 3
      1. Level 0 – 12 Pointers to Data Blocks
      2. Level 1 – Pointer to Block of Pointers to Data Blocks
      3. Level 2 – Pointer to Block of Pointers to Blocks of Pointers to Data Blocks
      4. Level 3 – Pointer to Block of Pointers to Blocks of Pointers to Blocks of Pointers to Data Blocks
    2. In case of Directory, Data Blocks contain details of immediate sub-directories and files.  For each item, there is an iNode number
    3. In case of Files, Data Blocks contain actual user data
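
The pointer levels above decide how many data blocks a single iNode can address.  Here is a quick Python sketch of that arithmetic; the 4-byte pointer size is an assumption on my part, and real file systems have other limits as well, so treat the result as a theoretical upper bound.

```python
def addressable_blocks(block_size, pointer_size=4, direct=12):
    """Data blocks reachable through 12 direct pointers plus levels 1, 2 and 3."""
    per_block = block_size // pointer_size  # pointers that fit in one block
    return direct + per_block + per_block**2 + per_block**3


blocks = addressable_blocks(4096)
print(blocks)                   # number of data blocks with 4 KB blocks
print(blocks * 4096 / 2**40)    # the same, expressed in TB
```

Most files are small and fit within the 12 direct pointers, so the extra hops through pointer blocks are paid only by large files.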

Since the first directory created in the system is the root “/”, its iNode usually comes at the very beginning (I think its iNode number is 2).

The workflow to open the file “/usr/laxminro/olnrao.txt”:

  • Get the iNode for “/”
  • Find the Data Block details from this iNode
  • From “/” Data Block, find the iNode Number of sub-directory “usr”
  • Get the iNode for “usr” (with iNode Number found above)
  • Find the Data Block details from this iNode
  • From “usr” Data Block, find the iNode Number of sub-directory “laxminro”
  • Get the iNode for “laxminro” (with iNode Number found above)
  • Find the Data Block details from this iNode
  • From “laxminro” Data Block, find the iNode Number of file “olnrao.txt”
  • Get the iNode for “olnrao.txt” (with iNode Number found above)
  • Find the Data Block details from this iNode
  • These data blocks contain the actual data (i.e. contents of “olnrao.txt” file)
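
The workflow above can be sketched in a few lines of Python.  This is a toy in-memory model, not a real file system: the inodes table, the iNode numbers 11, 12 and 13, and the helpers resolve and read_file are all made up for illustration.

```python
ROOT_INODE = 2  # assumed iNode number of "/"

# iNode number -> iNode.  A directory's data maps names to iNode numbers;
# a file's data is the user content.
inodes = {
    2: {"type": "dir", "entries": {"usr": 11}},
    11: {"type": "dir", "entries": {"laxminro": 12}},
    12: {"type": "dir", "entries": {"olnrao.txt": 13}},
    13: {"type": "file", "data": b"hello"},
}


def resolve(path):
    """Walk the path one component at a time and return the final iNode number."""
    number = ROOT_INODE
    for name in [part for part in path.split("/") if part]:
        inode = inodes[number]
        if inode["type"] != "dir":
            raise NotADirectoryError(name)
        if name not in inode["entries"]:
            raise FileNotFoundError(path)
        number = inode["entries"][name]
    return number


def read_file(path):
    inode = inodes[resolve(path)]
    if inode["type"] != "file":
        raise IsADirectoryError(path)
    return inode["data"]


print(resolve("/usr/laxminro/olnrao.txt"))    # 13
print(read_file("/usr/laxminro/olnrao.txt"))  # b'hello'
```

Every path component costs one iNode read plus one data block read, which is why deep paths on a cold cache are expensive on a hard disk.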

Now that we have talked at a very high level about what a typical file system does and how it is implemented, let us talk about file system resources and their limitations (esp. hard disks), usage patterns, and the design choices made accordingly.

  • Hard disks are mechanical devices
    • Disk space was a scarce resource
      • Disk fragmentation was a serious concern
    • Disk bandwidth was and is a scarce resource
    • Seek time is in order of milliseconds
      • Random Reads and Random writes incur seek
    • Small writes and small reads were the norm

Applications tried to make sure they used very little disk space and bandwidth.  So, the amount of data read from and written to disk was small.  Note that each read or write could potentially incur a seek.  As a result, file systems introduced buffering: read buffering to serve the next likely read from buffers, and write buffering to accumulate enough data before writing to disk.  Buffering also helped order the writes in such a way that the seek goes in one direction rather than to and fro.  Database systems (esp. RDBMS) are among the heavy users of file systems, and if one delves into the design choices made in an RDBMS, one can easily see how much it has to deal with file system design choices and hardware limitations.  I will take a short detour to give a sense of this:

  • B+ Trees were the common form of on-disk storage of tables and indexes
    • For the record, heap based tables and hash based indexes do exist
  • Within DB file, space is allocated in terms of extents.  An extent is about 32/64/128 KB.
    • Extent system assumes that disk space is together (ex: on same platter)
    • Each extent is usually exclusive to a B+ Tree, so that reads and writes of that B+ Tree's data are collocated (the extent is on the same platter); otherwise, one page of the B+ Tree could be on one platter and another page on a different platter
  • A typical transaction adds/updates one/two rows in different tables
    • Ex: An order made by a customer, would result in one Order Table row, few Order Details Table rows, Payments table row, etc.
    • Even with the extent model, this typical usage pattern requires touching multiple B+ Trees, which means small writes in different locations
    • Solution: Write ahead logging – Log files serve multiple purposes
      • Bringing ‘append’ and ‘chunk’ semantics of file system usage
      • Atomic way of committing or aborting a transaction
        • Either Commit Transaction or Abort Transaction
        • Atomicity is still possible with other techniques such as ‘Shadow Paging’ – but they are not successful because of limitations with Hard disks
      • Actual B+ Tree structures on-disk are updated as part of Checkpoint

Pardon me for the detour; coming back to disks.  We have mostly discussed optimizations for writes.  For reads at scale, the ‘cache tier’ came into play.

Apart from heavy users like RDBMS, many applications use the file system in different ways, and most of the time write buffering and read buffering helped contain the problem.  It is also important to note that general-purpose file systems serve a wide variety of applications and workloads, and hence their design choices were limited.  That may be the reason there are many file systems designed for specific workloads.

The newer persistent storage technologies such as ‘Solid State Drives’, which are electronic devices, did not make things any better.  Why? you may ask.  While random reads are not a concern, random writes are.  Small random writes incur the ‘erase block’ semantics of SSDs.  Erasing a block is costly because it requires a charge pump.  There are other concerns such as wear leveling, etc.  This whole story is famously known as write amplification.

Interested readers may read the following to get a pulse of hot trends in the persistent memory world: Phase-change memory, Memristor.

File systems continue to be an area of research, with newer storage devices coming up.

Hopefully that has given the gist of a File System in one-node world.  That’s all for now, shall come back soon with more on the same topic.  Thanks for reading.

 

Thanks,

Laxmi Narsimha Rao Oruganti (alias: LaxmiNRO, alias: OLNRao)

Distributed and Consistent Hashing – Part 3

Windows Azure Cache (WA Cache) is a distributed in-memory cache. WA Cache provides a simple <Key, Value> based API, such as Cache.Put (key, value) and Cache.Get (key). You can correlate the WA Cache API to that of a Hash. This blog post series tries to take the reader from a traditional one-node Hash to a distributed Hash. This is the third post of the series (and assumes you have read the first and second posts).

Distributed systems are gaining momentum because of their promise of economies of scale.  The economy is possible due to the use of ‘commodity’ hardware.  Commodity hardware is cheap, but it has a higher rate of failure than reliable (and costly) hardware.  As a result, distributed systems have been designed to work with different types of failures. 

Hardware Failures: Electrical Power Supply outages (#1),  Power Socket Failures, Network Switch Failures, Network Router Failures, Network Card Failures, Hard Disk Failures, Memory Chip Failures.

Hardware Failures in Windows Azure are discussed in Inside Windows Azure – Build 2011 Talk

Software Failures: Crashes due to bugs

Failures due to misconfiguration : Simple network misconfigurations can put node/rack/datacenter network communication in trouble.

#1 Electrical Power Supply Outages:

While an Uninterruptible Power Supply (UPS) helps in case of power failures, it can only serve as an alternative power source for a short time (~1 hour).  For any long power outage, or when a UPS is not available, this problem needs to be dealt with at the upper layers. 

At hardware level: Redundancy is the mantra for many of the problems – redundancy in power supply, redundancy in communication paths, etc. 

At software level: Replication of data, replication of responsibility, and right integration with hardware redundancy.

Solutions must be well integrated across layers to get best results (or sometimes even the desired results).  The importance of integration becomes evident with an example.

Let us say a web server is deployed on two nodes, so that if one node faults there is another node to serve the requests.  If both these nodes are connected to the same power socket, any fault in that power socket makes both nodes go down at the same time.  This essentially means that in spite of having two web server nodes, we landed in trouble.  If we had a way to make sure that the two nodes are always connected to distinct sets of resources, it would be a much better model. 

A system for categorizing resources into groups/sets is thus necessary.  These groups are called Fault Domains (FD).  No two FDs share the same power source, network switch, etc.  With FDs in the picture, any redundancy system at the software level just has to make sure to place the redundant copies across FDs. 
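
A minimal Python sketch of FD-aware placement follows.  The function place_replicas and the node and FD names are hypothetical; a real placement engine would also consider load, capacity, and upgrade domains.

```python
def place_replicas(nodes_by_fd, replica_count):
    """Pick one node from each of `replica_count` distinct fault domains."""
    if replica_count > len(nodes_by_fd):
        raise ValueError("not enough fault domains for the requested replicas")
    placement = []
    for fd in sorted(nodes_by_fd)[:replica_count]:
        placement.append((fd, nodes_by_fd[fd][0]))
    return placement


cluster = {
    "fd1": ["node1", "node2"],
    "fd2": ["node3"],
    "fd3": ["node4", "node5"],
}

web_servers = place_replicas(cluster, 2)
print(web_servers)  # [('fd1', 'node1'), ('fd2', 'node3')]
```

The two web server instances never share a fault domain, so a single power socket or switch failure cannot take both down.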

We have discussed redundancy as a solution at the software layer to deal with faults.  In the case of stateless software programs, just having another node is sufficient.  In the case of stateful software programs, such as database systems, there is much more to be done.  Traditionally, in the scale-up world, RAID systems were used to protect against bad sectors (checksums), hard disk crashes (multiple copies on different disks), etc.  RAID storage is costly, so it can’t be the choice for distributed systems.  The other scale-up world technique has been data replication.  Replication is typically the place where FD knowledge is required, to place copies of data in different FDs.

The moment replication comes into discussion it is important to call out the terminology:

Primary – The node responsible for ‘owning’ replication and interfacing with the client.  There is usually only one primary.

Secondary – A node responsible for ‘cooperating’ in replication.  There can be multiple secondary nodes. 

Replication is a vast subject, but I will keep it short.  Depending on how the primary and secondaries agree on replication, there are multiple methods.

Asynchronous Replication – In this model, changes in primary are committed and client is acknowledged without waiting for the secondary nodes to be updated.  Replication with secondary is either triggered or a background task takes up the responsibility of bringing secondary nodes up-to-date.  In short, replication happens asynchronously.  In this mode, if primary node fails, there could be data loss as secondary nodes are not up-to-date.  If the data is not super critical and ok to lose, this model is apt.

Synchronous Replication – In this model, for every change in primary it is synchronously replicated to secondary. Till secondary responds, changes on primary are not ‘committed’ (and so, client is not acknowledged). In short, replication happens synchronously. If secondary node is down, writes are blocked till the node is brought back up.

The no. of copies to be maintained is referred to as the ‘Replication Factor’.

If data is important, an admin would opt for synchronous replication.  The higher the replication factor, the better the fault tolerance.  But keeping the client on hold till all secondary copies are updated pushes an admin to choose a lower replication factor.  It can be argued that this model forces the admin to think deeply about the replication factor.  There are cases where one needs a higher replication factor, but not at the cost of increased operation times.  This is where a mixture of the synchronous and asynchronous models is needed, namely Hybrid Replication.  In this model, one can choose the no. of secondary nodes in synchronous mode and the no. of secondary nodes in asynchronous mode.  Here again there are two choices: one can designate a ‘fixed set’ of secondary nodes for synchronous replication, or live with ‘any N’ secondary nodes acknowledging.

It is also important to note that, a node can act as both primary and secondary for different replication units.  In case of database systems, each database is a replication unit.  So, a node can act as primary for database 1 and as a secondary for database 2. 

In the case of a Routing Client, the client typically knows which primary to reach out to for each replication unit.  Some systems allow clients to read from a secondary.  Which secondary nodes a client is allowed to read from determines the level of consistency.  Werner Vogels wrote a great blog post about different consistency models (Blog Post, Wikipedia Link).

Primary and Secondary communicate as part of replication and there are multiple models.

Pipeline model – The replication data flow is like a chain: the primary sends the data to the first secondary node, the first secondary node then sends the data to the next secondary, and so on.  Windows Azure Storage and Hadoop Distributed File System use this model

Multi-Unicast Model – Primary sends replication data to all ‘synchronous secondary’ nodes separately and waits for acknowledgement from everyone

Multicast Model – In the case of hybrid replication with ‘any N’ acknowledgements, the primary sends replication data to all secondary nodes (both synchronous and asynchronous) and waits for any N secondary nodes to acknowledge.  The set of N nodes that acknowledge can vary for every data block, chunk, or packet.
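
The commit rule behind these replication modes can be sketched as a single check in Python.  The function can_commit and the node names s1, s2, s3 are hypothetical; this only models when the primary may acknowledge the client, not how the data travels.

```python
def can_commit(acks, sync_secondaries=(), any_n=0):
    """Decide whether the primary may acknowledge the client.

    acks: secondaries that have acknowledged the write so far.
    sync_secondaries: the 'fixed set' that must all acknowledge.
    any_n: how many acknowledgements are needed from any secondaries.
    """
    acks = set(acks)
    return set(sync_secondaries) <= acks and len(acks) >= any_n


# Asynchronous: commit without waiting for any secondary.
print(can_commit(acks=[]))                                          # True

# Synchronous with a fixed set: s2 has not acknowledged yet.
print(can_commit(acks=["s1"], sync_secondaries=["s1", "s2"]))       # False

# Hybrid with 'any N': any two acknowledgements are enough.
print(can_commit(acks=["s2", "s3"], any_n=2))                       # True
```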

One major advantage of Windows Azure Cache over MemCacheD is that it supports High Availability (the public name for replication).  Windows Azure Cache supports the synchronous replication model (the no. of secondary nodes is fixed at 1, and the communication model is multi-unicast), and each partition is a replication unit.  The cache is an in-memory system, so replication is limited to in-memory replication.  And that is the catch!  In a typical stateful system, a node reboot does not lose state, as the state is persisted on disk (either locally or remotely).  However, in an in-memory system like Windows Azure Cache, a node reboot results in state loss.  The combination of synchronous replication and reboot-leads-to-state-loss made us (Windows Azure Cache) let clients commit even when all secondary nodes are down: those nodes hold no data that could go out of date by allowing the writes.  Windows Azure Cache (as of the 1.8 SDK release) does not allow clients to read from secondary nodes.

Many a time, a cache is used for data that needs to be processed.  Processing involves running code.  It is a well-known and established fact that keeping code and data close together makes systems complete tasks faster. 

In case of database systems, stored procedures help bring the code (or business logic) near to data. 

In Windows Azure Cache, we have the Append, Prepend, Increment, and Decrement APIs to help process a value.  It would have been lovely if we had a ‘stored procedure’ model instead of these individual APIs.  That way, any processing could be pushed into such ‘stored procedures’, and we could have simply shipped Append, Prepend, Increment, and Decrement as ‘Microsoft owned’ stored procedures.

This is the last post of the series “Distributed and Consistent Hashing”.  Here are the important lessons/techniques to remember (deliberately explained in generic terms so as to carry forward them to use in other discussions).

– Data Nodes – The cache server nodes that actually store data

– Control Nodes or Metadata Nodes – Partition Manager is one such control node we have discussed that helps manage the data nodes and their partition ownership. 

– Routing Clients – Make the client intelligent enough to talk to data nodes directly, without the need to go through a control node

– Read from Secondary – Load balancing, allowing different levels of consistency

– Code and Data Distance – If code and data are near, tasks can be completed faster 

 

That is all for now; I will come back with another series on taking a system from the scale-up world to the scale-out (or distributed) world.

  

Thanks,

Laxmi Narsimha Rao Oruganti (alias: LaxmiNRO, alias: OLNRao)

Integration – [Processor, Memory] Vs. [Visiting Places, People]

Today I want to try explaining the integration between processor and memory, using visiting places and people as the reference point.

Have you ever observed the in and out queues at visiting places like a large Zoo or a famous Temple?  Can you reason out why they are the way they are?

Zoo – Multiple entry gates and multiple exit gates

Temple – One entry gate and one exit gate

Why does a Temple not have multiple gates?  Why does a Zoo not have only one gate?

Zoo – After the entry gate, the possible ‘views’ are many.  One visitor can go to the animals view, another to the birds view, yet another to the trees view.  The more views there are, the better the visitors spread out across them.  So, allowing more people in at a stretch does not hurt; it makes the system work better.  Fewer entry gates only increase the queue lengths and result in inefficient Zoo usage.

Temple – The only ‘view’ is the holy deity.  There is only one ‘view’.  So, allowing more people in is going to make the situation worse.  You know how good humans are at self-discipline :) (of course, there are exceptions).

What does that observation tell us?  The in-flow and out-flow must be designed with the actual ‘view’ or ‘consumption’ system in mind.  Any superior in-flow (many entry gates) designed without thinking of the main consumer (Temple) is going to create a mess.  Any inferior in-flow (few entry gates) when the main system (Zoo) is ready for heavy consumption reduces the usage efficiency.

When it comes to computers, processor and memory are designed the same way. 

Processors are designed around ‘words’ rather than ‘bytes’.  For example, you hear of 16-bit, 32-bit, and 64-bit processors; 16 bits, 32 bits, and 64 bits are the word sizes.  A processor processes a word at a time.  The registers, arithmetic logic unit, accumulator, etc. are all in sync with the ‘word’ pattern.

Let us come to the memory and see a bit more into it. 

Byte addressable memory is a memory technology where every byte can be read/written individually without touching other bytes.  This technology is better for software, as multiple types can be supported with ease.  For example: extra small int (1 byte), small int (2 bytes), int (4 bytes), long int (8 bytes), and extra long int (16 bytes) can all be supported with just ‘length of the type in memory’ as the design point.  There are no alignment issues, such as small int having to be on a 2-byte address boundary, int on a 4-byte address boundary, and so on.  Surely, from the software point of view, byte addressable memory is the right technology.  But this memory is a bad choice for processor integration.

Word addressable memory is a memory technology where one can read/write only a word at a time.  This is better for processor integration, as processors are designed for ‘word’ consumption.  But it surfaces memory alignment issues to software, which then have to be dealt with at those layers.  It also brings challenges like the Endianness problem, since different processors lay out the bytes of a ‘word’ in different orders.
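One common way software deals with the Endianness problem is to fix the byte order explicitly instead of relying on the processor's layout.  Here is a minimal sketch, assuming a 32-bit value and big-endian (most significant byte first) as the chosen order; `store_be32` and `load_be32` are hypothetical helper names.

```c
#include <stdint.h>

/* Write a 32-bit value most-significant byte first, whatever the
   byte order of the host processor is. */
static void store_be32(uint8_t out[4], uint32_t value) {
    out[0] = (uint8_t)(value >> 24);
    out[1] = (uint8_t)(value >> 16);
    out[2] = (uint8_t)(value >> 8);
    out[3] = (uint8_t)value;
}

/* Rebuild the value from the same fixed byte order. */
static uint32_t load_be32(const uint8_t in[4]) {
    return ((uint32_t)in[0] << 24) |
           ((uint32_t)in[1] << 16) |
           ((uint32_t)in[2] << 8)  |
            (uint32_t)in[3];
}
```

Because the code only uses shifts on the value, and never reinterprets the word's bytes in memory, it behaves the same on little-endian and big-endian processors.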

From processor-memory integration, ‘word addressing’ wins.  From software-memory integration, ‘byte-addressing’ wins.

Hardware is manufactured in factories (and is hard to change after fabrication), whereas software is more tunable/changeable/adaptable: change one line of code and recompile, and the change is ready in hand (deployment is a separate issue, though).  So, the choice is obvious.  That is, choose the right memory for processors and let the problems be solved at upper layers like software and compilers.

So, compilers came up with techniques like padding, that is, inserting unused bytes so that each field lands on its alignment boundary.  Compilers also support packing, which lets developers override the compiler's default padding behavior.

With all that understanding, let us take a simple primitive as an example and reason through these design choices.
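The effect of padding and packing can be seen with two structures that have the same fields.  This is a minimal sketch; the structure names are made up, and `#pragma pack` is a compiler extension (supported by GCC, Clang and MSVC) rather than standard C.  The sizes in the comments are what a typical x86-64 compiler produces, not a guarantee.

```c
#include <stddef.h>
#include <stdint.h>

/* Default layout: the compiler pads after 'flag' so that 'count'
   starts on its natural 4-byte boundary (typically offset 4,
   total size 8). */
struct padded_record {
    uint8_t  flag;
    uint32_t count;
};

/* Packed layout: padding is suppressed, so 'count' starts right
   after 'flag' (offset 1, total size 5) and is misaligned. */
#pragma pack(push, 1)
struct packed_record {
    uint8_t  flag;
    uint32_t count;
};
#pragma pack(pop)
```

Printing `sizeof` and `offsetof(..., count)` for both structures shows the trade: the packed form saves memory, while the padded form keeps every field aligned for the processor.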

Memory Copy:  Copy byte values from one memory location to another memory location

Signature: memcpy(source, sourceOffset, target, targetOffset, count)

It is very common for a program to require copying bytes from one location to another (the network stack is a famous example).  In simplistic code, the memory copy primitive would look like this (data types, bounds checking, etc. are excluded for brevity):

for (int offset = 0; offset < count; offset++)
    target[targetOffset + offset] = source[sourceOffset + offset];

To a software programmer who does not know the underlying design details, this looks like correct and performant code.  Well, software engineers are smart :) and would love to learn.  We know that SDRAM is the memory technology, and the hardware is ‘word’ based.  That means, even if I were to read the byte at address ‘x’, the underlying hardware is going to fetch a ‘word’ at a time into the processor.  The processor then extracts the required byte (typically using ALU registers) from that word and passes the byte to the software program.
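The extraction step can be pictured as a shift and a mask.  Here is a minimal sketch, assuming a 64-bit word and counting bytes from the least significant end (which memory address that corresponds to depends on the processor's Endianness).  `byte_from_word` is a hypothetical helper; real hardware does this in circuitry rather than in code.

```c
#include <stdint.h>

/* Pick byte 'index' (0 = least significant, 7 = most significant)
   out of a 64-bit word: shift it down to the low 8 bits, then mask
   off everything else. */
static uint8_t byte_from_word(uint64_t word, unsigned index) {
    return (uint8_t)((word >> (index * 8u)) & 0xFFu);
}
```

For example, `byte_from_word(0x1122334455667788, 0)` gives 0x88 and index 7 gives 0x11; the other seven bytes of the fetched word are simply discarded each time.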

What does this mean to the above code?

Assume the source and target offsets are aligned on a word boundary.  Let us say a word is 64 bits.

When the for loop is at offset = 0, the source memory bytes from sourceOffset + offset to sourceOffset + offset + 7 are read (that is, one word).  Because the software asks for only the first byte, it is extracted and the other bytes are thrown away.  When the for loop is at offset = 1, the same word is read again from RAM, but a different byte (the second byte) is extracted and given to the software.  And so on, till offset = 7.

So, for offset = 0 to offset = 7, the code is inherently reading the same word from RAM 8 times.  So, why not fetch it only once and use it in a single shot?  Well, that is what the memcpy primitive code (a learned programmer’s code) does.  Here is a modified version:

// Copy as many ‘words’ as possible (long int is the 8-byte type, matching the 64-bit word)

int offset = 0;
for (; offset + 8 <= count; offset += 8)
    *(long int *) (target + targetOffset + offset) = *(long int *) (source + sourceOffset + offset);

// Copy remaining bytes that do not complete a ‘word’

for (; offset < count; offset++)
    target[targetOffset + offset] = source[sourceOffset + offset];

 

Well, in reality, the memcpy code is not as simple as the above, because the target and source offsets might not be word aligned.  If I am not wrong, memcpy could actually be written directly in assembly (and some implementations are).  After all, it is all about mov (to move a word) and add (to increment the offset) instructions (I remember my 8086 assembly programming lab sessions!).

Padding and packing are also super-important when one is worried about performance.  Padding helps keep the content/data/variables aligned.  Without alignment, efficient code like the above cannot be used, and performance suffers.
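One way to handle unaligned offsets is to split the copy into a byte-wise head, a word-wise body and a byte-wise tail.  The following is only a sketch under stated assumptions, not how any particular library implements memcpy: `copy_words` is a hypothetical name, a word is taken to be 64 bits, and only the target is brought to alignment.  The fixed-size `memcpy` calls in the body are the standard C idiom for moving one word without breaking aliasing or alignment rules; compilers typically turn each into a single load or store.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

static void *copy_words(void *target, const void *source, size_t count) {
    unsigned char *dst = target;
    const unsigned char *src = source;

    /* Head: copy single bytes until the target is word aligned. */
    while (count > 0 && ((uintptr_t)dst % sizeof(uint64_t)) != 0) {
        *dst++ = *src++;
        count--;
    }

    /* Body: move one 64-bit word per iteration. */
    while (count >= sizeof(uint64_t)) {
        uint64_t word;
        memcpy(&word, src, sizeof word);   /* one word load  */
        memcpy(dst, &word, sizeof word);   /* one word store */
        dst += sizeof word;
        src += sizeof word;
        count -= sizeof word;
    }

    /* Tail: copy the bytes that do not complete a word. */
    while (count > 0) {
        *dst++ = *src++;
        count--;
    }
    return target;
}
```

Like memcpy, this assumes the source and target regions do not overlap.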

That is all for now, thanks for reading.  If you like it, let me know through your comments on the blog.  Encouragement is the secret of my energy :).

Thanks,

Laxmi Narsimha Rao Oruganti (alias: OLNRao, alias: LaxmiNRO)