Running a standalone program in Spark

In this article we will walk through the steps to set up and run a standalone program in Apache Spark in three languages: Python, Scala and Java.

As a first step, let us go ahead and obtain some ready-made code that implements the WordCount algorithm from the Learning Spark examples Git repository.

Step 1 – Create code

First, cd to the directory of your choice and run:

git clone https://github.com/holdenk/learning-spark-examples

This will check out the code under the directory learning-spark-examples.
You will also need to download the text file we’re using for this example, Abraham Lincoln’s Gettysburg Address.
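Any plain-text copy of the address will do; save it as gettysburg.txt in the mini-complete-example directory used below so the spark-submit commands can find it. For example (the URL here is a placeholder, not one given in the original article):

curl -o gettysburg.txt <url-of-a-plain-text-copy-of-the-address>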

Step 2 – Package and run the application

Our WordCount code is nicely isolated in the directory

learning-spark-examples/mini-complete-example.

We will need to package this code as an application so that we can distribute it to a Spark cluster. The steps differ for Java, Scala and Python.

Step 2a – Java

In the case of Java we use Maven to package our code into a jar. Maven, a build and dependency management tool, is configured via a pom.xml file. The contents of the file are shown below:

<project>
  <groupId>com.oreilly.learningsparkexamples.mini</groupId>
  <artifactId>learning-spark-mini-example</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>example</name>
  <packaging>jar</packaging>
  <version>0.0.1</version>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.2.0</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
  <properties>
    <java.version>1.6</java.version>
  </properties>
  <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.1</version>
          <configuration>
            <source>${java.version}</source>
            <target>${java.version}</target>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>

The sections that determine the name of the jar file for the application are as follows:

<project>
 <groupId>com.oreilly.learningsparkexamples.mini</groupId>
 <artifactId>learning-spark-mini-example</artifactId>
 ...
 <version>0.0.1</version>
 ...
</project>

The groupId determines the package namespace, while the artifactId and version together determine the name of the resulting jar file (artifactId-version.jar).

To compile and package the application, run:

cd learning-spark-examples/mini-complete-example
mvn clean && mvn compile && mvn package

This creates the following file in the target directory:
learning-spark-mini-example-0.0.1.jar
Now that our application has been packaged, we launch it with the spark-submit script under $SPARK_HOME/bin. The advantage of this script is that it sets up the CLASSPATH and any other dependencies for us, and it supports multiple deploy modes and cluster managers. The arguments to spark-submit are as follows:

$SPARK_HOME/bin/spark-submit \
 --class <main-class> \
 --master <master-url> \
 --deploy-mode <deploy-mode> \
 --conf <key>=<value> \
 ...
 <application-jar> \
 [application args]
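For example, to run a job on your local machine using all available cores you could pass --master local[*] explicitly (a hypothetical variant of the invocation used below; the commands in this article omit --master and rely on spark-submit's local default):

$SPARK_HOME/bin/spark-submit \
 --class com.oreilly.learningsparkexamples.mini.java.WordCount \
 --master local[*] \
 ./target/learning-spark-mini-example-0.0.1.jar \
 ./gettysburg.txt ./java_output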

Our WordCount application code takes two arguments:

  • input file
  • output dir

Note that the second argument is actually a directory name: the JavaPairRDD counts object is saved into that directory (via saveAsTextFile) rather than to a single file.

Thus we run the Java application as follows:

$SPARK_HOME/bin/spark-submit \
 --class com.oreilly.learningsparkexamples.mini.java.WordCount \
 ./target/learning-spark-mini-example-0.0.1.jar \
 ./gettysburg.txt ./java_output

The result of running the code above is the creation of the java_output directory:

-rw-r--r-- 1 user user 646 Dec 15 04:59 part-00000
-rw-r--r-- 1 user user 764 Dec 15 04:59 part-00001
-rw-r--r-- 1 user user   0 Dec 15 04:59 _SUCCESS

The first five lines of output from the part-00000 file look as follows:

head -5 java_output/part-00000
(earth,1)
(created,1)
(under,1)
(this,4)
(God,1)

Step 2b – Scala

Now we turn our attention to creating an application in Scala.
sbt is an open-source build tool for Scala, the equivalent of Ant/Maven in the Java world. The counterpart of the pom.xml file is build.sbt, which looks like this:

cat build.sbt
name := "learning-spark-mini-example"

version := "0.0.1"

scalaVersion := "2.10.4"

// additional libraries
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.1.0" % "provided"
)

Building with sbt produces a jar file, just as Maven does for the Java version. To build it, run:

sbt clean package

This generates the jar file, which by default is written under the target directory (target/scala-2.10 in this case):

...
[info] Packaging /home/user/devel/learning-spark/mini-complete-example/target/scala-2.10/learning-spark-mini-example_2.10-0.0.1.jar ...
[info] Done packaging.
[success] Total time: 384 s, completed Dec 12, 2015 4:16:21 AM

We can then run the Scala application as follows:

$SPARK_HOME/bin/spark-submit \
 --class com.oreilly.learningsparkexamples.mini.scala.WordCount \
 ./target/scala-2.10/learning-spark-mini-example_2.10-0.0.1.jar \
 ./gettysburg.txt ./scala_output

This writes its output into the scala_output directory in a manner similar to the Java case:

$ ls -ltr scala_output/
total 8
-rw-r--r-- 1 user user 746 Dec 14 04:54 part-00001
-rw-r--r-- 1 user user 646 Dec 14 04:54 part-00000
-rw-r--r-- 1 user user   0 Dec 14 04:54 _SUCCESS

Step 2c – Python

The Python case is much simpler than Java or Scala.
There is no build/packaging stage: we simply pass the script itself to spark-submit (additional Python dependencies, if any, can be shipped with the --py-files option). The mini-complete-example directory does not include a Python version, so I wrote the following script, wordcount.py:


import sys, argparse, os
from operator import add

from pyspark import SparkContext

# Parse the command-line arguments: an input file and an output directory.
parser = argparse.ArgumentParser(description="Run wordcount in Spark")
parser.add_argument('-f', help='input file', dest='fileName', required=True)
parser.add_argument('-o', help='output dir', dest='outdir', required=True)
args = parser.parse_args()

def main():
    count_words(args.fileName)

def count_words(filename):
    sc = SparkContext(appName="WordCount in Python")
    in_lines = sc.textFile(filename, 1)
    # Split each line into words, strip trailing punctuation and count each word.
    inter_counts = in_lines.flatMap(lambda line: line.split(' ')) \
        .map(lambda x: (x.rstrip(',.'), 1)).reduceByKey(add)
    # Bring the (word, count) pairs back to the driver and write them to a local file.
    total_counts = inter_counts.collect()
    outdir = args.outdir
    if not os.path.exists(outdir):
        os.makedirs(outdir)
    outfile = "{0}/{1}".format(outdir, "py_wordcount.txt")
    print "Outputting to {0}".format(outfile)
    with open(outfile, 'w') as fh:
        for (word, count) in total_counts:
            fh.write("{0}: {1}\n".format(word, count))
    sc.stop()

if __name__ == "__main__":
    main()

We can then run it as follows:

$SPARK_HOME/bin/spark-submit ./src/wordcount.py -f gettysburg.txt -o py_output
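The script above collects the counts back to the driver and writes them to a single local file, which is fine for a document this small. If you would rather have Spark write part-files like the Java and Scala examples, a minimal variation (a sketch, not part of the original script) replaces the collect-and-write step with saveAsTextFile:

import sys
from operator import add

from pyspark import SparkContext

# Minimal sketch: the same word count, but the output is written by Spark
# as part-0000N files plus a _SUCCESS marker. Note that saveAsTextFile
# requires that the output directory does not already exist.
sc = SparkContext(appName="WordCount in Python")
counts = sc.textFile(sys.argv[1]) \
    .flatMap(lambda line: line.split(' ')) \
    .map(lambda word: (word.rstrip(',.'), 1)) \
    .reduceByKey(add)
counts.saveAsTextFile(sys.argv[2])
sc.stop()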

Refinements

The strings in the gettysburg.txt file need to be stripped of non-alphanumeric characters in order to get a correct word count; otherwise strings such as “dead,” and “dead” would be counted as separate words.
In the Java case, you can fix this by making the following change to WordCount.java:

return new Tuple2(x.replaceAll("[^A-Za-z0-9]", ""), 1);

Similarly, for Scala, make this change:

 val counts = words.map(word => (word.replaceAll("[^A-Za-z0-9]", ""), 1)).reduceByKey{case (x, y) => x + y}
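For the Python script (wordcount.py above), an equivalent fix, not given in the original article, is to add import re at the top of the script and replace the map step with a regular-expression substitution:

inter_counts = in_lines.flatMap(lambda line: line.split(' ')) \
    .map(lambda x: (re.sub('[^A-Za-z0-9]', '', x), 1)).reduceByKey(add)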