We assume that you already have a cluster with HDFS, Spark 2 and Hive up and running. The easiest way to set one up is through Apache Ambari.
If this is a fresh HDFS installation (no previous setup), we first need to create a home directory on HDFS for the user we are working as.
For example, if we are using the hdfs user to submit Spark jobs, we need to create the corresponding home directory on HDFS:
hdfs dfs -mkdir /user/hdfs
Next we need a sample file with a large amount of text. I am using this file, which I found on the internet. It is only 6.2 MB, but that is big enough for testing and still easy to copy over the network.
Now we need to put this file in HDFS:
hdfs dfs -put big.txt /user/hdfs/big.txt
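If you do not have a suitable text file at hand, a few lines of plain Python can generate one. This is just a convenience sketch; the file name big.txt, the word list, and the ~6 MB target size are arbitrary choices, not anything the job requires:

```python
import random

# A small vocabulary to sample from (arbitrary choice for illustration)
words = ["apache", "spark", "hive", "hdfs", "yarn", "cluster", "data"]

# Write random lines until the file is roughly 6 MB
size = 0
with open("big.txt", "w") as f:
    while size < 6 * 1024 * 1024:
        line = " ".join(random.choice(words) for _ in range(12)) + "\n"
        f.write(line)
        size += len(line)
```

Any plain-text file of a few megabytes works just as well.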
The script itself
# Some Python imports
# Python 3 style print, for better print functionality
from __future__ import print_function
# Used to check the arguments given to the script
import sys
# Used in the reduce phase so we can pass the + operator as a function reference
from operator import add
# Import the Spark interface and the Spark/Hive integration
from pyspark.sql import SparkSession, HiveContext

if __name__ == "__main__":
    # Make sure we have enough arguments to run the job
    if len(sys.argv) != 2:
        print("Usage: wordcount <file>", file=sys.stderr)
        sys.exit(-1)

    # Create a Spark session with the name PythonWordCount
    spark = SparkSession\
        .builder\
        .appName("PythonWordCount")\
        .getOrCreate()

    # Convert the file to an RDD
    rdd = spark.sparkContext.textFile(sys.argv[1])

    counts = (rdd
              # Split the text on spaces
              .flatMap(lambda x: x.split(' '))
              # Map every word to the pair (word, 1)
              .map(lambda x: (x, 1))
              # Sum the counts, reduced by the key (the word itself)
              .reduceByKey(add)
              # Sort by the second item (the count). False as the second
              # argument means descending
              .sortBy(lambda x: x[1], False))

    # Collect the results back to the driver
    output = counts.collect()

    # Create a DataFrame to be used for storing the results
    df = spark.createDataFrame(output, ["value", "count"])

    # We could also write the results back to HDFS in CSV format
    # df.write.csv("hdfs:///user/hdfs/results.csv")

    # Create a Hive context to interact with Hive
    hive_ctx = HiveContext(spark.sparkContext)

    # Create the wordcount table if it is not already present
    hive_ctx.sql("CREATE TABLE IF NOT EXISTS wordcount (value STRING, count INT)")

    # Write the results directly to Hive
    df.write.insertInto("wordcount")

    spark.stop()
What the script does is basically:
Create Spark session
Convert the text file to a resilient distributed dataset (RDD), so that we can run map/reduce operations on it in parallel across the cluster
Execute flatMap, which "flattens" the results: [[word, word], [word]] becomes [word, word, word]
Then map each word to the pair (word, 1)
Reduce all the pairs by key (the word itself), summing the number associated with each occurrence (the 1 from the previous step), so the structure becomes [[word => count], [word => count]]
We need the words sorted by number of occurrences, so we perform the sortBy operation with two arguments. The first is the sort key function, which returns the count; the second is the sort order – False means descending
Then we create a DataFrame to prepare the data for storing in Hive
In the last step we write the DataFrame directly to Hive
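The steps above can be sketched in plain Python, which makes the logic easy to follow. This is only an illustration of what flatMap, map, reduceByKey and sortBy compute, not how Spark executes them; the sample lines are made up:

```python
from operator import add

lines = ["spark and hive", "spark on yarn"]

# flatMap: split every line on spaces and flatten into one list of words
words = [w for line in lines for w in line.split(' ')]

# map: pair each word with the number 1
pairs = [(w, 1) for w in words]

# reduceByKey: sum the 1s per word
counts = {}
for word, n in pairs:
    counts[word] = add(counts.get(word, 0), n)

# sortBy the count, descending
result = sorted(counts.items(), key=lambda x: x[1], reverse=True)
print(result)  # "spark" appears twice, every other word once
```

On the cluster the same steps run partition by partition, with reduceByKey shuffling pairs so that all occurrences of a word meet on one node.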
Submitting the job
Usually the spark-submit command is already on the user's $PATH, but if not, just cd to the Spark client directory that contains it:
$ cd /usr/hdp/current/spark-client
We can use YARN as the resource manager, which is the recommended (production) method for submitting the job:
$ spark-submit --master yarn --deploy-mode client ./wordcount.py
Local mode is usually suitable for local testing. It will start the job faster but gets no benefit from a multi-node environment.
The local[*] setting tells Spark to use all available CPU cores:
$ spark-submit --master local[*] --deploy-mode client ./wordcount.py
Multiple versions of Spark are installed but SPARK_MAJOR_VERSION is not set
Spark1 will be picked by default
Traceback (most recent call last):
  File "/home/hdfs/spark_test/wordcount.py", line 23, in <module>
    from pyspark.sql import SparkSession, HiveContext
ImportError: cannot import name SparkSession
If you see this error, just set SPARK_MAJOR_VERSION to 2:
export SPARK_MAJOR_VERSION=2