Pandas Cheatsheet

Reset index for data frame based on specific column


Show null columns of dataframe

df.ix[df.index[(df.T==np.nan).sum() > 1]]

Sum specific column of dataframe, excluding NULLs


Loop over rows of dataframe
With iterrows (produces index, columns as a Series):

for idx, row in df.iterrows():
         print idx, row['a'], row['col2'], row['col3']

With itertuples ( produces tuple of index and column values):

for irow in df.itertuples():
         print irow

Check if dataframe column contains string


Increase the width of dataframe in jupyter display

pd.options.display.max_colwidth = 100

Estimate memory usage for dataframe
By column




Save numpy array to text file

np.savetxt('data.txt', A,fmt="%s")

Filter rows with null/nan values and write to CSV

.to_csv("./rows_missing_last.csv", sep='|', index=False, header=True)

Quick Introduction to Apache Spark

What is Spark

Spark is a fast and general purpose framework for cluster computing.
It is written in Scala but is available in Scala, Java and Python.
The main data abstraction in Spark is the RDD, or Resilient Distributed Dataset.
With this abstraction, Spark enables data to be distributed and processed among
the many nodes of a computing cluster.
It provides both interactive and programmatic interfaces. It consists of the following components:

  • Spark Core – the foundational classes on which Spark is built. It provides the RDD.
  • Spark Streaming – a protocol for processing streaming data
  • Spark SQL – an API for handling structured data
  • MLLib – a Machine Learning library
  • GraphX – a Graph library

Cluster Manager

In order to operate, in production mode Spark needs a cluster manager that manages data distribution, task scheduling and fault tolerance among the various nodes.
There are 3 types supported – Apache Mesos, Hadoop YARN and Spark standalone.

Spark Fundamentals

Spark, as has been previously defined is a framework for fast and general purpose cluster computing. Its fundamental abstraction is the RDD – the resilient distributed dataset which means that it is inherently parallelizable among the nodes and cores of a computing cluster. The RDD is held entirely in RAM.

When data is read into Spark, it is read into an RDD. Once it is read into an
RDD it can be operated on. There are 2 distinct types of operations on RDDs:

1. Transformations

Transformations are used to convert data in the RDD to another form. The result of a transformation is another RDD. Examples of transformations are:

  • map()– takes a function as argument and applies the function to each item/element of the RDD
  • flatMap() – takes a function as argument and applies the function to each element while “flattening” the results into a single level collection.
  • filter() – takes a boolean expression and returns an RDD with rows for which the boolean expression is true. e.g. lines of a file which contain the string “Obama”
  • countByKey() – given a Pair/map RDD i.e. with Key value pairs, return another Pair RDD with counts by key.

2. Actions

Actions are operations on an RDD which result in some sort of output that is not an RDD e.g. a list, DataFrame, or output to the screen. Examples of action operations are:

  • collect() – Applies the various transformations to an RDD then returns the result as a collection.
  • count – returns a count of the number of elements in an RDD
  • reduce() – takes a function and repeatedly applies it to the elements of the
    RDD to produce a single output value

RDDs and Lazy Evaluation

A fundamental idea in Spark’s implementation is the application of lazy evaluation and this is implemented for all Spark transformation operations.
Thus an RDD is fundamentally a data abstraction so, when we call say :

scala> val rdd = sc.parallelize(Seq(1,3,4,5,9))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at :21

scala> val mappedRDD = => x*x)
mappedRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at :23

what we’re getting as mappedRDD is just an an expression that hasn’t been evaluated. This expression is essentially a record of a sequence operations that need to be evaluated i.e. parallelize –> map
This expression amounts to what is known as a lineage graph and can be seen as follows :

scala> mappedRDD.toDebugString
res4: String = 
(4) MapPartitionsRDD[1] at map at :23 []
 |  ParallelCollectionRDD[0] at parallelize at :21 []

List of public datasets

I will be continuously updating this blog as I encounter more interesting datasets.

Spark Adventures –
Processing Multi-line JSON files

This series of blog posts will cover unusual problems I’ve encountered on my Spark journey for which the solutions are not obvious.

Everyone who has read the seminal book Learning Spark has encountered this example in chapter 9 – Spark SQL on how to ingest JSON data from a file using the Hive context to produce a resulting Spark SQL DataFrame:

In [84]: hiveCtx = sqlContext
In [85]: inputFile="/home/myuser/testweet.json"
         tweets_df = hiveCtx.jsonFile(inputFile)

In [86]: type(tweets_df)
Out[86]: pyspark.sql.dataframe.DataFrame

In [87]:tweets_df.printSchema()
 |-- contributorsIDs: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- createdAt: string (nullable = true)
 |-- currentUserRetweetId: long (nullable = true)
 |-- hashtagEntities: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: long (nullable = true)
# Register the input DataFrame
In [88]:tweets_df.registerTempTable("tweets")
In [89]: topTweets = hiveCtx.sql("""SELECT text, retweetCount FROM tweets ORDER BY retweetCount LIMIT 10""")
In [90]: print topTweets
         DataFrame[text: string, retweetCount: bigint]
In [91]:
|                text|retweetCount|
|Adventures With C...|           0|

I am using PySpark above, and the hive context is already available.
However, this works only when the JSON file is well formatted i.e. each line of the file is a JSON object. The file above looks like this:

{"createdAt":"Nov 4, 2014 4:56:59 PM","id":529799371026485248,"text":"Adventures With Coffee, Code, and Writing.","source":"\u003ca href\u003d\"\" rel\u003d\"nofollow\"\u003eTwitter Web Client

However, for a json file that looks like this:

  "created_at": "2015-12-23T23:39:06Z", 
  "trends": [
    "url": "", 
    "query": "%23HappyBirthdayLouis", 
    "tweet_volume": 658249, 
    "name": "#HappyBirthdayLouis", 
    "promoted_content": null

this is what we get when we use the approach above:

[In 93]: inputFile="/home/myuser/tweet_trends.json"
         tweet_trends = hiveCtx.jsonFile(inputFile)
#Display schema
In [94]: tweet_trends.printSchema()
In [97]: tweet_trends.registerTempTable("tweet_trends")
In [98]: tweet_trends = hiveCtx.sql("""SELECT *  FROM tweet_trends LIMIT 10""")
In [99]:  print tweet_trends

This results in an empty dataframe.
To resolve this we can use the SparkContext.wholeTextFiles() method:

In [100]: import json
Out[100]: pyspark.rdd.RDD

This produces an RDD that looks like this:

In [102]:multiline_rdd.take(1)
  u' {\n  "created_at": "2015-12-23T23:39:06Z", \n  "trends": [\n   {\n    "url": "", \n    "query": "%23HappyBirthdayLouis", \n    "tweet_volume": 658249, \n    "name": "#HappyBirthdayLouis", \n    "promoted_content": null\n   }

The RDD consists of a tuple whose 1st element is a filename and whose 2nd element is the data with the lines separated by whitespace.
In order to prepare the data for proper ingestion by Spark SQL, we utilize the following transformation:

In [59]: #Remove all whitespace chars
         import re
         json_rdd = x : x[1])\
                    .map(lambda x : re.sub(r"\s+", "", x, \

The resulting rdd now looks like this:

In [105]: json_rdd.take(1)
In [106]: type(json_rdd)
Out[106]: pyspark.rdd.PipelinedRDD

We can now use the jsonRDD() method to produce a Spark SQL DataFrame:

In [110]: json_df=hiveCtx.jsonRDD(json_rdd)
In [111]: type(json_RDD)
Out[111]: pyspark.sql.dataframe.DataFrame

In [114]: json_df.printSchema()
          |-- as_of: string (nullable = true)
          |-- created_at: string (nullable = true)
          |-- locations: array (nullable = true)
          |    |-- element: struct (containsNull = true)
          |    |    |-- name: string (nullable = true)
          |    |    |-- woeid: long (nullable = true)

Now register a temp table in Hive and select some data:

In [116]: json_df.registerTempTable("tweet_trends")
In [119]: trends_data = hiveCtx.sql("SELECT locations FROM 
In [120]: type(trends_data)
Out[120]: pyspark.sql.dataframe.DataFrame
In [122]: trends_data
In [123]:
          |           locations|

How to set up an Apache Spark Cluster

I now detail the steps I took to setup an Apache Spark Cluster on my home network.
I have the following 2 hosts:

  • G50 Lenovo laptop running Ubuntu 15.04 (slave/worker)
  • iMac desktop running Mac OS X 10 (master)

Step 1 – Install Spark into same location on both machines

My first step was to make sure I installed Apache Spark into the same location : /opt/spark-1.4.1
The steps for doing this are detailed here: How to install Apache Spark

Step 2 – Modify the config file on master machine

Edit the following file on the master machine, making sure to add
the host name of the slave machine: $SPARK_HOME/conf/slaves

Step 3 – Setup ssh password-less login

The master needs to be able to login without a password on the slave machine. To enable this, run the following on the master machine:

ssh-keygen -t dsa
Enter file in which to save the key (/home/youruser/.ssh/id_dsa): [ENTER]
Enter passphrase (empty for no passphrase): [EMPTY]
Enter same passphrase again: [EMPTY]

Next, copy this file from the master to the the slave machine

and then run:

$ cat ~/.ssh/ >> ~/.ssh/authorized_keys
$ chmod 644 ~/.ssh/authorized_keys


Step 4 – Start the cluster from the master machine

Run the following: sbin/

This should start up the cluster and produce output that looks like

starting org.apache.spark.deploy.master.Master, logging to /opt/spark-1.4.1/sbin/../logs/spark-femibyte-org.apache.spark.deploy.master.Master-1-macdesk.fios-router.home.out
localhost: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark-1.4.1/sbin/../logs/spark-femibyte-org.apache.spark.deploy.worker.Worker-1-macdesk.fios-router.home.out
fembuntu: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark-1.4.1/sbin/../logs/spark-femibyte-org.apache.spark.deploy.worker.Worker-1-fembuntu.out

Stopping the cluster

The cluster can be stopped as follows: sbin/
and produces output that looks like:

localhost: stopping org.apache.spark.deploy.worker.Worker
fembuntu: stopping org.apache.spark.deploy.worker.Worker
stopping org.apache.spark.deploy.master.Master

Monitoring from the master web ui

The cluster can be monitored from the web ui of the master node which is accessible from port 8080 (default). It should look similar to the following:



I obtained the following errors on the slave machine:

16/01/03 05:40:24 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkMaster@master:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: master/
16/01/03 05:41:30 ERROR Worker: All masters are unresponsive! Giving up.

The problem is that I had 2 separate ip addresses pointing to the
same hostname for my slave machine (it was both on the wifi network and connected via ethernet as well).

Reference links

How to install Apache Spark

Mac OS/X

Installing Spark on Mac OS/X will involve downloading the required
archive files into the final location where we want to run Spark and also Scala from. On my machine that directory location is:

  1. cd to destination directory:
    cd /Users/myuser/local
  2. Download Spark from
  3. Download scala from
  4. Uncompress the spark and scala archive files you downloaded:
    tar -xvzf spark-1.4.1.tgz
    tar -xvzf scala-2.11.7.tgz

    The above step uncompresses the file contents into these directories: spark-1.4.1, scala-2.11.7

  5. Set the following environment variables – SCALA_HOME, SPARK_HOME, PATH:

    export SCALA_HOME=/Users/myuser/local/scala-2.11.7
    export SPARK_HOME=/Users/myuser/local/spark-1.4.1
    export PATH=$PATH:$SCALA_HOME/bin:$SPARK_HOME/bin
  6. Run sbt – Scala build tool
    cd $SPARK_HOME
    build/sbt clean assembly
  7. Run spark shell

    Sometimes you may get an error about an invalid sbt-launch file.
    To resolve this do the following:

    cd build
    mv sbt-launch.jar sbt-launch.0.13.7.jar

Reference Links

Daily Algo: Primality of a given n

There are various questions one can ask while investigating primes algorithmically. One might be : Given a number n, is it prime ? A naive, brute force approach would be to iterate through all the numbers i from 0 to n-1, and test if n%i ==0. However, this approach would be naive in the extreme, and a more efficient approach can be devised by noting a few points.

  1. 0 and 1 are not primes
  2. 2 is a prime. However, all multiples of 2 are not prime. Hence we can reduce the numbers we have to inspect by 50%.
  3. If a number m divides n, then n/m also divides n, and n/m <=m or vice-versa. Hence all we need is to examine numbers up to sqrt(n) and no further.

The Python code is shown below:

In [15]: import math
def isPrime(n):
    if n <= 1 or n%2==0: return False 
    if n==2: return True
    ulim = int(math.sqrt(n))+1
    for i in range(3,ulim+1,2):
        if n%i ==0:
            return False
    return True

In [16]: isPrime(6767)
Out[16]: False
In [17]: isPrime(62417)
Out[17]: True

The code is available for download here :

Running standalone program in Spark

In this article we will walk through the steps to setup and run a standalone program in Apache Spark for 3 languages – Python, Scala and Java.

As a first step, let us go ahead and obtain some some ready made code that implements the WordCount algorithm from the Learning Spark examples git repository.

Step 1 – Create code

First, cd to the directory of your choice and run:

git clone

This will checkout code under the directory learning-spark-examples.
You will also need to download the file we’re using for this example, Thomas Jefferson’s Gettysburg Address.

Step 2 – Package and run the application

Our WordCount code is nicely isolated in the directory


We will need to package this code as an application so we can distribute this to a Spark cluster. There are different steps for this for Java, Scala and Python

Step 2a – Java

In the case of Java we use Maven to package our code into a jar. Maven which is a package manager is configured via a pom.xml file. The contents of the file are shown below:

   <dependency> <!-- Spark dependency -->

The most important sections with respect to the name of the jar running the application are as follows:


The groupId is used to determine the package namespace, and the artifactId and version are used to formulate the name of the resulting jar file.

To compile, and package the application run :

cd learning-spark-examples/mini-complete-example
mvn clean && mvn compile && mvn package

This creates the following file in the target directory:
Now that our application has been packaged, we launch it using the script spark-submit under $SPARK_HOME/bin. The advantage of this script is that it sets up the CLASSPATH and any other dependencies as well as supporting multiple deploy modes and cluster managers. The arguments to spark-submit are as follows:

 --class <main-class> \
 --master <master-url> \
 --deploy-mode <deploy-mode> \
 --conf <key>=<value>
 <application-jar> \
 [application args]

Our WordCount application code takes 2 arguments :

  • input file
  • output dir

Note that the argument to the JavaPairRDD counts object is actually a directory name to which the output is saved.

Thus we run the Java application as follows:

$SPARK_HOME/bin/spark-submit --class \ \
./target/learning-spark-mini-example-0.0.1.jar \
 ./gettysburg.txt ./java_output

The result of running the code above is the creation of the java_output directory:

-rw-r--r-- 1 user user 646 Dec 15 04:59 part-00000
-rw-r--r-- 1 user user 764 Dec 15 04:59 part-00001
-rw-r--r-- 1 user user   0 Dec 15 04:59 _SUCCESS

The 1st 5 lines of output from the part-0000 file looks as follows:

head -5 java_output/part-00000

Step 2b – Scala

Now we turn our attention to creating an application in Scala.
sbt is an open source build tool and is the equivalent of Ant/Maven. The equivalent to the pom.xml file is the build.sbt file. This file looks like this:

cat build.sbt
name := "learning-spark-mini-example"

version := "0.0.1"

scalaVersion := "2.10.4"

// additional libraries
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.1.0" % "provided"

Building using sbt results in a jar file similar to that done for Java using Maven. To build this jar file, we do the following:

sbt clean package

This generates the jar file which is written to the target directory by default:

[info] Packaging /home/user/devel/learning-spark/mini-complete-example/target/scala-2.10/learning-spark-mini-example_2.10-0.0.1.jar ...
[info] Done packaging.
[success] Total time: 384 s, completed Dec 12, 2015 4:16:21 AM

We can then run the Scala application as follows:

$SPARK_HOME/bin/spark-submit \
 --class \
 ./target/scala-2.10/learning-spark-mini-example_2.10-0.0.1.jar \
 ./gettysburg.txt ./scala_output

This writes its output into the scala_output directory in a manner similar to the Java case :

$ l -ltr scala_output/
total 8
-rw-r--r-- 1 user user 746 Dec 14 04:54 part-00001
-rw-r--r-- 1 user user 646 Dec 14 04:54 part-00000
-rw-r--r-- 1 user user   0 Dec 14 04:54 _SUCCESS

Step 2c – Python

The Python case is much simpler than Java or Scala.
There is no build/packaging stage. All that needs to be done is to specify a source file using the --py-files of spark-submit. In this example there was no Python script supplied so I wrote this script,

import sys, argparse, os
from operator import add

from pyspark import SparkContext

parser = argparse.ArgumentParser(description="Run wordcount in Spark")
parser.add_argument('-f', help='input file',dest='fileName',required=True)
parser.add_argument('-o', help='output dir',dest='outdir',required=True)

def main():

def count_words(filename):
    sc = SparkContext(appName="WordCount in Python")
    in_lines = sc.textFile(filename, 1)
    inter_counts = in_lines.flatMap(lambda line: line.split(' ')) \
     .map(lambda x: (x.rstrip(',.'),1)).reduceByKey(add)
    total_counts = inter_counts.collect()
    outdir = args.outdir
    if not os.path.exists(outdir):
    outfile = "{0}/{1}".format(outdir,"py_wordcount.txt")
    print "Outputting to {0}".format(outfile)
    with open(outfile, 'w') as fh:
        for (word, count) in total_counts:
            fh.write("{0}: {1}\n".format(word, count))

if __name__ == "__main__":

We can then run it as follows:

$SPARK_HOME/bin/spark-submit ./src/ -f gettysburg.txt -o py_output


The strings in the gettysburg.txt file need to be stripped of non-alphanumeric in order to get a correct word count, otherwise strings such as “dead,” and “dead” would be counted as separate words.
In the Java case, to fix this, you can make this change to

return new Tuple2(x.replaceAll("[^A-Za-z0-9]", ""), 1);

Similarly for Scala, make this change :

 val counts = => (word.replaceAll("[^A-Za-z0-9]", ""), 1)).reduceByKey{case (x, y) => x + y}

Depth First Search


The goal of Depth First Search is to search as ‘deep’ into the graph as possible, which is opposed to Breadth First Search which is to search as ‘wide’ as possible. In DFS, edges are explored out of the most recently discovered vertex v that has unexplored edges out of it. When all of v’s edges have been explored, the search “backtracks” to explore other edges leaving the source vertex from which v was discovered. This process is continued until all the vertices reachable from the original source vertex have been discovered. If any undiscovered vertices remain, then one of them is selected as a new source and the search is repeated from that source. This process is repeated until all vertices have been discovered.
Unlike BFS, where the predecessor subgraph forms a tree, the predecessor subgraph formed by a DFS may be composed of several trees to form a DFS forest. This is because the search may be repeated from multiple sources.


As in BFS, vertices are colored during ths search to indicate their state. Each vertex is initially white, is grayed when it is discovered in the search and blackened when finished, i.e. when it’s adjacency list/neighbor set has been examined completely. This guarantees that each vertex ends up in exactly one depth-first tree so that these trees are disjoint.
Besides creating a depth-first forest, depth-first search also timestamps each vertex. Each vertex v has 2 timestamps: the 1st timestamp disc[v] records when v is first discovered and grayed, and the 2nd timestamp fin[v] records when the search finishes examining v’s adjacency list and blackens v.


The pseudocode is as follows:

1 for each vertex u in V[G]
2      do color[u] <--- WHITE
3           pred[u] <--- NULL
4 time <--- 0
5  for each vertex u in V[G]
6        do if color[u] == WHITE
7             then DFS-VISIT(u)

1 color[u] <--- WHITE  // White vertex u has just been discovered.
2 time <--- time + 1
3 disc[u] <-- time
4 for each v in Adj[u]     // Explore edge (u,v)
5       do if color[v] == WHITE
6       then pred[v] <--- u
7               DFS-VISIT(v)
8 color[u] <-- BLACK     // Blacken u; it is finished
9 fin[u] <--- time <--- time +1


Total running time is: θ(V+E). DFS runs linear in the size of the adjacency list representation of the graph G.

Various Machine Learning Links

ML Tutorials and Tips

In-depth introduction to machine learning in 15 hours of expert videos
Machine Learning Practical Advice – Scikit-Learn
Introduction to Sckit-Learn (Vanderplass)

Data Mining

R and Data Mining: Examples and Case Studies


Winning Kaggle, An introduction to Re-Ranking
Titanic: Machine Learning from Disaster


Titanic Survival Prediction Using R
Titanic Survival Prediction Using R (Wehrley)
Titanic with Decision Trees
Kaggle – Titanic Machine Learning from Disaster
Data Science Learning from the Titanic in R
EECS 349 Titanic – Machine Learning From Disaster
The sinking of the Titanic – Binary Logistic Regression in R
CS229 Titanic – Machine Learning from Disaster

Supervised Learning

Scikit-Learn Supervised Learning
DataRobot – Classification with Sckit-Learn

Logistic Regression

Logistic Regression with sckit-learn
Kaggle — Photo quality prediction

Decision Trees

Classification with Decision Tree Induction
Scikit-Learn Decision Tree Regression

Support Vector Machines

Scikit-Learn Support Vector Machines

Unsupervised Learning

Principal Component Analysis

PCA in an image with scikit-learn and scikit-image
Scikit-Learn Faces Dataset Decomposition
PCA for Machine Learning (R)


Scikit-Learn K-Means Clustering