Spark Adventures –
Processing Multi-line JSON files

This series of blog posts will cover unusual problems I've encountered on my Spark journey for which the solutions are not obvious.

Everyone who has read the seminal book Learning Spark has encountered the example in Chapter 9 (Spark SQL) showing how to ingest JSON data from a file using the Hive context to produce a Spark SQL DataFrame:

In [84]: hiveCtx = sqlContext
In [85]: inputFile="/home/myuser/testweet.json"
         tweets_df = hiveCtx.jsonFile(inputFile)

In [86]: type(tweets_df)
Out[86]: pyspark.sql.dataframe.DataFrame

In [87]: tweets_df.printSchema()
root
 |-- contributorsIDs: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- createdAt: string (nullable = true)
 |-- currentUserRetweetId: long (nullable = true)
 |-- hashtagEntities: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: long (nullable = true)
...
# Register the input DataFrame
In [88]: tweets_df.registerTempTable("tweets")
In [89]: topTweets = hiveCtx.sql("""SELECT text, retweetCount FROM tweets ORDER BY retweetCount LIMIT 10""")
In [90]: print topTweets
         DataFrame[text: string, retweetCount: bigint]
In [91]: topTweets.show()
+--------------------+------------+
|                text|retweetCount|
+--------------------+------------+
|Adventures With C...|           0|
+--------------------+------------+

I am using PySpark above, and the Hive context is already available.
However, this works only when the JSON file is well formatted, i.e. each line of the file is a complete JSON object. The file above looks like this:

{"createdAt":"Nov 4, 2014 4:56:59 PM","id":529799371026485248,"text":"Adventures With Coffee, Code, and Writing.","source":"\u003ca href\u003d\"http://twitter.com\" rel\u003d\"nofollow\"\u003eTwitter Web Client
...

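Each line here is a self-contained JSON record, which is easy to verify with plain Python before handing the file to Spark (a minimal sketch, reusing the testweet.json path from above and the standard json module):

import json

# Plain-Python check (no Spark needed): in a well-formatted, line-delimited
# file, every non-blank line parses as a complete JSON object on its own.
with open("/home/myuser/testweet.json") as f:
    for line in f:
        if line.strip():
            json.loads(line)  # raises ValueError if a line is not valid JSON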
However, for a JSON file that looks like this:

{
  "created_at": "2015-12-23T23:39:06Z", 
  "trends": [
   {
    "url": "http://twitter.com/search?q=%23HappyBirthdayLouis", 
    "query": "%23HappyBirthdayLouis", 
    "tweet_volume": 658249, 
    "name": "#HappyBirthdayLouis", 
    "promoted_content": null
   }, 
...

this is what we get when we use the approach above:

In [93]: inputFile="/home/myuser/tweet_trends.json"
         tweet_trends = hiveCtx.jsonFile(inputFile)
#Display schema
In [94]: tweet_trends.printSchema()
         root
In [97]: tweet_trends.registerTempTable("tweet_trends")
In [98]: tweet_trends = hiveCtx.sql("""SELECT *  FROM tweet_trends LIMIT 10""")
In [99]: print tweet_trends
          DataFrame[]

This results in an empty DataFrame: jsonFile() expects each input line to be a self-contained JSON record, so it cannot parse an object that spans multiple lines.
To resolve this we can use the SparkContext.wholeTextFiles() method, which reads each file as a single record:

In [100]: import json
          multiline_rdd=sc.wholeTextFiles(inputFile)
          type(multiline_rdd)
Out[100]: pyspark.rdd.RDD

This produces an RDD that looks like this:

In [102]: multiline_rdd.take(1)
Out[102]:[(u'file:/home/myuser/twitter_trends.json',
  u' {\n  "created_at": "2015-12-23T23:39:06Z", \n  "trends": [\n   {\n    "url": "http://twitter.com/search?q=%23HappyBirthdayLouis", \n    "query": "%23HappyBirthdayLouis", \n    "tweet_volume": 658249, \n    "name": "#HappyBirthdayLouis", \n    "promoted_content": null\n   }
...

The RDD consists of a tuple (one per file) whose first element is the file name and whose second element is the entire file contents as a single string, with the original line breaks and indentation preserved as whitespace.
In order to prepare the data for proper ingestion by Spark SQL, we apply the following transformation:

In [59]: #Remove all whitespace chars
         import re
         json_rdd = multiline_rdd.map(lambda x : x[1])\
                    .map(lambda x : re.sub(r"\s+", "", x, \
                    flags=re.UNICODE))

The resulting RDD now looks like this:

In [105]: json_rdd.take(1)
Out[105]:
[u'{"created_at":"2015-12-23T23:39:06Z","trends":[{"url":"http://twitter.com/search?q=%23HappyBirthdayLouis","query":"%23HappyBirthdayLouis","tweet_volume":658249,"name":"#HappyBirthdayLouis","promoted_content":null},{"url":"http://twitter.com/search?q=%23ShadeHour","query":"%23ShadeHour","tweet_volume":null,"name":"#ShadeHour","promoted_content":null},
...
In [106]: type(json_rdd)
Out[106]: pyspark.rdd.PipelinedRDD
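
One caveat: re.sub(r"\s+", "", ...) removes every whitespace character, including spaces inside string values, which is why the final output below shows "SouthAfrica" rather than "South Africa". A gentler variant (a sketch reusing the json module imported above) parses each whole-file string and re-serializes it onto a single compact line instead:

# Alternative: parse the full file contents and write each record back out
# as one line; whitespace inside string values is left intact.
json_rdd = multiline_rdd.map(lambda x: x[1]) \
                        .map(lambda x: json.dumps(json.loads(x)))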

We can now use the jsonRDD() method to produce a Spark SQL DataFrame:

In [110]: json_df=hiveCtx.jsonRDD(json_rdd)
In [111]: type(json_df)
Out[111]: pyspark.sql.dataframe.DataFrame

In [114]: json_df.printSchema()
          root
          |-- as_of: string (nullable = true)
          |-- created_at: string (nullable = true)
          |-- locations: array (nullable = true)
          |    |-- element: struct (containsNull = true)
          |    |    |-- name: string (nullable = true)
          |    |    |-- woeid: long (nullable = true)
          ...

Now register a temp table in Hive and select some data:

In [116]: json_df.registerTempTable("tweet_trends")
In [119]: trends_data = hiveCtx.sql("SELECT locations FROM tweet_trends")
In [120]: type(trends_data)
Out[120]: pyspark.sql.dataframe.DataFrame
In [122]: trends_data
Out[122]:DataFrame[locations:array<struct<name:string,woeid:bigint>>]
In [123]: trends_data.show()
          +--------------------+
          |           locations|
          +--------------------+
          |List([SouthAfrica...|
          +--------------------+
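
As a closing note, newer Spark releases deprecate jsonRDD() in favor of the DataFrameReader, and Spark 2.2 and later can read multi-line JSON directly. A sketch of both, assuming a Spark 2.2+ shell where a SparkSession named spark is available:

# Spark 1.4+: the DataFrameReader also accepts an RDD of JSON strings
json_df = hiveCtx.read.json(json_rdd)

# Spark 2.2+: read the pretty-printed file directly via the multiLine option
json_df = spark.read.option("multiLine", True).json("/home/myuser/tweet_trends.json")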
