Running SQL queries on DataFrames in Spark SQL [updated]


#1

This post is an updated version of a recent blogpost on data modeling in Spark.

We have been thinking about Apache Spark for some time now at Snowplow. This post is the first in a series that will explore data modeling in Spark using Snowplow data. It’s similar to Justine’s write-up and covers the basics: loading events into a Spark DataFrame on a local machine and running simple SQL queries against the data.

Data modeling is a critical step in the Snowplow pipeline: it’s the stage at which business logic gets applied to the data. The event stream describes all that has happened up to a certain point in time. It therefore needs to be transformed before it becomes meaningful to an end user in the business. Because the logic gets applied at a later stage, it remains possible to revisit and iterate on earlier decisions.

Most Snowplow users do their data modeling in SQL, using our open source tool SQL Runner or a BI tool such as Looker. We hope Spark will turn out to be a great addition to the data modeling toolkit.

Excited? Let’s get started!

Loading Snowplow data into Spark

Make sure git, Vagrant and VirtualBox are installed. To get started with Spark, clone the Snowplow repo, switch to the feature/spark-data-modeling branch, vagrant up and vagrant ssh onto the box:

host$ git clone https://github.com/snowplow/snowplow.git
host$ cd snowplow
host$ git checkout feature/spark-data-modeling
host$ vagrant up && vagrant ssh
guest$ cd /vagrant/5-data-modeling/spark
guest$ sbt console

This last step opens the Scala console, which gives us access to all the libraries included in the spark-data-modeling project. We start by defining a SparkContext:

import org.apache.spark.{SparkContext, SparkConf}

val sc = {
  val conf = new SparkConf()
    .setAppName("sparkDataModeling") // replace with app name
    .setMaster("local") // we are running Spark on a local machine
  new SparkContext(conf)
}

The SparkContext represents the connection to the Spark cluster.
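
A quick way to confirm that the context works is to run a tiny job on an in-memory collection. The sketch below assumes Spark 1.4 or later, where SparkContext.getOrCreate is available; it reuses a context that is already running (such as sc above) rather than creating a second one. The names checkConf, checkSc and total are made up for this example:

```scala
import org.apache.spark.{SparkContext, SparkConf}

// Reuses the running context if there is one, otherwise creates it from this conf
val checkConf = new SparkConf()
  .setAppName("sparkDataModeling")
  .setMaster("local")
val checkSc = SparkContext.getOrCreate(checkConf)

// Distribute the numbers 1 to 10, double each one and add them up
val total = checkSc.parallelize(1 to 10).map(_ * 2).reduce(_ + _)

println(total)
```

Setting the master to "local[*]" instead of "local" runs Spark with as many worker threads as there are cores on the machine.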

Let’s now load some enriched events from S3 into Spark. I recommend creating a new S3 bucket with some Snowplow data that can serve as a sandbox. A single run should be enough to start with. The path to the enriched events should look something like this:

snowplow-enrichment-archive/enriched/good/run=2015-12-31-23-59-59/

We can now load this data into Spark and create a Resilient Distributed Dataset (RDD):

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "REPLACE_WITH_ACCESS_KEY")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "REPLACE_WITH_SECRET_ACCESS_KEY")

val inDir = "s3n://path/to/enriched/events/*"
val input = sc.textFile(inDir)

Make sure to add in the actual path and AWS credentials.

Let’s take a look at the first line of this RDD:

scala> input.first
res0: String = "demo	web	2015-12-01 08:32:35.048	2015-12-01 04:00:54.000	2015-12-01 03:57:08.986	page_view	f4b8dd3c-85ef-4c42-9207-11ef61b2a46e	co	js-2.5.0	clj-1.0.0-tom-0.2.0	hadoop-1.0.0-common-0.14.0	1316246087	82bc4fba034dn16b	9	3456beda-f4f8-4795-bd95-897d05d23a58	US	NY	New York	New York	Time Warner Cable	Time Warner Cable	http://snowplow.io/blog/	Latest news – Blog – Snowplow	http	snowplow.io	80	/blog/	Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_1) AppleWebKit/601.2.7 (KHTML, like Gecko) Version/9.0.1 Safari/601.2.7	Safari	Safari	9.0.1	Browser	WEBK...

This is what we would expect a TSV to look like.
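
Each enriched event is a single line of tab-separated values, so individual fields can be pulled out by position. A minimal sketch in plain Scala, using a shortened sample line (only the first six values of the event shown above) rather than a full event. The positions are read off that output, where app_id, platform and event are the first, second and sixth values:

```scala
// A shortened sample line: only the first six fields of the event shown above
val sampleLine = "demo\tweb\t2015-12-01 08:32:35.048\t2015-12-01 04:00:54.000\t2015-12-01 03:57:08.986\tpage_view"

// A limit of -1 keeps trailing empty fields, which matter in a sparse TSV
val fields: Array[String] = sampleLine.split("\t", -1)

val appId    = fields(0) // app_id
val platform = fields(1) // platform
val event    = fields(5) // event

println(s"$appId / $platform / $event")
```

Working with positions quickly becomes error-prone on a full event, which is one reason to convert the lines into a format with named fields first.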

Loading the data into a Spark DataFrame

We want to load our events into a Spark DataFrame, a distributed collection of data organized into named columns. This concept is similar to a data frame in R or a table in a relational database.

Let’s start with transforming the RDD into a more suitable format using the EventTransformer object (part of the Snowplow Scala Analytics SDK):

import com.snowplowanalytics.snowplow.analytics.scalasdk.json.EventTransformer

val jsons = input.
  map(line => EventTransformer.transform(line)).
  filter(_.isSuccess).
  flatMap(_.toOption).
  persist

Note that the EventTransformer requires that the events were enriched using Snowplow R75 or later (we plan to support older versions soon).

The events are now in a format that is nicer to work with in Spark.

scala> jsons.first
res1: String = {"app_id":"demo","platform":"web","etl_tstamp":"2015-12-01T08:32:35.048Z","collector_tstamp":"2015-12-01T04:00:54.000Z","dvce_tstamp":"2015-12-01T03:57:08.986Z","event":"page_view","event_id":"f4b8dd3c-85ef-4c42-9207-11ef61b2a46e","txn_id":null,"name_tracker":"co","v_tracker":"js-2.5.0","v_collector":"clj-1.0.0-tom-0.2.0","v_etl":"hadoop-1.0.0-common-0.14.0","user_id":null,"user_fingerprint":"1316246087","domain_userid":"82bc4fba034dn16b","domain_sessionidx":9,"network_userid":"3456beda-f4f8-4795-bd95-897d05d23a58","geo_country":"US","geo_region":"NY","geo_city":"New York","ip_isp":"Time W...
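
The filter and flatMap steps used to build jsons keep only the lines that transformed successfully and unwrap them. The same pattern can be tried on a plain Scala collection. The sketch below uses scala.util.Try as a stand-in for the result of EventTransformer.transform (an assumption made only so the example runs without the SDK), and parse is a hypothetical helper:

```scala
import scala.util.Try

// Hypothetical stand-in for a transformation that can fail on bad input
def parse(line: String): Try[Int] = Try(line.trim.toInt)

val lines = List("1", "2", "not a number", "4")
val results = lines.map(parse)

// Keep the successes and unwrap them; failures are dropped
val good: List[Int] = results.filter(_.isSuccess).flatMap(_.toOption)

// Counting the failures shows how many lines were silently discarded
val failed: Int = results.count(_.isFailure)

println(s"good = $good, failed = $failed")
```

In this sketch, flatMap(_.toOption) on its own gives the same result, since a failure becomes None and is dropped. Counting the failures separately is a cheap check that not too many events are being lost.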

We can now load this into a Spark DataFrame. First, create a SQL Context:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

The SQL Context allows us to create DataFrames and execute SQL queries.

// this is used to implicitly convert an RDD to a DataFrame
import sqlContext.implicits._

val df = sqlContext.read.json(jsons)

We have now converted the RDD into a DataFrame. To show the top 5 rows and print the schema, run:

scala> df.show(5)
scala> df.printSchema
root
 |-- app_id: string (nullable = true)
 |-- base_currency: string (nullable = true)
 |-- br_colordepth: string (nullable = true)
 |-- br_cookies: boolean (nullable = true)
 |-- br_family: string (nullable = true)
 |-- br_features_director: boolean (nullable = true)
 |-- br_features_flash: boolean (nullable = true)
 |-- br_features_gears: boolean (nullable = true)
 |-- br_features_java: boolean (nullable = true)
 |-- br_features_pdf: boolean (nullable = true)
 |-- br_features_quicktime: boolean (nullable = true)
 |-- br_features_realplayer: boolean (nullable = true)
 |-- br_features_silverlight: boolean (nullable = true)
 |-- br_features_windowsmedia: boolean (nullable = true)
 |-- br_lang: string (nullable = true)
 |-- br_name: string (nullable = true)
 |-- br_renderengine: string (nullable = true)
 |-- br_type: string (nullable = true)
 |-- br_version: string (nullable = true)
 |-- br_viewheight: long (nullable = true)
 |-- br_viewwidth: long (nullable = true)
 |-- collector_tstamp: string (nullable = true)
...

Running SQL queries on Spark DataFrames

Now that our events are in a DataFrame, we can start to model the data. We will limit ourselves to simple SQL queries for now. In the next blogpost, we will start using the actual DataFrame API, which will enable us to build advanced data models.

To run SQL queries against the data, we first need to register a table:

df.registerTempTable("events")

We can now reference this table in subsequent SQL statements. For example:

scala> sqlContext.sql("SELECT domain_userid, COUNT(*) AS count FROM events GROUP BY domain_userid").show(5)
+----------------+-----+
|   domain_userid|count|
+----------------+-----+
|50e543349f257eb1|    1|
|4f9125032f38a282|   16|
|ddb077fa82bd1864|    8|
|0cb1263f234dabc4|    1|
|35a83cde08fdf4e1|    1|
+----------------+-----+

To store the output in another DataFrame, we run:

val dfVisitors = sqlContext.sql("SELECT domain_userid, MAX(domain_sessionidx) AS sessions FROM events GROUP BY domain_userid")

dfVisitors.registerTempTable("visitors")

Joins are also supported:

scala> sqlContext.sql("SELECT a.domain_userid, b.sessions, COUNT(*) AS count FROM events AS a LEFT JOIN visitors AS b ON a.domain_userid = b.domain_userid GROUP BY a.domain_userid, b.sessions").show(5)
+----------------+--------+-----+
|   domain_userid|sessions|count|
+----------------+--------+-----+
|50e543349f257eb1|       2|    1|
|4f9125032f38a282|       1|   16|
|ddb077fa82bd1864|       1|    8|
|0cb1263f234dabc4|      10|    1|
|35a83cde08fdf4e1|       3|    1|
+----------------+--------+-----+
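
The register, aggregate and join steps above can also be tried on a handful of hand-written events, without any data in S3. A minimal sketch, assuming Spark 1.4 or later (for SparkContext.getOrCreate and read.json); the three JSON events, the table names and the toy* names are invented for the example, and the events only carry the two fields the queries use:

```scala
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext

// Reuses the running context if there is one (such as sc above)
val toySc = SparkContext.getOrCreate(
  new SparkConf().setAppName("sparkDataModeling").setMaster("local"))
val toySql = new SQLContext(toySc)

// Invented events: user "a" has two events across two sessions, user "b" has one
val toyJsons = toySc.parallelize(Seq(
  """{"domain_userid":"a","domain_sessionidx":1}""",
  """{"domain_userid":"a","domain_sessionidx":2}""",
  """{"domain_userid":"b","domain_sessionidx":1}"""
))

// Infer the schema from the JSON strings and register the result as a table
val toyEvents = toySql.read.json(toyJsons)
toyEvents.registerTempTable("toy_events")

// One row per user with the highest session index seen
val toyVisitors = toySql.sql(
  "SELECT domain_userid, MAX(domain_sessionidx) AS sessions FROM toy_events GROUP BY domain_userid")
toyVisitors.registerTempTable("toy_visitors")

// Join the events back to the per-user table and count events per user
val toyJoined = toySql.sql(
  """SELECT a.domain_userid, b.sessions, COUNT(*) AS count
    |FROM toy_events AS a
    |LEFT JOIN toy_visitors AS b ON a.domain_userid = b.domain_userid
    |GROUP BY a.domain_userid, b.sessions""".stripMargin)

toyJoined.show()
```

Because the input is small and known, it is easy to check by hand that the join returns one row per user before running the same queries against real events.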

It’s of course possible to run more complex SQL queries, although not all functions one would use in Redshift are supported. To take full advantage of Spark, however, we will need to drop one level down and start to use the DataFrame API itself. This is what we will explore in the next post.

In a future post, we will also start running Spark on larger datasets in both Databricks and EMR.

In the meantime, let us know if you have any questions or feedback!


#2

Awesome write-up @christophe - I did want to point out that the link to Justine’s write-up mentioned at the top points to discourse rather than snowplowanalytics.

current: http://discourse.snowplowanalytics.com/blog/2015/05/21/first-experiments-with-apache-spark/
updated: http://snowplowanalytics.com/blog/2015/05/21/first-experiments-with-apache-spark/

Thanks for this write-up. I am very interested in playing with Spark + Snowplow and happy to hear you guys are looking at this as well!


#3

Thanks @13scoobie. We’ve fixed it.