A question that many developers often ask is, “Can I make it run faster?”
In particular, how can we discover the maximum performance of an individual MapReduce job? Let’s set up a test run, and see how fast we can make it. We’ll base this analysis on native Hadoop components; however, the solution should extend to any Hadoop vendor’s distribution.
But first, some history. In 2004, Google shocked the world by releasing a paper on MapReduce, outlining the conceptual framework. Shortly after that, engineers started developing MapReduce implementations to solve problems of mind-boggling size (which they would, 7 years later, describe to their bosses as Big Data). One major reason why MapReduce grew so rapidly is that the pattern is so developer-friendly. The MapReduce paradigm encapsulates all distributed computing complexities in the infrastructure itself. Without the need to understand the underlying distributed computing platform, programmers can now focus on implementing data analytics logic and stop worrying about how data is processed across many machines.
Did you really believe that last point? Let programmers focus on logic implementation without understanding the underlying complexities? If you want your MapReduce tasks to run faster, you’re going to have to look at everything. Let’s dive right in, spin up a few new nodes (thanks, RightScale!), run a job, and figure out what we need to do to make it fast.
For this blog, I set up a native MapReduce implementation on Hadoop with 3 nodes. Let’s run one of the standard benchmarks on Hadoop: sorting a large list of words.
Step 1: Create our random word list
hduser@hadoop-1:~$ hadoop jar /usr/local/hadoop/hadoop-examples-1.0.4.jar randomwriter -Dtest.randomwriter.maps_per_host=3 rand
Running 3 maps.
Job started: Mon Sep 30 04:21:01 PDT 2013
13/09/30 04:21:01 INFO mapred.JobClient: Running job: job_201309300248_0015
13/09/30 04:21:02 INFO mapred.JobClient: map 0% reduce 0%
…..
Job ended: Mon Sep 30 04:21:58 PDT 2013
The job took 57 seconds.
Step 2: Do our first run to determine the baseline
hduser@hadoop-1:/usr/local/hadoop/conf$ hadoop jar /usr/local/hadoop/hadoop-examples-1.0.4.jar sort -m 3 -r 3 rand rand-sort-baseline
Job started: Mon Sep 30 05:00:12 PDT 2013
13/09/30 05:00:18 INFO mapred.FileInputFormat: Total input paths to process : 3
13/09/30 05:00:40 INFO mapred.JobClient: Running job: job_201309300439_0005
13/09/30 05:00:41 INFO mapred.JobClient: map 0% reduce 0%
13/09/30 05:01:06 INFO mapred.JobClient: map 8% reduce 0%
…
13/09/30 05:08:52 INFO mapred.JobClient: map 100% reduce 29%
13/09/30 05:10:41 INFO mapred.JobClient: map 100% reduce 30%
13/09/30 05:12:14 INFO mapred.JobClient: map 100% reduce 31%
13/09/30 05:13:57 INFO mapred.JobClient: map 100% reduce 43%
… waiting
….
13/09/30 05:30:49 INFO mapred.JobClient: Physical memory (bytes) snapshot=5115322368
13/09/30 05:30:49 INFO mapred.JobClient: Reduce output records=153368
13/09/30 05:30:49 INFO mapred.JobClient: Virtual memory (bytes) snapshot=27016118272
13/09/30 05:30:49 INFO mapred.JobClient: Map output records=153368
Job ended: Mon Sep 30 05:30:49 PDT 2013
The job took 1837 seconds.
Ugh. 30 min to sort 15G of words? Let’s see if we can’t fix that with some old school config tuning.
Step 3: Tuning!
If we were going to get serious about this, we’d turn to the bible: O’Reilly’s “Hadoop: The Definitive Guide”. I rely on this book heavily; it’s where I learned most of what I know about MapReduce. Even better, it’s a great monitor stand. And given that I’m using my monitor, and it’s Friday afternoon, let’s start with something a bit less comprehensive.
Let’s open up our config file and change a few simple things first. The first place to look is in our data itself. We’re dealing with a lot of strings, and strings compress well. There are 3 places where Hadoop can apply compression:
- Compression of the raw input
- Compression between the map output and the reduce input
- Compression of the reduce output
Each of these addresses one of the main stages of data transfer in a MapReduce job. Given that the data is already stored locally in HDFS, and we’re going to store the result in HDFS as well, let’s turn on compression for the middle stage. We can do that with a change to mapred.compress.map.output.
<property>
  <name>mapred.compress.map.output</name>
  <value>true</value>
</property>
<property>
  <name>mapred.map.output.compression.codec</name>
  <value>org.apache.hadoop.io.compress.DefaultCodec</value>
</property>
<property>
  <name>mapred.output.compression.type</name>
  <value>BLOCK</value>
</property>
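To get a feel for the trade we’re making here (CPU cycles in exchange for fewer bytes on the wire), here is a minimal Python sketch. It assumes that DefaultCodec is zlib/DEFLATE-based, and the vocabulary and word count are made up purely for illustration; they are not the data from this test run, so the ratio it prints says nothing about what the real job achieves.

```python
import random
import zlib

# Hypothetical stand-in for map output: 200,000 words from a tiny vocabulary.
# Real map output will compress differently; this only illustrates the idea.
random.seed(0)
vocabulary = ["hadoop", "mapreduce", "shuffle", "reduce",
              "network", "cluster", "sort", "node"]
raw = " ".join(random.choice(vocabulary) for _ in range(200_000)).encode("utf-8")

# zlib.compress uses DEFLATE at the default compression level.
compressed = zlib.compress(raw)
ratio = len(raw) / len(compressed)

print(f"raw: {len(raw)} bytes")
print(f"compressed: {len(compressed)} bytes")
print(f"ratio: {ratio:.1f}x")

# Compression is lossless: the reducer side gets back exactly what was sent.
assert zlib.decompress(compressed) == raw
```

Every byte that compression removes is a byte that does not have to cross the network during the shuffle, which is why this setting matters most when the network is the slow part.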
In terms of our server config, the 3 nodes I spun up don’t have enough space on them to really run well. By default, Hadoop replicates all the data to 3 nodes, in case of hard drive failure. This is a great feature for data warehousing, but for temporary jobs, it might not be important to make sure the cluster doesn’t lose the data. In our case, we’re just after job performance, so let’s disable this initial replication; if the job fails, we still have the initial data set, and we’ll just load it into the cluster again. This should make the task run a bit faster as well.
<property>
  <name>dfs.replication</name>
  <value>1</value>
</property>
With these tweaks, let’s run this job again:
hduser@hadoop-1:/usr/local/hadoop/conf$ hadoop jar /usr/local/hadoop/hadoop-examples-1.0.4.jar sort -m 3 -r 3 rand rand-sort-compress-dl
Running on 3 nodes to sort from hdfs://hadoop-1:54310/user/hduser/rand into hdfs://hadoop-1:54310/user/hduser/rand-sort-noCompress-dl with 3 reduces.
Job started: Mon Sep 30 05:51:52 PDT 2013
13/09/30 05:51:52 INFO mapred.FileInputFormat: Total input paths to process : 3
13/09/30 05:51:53 INFO mapred.JobClient: Running job: job_201309300439_0007
…
13/09/30 05:56:27 INFO mapred.JobClient: map 100% reduce 58%
13/09/30 05:56:40 INFO mapred.JobClient: map 100% reduce 59%
13/09/30 05:56:43 INFO mapred.JobClient: map 100% reduce 70%
13/09/30 05:56:49 INFO mapred.JobClient: map 100% reduce 71%
13/09/30 05:57:01 INFO mapred.JobClient: map 100% reduce 72%
…
13/09/30 06:02:44 INFO mapred.JobClient: map 100% reduce 100%
13/09/30 06:02:49 INFO mapred.JobClient: Job complete: job_201309300439_0007
…
13/09/30 06:02:49 INFO mapred.JobClient: Physical memory (bytes) snapshot=5119651840
13/09/30 06:02:49 INFO mapred.JobClient: Reduce output records=153368
13/09/30 06:02:49 INFO mapred.JobClient: Virtual memory (bytes) snapshot=27044724736
13/09/30 06:02:49 INFO mapred.JobClient: Map output records=153368
Job ended: Mon Sep 30 06:02:49 PDT 2013
The job took 656 seconds.
Wow, look at that! It ran about 2.8 times as fast as my first baseline (656 seconds versus 1837). We have a record breaker. It’s Friday afternoon, and in Dilbert’s world, the PHB would stamp the result “Approved” without a second thought. On the other hand, this seems too good to be true. Compression should give us a bit of a speedup, if we assume that transferring and storing our data is the bottleneck. Just trading CPU for storage / network, on the other hand, shouldn’t be this drastic.
Let’s not just blindly tune; let’s actually look at the resource consumption and figure out where the bottleneck is. In any Hadoop setup, all bottlenecks fall into one of 4 areas: CPU usage, RAM, network, and I/O. If we look at top during this run, cpu, mem, and wa (io-wait) are all low during the shuffle phase. By process of elimination, it sounds like we found the culprit here: The Network.
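If you would rather log these numbers than eyeball top, the CPU and io-wait shares can be derived from two samples of the aggregate cpu line in /proc/stat on Linux. The sketch below is one way to do that, not what I ran for this post; the function names are my own, and the two sample lines are synthetic so the example runs anywhere.

```python
def parse_cpu_line(line):
    """Parse the aggregate 'cpu' line of /proc/stat into named jiffy counters."""
    fields = line.split()
    if not fields or fields[0] != "cpu":
        raise ValueError("expected the aggregate 'cpu' line from /proc/stat")
    names = ["user", "nice", "system", "idle", "iowait", "irq", "softirq", "steal"]
    return dict(zip(names, (int(value) for value in fields[1:9])))


def cpu_breakdown(before, after):
    """Return each state's share of CPU time (in percent) between two samples."""
    deltas = {name: after[name] - before[name] for name in before}
    total = sum(deltas.values())
    if total == 0:
        return {name: 0.0 for name in deltas}
    return {name: 100.0 * delta / total for name, delta in deltas.items()}


# Synthetic samples (not measurements from this cluster).
before = parse_cpu_line("cpu  1000 0 500 8000 300 0 200 0 0 0")
after = parse_cpu_line("cpu  1050 0 520 8900 310 0 220 0 0 0")
shares = cpu_breakdown(before, after)

for name in ("user", "system", "iowait", "idle"):
    print(f"{name}: {shares[name]:.1f}%")
```

On a live node, you would read the first line of /proc/stat twice, a few seconds apart during the shuffle phase, and pass both lines through parse_cpu_line. Low user, system, and iowait shares while the job crawls are the hint that the node is waiting on something else, such as the network.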
Just knowing it is the network doesn’t get us anywhere, so let’s fire up our favorite network tool: PathView.
Most everything looks OK: plenty of bandwidth, low RTT. On the other hand, one of my nodes was suffering 3-7% data loss! This kind of thing happens all the time on converged networks, which have to handle voice, video, and data. Commonly, network admins will add quality of service markers to the real-time traffic, leading to loss in data traffic if there’s not enough capacity. (To be honest, I artificially created this data loss, because these problems tend to be somewhat random and I didn’t want to run experiments on our office network until I found this particular failure mode. Other common causes include bad network hardware, but I couldn’t find any faulty switches, either.)
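A dedicated tool shows loss along the path, but you can also cross-check from the host itself. On Linux, /proc/net/snmp exposes the kernel’s TCP counters, including OutSegs and RetransSegs. The following sketch (my own helper names, synthetic counter values) computes the share of sent segments that were retransmitted between two samples. Note that a retransmission rate is a proxy for packet loss, not the same number.

```python
def parse_tcp_counters(snmp_text):
    """Extract the Tcp counters from the text of /proc/net/snmp as a dict."""
    header = values = None
    for line in snmp_text.splitlines():
        if line.startswith("Tcp:"):
            if header is None:
                header = line.split()[1:]   # first Tcp: line holds the names
            else:
                values = line.split()[1:]   # second Tcp: line holds the values
                break
    if header is None or values is None:
        raise ValueError("no Tcp section found")
    return dict(zip(header, (int(value) for value in values)))


def retransmit_percent(before, after):
    """Percent of segments sent between two samples that were retransmissions."""
    sent = after["OutSegs"] - before["OutSegs"]
    retransmitted = after["RetransSegs"] - before["RetransSegs"]
    return 100.0 * retransmitted / sent if sent else 0.0


TCP_HEADER = ("Tcp: RtoAlgorithm RtoMin RtoMax MaxConn ActiveOpens PassiveOpens "
              "AttemptFails EstabResets CurrEstab InSegs OutSegs RetransSegs "
              "InErrs OutRsts")

# Synthetic samples (not measurements from this cluster).
sample_before = TCP_HEADER + "\nTcp: 1 200 120000 -1 10 10 0 0 5 100000 200000 1000 0 0\n"
sample_after = TCP_HEADER + "\nTcp: 1 200 120000 -1 12 12 0 0 5 180000 300000 6000 0 0\n"

percent = retransmit_percent(parse_tcp_counters(sample_before),
                             parse_tcp_counters(sample_after))
print(f"retransmitted: {percent:.1f}% of segments sent")
```

On a real node, read /proc/net/snmp once before and once after the shuffle phase and feed both texts to parse_tcp_counters. A healthy LAN should show a rate close to zero; a rate of several percent points at the kind of loss described above.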
This data loss seems like it was the real problem, so let’s compare just fixing this network loss to the original (with none of the configuration tuning).
hduser@hadoop-1:/usr/local/hadoop/conf$ hadoop jar /usr/local/hadoop/hadoop-examples-1.0.4.jar sort -m 3 -r 3 rand rand-sort-no-dl
Running on 3 nodes to sort from hdfs://hadoop-1:54310/user/hduser/rand into hdfs://hadoop-1:54310/user/hduser/rand-sort-no-dl with 3 reduces.
…
13/09/30 06:36:45 INFO mapred.JobClient: Physical memory (bytes) snapshot=5152235520
13/09/30 06:36:45 INFO mapred.JobClient: Reduce output records=153368
13/09/30 06:36:45 INFO mapred.JobClient: Virtual memory (bytes) snapshot=27081445376
13/09/30 06:36:45 INFO mapred.JobClient: Map output records=153368
Job ended: Mon Sep 30 06:36:45 PDT 2013
The job took 252 seconds.
The job went from 1837 seconds to 252 seconds just by clearing the data loss, an even bigger speedup than tuning the config! This also explains the drastic speedup from compression: since our network was the real bottleneck, transferring less data was a pretty effective bandaid. The lesson here is, don’t just blindly tune to your data set; break out your tools and figure out what’s actually going wrong. A lot of people misunderstand, or simply ignore, how network performance can affect MapReduce speed. We can rely on Hadoop’s resilience to make sure the job finishes, but that resilience masks a correctness issue in the network as a performance issue in the job. Just by eliminating retries and TCP retransmissions, we can cut the run time by more than 80%. That’s the real problem to solve.
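For anyone who wants to check the arithmetic, here is a small Python sketch that puts the three run times from the logs side by side. The run times are the ones reported above; the labels and the helper function are my own.

```python
def compare(baseline_seconds, new_seconds):
    """Return (percent reduction in run time, speedup factor) versus the baseline."""
    reduction = 100.0 * (baseline_seconds - new_seconds) / baseline_seconds
    speedup = baseline_seconds / new_seconds
    return reduction, speedup


BASELINE_SECONDS = 1837  # default config, lossy network

runs = {
    "config tuning, lossy network": 656,
    "default config, clean network": 252,
}

for label, seconds in runs.items():
    reduction, speedup = compare(BASELINE_SECONDS, seconds)
    print(f"{label}: {seconds}s, {reduction:.1f}% less time, {speedup:.1f}x as fast")
```

Keeping "percent less time" and "times as fast" apart is worth the extra line: the two numbers describe the same runs, but they are easy to mix up when reporting results.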
I am sorry for any of you who spent extra money spinning up extra nodes to achieve better performance when the real problem was a bad network. Real-world example: the extra money and time that you spent on those extra nodes? You would have gotten a better return by just mining Bitcoins. If you had realized and fixed this early enough, the return from Bitcoins might be worth more than a couple million…