Confused about Stream Enrich -> S3Loader Step


#1

I have successfully set up ELB -> ASG of Scala Collector -> Kinesis -> ASG of Scala Enricher -> Kinesis

When I start up the S3Loader, I get confusing output, and it does not seem to do anything when I send data through the pipeline. I have made sure the access permissions are correct for DynamoDB, S3, and Kinesis.

My S3Loader config:

It eventually just times out silently. Output like this:

log4j:WARN No appenders could be found for logger (com.amazonaws.AmazonWebServiceClient).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[main] INFO com.snowplowanalytics.s3.loader.sinks.KinesisSink - Stream stream-bad-data-to-s3 exists and is active
[main] INFO com.snowplowanalytics.s3.loader.SinkApp$ - Initializing sink with KinesisConnectorConfiguration: {regionName=us-west-2, s3Endpoint=https://s3-us-west-2.amazonaws.com, kinesisInputStream=stream-enricher-to-s3, maxRecords=500, connectorDestination=s3, bufferMillisecondsLimit=15, bufferRecordCountLimit=1, s3Bucket=piv-stream-data-prod-bucket, kinesisEndpoint=https://kinesis.us-west-2.amazonaws.com, appName=piv_sink, bufferByteSizeLimit=8, retryLimit=1, initialPositionInStream=LATEST}
[main] WARN com.snowplowanalytics.s3.loader.KinesisSourceExecutor - idleTimeBetweenReads is greater than bufferTimeMillisecondsLimit. For best results, ensure that bufferTimeMillisecondsLimit is more than or equal to idleTimeBetweenReads
[main] INFO com.snowplowanalytics.s3.loader.KinesisSourceExecutor - KinesisSourceExecutor worker created

My end goal is to have ASG S3Loader -> S3 -> EMR (shred and send to Redshift).

Is this the golden path? I would also appreciate any help figuring out why the S3Loader is silently failing.


#2

I can also see that the connection to DynamoDB works and shard records are being written there correctly, but the loader is not receiving any of the events we are sending.


#3

Never mind, was able to fix this by passing the right app name in the tracker itself.


#4

With the data (LZO) living in that bucket, running my EMR runner:

E, [2018-09-26T21:14:42.121423 #4374] ERROR -- : No run folders in [s3://piv-stream-data-prod-bucket/] found

I am running:
[ec2-user@ip-172-31-8-153 ~]$ ./snowplow-emr-etl-runner run -c emr.yaml -r resolver.json -t targets -d -f shred

The targets directory has the correct Redshift config, and the relevant part of emr.yaml is:

      enriched:
        good: s3://piv-stream-data-prod-bucket/       # e.g. s3://my-out-bucket/enriched/good
        bad: s3://piv-stream-data-prod-bucket-bad/        # e.g. s3://my-out-bucket/enriched/bad
        errors: # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
        stream: s3://piv-stream-data-prod-bucket # stream bucket
      shredded:
        good: s3://piv-prod-shredded-good       # e.g. s3://my-out-bucket/shredded/good
        bad: s3://piv-prod-shredded-bad        # e.g. s3://my-out-bucket/shredded/bad
        errors: # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded

There is clearly compressed data in that bucket, written by the S3Loader. What am I doing wrong?


Having trouble with the EMR loader consuming in stream mode
#5

@dbuscaglia, your enriched:stream and enriched:good point to the same bucket, which is wrong. Since you are running EmrEtlRunner in Stream Enrich mode, the enriched:good bucket is your staging bucket. That is, the first step moves files from enriched:stream to enriched:good. Because the two are the same, it seems to confuse the app. See this diagram to understand the workflow: https://github.com/snowplow/snowplow/wiki/Batch-pipeline-steps#dataflow-diagram-for-stream-enrich-mode.
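
To make the point about the two locations concrete, here is a minimal sketch of a pre-flight check one could run before starting a job. It is not part of EmrEtlRunner; the function names are hypothetical, the first set of values is copied from the config posted above, and the second set uses placeholder bucket names for illustration only.

```python
def normalize(uri: str) -> str:
    """Strip trailing slashes so 's3://bucket' and 's3://bucket/' compare equal."""
    return uri.rstrip("/")


def staging_conflicts(enriched: dict) -> bool:
    """Return True when enriched:stream and enriched:good are the same location."""
    return normalize(enriched["stream"]) == normalize(enriched["good"])


# Values copied from the config posted earlier in this thread.
original = {
    "good": "s3://piv-stream-data-prod-bucket/",
    "stream": "s3://piv-stream-data-prod-bucket",
}

# Placeholder bucket names (assumptions, for illustration only).
separated = {
    "good": "s3://my-out-bucket/enriched/good",
    "stream": "s3://my-stream-bucket/enriched",
}

print(staging_conflicts(original))   # True
print(staging_conflicts(separated))  # False
```

The check only compares the two strings after trimming trailing slashes; it does not look at S3 itself.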


#6

@ihor I had seen that, and now:

      enriched:
        good: s3://piv-data-out-bucket/good/
        archive: s3://piv-data-archive/enriched
        stream: s3://piv-stream-data-prod-bucket/good/ # stream bucket
      shredded:
        good: s3://piv-data-out-bucket/shredded/good       # e.g. s3://my-out-bucket/shredded/good
        bad: s3://piv-data-out-bucket/shredded/bad        # e.g. s3://my-out-bucket/shredded/bad
        errors:
        archive: s3://piv-data-archive/shredded

I am still getting “No run folder found,” even with the data living in the stream location.


#7

I do understand the steps the ETL runner is doing: it's moving data from S3 to HDFS, which Hadoop and Spark pull from, so that it can start to shred and run the RDB loader.


#8

@dbuscaglia, by using -f shred you are telling EmrEtlRunner to start from step 2 on the diagram I pointed to earlier. So you are essentially trying to shred data while skipping the staging step. The staging step is what creates the run folder, which explains the error.
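
As a rough illustration of why resuming from shred skips staging, here is a toy model of the resume behaviour. The step list is deliberately simplified and the names are assumptions for this sketch, not the runner's actual step list (see the diagram linked earlier for that).

```python
# Simplified, illustrative step order (assumption); the real runner has more steps.
STEPS = ["staging", "shred", "load"]


def steps_to_run(resume_from=None):
    """Return the steps executed, starting at resume_from if one is given."""
    if resume_from is None:
        return list(STEPS)
    # Everything before the resume point is skipped, including staging.
    return STEPS[STEPS.index(resume_from):]


print(steps_to_run())         # ['staging', 'shred', 'load']
print(steps_to_run("shred"))  # ['shred', 'load']
```

In this model, resuming from "shred" never runs "staging", so nothing creates the run folder that shredding expects. Presumably, running the same command without -f shred lets the job begin with staging.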


#9

@ihor ack, I had read (probably in bad R102 documentation) that it was necessary to force skipping the staging and enrichment steps. You have been a tremendous help; it is working now.

I owe you a beer. If you’re ever in SF, holler!


#10

@ihor is there a way to keep the EMR cluster up, to avoid the provisioning and bootstrapping on every run? So far I only see jobs being run on a newly provisioned cluster.


#11

@dbuscaglia, sure, you can keep an EMR cluster running permanently and add tasks to be executed on it at whatever pace you require. However, you cannot do that with EmrEtlRunner. To make it work you would have to use Dataflow Runner.


#12

Awesome, is that production ready?


#13

Yes, we use it internally.