Enriched event stream into Redshift using Kinesis Firehose


#1

Hi,

i am trying to push my data through kinesys to redshift, but my database is empty.
Could someone check my configurations? What did i missed?
Btw: my enrich process running without any errors, i can see the traffic in kinesis stream monitoring, and i got correct gzip files in S3 (not from firehouse)

config for snowplow-stream-collector-0.6.0

# Copyright (c) 2013-2014 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0, and
# you may not use this file except in compliance with the Apache License
# Version 2.0.  You may obtain a copy of the Apache License Version 2.0 at
# http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License Version 2.0 is distributed on an "AS
# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.  See the Apache License Version 2.0 for the specific language
# governing permissions and limitations there under.

# This file (application.conf.example) contains a template with
# configuration options for the Scala Stream Collector.
#
# To use, copy this to 'application.conf' and modify the configuration options.

# 'collector' contains configuration options for the main Scala collector.
collector {
  # The collector runs as a web service specified on the following
  # interface and port.
  interface = "172.16.**.**"
  port = 80

  # Production mode disables additional services helpful for configuring and
  # initializing the collector, such as a path '/dump' to view all
  # records stored in the current stream.
  production = true

  # Configure the P3P policy header.
  p3p {
    policyref = "/w3c/p3p.xml"
    CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
  }

  # The collector returns a cookie to clients for user identification
  # with the following domain and expiration.
  cookie {
    enabled = true
    expiration = "365 days"
    # Network cookie name
    name = *****
    # The domain is optional and will make the cookie accessible to other
    # applications on the domain. Comment out this line to tie cookies to
    # the collector's full domain
    domain = "tracker01.****.***"
  }

  # The collector has a configurable sink for storing data in
  # different formats for the enrichment process.
  sink {
    # Sinks currently supported are:
    # 'kinesis' for writing Thrift-serialized records to a Kinesis stream
    # 'stdout' for writing Base64-encoded Thrift-serialized records to stdout
    #    Recommended settings for 'stdout' so each line printed to stdout
    #    is a serialized record are:
    #      1. Setting 'akka.loglevel = OFF' and 'akka.loggers = []'
    #         to disable all logging.
    #      2. Using 'sbt assembly' and 'java -jar ...' to disable
    #         sbt logging.
    enabled = "kinesis"

    kinesis {
      thread-pool-size: 10 # Thread pool size for Kinesis API requests

      # The following are used to authenticate for the Amazon Kinesis sink.
      #
      # If both are set to 'default', the default provider chain is used
      # (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
      #
      # If both are set to 'iam', use AWS IAM Roles to provision credentials.
      #
      # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
      aws {
        access-key: "*****************************"
        secret-key: "********************************************"
      }

      # Data will be stored in the following stream.
      stream {
        region: "eu-west-1"
        good: "****good"
        bad: "****bad"
      }

      # Minimum and maximum backoff periods
      backoffPolicy: {
        minBackoff: 1
        maxBackoff: 100
      }

      # Incoming events are stored in a buffer before being sent to Kinesis.
      # The buffer is emptied whenever:
      # - the number of stored records reaches record-limit or
      # - the combined size of the stored records reaches byte-limit or
      # - the time in milliseconds since the buffer was last emptied reaches time-limit
      buffer {
        byte-limit: 1080
        record-limit: 100
        time-limit: 5000
      }
    }
  }
}

# Akka has a variety of possible configuration options defined at
# http://doc.akka.io/docs/akka/2.2.3/general/configuration.html.
akka {
  loglevel = DEBUG # 'OFF' for no logging, 'DEBUG' for all logging.
  loggers = ["akka.event.slf4j.Slf4jLogger"]
}

# spray-can is the server the Stream collector uses and has configurable
# options defined at
# https://github.com/spray/spray/blob/master/spray-can/src/main/resources/reference.conf
spray.can.server {
  # To obtain the hostname in the collector, the 'remote-address' header
  # should be set. By default, this is disabled, and enabling it
  # adds the 'Remote-Address' header to every request automatically.
  remote-address-header = on

  uri-parsing-mode = relaxed
  raw-request-uri-header = on

  # Define the maximum request length (the default is 2048)
  parsing {
    max-uri-length = 32768
  }
}

config for snowplow-stream-enrich-0.7.0

# Default Configuration for Stream Enrich.

enrich {
  # Sources currently supported are:
  # 'kinesis' for reading Thrift-serialized records from a Kinesis stream
  # 'stdin' for reading Base64-encoded Thrift-serialized records from stdin
  source = "kinesis"

  # Sinks currently supported are:
  # 'kinesis' for writing enriched events to one Kinesis stream and invalid events to another.
  # 'stdouterr' for writing enriched events to stdout and invalid events to stderr.
  #    Using "sbt assembly" and "java -jar" is recommended to disable sbt
  #    logging.
  sink = "kinesis"

  # AWS credentials
  #
  # If both are set to 'default', use the default AWS credentials provider chain.
  #
  # If both are set to 'iam', use AWS IAM Roles to provision credentials.
  #
  # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
  aws {
    access-key: "************************"
    secret-key: "*******************************************"
  }

  streams {
    in: {
      raw: "****good"

      # After enrichment, are accumulated in a buffer before being sent to Kinesis.
      # The buffer is emptied whenever:
      # - the number of stored records reaches record-limit or
      # - the combined size of the stored records reaches byte-limit or
      # - the time in milliseconds since it was last emptied exceeds time-limit when
      #   a new event enters the buffer
      buffer: {
        byte-limit: 1080
        record-limit: 100
        time-limit: 5000
      }
    }

    out: {
      enriched: "****enriched"
      bad: "****bad"

      # Minimum and maximum backoff periods
      # - Units: Milliseconds
      backoffPolicy: {
        minBackoff: 1
        maxBackoff: 1000
      }
    }

    # "app-name" is used for a DynamoDB table to maintain stream state.
    # You can set it automatically using: "SnowplowKinesisEnrich-$\\{enrich.streams.in.raw\\}"
    app-name: SnowplowKinesisEnrich-${enrich.streams.in.raw}

    # LATEST: most recent data.
    # TRIM_HORIZON: oldest available data.
    # Note: This only effects the first run of this application
    # on a stream.
    initial-position = "TRIM_HORIZON"

    region: "eu-west-1"
  }

  # Optional section for tracking endpoints
  monitoring {
    snowplow {
      collector-uri: "tracker01.*****.***"
      collector-port: 80
      app-id: "appname"
      method: "GET"
    }
  }
}


config for snowplow-kinesis-s3-0.4.0
# Default configuration for kinesis-lzo-s3-sink

sink {

  # The following are used to authenticate for the Amazon Kinesis sink.
  #
  # If both are set to 'default', the default provider chain is used
  # (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
  #
  # If both are set to 'iam', use AWS IAM Roles to provision credentials.
  #
  # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
  aws {
    access-key: "******************"
    secret-key: "*********************************************"
  }

  kinesis {
    in {
      # Kinesis input stream name
      stream-name: "****enrichment"

      # LATEST: most recent data.
      # TRIM_HORIZON: oldest available data.
      # Note: This only affects the first run of this application
      # on a stream.
      initial-position: "TRIM_HORIZON"

      # Maximum number of records to read per GetRecords call
      max-records: 100
    }

    out {
      # Stream for events for which the storage process fails
      stream-name: "****bad"
    }

    region: "eu-west-1"

    # "app-name" is used for a DynamoDB table to maintain stream state.
    # You can set it automatically using: "SnowplowLzoS3Sink-$\\{sink.kinesis.in.stream-name\\}"
    app-name: SnowplowLzoS3Sink-${sink.kinesis.in.stream-name}
  }

  s3 {
    # If using us-east-1, then endpoint should be "http://s3.amazonaws.com".
    # Otherwise "http://s3-<<region>>.s3.amazonaws.com", e.g.
    # http://s3-eu-west-1.amazonaws.com
    region: "eu-west-1"
    bucket: "****-bucket2"

    # Format is one of lzo or gzip
    # Note, that you can use gzip only for enriched data stream.
    format: "gzip"

    # Maximum Timeout that the application is allowed to fail for
    max-timeout: 10
  }

  # Events are accumulated in a buffer before being sent to S3.
  # The buffer is emptied whenever:
  # - the combined size of the stored records exceeds byte-limit or
  # - the number of stored records exceeds record-limit or
  # - the time in milliseconds since it was last emptied exceeds time-limit
  buffer {
    byte-limit: 1080
    record-limit: 100
    time-limit: 5000
  }

  # Set the Logging Level for the S3 Sink
  # Options: ERROR, WARN, INFO, DEBUG, TRACE
  logging {
    level: "info"
  }

  # Optional section for tracking endpoints
  monitoring {
    snowplow {
      collector-uri: "tracker01.****.***"
      collector-port: 80
      app-id: "appname"
      method: "GET"
    }
  }
}

kinesis streams:
****bad
****good
****enriched
+kinesis firehouse stream to redshift


#2

Hi @balint,

I’m not familiar with Amazon Kinesis Firehose Delivery Stream but a quick look at the docs suggests that you are trying to replace the functionality we already built specifically for loading Snowplow streamed data into Redshift.

Please, refer to this topic explaining how we do it. The flow involves Kinesis S3 sink and the standard batch pipeline. Will it suit your purpose?

Regards,
Ihor


#3

Hi Ihor,

thank you for your answer! I am using the Sink flow only for testing, it is not part of my “production” system.
I think something is wrong with my stream configuration. As far I see, the dataflow stopped between kinesis enrichment stream and firehouse.


#4

@balint,

I see no problem with Snowplow components. You said it yourself Kinesis S3 sinked data to S3 off the enrichment stream. That confirms that everything is fine.

You rather need to check the configuration of Kinesis Firehose, which you are not offering to the community members to judge (if anyone ever tried to utilize this Amazon service with Snowplow). In any case, this is not a Snowplow related issue.

And again, I cannot comment on Kinesis Firehose.

–Ihor


#5

could you please check the stream flow in my configuration once again?
The collector generates data as ****good and ****bad streams. Raw in is same as my ****good stream. Out stream is ****enriched. It is correct?

i think my kinesis configuration is not wrong, because i dont have any data there
2016-05-28 04:49:04.045-0400 ip-172-.eu-west-1.compute.internal (FileTailer[kinesis:yourkinesisstream:/tmp/app.log*].MetricsEmitter RUNNING) com.amazon.kinesis.streaming.agent.tailing.FileTailer [INFO] FileTailer[kinesis:yourkinesisstream:/tmp/app.log*]: Tailer Progress: Tailer has parsed 0 records (0 bytes), and has successfully sent 0 records to destination.
2016-05-28 04:49:04.048-0400 ip-172-
.eu-west-1.compute.internal (FileTailer[fh:yourdeliverystream:/tmp/app.log*].MetricsEmitter RUNNING) com.amazon.kinesis.streaming.agent.tailing.FileTailer [INFO] FileTailer[fh:yourdeliverystream:/tmp/app.log*]: Tailer Progress: Tailer has parsed 0 records (0 bytes), and has successfully sent 0 records to destination.
2016-05-28 04:49:04.049-0400 ip-172-31-42-76.eu-west-1.compute.internal (Agent.MetricsEmitter RUNNING) com.amazon.kinesis.streaming.agent.Agent [INFO] Agent: Progress: 0 records parsed (0 bytes), and 0 records sent successfully to destinations. Uptime: 600028ms


#6

Hi @balint,

For clarity, I drew the current configuration you have built:

Stream Collector
	|-> good stream 
	|	|-> Stream Enrich
	|		 |
	|-> bad stream <-|
			 |-> enriched stream

There’s nothing wrong with these configuration.

The only discrepancy I can see is Kinesis S3 sink you also use for testing purpose is consuming from “enrichment” stream and not “enriched” stream. Since you mentioned that the S3 sink does produce the output may I suggest that you misconfigured the names of the streams?


#7

Hi Ihor,

it is not misconfigured, and thank you for your confirmation regarding the stream flow.
If I am checking the monitoring TAB in kinesis stream, i can see the changes on “Total incoming Records” and “Put records…” graphs, so i guess the enrichment works fine.
Furthermore, i can not see any changes on “read throughput” and “get requests” graph. Thats mean, i need to push the stream from kinesis streams (named ****enriched) to kinesis firehose.
My kinesis firehose configuration is also done. I should get files in my intermediate bucket, but it is also empty. Thats mean, the problem must be somewhere between kinesis enriched stream and kinesis firehose.
What i dont understand, how it is possible to push the kinesis enriched stream to firehose, because i can not define any source in firehose.
I tried to push the enriched stream directly to firehose, but its not possible, because i can use only kinesis stream in enrich configuration as out-stream.
I tried to use aws kinesis agent also, but its for files not for streams. I know, its not part of enrichment, but i think it should be a part of snowplow´s storage documentation.
I would highly appreciate any advice.


#8

Hi @balint - I’m afraid that Kinesis Firehose isn’t something we’ve worked with, or plan on supporting with Snowplow, so it’s not something we can really support on. You can find our rationale for not prioritising Firehose in this ticket:

https://github.com/snowplow/snowplow/issues/2165