Collector is sending empty raw events


#1

Hey,
This is my first time attempting to set up Snowplow.

I am running a Scala collector which sends custom structured events to Kinesis; the stream enricher then needs to validate them and send them into the enriched Kinesis stream. I am not doing any additional enrichment, I just want to get the events through the stream enricher so I can load them into Redshift.

I am expecting a batch of about 10 events every 10 seconds.

I have attempted to follow the setup guide a few times, but I am not confident about the config file and the resolver file.

I am not getting any errors; the events are just being written to the bad Kinesis stream.

config file:

enrich {
  # Sources currently supported are:
  # 'kinesis' for reading Thrift-serialized records from a Kinesis stream
  # 'kafka' for reading Thrift-serialized records from a Kafka topic
  # 'stdin' for reading Base64-encoded Thrift-serialized records from stdin
  source = kinesis

  # Sinks currently supported are:
  # 'kinesis' for writing enriched events to one Kinesis stream and invalid events to another.
  # 'kafka' for writing enriched events to one Kafka topic and invalid events to another.
  # 'stdouterr' for writing enriched events to stdout and invalid events to stderr.
  #    Using "sbt assembly" and "java -jar" is recommended to disable sbt logging.
  sink = kinesis

  # AWS credentials
  # If both are set to 'default', use the default AWS credentials provider chain.
  # If both are set to 'iam', use AWS IAM Roles to provision credentials.
  # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
  aws {
    accessKey = default
    secretKey = default
  }

  streams {
    in {
      # Stream/topic where the raw events to be enriched are located
      raw = "raw-stream"
    }

    out {
      # Stream/topic where the events that were successfully enriched will end up
      enriched = "enriched-stream"
      # Stream/topic where the event that failed enrichment will be stored
      bad = "bad-stream"

      # How the output stream/topic will be partitioned.
      # Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
      # user_ipaddress, domain_sessionid, user_fingerprint.
      # Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
      # possible partition keys correspond to.
      # Otherwise, the partition key will be a random UUID.
      partitionKey = event_id
    }

    kinesis {
      # Region where the streams are located
      region = "us-east-1"

      # Maximum number of records to get from Kinesis per call to GetRecords
      maxRecords = 1000

      # LATEST: most recent data.
      # TRIM_HORIZON: oldest available data.
      # "AT_TIMESTAMP": Start from the record at or after the specified timestamp
      # Note: This only affects the first run of this application on a stream.
      initialPosition = TRIM_HORIZON

      # Needs to be specified when initialPosition is "AT_TIMESTAMP".
      # The timestamp format needs to be "yyyy-MM-ddTHH:mm:ssZ".
      # Ex: "2017-05-17T10:00:00Z"
      # Note: the time needs to be specified in UTC.
      initialTimestamp = "2017-05-17T10:00:00Z"

      # Minimum and maximum backoff periods, in milliseconds
      backoffPolicy {
        minBackoff = 300
        maxBackoff = 60000
      }
    }

    # Kafka configuration
    kafka {
      brokers = "{{kafkaBrokers}}"

      # Number of retries to perform before giving up on sending a record
      retries = 0
    }

    # After enrichment, events are accumulated in a buffer before being sent to Kinesis/Kafka.
    # The buffer is emptied whenever:
    # - the number of stored records reaches recordLimit or
    # - the combined size of the stored records reaches byteLimit or
    # - the time in milliseconds since it was last emptied exceeds timeLimit when
    #   a new event enters the buffer
    buffer {
      byteLimit = 1000000
      recordLimit = 100 # Not supported by Kafka; will be ignored
      timeLimit = 60000
    }

    # Used for a DynamoDB table to maintain stream state.
    # Used as the Kafka consumer group ID.
    # You can set it automatically using: "SnowplowEnrich-$\\{enrich.streams.in.raw\\}"
    appName = "app"
  }

  # Optional section for tracking endpoints
  #monitoring {
  #  snowplow {
  #    collectorUri = "{{collectorUri}}"
  #    collectorPort = 80
  #    appId = {{enrichAppName}}
  #    method = GET
  #  }
  #}
}
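As I understand it, the buffer section above means a flush happens as soon as any one of the three limits trips. A rough in-memory sketch of that behaviour (Python for illustration only, not the enricher's actual Scala; the class and method names here are my own):

```python
import time

class Buffer:
    """Sketch of the sink buffer: flush when any limit is hit first."""

    def __init__(self, byte_limit=1_000_000, record_limit=100, time_limit_ms=60_000):
        self.byte_limit = byte_limit
        self.record_limit = record_limit
        self.time_limit_ms = time_limit_ms
        self.records = []
        self.bytes = 0
        self.last_flush = time.monotonic()

    def add(self, record: bytes):
        """Store one record; return the flushed batch if a limit tripped, else None."""
        self.records.append(record)
        self.bytes += len(record)
        elapsed_ms = (time.monotonic() - self.last_flush) * 1000
        if (len(self.records) >= self.record_limit
                or self.bytes >= self.byte_limit
                or elapsed_ms >= self.time_limit_ms):
            return self.flush()
        return None

    def flush(self):
        batch, self.records, self.bytes = self.records, [], 0
        self.last_flush = time.monotonic()
        return batch
```

So with my settings a batch should go out at the latest 60 seconds after the previous flush, once a new event arrives.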

And the resolver is just the default one I found:

{
  "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",
  "data": {
    "cacheSize": 500,
    "repositories": [
      {
        "name": "Iglu Central",
        "priority": 0,
        "vendorPrefixes": [ "com.snowplowanalytics" ],
        "connection": {
          "http": {
            "uri": "http://iglucentral.com"
          }
        }
      }
    ]
  }
}
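From what I can tell, that default resolver just points schema lookups at Iglu Central: a repository's uri is combined with the iglu: schema key to build the HTTP path. A hypothetical helper showing that mapping (my own illustration; the real resolver also handles caching, priorities, and vendor prefixes):

```python
def iglu_to_url(repo_uri: str, schema_key: str) -> str:
    """Map an 'iglu:' schema key onto a repository's /schemas/ HTTP path."""
    if not schema_key.startswith("iglu:"):
        raise ValueError("not an iglu schema key")
    # "iglu:com.acme/my_event/jsonschema/1-0-0" becomes
    # "<repo_uri>/schemas/com.acme/my_event/jsonschema/1-0-0"
    return repo_uri.rstrip("/") + "/schemas/" + schema_key[len("iglu:"):]
```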

The output in the terminal running the enricher:

[RecordProcessor-0000] INFO com.snowplowanalytics.snowplow.enrich.stream.sources.KinesisSource - Initializing record processor for shard: shardId-000000000000
[RecordProcessor-0000] INFO com.snowplowanalytics.snowplow.enrich.stream.sources.KinesisSource - Processing 10 records from shardId-000000000000
[RecordProcessor-0000] INFO com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink - Writing 10 records to Kinesis stream bad-stream
[RecordProcessor-0000] INFO com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink - Successfully wrote 10 out of 10 records

Any help would be greatly appreciated.

Edit:
After playing around with my tracker I am now getting an error message through the bad stream:

"errors":[{"level":"error","message":"Querystring is empty: no raw event to process"}],"failure_tstamp":"2017-11-22T20:33:48.277Z"}

I am not sure what this actually means.
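If I read the error right, the payload reached the enricher with no query parameters at all, so there are no event fields to process. Conceptually the check is something like this (a Python sketch of my understanding, not the enricher's actual Scala):

```python
from urllib.parse import parse_qs, urlsplit

def has_raw_event(request_uri: str) -> bool:
    # A GET payload with an empty querystring carries no event fields,
    # which is what "Querystring is empty: no raw event to process"
    # appears to be complaining about.
    return bool(parse_qs(urlsplit(request_uri).query))
```

So a request to /i?e=pv&uid=testuser would pass, while a bare /i would fail with exactly that message.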


#3

I believe the Scala collector isn't collecting the event data, but it is still sending the event onto the Kinesis stream without any data.

I am running the collector in a normal terminal. Running:

curl http://localhost:8080/health

returns OK.
I set up a local server, i.e.:

python -m SimpleHTTPServer 8080

to test what is being received by the collector:

Serving HTTP on 0.0.0.0 port 8080 ...
127.0.0.1 - - [23/Nov/2017 11:27:01] "GET /i?lang=en&e=pv&uid=testuser&dtm=1511389621022&p=pc&url=www.example.com&stm=1511389621000&refr=www.referrer.com&tv=py-0.8.0&eid=9111de41-777d-4251-bee9-7ce23891aa5c&tz=Europe%London&page=example HTTP/1.1" 200 -
127.0.0.1 - - [23/Nov/2017 11:27:01] "GET /i?lang=en&e=pp&uid=testuser&dtm=1511389621043&p=pc&url=http%3A%2F%2Fmytesturl%2Ftest2&stm=1511389621000&refr=http%3A%2F%2Fmyreferrer.com&pp_may=500&pp_max=100&tv=py-0.8.0&pp_miy=0&pp_mix=0&tz=urope%London&page=Page+title+2&eid=633d322c-3c9c-4836-8033-21f22bebc04a HTTP/1.1" 200 -

It could be something wrong with my collector config:

collector {
      # The collector runs as a web service specified on the following
      # interface and port.
      interface = "0.0.0.0"
      port = 8080

      # Configure the P3P policy header.
      p3p {
        policyRef = "/w3c/p3p.xml"
        CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
      }

      # The collector returns a cookie to clients for user identification
      # with the following domain and expiration.
      cookie {
        enabled = false
        expiration = "365 days" # e.g. "365 days"
        # Network cookie name
        name = "sp"
        # The domain is optional and will make the cookie accessible to other
        # applications on the domain. Comment out this line to tie cookies to
        # the collector's full domain
        #domain = "{{cookieDomain}}"
      }

      # When enabled and the cookie specified above is missing, performs a redirect to itself to check
      # if third-party cookies are blocked using the specified name. If they are indeed blocked,
      # fallbackNetworkId is used instead of generating a new random one.
      cookieBounce {
        enabled = false
        # The name of the request parameter which will be used on redirects checking that third-party
        # cookies work.
        name = "n3pc"
        # Network user id to fallback to when third-party cookies are blocked.
        fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
      }

      # Sinks currently supported are:
      # 'kinesis' for writing Thrift-serialized records to a Kinesis stream
      # 'kafka' for writing Thrift-serialized records to kafka
      # 'stdout' for writing Base64-encoded Thrift-serialized records to stdout
      #    Recommended settings for 'stdout' so each line printed to stdout
      #    is a serialized record are:
      #      1. Setting 'akka.loglevel = OFF' and 'akka.loggers = []'
      #         to disable all logging.
      #      2. Using 'sbt assembly' and 'java -jar ...' to disable
      #         sbt logging.
      sink = kinesis

      streams {

        # Events which have successfully been collected will be stored in the good stream/topic
        good = "raw-stream"

        # Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
        bad = "c-bad-stream"

        # Whether to use the incoming event's ip as the partition key for the good stream/topic
        useIpAddressAsPartitionKey = false

        kinesis {
          # Region where the streams are located
          region = "us-east-1"

          # Thread pool size for Kinesis API requests
          threadPoolSize = 10

          # The following are used to authenticate for the Amazon Kinesis sink.
          # If both are set to 'default', the default provider chain is used
          # (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
          # If both are set to 'iam', use AWS IAM Roles to provision credentials.
          # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
          aws {
            accessKey = default
            secretKey = default
          }

          # Minimum and maximum backoff periods
          backoffPolicy {
            minBackoff = 3000 #3 seconds
            maxBackoff = 600000 #5 minutes
          }
        }

        kafka {
          brokers = ""

          # Number of retries to perform before giving up on sending a record
          retries = 0
        }

        # Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
        # The buffer is emptied whenever:
        # - the number of stored records reaches record-limit or
        # - the combined size of the stored records reaches byte-limit or
        # - the time in milliseconds since the buffer was last emptied reaches time-limit
        buffer {
          byteLimit = 1000000 #1MB
          recordLimit = 100 # Not supported by Kafka; will be ignored
          timeLimit = 60000 #1Min
        }
      }
    }
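As an aside, my reading of useIpAddressAsPartitionKey = false is that each record gets a random partition key, which spreads records evenly across Kinesis shards. Roughly (a hypothetical sketch of the documented behaviour, with names of my own choosing):

```python
import uuid

def partition_key(client_ip: str, use_ip: bool) -> str:
    """Pick a Kinesis partition key for a collected event.

    With useIpAddressAsPartitionKey = true, all events from one client
    land on the same shard; with false, a random UUID spreads the load.
    """
    return client_ip if use_ip else str(uuid.uuid4())
```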

I don’t understand what is going wrong with the collector.


#4

I believe the issue was trivial:

I added this to the collector config and it seems to be working now:

# Akka has a variety of possible configuration options defined at
# http://doc.akka.io/docs/akka/current/scala/general/configuration.html
akka {
  loglevel = DEBUG # 'OFF' for no logging, 'DEBUG' for all logging.
  loggers = ["akka.event.slf4j.Slf4jLogger"]

  # akka-http is the server the Stream collector uses and has configurable options defined at
  # http://doc.akka.io/docs/akka-http/current/scala/http/configuration.html
  http.server {
    # To obtain the hostname in the collector, the 'remote-address' header
    # should be set. By default, this is disabled, and enabling it
    # adds the 'Remote-Address' header to every request automatically.
    remote-address-header = on

    raw-request-uri-header = on

    # Define the maximum request length (the default is 2048)
    parsing {
      max-uri-length = 32768
      uri-parsing-mode = relaxed
    }
  }
}