Issue implementing the Scala Analytics SDK

Hi,

I created a real-time data pipeline, and using Stream Enrich I’m getting tab-delimited data in a Kinesis stream.

But I want to get the data in JSON format, so after some searching I found the newly released snowplow-scala-analytics-sdk-0.1.0.

I followed the installation steps by adding:

// Resolvers
val snowplowRepo = "Snowplow Analytics" at "http://maven.snplow.com/releases/"
// Dependency
val analyticsSdk = "com.snowplowanalytics" %% "snowplow-scala-analytics-sdk" % "0.1.0"
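For completeness, this is roughly how those are wired into my build.sbt (a sketch; the real build file has more settings):

// build.sbt (sketch): register the resolver and dependency defined above
resolvers += snowplowRepo
libraryDependencies += analyticsSdk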

And then used it in my Apache Spark application as:

import com.snowplowanalytics.snowplow.analytics.scalasdk.json.EventTransformer
val events = input
    .map(line => EventTransformer.transform(line))
    .filter(_.isSuccess)
    .flatMap(_.toOption)

But when I run the Spark app, I get the error below:

Exception in thread "main" java.lang.NoClassDefFoundError: scalaz/Validation
         at SparkEnrich$.main(sparkenrich.scala:12)
         at SparkEnrich.main(sparkenrich.scala)
         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
         at java.lang.reflect.Method.invoke(Method.java:606)
         at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
         at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
         at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: scalaz.Validation

I don’t see much documentation around implementing the SDK. Could someone please help me fix this, or provide sample code for using it in a Spark Scala app?

My end goal is to transform the tab-delimited data from the enrich job into JSON format.

Thanks
Puneet

Hi Puneet,

Sorry to hear you are having problems! It looks like at least one dependency (scalaz) is missing when you run your Spark Scala app. How are you running your app? Are you creating a fat jar which bundles all dependencies?

Cheers,

Alex

@alex thanks, the fat jar resolved the error.

Now, should I point the enriched stream from Kinesis as the input to the EventTransformer in my Spark app?

And this will give me the JSON output?

Thanks
Puneet Babbar

Hi @PuneetBabbar - that’s good news. Indeed you always need to deploy any Scala app using the Snowplow Scala Analytics SDK as a fatjar.
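In case it helps, a minimal sbt-assembly setup looks roughly like this (plugin and Spark versions are illustrative, and your project may need a different merge strategy):

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

// build.sbt - keep Spark itself out of the fat jar and resolve duplicate files
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1" % "provided"

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _                             => MergeStrategy.first
}

Running sbt assembly then produces the single jar you pass to spark-submit.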

Yes, you should configure Spark Streaming with a Kinesis source, and apply the EventTransformer to that source.

Hi Alex,

How are you doing? I need a little help again.

So I managed to pull in the Kinesis stream and was even able to store the raw beacons from Spark Streaming to an S3 path.

But now when I apply the transform function of the Analytics SDK it gives an empty response, when ideally it should return JSON.

import com.snowplowanalytics.snowplow.analytics.scalasdk.json.EventTransformer

val kinesisStreams = (0 until numShards).map { i =>
  KinesisUtils.createStream(ssc, streamName, endpointUrl, Seconds(10),
    InitialPositionInStream.TRIM_HORIZON, StorageLevel.MEMORY_AND_DISK_2)
}

// Union all the streams
val unionStream = ssc.union(kinesisStreams)

// Convert each record of type Array[Byte] to a String
val words = unionStream.flatMap(new String(_).split("\\s+"))
val prefix = "s3://cdpsnowplow/2016-07-25/"

words.foreachRDD(wordRdd => {
  if (!wordRdd.isEmpty) {
    val events = wordRdd.map(line => EventTransformer.transform(line)).flatMap(_.toOption)
    events.saveAsTextFile(prefix)
  }
})

// Start the streaming context and await termination
ssc.start()
ssc.awaitTermination()

Please can you help me see where I’m going wrong?

Thanks
Puneet

Scala’s not my thing but it could mean the transform failed for some reason. In my experience transform() returns nothing on lines that failed to parse.

If that’s the case then wordRdd.map(line => EventTransformer.transform(line)).filter(_.isFailure).flatMap(_.toOption).count() should return greater than 0.

edit - the above should be: wordRdd.map(line => EventTransformer.transform(line)).filter(_.isFailure).count()

@Shin is right - most likely your lines are ending up on the Failure side of the transformation output (rather than as a Success[String]). You need to dig into the failure messages to find out what is going wrong…
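Something like this (a rough sketch, reusing your wordRdd variable) will print the first few failures, each of which carries the validation messages explaining why that line was rejected:

// Collect the failed transformations and print a handful of them to the driver console
val failures = wordRdd
  .map(line => EventTransformer.transform(line))
  .filter(_.isFailure)

failures.take(10).foreach(println)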

Thanks a lot guys.

So finally, this is what I’m getting:

Failure(NonEmptyList(Expected 131 fields, received 1 fields. This may be caused by attempting to use this SDK version on an older or newer version of Snowplow enriched events.))

I’m using the snowplow-stream-enrich-0.8.0 enrichment version, which is the latest listed on the Hosted assets · snowplow/snowplow Wiki · GitHub page.

Before the transform step the raw Kinesis stream looks fine, but I’m not sure where I’m going wrong.

Thanks
Puneet

Hey @PuneetBabbar - it doesn’t sound like you are feeding the transformer with Snowplow enriched events. The strings you are providing to the transformer have no tabs in them, which does not fit the Snowplow enriched event format at all.
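As a quick sanity check, you could print how many tab-separated fields are in the strings you actually pass to the transformer - a sketch, reusing your words DStream:

// A valid Snowplow enriched event has well over 100 tab-separated fields;
// seeing 1 here means the event has been broken apart before it reaches the transformer
words.foreachRDD { rdd =>
  rdd.take(5).foreach(line => println(line.split("\t", -1).length))
}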

Could you draw out your Kinesis pipeline using ASCII art?

I haven’t quite figured out what to do with bad rows coming from the enriched stream (we’re not in production yet) but for debugging I’m using one of the many “Kinesis tail” utilities to watch the bad event stream and dump them to console.

Sorry, my bad. I was not converting the DStream’s byte arrays to strings properly, which was causing this issue.

It is fixed now.
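In case anyone else hits this, the change was roughly the following (a sketch of the relevant line only; the rest of the foreachRDD / transform code stayed the same):

// before (broken): splitting on whitespace chopped each tab-delimited event into single fields
// val words = unionStream.flatMap(new String(_).split("\\s+"))

// after: decode each Kinesis record as one UTF-8 string, i.e. one enriched event per record in my case
val words = unionStream.map(bytes => new String(bytes, "UTF-8"))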

Thanks

Great, thanks for letting us know @PuneetBabbar!