Spark Expositions: Understanding RDDs


RDDs, or Resilient Distributed Datasets, are the fundamental data abstraction
in Apache Spark. An RDD is, in essence, a distributed dataset held in memory.
It is distributed, or partitioned, among the various nodes of a Spark cluster.
The official documentation defines an RDD as follows:

A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel.

In the seminal paper on RDDs by Matei Zaharia, an RDD is defined as follows:

Resilient Distributed Datasets (RDDs) are a distributed memory abstraction that lets programmers perform in-memory computations on large clusters in a fault-tolerant manner.

The Resilient part refers to fault tolerance: if a partition of the dataset is lost
because a node goes down, it can be recovered in a timely manner. This recoverability comes from the fact that the RDD's lineage graph is maintained by the driver program, so the lost partition can simply be recomputed.

The lineage graph consists of the series of transformations and actions to be
computed on a partition of data, and it is stored on the driver program.
The RDD in effect is a set of instructions which are lazily evaluated.
The evaluation only occurs when an action is encountered.
Here is an example of what an RDD’s lineage graph looks like:

scala> idBudgetRDD.toDebugString
res6: String = 
(2) MapPartitionsRDD[8] at map at <console>:35 []
 | MapPartitionsRDD[7] at filter at <console>:33 []
 | MapPartitionsRDD[3] at map at <console>:31 []
 | MapPartitionsRDD[2] at map at <console>:31 []
 | MapPartitionsRDD[1] at textFile at <console>:29 []

The Distributed part stems from the fact that the data is distributed among the worker nodes of a cluster. The driver program sends each worker the RDD along with the partition of the data that should be computed on that particular node. The program on the worker node that is responsible for executing the set of instructions encapsulated in the RDD is called the Executor. The method that computes a partition, from the RDD source code, is as follows:


 def compute(split: Partition, context: TaskContext): Iterator[T]

The Dataset expresses the fact that we are in fact processing a collection of
data albeit one that will be partitioned.

Again, taking a look at the RDD source code, we see:

 /**
  * Implemented by subclasses to return the set of partitions in this RDD. This method will only
  * be called once, so it is safe to implement a time-consuming computation in it.
  */
 protected def getPartitions: Array[Partition]

Types of Operations

RDDs support 2 types of operations: Transformations and Actions.

Transformations convert an RDD from one form to another.
They are lazily evaluated, meaning that they are only executed when an action requires their output.

Actions trigger the execution of the chain of operations that result in data being returned. They are necessary for transformations to be evaluated. Without an action, an RDD is just a chain of transformations that are yet to be evaluated.
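
For illustration, here is a minimal PySpark sketch of this behavior (it assumes a SparkContext named sc, as in the pyspark shell; the data is made up):

 nums = sc.parallelize([1, 2, 3, 4, 5])
 evens = nums.filter(lambda x: x % 2 == 0)  # transformation: nothing is computed yet
 print(evens.count())                       # action: triggers evaluation and prints 2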

Contents of an RDD

An RDD is characterized by the following 5 properties:

  1. A list of partitions that comprise the dataset.
  2. A function to perform the computation for each partition.
  3. A list of dependencies on other RDDs, i.e. parent RDDs.
    A parent RDD is the RDD on which a transformation is applied.
  4. A Partitioner for key-value/Pair RDDs (Pair RDDs are defined later).
  5. A list of preferred locations/hosts to load the various partitions into
    which the data has been split.

Where does the RDD live?

The RDD itself (its lineage and metadata) lives in the SparkContext on the driver program, which runs on the master node in a cluster setup.

It is uniquely identified by an id:

scala> val exampleRDD = sc.parallelize(List(1,2,3,4,5))
exampleRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:27

res34: Int = 12

scala> val example2RDD = sc.parallelize(List(1,2,3,4,5))
example2RDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:27

res36: Int = 13

scala> val example3RDD=exampleRDD
example3RDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:27

res37: Int = 12


Ways to create an RDD

  1. Parallelize an existing collection e.g.
    primesRDD = sc.parallelize([2,3,5,7,11,13,17])
  2. Read data from an external distributed source e.g. HDFS, S3, Cassandra

     gettysburgRDD = sc.textFile("hdfs://user/dataphantik/gettysburg.txt")
  3. From an existing RDD – transforming an existing RDD into new RDD
    errorsRDD = baseRDD.filter(lambda line: 'Error' in line)
  4. Via Spark Streaming – each batch of a discretized stream (DStream) is an RDD (a minimal sketch follows this list)
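
A minimal PySpark Streaming sketch of option 4 (the host and port are illustrative; it assumes an existing SparkContext sc):

 from pyspark.streaming import StreamingContext
 ssc = StreamingContext(sc, 10)                     # 10-second batch interval
 lines = ssc.socketTextStream("localhost", 9999)    # each batch of this DStream is an RDD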

Types of RDDs

The base class of all RDDs, org.apache.spark.rdd.RDD, is an abstract class from which all concrete RDD types inherit.

The base RDD classes are:

  • HadoopRDD – provides core functionality for reading data stored in Hadoop (e.g.
    files in HDFS, sources in HBase, or S3), using the older MapReduce API (org.apache.hadoop.mapred).
  • MapPartitionsRDD – applies the provided function to every partition of the
    parent RDD. Normally returned when an RDD is created from a file via sc.textFile(..).
  • ParallelCollectionRDD – an RDD representing a collection of elements. It contains
    numSlices Partitions and locationPrefs, which is a Map. It is obtained from a call to sc.parallelize(..) on a collection in memory.
  • PipedRDD – an RDD that pipes the contents of each parent partition through an
    external command (printing them one per line) and returns the output as a collection of strings.

There is also the following abstraction:

  • PairRDD – an RDD of key-value pairs. It can be, for example, a ParallelCollectionRDD containing
    key-value pairs. There is no concrete PairRDD class, since it is an abstraction; however, the class
    PairRDDFunctions provides a set of transformations that can be performed on Pair RDDs (see the sketch below).
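
In PySpark the corresponding pair operations are available directly on any RDD of 2-tuples. A minimal sketch, assuming a SparkContext named sc:

 pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 3)])
 totals = pairs.reduceByKey(lambda x, y: x + y)   # combine values per key
 print(totals.collect())                          # e.g. [('a', 4), ('b', 1)] (order may vary)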


Useful Distributed Systems Links

Distributed Systems is a constantly changing field. This page attempts to keep track of sites and blogs that are frequently updated and are chock full of useful information for folks interested in keeping up with state of the art technologies.


Framework Sites

Distributed Systems Concepts

Pandas Cheatsheet

Reset index for data frame based on specific column
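
One common interpretation, assuming a column named 'id' (the column name is illustrative):

df = df.set_index('id')    # use the 'id' column as the index
df = df.reset_index()      # or revert to a default integer index, keeping 'id' as a column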


Show null columns of dataframe

df.columns[df.isnull().any()]

Sum specific column of dataframe, excluding NULLs
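
A minimal sketch, assuming a numeric column named 'amount' (the column name is illustrative):

df['amount'].sum()             # NaNs are excluded by default
df['amount'].sum(skipna=True)  # explicit form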


Loop over rows of dataframe
With iterrows (produces index, columns as a Series):

for idx, row in df.iterrows():
         print idx, row['a'], row['col2'], row['col3']

With itertuples (produces a tuple of the index and column values):

for irow in df.itertuples():
         print irow

Check if dataframe column contains string
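
A minimal sketch, assuming a string column named 'comment' and the search term 'error' (both illustrative):

mask = df['comment'].str.contains('error', na=False)  # boolean Series; NaN entries become False
matches = df[mask]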


Increase the width of dataframe in jupyter display

pd.options.display.max_colwidth = 100

Estimate memory usage for dataframe
By column
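
A minimal sketch:

df.memory_usage(deep=True)        # bytes per column, including object (string) contents
df.memory_usage(deep=True).sum()  # total bytes for the whole dataframe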




Save numpy array to text file

np.savetxt('data.txt', A, fmt="%s")

Filter rows with null/nan values and write to CSV

.to_csv("./rows_missing_last.csv", sep='|', index=False, header=True)

Quick Introduction to Apache Spark

What is Spark

Spark is a fast and general purpose framework for cluster computing.
It is written in Scala but provides APIs in Scala, Java and Python.
The main data abstraction in Spark is the RDD, or Resilient Distributed Dataset.
With this abstraction, Spark enables data to be distributed and processed among
the many nodes of a computing cluster.
It provides both interactive and programmatic interfaces. It consists of the following components:

  • Spark Core – the foundational classes on which Spark is built. It provides the RDD.
  • Spark Streaming – a library for processing streaming data
  • Spark SQL – an API for handling structured data
  • MLlib – a Machine Learning library
  • GraphX – a Graph library

Cluster Manager

In order to operate in production mode, Spark needs a cluster manager that manages data distribution, task scheduling and fault tolerance among the various nodes.
There are 3 types supported – Apache Mesos, Hadoop YARN and Spark standalone.

Spark Fundamentals

Spark, as previously defined, is a framework for fast and general purpose cluster computing. Its fundamental abstraction is the RDD – the resilient distributed dataset – which is inherently parallelizable among the nodes and cores of a computing cluster. By default the RDD is held entirely in RAM.

When data is read into Spark, it is read into an RDD. Once it is read into an
RDD it can be operated on. There are 2 distinct types of operations on RDDs:

1. Transformations

Transformations are used to convert data in the RDD to another form. The result of a transformation is another RDD. Examples of transformations are listed below, followed by a short sketch:

  • map()– takes a function as argument and applies the function to each item/element of the RDD
  • flatMap() – takes a function as argument and applies the function to each element while “flattening” the results into a single level collection.
  • filter() – takes a boolean expression and returns an RDD with rows for which the boolean expression is true. e.g. lines of a file which contain the string “Obama”
  • reduceByKey() – given a Pair/map RDD i.e. with key-value pairs, takes a function and returns another Pair RDD with the values combined per key, e.g. counts by key. (countByKey(), by contrast, is an action, since it returns a map of counts to the driver.)
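
A short PySpark sketch of a few transformations, assuming a SparkContext named sc (the data is made up):

 lines = sc.parallelize(["to be or", "not to be"])
 words = lines.flatMap(lambda line: line.split(" "))   # flatten lines into words
 longer = words.filter(lambda w: len(w) > 2)           # keep words longer than 2 characters
 pairs = w: (w, 1))                    # a Pair RDD of (word, 1) tuples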

2. Actions

Actions are operations on an RDD which result in some sort of output that is not an RDD, e.g. a list, a DataFrame, or output to the screen. Examples of action operations are listed below, followed by a short sketch:

  • collect() – Applies the various transformations to an RDD then returns the result as a collection.
  • count() – returns a count of the number of elements in an RDD
  • reduce() – takes a function and repeatedly applies it to the elements of the
    RDD to produce a single output value
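
A short PySpark sketch of these actions, assuming a SparkContext named sc:

 nums = sc.parallelize([1, 2, 3, 4, 5])
 print(nums.collect())                    # [1, 2, 3, 4, 5]
 print(nums.count())                      # 5
 print(nums.reduce(lambda x, y: x + y))   # 15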

RDDs and Lazy Evaluation

A fundamental idea in Spark’s implementation is lazy evaluation, and this is applied to all Spark transformation operations.
Thus an RDD is fundamentally a data abstraction, so when we make a call such as:

scala> val rdd = sc.parallelize(Seq(1,3,4,5,9))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21

scala> val mappedRDD = => x*x)
mappedRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:23

what we get as mappedRDD is just an expression that hasn’t been evaluated yet. This expression is essentially a record of the sequence of operations that need to be evaluated, i.e. parallelize –> map.
This expression amounts to what is known as a lineage graph and can be seen as follows:

scala> mappedRDD.toDebugString
res4: String = 
(4) MapPartitionsRDD[1] at map at <console>:23 []
 |  ParallelCollectionRDD[0] at parallelize at <console>:21 []

List of public datasets

I will be continuously updating this blog as I encounter more interesting datasets.

Spark Adventures –
Processing Multi-line JSON files

This series of blog posts will cover unusual problems I’ve encountered on my Spark journey for which the solutions are not obvious.

Everyone who has read the seminal book Learning Spark has encountered the example in chapter 9 (Spark SQL) of how to ingest JSON data from a file using the Hive context to produce a Spark SQL DataFrame:

In [84]: hiveCtx = sqlContext
In [85]: inputFile="/home/myuser/testweet.json"
         tweets_df = hiveCtx.jsonFile(inputFile)

In [86]: type(tweets_df)
Out[86]: pyspark.sql.dataframe.DataFrame

In [87]:tweets_df.printSchema()
 |-- contributorsIDs: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- createdAt: string (nullable = true)
 |-- currentUserRetweetId: long (nullable = true)
 |-- hashtagEntities: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: long (nullable = true)
# Register the input DataFrame
In [88]:tweets_df.registerTempTable("tweets")
In [89]: topTweets = hiveCtx.sql("""SELECT text, retweetCount FROM tweets ORDER BY retweetCount LIMIT 10""")
In [90]: print topTweets
         DataFrame[text: string, retweetCount: bigint]
In [91]:
|                text|retweetCount|
|Adventures With C...|           0|

I am using PySpark above, and the hive context is already available.
However, this works only when the JSON file is well formatted i.e. each line of the file is a JSON object. The file above looks like this:

{"createdAt":"Nov 4, 2014 4:56:59 PM","id":529799371026485248,"text":"Adventures With Coffee, Code, and Writing.","source":"\u003ca href\u003d\"\" rel\u003d\"nofollow\"\u003eTwitter Web Client

However, for a json file that looks like this:

  "created_at": "2015-12-23T23:39:06Z", 
  "trends": [
    "url": "", 
    "query": "%23HappyBirthdayLouis", 
    "tweet_volume": 658249, 
    "name": "#HappyBirthdayLouis", 
    "promoted_content": null

this is what we get when we use the approach above:

In [93]: inputFile="/home/myuser/tweet_trends.json"
         tweet_trends = hiveCtx.jsonFile(inputFile)
#Display schema
In [94]: tweet_trends.printSchema()
In [97]: tweet_trends.registerTempTable("tweet_trends")
In [98]: tweet_trends = hiveCtx.sql("""SELECT *  FROM tweet_trends LIMIT 10""")
In [99]:  print tweet_trends

This results in an empty dataframe.
To resolve this we can use the SparkContext.wholeTextFiles() method:

In [100]: import json
          multiline_rdd = sc.wholeTextFiles(inputFile)
In [101]: type(multiline_rdd)
Out[101]: pyspark.rdd.RDD

This produces an RDD that looks like this:

In [102]:multiline_rdd.take(1)
  u' {\n  "created_at": "2015-12-23T23:39:06Z", \n  "trends": [\n   {\n    "url": "", \n    "query": "%23HappyBirthdayLouis", \n    "tweet_volume": 658249, \n    "name": "#HappyBirthdayLouis", \n    "promoted_content": null\n   }

The RDD consists of a tuple whose 1st element is a filename and whose 2nd element is the data with the lines separated by whitespace.
In order to prepare the data for proper ingestion by Spark SQL, we utilize the following transformation:

In [59]: #Remove all whitespace chars
         import re
         json_rdd = x : x[1])\
                    .map(lambda x : re.sub(r"\s+", "", x, \
                         flags=re.UNICODE))

The resulting rdd now looks like this:

In [105]: json_rdd.take(1)
In [106]: type(json_rdd)
Out[106]: pyspark.rdd.PipelinedRDD

We can now use the jsonRDD() method to produce a Spark SQL DataFrame:

In [110]: json_df=hiveCtx.jsonRDD(json_rdd)
In [111]: type(json_df)
Out[111]: pyspark.sql.dataframe.DataFrame

In [114]: json_df.printSchema()
          |-- as_of: string (nullable = true)
          |-- created_at: string (nullable = true)
          |-- locations: array (nullable = true)
          |    |-- element: struct (containsNull = true)
          |    |    |-- name: string (nullable = true)
          |    |    |-- woeid: long (nullable = true)

Now register a temp table in Hive and select some data:

In [116]: json_df.registerTempTable("tweet_trends")
In [119]: trends_data = hiveCtx.sql("SELECT locations FROM tweet_trends")
In [120]: type(trends_data)
Out[120]: pyspark.sql.dataframe.DataFrame
In [122]: trends_data
In [123]:
          |           locations|

How to set up an Apache Spark Cluster

I now detail the steps I took to set up an Apache Spark cluster on my home network.
I have the following 2 hosts:

  • G50 Lenovo laptop running Ubuntu 15.04 (slave/worker)
  • iMac desktop running Mac OS X 10 (master)

Step 1 – Install Spark into same location on both machines

My first step was to make sure I installed Apache Spark into the same location : /opt/spark-1.4.1
The steps for doing this are detailed here: How to install Apache Spark

Step 2 – Modify the config file on master machine

Edit the following file on the master machine, making sure to add
the host name of the slave machine: $SPARK_HOME/conf/slaves
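
For example, with the slave host used later in this post, the conf/slaves file would simply list one worker hostname per line:

 fembuntu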

Step 3 – Setup ssh password-less login

The master needs to be able to log in without a password on the slave machine. To enable this, run the following on the master machine:

ssh-keygen -t dsa
Enter file in which to save the key (/home/youruser/.ssh/id_dsa): [ENTER]
Enter passphrase (empty for no passphrase): [EMPTY]
Enter same passphrase again: [EMPTY]

Next, copy the public key file (~/.ssh/ from the master to the slave machine.
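
A minimal sketch of the copy step using scp (the user name is illustrative; fembuntu is the worker hostname that appears in the startup logs below):

 scp ~/.ssh/ youruser@fembuntu:~/.ssh/

Then, on the slave machine, run: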

$ cat ~/.ssh/ >> ~/.ssh/authorized_keys
$ chmod 644 ~/.ssh/authorized_keys


Step 4 – Start the cluster from the master machine

Run the following: sbin/

This should start up the cluster and produce output that looks like the following:

starting org.apache.spark.deploy.master.Master, logging to /opt/spark-1.4.1/sbin/../logs/spark-femibyte-org.apache.spark.deploy.master.Master-1-macdesk.fios-router.home.out
localhost: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark-1.4.1/sbin/../logs/spark-femibyte-org.apache.spark.deploy.worker.Worker-1-macdesk.fios-router.home.out
fembuntu: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark-1.4.1/sbin/../logs/spark-femibyte-org.apache.spark.deploy.worker.Worker-1-fembuntu.out

Stopping the cluster

The cluster can be stopped as follows: sbin/
and produces output that looks like:

localhost: stopping org.apache.spark.deploy.worker.Worker
fembuntu: stopping org.apache.spark.deploy.worker.Worker
stopping org.apache.spark.deploy.master.Master

Monitoring from the master web ui

The cluster can be monitored from the web UI of the master node, which is accessible on port 8080 by default.

I obtained the following errors on the slave machine:

16/01/03 05:40:24 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkMaster@master:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: master/
16/01/03 05:41:30 ERROR Worker: All masters are unresponsive! Giving up.

The problem was that I had 2 separate IP addresses pointing to the
same hostname for my slave machine (it was both on the wifi network and connected via ethernet).

Reference links

How to install Apache Spark

Mac OS/X

Installing Spark on Mac OS/X involves downloading the required
archive files into the final location from which we want to run Spark and Scala. On my machine that directory location is /Users/myuser/local.

  1. cd to destination directory:
    cd /Users/myuser/local
  2. Download Spark from
  3. Download scala from
  4. Uncompress the spark and scala archive files you downloaded:
    tar -xvzf spark-1.4.1.tgz
    tar -xvzf scala-2.11.7.tgz

    The above step uncompresses the file contents into these directories: spark-1.4.1, scala-2.11.7

  5. Set the following environment variables – SCALA_HOME, SPARK_HOME, PATH:

    export SCALA_HOME=/Users/myuser/local/scala-2.11.7
    export SPARK_HOME=/Users/myuser/local/spark-1.4.1
    export PATH=$PATH:$SCALA_HOME/bin:$SPARK_HOME/bin
  6. Run sbt – Scala build tool
    cd $SPARK_HOME
    build/sbt clean assembly
  7. Run spark shell

    Sometimes you may get an error about an invalid sbt-launch file.
    To resolve this do the following:

    cd build
    mv sbt-launch.jar sbt-launch.0.13.7.jar

Reference Links

Daily Algo: Primality of a given n

There are various questions one can ask while investigating primes algorithmically. One might be: given a number n, is it prime? A naive, brute force approach would be to iterate through all the numbers i from 2 to n-1 and test whether n % i == 0. However, this approach would be naive in the extreme, and a more efficient approach can be devised by noting a few points.

  1. 0 and 1 are not primes
  2. 2 is a prime. However, no other multiple of 2 is prime. Hence we can reduce the numbers we have to inspect by 50%.
  3. If a number m divides n, then n/m also divides n, and at least one of m and n/m is <= sqrt(n). Hence we need only examine numbers up to sqrt(n) and no further.

The Python code is shown below:

In [15]: import math
def isPrime(n):
    if n <= 1: return False
    if n == 2: return True
    if n % 2 == 0: return False
    ulim = int(math.sqrt(n)) + 1
    # check only odd divisors up to sqrt(n)
    for i in range(3, ulim + 1, 2):
        if n % i == 0:
            return False
    return True

In [16]: isPrime(6767)
Out[16]: False
In [17]: isPrime(62417)
Out[17]: True

The code is available for download here :

Running standalone program in Spark

In this article we will walk through the steps to set up and run a standalone program in Apache Spark in 3 languages – Python, Scala and Java.

As a first step, let us go ahead and obtain some ready made code that implements the WordCount algorithm from the Learning Spark examples git repository.

Step 1 – Create code

First, cd to the directory of your choice and run:

git clone

This will checkout code under the directory learning-spark-examples.
You will also need to download the file we’re using for this example, Abraham Lincoln’s Gettysburg Address.

Step 2 – Package and run the application

Our WordCount code is nicely isolated in the directory


We will need to package this code as an application so we can distribute it to a Spark cluster. The steps differ for Java, Scala and Python.

Step 2a – Java

In the case of Java we use Maven to package our code into a jar. Maven, a build and dependency management tool, is configured via a pom.xml file. A fragment of the file is shown below:

   <dependency> <!-- Spark dependency -->

The most important sections with respect to the name of the jar running the application are as follows:
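
A rough sketch of what these sections look like (reconstructed from the jar file name produced below, not copied verbatim from the repository):

   <groupId>com.oreilly.learningsparkexamples.mini</groupId>
   <artifactId>learning-spark-mini-example</artifactId>
   <version>0.0.1</version>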


The groupId is used to determine the package namespace, and the artifactId and version are used to formulate the name of the resulting jar file.

To compile and package the application, run:

cd learning-spark-examples/mini-complete-example
mvn clean && mvn compile && mvn package

This creates the jar file learning-spark-mini-example-0.0.1.jar in the target directory.
Now that our application has been packaged, we launch it using the spark-submit script under $SPARK_HOME/bin. The advantage of this script is that it sets up the CLASSPATH and any other dependencies, as well as supporting multiple deploy modes and cluster managers. The arguments to spark-submit are as follows:

 --class <main-class> \
 --master <master-url> \
 --deploy-mode <deploy-mode> \
 --conf <key>=<value> \
 <application-jar> \
 [application args]

Our WordCount application code takes 2 arguments:

  • input file
  • output dir

Note that the output argument, which is applied to the counts JavaPairRDD object, is actually a directory name to which the output is saved.

Thus we run the Java application as follows:

$SPARK_HOME/bin/spark-submit --class \ \
./target/learning-spark-mini-example-0.0.1.jar \
 ./gettysburg.txt ./java_output

The result of running the code above is the creation of the java_output directory:

-rw-r--r-- 1 user user 646 Dec 15 04:59 part-00000
-rw-r--r-- 1 user user 764 Dec 15 04:59 part-00001
-rw-r--r-- 1 user user   0 Dec 15 04:59 _SUCCESS

The first 5 lines of output from the part-00000 file look as follows:

head -5 java_output/part-00000

Step 2b – Scala

Now we turn our attention to creating an application in Scala.
sbt is an open source build tool and is the equivalent of Ant/Maven. The equivalent of the pom.xml file is the build.sbt file, which looks like this:

cat build.sbt
name := "learning-spark-mini-example"

version := "0.0.1"

scalaVersion := "2.10.4"

// additional libraries
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.1.0" % "provided"
)

Building with sbt results in a jar file, similar to the one produced for Java with Maven. To build this jar file, we do the following:

sbt clean package

This generates the jar file which is written to the target directory by default:

[info] Packaging /home/user/devel/learning-spark/mini-complete-example/target/scala-2.10/learning-spark-mini-example_2.10-0.0.1.jar ...
[info] Done packaging.
[success] Total time: 384 s, completed Dec 12, 2015 4:16:21 AM

We can then run the Scala application as follows:

$SPARK_HOME/bin/spark-submit \
 --class \
 ./target/scala-2.10/learning-spark-mini-example_2.10-0.0.1.jar \
 ./gettysburg.txt ./scala_output

This writes its output into the scala_output directory in a manner similar to the Java case :

$ ls -ltr scala_output/
total 8
-rw-r--r-- 1 user user 746 Dec 14 04:54 part-00001
-rw-r--r-- 1 user user 646 Dec 14 04:54 part-00000
-rw-r--r-- 1 user user   0 Dec 14 04:54 _SUCCESS

Step 2c – Python

The Python case is much simpler than Java or Scala.
There is no build/packaging stage. All that needs to be done is to pass the Python script to spark-submit (additional module dependencies can be shipped with its --py-files option). In this example no Python script was supplied with the repository, so I wrote the following script:

import sys, argparse, os
from operator import add

from pyspark import SparkContext

parser = argparse.ArgumentParser(description="Run wordcount in Spark")
parser.add_argument('-f', help='input file', dest='fileName', required=True)
parser.add_argument('-o', help='output dir', dest='outdir', required=True)

def count_words(filename, outdir):
    sc = SparkContext(appName="WordCount in Python")
    in_lines = sc.textFile(filename, 1)
    # split lines into words, strip trailing punctuation, and count occurrences of each word
    inter_counts = in_lines.flatMap(lambda line: line.split(' ')) \
        .map(lambda x: (x.rstrip(',.'), 1)).reduceByKey(add)
    total_counts = inter_counts.collect()
    if not os.path.exists(outdir):
        os.makedirs(outdir)
    outfile = "{0}/{1}".format(outdir, "py_wordcount.txt")
    print "Outputting to {0}".format(outfile)
    with open(outfile, 'w') as fh:
        for (word, count) in total_counts:
            fh.write("{0}: {1}\n".format(word, count))

def main():
    args = parser.parse_args()
    count_words(args.fileName, args.outdir)

if __name__ == "__main__":
    main()

We can then run it as follows:

$SPARK_HOME/bin/spark-submit ./src/ -f gettysburg.txt -o py_output


The strings in the gettysburg.txt file need to be stripped of non-alphanumeric characters in order to get a correct word count; otherwise strings such as “dead,” and “dead” would be counted as separate words.
In the Java case, to fix this, you can make the following change where the (word, 1) pairs are created:

return new Tuple2(x.replaceAll("[^A-Za-z0-9]", ""), 1);

Similarly for Scala, make this change:

 val counts = => (word.replaceAll("[^A-Za-z0-9]", ""), 1)).reduceByKey{case (x, y) => x + y}