Spark Code Cheatsheet

Word count in single line

 rdd.flatMap(line => line.split(" "))
    .map(word => (word, 1))
    .reduceByKey((count, accum) => count + accum)

Count number of lines in file

rdd.count

Display Spark version

print sc.version

Read csv file into dataframe in spark with header

df=spark.read.option("header","true")
    .option("delimiter",",").csv(csv_path)

Read csv file into dataframe in spark without header
First you need to specify a schema:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("Field0", IntegerType()),
    StructField("Field1", StringType()),
    StructField("Field2", StringType())
    ])

Then read in the data:

df=spark.read.format('com.databricks.spark.csv')
   .load(csv_path, schema = schema)


Handle commas in embedded quoted strings when reading in data

This is done by specifying an escape option:

df_raw=spark.read.format('com.databricks.spark.csv')
     .load(csv_path, schema = schema, escape='"')

Obtain distinct values of column in Spark DF

student_ids=df_raw.select('student_id').distinct().take(200)

Convert list of DataFrame Rows to Python list

student_ids=[x.student_id for x in df_raw
            .select('student_id').distinct().take(200)]

Filter out rows based on values in list

from pyspark.sql.functions import col

filtered_df = df_raw.where(col('student_id').isin(student_ids))

Save dataframe to csv (only for small datasets)

# Many Files
filtered_df.write.csv(output_csv) 
# Single File
filtered_df.coalesce(1).write \
    .option("header", "true").csv(output_csv)

Count number of distinct values of a composite key in a data frame:

df.select('major','gender').distinct().count()
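
If instead you want the number of occurrences of each composite key (rather than the number of distinct keys), a grouped count along these lines should work (a sketch reusing the 'major' and 'gender' columns from above):

counts_df = df.groupBy('major', 'gender').count()
counts_df.show()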

Start pyspark in python notebook mode

export PYSPARK_DRIVER_PYTHON=ipython;pyspark

How I got Cloudera Quickstart VM running on Google Compute Engine

Introduction

This is a brief synopsis of how I got the Cloudera Quickstart VM running via Docker on Google Compute Engine, which is Google’s cloud equivalent to Amazon’s AWS Cloud Computing Service.

The Cloudera Quickstart VM is a basic “Hadoop-In-A-Box” virtual machine solution which provides a Hadoop ecosystem for developers who wish to quickly test out the basic features of Hadoop without having to deploy an entire cluster. Since it doesn’t entail setting up a cluster, certain features provided by a cluster are missing.

Detailed Steps

Step 0.

Install gcloud on your local workstation as per these instructions:

https://cloud.google.com/sdk/

Step 1. Create a container optimized VM on GCE:

https://cloud.google.com/compute/docs/containers/container_vms

$ gcloud compute --project "myproject-1439" \
instances create "quickstart-instance-1" \
--image container-vm --zone "us-central1-a" \
 --machine-type "n1-standard-2"
Created [https://www.googleapis.com/compute/v1/projects/gcebook-1039/zones/us-central1-a/instances/quickstart-instance-1].
NAME                  ZONE          MACHINE_TYPE  PREEMPTIBLE INTERNAL_IP EXTERNAL_IP   STATUS
quickstart-instance-1 us-central1-a n1-standard-2             10.240.0.2  146.148.92.36 RUNNING

I created an n1-standard-2 VM on GCE, which has 2 vCPUs and 7.5 GB RAM. It already has Docker pre-installed.

Step 2. Pull the Cloudera Quickstart Docker image onto the VM:

$ docker pull cloudera/quickstart:latest

Step 3. Let’s check the image size:

$ docker images
REPOSITORY TAG IMAGE ID CREATED VIRTUAL SIZE
cloudera/quickstart latest 2cda82941cb7 41 hours ago 6.336 GB

Given that our VM has a total disk size of 10 GB, this is cutting it a bit close for the long term if we wish to install other software.

So let’s create a persistent disk and make that available for storing our docker image:

https://cloud.google.com/compute/docs/disks/persistent-disks

I was able to create a 200GB persistent disk: bigdisk1

Step 4. Switch the Docker image installation directory to use the big persistent disk.

There are a couple of ways to do this as per this article:

https://forums.docker.com/t/how-do-i-change-the-docker-image-installation-directory/1169

The least troublesome way, IMO, was to mount the bigdisk1 persistent disk so it would be available for use by my VM, and move the default Docker image installation directory onto it.

First, create a mountpoint for the bigdisk:

$ sudo mkdir /mnt/bigdisk1

Next, mount it:
On GCE, the raw disk can be found at /dev/disk/by-id/google-<diskid>
i.e. /dev/disk/by-id/google-bigdisk1

$ sudo mount -o discard,defaults /dev/disk/by-id/google-bigdisk1 \
  /mnt/bigdisk1

Finally, symlink it back to the default image installation directory:

$ sudo ln -s /mnt/bigdisk1/docker/ /var/lib/docker

Presto, we’re now ready. If we run docker pull on any image, the image will be written to the large persistent disk:

$ ls -ltr /var/lib/docker
lrwxrwxrwx 1 root root 21 Apr 9 03:20 /var/lib/docker -> /mnt/bigdisk1/docker/

Step 5. Run the Cloudera Quickstart VM and get access to all the goodies:

http://www.cloudera.com/documentation/enterprise/5-5-x/topics/quickstart_docker_container.html

$ sudo docker run --hostname=quickstart.cloudera --privileged=true \
-t -i cloudera/quickstart  /usr/bin/docker-quickstart 
...
Starting zookeeper ... STARTED
starting datanode, logging to /var/log/hadoop-hdfs/hadoop-hdfs-datanode-quickstart.cloudera.out

...
starting rest, logging to /var/log/hbase/hbase-hbase-rest-quickstart.cloudera.out
Started HBase rest daemon (hbase-rest): [ OK ]
starting thrift, logging to /var/log/hbase/hbase-hbase-thrift-quickstart.cloudera.out
Started HBase thrift daemon (hbase-thrift): [ OK ]
Starting Hive Metastore (hive-metastore): [ OK ]
Started Hive Server2 (hive-server2): [ OK ]
Starting Sqoop Server: [ OK ]
Sqoop home directory: /usr/lib/sqoop2
Setting SQOOP_HTTP_PORT: 12000
...
Started Impala Catalog Server (catalogd) : [ OK ]
Started Impala Server (impalad): [ OK ]
[root@quickstart /]#

Spark Expositions: Understanding RDDs

Introduction

RDDs, or Resilient Distributed Datasets, are the fundamental data abstraction
in Apache Spark. An RDD is essentially a distributed dataset held in memory,
partitioned among the various nodes of a Spark cluster.
The official documentation defines an RDD as follows:

A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel.

In the seminal paper on RDDs by Matei Zaharia, an RDD is defined as follows:

Resilient Distributed Datasets (RDDs) are a distributed memory abstraction that lets programmers perform in-memory computations on large clusters in a fault-tolerant manner.

The Resilient part refers to fault tolerance: if a partition of the dataset is lost due to a node going down, it can be recovered in a timely manner. The recoverability comes from the fact that the RDD's lineage graph is maintained by the driver program, so the lost partition can simply be recomputed.

The lineage graph consists of the series of transformations and actions to be
computed on a partition of data, and it is stored on the driver program.
The RDD in effect is a set of instructions which are lazily evaluated.
The evaluation only occurs when an action is encountered.
Here is an example of what an RDD’s lineage graph looks like:

scala> idBudgetRDD.toDebugString
res6: String = 
(2) MapPartitionsRDD[8] at map at <console>:35 []
 | MapPartitionsRDD[7] at filter at <console>:33 []
 | MapPartitionsRDD[3] at map at <console>:31 []
 | MapPartitionsRDD[2] at map at <console>:31 []
 | MapPartitionsRDD[1] at textFile at <console>:29 []

The Distributed part stems from the fact that the data is distributed among the worker nodes of a cluster. The driver program sends each worker the RDD along with the partition of the data that should be computed on that particular node. The process on the worker node that is responsible for executing the set of instructions encapsulated in the RDD is called the executor. The exact call is as follows, in

core/src/main/scala/org/apache/spark/rdd/RDD.scala

@DeveloperApi
 def compute(split: Partition, context: TaskContext): Iterator[T]

The Dataset expresses the fact that we are in fact processing a collection of
data albeit one that will be partitioned.

Again, taking a look at the source code, in
core/src/main/scala/org/apache/spark/rdd/RDD.scala
we see :

/**
 * Implemented by subclasses to return the set of partitions in this RDD. This method will only
 * be called once, so it is safe to implement a time-consuming computation in it.
 */
 protected def getPartitions: Array[Partition]

Types of Operations

RDDs support 2 types of operations: Transformations and Actions.

Transformations
Transformations convert an RDD from one form to another and return a new RDD.
They are lazily evaluated, meaning they are only executed when an action needs their results.

Actions
Actions trigger the execution of the chain of operations that result in data being returned. They are necessary for transformations to be evaluated. Without an action, an RDD is just a chain of transformations that are yet to be evaluated.
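
To make the distinction concrete, here is a minimal PySpark sketch: the map() and filter() calls below only build up the lineage, and nothing is computed until the collect() action runs.

nums = sc.parallelize([1, 2, 3, 4, 5])        # no computation happens yet
squares = nums.map(lambda x: x * x)           # transformation: returns a new RDD
evens = squares.filter(lambda x: x % 2 == 0)  # transformation: still lazy
evens.collect()                               # action: triggers evaluation, returns [4, 16]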

Contents of an RDD

An RDD is characterized by the following 5 properties; the sketch after this list shows how some of them can be inspected from PySpark:

  1. A list of partitions that comprise the dataset.
  2. A function to perform the computation for each partition.
  3. A list of dependencies on other RDDs i.e. parent RDDs.
    The parent RDD is the initial RDD on which a transformation is
    executed.
  4. A Partitioner for key-value/Pair RDDs (Pair RDDs are defined later).
  5. A list of preferred locations/hosts to load the various partitions into
    which the data has been split.
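
Some of these properties can be inspected directly from the API. A small PySpark sketch (the exact values depend on your cluster configuration):

pair_rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3)]).partitionBy(2)
pair_rdd.getNumPartitions()      # property 1: the number of partitions -> 2
pair_rdd.partitioner             # property 4: the Partitioner used for this Pair RDD
print(pair_rdd.toDebugString())  # property 3: the lineage, i.e. dependencies on parent RDDs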

Where does the RDD live ?

The RDD lives in the Spark Context on the driver program which runs on the master node in a cluster setup.

It is uniquely identified by an id :

scala> val exampleRDD = sc.parallelize(List(1,2,3,4,5))
exampleRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:27

scala> exampleRDD.id
res34: Int = 12 

scala> val example2RDD = sc.parallelize(List(1,2,3,4,5))
example2RDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:27

scala> example2RDD.id
res36: Int = 13

scala> val example3RDD=exampleRDD
example3RDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:27

scala> example3RDD.id
res37: Int = 12

 

Ways to create an RDD

  1. Parallelize an existing collection e.g.
    primesRDD = sc.parallelize([2,3,5,7,11,13,17])
  2. Read data from an external distributed source e.g. HDFS, S3, Cassandra
    e.g.

     gettysburgRDD = sc.textFile("hdfs://user/dataphantik/gettysburg.txt")
    
  3. From an existing RDD – transforming an existing RDD into a new RDD e.g.
    errorsRDD = baseRDD.filter(lambda line: 'Error' in line)
  4. Via Spark Streaming (see the sketch below)
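
For the Spark Streaming case, each micro-batch of incoming data arrives as an RDD inside a DStream. A rough PySpark sketch (the socket source on localhost:9999 is purely for illustration):

from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 5)                    # 5 second micro-batches
lines = ssc.socketTextStream("localhost", 9999)  # a DStream, i.e. a sequence of RDDs
counts = lines.flatMap(lambda line: line.split(" ")) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(lambda a, b: a + b)
counts.pprint()                                  # prints a few results from each batch RDD
ssc.start()                                      # ssc.awaitTermination() would block here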

Types of RDDs

The base class of all RDDs, org.apache.spark.rdd.RDD, is an abstract class from which all concrete RDD types inherit.

The main concrete RDD classes are:

  • HadoopRDD – provides core functionality for reading data stored in Hadoop (e.g.
    files in HDFS, sources in HBase, or S3), using the older MapReduce API
    (org.apache.hadoop.mapred).
  • MapPartitionsRDD – applies the provided function to every partition of the
    parent RDD. Normally returned when an RDD is created from a file via sc.textFile(..).
  • ParallelCollectionRDD – an RDD representing a collection of elements. It contains
    numSlices partitions and locationPrefs, which is a Map. It is obtained from a call
    to sc.parallelize(..) on a collection in memory.
  • PipedRDD – an RDD that pipes the contents of each parent partition through an
    external command (printing them one per line) and returns the output as a
    collection of strings.

There is also the notion of a:

  • PairRDD – an RDD of key-value pairs. It can be, for example, a ParallelCollectionRDD
    containing key-value pairs. There is no concrete PairRDD class, since it is an
    abstraction; however, the PairRDDFunctions class provides a set of transformations
    that can be performed on Pair RDDs.
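
These concrete classes live on the JVM side and are not exposed directly in PySpark, but the comments in this rough sketch note which underlying RDD type typically backs each call:

coll_rdd = sc.parallelize([1, 2, 3, 4])      # backed by a ParallelCollectionRDD
file_rdd = sc.textFile("gettysburg.txt")     # a MapPartitionsRDD on top of a HadoopRDD
piped_rdd = coll_rdd.pipe("cat")             # backed by a PipedRDD
pair_rdd = coll_rdd.map(lambda x: (x, 1))    # a Pair RDD: PairRDDFunctions-style ops apply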


Useful Distributed Systems Links

Distributed Systems is a constantly changing field. This page attempts to keep track of sites and blogs that are frequently updated and chock full of useful information for folks interested in keeping up with state of the art technologies.

Blogs

Framework Sites

Distributed Systems Concepts

Quick Introduction to Apache Spark

What is Spark

Spark is a fast and general purpose framework for cluster computing.
It is written in Scala but is available in Scala, Java and Python.
The main data abstraction in Spark is the RDD, or Resilient Distributed Dataset.
With this abstraction, Spark enables data to be distributed and processed among
the many nodes of a computing cluster.
It provides both interactive and programmatic interfaces. It consists of the following components:

  • Spark Core – the foundational classes on which Spark is built. It provides the RDD.
  • Spark Streaming – a library for processing streaming data
  • Spark SQL – an API for handling structured data
  • MLLib – a Machine Learning library
  • GraphX – a Graph library

Cluster Manager

In order to operate in production mode, Spark needs a cluster manager that handles data distribution, task scheduling and fault tolerance among the various nodes.
Three types are supported – Apache Mesos, Hadoop YARN and Spark standalone.
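
The choice of cluster manager is expressed through the master URL when the application is launched or when the SparkContext is created. A minimal PySpark sketch (the host names and ports below are placeholders):

from pyspark import SparkConf, SparkContext

# The master URL selects the cluster manager, e.g. (placeholders):
#   "spark://master-host:7077"  - Spark standalone
#   "yarn-client"               - Hadoop YARN (Spark 1.x style)
#   "mesos://mesos-host:5050"   - Apache Mesos
conf = SparkConf().setAppName("my-app").setMaster("spark://master-host:7077")
sc = SparkContext(conf=conf)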

Spark Fundamentals

Spark, as has been previously defined, is a framework for fast and general-purpose cluster computing. Its fundamental abstraction is the RDD – the resilient distributed dataset – which is inherently parallelizable among the nodes and cores of a computing cluster. The RDD is held in memory (RAM) across the cluster wherever possible.

When data is read into Spark, it is read into an RDD. Once it is read into an
RDD it can be operated on. There are 2 distinct types of operations on RDDs:

1. Transformations

Transformations are used to convert data in the RDD to another form. The result of a transformation is another RDD. Examples of transformations are:

  • map() – takes a function as argument and applies the function to each item/element of the RDD
  • flatMap() – takes a function as argument and applies the function to each element while “flattening” the results into a single-level collection.
  • filter() – takes a boolean expression and returns an RDD containing only the rows for which the boolean expression is true, e.g. the lines of a file which contain the string “Obama”
  • reduceByKey() – given a Pair RDD, i.e. an RDD of key-value pairs, returns another Pair RDD with the values combined by key, e.g. counts by key (note that countByKey(), by contrast, is an action). See the sketch after this list.
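
A short PySpark sketch of these transformations (speeches.txt is a hypothetical input file); note that nothing is computed until an action is called:

lines = sc.textFile("speeches.txt")
obama_lines = lines.filter(lambda l: "Obama" in l)        # filter(): keep only matching lines
words = lines.flatMap(lambda l: l.split(" "))             # flatMap(): one flat RDD of words
word_pairs = words.map(lambda w: (w, 1))                  # map(): one output element per input
word_counts = word_pairs.reduceByKey(lambda a, b: a + b)  # reduceByKey(): combine values per key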

2. Actions

Actions are operations on an RDD which result in some sort of output that is not an RDD, e.g. a list, a DataFrame, or output to the screen. Examples of action operations are listed below, followed by a short sketch:

  • collect() – triggers evaluation of the pending transformations and returns the RDD’s elements to the driver as a collection.
  • count() – returns a count of the number of elements in an RDD
  • reduce() – takes a function and repeatedly applies it to the elements of the
    RDD to produce a single output value
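
A matching PySpark sketch for the actions, continuing from the words RDD in the previous sketch:

words.count()                                            # number of elements in the RDD
words.collect()                                          # all elements returned to the driver as a list
words.map(lambda w: len(w)).reduce(lambda a, b: a + b)   # total number of characters across all words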

RDDs and Lazy Evaluation

A fundamental idea in Spark’s implementation is the application of lazy evaluation and this is implemented for all Spark transformation operations.
Thus an RDD is fundamentally a data abstraction, so when we call, say:

scala> val rdd = sc.parallelize(Seq(1,3,4,5,9))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21

scala> val mappedRDD = rdd.map(x => x*x)
mappedRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:23

what we get as mappedRDD is just an expression that hasn’t been evaluated yet. This expression is essentially a record of the sequence of operations that need to be evaluated, i.e. parallelize –> map.
This expression amounts to what is known as a lineage graph, and it can be seen as follows:

scala> mappedRDD.toDebugString
res4: String = 
(4) MapPartitionsRDD[1] at map at <console>:23 []
 |  ParallelCollectionRDD[0] at parallelize at <console>:21 []

Spark Adventures –
Processing Multi-line JSON files

This series of blog posts will cover unusual problems I’ve encountered on my Spark journey for which the solutions are not obvious.

Everyone who has read the seminal book Learning Spark has encountered this example in chapter 9 – Spark SQL on how to ingest JSON data from a file using the Hive context to produce a resulting Spark SQL DataFrame:

In [84]: hiveCtx = sqlContext
In [85]: inputFile="/home/myuser/testweet.json"
         tweets_df = hiveCtx.jsonFile(inputFile)

In [86]: type(tweets_df)
Out[86]: pyspark.sql.dataframe.DataFrame

In [87]:tweets_df.printSchema()
root
 |-- contributorsIDs: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- createdAt: string (nullable = true)
 |-- currentUserRetweetId: long (nullable = true)
 |-- hashtagEntities: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: long (nullable = true)
...
# Register the input DataFrame
In [88]:tweets_df.registerTempTable("tweets")
In [89]: topTweets = hiveCtx.sql("""SELECT text, retweetCount FROM tweets ORDER BY retweetCount LIMIT 10""")
In [90]: print topTweets
         DataFrame[text: string, retweetCount: bigint]
In [91]: topTweets.show()
+--------------------+------------+
|                text|retweetCount|
+--------------------+------------+
|Adventures With C...|           0|
+--------------------+------------+

I am using PySpark above, and the hive context is already available.
However, this works only when the JSON file is well formatted, i.e. each line of the file is a complete JSON object. The file above looks like this:

{"createdAt":"Nov 4, 2014 4:56:59 PM","id":529799371026485248,"text":"Adventures With Coffee, Code, and Writing.","source":"\u003ca href\u003d\"http://twitter.com\" rel\u003d\"nofollow\"\u003eTwitter Web Client
...

However, for a json file that looks like this:

{
  "created_at": "2015-12-23T23:39:06Z", 
  "trends": [
   {
    "url": "http://twitter.com/search?q=%23HappyBirthdayLouis", 
    "query": "%23HappyBirthdayLouis", 
    "tweet_volume": 658249, 
    "name": "#HappyBirthdayLouis", 
    "promoted_content": null
   }, 
...

this is what we get when we use the approach above:

[In 93]: inputFile="/home/myuser/tweet_trends.json"
         tweet_trends = hiveCtx.jsonFile(inputFile)
#Display schema
In [94]: tweet_trends.printSchema()
         root
In [97]: tweet_trends.registerTempTable("tweet_trends")
In [98]: tweet_trends = hiveCtx.sql("""SELECT *  FROM tweet_trends LIMIT 10""")
In [99]:  print tweet_trends
          DataFrame[]

This results in an empty dataframe.
To resolve this we can use the SparkContext.wholeTextFiles() method:

In [100]: import json
          multiline_rdd=sc.wholeTextFiles(inputFile)
          type(multiline_rdd)
Out[100]: pyspark.rdd.RDD

This produces an RDD that looks like this:

In [102]:multiline_rdd.take(1)
Out[102]:[(u'file:/home/myuser/twitter_trends.json',
  u' {\n  "created_at": "2015-12-23T23:39:06Z", \n  "trends": [\n   {\n    "url": "http://twitter.com/search?q=%23HappyBirthdayLouis", \n    "query": "%23HappyBirthdayLouis", \n    "tweet_volume": 658249, \n    "name": "#HappyBirthdayLouis", \n    "promoted_content": null\n   }
...

The RDD consists of a tuple whose 1st element is the filename and whose 2nd element is the file’s entire contents, with the lines separated by newlines and other whitespace.
In order to prepare the data for proper ingestion by Spark SQL, we utilize the following transformation:

In [59]: #Remove all whitespace chars
         import re
         json_rdd = multiline_rdd.map(lambda x : x[1])\
                    .map(lambda x : re.sub(r"\s+", "", x, \
                    flags=re.UNICODE))

The resulting rdd now looks like this:

In [105]: json_rdd.take(1)
Out[105]:
[u'{"created_at":"2015-12-23T23:39:06Z","trends":[{"url":"http://twitter.com/search?q=%23HappyBirthdayLouis","query":"%23HappyBirthdayLouis","tweet_volume":658249,"name":"#HappyBirthdayLouis","promoted_content":null},{"url":"http://twitter.com/search?q=%23ShadeHour","query":"%23ShadeHour","tweet_volume":null,"name":"#ShadeHour","promoted_content":null},
...
In [106]: type(json_rdd)
Out[106]: pyspark.rdd.PipelinedRDD

We can now use the jsonRDD() method to produce a Spark SQL DataFrame:

In [110]: json_df=hiveCtx.jsonRDD(json_rdd)
In [111]: type(json_df)
Out[111]: pyspark.sql.dataframe.DataFrame

In [114]: json_df.printSchema()
          root
          |-- as_of: string (nullable = true)
          |-- created_at: string (nullable = true)
          |-- locations: array (nullable = true)
          |    |-- element: struct (containsNull = true)
          |    |    |-- name: string (nullable = true)
          |    |    |-- woeid: long (nullable = true)
          ...

Now register a temp table in Hive and select some data:

In [116]: json_df.registerTempTable("tweet_trends")
In [119]: trends_data = hiveCtx.sql("SELECT locations FROM tweet_trends")
In [120]: type(trends_data)
Out[120]: pyspark.sql.dataframe.DataFrame
In [122]: trends_data
Out[122]:DataFrame[locations:array<struct<name:string,woeid:bigint>>]
In [123]: trends_data.show()
          +--------------------+
          |           locations|
          +--------------------+
          |List([SouthAfrica...|
          +--------------------+

How to set up an Apache Spark Cluster

I now detail the steps I took to set up an Apache Spark cluster on my home network.
I have the following 2 hosts:

  • G50 Lenovo laptop running Ubuntu 15.04 (slave/worker)
  • iMac desktop running Mac OS X 10 (master)

Step 1 – Install Spark into same location on both machines

My first step was to make sure I installed Apache Spark into the same location : /opt/spark-1.4.1
The steps for doing this are detailed here: How to install Apache Spark

Step 2 – Modify the config file on master machine

Edit the following file on the master machine, making sure to add
the host name of the slave machine: $SPARK_HOME/conf/slaves

Step 3 – Setup ssh password-less login

The master needs to be able to log in to the slave machine without a password. To enable this, run the following on the master machine:

ssh-keygen -t dsa
Enter file in which to save the key (/home/youruser/.ssh/id_dsa): [ENTER]
Enter passphrase (empty for no passphrase): [EMPTY]
Enter same passphrase again: [EMPTY]

Next, copy this file from the master to the slave machine:
/home/youruser/.ssh/id_dsa.pub

Then, on the slave machine, run:

$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
$ chmod 644 ~/.ssh/authorized_keys

 

Step 4 – Start the cluster from the master machine

Run the following: sbin/start-all.sh

This should start up the cluster and produce output that looks like
this:

starting org.apache.spark.deploy.master.Master, logging to /opt/spark-1.4.1/sbin/../logs/spark-femibyte-org.apache.spark.deploy.master.Master-1-macdesk.fios-router.home.out
localhost: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark-1.4.1/sbin/../logs/spark-femibyte-org.apache.spark.deploy.worker.Worker-1-macdesk.fios-router.home.out
fembuntu: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark-1.4.1/sbin/../logs/spark-femibyte-org.apache.spark.deploy.worker.Worker-1-fembuntu.out

Stopping the cluster

The cluster can be stopped as follows: sbin/stop-all.sh
and produces output that looks like:

localhost: stopping org.apache.spark.deploy.worker.Worker
fembuntu: stopping org.apache.spark.deploy.worker.Worker
stopping org.apache.spark.deploy.master.Master

Monitoring from the master web ui

The cluster can be monitored from the web ui of the master node which is accessible from port 8080 (default). It should look similar to the following:

[spark_web_ui – screenshot of the Spark master web UI]

Troubleshooting

I obtained the following errors on the slave machine:

16/01/03 05:40:24 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkMaster@master:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: master/192.269.8.153:7077
16/01/03 05:41:30 ERROR Worker: All masters are unresponsive! Giving up.

The problem was that I had 2 separate IP addresses pointing to the
same hostname for my slave machine (it was on the wifi network and also connected via ethernet).


How to install Apache Spark

Mac OS/X

Installing Spark on Mac OS X involves downloading the required
archive files into the final location from which we want to run Spark and Scala. On my machine that directory is:
/Users/myuser/local

  1. cd to destination directory:
    cd /Users/myuser/local
  2. Download Spark from http://spark.apache.org/downloads.html
    wget http://apache.arvixe.com/spark/spark-1.4.1/spark-1.4.1.tgz
  3. Download scala from http://www.scala-lang.org/download/
    wget http://downloads.typesafe.com/scala/2.11.7/scala-2.11.7.tgz
    
  4. Uncompress the spark and scala archive files you downloaded:
    tar -xvzf spark-1.4.1.tgz
    tar -xvzf scala-2.11.7.tgz
    

    The above step uncompresses the file contents into these directories: spark-1.4.1, scala-2.11.7

  5. Set the following environment variables – SCALA_HOME, SPARK_HOME, PATH:

    export SCALA_HOME=/Users/myuser/local/scala-2.11.7
    export SPARK_HOME=/Users/myuser/local/spark-1.4.1
    export PATH=$PATH:$SCALA_HOME/bin:$SPARK_HOME/bin
    
  6. Run sbt – Scala build tool
    cd $SPARK_HOME
    build/sbt clean assembly
    
  7. Run spark shell
     ./bin/spark-shell 

    Sometimes you may get an error about an invalid sbt-launch file.
    To resolve this do the following:

    cd build
    wget http://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/0.13.7/sbt-launch.jar
    mv sbt-launch.jar sbt-launch.0.13.7.jar
    


Running standalone program in Spark

In this article we will walk through the steps to set up and run a standalone program in Apache Spark for 3 languages – Python, Scala and Java.

As a first step, let us go ahead and obtain some ready-made code that implements the WordCount algorithm from the Learning Spark examples git repository.

Step 1 – Create code

First, cd to the directory of your choice and run:

git clone https://github.com/holdenk/learning-spark-examples

This will checkout code under the directory learning-spark-examples.
You will also need to download the file we’re using for this example, Abraham Lincoln’s Gettysburg Address.

Step 2 – Package and run the application

Our WordCount code is nicely isolated in the directory

learning-spark-examples/mini-complete-example.

We will need to package this code as an application so we can distribute it to a Spark cluster. There are different steps for this for Java, Scala and Python.

Step 2a – Java

In the case of Java we use Maven to package our code into a jar. Maven, which is a build and dependency management tool, is configured via a pom.xml file. The contents of the file are shown below:

<project>
 <groupId>com.oreilly.learningsparkexamples.mini</groupId>
  <artifactId>learning-spark-mini-example</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>example</name>
  <packaging>jar</packaging>
  <version>0.0.1</version>
  <dependencies>
   <dependency> <!-- Spark dependency -->
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.2.0</version>
    <scope>provided</scope>
   </dependency>
 </dependencies>
 <properties>
  <java.version>1.6</java.version>
 </properties>
 <build>
  <pluginManagement>
   <plugins>
    <plugin>
     <groupId>org.apache.maven.plugins</groupId>
     <artifactId>maven-compiler-plugin</artifactId>
     <version>3.1</version>
     <configuration>
      <source>${java.version}</source>
      <target>${java.version}</target>
     </configuration>
    </plugin>
   </plugins>
  </pluginManagement>
 </build>
</project>

The most important sections with respect to naming the jar and running the application are as follows:

<project>
 <groupId>com.oreilly.learningsparkexamples.mini</groupId>
 <artifactId>learning-spark-mini-example</artifactId>
 ...
 <version>0.0.1</version>
 ...
</project>

The groupId is used to determine the package namespace, and the artifactId and version are used to formulate the name of the resulting jar file.

To compile and package the application, run:

cd learning-spark-examples/mini-complete-example
mvn clean && mvn compile && mvn package

This creates the following file in the target directory:
learning-spark-mini-example-0.0.1.jar
Now that our application has been packaged, we launch it using the script spark-submit under $SPARK_HOME/bin. The advantage of this script is that it sets up the CLASSPATH and any other dependencies as well as supporting multiple deploy modes and cluster managers. The arguments to spark-submit are as follows:

$SPARK_HOME/bin/spark-submit
 --class <main-class> \
 --master <master-url> \
 --deploy-mode <deploy-mode> \
 --conf <key>=<value>
 ...
 <application-jar> \
 [application args]

Our WordCount application code takes 2 arguments :

  • input file
  • output dir

Note that the output argument used by the JavaPairRDD counts object is actually a directory name to which the output is saved.

Thus we run the Java application as follows:

$SPARK_HOME/bin/spark-submit --class \ 
com.oreilly.learningsparkexamples.mini.java.WordCount \
./target/learning-spark-mini-example-0.0.1.jar \
 ./gettysburg.txt ./java_output

The result of running the code above is the creation of the java_output directory:

-rw-r--r-- 1 user user 646 Dec 15 04:59 part-00000
-rw-r--r-- 1 user user 764 Dec 15 04:59 part-00001
-rw-r--r-- 1 user user   0 Dec 15 04:59 _SUCCESS

The first 5 lines of output from the part-00000 file look as follows:

head -5 java_output/part-00000
(earth,1)
(created,1)
(under,1)
(this,4)
(God,1)

Step 2b – Scala

Now we turn our attention to creating an application in Scala.
sbt is an open-source build tool and is the equivalent of Ant/Maven. The equivalent of the pom.xml file is the build.sbt file, which looks like this:

cat build.sbt
name := "learning-spark-mini-example"

version := "0.0.1"

scalaVersion := "2.10.4"

// additional libraries
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.1.0" % "provided"
)

Building with sbt results in a jar file similar to the one produced for Java using Maven. To build this jar file, we do the following:

sbt clean package

This generates the jar file which is written to the target directory by default:

...
[info] Packaging /home/user/devel/learning-spark/mini-complete-example/target/scala-2.10/learning-spark-mini-example_2.10-0.0.1.jar ...
[info] Done packaging.
[success] Total time: 384 s, completed Dec 12, 2015 4:16:21 AM

We can then run the Scala application as follows:

$SPARK_HOME/bin/spark-submit \
 --class com.oreilly.learningsparkexamples.mini.scala.WordCount \
 ./target/scala-2.10/learning-spark-mini-example_2.10-0.0.1.jar \
 ./gettysburg.txt ./scala_output

This writes its output into the scala_output directory in a manner similar to the Java case :

$ ls -ltr scala_output/
total 8
-rw-r--r-- 1 user user 746 Dec 14 04:54 part-00001
-rw-r--r-- 1 user user 646 Dec 14 04:54 part-00000
-rw-r--r-- 1 user user   0 Dec 14 04:54 _SUCCESS

Step 2c – Python

The Python case is much simpler than Java or Scala.
There is no build/packaging stage. All that needs to be done is to pass the script directly to spark-submit (additional module dependencies can be shipped with its --py-files option). In this example there was no Python script supplied, so I wrote this script, wordcount.py:


import sys, argparse, os
from operator import add

from pyspark import SparkContext


parser = argparse.ArgumentParser(description="Run wordcount in Spark")
parser.add_argument('-f', help='input file',dest='fileName',required=True)
parser.add_argument('-o', help='output dir',dest='outdir',required=True)
args=parser.parse_args()

def main():
    count_words(args.fileName)

def count_words(filename):
    sc = SparkContext(appName="WordCount in Python")
    in_lines = sc.textFile(filename, 1)
    # Split each line into words, pair each word with 1, then sum the 1s per word
    inter_counts = in_lines.flatMap(lambda line: line.split(' ')) \
     .map(lambda x: (x.rstrip(',.'),1)).reduceByKey(add)
    total_counts = inter_counts.collect()
    outdir = args.outdir
    if not os.path.exists(outdir):
        os.makedirs(outdir)
    outfile = "{0}/{1}".format(outdir,"py_wordcount.txt")
    print "Outputting to {0}".format(outfile)
    with open(outfile, 'w') as fh:
        for (word, count) in total_counts:
            fh.write("{0}: {1}\n".format(word, count))
    sc.stop()

if __name__ == "__main__":
    main()

We can then run it as follows:

$SPARK_HOME/bin/spark-submit ./src/wordcount.py -f gettysburg.txt -o py_output

Refinements

The strings in the gettysburg.txt file need to be stripped of non-alphanumeric characters in order to get a correct word count; otherwise strings such as “dead,” and “dead” would be counted as separate words.
In the Java case, to fix this, you can make this change to
WordCount.java:

return new Tuple2(x.replaceAll("[^A-Za-z0-9]", ""), 1);

Similarly for Scala, make this change :

 val counts = words.map(word => (word.replaceAll("[^A-Za-z0-9]", ""), 1)).reduceByKey{case (x, y) => x + y}
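
For the Python script, a comparable refinement (a sketch using Python's re module, which would need to be imported in wordcount.py, and reusing the in_lines and add names from the script above) is to strip the punctuation before forming the (word, 1) pairs:

import re

inter_counts = in_lines.flatMap(lambda line: line.split(' ')) \
    .map(lambda x: (re.sub(r'[^A-Za-z0-9]', '', x), 1)) \
    .reduceByKey(add)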