Distributed and Consistent Hashing – Part 3

Windows Azure Cache (WA Cache) is a distributed in-memory cache. WA Cache provides a simple <Key, Value> based API, such as Cache.Put (key, value) and Cache.Get (key). You can think of the WA Cache API as that of a Hash. This blog post series tries to take the reader from a traditional one-node Hash to a distributed Hash. This is the third post of the series (and assumes you have read the first and second posts).

Distributed systems are gaining momentum because of their promise of economies of scale. The economy is possible due to the use of ‘commodity’ hardware. Commodity hardware is cheap, but it fails more often than reliable (and costly) hardware. As a result, distributed systems have to be designed to work with different types of failures.

Hardware Failures: Electrical Power Supply outages (#1),  Power Socket Failures, Network Switch Failures, Network Router Failures, Network Card Failures, Hard Disk Failures, Memory Chip Failures.

Hardware failures in Windows Azure are discussed in the ‘Inside Windows Azure’ talk from Build 2011.

Software Failures: Crashes due to bugs

Failures due to misconfiguration: Simple network misconfigurations can disrupt network communication for a node, a rack, or a whole datacenter.

#1 Electrical Power Supply Outages:

While an Uninterruptible Power Supply (UPS) helps in case of power failures, it can only serve as an alternative power source for a short time (~1 hour). For any longer power outage, or when a UPS is not available, the problem needs to be dealt with at upper layers.

At hardware level: Redundancy is the mantra for many of the problems – redundancy in power supply, redundancy in communication paths, etc. 

At software level: Replication of data, replication of responsibility, and right integration with hardware redundancy.

Solutions must be well integrated across layers to get best results (or sometimes even the desired results).  The importance of integration becomes evident with an example.

Let us say a web server is deployed on two nodes, so that if one node faults there is another node to serve the requests. If both these nodes are connected to the same power socket, any fault in that power socket would bring both nodes down at the same time. This essentially means that, in spite of having two web server nodes, we landed in trouble. If we had a way to make sure that the two nodes are always connected to distinct sets of resources, it would be a much better model.

A system of categorizing resources into groups/sets is thus necessary. These groups are called Fault Domains (FD). No two FDs share the same power source, network switch, etc. With FDs in the picture, any redundancy system at the software level just has to make sure that it places the redundant copies across FDs.
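To make the idea concrete, here is a minimal sketch of FD-aware placement. The function name `place_replicas`, the fault domain labels, and the node names are all hypothetical and only serve the illustration; a real placement policy would also consider load and capacity.

```python
def place_replicas(nodes_by_fd, copies):
    """Pick `copies` nodes such that no two of them share a fault domain.

    nodes_by_fd maps a fault-domain label to a non-empty list of node names.
    """
    if copies > len(nodes_by_fd):
        raise ValueError("not enough fault domains for the requested copies")
    # Take one node from each of the first `copies` fault domains
    # (sorted by label so that the result is deterministic).
    return [nodes_by_fd[fd][0] for fd in sorted(nodes_by_fd)[:copies]]


# Hypothetical layout: two web servers share FD0, a third one sits in FD1.
layout = {"FD0": ["web-a", "web-b"], "FD1": ["web-c"]}
print(place_replicas(layout, copies=2))  # ['web-a', 'web-c']
```

With this layout, the two copies never end up on `web-a` and `web-b` together, which is exactly the shared power socket situation described above.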

We have discussed redundancy as a solution at the software layer to deal with faults. For stateless software programs, just having another node is sufficient. For stateful software programs, such as database systems, there is much more to be done. Traditionally, in the scale-up world, RAID systems were used to protect against bad sectors (checksums), hard disk crashes (multiple copies on different disks), etc. RAID storage is costly, so it can’t be the choice for distributed systems. The other scale-up world technique has been data replication. Replication is typically where FD knowledge is required, so that copies of data can be placed in different FDs.

The moment replication comes into discussion it is important to call out the terminology:

Primary – The node responsible for ‘owning’ replication and for interfacing with the client. There is usually only one primary.

Secondary – A node responsible for ‘cooperating’ in replication. There can be multiple secondary nodes.

Replication is a vast subject, but I will keep it short. Depending on how the primary and secondary nodes agree on replication, there are multiple methods.

Asynchronous Replication – In this model, changes on the primary are committed and the client is acknowledged without waiting for the secondary nodes to be updated. Replication to the secondary nodes is either triggered, or a background task takes up the responsibility of bringing them up-to-date. In short, replication happens asynchronously. In this mode, if the primary node fails, there could be data loss, as the secondary nodes may not be up-to-date. If the data is not super critical and is ok to lose, this model is apt.

Synchronous Replication – In this model, every change on the primary is synchronously replicated to the secondary. Until the secondary responds, changes on the primary are not ‘committed’ (and so the client is not acknowledged). In short, replication happens synchronously. If the secondary node is down, writes are blocked until the node is brought back up.

The number of copies to be maintained is referred to as the ‘Replication Factor’.

If data is important, an admin would opt for synchronous replication. The higher the replication factor, the better the fault tolerance. But keeping the client on hold until all secondary copies are updated pushes an admin to choose a lower replication factor. It can be argued that this model forces the admin to think deeply about the replication factor. There are cases where one needs a higher replication factor, but not at the cost of increased operation times. Hence the need for a mixture of the synchronous and asynchronous replication models, namely Hybrid Replication. In this model, one can choose the number of secondary nodes in synchronous mode and the number of secondary nodes in asynchronous mode. Here again there are two choices: designate a ‘fixed set’ of secondary nodes for synchronous replication, or accept acknowledgements from ‘any N’ secondary nodes.
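The three models can be put side by side in a small in-process sketch. It assumes the ‘fixed set’ flavour of hybrid replication; the class names `Primary` and `Secondary` and the `drain_backlog` method are made up for this illustration and are not part of any real cache API. With an empty asynchronous list it behaves as purely synchronous, and with an empty synchronous list as purely asynchronous.

```python
class Secondary:
    def __init__(self, name):
        self.name = name
        self.data = {}

    def apply(self, key, value):
        self.data[key] = value
        return True  # acknowledgement back to the primary


class Primary:
    def __init__(self, sync_secondaries, async_secondaries):
        self.data = {}
        self.sync_secondaries = sync_secondaries
        self.async_secondaries = async_secondaries
        self.backlog = []  # changes not yet shipped to async secondaries

    def put(self, key, value):
        # Synchronous part: every sync secondary must acknowledge first.
        for secondary in self.sync_secondaries:
            if not secondary.apply(key, value):
                raise RuntimeError("sync secondary did not acknowledge")
        self.data[key] = value             # commit on the primary
        self.backlog.append((key, value))  # asynchronous part, shipped later
        return "ack"                       # only now is the client acknowledged

    def drain_backlog(self):
        # Stands in for the background task that catches async secondaries up.
        for key, value in self.backlog:
            for secondary in self.async_secondaries:
                secondary.apply(key, value)
        self.backlog.clear()


sync_node, async_node = Secondary("sync-1"), Secondary("async-1")
primary = Primary([sync_node], [async_node])
primary.put("k", "v")
print(sync_node.data, async_node.data)  # {'k': 'v'} {}
primary.drain_backlog()
print(async_node.data)                  # {'k': 'v'}
```

Between `put` and `drain_backlog`, the asynchronous secondary is behind the primary; that window is where data loss can happen if the primary fails.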

It is also important to note that, a node can act as both primary and secondary for different replication units.  In case of database systems, each database is a replication unit.  So, a node can act as primary for database 1 and as a secondary for database 2. 

In the case of a Routing Client, the client would typically know which primary to reach for each replication unit. Some systems allow clients to read from secondary nodes. Which secondary nodes a client is allowed to read from determines the level of consistency it gets. Werner Vogels wrote a great blog post about different consistency models (Blog Post, Wikipedia Link).

The primary and secondary nodes communicate as part of replication, and there are multiple models for that communication.

Pipeline Model – The replication data flows like a chain: the primary sends the data to the first secondary node, the first secondary node then sends the data to the next secondary, and so on. Windows Azure Storage and the Hadoop Distributed File System use this model.

Multi-Unicast Model – The primary sends replication data to all ‘synchronous secondary’ nodes separately and waits for an acknowledgement from every one of them.

Multicast Model – In the case of hybrid replication with ‘any N’ acknowledgements, the primary sends replication data to all secondary nodes (both synchronous and asynchronous) and waits for any N secondary nodes to acknowledge. The set of N nodes that acknowledge varies for every data block, chunk, or packet.
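The difference between the first two models is easiest to see as a list of (sender, receiver) hops. The helper names below are hypothetical, and the sketch only models who sends to whom, not acknowledgements or failures.

```python
def pipeline_hops(primary, secondaries):
    """Chain: primary -> first secondary -> next secondary -> ..."""
    chain = [primary] + list(secondaries)
    return list(zip(chain, chain[1:]))


def multi_unicast_hops(primary, secondaries):
    """Fan-out: the primary sends to every secondary itself."""
    return [(primary, secondary) for secondary in secondaries]


print(pipeline_hops("P", ["S1", "S2"]))       # [('P', 'S1'), ('S1', 'S2')]
print(multi_unicast_hops("P", ["S1", "S2"]))  # [('P', 'S1'), ('P', 'S2')]
```

In the pipeline model the primary sends the data only once, while in the multi-unicast model it sends the data once per secondary.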

One major advantage of Windows Azure Cache over MemCacheD is that it supports High Availability (the public name for replication). Windows Azure Cache supports the synchronous replication model (the number of secondary nodes is fixed to 1 and the communication model is multi-unicast), and each partition is a replication unit. The cache is an in-memory system, so replication is limited to in-memory replication. And that is the catch! In most stateful systems, a node reboot does not lose the state, as the state is persisted on disk (either locally or remotely). However, in an in-memory system like Windows Azure Cache, a node reboot results in state loss. The combination of synchronous replication and state loss on reboot led us (Windows Azure Cache) to let clients commit writes even when all secondary nodes are down: a secondary that is down holds no data that could go out-of-date because of those writes. Windows Azure Cache (as of the 1.8 SDK release) does not allow clients to read from secondary nodes.

A cache is often used for data that needs to be processed. Processing involves running code. It is well established that keeping code and data close together lets systems complete tasks faster.

In case of database systems, stored procedures help bring the code (or business logic) near to data. 

In Windows Azure Cache, we have the Append, Prepend, Increment, and Decrement APIs to help process a value. It would have been lovely if we had a ‘stored procedure’ model instead of these individual APIs. That way, any processing could be pushed into such ‘stored procedures’, and we could simply have shipped Append, Prepend, Increment, and Decrement as ‘Microsoft owned’ stored procedures.
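Purely as a thought experiment, such a ‘stored procedure’ model could look like the sketch below. None of this is a Windows Azure Cache API: `TinyCache`, its `execute` method, and the `append`/`increment` procedures are hypothetical names for a single-node, in-process toy.

```python
class TinyCache:
    def __init__(self):
        self._store = {}

    def put(self, key, value):
        self._store[key] = value

    def get(self, key):
        return self._store.get(key)

    def execute(self, key, procedure, *args):
        # Run the procedure where the data lives and store its result,
        # instead of shipping the value to the client and back.
        self._store[key] = procedure(self._store.get(key), *args)
        return self._store[key]


# 'Shipped' procedures: each takes the current value and returns the new one.
def append(current, suffix):
    return (current or "") + suffix


def increment(current, delta=1):
    return (current or 0) + delta


cache = TinyCache()
cache.execute("greeting", append, "hello")
cache.execute("greeting", append, " world")
cache.execute("hits", increment)
cache.execute("hits", increment, 5)
print(cache.get("greeting"), cache.get("hits"))  # hello world 6
```

Any user-defined function with the same shape could be passed to `execute`, which is the appeal of the model.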

This is the last post of the series “Distributed and Consistent Hashing”. Here are the important lessons/techniques to remember (deliberately explained in generic terms so that they can be carried forward into other discussions).

– Data Nodes – The cache server nodes that actually store data

– Control Nodes or Metadata Nodes – Partition Manager is one such control node we have discussed that helps manage the data nodes and their partition ownership. 

– Routing Clients – Make the client intelligent enough to talk to data nodes directly, without the need to go through a control node

– Read from Secondary – Helps load balance reads and allows different levels of consistency

– Code and Data Distance – If code and data are near, tasks can be completed faster 


That is all for now. I will come back with another series on taking a system from the scale-up world to the scale-out (or distributed) world.



Laxmi Narsimha Rao Oruganti (alias: LaxmiNRO, alias: OLNRao)

Distributed and Consistent Hashing – Part 2

Windows Azure Cache (WA Cache) is a distributed in-memory cache. WA Cache provides a simple <Key, Value> based API, such as Cache.Put (key, value) and Cache.Get (key). You can think of the WA Cache API as that of a Hash. This blog post series tries to take the reader from a traditional one-node Hash to a distributed Hash. This is the second post of the series (and assumes you have read the first post).

If clients and servers do not need a common, built-in understanding of how data is distributed, a variety of distribution schemes become possible. As long as we set up a way for clients to learn the key-to-node mapping, this decoupling does not cause any problem.

In all the discussion we have had so far, the key range was dynamic in size and changed as the cluster shrank or expanded. Let us instead divide the whole key range into equal-sized, finite portions. For example, if the key range is 1 – 20, we could define the range size as 5 and we would then have four ranges, namely: 1-5, 6-10, 11-15, 16-20. In reality, the key range is much larger and depends on the data type of the Hash Code. That is, it can be ulong.min to ulong.max (or uint.min to uint.max). Let us call each key range a ‘Partition’. Let us also place an important restriction on the Partition: “A partition is restricted to a single node/server of the cluster (i.e. a partition can’t span two nodes)”. With this restriction in place, we don’t want to end up overloading a machine when one particular partition is heavily populated. The simplest way to reduce this probability is to keep each key range very small, which means the number of key ranges (or Partitions) is high.
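For the 1 – 20 example above, mapping a hash code to its partition is a single integer division. The function name `partition_for` and the 1-based partition numbering are assumptions of this sketch.

```python
def partition_for(hash_code, range_start=1, range_size=5):
    """Return the 1-based partition number that contains hash_code."""
    return (hash_code - range_start) // range_size + 1


for hash_code in (3, 6, 12, 18, 20):
    print(hash_code, "-> partition", partition_for(hash_code))
# 3 -> partition 1
# 6 -> partition 2
# 12 -> partition 3
# 18 -> partition 4
# 20 -> partition 4
```

Because every partition has the same size, the mapping never changes when nodes are added or removed; only the ownership of partitions changes.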

Partition Map:

As discussed each partition would be restricted to one node.  The node on which a partition resides is called ‘Partition Owner’.   As there are many partitions and fewer nodes, each node can own more than one partition.  The information of which node owns what partitions is referred to as ‘Partition Map’.

How this partition map is created, updated, and maintained is a big topic, but a short discussion will give us clarity. Let us say we designate a server node for this responsibility. This designation is called ‘Partition Manager (PM)’. The PM keeps the partition map up-to-date, responds to partition map queries, and updates the partition map in response to nodes going up or down. When nodes come up or go down, or the cluster expands or shrinks, the PM moves partitions around (also called Load Balancing). As the PM node itself can go down, there should also be a mechanism for another node to become PM (after making sure the actual PM has died). Note that we never want two nodes acting as PM, as that would mean two decision makers on who owns which partitions. The PM’s partition map is called the Global Partition Map (GPM). A copy of the partition map is available at every server node and is called the Local Partition Map (LPM). The PM communicates GPM updates so that server nodes can update their LPMs accordingly.
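A toy version of the PM and its versioned GPM might look like this. The class, its method names, and the ‘contiguous runs’ balancing policy are assumptions made for the sketch; PM election and failure detection are left out entirely.

```python
class PartitionManager:
    def __init__(self, partitions, nodes):
        self._partitions = list(partitions)
        self.version = 0
        self.gpm = {}
        self.rebalance(nodes)

    def rebalance(self, nodes):
        """Assign contiguous runs of partitions to nodes and bump the version."""
        per_node = -(-len(self._partitions) // len(nodes))  # ceiling division
        self.gpm = {p: nodes[i // per_node]
                    for i, p in enumerate(self._partitions)}
        self.version += 1

    def snapshot(self):
        """What a server node would store as its LPM."""
        return self.version, dict(self.gpm)


pm = PartitionManager(["P1", "P2", "P3", "P4"], ["S1"])
print(pm.snapshot())  # (1, {'P1': 'S1', 'P2': 'S1', 'P3': 'S1', 'P4': 'S1'})
pm.rebalance(["S1", "S2"])
print(pm.snapshot())  # (2, {'P1': 'S1', 'P2': 'S1', 'P3': 'S2', 'P4': 'S2'})
```

The version number is what lets a holder of an LPM tell whether its copy is older than the GPM.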

Simple Client – Server Gateway:

In this model, clients are unaware of all this partition intelligence. Clients simply talk to some server (chosen by round robin or any other simple, unrelated mechanism). The server in turn uses its LPM to look up the actual owner of the Partition to which the key belongs, and acts as a mediator between the client and the partition owner. Because of this mediation, there is a performance cost due to the two-hop model. This model is usually preferred when parts of the system are legacy or not owned. For example, MemCacheD clients are not partition aware. Hence, Windows Azure Cache supports this Gateway model when MemCacheD clients want to use the MemCacheD protocol against Windows Azure Cache. We refer to this mechanism of deployment as ‘Server Shim’ (or Gateway Shim).

Routing Client – Simple Server:

In this model, clients also keep a local copy of the GPM (again called a Local Partition Map, LPM). Clients can query the PM node (or any other server node) to get a copy of the GPM. The client takes the input key (from the user), derives the Hash Code, maps the Hash Code to a range/Partition, and then uses the LPM to find the owner server node. The client then interacts directly with that server to get/put on that key. As the client is doing all the routing-style logic of finding the right server node, it is called a ‘Routing Client’. As discussed before, the PM updates the GPM as and when required. However, routing clients hold a copy that can go stale and then needs to be invalidated. So, every time a client uses its LPM and talks to a server node for any cache operation, it also validates its LPM version against the GPM version. If the client has an old copy, it initiates an LPM refresh. If a server that the client found through its LPM is already dead or is no longer the primary, that is also an indication that the client’s LPM is outdated.
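The client-side steps (key, Hash Code, Partition, owner) and the version check can be sketched as follows. `RoutingClient`, `fetch_gpm`, and `check_version` are hypothetical names; the sketch assumes HashCode = Key, the 1 – 20 range with range size 5, and that the server’s reply carries the current GPM version.

```python
class RoutingClient:
    def __init__(self, fetch_gpm, range_start=1, range_size=5):
        self.fetch_gpm = fetch_gpm            # callable returning (version, map)
        self.range_start = range_start
        self.range_size = range_size
        self.version, self.lpm = fetch_gpm()  # initial LPM
        self.refreshes = 0

    def partition_for(self, key):
        hash_code = key                       # simplification: HashCode = Key
        return (hash_code - self.range_start) // self.range_size + 1

    def owner_for(self, key):
        return self.lpm[self.partition_for(key)]

    def check_version(self, server_version):
        # Refresh the LPM when the server reports a different GPM version.
        if server_version != self.version:
            self.version, self.lpm = self.fetch_gpm()
            self.refreshes += 1


# A dictionary standing in for the PM's versioned GPM.
gpm = {"version": 1, "map": {1: "S1", 2: "S1", 3: "S1", 4: "S1"}}


def fetch_gpm():
    return gpm["version"], dict(gpm["map"])


client = RoutingClient(fetch_gpm)
print(client.owner_for(12))             # S1

gpm["version"] = 2                      # PM moved partitions 3 and 4 to S2
gpm["map"].update({3: "S2", 4: "S2"})
client.check_version(server_version=2)  # version mismatch triggers a refresh
print(client.owner_for(12))             # S2
```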

Initially, the cluster has only one server node and the data is 6, 12, 18. The key range size is 5 and the total range is limited to 1-20 (all for example purposes only).



Figure 1: Routing Client – One Server

Now, let us insert more data into cache, namely: 3, 9, 15.


Figure 2: Routing Client – One Server – More data

Let us say the admin sees that the cache usage is approaching capacity and adds one more server. At this time, before the partitions are moved to balance the load on nodes (by the Load Balancer), here is how the cluster looks:


Figure 3: Routing Client – Two Servers – Prior to load balance activity

Note that no data movement is required to make the client queries work. The client keeps talking to Server S1 for all its cache requests, as its LPM (or the GPM) says so. What we have just described is nothing but ‘Consistent Hashing’. That is, your data distribution is not altered by nodes going up or down, or by cluster expansions/contractions.

Though server S2 is part of the cache cluster, it is not used at all, as all partitions are owned by S1. Not just ‘get’, but ‘put’ too is served by server S1. While we have solved ‘consistent hashing’ with partition maps (routing tables), we are not using resources (i.e. server S2) effectively. As we discussed, the PM also takes ownership of balancing load. Eventually, the PM detects that S2 is under-utilized and initiates partition movement. After the PM finishes load balancing, it updates the GPM and keeps every server informed of the latest GPM. However, clients would still have the old version of the partition map. Here is how the cluster looks:


Figure 4: Routing Client – Two Servers – Post load balance activity

Note that, GPM is updated, version bumped on server. Client still has old version. 

If the client tries to operate on partitions P1 (1-5) or P2 (6-10), those operations succeed. However, when it tries to operate on partitions P3 (11-15) or P4 (16-20), it talks to S1 (as per its own LPM). S1 then responds ‘I am not Primary’, as per its own copy of the GPM. The client takes that as a hint and initiates an LPM update. Here is how the cluster looks after the LPM update.



Figure 5: Routing Client – Two Servers – Post LPM update

Once the LPM is updated, the client talks to S2 for operations on P3 and P4, which now succeed. The advantage of this model is that load balancing (LB) can be done at any appropriate time rather than immediately. Partition movement can be made efficient to reduce the ‘unavailability’ of a partition, which is a separate topic of its own.
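The whole walkthrough (S2 joins, the PM moves P3 and P4, the client hits ‘I am not Primary’ and refreshes) can be replayed in a few lines. Everything here is a single-process simulation with hypothetical names; a shared dictionary stands in for ‘the PM keeps every server informed of the latest GPM’.

```python
def partition_of(key, range_start=1, range_size=5):
    return (key - range_start) // range_size + 1   # HashCode = Key


class Server:
    def __init__(self, name, gpm):
        self.name = name
        self.gpm = gpm    # shared dict: always the latest GPM
        self.data = {}

    def get(self, key):
        if self.gpm[partition_of(key)] != self.name:
            return "NotPrimary", None
        return "OK", self.data.get(key)


gpm = {1: "S1", 2: "S1", 3: "S1", 4: "S1"}
servers = {name: Server(name, gpm) for name in ("S1", "S2")}
for key in (3, 6, 9, 12, 15, 18):
    servers["S1"].data[key] = "value-%d" % key
lpm = dict(gpm)           # the client's copy, taken before load balancing

# Load balancing: the PM moves P3 and P4 (and their data) to S2.
for key in (12, 15, 18):
    servers["S2"].data[key] = servers["S1"].data.pop(key)
gpm.update({3: "S2", 4: "S2"})


def client_get(key):
    status, value = servers[lpm[partition_of(key)]].get(key)
    if status == "NotPrimary":
        lpm.update(gpm)   # LPM refresh, then retry once
        status, value = servers[lpm[partition_of(key)]].get(key)
    return value


print(client_get(9))      # value-9, served by S1 with the old LPM
print(client_get(12))     # value-12, served by S2 after the LPM refresh
```

Note that the client only pays the extra round trip once; after the refresh, requests for P3 and P4 go straight to S2.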

Windows Azure Cache clients are ‘routing’ clients by default. If MemCacheD clients are used with a Windows Azure Cache server, there is also another mechanism called ‘Client Shim’. The client shim is nothing but a sibling module that sits alongside partition-unaware MemCacheD clients to bring in partition intelligence.





Distributed and Consistent Hashing – Part 1

Windows Azure Cache (WA Cache) is a distributed in-memory cache. WA Cache provides a simple <Key, Value> based API, such as Cache.Put (key, value) and Cache.Get (key). You can think of the WA Cache API as that of a Hash. This blog post series tries to take the reader from a traditional one-node Hash to a distributed Hash. This is the first post of the series.


A Hash is an in-memory data structure indexed by key. A Hash is generally implemented as an array of collections. The array is referred to as the Hash Directory or simply directory. Each collection is referred to as a Hash Bucket or simply Bucket. The array index of a bucket is called its Bucket Id.

Since a key can be of any data type, the key is converted to a number, namely the HashCode. There exist a variety of schemes to generate a HashCode for a given key. The HashCode is then converted to a Bucket Id by a different scheme. The simplest scheme is Bucket Id = HashCode % BucketCount.

In all the examples of this article, we will use numbers as data and, for simplicity, HashCode = Key.
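Here is a minimal sketch of such a single-level Hash, using HashCode = Key and the modulo scheme. The class name `SimpleHash` and the bucket count of 4 are assumptions of the sketch; the bucket count in the diagram may differ.

```python
class SimpleHash:
    def __init__(self, bucket_count=4):
        # The directory is an array of buckets; each bucket is a list of pairs.
        self.directory = [[] for _ in range(bucket_count)]

    def _bucket(self, key):
        hash_code = key                           # HashCode = Key
        bucket_id = hash_code % len(self.directory)
        return self.directory[bucket_id]

    def put(self, key, value):
        bucket = self._bucket(key)
        for i, (existing_key, _) in enumerate(bucket):
            if existing_key == key:
                bucket[i] = (key, value)          # overwrite an existing key
                return
        bucket.append((key, value))

    def get(self, key):
        for existing_key, value in self._bucket(key):
            if existing_key == key:
                return value
        return None


table = SimpleHash()
for number in range(11):                          # insert 0 to 10
    table.put(number, str(number))
print([[key for key, _ in bucket] for bucket in table.directory])
# [[0, 4, 8], [1, 5, 9], [2, 6, 10], [3, 7]]
```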

Blue boxes are Hash directories and Green boxes are Hash buckets. The Hash looks like the diagram below if we insert 0 to 10.


Figure 1 : Normal Hash

What I have described above is single-level Hashing. Let us now delve into multi-level hashing. In multi-level hashing, the leaf level has hash buckets and all other levels are hash directories.

An example of two-level hash is given below. Data is 0 to 10.


Figure 2 : Multi Level Hash
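One possible way to build a two-level hash is sketched below. The article does not say how each level picks its slot, so the split used here (modulo for the top directory, then the quotient modulo the leaf count) and the sizes 2 and 3 are assumptions; the layout in the figure may differ.

```python
def two_level_slot(hash_code, top_size=2, leaf_size=3):
    """Return (top directory index, leaf bucket index) for a hash code."""
    top = hash_code % top_size
    leaf = (hash_code // top_size) % leaf_size
    return top, leaf


# Top-level directory -> second-level directory -> bucket (a list of keys).
directory = [[[] for _ in range(3)] for _ in range(2)]
for key in range(11):                 # insert 0 to 10, HashCode = Key
    top, leaf = two_level_slot(key)
    directory[top][leaf].append(key)

print(directory[0])  # [[0, 6], [2, 8], [4, 10]]
print(directory[1])  # [[1, 7], [3, 9], [5]]
```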

Thus far, we have discussed in-memory Hashing (of different levels) where all memory is within a node. If we were to hash data whose size is in Gigabytes (or Terabytes), the only way to use this hashing would be to have a machine with that much memory/RAM, which you know is super costly. The immediate thought is: why not use memory from multiple nodes? That is distributed hashing. The capacity of a distributed hash is the sum of the capacities of the node hashes.

Now, with multiple nodes coming into the picture, hashing needs a provision to identify the ‘node’ on which to place/retrieve data. Hence, we need (and assume we have) a unique identifier for each node: the Node Id.

The Distributed Hashing API is not different from the regular Hashing API. That means we need a way to find the node on which the key would be stored. Once we identify the node, we then use regular Hashing within that node. As you can see, we have communication over a network in a typical Client-Server system. Each node runs the server side and the actual user application runs the client side.

Instead of jumping into perfect solution, let us go step-by-step with improvements.

Simple Client – Simple Server:

In this scheme, Client identifies the node by applying modulo operator with Node Count.

Node Id = Hash Code % Node Count

The Client talks to the Server so identified, and the Server then uses its own hashing scheme to get/put the user-supplied key.


Figure 3 : Distributed Hash – Simple Client and Simple Server

Simple, isn’t it? Let us find the problems in this model.

The application has been inserting data into the Hash, and the distributed hash has run out of capacity. (How one detects that a hash has run out of capacity is altogether a separate topic and I will not digress into it. Let us assume there is a mechanism to detect this and inform the user.) To increase the capacity, one more server has been added. Just after adding one more server, the system looks like below:


Figure 4 : Distributed Hash – Simple Client and Simple Server – Cluster Expansion

Now the Client wants to get Key 3. Node Id = 3 (Hash Code) % 3 (Node Count) = 0. The Client then talks to server 0; server 0 can’t find the key ‘3’. The Client is now confused by the ‘Key does not exist’ answer, as it had put the key ‘3’ into the hash earlier. What’s wrong? The Client’s identification of the node is tied to the server count.

Remedy 1 – Reapply the hashing scheme on existing data whenever there is a change in server/node count, and redo the placement. Let us call this procedure ‘data movement’. Here is how the distributed hash looks after data movement:
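The failure is easy to reproduce with a toy cluster in which every node is just a dictionary. `ModuloCluster` is a hypothetical name, and the sketch keeps the client and the servers in one process.

```python
class ModuloCluster:
    def __init__(self, node_count):
        self.nodes = [dict() for _ in range(node_count)]

    def node_id(self, key):
        hash_code = key                      # HashCode = Key
        return hash_code % len(self.nodes)   # Node Id = Hash Code % Node Count

    def put(self, key, value):
        self.nodes[self.node_id(key)][key] = value

    def get(self, key):
        return self.nodes[self.node_id(key)].get(key)

    def add_node(self):
        self.nodes.append({})                # expansion without data movement


cluster = ModuloCluster(node_count=2)
cluster.put(3, "three")     # 3 % 2 = 1, so the key lands on server 1
print(cluster.get(3))       # three
cluster.add_node()
print(cluster.node_id(3))   # 0, because 3 % 3 = 0
print(cluster.get(3))       # None: server 0 never received the key
```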


Figure 5 : Distributed Hash – Simple Client and Simple Server – Data Movement

Now, the Client’s get of key ‘3’ succeeds, as Server 0 has the key and returns it. Let us look at the problems with this.

‘Red’ colored data represents moved data. As you can observe, around 70%+ of the data has participated in data movement. That is a huge amount and is inefficient. The problem gets aggravated if nodes go down and come up frequently.
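We can count which keys change nodes when the node count goes from 2 to 3. The key set 0 to 10 is an assumption of this sketch (the exact data in the figure may differ, so the percentage may not match the figure exactly), and `moved_keys` is a hypothetical helper.

```python
def moved_keys(keys, old_count, new_count):
    """Keys whose node changes when the node count changes (HashCode = Key)."""
    return [key for key in keys if key % old_count != key % new_count]


keys = list(range(11))                  # assumed data: 0 to 10
moved = moved_keys(keys, old_count=2, new_count=3)
print(moved)                            # [2, 3, 4, 5, 8, 9, 10]
print("%d of %d keys moved" % (len(moved), len(keys)))  # 7 of 11 keys moved
```

For this key set, well over half of the data has to move for a single added node.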

Taking a step back and rethinking of placement of data in nodes may yield better results.

Scheme: Divide the hash code space into ‘Node Count’ ranges. Each range is owned by the node whose id is nearest to that hash code. The Client does the same when doing get/put to identify the node.

Hash Code is a number and if we consider it as 32-bit integer, the hash space is MIN_INT32 to MAX_INT32. For simplicity of our discussion, let us take Hash Code as 4-bit integer that translates the hash code space as [0, 15].

When there is only one node, whole range is owned by that one node (Server 0).


Figure 6 : Distributed Hash – Range Based – One Node Cluster

Let us add one more node. As the number of ranges is equal to the number of nodes, we will have to divide the hash code space into two ranges. As a result, server 0 now owns [0-7] and server 1 owns [8-15].


Figure 7 : Distributed Hash – Range Based – Two Node Cluster

We retained the practice of ‘Red’ coloring data items that participated in data movement. The data moved now is less. Let us add one more node again. As a result, we need to divide the whole range into three sub-ranges and move the data.


Figure 8 : Distributed Hash – Range Based – Two Node Cluster – Data Movement
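The range-based scheme can be sketched for the 4-bit hash code space [0, 15]. The article does not say exactly where the three sub-ranges are cut, so the integer-division split below (which gives [0-5], [6-10], [11-15] for three nodes) is an assumption, as is the data set 0 to 10.

```python
SPACE = 16  # 4-bit hash codes: [0, 15]


def range_owner(hash_code, node_count):
    """Node that owns hash_code when the space is cut into node_count ranges."""
    return hash_code * node_count // SPACE


def moved(old_count, new_count):
    return [h for h in range(SPACE)
            if range_owner(h, old_count) != range_owner(h, new_count)]


print(moved(1, 2))  # [8, 9, 10, 11, 12, 13, 14, 15]
print(moved(2, 3))  # [6, 7, 11, 12, 13, 14, 15]

# Distribution of the assumed data (0 to 10) over three nodes.
counts = [0, 0, 0]
for key in range(11):
    counts[range_owner(key, 3)] += 1
print(counts)       # [6, 5, 0]
```

Under these assumptions the third node owns a range but receives no data at all, which is one way the skew mentioned next can show up.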

We have reduced the data movement volume, but data distribution is skewed. I will introduce you to another way of distributing data, which may/may not solve the distribution skew.

Instead of keeping the Node Ids of servers contiguous, let us assign identifiers with gaps, but within the hash code space. In our example, the HashCode space is [0, 15]. Let us give the servers the ids 4, 10, and 15. The owner of a key is the server whose id is nearest to the key’s HashCode. The hash with two nodes, with ids 4 and 10, can be depicted as:


Figure 9 : Distributed Hash – Sparse Node Id and Nearest Node Id

Let us introduce another node with id 15. We would treat HashCode space as circular and hence ‘0’ is nearer to Server 15 than Server 4. With this scheme of distribution, the hash is:


Figure 10 : Distributed Hash – Sparse Node Id and Nearest Node Id – Data Movement
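A small sketch of the ‘nearest node id on a circle’ rule follows. The function names are hypothetical, the space is treated as circular for both the two-node and three-node cases, and ties (a hash code equally near to two servers) are broken in favour of the smaller node id; both choices are assumptions, since the article does not spell them out.

```python
SPACE = 16  # hash codes 0..15, treated as a circle


def circular_distance(a, b):
    direct = abs(a - b)
    return min(direct, SPACE - direct)


def owner(hash_code, node_ids):
    # Nearest node id wins; on a tie the smaller node id wins (assumption).
    return min(node_ids, key=lambda n: (circular_distance(hash_code, n), n))


print(owner(0, [4, 10]))      # 4
print(owner(0, [4, 10, 15]))  # 15, because 0 is one step away from 15

moved = [h for h in range(SPACE)
         if owner(h, [4, 10]) != owner(h, [4, 10, 15])]
print(moved)                  # [0, 1, 13, 14, 15]
```

Only hash codes near the new node’s id change owner; everything else stays where it was.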

The beauty of the data distribution schemes discussed so far is that the client’s way of identifying the ‘server’ where data should be placed/found is the same as the server’s way of distributing data. In other words, Client and Server have a common, agreed way of distributing data. If we get rid of this common understanding, we will find other ways of data distribution. This decoupling, and how clients learn the server’s distribution model, is the subject of the next blog post.