Unit 06 Lab 2: MapReduce and YARN

Part 1: Overview

About This Lab

In this lab you will learn how jobs are processed in Hadoop through YARN. We will execute MapReduce jobs, track their progress and manage output.

Learning Outcomes

Upon completing this lab you will be able to:

  • Explain how MapReduce jobs run as applications on YARN
  • Execute a MapReduce job and inspect its output in HDFS
  • Track YARN application history, status, and logs
  • Terminate a running YARN application
  • Write and run a MapReduce program using Hadoop Streaming

Requirements

To complete this lab you will need:

  • A running Hadoop cluster with the HDFS, YARN, and MapReduce2 services (the same environment used in the previous labs)
  • A Hadoop client with a Linux command prompt and the datasets folder from the previous labs
  • Access to Ambari

Before You Begin

Before you start this lab you should:

  1. Log in to your Hadoop client and open a Linux command prompt.
  2. Use Ambari to verify:
    • The HDFS, YARN and MapReduce2 services are running properly.
  3. Make the HDFS folder for the lab:
    $ hdfs dfs -mkdir unit06lab2

MapReduce runs on YARN

What is the relationship between MapReduce and YARN?

In the early versions of Hadoop there was only one data processing framework: MapReduce. MapReduce was good for batch processing but ill-suited to other scenarios. In addition, MapReduce was not very efficient with its resources, and it was common for MapReduce jobs to have slow execution times and require multiple writes to HDFS.

Next-generation Hadoop recognized the need for a data-operating system, a more generic distributed processing framework with resource management suitable for a variety of data scenarios including batch, streaming, interactive, and real-time. YARN was developed for this purpose, providing a set of interfaces developers can use to build a variety of distributed applications on Hadoop. YARN applications scale better and use the cluster resources with much greater efficiency.

Most importantly, YARN was developed with backwards compatibility in mind. MapReduce2 is a YARN application that implements the MapReduce framework. It was developed for backwards compatibility so that Hadoop users could leverage the benefits of YARN for their existing MapReduce programs.

Part 2: Walk-Through

Understanding MapReduce

In this walk-through we will use the wordcount program in hadoop-mapreduce-examples.jar to demonstrate how MapReduce works.

  1. Make the sotu folder in HDFS:
    $ hdfs dfs -mkdir unit06lab2/sotu
  2. Upload 2016-state-of-the-union.txt to HDFS:
    $ hdfs dfs -put datasets/text/2016-state-of-the-union.txt unit06lab2/sotu
  3. To make life easier, let’s make a variable to represent the absolute path to the hadoop-mapreduce-examples.jar file, so that we do not have to type it repeatedly:
    $ export MREX=/usr/hdp/current/hadoop-mapreduce-client/hadoop-mapreduce-examples.jar
    Then let’s quickly verify our variable is set, type:
    $ echo $MREX
    You should see:
    /usr/hdp/current/hadoop-mapreduce-client/hadoop-mapreduce-examples.jar
  4. Let’s execute a wordcount over the file. Type:
    $ yarn jar $MREX wordcount unit06lab2/sotu/* unit06lab2/sotu-wordcount
    This will kick off the MapReduce job to count the words in the State of the Union text file.
    IMPORTANT: Make a note of the YARN application ID for this job. We will need it for the next section. I’ve highlighted it in the output so you can see what it looks like:
    [screenshot: map resource job]
  5. When the job is complete, you should see output like this:
    [screenshot: wordcount sotu output]
  6. Let’s inspect the output folder, type:
    $ hdfs dfs -ls unit06lab2/sotu-wordcount
    Besides the _SUCCESS file you should see one file in the output folder:
    [screenshot: wordcount sotu output file]
    NOTE: The file name is part-r-00000. We learned in an earlier lab that the -m- stands for map. As you can guess, the -r- stands for reduce. There is only one file because a single reducer was used to process the word counts.
  7. Let’s observe the output from the MapReduce job, type:
    $ hdfs dfs -cat unit06lab2/sotu-wordcount/*
    A lot of data will scroll through your screen. Notice the words are sorted alphabetically. This is because the words in the source file become the keys emitted by the map phase, and the count for each word is produced by the reduce phase.

NOTE: If you want to run the wordcount MapReduce again, you must either delete the output folder or write the output to a different folder. You can delete the output like this: $ hdfs dfs -rm -r unit06lab2/sotu-wordcount
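Before moving on, it may help to see the whole map → shuffle → reduce flow in one place. Here is a minimal, illustrative Python sketch of what wordcount conceptually does; the input lines are hard-coded, and this is plain local Python, not Hadoop’s actual implementation:

#!/usr/bin/python2
# Illustrative sketch of wordcount's map -> shuffle/sort -> reduce flow.
from collections import defaultdict

lines = ['four score and seven years ago', 'we the people']

# Map: emit a (word, 1) pair for every word, like the wordcount mapper.
pairs = [(word, 1) for line in lines for word in line.split()]

# Shuffle/sort: group the pairs by key (the word). Hadoop sorts by key,
# which is why the job's output appeared in alphabetical order.
groups = defaultdict(list)
for word, one in sorted(pairs):
        groups[word].append(one)

# Reduce: sum the grouped counts for each word.
for word in sorted(groups):
        print '%s\t%d' % (word, sum(groups[word]))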

YARN Job History

The word count MapReduce job we completed in the previous section was executed through YARN (technically, that makes it a MapReduce2 job). YARN keeps a record of the applications it has run. Let’s explore how we can leverage this to track job history and logs.

  1. First let’s locate our job by listing applications with a FINISHED application state, type:
    $ yarn application -list -appStates FINISHED
    This lists all of the completed YARN jobs.
  2. If that’s too many for you, we can isolate the output further by piping it through grep, type:
    $ yarn application -list -appStates FINISHED | grep "_0020"
    where 0020 should be replaced with the last four digits of your YARN application ID. (Did you remember to jot it down?)
  3. Your output should appear similar to this:
    [screenshot: yarn application list output]
    While the actual job number will vary, you should see that the job is FINISHED and has SUCCEEDED, that it was a word count job using MAPREDUCE, and that it was executed by user ischool.
  4. Dive a little deeper and check the details of the job. Try this:
    $ yarn application -status application_1463775986054_0020 (of course your job number will be different)
    You should see detailed job output like this:
    [screenshot: yarn application detailed output]
  5. To view the log file for the job type:
    $ yarn logs -applicationId application_1463775986054_0020 (remember to replace with your job number)
    This will spill several hundred lines of logs across your screen…hardly useful.
  6. But piping the output to the less command is quite useful:
    $ yarn logs -applicationId application_1463775986054_0020 | less
  7. Now you can use the arrow keys to scroll through the logs and press q when you’re done viewing them.

NOTE: You can also view and track YARN jobs through the ResourceManager UI. This is accessible through Ambari on the YARN service page in the Quick Links menu. It is also directly accessible from your Hadoop client at this URL: http://sandbox:8088
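If you would rather query this information programmatically, the ResourceManager also exposes a REST API on the same host and port. Here is a small sketch that lists FINISHED applications; it assumes the sandbox hostname resolves from your client, as in the NOTE above:

#!/usr/bin/python2
# Sketch: list FINISHED YARN applications through the ResourceManager
# REST API at http://sandbox:8088/ws/v1/cluster/apps.
import json
import urllib2

url = 'http://sandbox:8088/ws/v1/cluster/apps?states=FINISHED'
data = json.load(urllib2.urlopen(url))

# The 'apps' element is null in the JSON when nothing matches.
if data.get('apps'):
        for app in data['apps']['app']:
                print '%s\t%s\t%s' % (app['id'], app['name'], app['finalStatus'])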

Managing YARN Jobs

In this next example we will demonstrate how to track running YARN jobs and terminate them as needed. Sometimes jobs get stuck in a running state and thus it is useful to know how to terminate them to reclaim resources.

  1. First let’s start out by running a long MapReduce2 job, where we estimate the value of Pi via the quasi-Monte Carlo method. We submit the job with 100 mappers and a sample size of 1,234,567,890. (A small local sketch of this estimation technique appears at the end of this section.) To start this job, type:
    $ yarn jar $MREX pi 100 1234567890
    (Remember, $MREX is an environment variable representing the full path to hadoop-mapreduce-examples.jar.)
  2. Eventually you’ll see output of the map portion of the MapReduce job running in your terminal window, like this:
    [screenshot: yarn mapreduce long job]
    At this point, press CTRL+C to break out of the job output and return to the Linux command prompt. Do not wait for the job to complete.
  3. You might think we just canceled the job, but you need to remember how Hadoop works. The execution occurs on the cluster, and we’re only seeing the job output on the client. We can verify the job is still running by typing:
    $ yarn application --list
    You should see output similar to the following; note the application ID and job progress have been highlighted:
    [screenshot: yarn application list running]
  4. We can get a detailed view of the running application with a similar command, type:
    $ yarn application --status application_1463775986054_0021
    (Again, your application ID will vary from the screenshot; be sure to use yours!)
    Notice the application state says RUNNING:
    [screenshot: yarn application status]
  5. To terminate the running application we add the --kill option, try this:
    $ yarn application --kill application_1463775986054_0021
    In the command output you should see Killed Application followed by the application ID.
  6. Let’s verify it is no longer running:
    $ yarn application --list
    You will see there are no applications running.
  7. And if we check the status of the application ID again, type:
    $ yarn application --status application_1463775986054_0021
    You will notice the Diagnostics: line now says Application killed by user.

Congratulations, you now know how to terminate a running YARN application!
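As promised, here is a small local sketch of the estimation idea behind the pi job we just killed: sample points in the unit square and count how many land inside the inscribed quarter circle, so the ratio approaches pi/4. This is a plain-Python illustration, not the job’s implementation; the real job spreads the sampling across its 100 mappers, sums the results in a reducer, and uses a quasi-random sequence (hence “quasi-Monte Carlo”) rather than random() for faster convergence.

#!/usr/bin/python2
# Local sketch of the sampling idea behind the pi example. Not the
# Hadoop job's code: the job distributes this work across mappers.
import random

samples = 1000000          # far fewer than the job's 1,234,567,890
inside = 0
for _ in xrange(samples):
        x = random.random()
        y = random.random()
        if x * x + y * y <= 1.0:
                inside += 1     # the point fell inside the quarter circle

print 'Estimated Pi = %f' % (4.0 * inside / samples)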

Hadoop Streaming

Hadoop ships with a library called hadoop-streaming.jar which you can use to write MapReduce programs in just about any programming language. The two conditions are:

  1. The prerequisites to execute the program must be installed on each data node in your Hadoop cluster.
  2. The program must read from stdin and write to stdout (a minimal sketch of this contract follows this list).
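To make the second condition concrete, here is a minimal sketch of that contract: an identity mapper that simply copies each input line to its output. Any program shaped like this, in any language installed on the data nodes, can serve as a mapper or a reducer.

#!/usr/bin/python2
# Minimal sketch of the Hadoop Streaming contract: read records from
# stdin and write key/value lines to stdout. This identity mapper
# passes every input line through unchanged.
import sys

for line in sys.stdin:
        sys.stdout.write(line)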

Let’s walk through an example in the sections below. In this example we will use the exam-scores.csv data set in our datasets/exam-scores folder. We’ll write a program to output the grade distribution of scores.

Testing Outside Hadoop

We should be able to test our Hadoop Streaming program outside of Hadoop. If we can get it working locally and the prerequisites are met on the data nodes, then there’s no reason we cannot get it working in Hadoop.

  1. Let’s see what we have to work with. Type:
    $ cat ~/datasets/exam-scores/exam-scores.csv
    Notice the file is comma-delimited and the last value in the row is the letter grade representation of the exam score:
    [screenshot: exam scores output]
  2. Let’s create our mapper program, type:
    $ nano grade-mapper.py to start the editor.
  3. Here’s our mapper program.
#!/usr/bin/python2
import sys

# Mapper: read CSV rows from stdin, emit "letter<TAB>1" for each exam.
for line in sys.stdin:
        line = line.strip()
        items = line.split(',')
        letter = items[-1]              # the letter grade is the last column
        if letter != 'Letter Grade':    # skip the header row
                print '%s\t%s' % (letter, 1)

This program reads each line, splits it on the ',' and finds the letter grade (the last item in items, which is items[-1]), then outputs the letter grade and a 1 as the mapper result. The if statement skips the first line of the file, which contains the header rather than data.

  4. Save your code with CTRL+X and press Y.
  5. Before we can test this program we need to make it executable, type:
    $ chmod u+x grade-mapper.py
  6. Now we can test our mapper:
    $ cat datasets/exam-scores/exam-scores.csv | ./grade-mapper.py
    You should see output like this:
    [screenshot: grade mapper output]
  7. We will have to sort the mapper output before sending it to the reducer program. This is the shuffle phase of MapReduce, type:
    $ cat datasets/exam-scores/exam-scores.csv | ./grade-mapper.py | sort
    The output should now be sorted:
    [screenshot: grade mapper sorted output]
  8. Now we need to write the reducer program, type: $ nano grade-reducer.py to start the editor.
  9. Here’s our reducer program:
#!/usr/bin/python2
import sys

# Reducer: sum the counts for each grade. Because the input arrives
# sorted by key, all lines for one grade are adjacent, so we only
# need to track the current group.
current_grade = None
current_count = 0
grade = None

for line in sys.stdin:
        line = line.strip()
        grade, count = line.split('\t', 1)
        try:
                count = int(count)
        except ValueError:
                # ignore lines where the count is not a number
                continue

        if current_grade == grade:
                current_count += count
        else:
                # key changed: print the finished group, start a new one
                if current_grade:
                        print '%s\t%s' % (current_grade, current_count)
                current_count = count
                current_grade = grade

# print the final group
if current_grade == grade:
        print '%s\t%s' % (current_grade, current_count)

It’s a little more involved than our mapper, but just as easy to understand. Since the mapper output is sorted by the grade key, we loop through each line and add up the values (1’s in this case). When current_grade no longer matches grade, it’s time to print the count and start counting the next grade.
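As an illustrative aside (not part of the lab), the same group-and-count logic can be written with Python’s itertools.groupby, which makes the reliance on sorted input explicit, because groupby only merges adjacent equal keys:

#!/usr/bin/python2
# Alternative sketch of grade-reducer.py using itertools.groupby. Like
# the reducer above, it is only correct because the shuffle phase hands
# it input sorted by grade. Assumes well-formed "grade<TAB>count" lines.
import sys
from itertools import groupby

pairs = (line.strip().split('\t', 1) for line in sys.stdin)
for grade, group in groupby(pairs, key=lambda kv: kv[0]):
        print '%s\t%d' % (grade, sum(int(count) for _, count in group))

Either version works in the sort pipeline below; grade-reducer.py is the one we will submit to Hadoop.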

  10. Save your code with CTRL+X and press Y.
  11. Again we must make the program executable before we can run it, type:
    $ chmod u+x grade-reducer.py
  12. Finally we run our entire MapReduce program as follows:
    $ cat datasets/exam-scores/exam-scores.csv | ./grade-mapper.py | sort | ./grade-reducer.py
    Yielding the following output:
    [screenshot: map reduce output]
    Our distribution of letter grades!

Running Our Program on Hadoop

To run this program on Hadoop we use the hadoop-streaming.jar library. This is fairly trivial now that we have tested our code locally.

  1. Let’s put the exam-scores.csv in HDFS, type:
    $ hdfs dfs -put datasets/exam-scores/exam-scores.csv unit06lab2/exam-scores.csv
  2. To simplify the next command, let’s make a variable for the hadoop-streaming.jar library, type:
    $ export HADOOP_STREAMING=/usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar
  3. Now we can run our Hadoop streaming job:
    $ yarn jar $HADOOP_STREAMING -file grade-mapper.py -file grade-reducer.py -mapper grade-mapper.py -reducer grade-reducer.py -input /user/ischool/unit06lab2/exam-scores.csv -output /user/ischool/unit06lab2/exam-distribution
    This will kick off a MapReduce2 job. The files in the -file option are uploaded to HDFS to execute the job, and we specified which program is the -mapper and which is the -reducer. Eventually you will see the familiar job output:
    [screenshot: hadoop streaming job output]
  4. And we can view the output with:
    $ hdfs dfs -cat unit06lab2/exam-distribution/*
    [screenshot: map reduce output]
    This is the same output as before, only executed on our Hadoop cluster!

Test Yourself

  1. Explain the relationship between YARN and MapReduce in the current versions of Hadoop.
  2. Which command displays running YARN applications?
  3. Which command displays a history of YARN applications?
  4. What is the purpose of the Application Id in YARN?
  5. How do you view the output of an application that has finished executing?
  6. Explain how hadoop streaming works. What are its advantages?

Part 3: On Your Own

Instructions

Perform a MapReduce word count job on preamble.txt found in your datasets/text folder. Make sure to use best practices and put the text file in the unit06lab2/preamble folder on HDFS.

Questions

  1. What are the commands required to upload the file to HDFS, execute the word count job, and view the files in the job output? List them here.
  2. Provide a screenshot of the job output. Which word appears most often?
  3. Execute a command to show the status of the job. Include a screenshot. How long did the job take?
  4. Execute a command to show the log output from the job. What is the last line of the log?