Fault tolerance

Completed

MapReduce's objective is to divide jobs into tasks that effectively exploit task parallelism and, consequently, complete jobs earlier. Although this approach is quite effective in theory, it exposes its own challenges in practice. For instance, it takes only one slow/faulty task to make the whole job consume significantly more time than expected. In reality, Hadoop MapReduce tasks fail and slow due to hardware degradation, software misconfiguration, heterogeneity, and/or data locality, to mention a few problems. Tolerating faulty and slow tasks in clouds is not easy. In particular, when the volumes of data flowing through an I/O system are as big as those processed by Hadoop, the chance of data pieces getting corrupted increases. Furthermore, when tasks and nodes operate in the thousands and beyond (typical for Hadoop), chances of failure increase.

Hadoop MapReduce applies two mechanisms to tolerate faults, data redundancy and task resiliency. Data redundancy is applied at the storage layer. Specifically, HDFS reliability retains HDFS blocks by maintaining multiple replicas per block (by default, three replicas) at physically separate machines. Clearly, this enables MapReduce to tolerate corrupted blocks and faulty nodes easily. If a block is lost due to a hardware or software failure, another replica at a different node can always be located and read in a way totally transparent to user jobs. HDFS computes checksums using a cyclic redundancy check (CRC-32) for all data written to it and, by default, verifies checksums when reading data from it.2 When a block error is detected or a node goes down, HDFS transparently brings back the replication factor to its default level of three.

Although it is possible that all the HDFS blocks of a job's dataset are error free, the job's tasks may still run slowly or simply fail. Clearly, a task slowdown or failure can lead to slowing an entire job or causing it to fail. To avoid such consequences and to achieve resiliency, Hadoop MapReduce allows replicating tasks and monitors tasks to detect and treat slow/faulty ones. To detect slow/faulty tasks, Hadoop MapReduce depends on the heartbeat mechanism. The JobTracker (JT) runs an expiry thread that checks each TaskTracker's (TT) heartbeats and decides whether the TT's tasks are dead or alive. If the expiry thread does not receive heartbeats confirming a task's health within 10 minutes (by default), the task is deemed dead. Otherwise, the task is marked alive.

Alive tasks can be slow ("stragglers" in Hadoop's parlance) or not slow. To measure slowness, the JT estimates task progress using a per-task score between 0 and 1. Map and reduce scores are computed differently. For a map task, the progress score is a function of the input HDFS block read so far. For a reduce task, the progress score is more involved. Hadoop MapReduce assumes that each of the reduce stages (shuffle, merge and sort, and reduce) accounts for one-third of a reduce task's score, and, for each stage, the score is the fraction of data processed so far. For instance, a reduce task that is halfway through the shuffle stage will have a progress score of $\frac{1}{3} \times \frac{1}{2} = \frac{1}{6}$. On the other hand, a reduce task that is halfway through the merge and sort stage will have a progress score of $\frac{1}{3} + (\frac{1}{2} \times \frac{1}{3}) = \frac{1}{2}$. Finally, a reduce task that is halfway through the reduce stage will have a progress score of $\frac{1}{3} + \frac{1}{3} + (\frac{1}{3} \times \frac{1}{2}) = \frac{5}{6}$.

When it detects a slow task, the JT runs a corresponding backup (speculative) task simultaneously. Hadoop allows one speculative task per original slow task, and the two compete. Whichever version finishes first is committed, and the other is killed. This task-resilience tactic is known in Hadoop MapReduce as speculative execution and is activated by default, although it can be enabled or disabled independently for map and reduce tasks, on a cluster-wide or per-job basis.

Hadoop MapReduce computes the average progress score across all tasks in each task category (e.g., all map tasks). In a category, any task scoring less than 80% of the mean (called the 20% progress-difference threshold) is marked a straggler. And as long as all original map and reduce tasks are already scheduled,9 the JT launches an equivalent, speculative task. All stragglers are treated as equally slow, and ties between them are broken by data locality. More precisely, if a map slot becomes free at a particular TT, and two map stragglers are detected, the one that uses an HDFS block stored at TT will be selected for speculative execution. If the two stragglers will both need HDFS blocks stored at TT, one can be chosen randomly. Dead tasks always get highest priority, and speculative tasks get the lowest. In particular, when the JT receives a TT heartbeat that includes a map or reduce task request, the JT replies with a task in the following order:

  1. A task that compensates for a dead or stopped task.
  2. An original, not yet scheduled task.
  3. A speculative task.

Hadoop MapReduce's task-resiliency approach works well in homogeneous environments but falters in heterogeneous ones for several reasons:1, 3

  • Heterogeneity can result from resource contention in virtualized clouds, in which the congestion may be only transient. In such cases, the JT may launch too many speculative tasks for originals that appear slow at the moment but are shortly thereafter identified as not-slow. Speculative tasks take resources away from originals, and excessive speculative executions can slow the entire cluster, especially if the network is overloaded with a great deal of unnecessary shuffling traffic.
  • Hadoop MapReduce also launches speculative tasks at TTs without considering how their current loads/speeds compare with those of TTs hosting the original tasks. Potentially, the JT could schedule a speculative task at a slow TT that subsequently becomes slower than even the corresponding original task.
  • Because the Hadoop scheduler uses data locality to break ties among map stragglers, the wrong stragglers can be selected for speculation. If the JT detects two stragglers, $S_{1}$ and $S_{2}$, of which the $S_{1}$ score is 70% of the average and the $S_{2}$ score is 20%, and if a TT hosting the $S_{1}$ input block becomes idle, $S_{1}$ could be speculated before $S_{2}$.
  • The 20% progress-difference threshold implies that tasks scoring over 80% of the average will never be speculated, despite necessity or potential efficiency gains.
  • Finally, Hadoop MapReduce divides the reduce phase score equally across its three constituent stages. This compromise is unrealistic in a typical MapReduce job, in which the shuffle stage is usually the slowest due to involving all pairs communicating over the network. In actuality, it is highly likely that after the shuffle stage, MapReduce jobs quickly finish the merge and sort stage and the reduce stage. Therefore, soon after the first few reduce tasks finish their shuffle stages, their progress scores will jump from $\frac{1}{3}$ to $1$. This will significantly increase the overall average score and potentially degrade the accuracy of speculation. In fact, as soon as 30% of reduce tasks commit, the average score becomes $0.3 \times 1 + 0.7 \times \frac{1}{3} = 53%$. Subsequently, all reduce tasks that are still in the shuffle stage will be 20% behind the average score. As a result, an arbitrary set of false stragglers will be speculated, filling up reduce slots quickly and possibly overwhelming the cloud network with unnecessary traffic.

Clearly, Hadoop MapReduce's standard speculative execution approach suffers from serious shortcomings. For this reason, Facebook has disabled speculative execution for reduce tasks.1 Yahoo! has also disabled speculative execution altogether, although only for certain jobs.1 To address the underlying problem, Zahria and associates1 propose a greedy strategy called longest approximate time to end (LATE), which suggests that only those tasks expected to finish farthest in the future can be speculated. LATE provides the greatest opportunity for speculative tasks to overtake originals and, accordingly, should tend to decrease job response times. However, the challenge lies in identifying appropriate candidate tasks. To do so, LATE proposes computing the progress rate of each task as progress $\frac{score}{T}$, where $T$ is the time the task has taken so far, and then predicting the task's time to completion as $\frac{(1 - progress\ score)}{progress\ rate}$. In addition, LATE promotes scheduling speculative tasks at only fast TTs (those above a certain threshold). Also, to account for the fact that speculation consumes resources, LATE specifies a cap on the number of speculative tasks that can be launched at once. Last, LATE overlooks data locality upon scheduling speculative map tasks, assuming that most original map tasks still run with local input HDFS blocks and commit successfully. Experimentation results show that LATE can improve Hadoop response times twofold on heterogeneous cloud environments.


9 Each original task should run for at least 1 minute (by default) before its progress score is computed. Afterward, the JT decides whether or not to schedule a corresponding speculative task.


References

  1. M. Zaharia, A. Konwinski, A. Joseph, R. Katz, and I. Stoica (2008). Improving MapReduce Performance in Heterogeneous Environments OSDI
  2. T. White (2011). Hadoop: The Definitive Guide 2nd Edition O'Reilly
  3. Z. Guo and G. Fox (2012). Improving MapReduce Performance in Heterogeneous Network Environments and Resource Utilization In Proceedings of the 2012 12th IEEE/ACM International Symposium on Cluster, Cloud and Grid Computing

Check your knowledge

1.

How is data redundancy handled in Hadoop MapReduce?

2.

How are tasks replicated in Hadoop MapReduce?

3.

The following is a list of some assumptions that Hadoop implicitly makes in speculative execution:

  • A. There's no cost of scheduling a speculative task at a TT that exposes an idle slot.
  • B. TTs execute tasks at roughly the same rate.
  • C. Tasks in a job progress at a constant rate over time.
  • D. In a reduce task, the shuffle, the merge and sort, and the reduce stages take equal time (i.e., each takes one-third of the total reduce task time).
  • E. A task with a low progress score is probably a straggler because tasks tend to finish at comparable times.
Which of the above assumptions are most likely to break down in heterogeneous clouds but not in homogeneous ones?

4.

Here are three assumptions that Hadoop implicitly makes in speculative execution:

  • A. TaskTrackers execute tasks at roughly the same rate.
  • B. In a reduce task, the shuffle, the merge and sort, and the reduce stages take equal times (i.e., each takes one-third of the total reduce task time).
  • C. The task with a low progress score is probably a straggler because tasks tend to finish at comparable times.
Assume a Hadoop cluster C with very limited network bandwidth, and a Hadoop job J with a very high shuffle rate. Which of the above assumptions are most likely to break down when running J over C?