Unit 06 Lab 3: HDFS Integrations

Part 1: Overview

Now that you know the basics of the Hadoop Distributed File System, this lesson will introduce the various ways you can get data into and out of HDFS and operationalize the process.

Learning Outcomes

Upon completing this lab you will be able to:

  • Understand how to operationalize data imports into HDFS.
  • Demonstrate how to import SQL data with Sqoop and log data with Flume.
  • Evaluate which approach is suitable for a given scenario.

Requirements

To complete this lab you will need:

Before You Begin

Before you start this lab you should:

  1. Log in to your Hadoop client and open a Linux command prompt.
  2. Use Ambari to verify:
    • The HDFS and Flume services are running properly.
    • The Sqoop service is installed.
  3. Make the HDFS folder for the lab:
    $ hdfs dfs -mkdir unit06lab3
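
If you would like to confirm the folder was created, an optional check (not part of the lab’s numbered steps) is:

    $ hdfs dfs -ls

With no path argument this lists your HDFS home directory, and you should see unit06lab3 among the entries.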

Part 2: Walk-Through

Sqoop

Sqoop provides data integration with Relational Database Management Systems such as MySQL, MS SQL Server, and Oracle. In fact, Sqoop works with any data source or target that has a JDBC connector. In this walk-through we will use Sqoop to import data from a MySQL database into Hadoop. The process for performing an export, or for using a different database, is similar and is therefore left to the reader.
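
To give a sense of the general pattern before we dive into the MySQL example, here is a rough sketch of a Sqoop import against an arbitrary JDBC source. The host, database, credentials, and paths below are placeholders for illustration, not values used in this lab:

    $ sqoop import --connect jdbc:mysql://somehost/somedb --username someuser --password somepass --table sometable --target-dir some/hdfs/dir

Swapping the JDBC URL for another vendor’s connector (for example, a jdbc:sqlserver:// or jdbc:oracle:thin: URL) follows the same pattern, provided the matching JDBC driver is available to Sqoop.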

Setup

Let’s start by loading into MySQL the data set we will later import into HDFS. You would not normally do this, but this is a data integration lab and we need a data source. :-) After the data is loaded we will run some basic Sqoop commands to verify everything is there.

  1. Let’s begin by moving into the datasets/superhero folder where our source data is located.
  2. Let’s copy the data to a folder where our import script can find it. Type:
    $ cp superhero-movie-dataset-1978-2012.csv /tmp/movies.csv
  3. Next, let’s create our table and import the data into MySQL:
    $ mysql -u root -p < superhero-movies.mysql
    When prompted for the password, enter mysql. This command executes the SQL script in superhero-movies.mysql, which creates our movies table in the superhero database and then imports the data.
  4. To make sure everything worked, let’s logon to MySQL:
    $ mysql superhero -u root -p (again entering mysql as the password)
  5. From the mysql> prompt let’s see if the movies table is there:
    mysql> show tables;
  6. Then verify there’s data in the table with a SELECT statement:
    mysql> select * from movies limit 10;
    You should see 10 rows of superhero movies with box office numbers in the output.
  7. Type quit to exit the mysql> prompt and return to the Linux command prompt.
  8. Let’s execute the same checks we just did, but this time using Sqoop. First, to view the tables in the superhero database, type:
    $ sqoop list-tables --connect jdbc:mysql://dsappliance/superhero --username root --password mysql
  9. If we break this command down, we’re connecting to the MySQL DBMS on host dsappliance. On that host we’re attempting to access the superhero database with user name root and password mysql.
  10. In our last check we will write an SQL query to retrieve 10 movies. Type:
    $ sqoop eval --connect jdbc:mysql://dsappliance/superhero --username root --password mysql --query "select * from movies limit 10;"
    It should be noted that the eval command executes on the MySQL DBMS but does not import data into or export data out of HDFS.

Importing Data with Sqoop

In this section we will use Sqoop to import our Superhero Movies Dataset into HDFS. We will demonstrate two types of imports: a table import, where the entire table is loaded into HDFS, and a query import, where the output of an SQL SELECT statement is imported. If we want to import in parallel and use more than one mapper during a query import, we will need to specify how the data should be split. A skeleton of both forms appears below for reference.
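
The flag names in this skeleton are Sqoop’s own; the uppercase placeholders are just illustrations to show the shape of each form before we run the real commands:

    $ sqoop import --connect JDBC_URL --username USER --password PASS --table TABLE --target-dir HDFS_DIR
    $ sqoop import --connect JDBC_URL --username USER --password PASS --query "select ... where \$CONDITIONS" --target-dir HDFS_DIR -m N --split-by COLUMN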

  1. Importing a table with Sqoop is simple using the import command. Type:
    $ sqoop import --connect jdbc:mysql://dsappliance/superhero --username root --password mysql --table movies --target-dir unit06lab3/movies
    to import the table movies in database superhero on MySQL server dsappliance to the HDFS folder unit06lab3/movies.
  2. The sqoop command runs a MapReduce job. When the job completes, in about a minute or two, you should see a summary of the import:
    Sqoop table import summary
  3. Let’s make sure the data is there:
    $ hdfs dfs -cat unit06lab3/movies/*
    You should see rows of comma-separated data. Here’s a screenshot of the last few lines:
    Sqoop table import last lines
  4. When we import our data with an SQL SELECT query we must tell Sqoop how to split the data among the mappers. For example, if we --split-by the comic column and conveniently choose two mappers (-m 2), since there are only two comic values in the database (DC and Marvel), we will import the data into two separate data sets, split on that column:
    $ sqoop import --connect jdbc:mysql://dsappliance/superhero --username root --password mysql --query "select * from movies where \$CONDITIONS" --target-dir unit06lab3/movies-comic -m 2 --split-by comic
    NOTE: You might be asking yourself why you need to include \$CONDITIONS in the SELECT statement’s WHERE clause. It is required by Sqoop, which uses it to break the source data set up into parts to be consumed by the Hadoop mappers. It is required whenever you import via a --query. Furthermore, if you require additional logic in the WHERE clause, simply append it with the AND operator. We will see an example of this later.
  5. Verify it made two data sets by looking at the files in the movies-comic folder:
    $ hdfs dfs -ls unit06lab3/movies-comic
    You will see two files: HDFS ls movies-comic
  6. The first file should contain only movies from DC comics. Why DC? It comes first alphabetically. Type:
    $ hdfs dfs -cat unit06lab3/movies-comic/part-m-00000
    The output should only contain movies from DC comics. HINT: check the 3rd column for the comic name.
  7. Likewise, the second file should contain just Marvel comics. Type:
    $ hdfs dfs -cat unit06lab3/movies-comic/part-m-00001
    NOTE: the -m- in the two files stands for mapper, and there are two files because we used two mappers!
  8. The previous example was quite convenient in that we used two mappers and the column we split by had only two values. This is quite uncommon; normally we’ll split on a business key or date value so that we can track the data we’ve already imported. For our final example, let’s do another --query import but include some additional logic. Let’s import movies with an IMDB rating of 7 or better, splitting them across 4 mappers using the year column. Type:
    $ sqoop import --connect jdbc:mysql://dsappliance/superhero --username root --password mysql --query "select year, comic, title, imdb_rating from movies where \$CONDITIONS and imdb_rating >=7.0" --target-dir unit06lab3/movies-imdb -m 4 --split-by year
    When the job completes, you should see output like this:
    sqoop job output imdb
  9. Uneven splits? You should see the 4 split files when you type:
    $ hdfs dfs -ls unit06lab3/movies-imdb
    But if you pay attention to the file sizes you’ll notice they were split unevenly:
    uneven split
    And if you inspect their contents, you can verify this: $ hdfs dfs -cat unit06lab3/movies-imdb/part-m-00000 contains only 1 movie while
    $ hdfs dfs -cat unit06lab3/movies-imdb/part-m-00003 contains 12 movies. Why?
    The split is actually quite even, but it’s the data that isn’t, mainly due to the recent growth of the superhero movie genre. The --split-by option takes the min() and max() values of the year column to get a range of values, then divides that range evenly among the number of mappers (4 in this case). Since the first movie was released in 1978 and the last in 2012, that’s a span of 34 years; across 4 splits, that’s a stride of about 8 or 9 years. But most of the superhero movies are from 2000 onward, so the last split gets most of the rows. One way to get a more even split is to use a column with evenly distributed data, such as the id column.
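
To make the split arithmetic concrete, here is a conceptual sketch of what Sqoop does behind the scenes for this import. The exact SQL Sqoop generates will differ; the ranges shown are approximations based on the 1978–2012 span described above:

    -- boundary query: find the range of the split column
    select min(year), max(year) from movies where imdb_rating >= 7.0
    -- min = 1978, max = 2012, 4 mappers: stride = (2012 - 1978) / 4 = 8.5 years
    -- mapper 0 covers roughly 1978 through 1986 (few qualifying movies)
    -- mapper 3 covers roughly 2004 through 2012 (most qualifying movies)

Because the range of values is split evenly, not the number of rows, the mapper covering the most recent years ends up with most of the data.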

Flume

Apache Flume is a distributed, reliable, and available system for moving data. Flume makes it simple to stream log data from a website into HDFS, but it can also handle other event-driven data streams such as email, social media, and network traffic. We will use Flume to stream sample Twitter tweets into HDFS.

As you may recall, the Flume agent data flow model consists of a source, channel and sink: Flume Agent Model
The source specifies where the data comes from, the sink represents the data destination, and the channel is the queueing mechanism from source to sink. Flume supports a variety of sources, channels, and sinks, and is capable of some complex data flows. In this walk-through we will use the exec source type to monitor changes to a log file and send those changes to a sink of type HDFS.
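
To make the source–channel–sink model concrete, here is a minimal sketch of an agent configuration with this shape. It is modeled on, but not identical to, the logagent.conf file we will inspect below; the channel name memch and the memory channel choice are assumptions for illustration, while the source name weblog, the sink name mycluster, and the HDFS path match the folder we inspect later in the walk-through:

    # an agent named "agent" with one source, one channel, and one sink
    agent.sources = weblog
    agent.channels = memch
    agent.sinks = mycluster

    # exec source: run a command and treat each line of output as an event
    agent.sources.weblog.type = exec
    agent.sources.weblog.command = tail -F sample-tweet-stream.psv
    agent.sources.weblog.channels = memch

    # in-memory channel queues events between the source and the sink
    agent.channels.memch.type = memory

    # HDFS sink: write the events to a folder in HDFS
    agent.sinks.mycluster.type = hdfs
    agent.sinks.mycluster.hdfs.path = /user/ischool/unit06lab3/flumedata
    agent.sinks.mycluster.channel = memch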

Before Starting The Steps

NOTE: To complete this walk-through you will need to open 3 terminal windows:

  1. One window to execute a script to simulate a Twitter stream.
  2. A second window to configure and execute the Flume agent, and
  3. A third window to verify our tweets are being streamed into HDFS.

You can start with a single terminal window for now. You will be instructed to open the other windows as needed.

The Tweet Simulator

First let’s explore the Tweet simulator. The Tweet simulator is a script which generates a random number of fake tweets every few seconds. It is designed to simulate an actual logging-type event, where the contents of a file grow in response to events. As you run the script, a file named sample-tweet-stream.psv will accumulate the tweets that are generated. Since the file is constantly changing we cannot simply -put it into HDFS… the solution is Flume!
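
The actual tweet-stream.sh script is provided for you in the datasets/tweets folder; purely for illustration, a simulator of this kind could be little more than a loop that appends a few random lines to the file and sleeps. This is a hypothetical sketch, not the lab’s script:

    #!/bin/bash
    # hypothetical sketch: append a random number of fake tweets, pause, repeat 100 times
    for i in $(seq 1 100); do
        count=$(( (RANDOM % 5) + 1 ))
        for j in $(seq 1 $count); do
            echo "$(date +%s)|user$RANDOM|sample tweet text $RANDOM" >> sample-tweet-stream.psv
        done
        sleep $(( (RANDOM % 5) + 2 ))
    done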

  1. From the linux command prompt of your terminal window on your Hadoop client, change into the datasets/tweets folder:
    $ cd datasets/tweets
  2. Type: $ ls -l to list the contents of the folder. You will see several files, most notably tweet-stream.sh, the script we need to execute to generate a stream of tweets, and logagent.conf, a base Flume configuration file.
  3. Let’s start the Tweet simulator. Type: $ /bin/bash tweet-stream.sh and you should begin to see output beneath the script like this:
    Tweet Stream run
    The script will loop 100 times, each time generating a random number of tweets every few seconds.
  4. Since the script is running, you no longer have a command prompt. Press CTRL+SHIFT+N to open a new terminal window.
  5. In one window, you now have your script running, and in the other window you have a command prompt:
    Two windows
  6. Let’s take a look at the last few lines of the sample-tweet-stream.psv file being generated by the script:
    $ tail sample-tweet-stream.psv
    Inside the file you will see some sample tweets, which is expected.
  7. However, if you wait until the script in the first window generates new tweets, and then repeat the $ tail sample-tweet-stream.psv command, you will notice new tweets were added to the file. NOTE: these are RANDOM tweets, so a screenshot makes little sense here.
  8. To stop the tweet simulator, click your mouse into the first window, where the tweets are being generated, and press CTRL+C. This sends a break signal which halts the script:
    break signal in script

Configure Flume

Next we need to configure Flume to read sample-tweet-stream.psv as a source and write it to a folder in HDFS. We will do this in our second terminal window, leaving our original window as our method of running the tweet simulator.

  1. First, verify Flume is installed. Type:
    $ flume-ng
    The command should display instructions for how to use Flume from the command line.
  2. Let’s edit the logagent.conf file. For the most part this file has been configured for you, but let’s take the opportunity to explain what has been done. Type:
    $ nano logagent.conf to edit the file.
  3. Use the arrow keys to scroll down to the line starting with agent.sources.weblog.command = which specifies the command that provides Flume with the source data. After the = you will see the command: tail -F sample-tweet-stream.psv We know that tail shows us the last few lines in the file, but the -F switch “follows” changes to the file, in effect printing new lines to the console as they happen.
  4. Scroll down to the line starting with agent.sinks.mycluster.hdfs.path = which specifies the destination in HDFS for the source data. This should be set to /user/ischool/unit06lab3/flumedata, which is fine for this example. Ideally it should represent the data source, so something like /user/ischool/unit06lab3/tweets would be more appropriate. Here’s a screenshot of logagent.conf with both settings highlighted:
    logagent.conf with highlights
  5. Since we didn’t change anything in the file there’s no need to save. Press CTRL+X to exit. (If asked to save changes, select No.)
  6. Back at the linux command prompt, let’s start the Flume agent, type:
    $ flume-ng agent -n agent -f logagent.conf
    to start Flume. You’ll see a bunch of information spill across your screen, but eventually Flume will settle down and display Writer callback called. At this point it’s waiting to process new data.
  7. Let’s dissect the Flume command line options we used. -f logagent.conf is the name of the configuration file. This can be any name we wish, so we could copy this file, for example, and create a different configuration. -n agent is the name of the agent to run. If you look at the contents of logagent.conf you will notice we named the agent agent (I know it’s a boring name!)
  8. You’ve got Flume running in the second terminal window, so let’s go back to the first terminal window and restart our tweet simulator to begin creating new tweets for Flume to send to HDFS:
    $ /bin/bash tweet-stream.sh should kick off the simulated tweet stream again:
    Another Tweet stream
    And in the other window, you should see Flume processing the data:
    Flume agent processing
  9. With both terminal windows busy doing their thing, it’s time to open that third terminal window so we can get a Linux command prompt. Click on one of the terminal windows and press CTRL+SHIFT+N to open a new window.
  10. Let’s see what’s happening in HDFS. What’s in the flumedata folder?
    $ hdfs dfs -ls unit06lab3/flumedata
    You should see a list of files in the folder, with the most recent file ending with .tmp.
  11. Let’s verify the data is streaming, type: $ hdfs dfs -cat unit06lab3/flumedata/*
    to retrieve the contents of the folder from HDFS. Pay attention to the text of the last tweet.
  12. Wait until Flume processes more data in the second window, then re-run the $ hdfs dfs -cat unit06lab3/flumedata/* command in the third window. You should now see a different last tweet. We are streaming!
  13. Return to the first window, and press CTRL+C to stop the tweet simulator.
  14. Return to the second window, and press CTRL+C to stop Flume.
  15. Return to the third window, and re-run: $ hdfs dfs -cat unit06lab3/flumedata/* Since the streaming has stopped, you will always see the same last tweet.

Test Yourself

  1. Which Sqoop command allows you to execute an SQL query but does not import the data?
  2. Write a Sqoop command to show the tables in the MySQL database peter on host paul, assuming the root password is mysql.
  3. Explain how Sqoop --split-by works.
  4. Justify a reason for using more mappers when importing data with Sqoop. Do the same for fewer mappers.
  5. Explain the purpose of a Flume source, sink and agent.
  6. What does -n agent tell the flume agent?
  7. Can we have more than one configured agent in the same file? Explain your answer.
  8. What is the flume command to start an agent named mike using configuration file fudge.conf?

Part 3: On Your Own

Instructions

Before starting this section, you’ll need to make sure you’ve imported the fudgemart_v3 database into your MySQL server on dsappliance. Here’s the process, from the datasets folder:

  1. Import: $ mysql -u root -p < fudgemart/mysql.sql (again, the password is mysql)
  2. Verify: $ mysql -u root -p -e "select * from fudgemart_v3.fudgemart_customers limit 5" You should see the first 5 customers from this table.

Questions

  1. Configure a Flume agent to consume the Linux system log /var/log/syslog and write the contents to the folder unit06lab3/syslog in the ischool account’s HDFS home directory. Start by copying the logagent.conf file we used in Part 2 of the lab to a file called syslogagent.conf and then edit it accordingly. Provide your configuration and the command you used to run it, along with commands that prove the contents of /var/log/syslog are in HDFS.
  2. Use Sqoop to import the fudgemart_employees table from the fudgemart_v3 database on MySQL server dsappliance into the folder unit06lab3/fudgemart_employees. Write the command you used to do this and provide verification that the import worked and the data are in HDFS.
  3. Import the following data: select * from fudgemart_employee_timesheets as a Sqoop query. Use 4 mappers and split on timesheet_id. Write the command you used to do this and provide evidence that the import worked and the data are in HDFS. Are the splits even? Why or why not?