Spark Adventures –
Processing Multi-line JSON files

This series of blog posts will cover unusual problems I've encountered on my Spark journey for which the solutions are not obvious.

Everyone who has read the seminal book Learning Spark has encountered the example in Chapter 9 (Spark SQL) showing how to ingest JSON data from a file using the Hive context to produce a Spark SQL DataFrame:

In [84]: hiveCtx = sqlContext
In [85]: inputFile="/home/myuser/testweet.json"
         tweets_df = hiveCtx.jsonFile(inputFile)

In [86]: type(tweets_df)
Out[86]: pyspark.sql.dataframe.DataFrame

In [87]: tweets_df.printSchema()
root
 |-- contributorsIDs: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- createdAt: string (nullable = true)
 |-- currentUserRetweetId: long (nullable = true)
 |-- hashtagEntities: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: long (nullable = true)
...
# Register the input DataFrame
In [88]: tweets_df.registerTempTable("tweets")
In [89]: topTweets = hiveCtx.sql("""SELECT text, retweetCount FROM tweets ORDER BY retweetCount LIMIT 10""")
In [90]: print topTweets
         DataFrame[text: string, retweetCount: bigint]
In [91]: topTweets.show()
+--------------------+------------+
|                text|retweetCount|
+--------------------+------------+
|Adventures With C...|           0|
+--------------------+------------+

I am using PySpark above, and the Hive context is already available.
However, this works only when the JSON file is well formatted, i.e. each line of the file is a complete JSON object. The file above looks like this:

{"createdAt":"Nov 4, 2014 4:56:59 PM","id":529799371026485248,"text":"Adventures With Coffee, Code, and Writing.","source":"\u003ca href\u003d\"http://twitter.com\" rel\u003d\"nofollow\"\u003eTwitter Web Client
...

However, for a multi-line, pretty-printed JSON file that looks like this:

{
  "created_at": "2015-12-23T23:39:06Z", 
  "trends": [
   {
    "url": "http://twitter.com/search?q=%23HappyBirthdayLouis", 
    "query": "%23HappyBirthdayLouis", 
    "tweet_volume": 658249, 
    "name": "#HappyBirthdayLouis", 
    "promoted_content": null
   }, 
...

this is what we get when we use the approach above:

In [93]: inputFile="/home/myuser/tweet_trends.json"
         tweet_trends = hiveCtx.jsonFile(inputFile)
#Display schema
In [94]: tweet_trends.printSchema()
         root
In [97]: tweet_trends.registerTempTable("tweet_trends")
In [98]: tweet_trends = hiveCtx.sql("""SELECT *  FROM tweet_trends LIMIT 10""")
In [99]: print tweet_trends
          DataFrame[]

This results in an empty DataFrame.
To resolve this, we can use the SparkContext.wholeTextFiles() method:

In [100]: import json
          multiline_rdd=sc.wholeTextFiles(inputFile)
          type(multiline_rdd)
Out[100]: pyspark.rdd.RDD

This produces an RDD that looks like this:

In [102]: multiline_rdd.take(1)
Out[102]:[(u'file:/home/myuser/twitter_trends.json',
  u' {\n  "created_at": "2015-12-23T23:39:06Z", \n  "trends": [\n   {\n    "url": "http://twitter.com/search?q=%23HappyBirthdayLouis", \n    "query": "%23HappyBirthdayLouis", \n    "tweet_volume": 658249, \n    "name": "#HappyBirthdayLouis", \n    "promoted_content": null\n   }
...

The RDD consists of (filename, content) tuples: the first element is the file path and the second is the entire contents of the file as a single string, newlines and indentation included.
In order to prepare the data for proper ingestion by Spark SQL, we apply the following transformation:

In [59]: #Remove all whitespace chars
         import re
         json_rdd = multiline_rdd.map(lambda x : x[1])\
                    .map(lambda x : re.sub(r"\s+", "", x, \
                    flags=re.UNICODE))

The resulting RDD now looks like this:

In [105]: json_rdd.take(1)
Out[105]:
[u'{"created_at":"2015-12-23T23:39:06Z","trends":[{"url":"http://twitter.com/search?q=%23HappyBirthdayLouis","query":"%23HappyBirthdayLouis","tweet_volume":658249,"name":"#HappyBirthdayLouis","promoted_content":null},{"url":"http://twitter.com/search?q=%23ShadeHour","query":"%23ShadeHour","tweet_volume":null,"name":"#ShadeHour","promoted_content":null},
...
In [106]: type(json_rdd)
Out[106]: pyspark.rdd.PipelinedRDD

We can now use the jsonRDD() method to produce a Spark SQL DataFrame:

In [110]: json_df=hiveCtx.jsonRDD(json_rdd)
In [111]: type(json_df)
Out[111]: pyspark.sql.dataframe.DataFrame

In [114]: json_df.printSchema()
          root
          |-- as_of: string (nullable = true)
          |-- created_at: string (nullable = true)
          |-- locations: array (nullable = true)
          |    |-- element: struct (containsNull = true)
          |    |    |-- name: string (nullable = true)
          |    |    |-- woeid: long (nullable = true)
          ...

Now register a temp table in Hive and select some data:

In [116]: json_df.registerTempTable("tweet_trends")
In [119]: trends_data = hiveCtx.sql("SELECT locations FROM tweet_trends")
In [120]: type(trends_data)
Out[120]: pyspark.sql.dataframe.DataFrame
In [122]: trends_data
Out[122]:DataFrame[locations:array<struct<name:string,woeid:bigint>>]
In [123]: trends_data.show()
          +--------------------+
          |           locations|
          +--------------------+
          |List([SouthAfrica...|
          +--------------------+
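
One caveat: because the regex strips every whitespace character, spaces inside string values disappear as well, which is why the location shows up as SouthAfrica in the output above. A minimal alternative sketch (not what the book does; the json_rdd2/json_df2 names are just for illustration) is to parse each file's contents with the json module imported earlier and re-serialize it as a single line, which preserves those spaces:

# Parse the whole-file string, then re-emit it as compact single-line JSON
json_rdd2 = multiline_rdd.map(lambda x: json.dumps(json.loads(x[1])))
json_df2 = hiveCtx.jsonRDD(json_rdd2)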

How to set up an Apache Spark Cluster

I now detail the steps I took to set up an Apache Spark cluster on my home network.
I have the following 2 hosts:

  • G50 Lenovo laptop running Ubuntu 15.04 (slave/worker)
  • iMac desktop running Mac OS X 10 (master)

Step 1 – Install Spark into the same location on both machines

My first step was to make sure I installed Apache Spark into the same location on both machines: /opt/spark-1.4.1
The steps for doing this are detailed here: How to install Apache Spark

Step 2 – Modify the config file on master machine

On the master machine, edit the file $SPARK_HOME/conf/slaves, adding the hostname of each slave machine, one per line.
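
With the two hosts above, the file would contain just the worker's hostname (fembuntu is the hostname that appears in the startup logs below; substitute your own):

# $SPARK_HOME/conf/slaves on the master
fembuntu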

Step 3 – Setup ssh password-less login

The master needs to be able to log in to the slave machine without a password. To enable this, run the following on the master machine:

ssh-keygen -t dsa
Enter file in which to save the key (/home/youruser/.ssh/id_dsa): [ENTER]
Enter passphrase (empty for no passphrase): [EMPTY]
Enter same passphrase again: [EMPTY]

Next, copy the following public key file from the master to the slave machine:
/home/youruser/.ssh/id_dsa.pub

and then run the following on the slave:

$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
$ chmod 644 ~/.ssh/authorized_keys
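
If ssh-copy-id is available on the master, the copy and append steps above can be combined into a single command (youruser and the fembuntu hostname are placeholders for your own values):

$ ssh-copy-id -i ~/.ssh/id_dsa.pub youruser@fembuntu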

 

Step 4 – Start the cluster from the master machine

From $SPARK_HOME on the master, run the following: sbin/start-all.sh

This should start up the cluster and produce output that looks like
this:

starting org.apache.spark.deploy.master.Master, logging to /opt/spark-1.4.1/sbin/../logs/spark-femibyte-org.apache.spark.deploy.master.Master-1-macdesk.fios-router.home.out
localhost: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark-1.4.1/sbin/../logs/spark-femibyte-org.apache.spark.deploy.worker.Worker-1-macdesk.fios-router.home.out
fembuntu: starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark-1.4.1/sbin/../logs/spark-femibyte-org.apache.spark.deploy.worker.Worker-1-fembuntu.out

Stopping the cluster

The cluster can be stopped as follows: sbin/stop-all.sh
This produces output that looks like:

localhost: stopping org.apache.spark.deploy.worker.Worker
fembuntu: stopping org.apache.spark.deploy.worker.Worker
stopping org.apache.spark.deploy.master.Master

Monitoring from the master web UI

The cluster can be monitored from the web UI of the master node, which is accessible on port 8080 by default. It should look similar to the following:

[Screenshot: Spark master web UI]
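
The same page also displays the cluster's master URL (of the form spark://<master-host>:7077), which is what a shell or application passes via --master in order to run against the cluster. A minimal sketch, assuming the macdesk.fios-router.home master hostname from the startup logs above:

$SPARK_HOME/bin/pyspark --master spark://macdesk.fios-router.home:7077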

Troubleshooting

I obtained the following errors on the slave machine:

16/01/03 05:40:24 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkMaster@master:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: master/192.269.8.153:7077
16/01/03 05:41:30 ERROR Worker: All masters are unresponsive! Giving up.

The problem was that I had two separate IP addresses pointing to the same hostname for my slave machine (it was on the Wi-Fi network and also connected via Ethernet).


How to install Apache Spark

Mac OS X

Installing Spark on Mac OS X involves downloading the Spark and Scala archive files into the final location we want to run them from. On my machine that directory is:
/Users/myuser/local

  1. cd to destination directory:
    cd /Users/myuser/local
  2. Download Spark from http://spark.apache.org/downloads.html
    wget http://apache.arvixe.com/spark/spark-1.4.1/spark-1.4.1.tgz
  3. Download scala from http://www.scala-lang.org/download/
    wget http://downloads.typesafe.com/scala/2.11.7/scala-2.11.7.tgz
    
  4. Uncompress the Spark and Scala archive files you downloaded:
    tar -xvzf spark-1.4.1.tgz
    tar -xvzf scala-2.11.7.tgz
    

    The above step uncompresses the file contents into these directories: spark-1.4.1, scala-2.11.7

  5. Set the following environment variables – SCALA_HOME, SPARK_HOME, PATH:

    export SCALA_HOME=/Users/myuser/local/scala-2.11.7
    export SPARK_HOME=/Users/myuser/local/spark-1.4.1
    export PATH=$PATH:$SCALA_HOME/bin:$SPARK_HOME/bin
    
  6. Run sbt (the Scala build tool) to build Spark:
    cd $SPARK_HOME
    build/sbt clean assembly
    
  7. Run the Spark shell:
     ./bin/spark-shell 

    Sometimes you may get an error about an invalid sbt-launch file.
    To resolve this, do the following:

    cd build
    wget http://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/0.13.7/sbt-launch.jar
    mv sbt-launch.jar sbt-launch.0.13.7.jar
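
    If the shell comes up cleanly, a quick smoke test is to run a trivial job. The sketch below uses the PySpark shell from the same bin directory (rather than the Scala shell), simply so the example matches the Python used elsewhere in these posts:

    ./bin/pyspark
    >>> sc.parallelize(range(1000)).sum()
    499500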
    


Daily Algo: Primality of a given n

There are various questions one can ask while investigating primes algorithmically. One might be: given a number n, is it prime? A naive, brute-force approach would be to iterate through all the numbers i from 2 to n-1 and test whether n % i == 0. However, this approach is naive in the extreme, and a more efficient one can be devised by noting a few points.

  1. 0 and 1 are not prime.
  2. 2 is prime. However, no other multiple of 2 is prime, so once we have handled 2 we only need to test odd divisors, cutting the numbers we have to inspect by 50%.
  3. If a number m divides n, then n/m also divides n, and at least one of m and n/m is <= sqrt(n). Hence we only need to examine numbers up to sqrt(n) and no further.

The Python code is shown below:

In [15]: import math
def isPrime(n):
    # 0, 1 and negative numbers are not prime; 2 is the only even prime
    if n <= 1: return False
    if n == 2: return True
    if n % 2 == 0: return False
    # Only odd divisors up to sqrt(n) need to be tested
    ulim = int(math.sqrt(n)) + 1
    for i in range(3, ulim + 1, 2):
        if n % i == 0:
            return False
    return True

In [16]: isPrime(6767)
Out[16]: False
In [17]: isPrime(62417)
Out[17]: True

The code is available for download here: primes.py