Need help getting events from Kinesis to S3 to Redshift

I have Snowplow configured using the following modules: scala-collector -> scala-kinesis-enrich -> scala-kinesis-s3-sink

My goal is to get the events into Redshift. I understand that I need to use StorageLoader to make this happen, but before I can use it the documentation says the events must be “shredded”. Ok cool. The documentation says to use EmrEtlRunner to do the shredding, but it’s a bit light on how to do that without going through the enrichment process. I’m already enriching the events with the Kinesis module, so I don’t need EMR to do that; I only need the events to be shredded and moved into another folder for use by StorageLoader. I think I have everything configured to do just that, using --skip to skip the steps I don’t need. I’ve watched my EMR cluster spin up and attempt the job, but the Elasticity S3DistCp Step: Enriched S3 -> HDFS step always fails with the following error:

Exception in thread "main" java.lang.RuntimeException: Error running job
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:927)
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:720)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
	at com.amazon.elasticmapreduce.s3distcp.Main.main(Main.java:22)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
	at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://<ipaddress>:8020/tmp/8cc67ab2-46c5-43de-9d94-0e769e5f5b7a/files
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:317)
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)
	at org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:59)
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:352)
	at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:301)
	at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:318)
	at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:196)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:415)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
	at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:901)
	... 10 more

I can see the InvalidInputException, which leads me to believe it’s an issue with my S3 paths. I suspect the fault is mine in not understanding the implementation details of the individual steps, so I could use some hand-holding there.

I am running EmrEtlRunner with the following command:

snowplow-emr-etl-runner -d --config /etc/snowplow/config.yml -r /etc/snowplow/iglu_resolver.json --skip staging,enrich,archive_raw

Are my flags correct for what I’m trying to accomplish?

FYI: the kinesis-s3-sink is depositing my enriched events into s3:///events/enriched.

Here is my config.yml (with sensitive data omitted):
aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: XXXXXXXXXXXXXXXXXxx
  secret_access_key: XXXXXXXXXXXXXXXX
  s3:
    region: us-east-1
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3:///log
      raw:
        in: # Multiple in buckets are permitted
          - s3:///events/raw # e.g. s3://my-in-bucket
        processing: s3:///events/processing
        archive: s3:///events/archived/raw # e.g. s3://my-archive-bucket/in
      enriched:
        good: s3:///events/enriched # e.g. s3://my-out-bucket/enriched/good
        bad: s3:///events/bad/enriched # e.g. s3://my-out-bucket/enriched/bad
        errors: # Leave blank unless continue_on_unexpected_error set to true below
        archive: s3:///events/archived/enriched # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
      shredded:
        good: s3:///events/shredded # e.g. s3://my-out-bucket/shredded/good
        bad: s3:///events/bad/shredded # e.g. s3://my-out-bucket/shredded/bad
        errors: # Leave blank unless continue_on_unexpected_error set to true below
        archive: s3:///events/archived/shredded # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
  emr:
    ami_version: 4.3.0 # Don't change this
    region: us-east-1 # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using aws emr create-default-roles
    service_role: EMR_DefaultRole # Created using aws emr create-default-roles
    placement: us-east-1b # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: subnet-e66207cd # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: snowplow
    bootstrap: [] # Set this to specify custom bootstrap actions. Leave empty otherwise
    software:
      hbase: # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual: # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      master_instance_type: m1.medium
      core_instance_count: 2
      core_instance_type: m1.medium
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m1.medium
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    additional_info: # Optional JSON string for selecting additional features
collectors:
  format: thrift # Or 'clj-tomcat' for the Clojure Collector, or 'thrift' for Thrift records, or 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs
enrich:
  job_name: Snowplow ETL # Give your job a name
  versions:
    hadoop_enrich: 1.6.0 # Version of the Hadoop Enrichment process
    hadoop_shred: 0.8.0 # Version of the Hadoop Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
  continue_on_unexpected_error: false # Set to 'true' (and set out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: NONE # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
  download:
    folder: # Postgres-only config option. Where to store the downloaded files. Leave blank for Redshift
  targets:
    - name: "Snowplow Redshift"
      type: redshift
      host: # The endpoint as shown in the Redshift console
      database: snowplow # Name of database
      port: 5439 # Default Redshift port
      table: atomic.events
      username:
      password:
      maxerror: 1 # Stop loading on first error, or increase to permit more load errors
      comprows: 200000 # Default for a 1 XL node cluster. Not used unless --include compupdate specified
      ssl_mode: disable
monitoring:
  tags: {} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
  snowplow:
    method: get
    app_id: dt # e.g. snowplow
    collector: # e.g. d3rkrsqld9gmqf.cloudfront.net

Many thanks in advance

Hi @sphoid,

You seem to be confusing two distinct flows (real-time and batch). What you are really after looks like this:

scala-collector -> Raw Events Stream -> scala-kinesis-s3-sink -> S3 (raw events) -> 
   -> EmrEtlRunner -> S3 (enriched & shredded) -> StorageLoader -> Redshift

I believe the reason for the InvalidInputException error is that the source for your Kinesis S3 Sink is the Enriched Events Stream, as opposed to the Raw Events Stream.
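
In other words, the Kinesis S3 Sink should be reading the Raw Events Stream and depositing raw Thrift records into the raw in: bucket; EmrEtlRunner then fills the enriched and shredded buckets itself. As a rough sketch against the config.yml you posted (the bucket paths below are just your own placeholders), the buckets would line up like this:

aws:
  s3:
    buckets:
      raw:
        in:
          - s3:///events/raw # Kinesis S3 Sink deposits raw Thrift records here
        processing: s3:///events/processing
      enriched:
        good: s3:///events/enriched # written by EmrEtlRunner's enrich step, not by the sink
      shredded:
        good: s3:///events/shredded # written by EmrEtlRunner's shred step, read by StorageLoader

With that layout you would also run EmrEtlRunner without the --skip staging,enrich,archive_raw flags, since the enrichment and shredding now happen on EMR.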

Hopefully this is clear.

Regards,
Ihor

Makes sense. So what’s the point of the Kinesis enricher then? I take it I should just not use it and use EmrEtlRunner for that instead.

EDIT: Actually, let me rephrase the question: is there a way to get events into Redshift with the real-time flow? I’m guessing not, since they have to be batch-loaded into Redshift.

@sphoid,

You answered the question yourself. I just want to add that Redshift as a storage target is not appropriate for real-time event analysis.

Our real-time pipeline is built on a lambda architecture. In short, it allows two flows (real time and batch) to co-exist, feeding from the same source. Scala Kinesis Enrich is engaged in the main (real-time) “branch” of the flow.
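
Roughly, the two branches share the same collector and raw stream (what consumes the Enriched Events Stream in the real-time branch depends on your own setup):

scala-collector -> Raw Events Stream ->
   -> scala-kinesis-enrich -> Enriched Events Stream -> (your real-time consumers)
   -> scala-kinesis-s3-sink -> S3 (raw) -> EmrEtlRunner -> StorageLoader -> Redshift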

If you are not interested in real-time data at all, then you have picked the wrong flow. You would instead build a “pure” batch pipeline, which means using either the Cloudfront or the Clojure collector (as opposed to the Stream Collector).
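
For comparison, the pure batch pipeline looks roughly like this:

Cloudfront/Clojure collector -> S3 (raw logs) ->
   -> EmrEtlRunner (enrich + shred) -> S3 (enriched & shredded) -> StorageLoader -> Redshift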

–Ihor

Good info. Thanks a ton.