Google Cloud Platform data pipeline optimization

Hello!

I am currently setting up Snowplow on GCP. I got a first simple setup with no enrichments to work, the BigQuery table schema was created automatically and the data ingestion worked fine. Even with the Google Analytics and Page contexts enabled the schema got adjusted accordingly.

But after I enabled below enrichments, the BigQuery connection stopped working and no schemas were created anymore automatically. It also doesn’t work if I manually add the previous schema to the table. I always get the same error message in Dataflow:

java.lang.RuntimeException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
{
“code” : 400,
“errors” : [ {
“domain” : “global”,
“message” : “The destination table has no schema.”,
“reason” : “invalid”
} ],
“message” : “The destination table has no schema.”,
“status” : “INVALID_ARGUMENT”
}
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:777)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:813)
org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:122)
org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:94)
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
{
“code” : 400,
“errors” : [ {
“domain” : “global”,
“message” : “The destination table has no schema.”,
“reason” : “invalid”
} ],
“message” : “The destination table has no schema.”,
“status” : “INVALID_ARGUMENT”
}
com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:321)
com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1067)
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.lambda$insertAll$0(BigQueryServicesImpl.java:724)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

The enrichments I enabled are:

  • ua_parser_config
  • referer_parser
  • campaign_attribution
  • anon_ip

@anton, thank you very much for your great work, the rest works absolutely perfectly so far! :wink: Do you have an idea what could cause the automatic schema creation and/or altering to fail?

Best regards,
Ian

p.s.: I’m currently working on some improvements for the GCP setup and will share them once I tested everything.

Hi @ian

Sorry to hear about this issue - it is not something we’ve seen before (at least this particular exception). Can you make sure that Mutator app is working? And what its output? Output can be quite verbose, but I’m interested if it recognizes new schemas and tries to create respective columns.

Also, can you show your configuration? Particularly load section.

Hi @anton,

It works now! I pulled an all-nighter and was super tired, so probably it was just some stupid mistake that caused this issue. However, I’d like to use this thread now to optimize the setup since there are still minor issues.

I really like Simo’s automation approach and followed mostly below tutorial with some adjustments to reduce costs and add enrichments:

As you can see below, I added the enrichment configurations and downsized the Compute Engine instances (I also created a /srv/snowplow directory for everything Snowplow-related):

#! /bin/bash
enrich_version=“0.1.0”
bq_version=“0.1.0”
bucket_name=“BUCKET-NAME”
project_id=“PROJECT-ID”
region=“europe-west1”

sudo apt-get update
sudo apt-get -y install default-jre
sudo apt-get -y install unzip

mkdir /srv/snowplow
cd /srv/snowplow

wget https://dl.bintray.com/snowplow/snowplow-generic/snowplow_beam_enrich_$enrich_version.zip
unzip snowplow_beam_enrich_$enrich_version.zip

wget https://dl.bintray.com/snowplow/snowplow-generic/snowplow_bigquery_loader_$bq_version.zip
unzip snowplow_bigquery_loader_$bq_version.zip

wget https://dl.bintray.com/snowplow/snowplow-generic/snowplow_bigquery_mutator_$bq_version.zip
unzip snowplow_bigquery_mutator_$bq_version.zip

gsutil cp gs://$bucket_name/iglu_resolver.json .
gsutil cp gs://$bucket_name/bigquery_config.json .
gsutil cp -r gs://$bucket_name/enrichments .

./beam-enrich-$enrich_version/bin/beam-enrich --runner=DataFlowRunner --project=$project_id --streaming=true --region=$region --gcpTempLocation=gs://$bucket_name/temp-files --job-name=beam-enrich --raw=projects/$project_id/subscriptions/good-sub --enriched=projects/$project_id/topics/enriched-good --bad=projects/$project_id/topics/enriched-bad --resolver=iglu_resolver.json --workerMachineType=n1-standard-1 --enrichments=enrichments

./snowplow-bigquery-mutator-$bq_version/bin/snowplow-bigquery-mutator create --config $(cat bigquery_config.json | base64 -w 0) --resolver $(cat iglu_resolver.json | base64 -w 0)

./snowplow-bigquery-mutator-$bq_version/bin/snowplow-bigquery-mutator listen --config $(cat bigquery_config.json | base64 -w 0) --resolver $(cat iglu_resolver.json | base64 -w 0) &

./snowplow-bigquery-loader-$bq_version/bin/snowplow-bigquery-loader --config=$(cat bigquery_config.json | base64 -w 0) --resolver=$(cat iglu_resolver.json | base64 -w 0) --runner=DataFlowRunner --project=$project_id --region=$region --gcpTempLocation=gs://$bucket_name/temp-files --workerMachineType=n1-standard-1

My bigquery_config.json looks like this:

{
“schema”: “iglu:com.snowplowanalytics.snowplow.storage/bigquery_config/jsonschema/1-0-0”,
“data”: {
“name”: “Snowplow Atomic Events Data”,
“id”: “RANDOM-UUID”,
“projectId”: “PROJECT-ID”,
“datasetId”: “snowplow_dataset”,
“tableId”: “all_data”,
“input”: “enriched-good-sub”,
“typesTopic”: “bq-types”,
“typesSubscription”: “bq-types-sub”,
“badRows”: “bq-bad-rows”,
“failedInserts”: “bq-failed-inserts”,
“load”: {
“mode”: “STREAMING_INSERTS”,
“retry”: false
},
“purpose”: “ENRICHED_EVENTS”
}
}

My enrichment configurations contain below contents:

{
“schema”: “iglu:com.snowplowanalytics.snowplow/anon_ip/jsonschema/1-0-0”,
“data”: {
“name”: “anon_ip”,
“vendor”: “com.snowplowanalytics.snowplow”,
“enabled”: true,
“parameters”: {
“anonOctets”: 1
}
}
}

{
“schema”: “iglu:com.snowplowanalytics.snowplow/campaign_attribution/jsonschema/1-0-1”,
“data”: {
“name”: “campaign_attribution”,
“vendor”: “com.snowplowanalytics.snowplow”,
“enabled”: true,
“parameters”: {
“mapping”: “static”,
“fields”: {
“mktMedium”: [“utm_medium”],
“mktSource”: [“utm_source”],
“mktTerm”: [“utm_term”],
“mktContent”: [“utm_content”],
“mktCampaign”: [“utm_campaign”,“cmp”]
}
}
}
}

{
“schema”: “iglu:com.snowplowanalytics.snowplow/referer_parser/jsonschema/1-0-0”,
“data”: {
“name”: “referer_parser”,
“vendor”: “com.snowplowanalytics.snowplow”,
“enabled”: true,
“parameters”: {
“internalDomains”: [
“FQDN-1”,
“FQDN-2”,
“FQDN-3”
]
}
}
}

{
“schema”: “iglu:com.snowplowanalytics.snowplow/ua_parser_config/jsonschema/1-0-0”,
“data”: {
“vendor”: “com.snowplowanalytics.snowplow”,
“name”: “ua_parser_config”,
“enabled”: true,
“parameters”: {}
}
}

Open issues:

  1. The ua_parser_config enrichment fails when I try to configure the database as mentioned here (complains about 2 parameters given while accepting only 0): ua parser enrichment · snowplow/snowplow Wiki · GitHub

  2. When I enable the ip_lookups enrichment with below configuration I get an error in Dataflow complaining that http would be an unknown file type (or something like that, I will recreate it and share the exact error message). I guess it is related to the external resource but with Iglu this kind of technical scenario works:

{
“schema”: “iglu:com.snowplowanalytics.snowplow/ip_lookups/jsonschema/2-0-0”,
“data”: {
“name”: “ip_lookups”,
“vendor”: “com.snowplowanalytics.snowplow”,
“enabled”: true,
“parameters”: {
“geo”: {
“database”: “GeoLite2-City.mmdb”,
“uri”: “http://snowplow-hosted-assets.s3.amazonaws.com/third-party/maxmind
}
}
}
}

  1. Can you please explain how to use time-partitioning? The built-in option is to do it by ingestion time but your recommendation is to use derived_tstamp. In order to be able to use derived_tstamp and select the field as the partitioning field I added it to the schema manually (I didn’t add any other fields to the schema). Is that the correct approach? Should all the missing fields then be created automatically by the mutator? Unfortunately, it didn’t work, so I tried to use the built-in ingestion time partitioning but with no luck either. However, this can also be attributed to my all-nighter. :wink:

  2. When updating the BigQuery configuration to a new project, dataset or table, would you recommend generating a new UUID?

Final thoughts:

  1. I support your plan to remove the version numbers from all self-describing schemas.

  2. Have you ever considered trying Google App Engine Flexible Environment instead of Compute Engine instances? Unfortunately, I’m not a JVM expert but this looks to me as though it could work, see running JAR files: The Java 8 runtime  |  Google App Engine flexible environment docs  |  Google Cloud If I find some time over the holidays, I will try to deploy at least the collector to GAE Flex since I generally prefer managed services.

Thanks again for this great piece of software and your work!!! :slight_smile:

Best regards,
Ian

2 Likes

Hi @anton,

Happy New Year! Did you see my questions? I am especially interested in using time partitioning that you mentioned in your blog post:

Thanks,
Ian

Hey @ian,

Sorry for delay (long holidays break).

  1. It seems there’s a typo in ua_parser_config docs. Only 1-0-1 can be configured with database. I’ll fix that.
  2. I think you should be able to use your own DB with gs:// URI
  3. Unfortunately Mutator cannot create partitioned tables yet - we’ll add this in next version. But right now you create partitioned table manually via BigQuery Console: Create table -> Schema edit as text -> Paste example atomic schema. Partitioning dropdown menu will automatically propose you to choose any datetime columns as partitioning key.
  4. Yes. No dramas if you didn’t - it is not used anywhere right now, but it will be eventually.

Hope that is still helpful.

  1. On explicit version - we’re still collecting feedback from our users. What are your main pain points with explicit versioning? Does COALESCE work for you? Very likely we’ll switch away from it, but no ETA yet.
  2. No, we didn’t explore GAE FE yet, but thanks for the tip!

Mainly replying here because I want to follow this thread.

And regarding Simo’s guide: I just created some scripts to automate his steps.
Check github here https://github.com/zjuul/snowplow-gcloud-installer
And a blogpost with slightly more infor here: https://stuifbergen.com/2019/01/scripted-snowplow-analytics-on-the-google-cloud-platform/

2 Likes

Hi @anton

I just tried to use this schema this with the bigquery loader 0.4.0. Everything is now going to my failed_inserts. Any ideas how to do this with 0.4.0?

Sam

Hi @sdbeuf,

Sorry I lost the track of this conversation. What schema we’re talking about? If ua_parser_config then it shouldn’t take any effect on BigQuery Loader.

Just before we go further into this, I’d like to double-check couple of things:

  • There’s nothing in enriched bad topic
  • There’s nothing in loader bad topic
  • All data goes into failed inserts topic (which is different from previous two)
  • Last change you did was updating ua_parser_config

If my understanding is correct, could you share on example of a failed insert (you can strip away the payload as it might contain sensetive info).

Hi,

Well the issues I’m having are not documented and while Googling I arrived here. And no, the enricher is perfect, I can follow the data in the dataflow steps and the streamingInsert steps outputs all the events to failed inserts when using the method and the schema that you’ve provided here. When I used the schema made by the bigquery mutator, everything comes in my streaming buffer from bigquery.

Just realised you’re talking about events table schema, not about ua_parser_config. Could you try start the BQ Repeater? It will sink all bad rows to GCS bucket, along with detailed failed message, containing a problematic column name and original payload.

Pay attention that you should be using Loader, Mutator and Repeater with the same version. 0.1.0 and 0.2.0+ have incompatibilities in how they process arrays.

Okay, I’ll take a look at it next week! Thanks for the tip.

I’ve rebooted the pipeline, when I use the table that is created by the mutator (so not time partitioned) everything works fine. When I delete the table and make it like you described in an earlier post everything goes into the failed streaming inserts, so booting the repeater seems rather pointless since everything would need to go through it.