Google Cloud Platform data pipeline optimization


#1

Hello!

I am currently setting up Snowplow on GCP. I got a first simple setup without enrichments working: the BigQuery table schema was created automatically and data ingestion worked fine. Even with the Google Analytics and page contexts enabled, the schema was adjusted accordingly.

But after I enabled the enrichments below, the BigQuery connection stopped working and schemas were no longer created automatically. It also doesn’t work if I manually add the previous schema to the table. I always get the same error message in Dataflow:

java.lang.RuntimeException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
{
  "code" : 400,
  "errors" : [ {
    "domain" : "global",
    "message" : "The destination table has no schema.",
    "reason" : "invalid"
  } ],
  "message" : "The destination table has no schema.",
  "status" : "INVALID_ARGUMENT"
}
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:777)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:813)
org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:122)
org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:94)
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
{
  "code" : 400,
  "errors" : [ {
    "domain" : "global",
    "message" : "The destination table has no schema.",
    "reason" : "invalid"
  } ],
  "message" : "The destination table has no schema.",
  "status" : "INVALID_ARGUMENT"
}
com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:321)
com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1067)
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.lambda$insertAll$0(BigQueryServicesImpl.java:724)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

The enrichments I enabled are:

  • ua_parser_config
  • referer_parser
  • campaign_attribution
  • anon_ip

@anton, thank you very much for your great work, the rest works absolutely perfectly so far! :wink: Do you have an idea what could cause the automatic schema creation and/or altering to fail?

Best regards,
Ian

p.s.: I’m currently working on some improvements for the GCP setup and will share them once I’ve tested everything.


#2

Hi @ian

Sorry to hear about this issue - it is not something we’ve seen before (at least this particular exception). Can you check that the Mutator app is running, and what its output is? The output can be quite verbose, but I’m interested in whether it recognizes new schemas and tries to create the respective columns.

Also, can you share your configuration? Particularly the load section.


#3

Hi @anton,

It works now! I pulled an all-nighter and was super tired, so it was probably just some silly mistake on my end that caused this issue. However, I’d like to use this thread to optimize the setup, since there are still some minor issues.

I really like Simo’s automation approach and mostly followed the tutorial below, with some adjustments to reduce costs and add enrichments:

As you can see below, I added the enrichment configurations and downsized the Compute Engine instances (I also created a /srv/snowplow directory for everything Snowplow-related):

#!/bin/bash
enrich_version="0.1.0"
bq_version="0.1.0"
bucket_name="BUCKET-NAME"
project_id="PROJECT-ID"
region="europe-west1"

sudo apt-get update
sudo apt-get -y install default-jre
sudo apt-get -y install unzip

mkdir /srv/snowplow
cd /srv/snowplow

wget https://dl.bintray.com/snowplow/snowplow-generic/snowplow_beam_enrich_$enrich_version.zip
unzip snowplow_beam_enrich_$enrich_version.zip

wget https://dl.bintray.com/snowplow/snowplow-generic/snowplow_bigquery_loader_$bq_version.zip
unzip snowplow_bigquery_loader_$bq_version.zip

wget https://dl.bintray.com/snowplow/snowplow-generic/snowplow_bigquery_mutator_$bq_version.zip
unzip snowplow_bigquery_mutator_$bq_version.zip

gsutil cp gs://$bucket_name/iglu_resolver.json .
gsutil cp gs://$bucket_name/bigquery_config.json .
gsutil cp -r gs://$bucket_name/enrichments .

./beam-enrich-$enrich_version/bin/beam-enrich --runner=DataflowRunner --project=$project_id --streaming=true --region=$region --gcpTempLocation=gs://$bucket_name/temp-files --job-name=beam-enrich --raw=projects/$project_id/subscriptions/good-sub --enriched=projects/$project_id/topics/enriched-good --bad=projects/$project_id/topics/enriched-bad --resolver=iglu_resolver.json --workerMachineType=n1-standard-1 --enrichments=enrichments

./snowplow-bigquery-mutator-$bq_version/bin/snowplow-bigquery-mutator create --config $(cat bigquery_config.json | base64 -w 0) --resolver $(cat iglu_resolver.json | base64 -w 0)

./snowplow-bigquery-mutator-$bq_version/bin/snowplow-bigquery-mutator listen --config $(cat bigquery_config.json | base64 -w 0) --resolver $(cat iglu_resolver.json | base64 -w 0) &

./snowplow-bigquery-loader-$bq_version/bin/snowplow-bigquery-loader --config=$(cat bigquery_config.json | base64 -w 0) --resolver=$(cat iglu_resolver.json | base64 -w 0) --runner=DataflowRunner --project=$project_id --region=$region --gcpTempLocation=gs://$bucket_name/temp-files --workerMachineType=n1-standard-1

My bigquery_config.json looks like this:

{
  "schema": "iglu:com.snowplowanalytics.snowplow.storage/bigquery_config/jsonschema/1-0-0",
  "data": {
    "name": "Snowplow Atomic Events Data",
    "id": "RANDOM-UUID",
    "projectId": "PROJECT-ID",
    "datasetId": "snowplow_dataset",
    "tableId": "all_data",
    "input": "enriched-good-sub",
    "typesTopic": "bq-types",
    "typesSubscription": "bq-types-sub",
    "badRows": "bq-bad-rows",
    "failedInserts": "bq-failed-inserts",
    "load": {
      "mode": "STREAMING_INSERTS",
      "retry": false
    },
    "purpose": "ENRICHED_EVENTS"
  }
}
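The startup script above passes this config (and the resolver) to the Mutator and Loader as base64-encoded strings. As a quick sanity check of that encoding step, using a stand-in JSON file rather than the real config, something like this should round-trip cleanly:

```shell
# Stand-in for bigquery_config.json; the real file is shown above
printf '%s' '{"data":{"datasetId":"snowplow_dataset"}}' > /tmp/bq_config.json

# Encode without line wrapping, exactly as the startup script does
encoded=$(base64 -w 0 < /tmp/bq_config.json)

# The apps decode this back to the original JSON internally;
# decoding it ourselves verifies nothing was mangled on the way
decoded=$(printf '%s' "$encoded" | base64 -d)
echo "$decoded"
```

If the decoded output doesn’t match the original file, a stray newline or smart quote has usually crept into the config.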

My enrichment configurations contain the following:

{
  "schema": "iglu:com.snowplowanalytics.snowplow/anon_ip/jsonschema/1-0-0",
  "data": {
    "name": "anon_ip",
    "vendor": "com.snowplowanalytics.snowplow",
    "enabled": true,
    "parameters": {
      "anonOctets": 1
    }
  }
}

{
  "schema": "iglu:com.snowplowanalytics.snowplow/campaign_attribution/jsonschema/1-0-1",
  "data": {
    "name": "campaign_attribution",
    "vendor": "com.snowplowanalytics.snowplow",
    "enabled": true,
    "parameters": {
      "mapping": "static",
      "fields": {
        "mktMedium": ["utm_medium"],
        "mktSource": ["utm_source"],
        "mktTerm": ["utm_term"],
        "mktContent": ["utm_content"],
        "mktCampaign": ["utm_campaign", "cmp"]
      }
    }
  }
}

{
  "schema": "iglu:com.snowplowanalytics.snowplow/referer_parser/jsonschema/1-0-0",
  "data": {
    "name": "referer_parser",
    "vendor": "com.snowplowanalytics.snowplow",
    "enabled": true,
    "parameters": {
      "internalDomains": [
        "FQDN-1",
        "FQDN-2",
        "FQDN-3"
      ]
    }
  }
}

{
  "schema": "iglu:com.snowplowanalytics.snowplow/ua_parser_config/jsonschema/1-0-0",
  "data": {
    "vendor": "com.snowplowanalytics.snowplow",
    "name": "ua_parser_config",
    "enabled": true,
    "parameters": {}
  }
}

Open issues:

  1. The ua_parser_config enrichment fails when I try to configure the database as described here (it complains that 2 parameters were given while only 0 are accepted): https://github.com/snowplow/snowplow/wiki/ua-parser-enrichment#example

  2. When I enable the ip_lookups enrichment with the configuration below, I get an error in Dataflow complaining that http is an unknown file type (or something similar; I will recreate it and share the exact error message). I guess it is related to the external resource, but with Iglu this kind of scenario works:

{
  "schema": "iglu:com.snowplowanalytics.snowplow/ip_lookups/jsonschema/2-0-0",
  "data": {
    "name": "ip_lookups",
    "vendor": "com.snowplowanalytics.snowplow",
    "enabled": true,
    "parameters": {
      "geo": {
        "database": "GeoLite2-City.mmdb",
        "uri": "http://snowplow-hosted-assets.s3.amazonaws.com/third-party/maxmind"
      }
    }
  }
}

  3. Can you please explain how to use time partitioning? The built-in option partitions by ingestion time, but your recommendation is to use derived_tstamp. To be able to select derived_tstamp as the partitioning field, I added it to the schema manually (I didn’t add any other fields). Is that the correct approach, and should the mutator then create all the missing fields automatically? Unfortunately it didn’t work, and the built-in ingestion-time partitioning didn’t work for me either. However, this can also be attributed to my all-nighter. :wink:

  4. When updating the BigQuery configuration to point to a new project, dataset or table, would you recommend generating a new UUID?

Final thoughts:

  1. I support your plan to remove the version numbers from all self-describing schemas.

  2. Have you ever considered Google App Engine Flexible Environment instead of Compute Engine instances? I’m not a JVM expert, but it looks to me as though it could work; see running JAR files: https://cloud.google.com/appengine/docs/flexible/java/dev-java-only#appyaml If I find some time over the holidays, I will try to deploy at least the collector to GAE Flex, since I generally prefer managed services.

Thanks again for this great piece of software and your work!!! :slight_smile:

Best regards,
Ian


#4

Hi @anton,

Happy New Year! Did you see my questions? I am especially interested in the time partitioning that you mentioned in your blog post:

Thanks,
Ian


#5

Hey @ian,

Sorry for the delay (long holiday break).

  1. It seems there’s a typo in the ua_parser_config docs - only 1-0-1 can be configured with a database. I’ll fix that.
  2. I think you should be able to use your own DB with a gs:// URI.
  3. Unfortunately the Mutator cannot create partitioned tables yet - we’ll add this in the next version. For now you can create the partitioned table manually via the BigQuery Console: Create table -> Edit schema as text -> paste the example atomic schema. The partitioning dropdown will then let you choose any datetime column as the partitioning key.
  4. Yes. No dramas if you didn’t - it is not used anywhere right now, but it will be eventually.
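On point 3, the same manual step can presumably also be scripted with the bq CLI once it is authenticated. As a sketch (dataset, table, and schema-file names are placeholders, so this snippet only assembles the command rather than running it):

```shell
# Placeholder names; adjust to your own dataset, table, and schema file
dataset="snowplow_dataset"
table="all_data"

# Day-partition on derived_tstamp, as recommended in the thread.
# Assembled as a string here; run it yourself once gcloud/bq is set up.
cmd="bq mk --table --schema atomic_schema.json --time_partitioning_type=DAY --time_partitioning_field=derived_tstamp ${dataset}.${table}"
echo "$cmd"
```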

Hope that is still helpful.

  1. On explicit versions - we’re still collecting feedback from our users. What are your main pain points with explicit versioning? Does COALESCE work for you? Very likely we’ll move away from it, but there’s no ETA yet.
  2. No, we haven’t explored GAE Flex yet, but thanks for the tip!
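On point 2 above, a sketch of what the ip_lookups config might look like with a self-hosted database on GCS (the bucket path is a placeholder - I haven’t verified this exact config):

```json
{
  "schema": "iglu:com.snowplowanalytics.snowplow/ip_lookups/jsonschema/2-0-0",
  "data": {
    "name": "ip_lookups",
    "vendor": "com.snowplowanalytics.snowplow",
    "enabled": true,
    "parameters": {
      "geo": {
        "database": "GeoLite2-City.mmdb",
        "uri": "gs://YOUR-BUCKET/third-party/maxmind"
      }
    }
  }
}
```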