Git Tip: checkout tag of repository

How to checkout a specific tag of a repository

  1. Clone the repo:
 git clone https://github.com/miguelgrinberg/microblog.git
  2. Determine the list of available tags:
cd microblog
git tag -l
  3. Check out the tag as a new branch:
microblog$ git checkout tags/v0.3 -b tag_v0.3
Switched to a new branch 'tag_v0.3'
  4. The reason for checking out the tag as a new branch is that we wish to keep the tagged version separate from the latest branch.

Reference: http://bit.ly/2ANbo90

Spark Code Cheatsheet

Word count in single line

 rdd.flatMap(line => line.split(" "))
    .map(word => (word, 1))
    .reduceByKey((count, accum) => count + accum)

Count number of lines in file

rdd.count

Display Spark version

print sc.version

Read csv file into dataframe in spark with header

df = spark.read.option("header", "true") \
    .option("delimiter", ",").csv(csv_path)

Read csv file into dataframe in spark without header
First you need to specify a schema:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("Field0", IntegerType()),
    StructField("Field1", StringType()),
    StructField("Field2", StringType())
])

Then read in the data:

df = spark.read.format('com.databricks.spark.csv') \
    .load(csv_path, schema=schema)


Handle commas in embedded quoted strings when reading in data

This is done by specifying an escape option:

df_raw = spark.read.format('com.databricks.spark.csv') \
    .load(csv_path, schema=schema, escape='"')

Obtain distinct values of column in Spark DF

student_ids=df_raw.select('student_id').distinct().take(200)

Convert list of DataFrame Rows to Python list

student_ids=[x.student_id for x in df_raw
            .select('student_id').distinct().take(200)]

Filter out rows based on values in list

from pyspark.sql.functions import col

filtered_df = df_raw.where(col('student_id').isin(student_ids))

Save dataframe to csv (only for small datasets)

# Many Files
filtered_df.write.csv(output_csv) 
# Single File
filtered_df.coalesce(1).write \
    .option("header", "true").csv(output_csv)

Count the number of distinct composite keys in a data frame:

df.select('major','gender').distinct().count()

Start pyspark in python notebook mode

export PYSPARK_DRIVER_PYTHON=ipython;pyspark

Python cheatsheet

Read input from user

s = raw_input('--> ')  # Python 2
s = input('--> ')      # Python 3

Obtain base directory from within current module

import os
print os.path.dirname(os.path.abspath(__file__))

Note that __file__ is the pathname of the file from which the module was loaded, if it was loaded from a file.

How I got Cloudera Quickstart VM running on Google Compute Engine

Introduction

This is a brief synopsis of how I got the Cloudera Quickstart VM running via Docker on Google Compute Engine (GCE), Google’s equivalent of Amazon’s AWS cloud computing service.

The Cloudera Quickstart VM is a basic “Hadoop-In-A-Box” virtual machine solution which provides a Hadoop ecosystem for developers who wish to quickly test out the basic features of Hadoop without having to deploy an entire cluster. Since it doesn’t entail setting up a cluster, certain features provided by a cluster are missing.

Detailed Steps

Step 0.

Install gcloud on your local workstation as per these instructions:

https://cloud.google.com/sdk/

Step 1. Create a container optimized VM on GCE:

https://cloud.google.com/compute/docs/containers/container_vms

$ gcloud compute --project "myproject-1439" \
instances create "quickstart-instance-1" \
--image container-vm --zone "us-central1-a" \
 --machine-type "n1-standard-2"
Created [https://www.googleapis.com/compute/v1/projects/gcebook-1039/zones/us-central1-a/instances/quickstart-instance-1].
NAME                  ZONE          MACHINE_TYPE  PREEMPTIBLE INTERNAL_IP EXTERNAL_IP   STATUS
quickstart-instance-1 us-central1-a n1-standard-2             10.240.0.2  146.148.92.36 RUNNING

I created an n1-standard-2 VM on GCE, which has 2 vCPUs and 7.5 GB of RAM. It comes with Docker pre-installed.

Step 2. SSH into the new VM and pull the Cloudera Quickstart image from Docker Hub, as described in the Cloudera documentation linked in Step 5.

Step 3. Let’s check the image size:

$ docker images
REPOSITORY TAG IMAGE ID CREATED VIRTUAL SIZE
cloudera/quickstart latest 2cda82941cb7 41 hours ago 6.336 GB

Given that our VM has a total disk size of 10 GB, this is cutting it a bit close for the long term if we wish to install other software.

So let’s create a persistent disk and make that available for storing our docker image:

https://cloud.google.com/compute/docs/disks/persistent-disks

I was able to create a 200GB persistent disk: bigdisk1

Step 4. Switch the Docker image installation directory to use the big persistent disk.

There are a couple of ways to do this as per this article:

https://forums.docker.com/t/how-do-i-change-the-docker-image-installation-directory/1169

The most trouble-free way, IMO, was to mount the bigdisk1 persistent disk so it would be available for use by my VM, and move the default Docker image installation directory to it.

First, create a mountpoint for the bigdisk:

$ sudo mkdir /mnt/bigdisk1

Next, mount it:
On GCE, the raw disk can be found at /dev/disk/by-id/google-<diskid>
i.e. /dev/disk/by-id/google-bigdisk1

$ sudo mount -o discard,defaults /dev/disk/by-id/google-bigdisk1 \
  /mnt/bigdisk1

Finally, symlink it back to the default image installation directory:

$ sudo ln -s /mnt/bigdisk1/docker/ /var/lib/docker

Presto, we’re now ready. If we run docker pull on any image, the image will be written to the large persistent disk:

$ ls -ltr /var/lib/docker
lrwxrwxrwx 1 root root 21 Apr 9 03:20 /var/lib/docker -> /mnt/bigdisk1/docker/

Step 5. Run the Cloudera Quickstart VM and get access to all the goodies:

http://www.cloudera.com/documentation/enterprise/5-5-x/topics/quickstart_docker_container.html

$ sudo docker run --hostname=quickstart.cloudera --privileged=true \
-t -i cloudera/quickstart  /usr/bin/docker-quickstart 
...
Starting zookeeper ... STARTED
starting datanode, logging to /var/log/hadoop-hdfs/hadoop-hdfs-datanode-quickstart.cloudera.out

...
starting rest, logging to /var/log/hbase/hbase-hbase-rest-quickstart.cloudera.out
Started HBase rest daemon (hbase-rest): [ OK ]
starting thrift, logging to /var/log/hbase/hbase-hbase-thrift-quickstart.cloudera.out
Started HBase thrift daemon (hbase-thrift): [ OK ]
Starting Hive Metastore (hive-metastore): [ OK ]
Started Hive Server2 (hive-server2): [ OK ]
Starting Sqoop Server: [ OK ]
Sqoop home directory: /usr/lib/sqoop2
Setting SQOOP_HTTP_PORT: 12000
...
Started Impala Catalog Server (catalogd) : [ OK ]
Started Impala Server (impalad): [ OK ]
[root@quickstart /]#

Spark Expositions: Understanding RDDs

Introduction

RDDs, or resilient distributed datasets, are the fundamental data abstraction
in Apache Spark. An RDD is, in essence, a distributed dataset held in memory:
it is distributed, or partitioned, among the various nodes of a Spark cluster.
The definition of an RDD in the official documentation is as follows:

A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel.

In the seminal paper on RDDs by Matei Zaharia, an RDD is defined as follows:

Resilient Distributed Datasets (RDDs) are a distributed memory abstraction that lets programmers perform in-memory computations on large clusters in a fault-tolerant manner.

The Resilient part is due to the fact that an RDD is fault tolerant: if a
partition of the dataset is lost due to a node going down, it can be recovered in a timely manner. The recoverability comes from the fact that the RDD's lineage graph is maintained by the driver program, so the lost partition can be recomputed.

The lineage graph consists of the series of transformations and actions to be
computed on a partition of data, and it is stored on the driver program.
The RDD, in effect, is a set of instructions that are lazily evaluated;
evaluation only occurs when an action is encountered.
Here is an example of what an RDD’s lineage graph looks like:

scala> idBudgetRDD.toDebugString
res6: String = 
(2) MapPartitionsRDD[8] at map at <console>:35 []
 | MapPartitionsRDD[7] at filter at <console>:33 []
 | MapPartitionsRDD[3] at map at <console>:31 []
 | MapPartitionsRDD[2] at map at <console>:31 []
 | MapPartitionsRDD[1] at textFile at <console>:29 []

The Distributed part stems from the fact that the data is distributed among the worker nodes of a cluster. The driver program sends each worker the RDD along with the partition of the data that should be computed on that particular node. The program on the worker node that is responsible for executing the set of instructions encapsulated in the RDD is called the Executor. The exact call is as follows, in

core/src/main/scala/org/apache/spark/rdd/RDD.scala

@DeveloperApi
 def compute(split: Partition, context: TaskContext): Iterator[T]

The Dataset part expresses the fact that we are processing a collection of
data, albeit one that is partitioned.

Again, taking a look at the source code in
core/src/main/scala/org/apache/spark/rdd/RDD.scala
we see:

/**
 * Implemented by subclasses to return the set of partitions in this RDD. This method will only
 * be called once, so it is safe to implement a time-consuming computation in it.
 */
 protected def getPartitions: Array[Partition]

Types of Operations

RDDs support 2 types of operations: Transformations and Actions.

Transformations
Transformations convert an RDD from one type to another.
They are lazily evaluated, meaning that they are only executed when an action requires their results.

Actions
Actions trigger the execution of the chain of operations that result in data being returned. They are necessary for transformations to be evaluated. Without an action, an RDD is just a chain of transformations that are yet to be evaluated.
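
A minimal PySpark sketch of the distinction, assuming an active SparkContext sc (the sample data is made up): the filter transformation returns a new RDD and nothing is computed until the count action is called.

lines = sc.parallelize(["error: disk full", "all good", "error: timeout"])
errors = lines.filter(lambda line: line.startswith("error"))  # transformation: returns a new RDD, not yet evaluated
num_errors = errors.count()                                   # action: triggers evaluation and returns a value
print(num_errors)  # 2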

Contents of an RDD

An RDD is characterized by the following 5 properties:

  1. A list of partitions that comprise the dataset.
  2. A function to perform the computation for each partition.
  3. A list of dependencies on other RDDs i.e. parent RDDs.
    The parent RDD is the initial RDD on which a transformation is
    executed.
  4. A Partitioner for key-value/Pair RDDs (Pair RDDs are defined later).
  5. A list of preferred locations/hosts to load the various partitions into
    which the data has been split.

Where does the RDD live?

The RDD lives in the SparkContext on the driver program, which runs on the master node in a cluster setup.

It is uniquely identified by an id:

scala> val exampleRDD = sc.parallelize(List(1,2,3,4,5))
exampleRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:27

scala> exampleRDD.id
res34: Int = 12 

scala> val example2RDD = sc.parallelize(List(1,2,3,4,5))
example2RDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:27

scala> example2RDD.id
res36: Int = 13

scala> val example3RDD=exampleRDD
example3RDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:27

scala> example3RDD.id
res37: Int = 12

 

Ways to create an RDD

  1. Parallelize an existing collection e.g.
    primesRDD = sc.parallelize([2,3,5,7,11,13,17])
  2. Read data from an external distributed source, e.g. HDFS, S3, Cassandra:

     gettysburgRDD = sc.textFile("hdfs://user/dataphantik/gettysburg.txt")

  3. From an existing RDD – transforming an existing RDD into a new RDD, e.g.
    errorsRDD = baseRDD.filter(lambda line: 'Error' in line)
  4. Via Spark Streaming, where each micro-batch of an incoming stream arrives as an RDD (see the sketch after this list).
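
A rough sketch of option 4 using the PySpark DStream API, assuming an existing SparkContext sc and a text stream on localhost:9999 (a hypothetical source); each batch interval yields an RDD:

from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, batchDuration=10)     # 10-second micro-batches
lines = ssc.socketTextStream("localhost", 9999)  # a DStream: a sequence of RDDs, one per batch
lines.count().pprint()                           # print the element count of each batch's RDD
ssc.start()
ssc.awaitTermination()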

Types of RDDs

The base class of all RDDs, org.apache.spark.rdd.RDD, is an abstract class from which all concrete RDD types inherit.

The base RDD classes are:

  • HadoopRDD – provides core functionality for reading data stored in Hadoop (e.g.
    files in HDFS, sources in HBase, or S3), using the older MapReduce API
    (`org.apache.hadoop.mapred`).
  • MapPartitionsRDD – applies the provided function to every partition of the
    parent RDD. Normally returned when an RDD is created from a file via sc.textFile(..).
  • ParallelCollectionRDD – an RDD representing a collection of elements. It contains
    numSlices Partitions and locationPrefs, which is a Map. It is obtained from a call
    to sc.parallelize(..) on a collection in memory.
  • PipedRDD – an RDD that pipes the contents of each parent partition through an
    external command (printing them one per line) and returns the output as a
    collection of strings.

There is also the notion of a:

  • PairRDD – an RDD of key-value pairs, for example a ParallelCollectionRDD containing
    key-value pairs. There is no concrete PairRDD class, since it is an abstraction; however,
    the PairRDDFunctions class provides a set of transformations that can be performed on
    Pair RDDs (see the sketch below).
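
A short PySpark sketch of working with a pair RDD (the data is made up for illustration; in PySpark these transformations are available directly on RDDs of tuples):

scores = sc.parallelize([("alice", 80), ("bob", 70), ("alice", 90)])  # key-value pairs
scores.reduceByKey(lambda a, b: a + b).collect()   # [('alice', 170), ('bob', 70)] (order may vary)
scores.groupByKey().mapValues(list).collect()      # [('alice', [80, 90]), ('bob', [70])]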


Useful Distributed Systems Links

Distributed Systems is a constantly changing field. This page attempts to keep track of sites and blogs that are frequently updated and are chock full of useful information for folks interested in keeping up with state-of-the-art technologies.

Blogs

Framework Sites

Distributed Systems Concepts

Pandas Cheatsheet

Set the index of a dataframe to a specific column

df2=df.set_index(['id'])

Show rows of a dataframe that contain null values

df[df.isnull().any(axis=1)]

Sum specific column of dataframe, excluding NULLs

df[df['col_name'].notnull()]['col_name'].sum()

Loop over rows of dataframe
With iterrows (produces the index and the row as a Series):

for idx, row in df.iterrows():
         print idx, row['col1'], row['col2'], row['col3']

With itertuples (produces a namedtuple of the index and column values):

for irow in df.itertuples():
         print irow

Check if dataframe column contains string

df['field_name'].str.contains('trinidad')

Increase the width of dataframe in jupyter display

pd.options.display.max_colwidth = 100

Estimate memory usage for dataframe
By column

df.memory_usage(index=True)

Total

df.memory_usage(index=True).sum()

Save numpy array to text file

np.savetxt('data.txt', A, fmt="%s")

Filter rows with null/nan values and write to CSV

rp_pdf[rp_pdf['last_name'].isnull()]\
.to_csv("./rows_missing_last.csv", sep='|', index=False, header=True)

Quick Introduction to Apache Spark

What is Spark

Spark is a fast and general purpose framework for cluster computing.
It is written in Scala but is available in Scala, Java and Python.
The main data abstraction in Spark is the RDD, or Resilient Distributed Dataset.
With this abstraction, Spark enables data to be distributed and processed among
the many nodes of a computing cluster.
It provides both interactive and programmatic interfaces. It consists of the following components:

  • Spark Core – the foundational classes on which Spark is built. It provides the RDD.
  • Spark Streaming – a library for processing streaming data
  • Spark SQL – an API for handling structured data
  • MLLib – a Machine Learning library
  • GraphX – a Graph library

Cluster Manager

In order to operate in production mode, Spark needs a cluster manager that manages data distribution, task scheduling and fault tolerance among the various nodes.
There are 3 types supported – Apache Mesos, Hadoop YARN and Spark Standalone.
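
As a minimal sketch of how the choice is made (host names below are placeholders), the master URL passed to SparkConf, or to spark-submit via --master, selects which cluster manager is used:

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("my-app").setMaster("local[*]")  # local mode: no cluster manager
# Alternative master URLs:
#   "spark://master-host:7077"  -> Spark standalone
#   "yarn"                      -> Hadoop YARN
#   "mesos://mesos-host:5050"   -> Apache Mesos
sc = SparkContext(conf=conf)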

Spark Fundamentals

Spark, as previously defined, is a framework for fast and general-purpose cluster computing. Its fundamental abstraction is the RDD – the resilient distributed dataset – which means that it is inherently parallelizable among the nodes and cores of a computing cluster. The RDD is, by default, held entirely in memory.

When data is read into Spark, it is read into an RDD. Once it is read into an
RDD it can be operated on. There are 2 distinct types of operations on RDDs:

1. Transformations

Transformations are used to convert the data in an RDD to another form. The result of a transformation is another RDD. Examples of transformations (sketched after this list) are:

  • map()– takes a function as argument and applies the function to each item/element of the RDD
  • flatMap() – takes a function as argument and applies the function to each element while “flattening” the results into a single level collection.
  • filter() – takes a boolean expression and returns an RDD with rows for which the boolean expression is true. e.g. lines of a file which contain the string “Obama”
  • reduceByKey() – given a Pair RDD i.e. one with key-value pairs, returns another Pair RDD
    in which the values for each key are combined with the supplied function (e.g. summed to
    give counts by key).
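
A quick PySpark sketch of these transformations, assuming an active SparkContext sc (the input text is made up); note that none of these lines triggers any computation by itself:

lines = sc.parallelize(["to be or not to be"])
words = lines.flatMap(lambda line: line.split(" "))   # flatten each line into individual words
lengths = words.map(lambda w: len(w))                 # apply a function to every element
tos = words.filter(lambda w: w == "to")               # keep only elements matching a condition
counts = words.map(lambda w: (w, 1)) \
              .reduceByKey(lambda a, b: a + b)        # combine the counts for each word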

2. Actions

Actions are operations on an RDD which result in some sort of output that is not an RDD, e.g. a list, a number, or output to the screen. Examples of action operations (sketched after this list) are:

  • collect() – applies the pending transformations to the RDD, then returns the result to the driver as a collection.
  • count() – returns a count of the number of elements in an RDD
  • reduce() – takes a function and repeatedly applies it to the elements of the
    RDD to produce a single output value
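
A matching sketch of these actions, again assuming a SparkContext sc:

nums = sc.parallelize([1, 2, 3, 4, 5])
nums.collect()                    # [1, 2, 3, 4, 5] -- materializes the RDD as a Python list
nums.count()                      # 5
nums.reduce(lambda a, b: a + b)   # 15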

RDDs and Lazy Evaluation

A fundamental idea in Spark’s implementation is lazy evaluation, and it is applied to all Spark transformation operations.
Thus an RDD is fundamentally a data abstraction, so when we call, say:

scala> val rdd = sc.parallelize(Seq(1,3,4,5,9))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21

scala> val mappedRDD = rdd.map(x => x*x)
mappedRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:23

what we’re getting as mappedRDD is just an expression that hasn’t been evaluated. This expression is essentially a record of the sequence of operations that need to be evaluated, i.e. parallelize –> map.
This expression amounts to what is known as a lineage graph and can be seen as follows:

scala> mappedRDD.toDebugString
res4: String = 
(4) MapPartitionsRDD[1] at map at <console>:23 []
 |  ParallelCollectionRDD[0] at parallelize at <console>:21 []

List of public datasets

I will be continuously updating this blog as I encounter more interesting datasets.