Spark Adventures –
Processing Multi-line JSON files

This series of blog posts will cover unusual problemsĀ I’ve encountered on my Spark journey for which the solutions are not obvious.

Everyone who has read the seminal book Learning Spark has encountered this example in chapter 9 – Spark SQL on how to ingest JSON data from a file using the Hive context to produce a resulting Spark SQL DataFrame:

In [84]: hiveCtx = sqlContext
In [85]: inputFile="/home/myuser/testweet.json"
         tweets_df = hiveCtx.jsonFile(inputFile)

In [86]: type(tweets_df)
Out[86]: pyspark.sql.dataframe.DataFrame

In [87]:tweets_df.printSchema()
 |-- contributorsIDs: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- createdAt: string (nullable = true)
 |-- currentUserRetweetId: long (nullable = true)
 |-- hashtagEntities: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: long (nullable = true)
# Register the input DataFrame
In [88]:tweets_df.registerTempTable("tweets")
In [89]: topTweets = hiveCtx.sql("""SELECT text, retweetCount FROM tweets ORDER BY retweetCount LIMIT 10""")
In [90]: print topTweets
         DataFrame[text: string, retweetCount: bigint]
In [91]:
|                text|retweetCount|
|Adventures With C...|           0|

I am using PySpark above, and the hive context is already available.
However, this works only when the JSON file is well formatted i.e. each line of the file is a JSON object. The file above looks like this:

{"createdAt":"Nov 4, 2014 4:56:59 PM","id":529799371026485248,"text":"Adventures With Coffee, Code, and Writing.","source":"\u003ca href\u003d\"\" rel\u003d\"nofollow\"\u003eTwitter Web Client

However, for a json file that looks like this:

  "created_at": "2015-12-23T23:39:06Z", 
  "trends": [
    "url": "", 
    "query": "%23HappyBirthdayLouis", 
    "tweet_volume": 658249, 
    "name": "#HappyBirthdayLouis", 
    "promoted_content": null

this is what we get when we use the approach above:

[In 93]: inputFile="/home/myuser/tweet_trends.json"
         tweet_trends = hiveCtx.jsonFile(inputFile)
#Display schema
In [94]: tweet_trends.printSchema()
In [97]: tweet_trends.registerTempTable("tweet_trends")
In [98]: tweet_trends = hiveCtx.sql("""SELECT *  FROM tweet_trends LIMIT 10""")
In [99]:  print tweet_trends

This results in an empty dataframe.
To resolve this we can use the SparkContext.wholeTextFiles() method:

In [100]: import json
Out[100]: pyspark.rdd.RDD

This produces an RDD that looks like this:

In [102]:multiline_rdd.take(1)
  u' {\n  "created_at": "2015-12-23T23:39:06Z", \n  "trends": [\n   {\n    "url": "", \n    "query": "%23HappyBirthdayLouis", \n    "tweet_volume": 658249, \n    "name": "#HappyBirthdayLouis", \n    "promoted_content": null\n   }

The RDD consists of a tuple whose 1st element is a filename and whose 2nd element is the data with the lines separated by whitespace.
In order to prepare the data for proper ingestion by Spark SQL, we utilize the following transformation:

In [59]: #Remove all whitespace chars
         import re
         json_rdd = x : x[1])\
                    .map(lambda x : re.sub(r"\s+", "", x, \

The resulting rdd now looks like this:

In [105]: json_rdd.take(1)
In [106]: type(json_rdd)
Out[106]: pyspark.rdd.PipelinedRDD

We can now use the jsonRDD() method to produce a Spark SQL DataFrame:

In [110]: json_df=hiveCtx.jsonRDD(json_rdd)
In [111]: type(json_RDD)
Out[111]: pyspark.sql.dataframe.DataFrame

In [114]: json_df.printSchema()
          |-- as_of: string (nullable = true)
          |-- created_at: string (nullable = true)
          |-- locations: array (nullable = true)
          |    |-- element: struct (containsNull = true)
          |    |    |-- name: string (nullable = true)
          |    |    |-- woeid: long (nullable = true)

Now register a temp table in Hive and select some data:

In [116]: json_df.registerTempTable("tweet_trends")
In [119]: trends_data = hiveCtx.sql("SELECT locations FROM 
In [120]: type(trends_data)
Out[120]: pyspark.sql.dataframe.DataFrame
In [122]: trends_data
In [123]:
          |           locations|

5 thoughts on “Spark Adventures –
Processing Multi-line JSON files”

  1. Its almost a week i was struggling to process the complex json, am relaxed now, Its working perfectly as expected, Thanks for the post. Keep it up!!!

    1. here you go in scala..

      val jsonRDD = sc.wholeTextFiles(“s3a://testing/*.json”)

      val JsonRDDValue =>x.replace(“//s”,””))

      val df =

Leave a Reply

Your email address will not be published. Required fields are marked *