Unit 08 Lab 1: Spark (PySpark)

Part 1: Overview

About This Lab

In this lab we will learn to use the Spark distributed computing framework through its Python API, PySpark.

Learning Outcomes

Upon completing this lab you will be able to:

  - Program in Spark with the Python language
  - Demonstrate how to read and process data using Spark
  - Compare and contrast RDDs and DataFrames
  - Explain the difference between SQLContext and HiveContext
  - Write Spark output to HDFS and create Hive tables from that output

Requirements

To complete this lab you will need:

  - Access to your Hadoop client with Spark and Jupyter Notebook available
  - The UFO Sightings and preamble.txt datasets in your ~/datasets folder

NOTE: We can’t teach you all the Spark commands, so we highly recommend referring to the Spark Documentation here: https://spark.apache.org/docs/1.4.1/

Before You Begin

Before you start this lab, complete the following setup steps from your Hadoop client:

  1. Open a Linux command prompt.
  2. Create a folder for the lab:
    $ hdfs dfs -mkdir unit08lab1
  3. Make sure Hive has permissions to this folder:
    $ hdfs dfs -chmod -R 777 unit08lab1
  4. Create a folder for the ufo dataset:
    $ hdfs dfs -mkdir unit08lab1/ufo
  5. Upload the UFO Sightings dataset:
    $ hdfs dfs -put ~/datasets/ufo-sightings/* unit08lab1/ufo
  6. Create a folder for the preamble.txt:
    $ hdfs dfs -mkdir unit08lab1/preamble
  7. Upload the preamble.txt file:
    $ hdfs dfs -put ~/datasets/text/preamble.txt unit08lab1/preamble
  8. Set your $SPARK_HOME environment variable:
    $ export SPARK_HOME=/usr/hdp/current/spark-client/
  9. Start jupyter notebook:
    $ jupyter-notebook
  10. Open a browser and go to http://localhost:8888
    You should see the Jupyter application in your browser:
    [screenshot: Jupyter notebook home page]

Part 2: Walk-Through

The following setup code is necessary in every notebook that uses Spark:

import findspark                 # locate the Spark installation via $SPARK_HOME
findspark.init()
import pyspark
sc = pyspark.SparkContext()      # create the connection to the Spark cluster
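
As a quick sanity check (my own addition, not a required lab step), you can verify the new context is working by distributing a small local list and summing it:

print(sc.version)                 # the Spark version the context is bound to
rdd = sc.parallelize(range(10))   # distribute a small local list across the cluster
print(rdd.sum())                  # should print 45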

Understanding Spark RDDs

The goal of this part of the Walk-Through is to help you understand how Spark RDDs (Resilient Distributed Datasets) are used in distributed processing. We will use the classic MapReduce example of "Wordcount", but write it as a PySpark program.

import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext()
data = sc.textFile('/user/ischool/unit08lab1/preamble')   # read the file from HDFS
data.collect()
words = data.flatMap( lambda line: line.lower().replace(',','').split(' '))   # split lines into lower-cased words
words.take(10)
wordsmap = words.map(lambda word: (word,1))                # pair each word with a count of 1
wordsmap.take(10)
wordCounts = wordsmap.reduceByKey( lambda a, b: a+b)       # sum the counts for each word
wordCounts.take(20)
wordCounts.filter(lambda tup: tup[1] > 1).sortByKey().collect()   # keep words appearing more than once, sorted alphabetically

# The complete word count pipeline, end to end:
data = sc.textFile('/user/ischool/unit08lab1/preamble')
words = data.flatMap( lambda line: line.lower().replace(',','').split(' '))
wordsmap = words.map(lambda word: (word,1))
wordCounts = wordsmap.reduceByKey( lambda a, b: a+b)
wordCounts.filter(lambda tup: tup[1] > 1).sortByKey().collect()
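
Note that sortByKey() orders the results alphabetically by word. If you would rather see the most frequent words first, a small variation (an optional sketch, not part of the original lab) sorts by the count in each tuple instead:

wordCounts.filter(lambda tup: tup[1] > 1).sortBy(lambda tup: tup[1], ascending=False).take(10)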

Data Manipulation With Spark

In this part we walk through a typical example of how to manipulate data with Spark, using the UFO Sightings dataset.

import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext()
data = sc.textFile('/user/ischool/unit08lab1/ufo')
data.take(5)
import csv
import string
exclude = set(string.punctuation)

def parseLine(line):
    # Parse one CSV row and return a dictionary of City, State, and a
    # punctuation-stripped, lower-cased Summary.
    csvdata = csv.reader([line], delimiter=',', quotechar='"')
    for fields in csvdata:
        try:
            summary = ''.join(ch for ch in fields[5].lower() if ch not in exclude)
            return { "City" : fields[1] , "State": fields[2], "Summary" : summary}
        except:
            # Header and malformed rows become sentinel records, filtered out below
            return { "City" : "City", "State" :"State", "Summary": "Summary"}
ufo_data = data.map(parseLine).filter(lambda ufo: ufo['City'] != 'City')
ufo_data.take(5)

The filter() method omits rows whose City value is 'City'; these are the header and error rows. When you execute this cell you should see a Python list of dictionaries containing City, State and Summary:

[screenshot: UFO extraction output]

At this point we might want to analyze word counts in the summary text to determine whether there are any significant recurring words. That exercise is left to the reader. One thing that stood out in my own analysis was the number of occurrences of colors and directions in the summary text. For example, this Spark program will get you a count of directions (north, south, east, west) from the summaries:

ufo_summaries = ufo_data.map( lambda ufo: ufo['Summary'])
ufo_words = ufo_summaries.flatMap(lambda summary: summary.split(' ')).map(lambda word: (word,1))
ufo_direction = ufo_words.reduceByKey(lambda a,b: a+b).filter( lambda tup: tup[0] in ('north','south','east','west'))
ufo_direction.collect()

When you execute the code you will see this output:

[screenshot: direction word counts]

So among 1,939 sightings, 93 had the word "south" in their summaries, roughly a 4.8% occurrence. From this analysis we determine which terms we want to extract and create a new method, parseLineExtractColorAndDirection, to extract the color of the UFO (red, blue, etc.) and the direction from the summary text. If more than one color appears in the text, e.g. "A red and blue triangle crossed the sky", the sighting gets labeled as "multiple" colors. Here's the code, which you should paste into a new cell and execute:

import csv
import string
exclude = set(string.punctuation)
colors = ('silver','gold','white','black','green','purple','orange','yellow','red','blue')
directions =('north','south','east','west')

def parseLineExtractColorAndDirection(line):
    csvdata = csv.reader([line], delimiter=',', quotechar='"')
    for fields in csvdata:
        try:
            summary = ''.join(ch for ch in fields[5].lower() if ch not in exclude)
            colorcounts = [1 if summary.find(str(c)) != -1 else 0 for c in colors]
            if (sum(colorcounts)==0):
                color='none'
            elif (sum(colorcounts)>1):
                color='multiple'
            else:
                color = colors[colorcounts.index(1)]
            directioncounts = [1 if summary.find(str(d)) != -1 else 0 for d in directions]
            if (sum(directioncounts)==0):
                direction='none'
            elif (sum(directioncounts)>1):
                direction='multiple'
            else:
                direction = directions[directioncounts.index(1)]

            return { "City" : fields[1] , "State": fields[2], "Color" : color,  "Direction": direction, "Summary" : summary}
        except:
            return { "City" : "City"  }
ufo_data = data.map(parseLineExtractColorAndDirection) #.filter(lambda ufo: ufo['City'] != 'City')
ufo_data.take(5)
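
Before moving on, it can help to sanity-check the extraction. The following sketch (my own addition, not part of the original lab) counts how often each extracted color appears, skipping the sentinel rows produced by the except clause:

color_counts = ufo_data.filter(lambda ufo: ufo['City'] != 'City') \
                       .map(lambda ufo: (ufo['Color'], 1)) \
                       .reduceByKey(lambda a, b: a + b)
color_counts.collect()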

It might be useful to store this data back into HDFS or into HCatalog so that others can analyze it with Pig, Hive, and Zeppelin. We will do this in our final Walk-Through.
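
If you only need a quick text copy in HDFS, rather than the ORC and Hive approach shown next, one option is to serialize each dictionary to JSON and call saveAsTextFile(). This is a minimal sketch; the target folder name ufo_json is my own choice, and saveAsTextFile() will fail if that folder already exists:

import json
ufo_data.filter(lambda ufo: ufo['City'] != 'City') \
        .map(lambda ufo: json.dumps(ufo)) \
        .saveAsTextFile('/user/ischool/unit08lab1/ufo_json')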

Spark DataFrames, SQL Context and Hive Context

In this final part of the Walk-Through we explore Spark DataFrames and SQLContext. We will also look at HiveContext and see how it differs from SQLContext. We will continue with our example from the previous Walk-Through and work with our term-extracted UFO Sightings dataset.

import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext()
import csv
import string
exclude = set(string.punctuation)
colors = ('silver','gold','white','black','green','purple','orange','yellow','red','blue')
directions =('north','south','east','west')

def parseLineExtractColorAndDirection(line):
    csvdata = csv.reader([line], delimiter=',', quotechar='"')
    for fields in csvdata:
        try:
            summary = ''.join(ch for ch in fields[5].lower() if ch not in exclude)
            colorcounts = [1 if summary.find(str(c)) != -1 else 0 for c in colors]
            if (sum(colorcounts)==0):
                color='none'
            elif (sum(colorcounts)>1):
                color='multiple'
            else:
                color = colors[colorcounts.index(1)]
            directioncounts = [1 if summary.find(str(d)) != -1 else 0 for d in directions]
            if (sum(directioncounts)==0):
                direction='none'
            elif (sum(directioncounts)>1):
                direction='multiple'
            else:
                direction = directions[directioncounts.index(1)]

            return { "City" : fields[1] , "State": fields[2], "Color" : color,  "Direction": direction, "Summary" : summary}
        except:
            return { "City" : "City"  }
from pyspark.sql import SQLContext
from pyspark.sql import Row
sql = SQLContext(sc)
# Convert each parsed dictionary to a Spark SQL Row so Spark can infer a schema
def toRow(dic):
    return Row(City=dic['City'],State=dic['State'],Color=dic['Color'],Direction=dic['Direction'],Summary=dic['Summary'])
data = sc.textFile('/user/ischool/unit08lab1/ufo')
ufo_data = data.map(parseLineExtractColorAndDirection).filter(lambda d: d['City'] != 'City')
ufo_dataframe = ufo_data.map(toRow).toDF()
ufo_dataframe.show(5)
ufo_dataframe.toPandas().head(5)
ufo_dataframe.printSchema()
ufo_dataframe.groupBy(ufo_dataframe.Color).count().toPandas().sort_values('count')
ufo_dataframe.registerTempTable('ufo')   # register a temporary table so we can query it with SQL
sql.sql("SELECT Color, count(*) as ColorCount FROM ufo GROUP BY Color ORDER BY ColorCount DESC").toPandas()

HiveContext

Temporary tables are useful because they allow us to express Spark code in SQL, but they have two limitations: they exist only for the current Spark session, and they are not accessible to other users. The solution to this problem is Spark's HiveContext, which, as the name implies, provides Spark with access to Hive through HCatalog. We will use HiveContext to write our ufo_dataframe to HDFS, create an external Hive table over it, and then query that table from Hive.

from pyspark.sql import HiveContext
hive = HiveContext(sc)
hive.sql('CREATE DATABASE IF NOT EXISTS unit08lab1')
hive.sql('USE unit08lab1')
hive.registerDataFrameAsTable(ufo_dataframe,'ufo_temp')
ufo_hive = hive.sql('SELECT * FROM ufo_temp')
ufo_hive.write.save('/user/ischool/unit08lab1/ufo_orc', format='orc')
  1. Open a Linux command prompt.
  2. Type: $ hdfs dfs -ls unit08lab1/ufo_orc
    You will see the .orc files created by the write.
hive.sql("""CREATE EXTERNAL TABLE IF NOT EXISTS ufo_orc
    ( City string, Color string, Direction string, State string, Summary string)
    STORED AS ORC LOCATION '/user/ischool/unit08lab1/ufo_orc' """)
  1. Start Hive: $ hive
  2. Query our new table: hive> SELECT * FROM unit08lab1.ufo_orc LIMIT 5;
    You will see our first 5 rows again.
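
You can also confirm the table persists from Spark itself. As a quick check (my own addition), query the external table through the same HiveContext:

hive.sql("SELECT City, State, Color, Direction FROM unit08lab1.ufo_orc LIMIT 5").show()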

Test Yourself

  1. What is SparkContext?
  2. Spark uses lazy execution like Pig. What does this mean?
  3. Explain the concept behind an RDD.
  4. Explain the difference between a Spark RDD and a DataFrame. What are the advantages and disadvantages of each?
  5. Explain the difference between a SQLContext and a HiveContext.
  6. Which Spark object has a map() method? RDD or DataFrame?
  7. How do you convert a DataFrame back to an RDD?
  8. Describe the process to convert an RDD to a DataFrame.

Part 3: On Your Own

Instructions

  1. Load the exam-scores.csv from datasets/exam-scores to the HDFS folder unit08lab1/exam-scores. This dataset will be used in this section of the lab.
  2. Create a new Jupyter Notebook, name it unit08lab1-Part3, and set it up to use SparkContext and HiveContext.

Questions

Answer each of the following by writing a Spark program in a new cell of your notebook.

  1. Create an RDD called scores that reads the exam-scores data. Include a screenshot of your code and the top 5 rows. Your output should look like this:
    [screenshot: scores RDD output]
  2. Write a parseLine method to split each comma-delimited row and create a DataFrame Row with the fields: ClassSection, ExamVersion, CompletionTime, Score, LetterGrade.
  3. Use your parseLine method to get a DataFrame called scores_df. Filter out the header row, convert it to Pandas, and output the first 5 rows. Output should look like this:
    [screenshot: scores_df pandas output]
  4. Register the scores_df DataFrame as a Hive temporary table scores_temp. Execute a SQL statement to produce the average Score by ExamVersion, store the output in the DataFrame avg_score_df, and display the output using Pandas. It should look like this:
    [screenshot: avg_score_df output]
  5. Write the avg_score_df to /user/ischool/unit08lab1/avg_scores_orc as an ORC file.
  6. Create an external table from /user/ischool/unit08lab1/avg_scores_orc as the Hive table unit08lab1.avg_scores_orc. Make sure you select the proper data types for each field.
    NOTE: If you get a permission denied error, change permissions on the HDFS folder: $ hdfs dfs -chmod -R 777 unit08lab1
  7. Use the Hive Client to query the table and demonstrate it is a persistent table. Provide a screenshot of the query and output.