Inferring schema from JSON


#1

Hi, I’m trying to load Snowplow data into Spark and build some analytical jobs.

The readme for the Scala SDK suggests loading data like this:

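```scala
import com.snowplowanalytics.snowplow.analytics.scalasdk.json.EventTransformer

// `input` is assumed to be an RDD[String] of enriched events (one TSV line per event),
// and `ctx` an SQLContext (or SparkSession); neither is defined in this snippet.
val events = input
  .map(line => EventTransformer.transform(line)) // enriched TSV line => JSON string, or a failure
  .filter(_.isSuccess)                           // drop lines that failed to transform
  .flatMap(_.toOption)                           // unwrap the JSON string

// Spark scans every JSON record to infer the DataFrame schema
val dataframe = ctx.read.json(events)
```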

At first sight this works great: the data is loaded with just a few lines of code, and the schema is known!

However, as far as I can see there are two drawbacks to this approach:

  1. All the JSON needs to be scanned to infer the schema, which is not very performant
  2. The inferred schema depends on whichever attributes, contexts, etc. happen to be present in the dataset

Point 2 becomes a problem when you try to access data from one of the contexts: if a dataset does not contain that context, its schema is not inferred, the field is unknown, and the Spark job fails.
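Until a fixed schema is in place, one stop-gap is to check whether a column resolves before selecting it, instead of letting the job fail. A minimal sketch, assuming Spark 2.2+; `ColumnGuard` and `hasColumn` are hypothetical names, not part of Spark or the Snowplow SDK:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.util.Try

object ColumnGuard {
  // True if `path` (dotted for nested fields, e.g. "a.b") resolves against the
  // DataFrame's schema; df.col throws an AnalysisException when it does not.
  def hasColumn(df: DataFrame, path: String): Boolean =
    Try(df.col(path)).isSuccess
}
```

This only moves the problem around (every downstream query needs a fallback branch), which is why the rest of the thread leans towards supplying the schema up front.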

Has anyone experienced this same issue, and how did you solve it? My feeling is that the context should be parsed using a known schema somehow.


#2

I ended up encoding the schema as a StructType, including our customized contexts.
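For illustration, roughly what encoding the schema as a StructType can look like. A sketch, assuming Spark 2.2+ (for `read.json` on a `Dataset[String]`); the context column `contexts_com_acme_product_1` and its fields are hypothetical, only two atomic fields are shown, and modelling the context as an array of objects is an assumption about the SDK's output shape:

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.types._

object ExplicitSchema {
  // Hypothetical custom context; replace with the fields of your own context schema.
  val productContext: StructType = StructType(Seq(
    StructField("categoryId", StringType, nullable = true),
    StructField("price", DoubleType, nullable = true)
  ))

  // A few atomic fields plus one context column, modelled as an array of structs.
  // Timestamps are kept as strings to keep the sketch simple.
  val eventSchema: StructType = StructType(Seq(
    StructField("event_id", StringType, nullable = true),
    StructField("collector_tstamp", StringType, nullable = true),
    StructField("contexts_com_acme_product_1", ArrayType(productContext), nullable = true)
  ))

  // With a schema supplied, Spark skips the inference pass, and every column
  // exists even when the batch never mentions it (its values are then null).
  def load(spark: SparkSession, events: Dataset[String]): DataFrame =
    spark.read.schema(eventSchema).json(events)
}
```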


#3

I’m just seeing this and I have a few questions:

  • do you have a uniform schema for every row in your dataframe? If not, AFAIK the inferred schema will be the disjunction (union) of all possible schemas in your data, which seems to be the best behavior possible
  • did you specify the StructType by hand? That seems like a lot of boilerplate, and I’m wondering whether it’s coercive/lossy
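A quick way to see both sides of this: inference merges the fields of all rows it sees, but only of the rows it sees, so a batch without a field yields a schema without it. A small sketch, assuming Spark 2.2+; `InferenceUnion.inferredFields` is a hypothetical helper:

```scala
import org.apache.spark.sql.SparkSession

object InferenceUnion {
  // Infers a schema from the given JSON lines and returns its top-level field names.
  def inferredFields(spark: SparkSession, lines: Seq[String]): Set[String] = {
    import spark.implicits._
    spark.read.json(lines.toDS()).schema.fieldNames.toSet
  }
}
```

Calling it with `{"a": 1}` and `{"b": "x"}` gives both `a` and `b`; calling it with only the first line gives just `a`.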

#4

Hi Ben,

Thanks for your reply. Those are valid points. I must admit I wasn’t entirely sure whether this is the right path, which is why I opened this thread.

The disjunction of all schemas would indeed be preferable (if it could be inferred in a performant manner). However, in practice we process just a daily batch and not the entire history of the events. So it is quite possible that the inferred schema would contain only a subset of the fields.

Specifying the schema by hand involved a bit of copying, but it’s definitely doable. The Scala SDK already contains the fields in the EventTransformer class, which can be reused. It does require some maintenance; however, I reasoned that the same maintenance is needed for your data warehouse (e.g. adding columns to your database tables), so upgrading Snowplow will require some effort anyhow.

Specifying a schema is definitely coercive, which is kind of the point: in my opinion this is the only way to build a robust pipeline without undefined behaviour downstream. As a concrete example, we have many trackers on different sites, and some events contained the field ‘categoryid’ and others ‘categoryId’ (capital I). Spark would happily process this and then blow up when saving to ORC format, which cannot handle two fields that differ only by capitalization. (Obviously this should be fixed upstream in the JavaScript, but I’d rather have some fields in the data from one site be null than have our entire pipeline for all our sites broken.)
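To catch the ‘categoryid’/‘categoryId’ situation before it reaches the ORC writer, one option is to scan the schema for names that collide case-insensitively and fail fast. A minimal sketch; `CaseCollisions.find` is a hypothetical helper, and it descends only through structs and arrays, not maps:

```scala
import org.apache.spark.sql.types._

object CaseCollisions {
  // Groups of field paths whose names are equal when compared case-insensitively
  // at the same nesting level, e.g. Seq("categoryId", "categoryid").
  def find(schema: StructType, prefix: String = ""): Seq[Seq[String]] = {
    val here: Seq[Seq[String]] = schema.fieldNames.toSeq
      .groupBy(_.toLowerCase)
      .values
      .filter(_.size > 1)
      .map(names => names.sorted.map(prefix + _))
      .toSeq
    val nested: Seq[Seq[String]] = schema.fields.toSeq.flatMap { f =>
      childStruct(f.dataType).toSeq.flatMap(s => find(s, prefix + f.name + "."))
    }
    here ++ nested
  }

  // Looks through (possibly nested) arrays for a struct element type.
  private def childStruct(dt: DataType): Option[StructType] = dt match {
    case s: StructType    => Some(s)
    case ArrayType(el, _) => childStruct(el)
    case _                => None
  }
}
```

In a job this would sit just before the write, e.g. `require(CaseCollisions.find(df.schema).isEmpty, ...)`. With an explicit schema the check becomes a one-off review of the schema itself rather than a runtime guard.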

In general, I think that JSON (Lines) is not a great format to have data in; I’d rather have something more structured. I think this is also the direction Spark is taking, with the DataFrame API, Structured Streaming, etc.

As for the schemas of the contexts, I briefly looked into parsing them from the Iglu schemas but haven’t gotten very far yet. However, I think that would be nice.

Anyway, your thoughts and those of others are greatly appreciated!


#5

> The disjunction of all schemas would indeed be preferable (if it could be inferred in a performant manner). However, in practice we process just a daily batch and not the entire history of the events.

Theoretically, it doesn’t add to the complexity of your job, since inference is a single O(n) pass that is done once. However, I do agree that in practice it takes a bit of time.
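If the inference pass itself is the cost, Spark’s JSON reader has a `samplingRatio` option that infers the schema from only a fraction of the records. A sketch, assuming Spark 2.2+; `SampledInference` is a hypothetical wrapper. Note the trade-off: fields that occur only in unsampled records are missing from the schema and are therefore dropped when parsing, which makes the “subset of fields” problem worse, not better:

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object SampledInference {
  // "samplingRatio" controls the fraction of input records scanned during
  // schema inference (1.0 = all records). Fields seen only in unsampled
  // records are absent from the resulting schema.
  def load(spark: SparkSession, events: Dataset[String], ratio: Double): DataFrame =
    spark.read.option("samplingRatio", ratio.toString).json(events)
}
```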

> So it is quite possible that the inferred schema would contain only a subset of the fields.

It’s true that, unfortunately, the inferred schema would not be stable across job runs.
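One middle ground is to let Spark infer the schema once over a larger historical sample, serialise it, and load it with `read.schema(...)` in every daily job so the schema no longer varies per batch. Spark’s `DataType` has a JSON representation (`json` and `DataType.fromJson`); the `SchemaSnapshot` wrapper below is a hypothetical sketch:

```scala
import org.apache.spark.sql.types._

object SchemaSnapshot {
  // Serialise a schema (e.g. inferred once from a large historical sample).
  def toJson(schema: StructType): String = schema.json

  // Read it back in later jobs; reject anything that is not a struct.
  def fromJson(json: String): StructType =
    DataType.fromJson(json) match {
      case s: StructType => s
      case other => throw new IllegalArgumentException(s"Expected a struct, got ${other.typeName}")
    }
}
```

The snapshot can be kept next to the job code and versioned, so a schema change becomes an explicit commit rather than a side effect of what a given day happened to contain.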

> As for the schemas of the contexts, I briefly looked into parsing them from the Iglu schemas but haven’t gotten very far yet. However, I think that would be nice.

It would be interesting to provide a way to go from an Iglu schema to a StructType, or even to a case class.
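A rough sketch of the Iglu-to-StructType direction. An Iglu schema is a JSON Schema with an extra `self` block, so its `type` and `properties` can be walked directly. This assumes json4s (which Spark depends on) is on the classpath and handles only a small subset of JSON Schema; `IgluToStruct` is a hypothetical name, not an existing Snowplow API:

```scala
import org.apache.spark.sql.types._
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object IgluToStruct {
  // Converts an object-typed JSON Schema (e.g. an Iglu schema) into a StructType.
  def convert(jsonSchema: String): StructType =
    toDataType(parse(jsonSchema)) match {
      case s: StructType => s
      case other => throw new IllegalArgumentException(s"Top level must be an object, got ${other.typeName}")
    }

  // "type" may be a single string or a list such as ["string", "null"].
  private def typeNames(node: JValue): List[String] = node \ "type" match {
    case JString(t) => List(t)
    case JArray(ts) => ts.collect { case JString(t) => t }
    case _          => Nil
  }

  private def toDataType(node: JValue): DataType =
    typeNames(node).filterNot(_ == "null") match {
      case List("object") =>
        val fields = node \ "properties" match {
          case JObject(props) =>
            // Every field nullable: a given batch may lack any of them.
            props.map { case (name, child) => StructField(name, toDataType(child), nullable = true) }
          case _ => Nil
        }
        StructType(fields)
      case List("array")   => ArrayType(toDataType(node \ "items"), containsNull = true)
      case List("string")  => StringType
      case List("integer") => LongType
      case List("number")  => DoubleType
      case List("boolean") => BooleanType
      // Multi-type unions, oneOf, $ref, missing "type": fall back to a string.
      case _ => StringType
    }
}
```

The design choices are deliberately conservative: `integer` maps to `LongType` to avoid overflow, `number` to `DoubleType` (lossy for exact decimals), and objects without `properties` become empty structs, so a real converter would need to decide how to treat free-form objects.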


#6

Hi,

I was working on a project to convert Snowplow shredded JSON to Parquet so that we could run some analysis on AWS Athena.
To do that I had to generate Parquet files with different schema versions, and I didn’t want to define all of these schemas manually.

I found the following library by Zalando, which is able to parse a JSON schema and generate the equivalent StructType: https://github.com/zalando-incubator/spark-json-schema

If you use self-describing events, you’ll have to create the StructType in 3 parts: the metadata one, plus the data that you usually describe in your own schema.
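The post doesn’t spell out the three parts. One plausible reading, assuming the shredded JSON layout of an Iglu schema key, the payload, and a `hierarchy` block linking back to the parent event, is sketched below. The field names are assumptions to check against your own shredded files, and `ShreddedEnvelope` is hypothetical:

```scala
import org.apache.spark.sql.types._

object ShreddedEnvelope {
  // Assumed Iglu schema key, split into its components.
  val schemaKey: StructType = StructType(Seq(
    StructField("vendor", StringType),
    StructField("name", StringType),
    StructField("format", StringType),
    StructField("version", StringType)
  ))

  // Assumed hierarchy metadata linking the entity back to its root event.
  val hierarchy: StructType = StructType(Seq(
    StructField("rootId", StringType),
    StructField("rootTstamp", StringType),
    StructField("refRoot", StringType),
    StructField("refTree", ArrayType(StringType)),
    StructField("refParent", StringType)
  ))

  // `data` is the StructType generated from your own schema
  // (e.g. with spark-json-schema); it is wrapped in the fixed envelope.
  def wrap(data: StructType): StructType = StructType(Seq(
    StructField("schema", schemaKey),
    StructField("data", data),
    StructField("hierarchy", hierarchy)
  ))
}
```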


#7

Interesting stuff!