Spark Adventures –
Processing Multi-line JSON files

This series of blog posts will cover unusual problems I’ve encountered on my Spark journey for which the solutions are not obvious.

Everyone who has read the seminal book Learning Spark has encountered the example in Chapter 9 (Spark SQL) that shows how to ingest JSON data from a file using the Hive context to produce a Spark SQL DataFrame:

In [84]: hiveCtx = sqlContext
In [85]: inputFile="/home/myuser/testweet.json"
         tweets_df = hiveCtx.jsonFile(inputFile)

In [86]: type(tweets_df)
Out[86]: pyspark.sql.dataframe.DataFrame

In [87]: tweets_df.printSchema()
root
 |-- contributorsIDs: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- createdAt: string (nullable = true)
 |-- currentUserRetweetId: long (nullable = true)
 |-- hashtagEntities: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: long (nullable = true)
...
# Register the input DataFrame
In [88]: tweets_df.registerTempTable("tweets")
In [89]: topTweets = hiveCtx.sql("""SELECT text, retweetCount FROM tweets ORDER BY retweetCount LIMIT 10""")
In [90]: print topTweets
         DataFrame[text: string, retweetCount: bigint]
In [91]: topTweets.show()
+--------------------+------------+
|                text|retweetCount|
+--------------------+------------+
|Adventures With C...|           0|
+--------------------+------------+

I am using PySpark above, and the Hive context is already available.
However, this works only when the JSON file is well formatted, i.e. each line of the file is a complete JSON object. The file above looks like this:

{"createdAt":"Nov 4, 2014 4:56:59 PM","id":529799371026485248,"text":"Adventures With Coffee, Code, and Writing.","source":"\u003ca href\u003d\"http://twitter.com\" rel\u003d\"nofollow\"\u003eTwitter Web Client
...

For a JSON file that looks like this, however:

{
  "created_at": "2015-12-23T23:39:06Z", 
  "trends": [
   {
    "url": "http://twitter.com/search?q=%23HappyBirthdayLouis", 
    "query": "%23HappyBirthdayLouis", 
    "tweet_volume": 658249, 
    "name": "#HappyBirthdayLouis", 
    "promoted_content": null
   }, 
...

this is what we get when we use the approach above:

In [93]: inputFile="/home/myuser/tweet_trends.json"
         tweet_trends = hiveCtx.jsonFile(inputFile)
#Display schema
In [94]: tweet_trends.printSchema()
         root
In [97]: tweet_trends.registerTempTable("tweet_trends")
In [98]: tweet_trends = hiveCtx.sql("""SELECT *  FROM tweet_trends LIMIT 10""")
In [99]:  print tweet_trends
          DataFrame[]

This results in an empty DataFrame.
To resolve this, we can use the SparkContext.wholeTextFiles() method:

In [100]: import json
          multiline_rdd=sc.wholeTextFiles(inputFile)
          type(multiline_rdd)
Out[100]: pyspark.rdd.RDD

This produces an RDD that looks like this:

In [102]: multiline_rdd.take(1)
Out[102]:[(u'file:/home/myuser/twitter_trends.json',
  u' {\n  "created_at": "2015-12-23T23:39:06Z", \n  "trends": [\n   {\n    "url": "http://twitter.com/search?q=%23HappyBirthdayLouis", \n    "query": "%23HappyBirthdayLouis", \n    "tweet_volume": 658249, \n    "name": "#HappyBirthdayLouis", \n    "promoted_content": null\n   }
...

The RDD consists of a tuple whose 1st element is the filename and whose 2nd element is the entire contents of the file as a single string, newlines and all.
In order to prepare the data for proper ingestion by Spark SQL, we utilize the following transformation:

In [59]: #Remove all whitespace chars
         import re
         json_rdd = multiline_rdd.map(lambda x : x[1])\
                    .map(lambda x : re.sub(r"\s+", "", x, \
                    flags=re.UNICODE))

The resulting RDD now looks like this:

In [105]: json_rdd.take(1)
Out[105]:
[u'{"created_at":"2015-12-23T23:39:06Z","trends":[{"url":"http://twitter.com/search?q=%23HappyBirthdayLouis","query":"%23HappyBirthdayLouis","tweet_volume":658249,"name":"#HappyBirthdayLouis","promoted_content":null},{"url":"http://twitter.com/search?q=%23ShadeHour","query":"%23ShadeHour","tweet_volume":null,"name":"#ShadeHour","promoted_content":null},
...
In [106]: type(json_rdd)
Out[106]: pyspark.rdd.PipelinedRDD

We can now use the jsonRDD() method to produce a Spark SQL DataFrame:

In [110]: json_df=hiveCtx.jsonRDD(json_rdd)
In [111]: type(json_df)
Out[111]: pyspark.sql.dataframe.DataFrame

In [114]: json_df.printSchema()
          root
          |-- as_of: string (nullable = true)
          |-- created_at: string (nullable = true)
          |-- locations: array (nullable = true)
          |    |-- element: struct (containsNull = true)
          |    |    |-- name: string (nullable = true)
          |    |    |-- woeid: long (nullable = true)
          ...

Now register a temp table in Hive and select some data:

In [116]: json_df.registerTempTable("tweet_trends")
In [119]: trends_data = hiveCtx.sql("SELECT locations FROM tweet_trends")
In [120]: type(trends_data)
Out[120]: pyspark.sql.dataframe.DataFrame
In [122]: trends_data
Out[122]:DataFrame[locations:array<struct<name:string,woeid:bigint>>]
In [123]: trends_data.show()
          +--------------------+
          |           locations|
          +--------------------+
          |List([SouthAfrica...|
          +--------------------+

How to set up an Apache Spark Cluster

I now detail the steps I took to set up an Apache Spark cluster on my home network.
I have the following 2 hosts:

  • G50 Lenovo laptop running Ubuntu 15.04 (slave/worker)
  • iMac desktop running Mac OS X 10 (master)

Step 1 – Install Spark into same location on both machines

My first step was to make sure I installed Apache Spark into the same location on both machines: /opt/spark-1.4.1
The steps for doing this are detailed here: How to install Apache Spark

Step 2 – Modify the config file on master machine

Edit the following file on the master machine, making sure to add
the host name of the slave machine: $SPARK_HOME/conf/slaves
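
For example, on my setup conf/slaves contains a single line with the worker's hostname (fembuntu, the Ubuntu laptop, which also shows up in the startup logs further down); list one worker hostname per line:

fembuntu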

Step 3 – Set up ssh password-less login

The master needs to be able to log in to the slave machine without a password. To enable this, run the following on the master machine:

ssh-keygen -t dsa
Enter file in which to save the key (/home/youruser/.ssh/id_dsa): [ENTER]
Enter passphrase (empty for no passphrase): [EMPTY]
Enter same passphrase again: [EMPTY]

Next, copy this file from the master to the slave machine:
/home/youruser/.ssh/id_dsa.pub
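
One way to do the copy is with scp from the master, assuming the same username on both machines and that the slave is reachable as fembuntu (substitute your own user and hostname):

scp ~/.ssh/id_dsa.pub youruser@fembuntu:~/.ssh/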

and then run the following on the slave:

$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
$ chmod 644 ~/.ssh/authorized_keys


Step 4 – Start the cluster from the master machine

Run the following: sbin/start-all.sh

This should start up the cluster and produce output that looks like
this:

starting org.apache.spark.deploy.master.Master, logging to /opt/spark-1.4.1/sbin/../logs/spark-femibyte-org.apache.spark.deploy.master.Master-1-macdesk.fios-router.home.out
localhost: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark-1.4.1/sbin/../logs/spark-femibyte-org.apache.spark.deploy.worker.Worker-1-macdesk.fios-router.home.out
fembuntu: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark-1.4.1/sbin/../logs/spark-femibyte-org.apache.spark.deploy.worker.Worker-1-fembuntu.out

Stopping the cluster

The cluster can be stopped as follows: sbin/stop-all.sh
and produces output that looks like:

localhost: stopping org.apache.spark.deploy.worker.Worker
fembuntu: stopping org.apache.spark.deploy.worker.Worker
stopping org.apache.spark.deploy.master.Master

Monitoring from the master web ui

The cluster can be monitored from the web ui of the master node which is accessible from port 8080 (default). It should look similar to the following:

[spark_web_ui: screenshot of the Spark master web UI]

Troubleshooting

I obtained the following errors on the slave machine:

16/01/03 05:40:24 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkMaster@master:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: master/192.269.8.153:7077
16/01/03 05:41:30 ERROR Worker: All masters are unresponsive! Giving up.

The problem was that I had 2 separate IP addresses pointing to the
same hostname for my slave machine (it was on the wifi network and also connected via ethernet).


How to install Apache Spark

Mac OS/X

Installing Spark on Mac OS/X involves downloading the Spark and Scala
archive files into the final location we want to run them from. On my machine that directory is:
/Users/myuser/local

  1. cd to destination directory:
    cd /Users/myuser/local
  2. Download Spark from http://spark.apache.org/downloads.html
    wget http://apache.arvixe.com/spark/spark-1.4.1/spark-1.4.1.tgz
  3. Download scala from http://www.scala-lang.org/download/
    wget http://downloads.typesafe.com/scala/2.11.7/scala-2.11.7.tgz
    
  4. Uncompress the spark and scala archive files you downloaded:
    tar -xvzf spark-1.4.1.tgz
    tar -xvzf scala-2.11.7.tgz
    

    The above step uncompresses the file contents into these directories: spark-1.4.1, scala-2.11.7

  5. Set the following environment variables – SCALA_HOME, SPARK_HOME, PATH:

    export SCALA_HOME=/Users/myuser/local/scala-2.11.7
    export SPARK_HOME=/Users/myuser/local/spark-1.4.1
    export PATH=$PATH:$SCALA_HOME/bin:$SPARK_HOME/bin
    
  6. Run sbt – Scala build tool
    cd $SPARK_HOME
    build/sbt clean assembly
    
  7. Run spark shell
     ./bin/spark-shell 

    Sometimes you may get an error about an invalid sbt-launch file.
    To resolve this do the following:

    cd build
    wget http://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/0.13.7/sbt-launch.jar
    mv sbt-launch.jar sbt-launch.0.13.7.jar
    


Daily Algo: Primality of a given n

There are various questions one can ask while investigating primes algorithmically. One might be: given a number n, is it prime? A naive, brute force approach would be to iterate through all the numbers i from 2 to n-1 and test whether n % i == 0. However, this approach is naive in the extreme, and a more efficient one can be devised by noting a few points.

  1. 0 and 1 are not primes
  2. 2 is a prime, but no other even number is. Hence we can reduce the numbers we have to inspect by 50%.
  3. If a number m divides n, then n/m also divides n, and n/m <= m or vice-versa. Hence all we need to do is examine numbers up to sqrt(n) and no further.

The Python code is shown below:

In [15]: import math
def isPrime(n):
    if n <= 1: return False
    if n == 2: return True
    if n % 2 == 0: return False
    ulim = int(math.sqrt(n)) + 1
    for i in range(3, ulim + 1, 2):
        if n % i == 0:
            return False
    return True

In [16]: isPrime(6767)
Out[16]: False
In [17]: isPrime(62417)
Out[17]: True

The code is available for download here : primes.py

Running a standalone program in Spark

In this article we will walk through the steps to set up and run a standalone program in Apache Spark for 3 languages – Python, Scala and Java.

As a first step, let us go ahead and obtain some ready-made code that implements the WordCount algorithm from the Learning Spark examples git repository.

Step 1 – Create code

First, cd to the directory of your choice and run:

git clone https://github.com/holdenk/learning-spark-examples

This will check out code under the directory learning-spark-examples.
You will also need to download the file we’re using for this example, Abraham Lincoln’s Gettysburg Address.

Step 2 – Package and run the application

Our WordCount code is nicely isolated in the directory

learning-spark-examples/mini-complete-example.

We will need to package this code as an application so we can distribute it to a Spark cluster. There are different steps for Java, Scala and Python.

Step 2a – Java

In the case of Java we use Maven to package our code into a jar. Maven, which is a build and dependency management tool, is configured via a pom.xml file. The contents of the file are shown below:

<project>
 <groupId>com.oreilly.learningsparkexamples.mini</groupId>
  <artifactId>learning-spark-mini-example</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>example</name>
  <packaging>jar</packaging>
  <version>0.0.1</version>
  <dependencies>
   <dependency> <!-- Spark dependency -->
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.2.0</version>
    <scope>provided</scope>
   </dependency>
 </dependencies>
 <properties>
  <java.version>1.6</java.version>
 </properties>
 <build>
  <pluginManagement>
   <plugins>
    <plugin>
     <groupId>org.apache.maven.plugins</groupId>
     <artifactId>maven-compiler-plugin</artifactId>
     <version>3.1</version>
     <configuration>
      <source>${java.version}</source>
      <target>${java.version}</target>
     </configuration>
    </plugin>
   </plugins>
  </pluginManagement>
 </build>
</project>

The most important sections with respect to the name of the jar running the application are as follows:

<project>
 <groupId>com.oreilly.learningsparkexamples.mini</groupId>
 <artifactId>learning-spark-mini-example</artifactId>
 ...
 <version>0.0.1</version>
 ...
</project>

The groupId is used to determine the package namespace, and the artifactId and version are used to formulate the name of the resulting jar file.

To compile and package the application, run:

cd learning-spark-examples/mini-complete-example
mvn clean && mvn compile && mvn package

This creates the following file in the target directory:
learning-spark-mini-example-0.0.1.jar
Now that our application has been packaged, we launch it using the script spark-submit under $SPARK_HOME/bin. The advantage of this script is that it sets up the CLASSPATH and any other dependencies as well as supporting multiple deploy modes and cluster managers. The arguments to spark-submit are as follows:

$SPARK_HOME/bin/spark-submit
 --class <main-class> \
 --master <master-url> \
 --deploy-mode <deploy-mode> \
 --conf <key>=<value>
 ...
 <application-jar> \
 [application args]

Our WordCount application code takes 2 arguments :

  • input file
  • output dir

Note that the second argument to our application is actually a directory name; the counts JavaPairRDD is saved into it as a set of part files.

Thus we run the Java application as follows:

$SPARK_HOME/bin/spark-submit --class \ 
com.oreilly.learningsparkexamples.mini.java.WordCount \
./target/learning-spark-mini-example-0.0.1.jar \
 ./gettysburg.txt ./java_output

The result of running the code above is the creation of the java_output directory:

-rw-r--r-- 1 user user 646 Dec 15 04:59 part-00000
-rw-r--r-- 1 user user 764 Dec 15 04:59 part-00001
-rw-r--r-- 1 user user   0 Dec 15 04:59 _SUCCESS

The 1st 5 lines of output from the part-00000 file look as follows:

head -5 java_output/part-00000
(earth,1)
(created,1)
(under,1)
(this,4)
(God,1)

Step 2b – Scala

Now we turn our attention to creating an application in Scala.
sbt is an open source build tool for Scala, the equivalent of Ant/Maven. The counterpart of the pom.xml file is the build.sbt file, which looks like this:

cat build.sbt
name := "learning-spark-mini-example"

version := "0.0.1"

scalaVersion := "2.10.4"

// additional libraries
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.1.0" % "provided"
)

Building with sbt produces a jar file, just as Maven did for the Java case. To build this jar file, we do the following:

sbt clean package

This generates the jar file which is written to the target directory by default:

...
[info] Packaging /home/user/devel/learning-spark/mini-complete-example/target/scala-2.10/learning-spark-mini-example_2.10-0.0.1.jar ...
[info] Done packaging.
[success] Total time: 384 s, completed Dec 12, 2015 4:16:21 AM

We can then run the Scala application as follows:

$SPARK_HOME/bin/spark-submit \
 --class com.oreilly.learningsparkexamples.mini.scala.WordCount \
 ./target/scala-2.10/learning-spark-mini-example_2.10-0.0.1.jar \
 ./gettysburg.txt ./scala_output

This writes its output into the scala_output directory in a manner similar to the Java case :

$ ls -ltr scala_output/
total 8
-rw-r--r-- 1 user user 746 Dec 14 04:54 part-00001
-rw-r--r-- 1 user user 646 Dec 14 04:54 part-00000
-rw-r--r-- 1 user user   0 Dec 14 04:54 _SUCCESS

Step 2c – Python

The Python case is much simpler than Java or Scala.
There is no build/packaging stage; all that needs to be done is to pass the script to spark-submit (any additional modules it depends on can be shipped with the --py-files option). In this example there was no Python script supplied, so I wrote this script, wordcount.py:


import sys, argparse, os
from operator import add

from pyspark import SparkContext


parser = argparse.ArgumentParser(description="Run wordcount in Spark")
parser.add_argument('-f', help='input file',dest='fileName',required=True)
parser.add_argument('-o', help='output dir',dest='outdir',required=True)
args=parser.parse_args()

def main():
    count_words(args.fileName)

def count_words(filename):
    sc = SparkContext(appName="WordCount in Python")
    in_lines = sc.textFile(filename, 1)
    inter_counts = in_lines.flatMap(lambda line: line.split(' ')) \
     .map(lambda x: (x.rstrip(',.'),1)).reduceByKey(add)
    total_counts = inter_counts.collect()
    outdir = args.outdir
    if not os.path.exists(outdir):
        os.makedirs(outdir)
    outfile = "{0}/{1}".format(outdir,"py_wordcount.txt")
    print "Outputting to {0}".format(outfile)
    with open(outfile, 'w') as fh:
        for (word, count) in total_counts:
            fh.write("{0}: {1}\n".format(word, count))
    sc.stop()

if __name__ == "__main__":
    main()

We can then run it as follows:

$SPARK_HOME/bin/spark-submit ./src/wordcount.py -f gettysburg.txt -o py_output

Refinements

The strings in the gettysburg.txt file need to be stripped of non-alphanumeric characters in order to get a correct word count; otherwise strings such as “dead,” and “dead” would be counted as separate words.
In the Java case, to fix this, you can make this change to
WordCount.java:

return new Tuple2(x.replaceAll("[^A-Za-z0-9]", ""), 1);

Similarly for Scala, make this change :

 val counts = words.map(word => (word.replaceAll("[^A-Za-z0-9]", ""), 1)).reduceByKey{case (x, y) => x + y}
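
For the Python script, a similar refinement can be sketched by replacing the rstrip(',.') call in wordcount.py above with a regex substitution (this assumes import re is added at the top of the script; a minimal, untested sketch of the changed lines):

    # strip non-alphanumeric characters from each word, mirroring the Java/Scala refinements
    inter_counts = in_lines.flatMap(lambda line: line.split(' ')) \
     .map(lambda x: (re.sub(r'[^A-Za-z0-9]', '', x), 1)).reduceByKey(add)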

Depth First Search

Description

The goal of Depth First Search is to search as ‘deep’ into the graph as possible, in contrast to Breadth First Search, which searches as ‘wide’ as possible. In DFS, edges are explored out of the most recently discovered vertex v that still has unexplored edges. When all of v’s edges have been explored, the search “backtracks” to explore edges leaving the vertex from which v was discovered. This process continues until all the vertices reachable from the original source vertex have been discovered. If any undiscovered vertices remain, then one of them is selected as a new source and the search is repeated from that source. This process is repeated until all vertices have been discovered.
Unlike BFS, where the predecessor subgraph forms a tree, the predecessor subgraph formed by a DFS may be composed of several trees that form a DFS forest. This is because the search may be repeated from multiple sources.

Implementation

As in BFS, vertices are colored during the search to indicate their state. Each vertex is initially white, is grayed when it is discovered in the search, and is blackened when finished, i.e. when its adjacency list/neighbor set has been examined completely. This guarantees that each vertex ends up in exactly one depth-first tree, so that these trees are disjoint.
Besides creating a depth-first forest, depth-first search also timestamps each vertex. Each vertex v has 2 timestamps: the 1st timestamp disc[v] records when v is first discovered and grayed, and the 2nd timestamp fin[v] records when the search finishes examining v’s adjacency list and blackens v.

Pseudocode

The pseudocode is as follows:

DFS(G)
1 for each vertex u in V[G]
2      do color[u] <--- WHITE
3           pred[u] <--- NULL
4 time <--- 0
5  for each vertex u in V[G]
6        do if color[u] == WHITE
7             then DFS-VISIT(u)

DFS-VISIT(u)
1 color[u] <--- GRAY   // White vertex u has just been discovered
2 time <--- time + 1
3 disc[u] <--- time
4 for each v in Adj[u]     // Explore edge (u,v)
5       do if color[v] == WHITE
6            then pred[v] <--- u
7                 DFS-VISIT(v)
8 color[u] <--- BLACK     // Blacken u; it is finished
9 fin[u] <--- time <--- time + 1
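
For concreteness, here is a minimal Python sketch of the pseudocode above. It assumes the graph is given as a dict mapping each vertex to a list of its neighbors; names mirror the pseudocode:

WHITE, GRAY, BLACK = "white", "gray", "black"

def dfs(graph):
    # graph: dict mapping each vertex to a list of adjacent vertices
    color = dict((u, WHITE) for u in graph)
    pred = dict((u, None) for u in graph)
    disc, fin = {}, {}
    time = [0]   # wrapped in a list so the nested function can update it

    def dfs_visit(u):
        color[u] = GRAY              # u has just been discovered
        time[0] += 1
        disc[u] = time[0]
        for v in graph[u]:           # explore edge (u, v)
            if color[v] == WHITE:
                pred[v] = u
                dfs_visit(v)
        color[u] = BLACK             # blacken u; it is finished
        time[0] += 1
        fin[u] = time[0]

    for u in graph:
        if color[u] == WHITE:
            dfs_visit(u)
    return pred, disc, fin

# Example: pred, disc, fin = dfs({'a': ['b', 'c'], 'b': ['d'], 'c': [], 'd': []})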

Analysis

Total running time is Θ(V+E); DFS runs in time linear in the size of the adjacency-list representation of the graph G.

Various Machine Learning Links

ML Tutorials and Tips

In-depth introduction to machine learning in 15 hours of expert videos
Machine Learning Practical Advice – Scikit-Learn
Introduction to Scikit-Learn (VanderPlas)

Data Mining

R and Data Mining: Examples and Case Studies

Kaggle

Winning Kaggle, An introduction to Re-Ranking
Titanic: Machine Learning from Disaster

Titanic

Titanic Survival Prediction Using R
Titanic Survival Prediction Using R (Wehrley)
Titanic with Decision Trees
Kaggle – Titanic Machine Learning from Disaster
Data Science Learning from the Titanic in R
EECS 349 Titanic – Machine Learning From Disaster
The sinking of the Titanic – Binary Logistic Regression in R
CS229 Titanic – Machine Learning from Disaster

Supervised Learning

Scikit-Learn Supervised Learning
DataRobot – Classification with Scikit-Learn

Logistic Regression

Logistic Regression with scikit-learn
Kaggle — Photo quality prediction

Decision Trees

Classification with Decision Tree Induction
Scikit-Learn Decision Tree Regression

Support Vector Machines

Scikit-Learn Support Vector Machines

Unsupervised Learning

Principal Component Analysis

PCA in an image with scikit-learn and scikit-image
Scikit-Learn Faces Dataset Decomposition
PCA for Machine Learning (R)

Clustering

Scikit-Learn K-Means Clustering

Breadth-First Search

Description

Given a graph G=(V,E) and a source vertex s, BFS systematically explores the edges of E to discover every vertex that is reachable from s.
It computes the distance (smallest number of edges) from s to each reachable vertex, and it produces a BFS tree with root s that contains all reachable vertices.
For any vertex v reachable from s, the path in the BFS tree from s to v corresponds to a shortest path from s to v in G, i.e. a path containing the smallest number of edges. BFS discovers all vertices at distance k from s before discovering any vertices at distance k+1.

Implementation

The pseudocode below assumes that the input graph G = (V,E) is represented using adjacency lists. It maintains several additional data fields with each vertex in the graph.
The color of each vertex u in V is stored in color[u], and the predecessor of u is stored in prev[u]. If u has no predecessor, then prev[u] = NULL. The distance from the source s to vertex u is stored in dist[u]. The algorithm uses a FIFO queue Q to manage the set of gray vertices.

Pseudocode

BFS(G,s)
1   for each vertex u in V[G] - {s}
2     do color[u] <-- WHITE
3         dist[u] <-- INF
4         prev[u] <-- NULL
5   color[s] <-- GRAY
6   dist[s] <-- 0
7   prev[s] <-- NULL
8   Q <-- {}
9   ENQUEUE(Q, s)
10 while Q!={}
11    do u <-- DEQUEUE(Q)
12      foreach v in Adj[u]
13         do if color[v] == WHITE
14              then color[v] <-- GRAY
15                  dist[v] <-- dist[u] + 1
16                  prev[v] <-- u
17                  ENQUEUE(Q,v)
18      color[u] <-- BLACK
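
As with DFS, here is a minimal Python sketch of the pseudocode above, again assuming the graph is a dict mapping each vertex to a list of its neighbors:

from collections import deque

WHITE, GRAY, BLACK = "white", "gray", "black"
INF = float("inf")

def bfs(graph, s):
    # graph: dict mapping each vertex to a list of adjacent vertices
    color = dict((u, WHITE) for u in graph)
    dist = dict((u, INF) for u in graph)
    prev = dict((u, None) for u in graph)
    color[s], dist[s] = GRAY, 0
    q = deque([s])                     # FIFO queue of gray vertices
    while q:
        u = q.popleft()
        for v in graph[u]:
            if color[v] == WHITE:      # v is discovered for the first time
                color[v] = GRAY
                dist[v] = dist[u] + 1
                prev[v] = u
                q.append(v)
        color[u] = BLACK               # u's adjacency list is fully examined
    return dist, prev

# Example: dist, prev = bfs({'s': ['a', 'b'], 'a': ['c'], 'b': ['c'], 'c': []}, 's')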

Analysis

Total running time is O(V+E); BFS runs in time linear in the size of the adjacency-list representation of the graph G.

Source code:

BFS.py

Reference: Introduction to Algorithms by Cormen, Leiserson & Rivest (MIT Press)

Binary Search Trees

Definition

A binary search tree is a binary tree whose keys satisfy the binary search tree property:
Let x be a node in the BST.
If y is a node in the left subtree of x, then key[y] <= key[x].
If y is a node in the right subtree of x then key[y] >= key[x].

Basic operations on a binary search tree (BST) take time proportional to the height of the tree. For a complete binary tree with n nodes, such operations run in
O(lg n) worst-case time.
If the tree is a linear chain of n nodes, the same operations take O(n) worst case time. The expected height of a randomly built BST is O(lg n), so that basic dynamic set operations on such a tree take θ(lg n) time on average. There are variations of BSTs whose worst-case performance can be guaranteed to be good – red-black and B-trees are 2 such examples.

Basic operations on BSTs include:
SEARCH, MINIMUM, MAXIMUM, PREDECESSOR, SUCCESSOR, INSERT & DELETE.

The BST property allows us to print out all the keys in 3 different ways:

  1. InOrder Tree Walk – the key of the root of a subtree is printed between the values in its left subtree and those in its right subtree – LPR
  2. PreOrder Tree walk – root is printed before values in either subtree
  3. PostOrder Tree walk – root is printed after the values in its subtrees.

It takes Θ(n) time to walk an n-node binary tree.
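
As an illustration, here is a minimal Python sketch of a BST node and the inorder tree walk; the preorder and postorder walks differ only in where the root's key is printed:

class Node(object):
    def __init__(self, key):
        self.key = key
        self.left = None
        self.right = None
        self.parent = None

def inorder_tree_walk(x):
    # Left subtree, then the root's key, then right subtree (LPR)
    if x is not None:
        inorder_tree_walk(x.left)
        print x.key
        inorder_tree_walk(x.right)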

Querying a binary search tree

Searching

We wish to search for a node with a given key in a BST. Given a pointer to the root of the tree and a key k, TREE_SEARCH returns a pointer to a node with key k if one exists; otherwise it returns NULL. The search begins at the root and moves downward in the tree. For each node x it encounters, it compares the key k with key[x]. If the 2 keys are equal, the search terminates. If k < key[x] the search continues in the left subtree, else it continues in the right subtree. The running time of TREE_SEARCH is O(h), where h is the height of the tree.

TREE_SEARCH(x,k)
1 if x == NULL or k == key[x]
2    then return x
3 if k < key[x]
4    then return TREE_SEARCH(left[x],k)
5    else return TREE_SEARCH(right[x],k)

ITERATIVE_TREE_SEARCH(x,k)
1 while x!=NULL && k != key[x]
2   do if k < key[x]
3      then x <-- left[x]
4      else x <-- right[x]
5 return x
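
A direct Python rendering of the two procedures above, reusing the Node class from the traversal sketch earlier (None plays the role of NULL):

def tree_search(x, k):
    # Recursive search, mirroring TREE_SEARCH
    if x is None or k == x.key:
        return x
    if k < x.key:
        return tree_search(x.left, k)
    return tree_search(x.right, k)

def iterative_tree_search(x, k):
    # Iterative search, mirroring ITERATIVE_TREE_SEARCH
    while x is not None and k != x.key:
        x = x.left if k < x.key else x.right
    return x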

Maximum and Minimum

A minimum can always be found by following left child pointers down from the root until a NULL is reached:

TREE_MINIMUM(x)
1 while left[x] != NULL
2  do x <--- left[x]
3 return x

The pseudo-code for TREE_MAXIMUM is symmetric:

TREE_MAXIMUM(x)
1 while right[x] != NULL
2  do x <--- right[x]
3 return x

Both of these procedures run in O(h) time.

Successor and Predecessor

It is sometimes important to be able to find the successor to a node in the sorted order determined by an inorder tree walk. There are 2 cases:

  1. If the right subtree of node x is nonempty, then the successor of x is the leftmost node in the right subtree, given by TREE_MINIMUM(right[x]).
  2. If the right subtree of x is empty, and x has a successor y, then y is the lowest ancestor of x whose left child is also an ancestor of x.

TREE_SUCCESSOR(x)
1 if right[x] != NULL
2   then return TREE_MINIMUM(right[x])
3 y <-- p[x]
4 while y != NULL && x == right[y]
5      do x <--- y
6         y <--- p[y]
7 return y

TREE_PREDECESSOR is symmetric to TREE_SUCCESSOR, and both run in time O(h).

Insertion and Deletion

Insertion

The TREE_INSERT procedure works as follows – walk down the tree from the root, choosing the left or right child at each step, until we find an appropriate leaf node from which to hang the new node. Hence if our new node is z, then for each node x we compare keys: if key[z] < key[x], z must hang somewhere off the left subtree of x and we move to left[x]; otherwise we move to right[x].
When we reach a leaf node we have found the node y to hang z off, and we assign this node y as parent[z]. We then determine whether key[z] < key[y]. If so, then z becomes left[y]; if not, it becomes right[y].

TREE_INSERT(T,z)
1 y <--- NULL
2 x <--- root[T]
3 while x !=NULL
4  do y <-- x
5     if key[z] < key[x]
6     then x <--- left[x]
7     else x <--- right[x]
8 parent[z] <--- y        //At this point y=parent[x] from loop above
9 if y == NULL
10   then root[T] <--- z
11   else if key[z] < key[y]
12        then left[y] <--- z
13        else right[y] <--- z
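
A Python sketch of TREE_INSERT, again reusing the Node class from the traversal sketch earlier; the tree is represented by its root node, and the function returns the (possibly new) root:

def tree_insert(root, z):
    y = None
    x = root
    while x is not None:              # walk down, remembering the parent y
        y = x
        x = x.left if z.key < x.key else x.right
    z.parent = y
    if y is None:                     # tree was empty; z becomes the root
        return z
    if z.key < y.key:
        y.left = z
    else:
        y.right = z
    return root

For example, root = tree_insert(root, Node(42)) inserts the key 42 and keeps root pointing at the tree.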

Deletion

There are 3 cases to consider when deleting a node z from a BST.

  1. Node z is a leaf node and has no children. In this case we modify parent[z] to replace z as its child with NULL.
  2. Node z has only 1 child. In this case we “splice out” z by making a new link between child[z] and parent[z].
  3. Node z has 2 children. In this case we splice out z’s successor y, which has no left child (else it wouldn’t be the successor), and replace z’s key and satellite data with y’s key and satellite data.

TREE_DELETE(T,z)
1 if left[z]==NULL or right[z]==NULL // Cases 1 && 2
2     then y <--- z                     // Cases 1 && 2
3     else y <--- TREE_SUCCESSOR(z)     // Case 3
4 if left[y] != NULL                    // Lines 4-6 set x to non-NULL child of y, or NULL
5     then x <--- left[y]               // if y has no children 
6     else x <--- right[y]
7 if x != NULL                          // Line 7-13 splice out y
8    then parent[x] <--- parent[y]
9 if parent[y]==NULL                    // y is root,splice it out and set its child as
10   then root[T] <--- x                // root
11   else if y==left[parent[y]]          // If y is the left child of its parent,
12         then left[parent[y]]  <--- x // splice it out and replace it with its child x
13         else right[parent[y]] <--- x // else replace the right child of y's parent
14 if y != z                             // In lines 14-16, if z's successor y was the
15    then key[z] <--- key[y]           // node spliced out, replace z's key and data with
16       copy y's satellite data into z // that of y
17 return y 

Both insertion and deletion procedures run in O(h) time on a tree of height h.
References :
http://algs4.cs.princeton.edu/32bst


Top N Queries in SQL

One question that one often encounters is: given data in a database table, how do we obtain the top N rows based on some column of the table?
There are a few ways to do this, but I will present the 2 most popular methods.

I will illustrate using the following table, emp:

SELECT * FROM emp;
+-------+--------+-----------+------+------------+---------+---------+--------+
| empno | ename  | job       | mgr  | hiredate   | sal     | comm    | deptno |
+-------+--------+-----------+------+------------+---------+---------+--------+
|  7369 | SMITH  | CLERK     | 7902 | 1980-12-17 |  800.00 |    NULL |     20 |
|  7499 | ALLEN  | SALESMAN  | 7698 | 1981-02-20 | 1600.00 |  300.00 |     30 |
|  7521 | WARD   | SALESMAN  | 7698 | 1981-02-22 | 1250.00 |  500.00 |     30 |
|  7566 | JONES  | MANAGER   | 7839 | 1981-04-02 | 2975.00 |    NULL |     20 |
|  7654 | MARTIN | SALESMAN  | 7698 | 1981-09-28 | 1250.00 | 1400.00 |     30 |
|  7698 | BLAKE  | MANAGER   | 7839 | 1981-05-01 | 2850.00 |    NULL |     30 |
|  7782 | CLARK  | MANAGER   | 7839 | 1981-06-09 | 2450.00 |    NULL |     10 |
|  7788 | SCOTT  | ANALYST   | 7566 | 1982-12-09 | 3000.00 |    NULL |     20 |
|  7839 | KING   | PRESIDENT | NULL | 1981-11-17 | 5000.00 |    NULL |     10 |
|  7844 | TURNER | SALESMAN  | 7698 | 1981-09-08 | 1500.00 |    0.00 |     30 |
|  7876 | ADAMS  | CLERK     | 7788 | 1983-01-12 | 1100.00 |    NULL |     20 |
|  7900 | JAMES  | CLERK     | 7698 | 1981-12-03 |  950.00 |    NULL |     30 |
|  7902 | FORD   | ANALYST   | 7566 | 1981-12-03 | 3000.00 |    NULL |     20 |
|  7934 | MILLER | CLERK     | 7782 | 1982-01-23 | 1300.00 |    NULL |     10 |
+-------+--------+-----------+------+------------+---------+---------+--------+
14 rows in set (0.00 sec)

Suppose we wish to obtain the top 5 employees by salary; how can one do this?

  1. Using the RANK() function.
    In databases which provide the RANK() function, we can obtain the top 5 employees
    by salary using the following query:

    SELECT empno, ename, sal
    FROM (SELECT empno, ename, sal, RANK() OVER (ORDER BY sal DESC) sal_rank
          FROM emp)
    WHERE sal_rank <= 5;

    +-------+-------+---------+
    | empno | ename | sal     |
    +-------+-------+---------+
    |  7839 | KING  | 5000.00 |
    |  7788 | SCOTT | 3000.00 |
    |  7902 | FORD  | 3000.00 |
    |  7566 | JONES | 2975.00 |
    |  7698 | BLAKE | 2850.00 |
    +-------+-------+---------+
    5 rows returned in 0.02 seconds
  2. Using an expression that limits the number of rows returned from an ordered SQL result set.

The list of expressions for each database is shown below:

Database                  Expression
Oracle                    ROWNUM
MySQL/PostgreSQL/Vertica  LIMIT
Sybase                    SET ROWCOUNT
MS SQL                    TOP

Here are the corresponding queries for each database:

Vertica/PostgreSQL/MySQL
SELECT empno, ename, sal FROM (SELECT empno, ename, sal FROM emp ORDER BY sal DESC) a
LIMIT 5;

Oracle
SELECT * FROM (SELECT empno, ename, sal FROM emp ORDER BY sal DESC) A WHERE ROWNUM <= 5;
Sybase
SET ROWCOUNT 5;
SELECT empno, ename, sal FROM emp ORDER BY sal DESC;

MS SQL
SELECT TOP 5 empno, ename, sal FROM emp ORDER BY sal DESC;