Loading Snowplow events into Apache Spark and Zeppelin on EMR [tutorial]


#1

Apache Zeppelin is an open source project that lets users create and share web-based notebooks for data exploration in Spark. We are big fans of notebooks at Snowplow because of their interactive and collaborative nature. You can model the data using Scala, Python and Spark SQL, create and publish visualisations, and share the results with colleagues.

In this post, I’ll show how to get Zeppelin up and running on EMR and how to load Snowplow data from S3.

1. Launching an EMR cluster with Spark and Zeppelin

  1. Log on to the AWS management console.

  2. Navigate to EMR and choose Create cluster.

  3. Click on Go to advanced options.

  4. In the Software Configuration section, select Spark and Zeppelin-Sandbox. You don’t need to include Pig and Hue.

  5. Move on to the next step. In the Hardware Configuration section, change the instance type from m3.xlarge to r3.xlarge. The r3 instances are memory optimized and have more memory than the general purpose instance types. You can keep the default number of instances (3 in this case).

  6. Move on to step 3 and give the cluster a name. You don’t need to change the other fields.

  7. Move on to the final step. In the Security Options section, pick an EC2 key pair that is available in your current region and for which you have the PEM file. If you don’t have one, follow the instructions on this page.

  8. Click Create cluster and consider getting a cup of coffee (it will take a couple of minutes).

2. Connecting to the EMR cluster

  1. Once the cluster is up and running, click on the cluster name to view the details. Click on SSH in the Master public DNS field.

  2. Run chmod 400 ~/<YOUR-FILENAME.pem> to make sure the PEM file has the correct permissions.

  3. You can now SSH onto the cluster:

    ssh -i ~/<YOUR-FILENAME.pem> hadoop@ec2-<PLACEHOLDER>.compute.amazonaws.com
    
  4. If SSH asks you to confirm the authenticity of the host, type yes to dismiss the warning.

3. Connecting to Zeppelin

  1. To connect to the Zeppelin UI, create an SSH tunnel between a local port (8889 in this example) and port 8890 on the master node (the default for Zeppelin):

    ssh -i ~/<YOUR-FILENAME.pem> -N -L 8889:ec2-<PLACEHOLDER>.compute.amazonaws.com:8890 hadoop@ec2-<PLACEHOLDER>.compute.amazonaws.com
    

    It’s possible this won’t work. If you get this error:

    channel 1: open failed: administratively prohibited: open failed
    

    You can try replacing 8889:ec2-<PLACEHOLDER>.compute.amazonaws.com:8890 with 8889:localhost:8890 (StackExchange discussion).

  2. Browse to http://localhost:8889 to access the Zeppelin UI.

4. Loading data from S3

  1. Choose Notebook and Create new note.

  2. You will need to load the Snowplow Scala Analytics SDK first to be able to use the EventTransformer later on. Paste this into Zeppelin and click run:

    %dep
    z.reset()
    
    z.addRepo("Snowplow Analytics").url("http://maven.snplow.com/releases/")
    z.load("com.snowplowanalytics:snowplow-scala-analytics-sdk_2.10:0.1.0")
    

    It’s possible you will encounter this error:

    Must be used before SparkInterpreter (%spark) initialized
    

    If that’s the case, go to Interpreter and restart spark. You should now be able to rerun this paragraph.

  3. Let’s start with loading some enriched events from S3 into Spark:

    %spark
    
    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<REPLACE_WITH_ACCESS_KEY>")
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<REPLACE_WITH_SECRET_ACCESS_KEY>")
    
    val inDir = "s3n://path/to/enriched/events/*"
    val input = sc.textFile(inDir)
    
    input.first
    

    This creates an RDD (input), and input.first returns the first line of the RDD.

  4. The next step is to transform the complex TSV string into something that’s a little nicer to work with. Paste this into Zeppelin and click run:

    import com.snowplowanalytics.snowplow.analytics.scalasdk.json.EventTransformer
    
    // Transform each TSV line into JSON and keep only the lines that transformed successfully
    val jsons = input.
      map(line => EventTransformer.transform(line)).
      filter(_.isSuccess).
      flatMap(_.toOption)
    
    jsons.first
    

    Note that the EventTransformer requires that the events were enriched using Snowplow R75 or later (we plan to support older versions soon).

  5. You can now convert jsons into a DataFrame:

    import org.apache.spark.sql.SQLContext
    val sqlContext = new SQLContext(sc)
    
    import sqlContext.implicits._
    val df = sqlContext.read.json(jsons)
    
    df.printSchema()
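
The "transform, then keep only the successes" pattern from step 4 can be tried on a local collection before running it on a cluster. What follows is a minimal sketch under stated assumptions: `toJson` is a hypothetical stand-in for `EventTransformer.transform`, `scala.util.Try` stands in for whatever result type the SDK returns, and the sample lines are made up.

```scala
import scala.util.Try

// Hypothetical stand-in for EventTransformer.transform (an assumption, not the SDK):
// it "parses" a tab-separated line and fails when the line has fewer than two fields.
def toJson(line: String): Try[String] = Try {
  val fields = line.split("\t", -1)
  require(fields.length >= 2, s"expected at least 2 fields, got ${fields.length}")
  s"""{"app_id":"${fields(0)}","event":"${fields(1)}"}"""
}

// Made-up sample data: two well-formed lines and one malformed line.
val lines = Seq("shop\tpage_view", "broken-line", "shop\tstruct")

// Same shape as step 4: transform every line, then keep only the successes.
// flatMap(_.toOption) already drops failures, so the filter is optional.
val jsons = lines
  .map(toJson)
  .filter(_.isSuccess)
  .flatMap(_.toOption)

// Count the lines that failed to transform instead of dropping them silently.
val failed = lines.map(toJson).count(_.isFailure)

println(jsons.mkString("\n"))
println(s"failed lines: $failed")
```

The snippet uses only the Scala standard library, so it can be pasted into a Zeppelin %spark paragraph or a Scala REPL. Counting the failures is a cheap way to notice when a large share of the input is being discarded.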
    

This wraps up this guide on loading Snowplow data into Zeppelin on EMR. For more details on how to use the DataFrame API, check out this post:

http://discourse.snowplowanalytics.com/t/running-sql-queries-on-dataframes-in-spark-sql-updated/119


#2

Just a shout-out if you are following this guide: make sure not to use Spark 2.0.

When creating the EMR cluster, make sure to pick a release that is < 5. The one used in this tutorial is emr-4.5.0.

I was having issues loading the dependencies here on Spark 2.0:

%dep
z.reset()

z.addRepo("Snowplow Analytics").url("http://maven.snplow.com/releases/")
z.load("com.snowplowanalytics:snowplow-scala-analytics-sdk_2.10:0.1.0")
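
One possible explanation, offered as an assumption rather than something confirmed in this thread: the artifact name ends in `_2.10`, which marks a build for Scala 2.10, while Spark 2.0 is built against Scala 2.11 by default. A minimal sketch for checking which Scala version an interpreter is running, using only the standard library:

```scala
// Full Scala version of the running interpreter, for example "2.10.5".
val scalaVersion = scala.util.Properties.versionNumberString

// The binary version is the first two components, for example "2.10".
val scalaBinaryVersion = scalaVersion.split('.').take(2).mkString(".")

// The artifact loaded in this guide carries the _2.10 suffix.
val matchesArtifact = scalaBinaryVersion == "2.10"

println(s"Scala $scalaVersion, matches the _2.10 artifact: $matchesArtifact")
```

Running this in a %spark paragraph before the %dep step shows whether the cluster's Scala version lines up with the artifact suffix.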

Thank you so much for the in-depth step by step guide Snowplow Team - you guys rock!!!