Need enriched data in JSON

Hi again folks,

I have set up the Snowplow stack on my Ubuntu machine and it is working fine up to enrichment. My query is that I need the enriched data in JSON format, but what I get after enrichment looks like raw data. Please suggest how to do this.

My current stack is shown below:

tracker → collector → kafka(good and bad topic) → enricher(using resolver config) → kafka (enriched_data and badenriched_data)

My tracker code for firing event is:

window.snowplow_name_here('trackUnstructEvent',
  {
    schema: 'iglu:com.company/selfdesc/jsonschema/1-0-0',
    data: {
      examplePropertyOne: 'Hello shakti',
      examplePropertyTwo: 'World',
      examplePropertyThree: 100.00
    }
  },
  [{
    schema: 'iglu:com.company/customevent/jsonschema/1-0-0',
    data: {
      userBirthday: '2016-01-01T00:00:00Z',
      travId: 'abc123',
      isAwesome: true,
      twitterHandle: '@travisdevitt',
      firstName: 'Travis',
      lastName: 'Devitt'
    }
  }]);

Below are my two schemas:

(1) schemas/com.company/selfdesc/jsonschema/1-0-0
{
  "$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
  "description": "Schema for an example event",
  "self": {
    "vendor": "com.company",
    "name": "selfdesc",
    "format": "jsonschema",
    "version": "1-0-0"
  },
  "type": "object",
  "properties": {
    "examplePropertyOne": {
      "description": "Just a text property",
      "type": ["string","null"]
    },
    "examplePropertyTwo": {
      "description": "Just a second text property",
      "type": ["string","null"]
    },
    "examplePropertyThree": {
      "description": "Just some third property that happens to be numeric",
      "type": ["number","null"]
    }
  },
  "additionalProperties": false
}

(2) schemas/com.company/customevent/jsonschema/1-0-0

{
  "$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
  "description": "Schema for an example custom context",
  "self": {
    "vendor": "com.company",
    "name": "customevent",
    "format": "jsonschema",
    "version": "1-0-0"
  },
  "type": "object",
  "properties": {
    "userBirthday": {
      "description": "Birthday input by the user",
      "type": ["string","null"],
      "format": "date-time"
    },
    "travId": {
      "description": "Unique ID of the user assigned by Travis",
      "type": ["string","null"],
      "maxLength": 1024
    },
    "isAwesome": {
      "description": "Is the user awesome?",
      "type": ["boolean","null"]
    },
    "twitterHandle": {
      "description": "Twitter handle of the user",
      "type": ["string","null"],
      "maxLength": 50
    },
    "firstName": {
      "description": "First name of the user",
      "type": ["string","null"],
      "maxLength": 200
    },
    "lastName": {
      "description": "Last name of the user",
      "type": ["string","null"],
      "maxLength": 200
    }
  },
  "required": ["travId"],
  "additionalProperties": false
}
For enrichment I am using a resolver file containing the following configuration:

{
  "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",
  "data": {
    "cacheSize": 500,
    "repositories": [
      {
        "name": "Iglu Central",
        "priority": 0,
        "vendorPrefixes": [ "com.snowplowanalytics" ],
        "connection": {
          "http": {
            "uri": "http://iglucentral.com"
          }
        }
      },
      {
        "name": "Company's Iglu Repo",
        "priority": 1,
        "vendorPrefixes": [ "com.company" ],
        "connection": {
          "http": {
            "uri": "http://company.com/"
          }
        }
      }
    ]
  }
}

After enrichment I am getting an enriched record in Kafka like this:
app1 mob 2017-05-23 07:49:27.978 2017-05-23 07:49:27.956 2017-05-23 07:49:27.923 unstruct b7fd3e48-468f-43a5-b0ec-39871b94efbe cf js-2.6.2 ssc-0.9.0-kafka kinesis-0.10.0-common-0.24.0 127.0.0.1 404893028 cb3de8ea-e92d-428b-91a0-b608c8798f7a 11 b47f749e-77fe-4c65-9b68-36016be7f356 http://localhost/snowplownew.html http localhost 80 /snowplownew.html {"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:com.company/customevent/jsonschema/1-0-0","data":{"userBirthday":"2016-01-01T00:00:00Z","travId":"abc123","isAwesome":true,"twitterHandle":"@travisdevitt","firstName":"Travis","lastName":"Devitt"}}]} {"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0","data":{"schema":"iglu:com.company/selfdesc/jsonschema/1-0-0","data":{"examplePropertyOne":"Hello shakti","examplePropertyTwo":"World","examplePropertyThree":100}}} Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:53.0) Gecko/20100101 Firefox/53.0 en-US 124 1366 621 Asia/Kolkata 1366 768 UTF-8 1366 621 2017-05-23 07:49:27.927 e276b0c3-bca2-418d-bbfc-24e5aea02b4a 2017-05-23 07:49:27.952 com.company selfdesc jsonschema 1-0-0

But I want this data in JSON format, as the default Snowplow output does not have any keys to describe what each value is for.
Please help me here.

I think your question is really asking why the event in the enriched stream isn't a JSON object?

Items that go into the enriched stream (or topic) are in TSV (tab-delimited) format. If you'd like to convert them to JSON, you can use something like the Scala Analytics SDK, which has a JSON Event Transformer, or alternatively something like a Lambda function to convert the TSV to JSON.
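
For illustration, here is a minimal sketch of that transformation on a single enriched TSV line, using the Scala Analytics SDK's EventTransformer. Treat it as an assumption to check against the SDK version you install: older releases return a scalaz Validation from transform rather than the Either matched on here.

import com.snowplowanalytics.snowplow.analytics.scalasdk.json.EventTransformer

// One tab-separated enriched event, e.g. a record value read from the
// enriched Kafka topic (placeholder value, not a real event).
val tsvLine: String = "app1\tmob\t..."

// transform() maps each TSV column onto its canonical field name and
// nests the unstruct_event / contexts payloads as real JSON objects.
EventTransformer.transform(tsvLine) match {
  case Right(json)  => println(json)                      // the event as a JSON string
  case Left(errors) => errors.foreach(System.err.println) // why the line failed to parse
}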

Hi @shaktigupta200,

The easiest way is to pick up the records, split them by tab (these are your values) and zip them with the list of keys (you can find them here, for example) - see the sketch at the end of this reply. I have used a similar approach in Python and Ruby.

Note the extra throughput load of JSON compared with TSV.
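
A rough Scala sketch of that split-and-zip approach, assuming the canonical enriched-event field names (only the first dozen are spelled out here; append the rest from the full list of keys linked above):

// Canonical enriched-event field names in TSV column order (partial).
val fields: List[String] = List(
  "app_id", "platform", "etl_tstamp", "collector_tstamp",
  "dvce_created_tstamp", "event", "event_id", "txn_id",
  "name_tracker", "v_tracker", "v_collector", "v_etl"
  // ... remaining canonical fields ...
)

// Split one enriched record on tabs (keeping empty columns so the
// positions stay aligned) and pair each value with its field name,
// dropping the empty ones.
def toKeyedValues(line: String): Map[String, String] =
  fields.zip(line.split("\t", -1)).collect {
    case (key, value) if value.nonEmpty => key -> value
  }.toMap

// Serialize the resulting Map with any JSON library (e.g. json4s or circe).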

Hi @shaktigupta200,

If you want to use an SDK, as @mike suggested, you can find them here.

We have an SDK for Scala (GitHub repo) and for Python (GitHub repo).

The solution @grzegorzewald offered can also work, of course.


Thanks @mike, @grzegorzewald and @leon for the quick support. I am working on implementing the Scala Analytics SDK to convert the enriched data consumed from Kafka to JSON.

Hi all,
@leon, @grzegorzewald, @mike
Is it possible to use the Scala Analytics SDK with Kafka as the source of enriched data, given that I am not using Kinesis or S3?

What I need to do is something like this:

Kafka ---- consume from Kafka ----> Scala Analytics SDK (convert to JSON) ---- publish back to Kafka ----> Kafka

Kindly suggest how to do this.

Neither the Scala nor the Python Analytics SDK forces you to read from a particular data source, so it's certainly possible to combine the Scala Analytics SDK with, for example, the Spark Streaming Kafka integration to achieve that transformation and output.
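
If you'd rather not bring in Spark, a plain consumer/producer loop also works. Below is only a sketch under some assumptions: the plain kafka-clients 2.x Java API driven from Scala 2.13, the EventTransformer behaviour described earlier in the thread, and hypothetical output topic names enriched_json and badenriched_json.

import java.time.Duration
import java.util.{Collections, Properties}
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import com.snowplowanalytics.snowplow.analytics.scalasdk.json.EventTransformer

object EnrichedTsvToJson extends App {
  val brokers = "localhost:9092" // assumption: local single-broker setup

  val consumerProps = new Properties()
  consumerProps.put("bootstrap.servers", brokers)
  consumerProps.put("group.id", "tsv-to-json")
  consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  val producerProps = new Properties()
  producerProps.put("bootstrap.servers", brokers)
  producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val consumer = new KafkaConsumer[String, String](consumerProps)
  val producer = new KafkaProducer[String, String](producerProps)

  // "enriched_data" is the enriched topic from the stack diagram above.
  consumer.subscribe(Collections.singletonList("enriched_data"))

  while (true) {
    for (record <- consumer.poll(Duration.ofMillis(500)).asScala) {
      EventTransformer.transform(record.value) match {
        case Right(json) =>
          producer.send(new ProducerRecord[String, String]("enriched_json", json))
        case Left(errors) =>
          producer.send(new ProducerRecord[String, String]("badenriched_json", errors.mkString("; ")))
      }
    }
  }
}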

There is also a Flink app created by @goliasz.