Is it possible to load data to Redshift after StreamEnricher?

I have the following layout: JS Tracker -> ScalaStreamCollector -> Kinesis -> StreamEnricher -> Kinesis -> S3

I want to load this data into Redshift, but RDB Loader is only used in the batch (EmrEtlRunner) pipeline. I can import data into Redshift from Kinesis via Kinesis Firehose, but with custom contexts I have no way to parse the JSON data and load it into the appropriate tables. Is it somehow possible to load data into Redshift with the current setup without implementing the parsing of custom context data myself? Maybe it is possible to run RDB Loader at the end of this pipeline?

Yes, you can import the data in S3 via EmrEtlRunner: it starts an EMR cluster and, after enriching and shredding, it will load the data into Redshift.

I don't think it works like this. The EMR cluster needs data in raw format, so it would have to be saved to S3 before the enricher; in my case the enriched data is not recognized by the EMR cluster.
If it is possible to process enriched TSV files with the EMR cluster, please share the settings and parameters for running the EMR cluster in such a setup.

@sphinks, it does work. It can be done in two ways:

  • sink raw data and run a full batch pipeline (older approach)
  • sink enriched data and run batch in stream mode (current approach)

See the diagrams in How to setup a Lambda architecture for Snowplow.

@ihor thanks for pointing me to Stream mode. I have set it up, but I still have some issues:

  1. Here https://snowplowanalytics.com/blog/2018/04/03/snowplow-r102-afontova-gora-with-emretlrunner-improvements/#kinesis-enrich I found the instruction to add the "aws.s3.buckets.enriched.stream property to your config.yml file. This should point to the bucket where you have configured Snowplow S3 Loader to write enriched events." But I'm not using the Snowplow S3 Loader; enriched events reach S3 after the enricher and Kinesis. Is it critical to use the Snowplow S3 Loader?
  2. I have an output folder for enriched events (which arrive there in TSV format): s3://some_prefix/enriched/good. As far as I understand, this folder should also be the input folder for RDB Loader, so aws.s3.buckets.enriched.stream should be equal to s3://some_prefix/enriched/good? I did it this way, but after running ./snowplow-emr-etl-runner run -c config/config.yml -r config/iglu_resolver.json I get an error:

     uri:classloader:/gems/avro-1.8.1/lib/avro/schema.rb:350: warning: constant ::Fixnum is deprecated
     D, [2018-09-10T22:40:39.683224 #8174] DEBUG -- : Initializing EMR jobflow
     There seems to be an ongoing run of EmrEtlRunner: Cannot safely add stream staging step to jobflow, s3://some_prefix/enriched/good/ is not empty

     What is wrong here?

@sphinks,

Is it critical to use Snowplow S3 loader?

Yes, it ensures the streamed data sunk to S3 is in the (Thrift) format suitable for batch processing. Make sure to state that format in the configuration file for the batch pipeline:

collectors:
  format: "thrift"

I have an output folder for enriched events (which arrive there in TSV format): s3://some_prefix/enriched/good. As far as I understand, this folder should also be the input folder for RDB Loader, so aws.s3.buckets.enriched.stream should be equal to s3://some_prefix/enriched/good?

No, s3://some_prefix/enriched/good serves as the "processing" bucket (just as in standard mode), while the "stream" bucket plays the role that the "in" bucket plays in standard mode. You can review the data workflow in this diagram: Batch pipeline steps · snowplow/snowplow Wiki · GitHub.
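
For illustration, the relevant part of config.yml might look roughly like the sketch below (the bucket names are made up and the other bucket settings are omitted):

aws:
  s3:
    buckets:
      enriched:
        good: s3://some_prefix/enriched/good      # "processing" location; must be empty when a run starts (hence the error above)
        stream: s3://some_prefix/enriched/stream  # "in" location: where the Snowplow S3 Loader sinks enriched events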

Hi @sphinks,

To clarify the above - for loading data from “raw” (the older process) you would need “thrift” encoded data. For loading data from the “enriched” stream we use “gzip” encoding instead.

So if you wanted to load "enriched" data from Kinesis into Redshift, you would need to pull all of that data down from Kinesis into S3 using the Snowplow S3 Loader, with the "gzip" encoding for this application.

@josh @ihor, thanks for your help! I still have issues with the S3 Loader at the moment. I have the following config:

# Default configuration for s3-loader

# Sources currently supported are:
# 'kinesis' for reading records from a Kinesis stream
# 'nsq' for reading records from a NSQ topic
source = "kinesis"

# Sink is used for sending events which processing failed.
# Sinks currently supported are:
# 'kinesis' for writing records to a Kinesis stream
# 'nsq' for writing records to a NSQ topic
sink = "kinesis"

# The following are used to authenticate for the Amazon Kinesis sink.
# If both are set to 'default', the default provider chain is used
# (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
# If both are set to 'iam', use AWS IAM Roles to provision credentials.
# If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
  accessKey = "hardcoded_key"
  secretKey = "hardcoded_secret_key"
}

# Config for NSQ
nsq {
  # Channel name for NSQ source
  # If more than one application reading from the same NSQ topic at the same time,
  # all of them must have unique channel name for getting all the data from the same topic
  channelName = "{{nsqSourceChannelName}}"
    
  # Host name for NSQ tools
  host = "{{nsqHost}}"

  # HTTP port for nsqd
  port = 80

  # HTTP port for nsqlookupd
  lookupPort = 80
}

kinesis {
  # LATEST: most recent data.
  # TRIM_HORIZON: oldest available data.
  # "AT_TIMESTAMP": Start from the record at or after the specified timestamp
  # Note: This only affects the first run of this application on a stream.
  initialPosition = "TRIM_HORIZON"

  # Need to be specified when initialPosition is "AT_TIMESTAMP".
  # Timestamp format need to be in "yyyy-MM-ddTHH:mm:ssZ".
  # Ex: "2017-05-17T10:00:00Z"
  # Note: Time need to specified in UTC.
  initialTimestamp = "2018-08-01T10:00:00Z"

  # Maximum number of records to read per GetRecords call     
  maxRecords = 200

  region = "us-east-1"

  # "appName" is used for a DynamoDB table to maintain stream state.
  appName = "s3-stage"

  ## Optional endpoint url configuration to override aws kinesis endpoints,
  ## this can be used to specify local endpoints when using localstack
  # customEndpoint = {{kinesisEndpoint}}
}

streams {
  # Input stream name
  inStreamName = "snowplow-events-stage-enriched-good"

  # Stream for events for which the storage process fails
  outStreamName = "snowplow-events-stage-enriched-bad-s3"

  # Events are accumulated in a buffer before being sent to S3.
  # The buffer is emptied whenever:
  # - the combined size of the stored records exceeds byteLimit or
  # - the number of stored records exceeds recordLimit or
  # - the time in milliseconds since it was last emptied exceeds timeLimit
  buffer {
    byteLimit = 45000 # Not supported by NSQ; will be ignored
    recordLimit = 200
    timeLimit = 10000 # Not supported by NSQ; will be ignored
  }
}

s3 {
  region = "us-east-1"
  bucket = "bucket"
  # optional directory structure to use while storing data on s3, you can place date format strings which will be
  # replaced with date time values
  # eg: directoryPattern = "enriched/good/run_yr={YYYY}/run_mo={MM}/run_dy={DD}"
  directoryPattern = "enriched/stream/run_yr={YYYY}/run_mo={MM}/run_dy={DD}"

  # Format is one of lzo or gzip
  # Note, that you can use gzip only for enriched data stream.
  format = "gzip"

  # Maximum Timeout that the application is allowed to fail for (in milliseconds)
  maxTimeout = 120000

  ## Optional endpoint url configuration to override aws s3 endpoints,
  ## this can be used to specify local endpoints when using localstack
  # customEndpoint = {{kinesisEndpoint}}
}

# Optional section for tracking endpoints
monitoring {
  snowplow{
    collectorUri = "host_of_collector.com"
    collectorPort = 443
    appId = "snowplow"
    method = "get"
  }
}

But I get nothing in my S3 bucket. I'm running the S3 Loader locally and get the following output:

java -jar snowplow-s3-loader-0.6.0.jar --config ./examples/config.hocon.sample
log4j:WARN No appenders could be found for logger (com.amazonaws.AmazonWebServiceClient).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[main] INFO com.snowplowanalytics.s3.loader.sinks.KinesisSink - Stream snowplow-events-stage-enriched-bad-s3 exists and is active
[main] INFO com.snowplowanalytics.s3.loader.SinkApp$ - Initializing sink with KinesisConnectorConfiguration: {regionName=us-east-1, s3Endpoint=https://s3.amazonaws.com, kinesisInputStream=snowplow-events-stage-enriched-good, maxRecords=200, connectorDestination=s3, bufferMillisecondsLimit=10000, bufferRecordCountLimit=200, s3Bucket=plowbird-test, kinesisEndpoint=https://kinesis.us-east-1.amazonaws.com, appName=s3-stage, bufferByteSizeLimit=45000, retryLimit=1, initialPositionInStream=TRIM_HORIZON}
[main] INFO com.snowplowanalytics.s3.loader.KinesisSourceExecutor - KinesisSourceExecutor worker created
[INFO] [09/11/2018 19:34:22.670] [snowplow-scala-tracker-akka.actor.default-dispatcher-2] [akka://snowplow-scala-tracker/system/IO-TCP/selectors/$a/1] Message [akka.io.SelectionHandler$ChannelReadable$] from Actor[akka://snowplow-scala-tracker/deadLetters] to Actor[akka://snowplow-scala-tracker/system/IO-TCP/selectors/$a/1#2074028712] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

As @ihor mentioned, I need to put:

     collectors:
        format: "thrift"

But I could not find where this setting should be placed; however, as far as I understand, when using Kinesis the messages are transferred in Thrift format. Once again, my layout in detail:

Js Tracker -> ScalaStreamCollector -> Kinesis ([snowplow-events-stage]) -> StreamEnricher -> Kinesis ([snowplow-events-stage-enriched-good]) -> S3 Loader

I also attached a Kinesis Firehose to the snowplow-events-stage-enriched-good stream; it is saving data as TSV to S3 and I can clearly see incoming events. So events are coming through, but for some reason they are only saved to S3 by Kinesis Firehose, not by the S3 Loader. I also connected the Kinesis analytics tools to both streams, snowplow-events-stage and snowplow-events-stage-enriched-good, and I can see events passing through. In the snowplow-events-stage-enriched-bad-s3 stream I cannot find anything either.

The only issue I can see is that the Kinesis analytics tool shows events after snowplow-events-stage-enriched-good flowing as plain text (TSV), so it seems they are not in Thrift format. Am I correct? Where should I fix the setting to enable Thrift format? Are there any other issues in my setup?

@sphinks, in your HOCON file the s3.bucket property has the value "bucket". That should actually be the bucket the data is sunk to. Is that a valid bucket name? Is that the bucket you were checking for enriched files?

The collectors: format: "thrift" setting refers to the config.yml of the batch pipeline used to load the data into Redshift in the older approach (raw events). You can ignore it if you run the batch pipeline in Stream mode, as per the clarification provided by @josh.
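
For reference, a rough sketch of where that setting sits in config.yml (only the collectors section is shown; the aws, enrich, storage and monitoring sections are omitted):

collectors:
  format: "thrift"   # only relevant for the older approach of loading from the raw stream;
                     # can be ignored when running the batch pipeline in Stream mode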

@ihor I just replaced the real bucket name with that placeholder before posting, but yes, it is a real bucket and I'm checking it for the path I configured in directoryPattern; however, no folders are created inside the bucket according to that pattern.

@ihor @josh it seems I managed to run the S3 Loader on an AWS server, but I have a problem with directoryPattern (directoryPattern = "enriched/stream/run_yr={YYYY}/run_mo={MM}/run_dy={DD}"). It does not work: all the data is dumped directly into the bucket root. If I add a path to the bucket parameter, it creates the directories, but directoryPattern is still ignored. Any suggestions why it does not work?
