Unit 07 Lab 1: Pig

Part 1: Overview

About This Lab

In this lab you will use Apache Pig to explore two clickstream data sets stored in HDFS: web server logs and an IP address lookup table. You will learn how Pig applies a schema on read and how to filter, group, join, and store data with Pig Latin.

Learning Outcomes

Upon completing this lab you will be able to:

  • Use Pig to load data into relations, apply schemas on read, filter, group, join, and store data in HDFS.

Requirements

To complete this lab you will need:

  • A Hadoop client with Pig installed and access to a running Hadoop cluster.
  • Access to Ambari to verify cluster services.
  • The clickstream data sets (the datasets/clickstream folder) on your Hadoop client.

Before You Begin

Before you start this lab you should:

  1. Use Ambari to verify:
    • The HDFS, YARN and MapReduce2 services are running properly.
    • Pig is installed on your Hadoop client.
  2. Log in to your Hadoop client and open a Linux command prompt.
  3. From the command prompt, set up the files and folders for the lab:
    1. Make the HDFS folder for the lab:
      $ hdfs dfs -mkdir unit07lab1
    2. Make a folder for the clickstream logs in HDFS:
      $ hdfs dfs -mkdir unit07lab1/weblogs
    3. Make a folder for the IP lookups in HDFS:
      $ hdfs dfs -mkdir unit07lab1/iplookups
    4. Put the logs into HDFS:
      $ hdfs dfs -put datasets/clickstream/*.log unit07lab1/weblogs
    5. Put the IP lookups into HDFS:
      $ hdfs dfs -put datasets/clickstream/ip_lookup.csv unit07lab1/iplookups
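    6. Optionally, confirm the files landed where you expect with a recursive listing (a quick sanity check, not required by the lab):
      $ hdfs dfs -ls -R unit07lab1
      You should see the weblogs and iplookups folders along with the files you just put into HDFS.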

Part 2: Walk-Through

In this walk-through we will learn how to manipulate data with the Pig scripting language.

Getting Your Bearings

  1. To start Pig, from the command line, type:
    $ pig
  2. This will launch a different prompt which says grunt>. This is the grunt shell, and it’s where we write Pig commands.
  3. To get help, try:
    grunt> help
  4. To quit, type:
    grunt> quit
    This returns you to the linux command prompt. Start Pig again:
    $ pig
  5. Let’s try our first grunt command and see what it does, type:
    grunt> ls unit07lab1
    You should see the two folders we created in the Before You Begin section of the lab:
    ls of unit07lab1
    So the ls command does not list local files; it lists files stored in HDFS.
  6. Since we know these commands operate on HDFS, let’s try this command:
    grunt> cat unit07lab1/iplookups/
    This should output the contents of the iplookups folder. It contains a list of IP Addresses with their Country, City, State, Latitude, and Longitude. Here are the last few lines of the output:
    cat iplookup last few lines
  7. At this point you might be wondering how Pig is any different from hdfs commands. Pig allows us to read data in and store it into an alias, which we can then manipulate. For example, type:
    grunt> ip = LOAD 'unit07lab1/iplookups/*' USING TextLoader();
    This command says to LOAD the files at unit07lab1/iplookups/* using the TextLoader() method into the alias ip. The TextLoader() function instructs Pig to read the data as raw text. In Pig parlance, the entire step is called a relation; a Pig script consists of a series of relations.
  8. When you execute this relation nothing appears to happen, and if that’s what you’re thinking, you’re only partly correct. Pig uses lazy evaluation, which means no processing occurs in Hadoop until a command is forced to generate output. At that point the relations associated with the aliases are evaluated, and a MapReduce program executes. For example, type:
    grunt> DUMP ip;
    You’ll see a few log messages as well as an Application Id for the YARN job. When the program completes you should see output. Here are the last few lines (a LIMIT note for previewing fewer rows follows this list):
    dump iplookup last few lines
    This time you’ll see () around each row of data; that’s because Pig reads each row into a tuple rather than as raw text. More on this later.
  9. You can see the list of Pig aliases at any time by typing:
    grunt> aliases;
    At this point you will only see [ip] because we’ve only created that single alias.
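
NOTE: DUMP prints an entire relation, which can be a lot of output on big data sets. As a small optional sketch (not a required step of this lab), you can preview just a handful of rows with Pig’s LIMIT operator:

    grunt> ip_preview = LIMIT ip 5;
    grunt> DUMP ip_preview;

Because of lazy evaluation, nothing runs until the DUMP is issued, and only the five previewed rows are printed.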

Understanding Schema-On-Read

With Hadoop we store the data “as-is” and apply a schema to data only when we need it. This section will demonstrate how Pig implements Schema-On-Read, which is one of the keys to understanding the capabilities of big data.

Implicit schemas

  1. Let’s see what kind of schema we’ve applied to the ip alias. Type:
    grunt> DESCRIBE ip;
    Sadly, Pig will report back Schema for ip unknown. Without a schema we cannot process advanced logic such as sorting by City or filtering by State.
  2. You probably noticed from previous steps where we output the data that it is comma-delimited. Pig will allow us to infer a schema using the PigStorage() method. Type:
    grunt> ip = LOAD 'unit07lab1/iplookups/*' USING PigStorage(',');
    We say PigStorage(',') because our data is comma-delimited.
  3. When we DUMP the data it looks the same as before, type:
    grunt> DUMP ip;
    dump iplookup last few lines
  4. In actuality there’s an inferred schema. Let’s demonstrate by projecting just the first and third columns which are IP Address and State using Pig’s FOREACH ... GENERATE statement to make this relation:
    grunt> ip_and_state = FOREACH ip GENERATE $0, $2;
    then type:
    grunt> DUMP ip_and_state;
    to view the output:
    dump ip and state last few lines
  5. Let’s use the FILTER command to keep only the rows with IP addresses in 'NY'. Type this relation:
    grunt> ny_ips = FILTER ip_and_state BY $1 == 'NY';
    Why is it $1 and not $2? Because the ip_and_state alias only has two columns, and the state column is now the second one, $1.
  6. Confirm your output, type:
    grunt> DUMP ny_ips;
    dump ny ip addresses
  7. Let’s see the schema associated with ny_ips, type:
    grunt> DESCRIBE ny_ips;
    You should see it consists of two bytearray columns, which is the default data type (a short cast sketch follows this list):
    describe ny_ips
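
NOTE: Because these columns default to bytearray, here is a small optional sketch (not a required step) showing how they could be cast to a concrete type in a later relation:

    grunt> ny_ips_typed = FOREACH ny_ips GENERATE (chararray)$0 AS ipaddress, (chararray)$1 AS state;
    grunt> DESCRIBE ny_ips_typed;

The DESCRIBE should now report two chararray columns instead of bytearray.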

Defining Schemas

  1. Let’s re-read our iplookups but this time, we’ll apply a schema, type:
    grunt> ip = LOAD 'unit07lab1/iplookups/*' USING PigStorage(',') AS (ipaddress:chararray, country:chararray, state:chararray, city:chararray, latitude:float, longitude:float);
  2. Now when we type:
    grunt> DESCRIBE ip;
    We see our defined schema:
    ip lookup schema
  3. Let’s continue with the same example we did from the previous section. Here’s the schema version which gets the IP Address and state columns:
    grunt> ip_and_state = FOREACH ip GENERATE ipaddress, state;
    Run a DESCRIBE then a DUMP to verify the schema and output.
  4. And then we filter for just 'NY' IP’s, type:
    grunt> ny_ips = FILTER ip_and_state BY state == 'NY';
    Once again, run a DESCRIBE then a DUMP to verify the schema and output. It should be the same as before.

NOTE: Explicit schemas are not required, but do make our relations easier to understand.

Filtering the Header Row

  1. If you perform a:
    grunt> DUMP ip;
    You’ll notice the first row in the output is not actual data, but a header row of column names:
    ip lookup first row headers
  2. It’s common practice to filter this header row out, which we can do in this case with:
    grunt> ip_noheader = FILTER ip BY ipaddress != 'IP';
    Execute a DUMP to verify the header is gone.

Weblogs

In this section we will prepare the weblogs data set in a similar manner to how we set up the iplookups data set.

  1. First we load the weblogs, and apply a schema:
    grunt> weblogs = LOAD 'unit07lab1/weblogs/*' USING PigStorage(' ') AS (reqdate:chararray, reqtime:chararray, x1:int, method:chararray, uri:chararray, x2:int, x3:int, x4:int, ipaddress:chararray, useragent:chararray);
    Notice that for the columns we do not need from the source data, we do not bother with a meaningful name and simply call them x1, x2, and so on.
  2. Next we filter out the headers. We can do this easily because in the header rows the ipaddress field will be empty, type:
    grunt> weblogs_noheader = FILTER weblogs BY ipaddress !='';
  3. Execute a DUMP to verify you have output and a DESCRIBE to ensure you’ve created the proper schema before continuing.

Understanding Pig Grouping

  1. The GROUP operator creates a relation with two columns. The first column contains the values you’re grouping by; the second column contains a bag of all the rows which match the group. Try this:
    grunt> by_state = GROUP ip_noheader BY state;
  2. Let’s inspect the schema:
    grunt> DESCRIBE by_state;
    You will see:
    group by state schema
    The first field is named group and is the same type as state. The second field is named ip_noheader after the relation it came from, and contains a bag (think mini-table) of the rows which match the group. You can see this by executing a:
    grunt> DUMP by_state;
    Here are the last few rows of output:
    dump by state last few rows
  3. Let’s do something productive with our grouping, like counting the number of IP addresses by state:
    grunt> count_by_state = FOREACH by_state GENERATE group, COUNT(ip_noheader);
    Then, of course, execute a DUMP to see the output:
    dump count by state
  4. As you might already anticipate, we can add a schema to this relation if we like:
    grunt> count_by_state = FOREACH by_state GENERATE group AS (state:chararray), COUNT(ip_noheader) AS (ip_count:int);
    Execute a DESCRIBE to see the schema (an optional sorting sketch follows this list).
    NOTE: We can add a schema to any FOREACH command.
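
NOTE: Pig can also sort a relation. As a small optional sketch (assuming the count_by_state relation above), the ORDER BY operator lists the states with the most IP addresses first:

    grunt> sorted_counts = ORDER count_by_state BY ip_count DESC;
    grunt> DUMP sorted_counts;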

More Than Just Counts

  1. We can do more than count rows. There are other aggregate operators like MIN, MAX, SUM, and AVG, to name a few. For instance, this relation gets the average longitude by state:
    grunt> avg_lng_by_state = FOREACH by_state GENERATE group AS (state:chararray), AVG(ip_noheader.longitude) AS (avg_lng:float);
    Execute a DESCRIBE and then a DUMP to get an idea of how this command worked.
  2. Sometimes you need to count or find the min or max among all the rows. We use GROUP ALL to do this (a short sketch combining several aggregates follows this list):
    grunt> ip_group = GROUP ip_noheader ALL;
  3. Then with this grouping we can find the northernmost IP address (the one with the maximum latitude), type:
    grunt> northernmost_coord = FOREACH ip_group GENERATE MAX(ip_noheader.latitude);
    then a DUMP reveals:
    dump northernmost coord
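
NOTE: A single FOREACH over a GROUP ALL relation can produce several aggregates at once. As a small optional sketch (not a required step), this computes the southernmost and northernmost latitudes in one relation:

    grunt> lat_bounds = FOREACH ip_group GENERATE MIN(ip_noheader.latitude) AS southernmost, MAX(ip_noheader.latitude) AS northernmost;
    grunt> DUMP lat_bounds;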

Pig Joins

In this section we demonstrate how to perform basic joins in Pig.

  1. The most common type of join is the inner join, which produces a single data set from two separate data sets by matching rows on a common column. For example, we can combine the weblogs and iplookups datasets on ipaddress like so:
    grunt> weblogs_with_iplookups = JOIN ip_noheader BY ipaddress, weblogs_noheader BY ipaddress;
  2. If you perform a DUMP to view the output, you’ll see it’s far too much data to view at once. If you look closely, you can see the data is joined together:
    joined output
  3. If you DESCRIBE the alias, you will see it consists of fields from both ip_noheader and weblogs_noheader. If you need to reference a single column, for example the ipaddress from ip_noheader, you would say ip_noheader::ipaddress (see the short sketch after this list).
  4. One way to speed up the join process is to use a replicated join, which loads the smaller of the two data sets (the relation listed last in the JOIN) into memory. Time this MapReduce job:
    grunt> DUMP weblogs_with_iplookups;
    Then re-write our join as:
    grunt> weblogs_with_iplookups = JOIN weblogs_noheader BY ipaddress, ip_noheader BY ipaddress USING 'replicated';
    Then re-time this MapReduce job:
    grunt> DUMP weblogs_with_iplookups;
    The second time should be faster!
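
NOTE: Once relations are joined, fields are referenced with the originating relation name and :: as a prefix. As a small optional sketch (assuming the weblogs_with_iplookups relation above), this projects just the IP address, state, and requested URI from the joined data:

    grunt> joined_summary = FOREACH weblogs_with_iplookups GENERATE ip_noheader::ipaddress, ip_noheader::state, weblogs_noheader::uri;
    grunt> DUMP joined_summary;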

Joins Are Expensive

Joins are a computationally expensive operation, and therefore it is a best practice to reduce the number of rows in your relations prior to performing a join. In this next section we will demonstrate this by way of an example:

We’d like to know the cities and states of people who added something to their shopping cart. We know when they’ve added something to the cart because our web application accepts items via HTTP POST to URIs beginning with /ShoppingCart. This will form the basis of our pre-join filter.

  1. First we filter weblogs_noheader to include only POSTs to URIs which begin with /ShoppingCart, type:
    grunt> weblogs_shopping_cart = FILTER weblogs_noheader BY method == 'POST' AND STARTSWITH(uri,'/ShoppingCart');
  2. Next we join to the IP lookups (the ip_noheader relation), type:
    grunt> cart_join = JOIN ip_noheader BY ipaddress, weblogs_shopping_cart BY ipaddress USING 'replicated';
  3. Then project only the columns we want to display:
    grunt> cart_addresses = FOREACH cart_join GENERATE ip_noheader::city, ip_noheader::state, ip_noheader::ipaddress, weblogs_shopping_cart::uri;
    NOTE: It helps to DESCRIBE the cart_join relation to better understand the available fields.
  4. When you type:
    grunt> DUMP cart_addresses;
    You should see:
    dump cart addresses

Saving Data with Store

Up to this point we’ve been DUMPing the output of our relations to the console. While this is useful when you’re developing your Pig script, at some point you may want to write the relation out to HDFS.

  1. This example stores the cart_addresses relation to HDFS using comma-delimited storage:
    grunt> STORE cart_addresses INTO 'unit07lab1/cart_addresses' USING PigStorage(',');
    This will execute a MapReduce job to evaluate the relation and write it to HDFS.
  2. We can now verify the data is in HDFS, type:
    grunt> cat unit07lab1/cart_addresses
    and you should see:
    dump cart addresses

NOTE: You can use the PigStorage() method to LOAD or STORE data with a variety of delimiters, as sketched below.
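
For example, as a small optional sketch (the unit07lab1/cart_addresses_tab folder name is just an illustration), the same relation could be stored tab-delimited:

    grunt> STORE cart_addresses INTO 'unit07lab1/cart_addresses_tab' USING PigStorage('\t');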

Sessionization of Data

TODO: [Time Permitting] One common big data activity we do with clickstream data is Sessionization. ....
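
As a small optional sketch only (the full walk-through has not been written), one possible starting point approximates a session by grouping clicks by IP address and ordering each visitor's requests by time; the field names assume the weblogs_noheader schema defined earlier:

    grunt> clicks_by_ip = GROUP weblogs_noheader BY ipaddress;
    grunt> sessions = FOREACH clicks_by_ip {
               ordered = ORDER weblogs_noheader BY reqdate, reqtime;
               GENERATE group AS ipaddress, ordered;
           };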

Test Yourself

NOTE: To complete this section you might need to consult the Pig Latin reference manual here: http://pig.apache.org/docs/r0.16.0/basic.html

Write a series of Pig Latin scripts to complete each of the following:

  1. Use the ip_noheader relation to get a list of IP addresses in the state CA.
  2. Use the weblogs_noheader relation to get the IP addresses which accessed loading.gif (Hint: look in the uri field).
  3. Use the weblogs_noheader relation to get a count of each of the different methods in the log files (GET, POST, etc.).
  4. Join ip_noheader and weblogs_noheader to get a count of page requests by state.

Part 3: On Your Own

Instructions

For this part of the lab, use the Chipotle Orders data set. The file is orders.tsv and it’s located in the chipotle folder.

Before you begin you should upload the orders.tsv file to HDFS in the unit07lab1/chipotle folder.
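
As a minimal sketch (the local datasets/chipotle path is an assumption; adjust it to wherever orders.tsv lives on your client), the upload could look like:

    $ hdfs dfs -mkdir unit07lab1/chipotle
    $ hdfs dfs -put datasets/chipotle/orders.tsv unit07lab1/chipotle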

Questions

Answer each of the following questions by writing down the Pig Latin script to complete each task. You may need to write more than one relation to complete the task.

  1. Load chipotle orders into the relation orders using this schema: order_id:int, quantity:int, item_name:chararray, choice_description:chararray, item_price:chararray
  2. Create a relation cg that filters the orders down to only ‘Chips and Guacamole’.
  3. Create a relation cg_total as the total quantity of ‘Chips and Guacamole’ ordered. HINT: There should be 506.
  4. How many customer orders had ‘Chicken Burrito’ as an item?
  5. What is the total quantity of ‘Chicken Burrito’ sold across all orders?