Enriched events are stored in TSV files on a single line


#1

Hi everyone!
I’m trying to set up Snowplow on AWS infrastructure and have run into an issue: after enrichment, the events stored in S3 as TSV files all end up on one line. I have configured partitioning by event_id, so every file contains events with the same event_id, but all the events are stored on one line (there is no \n between rows). Why does this happen? Shouldn’t events be separated by a newline character?

The next step is to import this data into Redshift, but I intend to do that with Kinesis Firehose rather than the Snowplow RDB Loader, and this issue prevents it.


#2

What’s your current setup and infrastructure with Snowplow?

The enrichment process currently produces one TSV line per event, so you shouldn’t be seeing more than a single event on any line in S3.
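To make "one TSV line per event" concrete, here is a minimal sketch that pulls the event_id out of each line of an enriched TSV blob. It assumes event_id is the seventh tab-separated field (index 6) of the enriched-event layout; the helper name and the truncated sample events are hypothetical. It also shows what happens when two events are written back to back without a newline: they parse as one oversized line.

```python
# Hypothetical helper: list the event_ids found in a blob of enriched TSV text.
# Assumption: one event per line, event_id at index 6 of the tab-separated fields.
EVENT_ID_INDEX = 6


def event_ids(tsv_blob: str) -> list:
    """Return the event_id of every non-empty line in tsv_blob."""
    ids = []
    for line in tsv_blob.splitlines():
        if not line:
            continue  # skip blank lines, e.g. after a trailing newline
        fields = line.split("\t")
        ids.append(fields[EVENT_ID_INDEX])
    return ids


if __name__ == "__main__":
    # Two fake, heavily truncated events (real enriched events have many more fields).
    e1 = "\t".join(["app", "web", "t1", "t2", "t3", "page_view", "id-1", ""])
    e2 = "\t".join(["app", "web", "t1", "t2", "t3", "page_view", "id-2", ""])
    print(event_ids(e1 + "\n" + e2 + "\n"))
    # Without a separator, e2 is glued onto the end of e1 and only one "line" remains.
    print(len(event_ids(e1 + e2)))
```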


#3

There are 2 EC2 instances that are used as the collector and the enricher, communicating via Kinesis. We are using the JS tracker to send events.
The collector and enricher configs look like this:

collector {
    # The collector runs as a web service specified on the following interface and port.
    interface = "0.0.0.0"
    port = "8080"

    # Configure the P3P policy header.
    p3p {
      policyRef = "/w3c/p3p.xml"
      CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
    }

    # Cross domain policy configuration.
    # If "enabled" is set to "false", the collector will respond with a 404 to the /crossdomain.xml
    # route.
    crossDomain {
      enabled = true
      # Domain that is granted access, *.acme.com will match http://acme.com and http://sub.acme.com
      domain = "*.ourdomain.com"
      # Whether to only grant access to HTTPS or both HTTPS and HTTP sources
      secure = false
    }

    # The collector returns a cookie to clients for user identification
    # with the following domain and expiration.
    cookie {
      enabled = true
      expiration = "365 days" # e.g. "365 days"
      # Network cookie name
      name = "_some_name"
      # The domain is optional and will make the cookie accessible to other
      # applications on the domain. Comment out this line to tie cookies to
      # the collector's full domain
      # domain = "*.ourdomain.com"
    }

    # When enabled and the cookie specified above is missing, performs a redirect to itself to check
    # if third-party cookies are blocked using the specified name. If they are indeed blocked,
    # fallbackNetworkId is used instead of generating a new random one.
    cookieBounce {
      enabled = false
      # The name of the request parameter which will be used on redirects checking that third-party
      # cookies work.
      name = "n3pc"
      # Network user id to fallback to when third-party cookies are blocked.
      fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
      # Optionally, specify the name of the header containing the originating protocol for use in the
      # bounce redirect location. Use this if behind a load balancer that performs SSL termination.
      # The value of this header must be http or https. Example, if behind an AWS Classic ELB.
      forwardedProtocolHeader = "X-Forwarded-Proto"
    }

    # When enabled, the redirect url passed via the `u` query parameter is scanned for a placeholder
    # token. All instances of that token are replaced with the network ID. If the placeholder isn't
    # specified, the default value is `${SP_NUID}`.
    redirectMacro {
      enabled = false
      # Optional custom placeholder token (defaults to the literal `${SP_NUID}`)
      placeholder = "[TOKEN]"
    }

    streams {
      # Events which have successfully been collected will be stored in the good stream/topic
      good = "snowplow-events-stream"

      # Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
      bad = "snowplow-events-bad-stream"

      # Whether to use the incoming event's ip as the partition key for the good stream/topic
      # Note: Nsq does not make use of partition key.
      useIpAddressAsPartitionKey = false

      # Enable the chosen sink by uncommenting the appropriate configuration
      sink {
        # Choose between kinesis, googlepubsub, kafka, nsq, or stdout.
        # To use stdout, comment or remove everything in the "collector.streams.sink" section except
        # "enabled" which should be set to "stdout".
        enabled = kinesis

        # Region where the streams are located
        region = "us-east-1"

        # Thread pool size for Kinesis API requests
        threadPoolSize = 10

        # The following are used to authenticate for the Amazon Kinesis sink.
        # If both are set to 'default', the default provider chain is used
        # (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
        # If both are set to 'iam', use AWS IAM Roles to provision credentials.
        # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
        aws {
          accessKey = env
          secretKey = env
        }

        # Minimum and maximum backoff periods, in milliseconds
        backoffPolicy {
          minBackoff = 3000
          maxBackoff = 600000
        }

      }

      # Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
      # Note: Buffering is not supported by NSQ.
      # The buffer is emptied whenever:
      # - the number of stored records reaches record-limit or
      # - the combined size of the stored records reaches byte-limit or
      # - the time in milliseconds since the buffer was last emptied reaches time-limit
      buffer {
        byteLimit = 4500000
        recordLimit = 1000 # Not supported by Kafka; will be ignored
        timeLimit = 60000
      }
    }
  }

  # Akka has a variety of possible configuration options defined at
  # http://doc.akka.io/docs/akka/current/scala/general/configuration.html
  akka {
    loglevel = DEBUG # 'OFF' for no logging, 'DEBUG' for all logging.
    loggers = ["akka.event.slf4j.Slf4jLogger"]

    # akka-http is the server the Stream collector uses and has configurable options defined at
    # http://doc.akka.io/docs/akka-http/current/scala/http/configuration.html
    http.server {
      # To obtain the hostname in the collector, the 'remote-address' header
      # should be set. By default, this is disabled, and enabling it
      # adds the 'Remote-Address' header to every request automatically.
      remote-address-header = on

      raw-request-uri-header = on

      # Define the maximum request length (the default is 2048)
      parsing {
        max-uri-length = 32768
        uri-parsing-mode = relaxed
      }
    }
  }

Enricher:

enrich {

      streams {

        in {
          # Stream/topic where the raw events to be enriched are located
          raw = "snowplow-events-stream"
        }

        out {
          # Stream/topic where the events that were successfully enriched will end up
          enriched = "snowplow-events-stream-enriched-good"
          # Stream/topic where the event that failed enrichment will be stored
          bad = "snowplow-events-stream-enriched-bad"

          # How the output stream/topic will be partitioned.
          # Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
          # user_ipaddress, domain_sessionid, user_fingerprint.
          # Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
          # possible partition keys correspond to.
          # Otherwise, the partition key will be a random UUID.
          # Note: Nsq does not make use of partition key.
          partitionKey = "event_id"
        }

        # Configuration shown is for Kafka, to use another uncomment the appropriate configuration
        # and comment out the other
        # To use stdin, comment or remove everything in the "enrich.streams.sourceSink" section except
        # "enabled" which should be set to "stdin".
        sourceSink {
          # Sources / sinks currently supported are:
          # 'kinesis' for reading Thrift-serialized records and writing enriched and bad events to a
          # Kinesis stream
          # 'googlepubsub' for reading / writing to a Google PubSub topic
          # 'kafka' for reading / writing to a Kafka topic
          # 'nsq' for reading / writing to a Nsq topic
          # 'stdin' for reading from stdin and writing to stdout and stderr
          type =  kinesis
          enabled =  kinesis

          # Region where the streams are located (AWS region, pertinent to kinesis sink/source type)
          region = "us-east-1"

          # AWS credentials (pertinent to kinesis sink/source type)
          # If both are set to 'default', use the default AWS credentials provider chain.
          # If both are set to 'iam', use AWS IAM Roles to provision credentials.
          # If both are set to 'env', use env variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
          aws {
            accessKey = iam
            secretKey = iam
          }

          # Maximum number of records to get from Kinesis per call to GetRecords
          maxRecords = 10000

          # LATEST: most recent data.
          # TRIM_HORIZON: oldest available data.
          # "AT_TIMESTAMP": Start from the record at or after the specified timestamp
          # Note: This only affects the first run of this application on a stream.
          # (pertinent to kinesis source type)
          initialPosition = TRIM_HORIZON

          # Needs to be specified when initial-position is "AT_TIMESTAMP".
          # Timestamp format needs to be "yyyy-MM-ddTHH:mm:ssZ".
          # Ex: "2017-05-17T10:00:00Z"
          # Note: Time needs to be specified in UTC.
          initialTimestamp ="2018-08-01T10:00:00Z"

          # Minimum and maximum backoff periods, in milliseconds
          backoffPolicy {
            minBackoff = 3000
            maxBackoff = 600000
          }

          # Or Google PubSub
          #googleProjectId = my-project-id
          ## Size of the subscriber thread pool
          #threadPoolSize = 4
          ## Minimum, maximum and total backoff periods, in milliseconds
          ## and multiplier between two backoffs
          #backoffPolicy {
          #  minBackoff = {{enrichStreamsOutMinBackoff}}
          #  maxBackoff = {{enrichStreamsOutMaxBackoff}}
          #  totalBackoff = {{enrichStreamsOutTotalBackoff}} # must be >= 10000
          #  multiplier = {{enrichStreamsOutTotalBackoff}}
          #}

          # Or Kafka (Comment out for other types)
          brokers = "{{kafkaBrokers}}"
          # Number of retries to perform before giving up on sending a record
          retries = 0

          # Or NSQ
          ## Channel name for nsq source
          ## If more than one application is reading from the same NSQ topic at the same time,
          ## all of them must have the same channel name
          #rawChannel = "{{nsqSourceChannelName}}"
          ## Host name for nsqd
          #host = "{{nsqHost}}"
          ## TCP port for nsqd, 4150 by default
          #port = {{nsqdPort}}
          ## Host name for lookupd
          #lookupHost = "{{lookupHost}}"
          ## HTTP port for nsqlookupd, 4161 by default
          #lookupPort = {{nsqlookupdPort}}
        }

        # After enrichment, events are accumulated in a buffer before being sent to Kinesis/Kafka.
        # Note: Buffering is not supported by NSQ.
        # The buffer is emptied whenever:
        # - the number of stored records reaches recordLimit or
        # - the combined size of the stored records reaches byteLimit or
        # - the time in milliseconds since it was last emptied exceeds timeLimit when
        #   a new event enters the buffer
        buffer {
          byteLimit = 45000
          recordLimit = 200 # Not supported by Kafka; will be ignored
          timeLimit = 10000
        }

        # Used for a DynamoDB table to maintain stream state.
        # Used as the Kafka consumer group ID.
        # Used as the Google PubSub subscription name.
        appName = "snowplow-env"
      }
    }
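Both configs describe the same buffer-flush rules (record limit, byte limit, time limit checked when a new event arrives). Here is a hypothetical Python model of those rules, not Snowplow's actual implementation. The point it illustrates is that a flushed batch is still a list of separate records, which is consistent with getting one event per Kinesis record even though events are sent in batches.

```python
import time
from typing import Callable, List, Optional


class RecordBuffer:
    """Hypothetical sketch of the flush rules in the `buffer` comments above.

    Not Snowplow's implementation: it only models the three limits.
    """

    def __init__(self, byte_limit: int, record_limit: int, time_limit_ms: int,
                 clock: Callable[[], float] = time.monotonic):
        self.byte_limit = byte_limit
        self.record_limit = record_limit
        self.time_limit_ms = time_limit_ms
        self.clock = clock  # injectable so the logic can be tested without sleeping
        self.records: List[bytes] = []
        self.size = 0
        self.last_flush = clock()

    def add(self, record: bytes) -> Optional[List[bytes]]:
        """Buffer one record; return the flushed batch if any limit is reached."""
        self.records.append(record)
        self.size += len(record)
        # The time limit is only checked when a new event enters the buffer.
        elapsed_ms = (self.clock() - self.last_flush) * 1000
        if (len(self.records) >= self.record_limit
                or self.size >= self.byte_limit
                or elapsed_ms >= self.time_limit_ms):
            return self.flush()
        return None

    def flush(self) -> List[bytes]:
        """Empty the buffer. The batch stays a list of separate records."""
        batch, self.records, self.size = self.records, [], 0
        self.last_flush = self.clock()
        return batch


# The enricher's settings from the config above.
enriched_buffer = RecordBuffer(byte_limit=45000, record_limit=200, time_limit_ms=10000)
```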

#4

That looks good: you should be getting one event per record in the Kinesis stream and, as a result, one event per line in S3. How are you sinking the data from Kinesis to S3?


#5

Yes, the data is written from Kinesis to S3. Actually, it is working as expected: I am partitioning by event_id, so I get one file per event_id, but for some reason some files contain several events with the same event_id, and that is the root cause of the issue. If there should be only one event per file, then it seems either the partitioning is wrong or the tracker is producing events with the same event_id for some reason. Should I change the partitioning field? Can I create a composite key for partitioning?


#6

I was incorrect: for some reason, events with the same user_fingerprint and network_userid but different event_ids end up in the same file. Any idea why this could be?

Am I correct that, in any case, there should be one event per file?
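For context on what partitionKey does: Kinesis hashes the partition key with MD5 into a 128-bit integer, and each shard owns a range of that hash-key space. So the key only chooses a shard; many different event_ids share each shard, and any S3 file built by batching a shard's records will mix them. Below is a rough Python sketch assuming a hypothetical stream whose hash-key space is split evenly across its shards (real range boundaries may round slightly differently).

```python
import hashlib
import uuid

HASH_KEY_SPACE = 2 ** 128  # MD5 yields a 128-bit integer


def shard_for(partition_key: str, shard_count: int) -> int:
    """Approximate shard index for a partition key, assuming the stream's
    hash-key space is split evenly across `shard_count` shards."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    hash_key = int.from_bytes(digest, "big")
    return hash_key * shard_count // HASH_KEY_SPACE


# 1,000 distinct event_ids on a hypothetical 4-shard stream land on at most 4 shards,
# so each shard (and any file built from its records) mixes many event_ids.
ids = [str(uuid.uuid4()) for _ in range(1000)]
print(sorted({shard_for(i, 4) for i in ids}))
```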


#7

One more thing, and it is really strange: I created a Firehose pipeline to move the data to Redshift and it failed, because the input was a single line containing several events. I had to create a custom Lambda function to preprocess the events, and while debugging I found that the events arrive in the Lambda function in a batch, just as they are stored in a file, but they can be iterated one by one without any issues. I added ‘\n’ to the end of every event and returned the result base64-encoded (as Firehose expects), and it started importing data into Redshift as expected.

I’m really confused about why it didn’t work before. Should there always be one event per file, or can events be packed into one file with the separator missing for some reason?
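Here is a minimal Python sketch of the kind of Firehose transformation Lambda described above (not the poster's actual code). It follows Firehose's data-transformation contract: each input record has a `recordId` and base64-encoded `data`, and each output record echoes the `recordId`, sets `result` to `Ok`, and returns base64-encoded `data`. Firehose writes records back to back without adding a delimiter, which would explain the single-line files if, as assumed here, the enriched Kinesis records carry no trailing newline.

```python
import base64


def lambda_handler(event, context):
    """Append a newline to each Firehose record so events land on separate lines."""
    output = []
    for record in event["records"]:
        payload = base64.b64decode(record["data"]).decode("utf-8")
        # Only add the separator if the record doesn't already end with one.
        if not payload.endswith("\n"):
            payload += "\n"
        output.append({
            "recordId": record["recordId"],  # must match the incoming recordId
            "result": "Ok",
            "data": base64.b64encode(payload.encode("utf-8")).decode("ascii"),
        })
    return {"records": output}
```

Because Firehose concatenates whatever the function returns, the newline added here is what turns the delivered object into one event per line.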


#8

At the moment the supported way to load data into Redshift is via RDB loader. Have you tried using RDB loader instead of Firehose?


#9

No, I haven’t. However, I still can’t tell whether several events on one line is expected behavior or whether something is wrong with my pipeline.


#10

In the current pipeline (when using enrichment and the RDB loader / shredder), the expectation is that you will end up with several files that contain the data loaded into Redshift. Although it’s possible to end up with files that contain only a single line (at very low volumes), this is not the expectation. The current pipeline doesn’t use Lambda or Firehose, so I’d try to get the standard real-time pipeline up and running before adding any customisations.