SQL Query Enrichment Doesn't Work

Hi guys!

Well, I'm exhausted from cross-referencing the existing documentation and the various articles on the Snowplow blog about the SQL Query Enrichment process.

First my problem, then my stack and configuration.

My problem:
Our team decided to use Snowplow with a local Iglu repository. After the installation, we registered a single hell-world event (yes, hell-world, that is not a typo). All connections and endpoint resolution are OK; we can validate the single event through the Kafka queue and see it stored in S3.

We validated the single event using snowplow-tracking-cli as the tracking component, but when we configure this event to be enriched against our PostgreSQL DB, it doesn't work. We followed all the steps on GitHub and in the Snowplow blog, but without any success (in the blog post we got a little bit confused about the Redshift DDL schema).

Our stack and config:

  1. using snowplow-tracking-cli as the event tracker

./snowplow-tracking-cli --collector snowplow.forno.quintoandar.com.br --appid sasquatch --method GET --protocol https --schema iglu:com.quintoandar/hell_world/jsonschema/1-0-0 --json "{\"hell\":\"04010-200\"}"

  2. snowplow_stream_enrich_kafka_0.21.0
    Config file:
enrich {
  streams {

    in {
      # Stream/topic where the raw events to be enriched are located
      raw = SnowplowGoodSink
      raw = ${?ENRICH_STREAMS_IN_RAW}
    }

    out {
      # Stream/topic where the events that were successfully enriched will end up
      enriched = SnowplowGoodEnrichedSink
      enriched = ${?ENRICH_STREAMS_OUT_ENRICHED}
      # Stream/topic where the event that failed enrichment will be stored
      bad = SnowplowBadEnrichedSink
      bad = ${?ENRICH_STREAMS_OUT_BAD}
      # Stream/topic where the pii transformation events will end up
      pii = SnowplowOutEnrichedSinkPii
      pii = ${?ENRICH_STREAMS_OUT_PII}

      # Note: Nsq does not make use of partition key.
      partitionKey = domain_userid
      partitionKey = ${?ENRICH_STREAMS_OUT_PARTITION_KEY}
    }

    sourceSink {
      enabled =  kafka
      enabled =  ${?ENRICH_STREAMS_SOURCE_SINK_ENABLED}

      initialTimestamp = "{{initialTimestamp}}"
      initialTimestamp = ${?ENRICH_STREAMS_SOURCE_SINK_INITIAL_TIMESTAMP}

      # Minimum and maximum backoff periods, in milliseconds
      backoffPolicy {
        minBackoff = 3000
        minBackoff = ${?ENRICH_STREAMS_SOURCE_SINK_BACKOFF_POLICY_MIN_BACKOFF}
        maxBackoff = 5000
        maxBackoff = ${?ENRICH_STREAMS_SOURCE_SINK_BACKOFF_POLICY_MAX_BACKOFF}
      }

      # Or Kafka (Comment out for other types)
      brokers = "kb0.forno.ourcompany.com.br:9092,kb1.forno.ourcompany.com.br:9092,kb2.forno.ourcompany.com.br:9092"
      # Number of retries to perform before giving up on sending a record
      retries = 2

    }


    buffer {
      byteLimit = 4500000 # number of bytes until flush buffer.  (@performance optimal)
      byteLimit = ${?ENRICH_STREAMS_BUFFER_BYTE_LIMIT}
      recordLimit = 10000 # Not supported by Kafka; will be ignored
      recordLimit = 500 #(@performance optimal)
      timeLimit = 250 # milliseconds to flush buffer (@performance optimal)
      timeLimit = ${?ENRICH_STREAMS_BUFFER_TIME_LIMIT}
    }

    # Used for a DynamoDB table to maintain stream state.
    # Used as the Kafka consumer group ID.
    # Used as the Google PubSub subscription name.
    appName = "snowplow-enrich"
    appName = ${?ENRICH_STREAMS_APP_NAME}
  }


  # Optional section for tracking endpoints
  monitoring {
    snowplow {
      collectorUri = "https://snowplow.forno.ourcompany.com.br"
      collectorUri = ${?ENRICH_MONITORING_COLLECTOR_URI}
      collectorPort = 8083
      collectorPort = ${?ENRICH_MONITORING_COLLECTOR_PORT}
      appId = "snowplow-enricher"
      appId = ${?ENRICH_MONITORING_APP_ID}
      method = GET
      method = ${?ENRICH_MONITORING_METHOD}
    }
  }
}

Iglu resolver JSON:

{
   "schema": "iglu:com.ourcompany/resolver-config/jsonschema/1-0-0",
   "data": {
     "cacheSize": 1000,
     "repositories": [
       {
         "name": "ourcompany Schema Registry",
         "priority": 0,
         "vendorPrefixes": [ "com.ourcompany" ],
         "connection": {
           "http": {
             "uri": "https://iglu5a.forno.ourcompany.com.br/api"
           }
         }
       },
      {
        "name": "Iglu Central",
        "priority": 1,
        "vendorPrefixes": [ "com.snowplowanalytics" ],
        "connection": {
          "http": {
            "uri": "http://iglucentral.com"
          }
        }
      }
     ]
   }
 }
  3. SQL Query Enrichment JSON
{
  "schema": "iglu:com.ourcompany/sql_query_enrichment_config/jsonschema/1-0-0",
  "data": {
    "name": "sql_max_enrichment_config",
    "vendor": "com.ourcompany",
    "enabled": true,
    "parameters": {
      "inputs": [
        {
          "placeholder": 1,
          "json": {
            "field": "unstruct_event",
            "schemaCriterion": "iglu:com.ourcompany/hell_world/jsonschema/1-0-0",
            "jsonPath": "$.hell"
          }
        }
      ],
      "database": {
        "mysql": {
          "host": "ourcompany-bi-db-gza.xsdfcgoiueaj.us-east-1.rds.amazonaws.com",
          "port": 5432,
          "sslMode": false,
          "username": "ourcompany",
          "password": "xxxxxx",
          "database": "db"
        }
      },
      "query": {
        "sql": "select max(field) max_filed from dw.dim_property where hell_field = ?;"
      },
      "output": {
        "expectedRows": "AT_MOST_ONE",
        "json": {
          "schema": "iglu:com.ourcompany/max_/jsonschema/1-0-0",
          "describes": "EVERY_ROW",
          "propertyNames": "AS_IS"
        }
      },
      "cache": {
        "size": 3000,
        "ttl": 60
      }
    }
  }
}
  4. Startup parameters

ENTRYPOINT ["java", "-jar", "snowplow-stream-enrich-kafka-0.21.0.jar", "--config", "application.conf", "--resolver", "file:iglu_resolver.json","--enrichments", "file:/opt/snowplow-enricher/com.enrichment.ourcompany"]

  5. Iglu local repo version: iglu_server_0.4.0

Config file:

repo-server {
  interface = "0.0.0.0"
  baseURL = "iglu5a.forno.ourcompany.com.br"
  port = 8089
}

# 'postgres' contains configuration options for the Postgres instance the server
# is using
postgres {
  host = "${IGLU_PG_HOSTNAME}"
  port = 5432
  dbname = "${IGLU_PG_DBNAME}"
  username = "${IGLU_PG_USERNAME}"
  password = "${IGLU_PG_PASSWORD}"
  driver = "org.postgresql.Driver"
}

akka {
    actor {
    debug {
      # enable DEBUG logging of all LoggingFSMs for events, transitions and timers
      fsm = on
    }
  }
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "DEBUG"
  stdout-loglevel = "DEBUG"
  log-config-on-start = on
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
}

akka.http {
  server {
    request-timeout = 10 seconds
    remote-address-header = on
    parsing.uri-parsing-mode = relaxed
  }
}

Does someone have a hint about what is happening? We really don't know whether the enrichment process needs a Redshift instance (and if so, where its connection strings are supposed to be located), or whether configuring the JSON passed via the --enrichments startup parameter is enough to enrich JSON events.

Tks!!!

@fbattestin, you haven't specified what the outcome of the ETL is - just that it "doesn't work". However, I do see the reason why. There could be more to it, but let's address the obvious - the SQL enrichment configuration file.

The enrichment will not work because you have not configured it correctly - specifically the name and vendor fields in your enrichment JSON.

As per the tutorial, that part should be:

"name": "sql_query_enrichment_config",
"vendor": "com.snowplowanalytics.snowplow.enrichments",

I can see that the wiki is missing that part altogether. We will update it there.
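
To make it concrete, here is roughly how the whole file would look after that fix, keeping the rest of your parameters exactly as you posted them. I have also pointed the schema key at the canonical schema on Iglu Central rather than your own vendor - worth double-checking that URI against the tutorial:

{
  "schema": "iglu:com.snowplowanalytics.snowplow.enrichments/sql_query_enrichment_config/jsonschema/1-0-0",
  "data": {
    "name": "sql_query_enrichment_config",
    "vendor": "com.snowplowanalytics.snowplow.enrichments",
    "enabled": true,
    "parameters": {
      "inputs": [
        {
          "placeholder": 1,
          "json": {
            "field": "unstruct_event",
            "schemaCriterion": "iglu:com.ourcompany/hell_world/jsonschema/1-0-0",
            "jsonPath": "$.hell"
          }
        }
      ],
      "database": {
        "mysql": {
          "host": "ourcompany-bi-db-gza.xsdfcgoiueaj.us-east-1.rds.amazonaws.com",
          "port": 5432,
          "sslMode": false,
          "username": "ourcompany",
          "password": "xxxxxx",
          "database": "db"
        }
      },
      "query": {
        "sql": "select max(field) max_filed from dw.dim_property where hell_field = ?;"
      },
      "output": {
        "expectedRows": "AT_MOST_ONE",
        "json": {
          "schema": "iglu:com.ourcompany/max_/jsonschema/1-0-0",
          "describes": "EVERY_ROW",
          "propertyNames": "AS_IS"
        }
      },
      "cache": {
        "size": 3000,
        "ttl": 60
      }
    }
  }
}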

@ihor tks for the quick help!

I made the suggested changes to sql_max_enrichment_config: I renamed it as in the blog post and replaced the header exactly as above, but it still doesn't work.

Is our Iglu resolver config file set correctly?

This is our event stored in S3:

{“message”:“sasquatch\tsrv\t2019-08-05 20:01:56.039\t2019-08-05 20:01:50.972\t2019-08-05 20:01:49.920\tunstruct\ta09cb52a-b1d8-4290-bc18-4983dbd7965f\t\t\tgolang-2.1.0\tssc-0.15.0-kafka\tstream-enrich-0.21.0-common-0.37.0\t\t189.112.185.80\t\t\t\t3148744b-9e6c-4fec-8fbd-1c05d0583a45\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t{“data”:{“data”:{“hell”:“04010-200”},“schema”:“iglu:com.ourcompany/hell_world/jsonschema/1-0-0”},“schema”:“iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0”}\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\tGo-http-client/1.1\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t2019-08-05 20:01:49.921\t\t\t\t\t2019-08-05 20:01:50.971\tcom.ourcompany\thell_world\tjsonschema\t1-0-0\t\t”,"@version":“1”,"@timestamp":“2019-08-05T20:01:56.149Z”}

Tks,

Hi guys!

I'm still trying to enrich a single event, but it still doesn't work. Can someone give me a hint about a simple question:

What is the role of Redshift in the data enrichment process?

Am I missing any enrichment engine configuration details? In every document I've looked at there is a DDL and a JSON Paths file generated with igluctl, but I didn't see a step to configure a Redshift instance.

This is my current architecture:

@fbattestin, SQL Enrichment and loading shredded data to Redshift are not the same processes and they work independently.

If you want your Snowplow data in Redshift you need to implement a Lambda architecture.

Do you have your hell_world context in your event in Elasticsearch?

@ihor tks!!!

Elasticsearch in this architecture is there to check and track the events in our pipeline; ES and Kibana are the components our data team uses to follow pipeline health (Kafka queues and missing JSON schemas for a given event). They don't play a crucial role in the collector or the enrichment process.

My concern is with events that require SQL or API enrichment; all the tutorials and articles refer to the DDL and JSON Paths generation. I just want to make sure that I am on the right track about how the enrichment process is configured, for example:

1) events sent by the collector to the enricher are validated against their schemas in the Iglu repository
2) the enricher then checks whether an enrichment applies to any of the JSON schema fields
3) JSON schemas are registered in Iglu for the enrichment outputs (the fields returned by a query, in this case)
4) the query result is appended to the derived_contexts field, something like the sketch below.
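
If I understand it right, a successful run would append something roughly like this to derived_contexts (my own sketch, built from the query alias and the output schema in the enrichment config above; the value is made up):

{
  "schema": "iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1",
  "data": [
    {
      "schema": "iglu:com.ourcompany/max_/jsonschema/1-0-0",
      "data": {
        "max_filed": "<value returned by the query>"
      }
    }
  ]
}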

Am I right?
If nothing is missing, is there a way to track logs of this process?

Hi @fbattestin,

You can check bad rows - do you see anything related to the SQL enrichment there?

You can also check the Enrich logs but it will be a lot simpler to find the issue in bad rows.
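
Since you are running the Kafka flavour, a quick way to inspect bad rows is to tail the bad topic from your Stream Enrich config with the standard Kafka console consumer (broker and topic names below are taken from the config you posted - adjust if they differ):

# Tail the enriched/bad topic to look for SQL-enrichment failures
./bin/kafka-console-consumer.sh \
  --bootstrap-server kb0.forno.ourcompany.com.br:9092 \
  --topic SnowplowBadEnrichedSink \
  --from-beginning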

There are a couple of scenarios in which the enrichment will simply be skipped without emitting a bad row, as per the docs:

Here are some clues on how this enrichment will handle some exceptional cases:

  • if the provided JSONPath is invalid - all events attempted to be enriched will be sent to enriched/bad
  • if more than one context (derived or custom) matches schemaCriterion - the first one will be picked, no matter if the following ones have a higher SchemaVer
  • if the input's value is found in more than one source - the last one will be picked, so try to put the more precise input last (for example, to get a longitude/latitude pair, use data from the IP Lookup enrichment first and GPS-derived longitude/latitude second)
  • if any of the input keys wasn't found - the SQL query won't be performed and no new context will be derived, but the event will be processed as usual
  • if the DB query wasn't successful for any reason or timed out - the event will be sent to the enriched/bad bucket
  • if the DB connection was lost - the event will be sent to the enriched/bad bucket, but on the next event the enrichment will try to reestablish the connection
  • all non-primitive values (objects, arrays) and nulls will be interpreted as not-found values

So, if what you’re seeing is the events passing validation with no attached derived context, then it’s likely because of one of the following:

  • if any of the input keys wasn't found - the SQL query won't be performed and no new context will be derived, but the event will be processed as usual
  • all non-primitive values (objects, arrays) and nulls will be interpreted as not-found values
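
For example, with your configuration the input is only "found" when the unstructured event's data has a top-level hell property that $.hell resolves to a primitive value - roughly:

{"hell": "04010-200"}               <- $.hell resolves, the query runs, a context can be derived
{"address": {"hell": "04010-200"}}  <- no top-level hell key, the query is skipped, the event passes through as usual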

Best,