Use Lambda for Snowplow

Many blog posts suggest using a Lambda function to transform TSV to JSON before saving to an S3 bucket. They link to the function's source code but give no clear steps for implementing a Lambda function with Snowplow. Can anyone help me implement a Lambda in Snowplow?


You would create the Lambda function using AWS Lambda. https://aws.amazon.com/lambda/

However, if your goal is to write to S3, you will probably have more luck using the Snowplow S3 Loader: https://docs.snowplowanalytics.com/docs/setup-snowplow-on-aws/setup-destinations/

For future reference, you might find this an interesting read on consuming the Kinesis stream using an AWS Lambda in realtime: Real-time reporting using AWS Lambda and DynamoDB: a tutorial to compute the number of players in a game level on the Snowplow event stream (1/2)
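
As a rough illustration only (this is not an official Snowplow function), a Firehose transformation Lambda that turns enriched TSV into JSON might look something like the sketch below. The field list is a placeholder (a real enriched event has over a hundred tab-separated columns, and the Snowplow Analytics SDKs already provide a TSV-to-JSON transformer); the important part is returning every record in the recordId / result / data shape Firehose expects. If the function raises or the response doesn't follow that shape, Firehose typically reports a Lambda.FunctionError.

import base64
import json

# Placeholder column names -- the real Snowplow enriched event has 100+
# tab-separated fields in a fixed order (the Snowplow Analytics SDKs
# provide a ready-made TSV-to-JSON transformer).
FIELD_NAMES = ["app_id", "platform", "etl_tstamp"]

def handler(event, context):
    output = []
    for record in event["records"]:
        try:
            line = base64.b64decode(record["data"]).decode("utf-8").rstrip("\n")
            payload = dict(zip(FIELD_NAMES, line.split("\t")))
            data = json.dumps(payload) + "\n"
            output.append({
                "recordId": record["recordId"],
                "result": "Ok",
                # Firehose expects the transformed payload re-encoded as base64
                "data": base64.b64encode(data.encode("utf-8")).decode("utf-8"),
            })
        except Exception:
            # Records marked ProcessingFailed are routed to the delivery stream's error output
            output.append({
                "recordId": record["recordId"],
                "result": "ProcessingFailed",
                "data": record["data"],
            })
    return {"records": output}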

@PaulBoocock At first I used the Snowplow S3 Loader, but it did nothing, so I was forced to use AWS Data Firehose. When I try to use an AWS Lambda in Data Firehose, it gives this error:

{"attemptsMade":4,"arrivalTimestamp":1600154159619,"errorCode":"Lambda.FunctionError","errorMessage":"The Lambda function was successfully invoked but it returned an error result.","attemptEndingTimestamp":1600154235846,"rawData":"****","lambdaArn":"arn:aws:lambda:ap-southeast-2:573188294151:function:snowplow-json-transformer-lambda:$LATEST"}
    {"attemptsMade":4,"arrivalTimestamp":1600154161523,"errorCode":"Lambda.FunctionError","errorMessage":"The Lambda function was successfully invoked but it returned an error result.","attemptEndingTimestamp":1600154235846,"rawData":"*****=","lambdaArn":"arn:aws:lambda:ap-southeast-2:573188294151:function:snowplow-json-transformer-lambda:$LATEST"}

If I don't use the Lambda, a log is created in the S3 enriched-good bucket for the page view event, but at the same time a log is created in the S3 enriched-bad bucket for the same page view event, saying:

{"schema":"iglu:com.snowplowanalytics.snowplow.badrows/collector_payload_format_violation/jsonschema/1-0-0","data":{"processor":{"artifact":"snowplow-stream-enrich","version":"1.0.0"},"failure":{"timestamp":"2020-09-15T07:16:02.488Z","loader":"thrift","message":{"error":"error deserializing raw event: Cannot read. Remote side has closed. Tried to read 2 bytes, but only got 1 bytes. (This is often indicative of an internal error on the server side. Please check your server logs.)"}},"payload":"****="}}

I have followed your documentation repeatedly, but I am confused by the setup for Stream Enrich. What I did not understand is: do we need to set up a database for Stream Enrich if we are not using a custom schema? Since I am only trying to test a page view event from the JavaScript Tracker, I have not set up any database, just granted access to create DynamoDB tables, because strangely it calls DynamoDB even though I have not used any DynamoDB prefix in the command. If this is not enough to find the issue, I can show you all my configurations.
Please help me to set up Snowplow. @PaulBoocock

@PaulBoocock here are my configurations:

collector.conf

collector {
 
  interface = "0.0.0.0"
  interface = ${?COLLECTOR_INTERFACE}
  port = 8181
  port = ${?COLLECTOR_PORT}

  # optional SSL/TLS configuration
  ssl {
    enable = false
    enable = ${?COLLECTOR_SSL}
    # whether to redirect HTTP to HTTPS
    redirect = false
    redirect = ${?COLLECTOR_SSL_REDIRECT}
    port = 9543
    port = ${?COLLECTOR_SSL_PORT}
  }

  paths {
    # "/com.acme/track" = "/com.snowplowanalytics.snowplow/tp2"
    # "/com.acme/redirect" = "/r/tp2"
    # "/com.acme/iglu" = "/com.snowplowanalytics.iglu/v1"
  }

  # Configure the P3P policy header.
  p3p {
    policyRef = "/w3c/p3p.xml"
    CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
  }

  crossDomain {
    enabled = false
    # Domains that are granted access, *.acme.com will match http://acme.com and http://sub.acme.com
    enabled = ${?COLLECTOR_CROSS_DOMAIN_ENABLED}
    domains = [ "*" ]
    domains = [ ${?COLLECTOR_CROSS_DOMAIN_DOMAIN} ]
    # Whether to only grant access to HTTPS or both HTTPS and HTTP sources
    secure = true
    secure = ${?COLLECTOR_CROSS_DOMAIN_SECURE}
  }

  cookie {
    enabled = true
    enabled = ${?COLLECTOR_COOKIE_ENABLED}
    expiration = "365 days"
    expiration = ${?COLLECTOR_COOKIE_EXPIRATION}
    # Network cookie name
    name = zanui_collector_cookie
    name = ${?COLLECTOR_COOKIE_NAME}
    domains = [
        "{{cookieDomain1}}" # e.g. "domain.com" -> any origin domain ending with this will be matched and domain.com will be returned
        "{{cookieDomain2}}" # e.g. "secure.anotherdomain.com" -> any origin domain ending with this will be matched and secure.anotherdomain.com will be returned
        # ... more domains
    ]
    domains += ${?COLLECTOR_COOKIE_DOMAIN_1}
    domains += ${?COLLECTOR_COOKIE_DOMAIN_2}
    fallbackDomain = ""
    fallbackDomain = ${?FALLBACK_DOMAIN}
    secure = false
    secure = ${?COLLECTOR_COOKIE_SECURE}
    httpOnly = false
    httpOnly = ${?COLLECTOR_COOKIE_HTTP_ONLY}
    sameSite = "{{cookieSameSite}}"
    sameSite = ${?COLLECTOR_COOKIE_SAME_SITE}
  }

  doNotTrackCookie {
    enabled = false
    enabled = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_ENABLED}
   # name = {{doNotTrackCookieName}}
    name = zanui-collector-do-not-track-cookie
   # value = {{doNotTrackCookieValue}}
    value = zanui-collector-do-not-track-cookie-value
  }

 
  cookieBounce {
    enabled = false
    enabled = ${?COLLECTOR_COOKIE_BOUNCE_ENABLED}
    name = "n3pc"
    name = ${?COLLECTOR_COOKIE_BOUNCE_NAME}
    fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
    fallbackNetworkUserId = ${?COLLECTOR_COOKIE_BOUNCE_FALLBACK_NETWORK_USER_ID}
    forwardedProtocolHeader = "X-Forwarded-Proto"
    forwardedProtocolHeader = ${?COLLECTOR_COOKIE_BOUNCE_FORWARDED_PROTOCOL_HEADER}
  }
  enableDefaultRedirect = true
  enableDefaultRedirect = ${?COLLECTOR_ALLOW_REDIRECTS}
  redirectMacro {
    enabled = false
    enabled = ${?COLLECTOR_REDIRECT_MACRO_ENABLED}
    # Optional custom placeholder token (defaults to the literal `${SP_NUID}`)
    placeholder = "[TOKEN]"
    placeholder = ${?COLLECTOR_REDIRECT_REDIRECT_MACRO_PLACEHOLDER}
  }
  rootResponse {
    enabled = false
    enabled = ${?COLLECTOR_ROOT_RESPONSE_ENABLED}
    statusCode = 302
    statusCode = ${?COLLECTOR_ROOT_RESPONSE_STATUS_CODE}
    # Optional, defaults to empty map
    headers = {
      Location = "https://127.0.0.1/",
      Location = ${?COLLECTOR_ROOT_RESPONSE_HEADERS_LOCATION},
      X-Custom = "something"
    }
    # Optional, defaults to empty string
    body = "302, redirecting"
    body = ${?COLLECTOR_ROOT_RESPONSE_BODY}
  }
  cors {
    accessControlMaxAge = 5 seconds
    accessControlMaxAge = ${?COLLECTOR_CORS_ACCESS_CONTROL_MAX_AGE}
  }

  # Configuration of prometheus http metrics
  prometheusMetrics {
    enabled = false
  }

  streams {
    # Events which have successfully been collected will be stored in the good stream/topic
    good = snowplow-collected-good-events-stream
    good = ${?COLLECTOR_STREAMS_GOOD}

    # Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
    bad = snowplow-collected-bad-events-stream
    bad = ${?COLLECTOR_STREAMS_BAD}

    useIpAddressAsPartitionKey = false
    useIpAddressAsPartitionKey = ${?COLLECTOR_STREAMS_USE_IP_ADDRESS_AS_PARTITION_KEY}

    sink {
      enabled = kinesis
      enabled = ${?COLLECTOR_STREAMS_SINK_ENABLED}

      # Region where the streams are located
      region = ap-southeast-2
      region = ${?COLLECTOR_STREAMS_SINK_REGION}

      threadPoolSize = 10
      threadPoolSize = ${?COLLECTOR_STREAMS_SINK_THREAD_POOL_SIZE}
      aws {
        accessKey = env
        accessKey = ${?COLLECTOR_STREAMS_SINK_AWS_ACCESS_KEY}
        secretKey = env
        secretKey = ${?COLLECTOR_STREAMS_SINK_AWS_SECRET_KEY}
      }

      # Minimum and maximum backoff periods, in milliseconds
      backoffPolicy {
        #minBackoff = {{minBackoffMillis}}
        minBackoff = 10
        #maxBackoff = {{maxBackoffMillis}}
        maxBackoff = 10
      }
    }

    buffer {
      byteLimit = 4500000
      byteLimit = ${?COLLECTOR_STREAMS_BUFFER_BYTE_LIMIT}
      recordLimit = 500 # Not supported by Kafka; will be ignored
      recordLimit = ${?COLLECTOR_STREAMS_BUFFER_RECORD_LIMIT}
      timeLimit = 5000
      timeLimit = ${?COLLECTOR_STREAMS_BUFFER_TIME_LIMIT}
    }
  }

}

akka {
  loglevel = DEBUG # 'OFF' for no logging, 'DEBUG' for all logging.
  loglevel = ${?AKKA_LOGLEVEL}
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loggers = [${?AKKA_LOGGERS}]

  http.server {
    remote-address-header = on
    remote-address-header = ${?AKKA_HTTP_SERVER_REMOTE_ADDRESS_HEADER}

    raw-request-uri-header = on
    raw-request-uri-header = ${?AKKA_HTTP_SERVER_RAW_REQUEST_URI_HEADER}

    # Define the maximum request length (the default is 2048)
    parsing {
      max-uri-length = 32768
      max-uri-length = ${?AKKA_HTTP_SERVER_PARSING_MAX_URI_LENGTH}
      uri-parsing-mode = relaxed
      uri-parsing-mode = ${?AKKA_HTTP_SERVER_PARSING_URI_PARSING_MODE}
    }
  }

}

Run Command:
java -Dcom.amazonaws.sdk.disableCbor -jar snowplow-stream-collector-kinesis-1.0.0.jar --config collector.conf

================================
enricher.conf

enrich {

  streams {

    in {
      # Stream/topic where the raw events to be enriched are located
      raw = snowplow-collected-good-events-stream
      raw = ${?ENRICH_STREAMS_IN_RAW}
    }

    out {
      # Stream/topic where the events that were successfully enriched will end up
      enriched = snowplow-collected-good-events-stream
     
      # Stream/topic where the event that failed enrichment will be stored
      bad = snowplow-collected-bad-events-stream
      bad = ${?ENRICH_STREAMS_OUT_BAD}
      # Stream/topic where the pii tranformation events will end up
     # pii = {{outPii}}
     # pii = ${?ENRICH_STREAMS_OUT_PII}

      partitionKey = event_id
      partitionKey = ${?ENRICH_STREAMS_OUT_PARTITION_KEY}
    }

    sourceSink {

      enabled =  kinesis
      enabled =  ${?ENRICH_STREAMS_SOURCE_SINK_ENABLED}

      region = ap-southeast-2

       aws {
         accessKey = env
         accessKey = ${?ENRICH_STREAMS_SOURCE_SINK_AWS_ACCESS_KEY}
         secretKey = env
         secretKey = ${?ENRICH_STREAMS_SOURCE_SINK_AWS_SECRET_KEY}
       }

       maxRecords = 10000

      initialPosition = TRIM_HORIZON
      initialTimestamp = "2020-09-10T10:00:00Z"

      backoffPolicy {
        minBackoff = 1000
        minBackoff = ${?ENRICH_STREAMS_SOURCE_SINK_BACKOFF_POLICY_MIN_BACKOFF}
        maxBackoff = 5000
        maxBackoff = ${?ENRICH_STREAMS_SOURCE_SINK_BACKOFF_POLICY_MAX_BACKOFF}
      }


    }


    buffer {
      byteLimit = 1000000000
      byteLimit = ${?ENRICH_STREAMS_BUFFER_BYTE_LIMIT}
      recordLimit = 10 # Not supported by Kafka; will be ignored
      recordLimit = ${?ENRICH_STREAMS_BUFFER_RECORD_LIMIT}
      timeLimit = 5000
      timeLimit = ${?ENRICH_STREAMS_BUFFER_TIME_LIMIT}
    }

    appName = "zanui-enricher-app"
    appName = ${?ENRICH_STREAMS_APP_NAME}
  }

}

Run Command:
java -jar snowplow-stream-enrich-kinesis-1.0.0.jar --config enricher.conf --resolver file:resolver.json

====================
S3 Loader Config

source = "kinesis"

sink = "kinesis"

aws {
  accessKey = "env"
  secretKey = "env"
}

# Config for NSQ
 nsq {
  channelName = "nsqSourceChannelName"
    
  # Host name for NSQ tools
  host = "127.0.0.1"

  # HTTP port for nsqd
   port = 4150

  # HTTP port for nsqlookupd
   lookupPort = 4161
}

kinesis {

  initialPosition = "TRIM_HORIZON"
  initialTimestamp = "2017-05-17T10:00:00Z"
  maxRecords = 10000
  region = "ap-southeast-2"
  appName = "zanui-enricher-app"
}

streams {
  inStreamName = "snowplow-collected-good-events-stream"
  outStreamName = "snowplow-collected-bad-events-stream"

  buffer {
    byteLimit = 1000000000 # Not supported by NSQ; will be ignored
    recordLimit = 10
    timeLimit = 5000 # Not supported by NSQ; will be ignored
  }
}

s3 {
  region = "ap-southeast-2"
  bucket = "snowplow-enriched-good-events"
  partitionedBucket = "snowplow-enriched-good-events/partitioned"
  dateFormat = "{YYYY}/{MM}/{dd}/{HH}"
  outputDirectory = "zanui-enriched/good"
  filenamePrefix = "zanui-output"
  format = "gzip"
  # Maximum Timeout that the application is allowed to fail for (in milliseconds)
  maxTimeout = 300000 # 5 minutes
}

Run Command:
java -jar snowplow-s3-loader-0.6.0.jar --config my.conf

But this Snowplow S3 Loader is not doing anything, so I used Data Firehose to transfer the stream to the S3 bucket.

@PaulBoocock now can you help me?

You might find this post useful, as others have faced similar questions around Enrich, S3 Loader and DynamoDB: Does Stream Enrich need DynamoDB connection?

Also, searching for DynamoDB on this forum might yield some help.

Unfortunately I'm not really able to help you much further, but this does seem to be an issue with how you have configured things on AWS, perhaps permissions.

@PaulBoocock I have gone through all those posts and have given permission to create, write and read DynamoDB. I think that is not the issue. :frowning: :disappointed_relieved: :cry:

OK, can you confirm: do we need to do any database setup if we are not using a custom schema? If not, where is that data saved?

Yes, you do need DynamoDB, as Stream Enrich uses the KCL (Kinesis Client Library) under the hood, which in turn uses DynamoDB to manage its state.
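
If you want to double-check that, the KCL lease table is named after the appName in your enrich config, so a quick boto3 sketch along these lines (assuming the zanui-enricher-app name and ap-southeast-2 region from your enricher.conf) should show it as ACTIVE:

import boto3

# The KCL lease table is named after the appName in enricher.conf
# ("zanui-enricher-app" in the config posted above).
client = boto3.client("dynamodb", region_name="ap-southeast-2")
response = client.describe_table(TableName="zanui-enricher-app")
print(response["Table"]["TableStatus"])  # should print "ACTIVE" once the KCL has initialised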

@PaulBoocock, that table was already created in DynamoDB. It is working fine.

So whilst you might not be using custom schemas in your pipeline, the pipeline will still leverage schemas that are available on iglucentral.com and cache them. Generally speaking, a custom schema works the same way as the core schemas that are publicly available for standard Snowplow events, so you'll need to make sure your pipeline is set up to work with these.

For example, when you track a page view using the JavaScript tracker, it will use the web_page schema.
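
Concretely, each page view carries a web_page context entity that references a schema hosted on Iglu Central, roughly like this (the id is just an example UUID generated by the tracker):

{
  "schema": "iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-0",
  "data": {
    "id": "a86c42e5-b831-45c8-b706-e214c26b4b3d"
  }
}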

So do we need to register with iglucentral.com to set up the pipeline? I have used the default resolver.json.

You don't need to register; all the schemas are public. But you should make sure iglucentral.com is listed in your resolver.json.

{
    "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",
    "data": {
      "cacheSize": 500,
      "repositories": [
        {
          "name": "Iglu Central",
          "priority": 0,
          "vendorPrefixes": [ "com.snowplowanalytics" ],
          "connection": {
            "http": {
              "uri": "http://iglucentral.com"
            }
          }
        },
        {
          "name": "Iglu Central - GCP Mirror",
          "priority": 1,
          "vendorPrefixes": [ "com.snowplowanalytics" ],
          "connection": {
            "http": {
              "uri": "http://mirror01.iglucentral.com"
            }
          }
        }
      ]
    }
}

That all looks fine. I can’t be sure what the Firehose error is regarding as I’ve never used that with Snowplow data. However, when I consider that error along with the error in the S3 loader, it does look like something is wrong with the payloads that they are trying to read. I’m not quite sure where the error is unfortunately as I’ve never seen that before.
I don't see anything glaringly wrong with the configurations, so my guess is this is something to do with how Kinesis is configured (for raw events), or with the server configuration you are running the JARs on (either the collector or enrich), but if I'm honest, I'm really not sure at this point. You could attempt to set it up with the Docker images (available on Docker Hub) instead, but I appreciate that is quite a change from the approach you have taken so far.
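
For reference, running the collector from the Docker image would look roughly like the sketch below. Please check the exact image name and tag on Docker Hub; the mount path is just an example, and since your config uses accessKey = env, the AWS credentials would need to be passed in as environment variables:

# Rough sketch only -- verify image name/tag on Docker Hub before using
docker run \
  -p 8181:8181 \
  -e AWS_ACCESS_KEY_ID -e AWS_SECRET_ACCESS_KEY \
  -v $PWD:/snowplow/config \
  snowplow/scala-stream-collector-kinesis:1.0.0 \
  --config /snowplow/config/collector.conf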

@PaulBoocock I had set up the Snowplow S3 Loader, but nothing happens when I fire a JavaScript Tracker event; the console where the S3 Loader is running shows nothing. So I had to use AWS Firehose.
My Snowplow structure:

And the configuration of the Snowplow S3 Loader:

source = "kinesis"
sink = "kinesis"

aws {
  accessKey = "env"
  secretKey = "env"
}


 nsq {
  channelName = "nsqSourceChannelName"
  host = "127.0.0.1"
  port = 4150
  lookupPort = 4161
}

kinesis {
  initialPosition = "TRIM_HORIZON"
  initialTimestamp = "2017-05-17T10:00:00Z"
  maxRecords = 10000
  region = "ap-southeast-2"
  appName = "application-enricher-app"
}

streams {
  
  inStreamName = "snowplow-collected-good-events-stream"
  outStreamName = "snowplow-collected-bad-events-stream"
  buffer {
    byteLimit = 1000000000 # Not supported by NSQ; will be ignored
    recordLimit = 10
    timeLimit = 5000 # Not supported by NSQ; will be ignored
  }
}

s3 {
  region = "ap-southeast-2"
  bucket = "snowplow-enriched-good-events"
  partitionedBucket = "snowplow-enriched-good-events/partitioned"
  dateFormat = "{YYYY}/{MM}/{dd}/{HH}"
  outputDirectory = "application-enriched/good"
  filenamePrefix = "application-output"
  format = "gzip"
  maxTimeout = 300000 # 5 minutes
}