[SOLVED] - SQL Query Enrichment Doesn't Work

Hi guys!

Well, I'm exhausted from cross-referencing the existing documentation and the various articles on the Snowplow blog about the SQL enrichment process.

First my problem, then my stack and configuration.

My problem:
Our team decided to use Snowplow with a local Iglu repository. After the installation, we registered one single hell-world event (yes, hell-world, that is not a typo). All connections and endpoint resolution are OK; it's possible to validate the single event through the Kafka queue and see it stored in S3.

We validated the single event using snowplow-tracking-cli as the tracking component, but when we configure this event to be enriched against our PostgreSQL DB, it doesn't work. We followed all the steps on GitHub and on the Snowplow blog, but we haven't had any success (in the blog post we got a little confused about the Redshift DDL schema).

Our stack and config:

  1. using snowplow-tracking-cli as the event tracker

./snowplow-tracking-cli --collector snowplow.forno.ourcompany.com.br --appid sasquatch --method GET --protocol https --schema iglu:com.ourcompany/hell_world/jsonschema/1-0-0 --json "{\"hell\":\"04010-200\"}"

  2. snowplow_stream_enrich_kafka_0.21.0
    Config file:
enrich {
  streams {

    in {
      # Stream/topic where the raw events to be enriched are located
      raw = SnowplowGoodSink
      raw = ${?ENRICH_STREAMS_IN_RAW}
    }

    out {
      # Stream/topic where the events that were successfully enriched will end up
      enriched = SnowplowGoodEnrichedSink
      enriched = ${?ENRICH_STREAMS_OUT_ENRICHED}
      # Stream/topic where the event that failed enrichment will be stored
      bad = SnowplowBadEnrichedSink
      bad = ${?ENRICH_STREAMS_OUT_BAD}
      # Stream/topic where the pii transformation events will end up
      pii = SnowplowOutEnrichedSinkPii
      pii = ${?ENRICH_STREAMS_OUT_PII}

      # Note: Nsq does not make use of partition key.
      partitionKey = domain_userid
      partitionKey = ${?ENRICH_STREAMS_OUT_PARTITION_KEY}
    }

    sourceSink {
      enabled =  kafka
      enabled =  ${?ENRICH_STREAMS_SOURCE_SINK_ENABLED}

      initialTimestamp = "{{initialTimestamp}}"
      initialTimestamp = ${?ENRICH_STREAMS_SOURCE_SINK_INITIAL_TIMESTAMP}

      # Minimum and maximum backoff periods, in milliseconds
      backoffPolicy {
        minBackoff = 3000
        minBackoff = ${?ENRICH_STREAMS_SOURCE_SINK_BACKOFF_POLICY_MIN_BACKOFF}
        maxBackoff = 5000
        maxBackoff = ${?ENRICH_STREAMS_SOURCE_SINK_BACKOFF_POLICY_MAX_BACKOFF}
      }

      # Or Kafka (Comment out for other types)
      brokers = "kb0.forno.ourcompany.com.br:9092,kb1.forno.ourcompany.com.br:9092,kb2.forno.ourcompany.com.br:9092"
      # Number of retries to perform before giving up on sending a record
      retries = 2

    }


    buffer {
      byteLimit = 4500000 # number of bytes until flush buffer.  (@performance optimal)
      byteLimit = ${?ENRICH_STREAMS_BUFFER_BYTE_LIMIT}
      recordLimit = 10000 # Not supported by Kafka; will be ignored
      recordLimit = 500 #(@performance optimal)
      timeLimit = 250 # milliseconds to flush buffer (@performance optimal)
      timeLimit = ${?ENRICH_STREAMS_BUFFER_TIME_LIMIT}
    }

    # Used for a DynamoDB table to maintain stream state.
    # Used as the Kafka consumer group ID.
    # Used as the Google PubSub subscription name.
    appName = "snowplow-enrich"
    appName = ${?ENRICH_STREAMS_APP_NAME}
  }


  # Optional section for tracking endpoints
  monitoring {
    snowplow {
      collectorUri = "https://snowplow.forno.ourcompany.com.br"
      collectorUri = ${?ENRICH_MONITORING_COLLECTOR_URI}
      collectorPort = 8083
      collectorPort = ${?ENRICH_MONITORING_COLLECTOR_PORT}
      appId = "snowplow-enricher"
      appId = ${?ENRICH_MONITORING_APP_ID}
      method = GET
      method = ${?ENRICH_MONITORING_METHOD}
    }
  }
}

JSON resolver:

{
  "schema": "iglu:com.ourcompany/resolver-config/jsonschema/1-0-0",
  "data": {
    "cacheSize": 1000,
    "repositories": [
      {
        "name": "ourcompany Schema Registry",
        "priority": 0,
        "vendorPrefixes": [ "com.ourcompany" ],
        "connection": {
          "http": {
            "uri": "https://iglu5a.forno.ourcompany.com.br/api"
          }
        }
      },
      {
        "name": "Iglu Central",
        "priority": 1,
        "vendorPrefixes": [ "com.snowplowanalytics" ],
        "connection": {
          "http": {
            "uri": "http://iglucentral.com"
          }
        }
      }
    ]
  }
}
  3. SQL Query Enrichment JSON
{
  "schema": "iglu:com.ourcompany/sql_query_enrichment_config/jsonschema/1-0-0",
  "data": {
    "name": "sql_max_enrichment_config",
    "vendor": "com.ourcompany",
    "enabled": true,
    "parameters": {
      "inputs": [
        {
          "placeholder": 1,
          "json": {
            "field": "unstruct_event",
            "schemaCriterion": "iglu:com.ourcompany/hell_world/jsonschema/1-0-0",
            "jsonPath": "$.hell"
          }
        }
      ],
      "database": {
        "mysql": {
          "host": "ourcompany-bi-db-gza.xsdfcgoiueaj.us-east-1.rds.amazonaws.com",
          "port": 5432,
          "sslMode": false,
          "username": "ourcompany",
          "password": "xxxxxx",
          "database": "db"
        }
      },
      "query": {
        "sql": "select max(field) max_filed from dw.dim_property where hell_field = ?;"
      },
      "output": {
        "expectedRows": "AT_MOST_ONE",
        "json": {
          "schema": "iglu:com.ourcompany/max_/jsonschema/1-0-0",
          "describes": "EVERY_ROW",
          "propertyNames": "AS_IS"
        }
      },
      "cache": {
        "size": 3000,
        "ttl": 60
      }
    }
  }
}
  4. Startup parameters

ENTRYPOINT ["java", "-jar", "snowplow-stream-enrich-kafka-0.21.0.jar", "--config", "application.conf", "--resolver", "file:iglu_resolver.json","--enrichments", "file:/opt/snowplow-enricher/com.enrichment.ourcompany"]

  5. Iglu Local Repo Version: iglu_server_0.4.0

Config file:

repo-server {
  interface = "0.0.0.0"
  baseURL = "iglu5a.forno.ourcompany.com.br"
  port = 8089
}

# 'postgres' contains configuration options for the Postgres instance the server
# is using
postgres {
  host = "${IGLU_PG_HOSTNAME}"
  port = 5432
  dbname = "${IGLU_PG_DBNAME}"
  username = "${IGLU_PG_USERNAME}"
  password = "${IGLU_PG_PASSWORD}"
  driver = "org.postgresql.Driver"
}

akka {
  actor {
    debug {
      # enable DEBUG logging of all LoggingFSMs for events, transitions and timers
      fsm = on
    }
  }
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "DEBUG"
  stdout-loglevel = "DEBUG"
  log-config-on-start = on
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
}

akka.http {
  server {
    request-timeout = 10 seconds
    remote-address-header = on
    parsing.uri-parsing-mode = relaxed
  }
}

Does anyone have a hint about what's happening? We really don't know whether the enrichment process needs a Redshift instance (and if so, where the connection strings should be located), or whether configuring the JSON pointed at by the --enrichments startup parameter is enough to enrich JSON events.

Tks!!!

@fbattestin, you haven't specified what the outcome of the ETL is - just that it "doesn't work". However, I do see the reason why. There could be more to it, but let's address the obvious - the SQL enrichment configuration file.

The enrichment will not work because you have not configured it correctly - specifically the name and vendor fields in the data section.

As per the tutorial, that part should be:

"name": "sql_query_enrichment_config",
"vendor": "com.snowplowanalytics.snowplow.enrichments",

I can see that the wiki is missing that part altogether. We will update it there.
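In other words, the top of your config's data section should look like the following (a minimal sketch based on your own file - keep your existing parameters block as it is; the elided part is yours to fill in):

{
  "schema": "iglu:com.snowplowanalytics.snowplow.enrichments/sql_query_enrichment_config/jsonschema/1-0-0",
  "data": {
    "name": "sql_query_enrichment_config",
    "vendor": "com.snowplowanalytics.snowplow.enrichments",
    "enabled": true,
    "parameters": {
      ...
    }
  }
}

Note that the schema URI points at com.snowplowanalytics.snowplow.enrichments too - that schema describes the configuration file itself and is hosted on Iglu Central.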

@ihor tks for the quick help!

I made the suggested changes to sql_max_enrichment_config: I renamed it as in the blog post and replaced the header as is. But it still doesn't work.

Is our Iglu resolver config file set correctly?

This is our event stored in S3:

{"message":"sasquatch\tsrv\t2019-08-05 20:01:56.039\t2019-08-05 20:01:50.972\t2019-08-05 20:01:49.920\tunstruct\ta09cb52a-b1d8-4290-bc18-4983dbd7965f\t\t\tgolang-2.1.0\tssc-0.15.0-kafka\tstream-enrich-0.21.0-common-0.37.0\t\t189.112.185.80\t\t\t\t3148744b-9e6c-4fec-8fbd-1c05d0583a45\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t{\"data\":{\"data\":{\"hell\":\"04010-200\"},\"schema\":\"iglu:com.ourcompany/hell_world/jsonschema/1-0-0\"},\"schema\":\"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0\"}\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\tGo-http-client/1.1\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t2019-08-05 20:01:49.921\t\t\t\t\t2019-08-05 20:01:50.971\tcom.ourcompany\thell_world\tjsonschema\t1-0-0\t\t","@version":"1","@timestamp":"2019-08-05T20:01:56.149Z"}

Tks,

Hi guys!

I'm still trying to enrich a single event, but it still doesn't work. Can someone give a hint on a simple question:

What is the role of Redshift in the data enrichment process?

Am I missing any enrichment engine configuration details? Every document I've looked at includes a DDL and JSONPaths step using igluctl, but I didn't see a step to configure a Redshift instance.

This is my current architecture:

@fbattestin, SQL Enrichment and loading shredded data to Redshift are not the same processes and they work independently.

If you want your Snowplow data in Redshift you need to implement the Lambda architecture.

Do you have your hell_world context in your event in Elasticsearch?

@ihor tks!!!

Elasticsearch in this architecture is there to check and track the events in our pipeline; ES and Kibana are the components our data team uses to monitor pipeline health (Kafka queues and missing JSON schemas for a given event). They don't play a crucial role in the collector or in the enrichment process.

My concern is with events that require SQL or API enrichment; all the tutorials and articles refer to generating DDL and JSONPaths. I just want to make sure that I am on the right track configuring the enrichment process, for example:

1) events sent by the collector to the enricher are validated against their schemas in the Iglu repository
2) the enricher then checks whether an enrichment applies to any of the JSON schema fields
3) JSON schemas are registered for these enrichments (the fields returned from a query, in this case)
4) the result is appended to the derived_contexts field.
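For illustration, this is roughly what I would expect step 4 to produce inside the derived_contexts field (my own sketch, with made-up values):

{
  "schema": "iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1",
  "data": [
    {
      "schema": "iglu:com.ourcompany/max_rent/jsonschema/1-0-0",
      "data": {
        "max_aluguel": 3500.0,
        "bairro": "Vila Mariana",
        "cidade": "Sao Paulo"
      }
    }
  ]
}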

Am I right?
If nothing is missing, is there a way to track logs of this process?

Hi @fbattestin,

You can check bad rows - do you see anything related to the SQL enrichment there?

You can also check the Enrich logs but it will be a lot simpler to find the issue in bad rows.

There are a couple of scenarios in which the enrichment will simply be skipped without emitting a bad row, as per the docs:

Here are some clues on how this enrichment will handle some exceptional cases:

  • if the provided JSONPath is invalid, all events attempted to be enriched will be sent to enriched/bad
  • if more than one context (derived or custom) matches schemaCriterion, the first one will be picked, even if later ones have a higher SchemaVer
  • if an input's value is found in more than one source, the last one will be picked, so try to put the more precise input last (for example, to get a longitude/latitude pair, use data from the IP Lookup enrichment first and GPS-derived longitude/latitude second)
  • if any of the input keys wasn't found, the SQL query won't be performed and no new context will be derived, but the event will be processed as usual
  • if the DB query wasn't successful for any reason or timed out, the event will be sent to the enriched/bad bucket
  • if the DB connection was lost, the event will be sent to the enriched/bad bucket, but on the next event the enrichment will try to reestablish the connection
  • all non-primitive values (objects, arrays) and nulls will be interpreted as not-found values

So, if what you’re seeing is the events passing validation with no attached derived context, then it’s likely because of one of the following:

  • if any of the input keys wasn't found, the SQL query won't be performed and no new context will be derived, but the event will be processed as usual
  • all non-primitive values (objects, arrays) and nulls will be interpreted as not-found values

Best,

Hi @Colm,

Thank you.
Still doesn’t work.

Q: You can check bad rows - do you see anything related to the SQL enrichment there?
A: Yes - as you can see in the screenshot below, all the events are sent to the good enriched topic.

I have already defined the JSON schema for my Postgres record and stored it in my Iglu repository:

curl -X GET "https://iglu5a.forno.ourcompany.com.br/api/schemas/com.ourcompany/max_rent/jsonschema" -H "accept: application/json"

Response body:

{
  "$schema": "https://iglu5a.forno.ourcompany.com.br/api/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
  "description": "Schema for enrichment Max Rent in Zip Code",
  "self": {
    "vendor": "com.ourcompany",
    "name": "max_rent",
    "format": "jsonschema",
    "version": "1-0-0"
  },
  "type": "object",
  "properties": {
    "max_aluguel": {
      "type": "number"
    },
    "bairro": {
      "type": "string"
    },
    "cidade": {
      "type": "string"
    }
  },
  "minProperties": 3,
  "required": [
    "max_aluguel",
    "bairro",
    "cidade"
  ],
  "additionalProperties": false
}

My SQL enrichment file is stored in the enrichments path ("--enrichments", "file:/opt/snowplow-enricher/com.enrichment.ourcompany") given in the enricher startup parameters.

And this is my SQL enrichment JSON file:

{
  "schema": "iglu:com.ourcompany/sql_query_enrichment_config/jsonschema/1-0-0",
  "data": {
    "name": "sql_query_max_enrichment_config",
    "vendor": "com.ourcompany",
    "enabled": true,
    "parameters": {
      "inputs": [
        {
          "placeholder": 1,
          "pojo": {
            "field": "neighborhood_id"
          }
        },
        {
          "placeholder": 1,
          "json": {
            "field": "contexts",
            "schemaCriterion": "iglu:com.ourcompany/user_neighborhood/jsonschema/1-0-1",
            "jsonPath": "$.neighborhood_id"
          }
        }
      ],
      "database": {
        "mysql": {
          "host": "ourcompany-bi-ods-ciuoqxapzjot.us-east-1.rds.amazonaws.com",
          "port": 9999,
          "sslMode": false,
          "username": "ourcompany",
          "password": "lkhSdekjrs",
          "database": "osus"
        }
      },
      "query": {
        "sql": "select max(rent) max_rent,neighborhood,city from dw.dim_property where region_id= ? group by neighborhood,city order by max(rent) desc limit 1;"
      },
      "output": {
        "expectedRows": "AT_LEAST_ZERO",
        "json": {
          "schema": "iglu:com.ourcompany/max_rent/jsonschema/1-0-0",
          "describes": "EVERY_ROW",
          "propertyNames": "AS_IS"
        }
      },
      "cache": {
        "size": 3000,
        "ttl": 60
      }
    }
  }
}

And this is the JSON schema of the event to be enriched:

{
  "$schema": "https://iglu5a.forno.ourcompany.com.br/api/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
  "description": "User neighborhood - just for test proposes",
  "self": {
    "vendor": "com.ourcompany",
    "name": "user_neighborhood",
    "format": "jsonschema",
    "version": "1-0-0"
  },
  "type": "object",
  "properties": {
    "user_neighborhood": {
      "type": [
        "string",
        "null"
      ]
    },
    "neighborhood_id": {
      "type": [
        "integer",
        "null"
      ]
    }
  },
  "required": [
    "user_neighborhood",
    "neighborhood_id"
  ],
  "additionalProperties": true
}

I tried to correlate this post and the docs to do that, but …

Tks for all the help and patience.

@fbattestin, since all events are going to the good stream (no bad data):

  • what do the enriched events look like?
  • do you have any other enrichments enabled?
  • have you rebooted your Stream Enrich after adding the enrichment to the pipeline (to clear cache)?

@ihor

this is my event:

CFe23a	
mob	
2019-08-22 15:11:51.827	
2019-08-22 15:11:47.197	
2019-08-22 15:11:46.552	
page_view	
72e29e0d-29da-430a-b8b3-bc0ea1b32549		
cf	
js-2.5.1	
ssc-0.15.0-kafka	
stream-enrich-0.21.0-common-0.37.0		
189.112.185.80	
2885776124	
cc92c9ae426ced2f	
1	
cdf3fa04-2354-43db-92f3-2352812fba2f	
BR	
MG	
Presidente Olegario	
38750	
-18.1619	
-46.4063	
Minas Gerais					
file://file:///home/void/repo/event-test/async-small.html/overridden-url/	async snowplow.js 
event		
file	
file	
80	
///home/void/repo/event-test/async-small.html/overridden-url/																	
{
	"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0"
		,"data":
				[
					{
						"schema":"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-0"
							,"data":
								{
									"id":"183bbf96-0726-4ba2-a6b2-5dbc5a45473c"
								}
					},
					{
						"schema":"iglu:com.google.analytics/cookies/jsonschema/1-0-0"
							,"data":
								{
									
								}
					},
					{
						"schema":"iglu:com.ourcompany/page/jsonschema/1-0-0"
							,"data":
								{
								"pageType":"test"
								,"lastUpdated":"2019-09-01T03:00:00.000Z"
								}
					},
					{
						"schema":"iglu:com.ourcompany/user/jsonschema/1-0-0"
							,"data":
								{
								"userType":"looker"
								}
					},
					{
						"schema":"iglu:com.ourcompany/user_neighborhood/jsonschema/1-0-1"
							,"data":
								{
								"user_neighborhood":"Vila Mariana"
								,"neighborhood_id":51
								}
					},
					{
						"schema":"iglu:org.w3/PerformanceTiming/jsonschema/1-0-0"
							,"data":
								{
									"navigationStart":1566486706457
									,"unloadEventStart":1566486706475
									,"unloadEventEnd":1566486706475
									,"redirectStart":0
									,"redirectEnd":0
									,"fetchStart":1566486706459
									,"domainLookupStart":1566486706459
									,"domainLookupEnd":1566486706459
									,"connectStart":1566486706459
									,"connectEnd":1566486706459
									,"secureConnectionStart":0
									,"requestStart":1566486706459
									,"responseStart":1566486706459
									,"responseEnd":1566486706463
									,"domLoading":1566486706488
									,"domInteractive":1566486706508
									,"domContentLoadedEventStart":1566486706509
									,"domContentLoadedEventEnd":1566486706509
									,"domComplete":0
									,"loadEventStart":0
									,"loadEventEnd":0
									,"chromeFirstPaint":1566486706517
								}
					}]
}																									
Mozilla/5.0 (X11; Linux x86_64) 
AppleWebKit/537.36 (KHTML, like Gecko) 
Chrome/76.0.3809.100 Safari/537.36	
Chrome	
Chrome	76.0.3809.100	
Browser
WEBKIT	
en-US	
1	
0	
0	
0	
0	
0	
0	
0	
0	
1	
24	
1920	
1008	
Linux	
Linux	
Other	
America/Sao_Paulo	
Computer	
0	
1920	
1080	
UTF-8	
1920	
1008								
America/Sao_Paulo							
	{
		"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1"
			,"data":
			[
				{
					"schema":"iglu:nl.basjes/yauaa_context/jsonschema/1-0-0"
						,"data":
							{
								"deviceBrand":"Unknown"
								,"deviceName":"Linux Desktop"
								,"layoutEngineNameVersion":"Blink 76.0"
								,"operatingSystemNameVersion":"Linux Intel x86_64"
								,"layoutEngineNameVersionMajor":"Blink76"
								,"operatingSystemName":"Linux"
								,"agentVersionMajor":"76"
								,"layoutEngineVersionMajor":"76"
								,"deviceClass":"Desktop"
								,"agentNameVersionMajor":"Chrome 76"
								,"deviceCpuBits":"64"
								,"operatingSystemClass":"Desktop"
								,"layoutEngineName":"Blink"
								,"agentName":"Chrome"
								,"agentVersion":"76.0.3809.100"
								,"layoutEngineClass":"Browser"
								,"agentNameVersion":"Chrome 76.0.3809.100"
								,"operatingSystemVersion":"Intel x86_64"
								,"deviceCpu":"Intel x86_64"
								,"agentClass":"Browser"
								,"layoutEngineVersion":"76.0"
							}
				}
				,{
					"schema":"iglu:org.ietf/http_cookie/jsonschema/1-0-0"
						,"data":
							{
								"name":"sp"
								,"value":"cdf3fa04-2354-43db-92f3-2352812fba2f"
							}
				}
			]
		}	
95e6ad51-c5dd-470d-b09b-21a6efaf10cd	
2019-08-22 15:11:47.197	
com.snowplowanalytics.snowplow	
page_view	
jsonschema	
1-0-0	
61ea5e2571dc60c5c9df50bfb6515463
  • do you have any other enrichments enabled?
    I have only the default enrichments enabled, because until now I haven't understood the purpose (and necessity) of generating JSONPaths and DDL for a Redshift instance. I'm really missing the Redshift part.

  • have you rebooted your Stream Enrich after adding the enrichment to the pipeline (to clear cache)?
    Yes, I run the Snowplow stack on k8s. Each update to my Stream Enrich configuration results in a new container deployment.

Tks!!!

@fbattestin, you mentioned earlier that you fixed the enrichment configuration files as per my earlier comment. However, the latest SQL enrichment config you showed still has the wrong name and vendor. Was that a typo, or have you reverted those values? Again, the config file has to have the values below:

"name": "sql_query_enrichment_config",
"vendor": "com.snowplowanalytics.snowplow.enrichments",

Otherwise, it is equivalent to not having the SQL enrichment enabled.

If it was a typo and you do have the correct values, what do you get when running

SELECT max(rent) max_rent,
       neighborhood,
       city
FROM dw.dim_property
WHERE region_id = 51
GROUP BY neighborhood,
         city
ORDER BY max(rent) DESC
LIMIT 1;

against your osus database?
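For example, from a shell, something like this (assuming it really is the Postgres instance you mentioned at the start - if it is actually MySQL, as the mysql key in your config suggests, use the mysql client instead):

psql -h ourcompany-bi-ods-ciuoqxapzjot.us-east-1.rds.amazonaws.com -p 9999 -U ourcompany -d osus \
  -c "SELECT max(rent) max_rent, neighborhood, city FROM dw.dim_property WHERE region_id = 51 GROUP BY neighborhood, city ORDER BY max(rent) DESC LIMIT 1;"

If that returns no row, the enrichment has nothing to attach.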

I also noticed you have

{
  "placeholder": 1,
  "pojo": {
    "field": "neighborhood_id"
  }
}

That is the very same placeholder, neighborhood_id, mentioned twice - as POJO and as JSON. There’s no POJO property called neighborhood_id in the events’ TSV. You need to delete that placeholder.
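In other words, the inputs array should contain only the JSON input:

"inputs": [
  {
    "placeholder": 1,
    "json": {
      "field": "contexts",
      "schemaCriterion": "iglu:com.ourcompany/user_neighborhood/jsonschema/1-0-1",
      "jsonPath": "$.neighborhood_id"
    }
  }
]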


@ihor It worked!

Please don't be mad at me! I really enjoy your work and all the tech behind Snowplow!
Since I am using a local Iglu server, I assumed the vendor name was free-form as long as it followed the JSON schema registration logic.
I really didn't understand that for SQL-enriched events it was necessary to set the vendor name to
com.snowplowanalytics.snowplow.enrichments

I suppose the same logic applies to the other custom enrichments (like the API enrichment, for example)?

Finally, I am very grateful for your patience (and everybody else's).

PS: sorry for my bad English.


:partying_face: Great to hear you got it running!

I suppose the same logic applies to the other custom enrichments (like the API enrichment, for example)?

Yes, it’s the same for all. This refers to the schema for the configuration file itself - in layman’s terms it tells the pipeline how to interpret the configuration file. They’re all hosted under com.snowplowanalytics.snowplow.enrichments.
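For example, the API Request enrichment configuration starts the same way (just a sketch of the header - check the docs for the actual parameters block):

{
  "schema": "iglu:com.snowplowanalytics.snowplow.enrichments/api_request_enrichment_config/jsonschema/1-0-0",
  "data": {
    "name": "api_request_enrichment_config",
    "vendor": "com.snowplowanalytics.snowplow.enrichments",
    "enabled": true,
    "parameters": {
      ...
    }
  }
}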


@Colm, really, thanks!

Now it makes total sense to me. Tonight I'll sleep like a baby, no more dynamite on my pillow.