Kinesis + EMR ETL (R89)

After reading the following:

I believe I’ve followed the instructions correctly, but no logs ever end up in s3://snowplow-wg/realtime/processing.

Here is more info about my setup:

1. I’m running snowplow-stream-collector-0.9.0, which is configured as below to output to the Kinesis stream “snowplow-collector-good” (a quick way to verify that records reach this stream is sketched after the config):

collector {
  interface = "0.0.0.0"
  port = 80 
  production = false 

  p3p {
    policyref = "/w3c/p3p.xml"
    CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
  }

  cookie {
    enabled = true
    expiration = 365 # e.g. "365 days"
    name = "spcol" 
    # The domain is optional and will make the cookie accessible to other
    # applications on the domain. Comment out this line to tie cookies to
    # the collector's full domain
    # domain = "{{collectorCookieDomain}}"
  }

  sink {
    enabled = "kinesis"

    kinesis {
      thread-pool-size: 10 # Thread pool size for Kinesis API requests
      aws {
        access-key: "iam"
        secret-key: "iam"
      }

      stream {
        region: "ap-southeast-2"
        good: "snowplow-collector-good"
        bad: "snowplow-collector-bad"
      }

      backoffPolicy: {
        minBackoff: 20 
        maxBackoff: 60
      }
    }

    kafka {
      brokers: "{{collectorKafkaBrokers}}"

      topic {
        good: "{{collectorKafkaTopicGoodName}}"
        bad: "{{collectorKafkaTopicBadName}}"
      }
    }

    buffer {
      byte-limit: 4000000 
      record-limit: 500 # Not supported by Kafka; will be ignored
      time-limit: 5000 
    }
  }
}

akka {
  loglevel = DEBUG # 'OFF' for no logging, 'DEBUG' for all logging.
  loggers = ["akka.event.slf4j.Slf4jLogger"]
}

spray.can.server {
  remote-address-header = on

  uri-parsing-mode = relaxed
  raw-request-uri-header = on

  parsing {
    max-uri-length = 32768
  }
}
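For anyone reproducing this, a quick way to sanity-check that records are actually reaching the stream is to pull a few directly with the AWS CLI (the stream name and region come from the config above; the shard ID is only an example, so list the shards first):

  # List the shard IDs in the stream
  aws kinesis describe-stream --stream-name snowplow-collector-good --region ap-southeast-2 \
    --query 'StreamDescription.Shards[].ShardId'

  # Read a batch of records from the start of one shard (shard ID is an example)
  ITERATOR=$(aws kinesis get-shard-iterator --stream-name snowplow-collector-good \
    --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON \
    --region ap-southeast-2 --query 'ShardIterator' --output text)
  aws kinesis get-records --shard-iterator "$ITERATOR" --region ap-southeast-2 | head -c 1000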

2. I’m running snowplow-kinesis-s3-0.5.0 and it is successfully putting (many) log files in s3://snowplow-kinesis-wg

An example file in that S3 bucket is 2017-08-15-49575083451241952265900828384403460541645649702371196930-49575083451241952265900828384479622868281380961104429058.gz

And my sink.conf file looks like this (note I’ve tried both lzo and gzip below; a quick check of what the files actually contain is sketched after the config):

# Default configuration for kinesis-lzo-s3-sink

sink {
  aws {
    access-key: "iam"
    secret-key: "iam"
  }

  kinesis {
    in {
      # Kinesis input stream name
      stream-name: "snowplow-collector-good"

      initial-position: "TRIM_HORIZON"
      max-records: 10000
    }

    out {
      # Stream for events for which the storage process fails
      stream-name: "snowplow-sink-failed"
    }

    region: "ap-southeast-2"
    app-name: "SnowplowLzoS3Sink-snowplow-enriched-out"
  }

  s3 {
    region: "ap-southeast-2"
    bucket: "snowplow-kinesis-wg"

    # Format is one of lzo or gzip
    # Note that you can use gzip only for the enriched data stream.
    format: "gzip"

    max-timeout: 60000
  }

  buffer {
    byte-limit: 4000000
    record-limit: 500
    time-limit: 5000
  }

  logging {
    level: "DEBUG"
  }

  monitoring {
    snowplow {
        collector-uri: "sp.winning.com.au"
        collector-port: 80
        app-id: "spenrich"
        method: "GET"
    }
  }
}
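Since I’ve been switching between lzo and gzip, one quick way to confirm what a given object in the bucket actually contains (file name taken from the example above) is to look at its first bytes; gzip data always starts with the bytes 1f 8b:

  aws s3 cp s3://snowplow-kinesis-wg/2017-08-15-49575083451241952265900828384403460541645649702371196930-49575083451241952265900828384479622868281380961104429058.gz - \
    | head -c 4 | xxd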

Both #1 and #2 above run without errors.

3. And when I run snowplow-emr-etl-runner, it runs “successfully” but does nothing (i.e. s3://snowplow-wg/realtime/processing/ is empty):

[root@ip-10-10-22-45 storageloader]# ./snowplow-emr-etl-runner --config config.yml --resolver resolver.json --targets targets/ --enrichments enrichments/

D, [2017-08-15T07:56:52.913000 #26919] DEBUG -- : Staging raw logs...
  moving files from s3://snowplow-kinesis-wg/ to s3://snowplow-wg/realtime/processing/

Here is a snippet of my config used for snowplow-emr-etl-runner (R89, which I used rather than R90 since R90’s architecture is quite different):

  s3:
    region: ap-southeast-2
    buckets:
      assets: s3://snowplow-hosted-assets
      jsonpath_assets: s3://snowplow-wg/jsonpaths
      log: s3://snowplow-wg/realtime/log
      raw:
        in:
          - s3://snowplow-kinesis-wg
        processing: s3://snowplow-wg/realtime/processing
        archive: s3://snowplow-wg/realtime/archive/raw
      enriched:
        good: s3://snowplow-wg/realtime/enriched/good
        bad: s3://snowplow-wg/realtime/enriched/bad
        errors:      
        archive: s3://snowplow-wg/realtime/archive/enriched    
      shredded:
        good: s3://snowplow-wg/realtime/shredded/good       
        bad: s3://snowplow-wg/realtime/shredded/bad       
        errors:      
        archive: s3://snowplow-wg/realtime/archive/shredded   

So the main issue here is that the s3://snowplow-wg/realtime/processing folder is empty after #3 runs, which makes me think I might have the wrong files in s3://snowplow-kinesis-wg.
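One way to double-check that is simply to list what is sitting in the raw in bucket (the same path EmrEtlRunner stages from), e.g.:

  aws s3 ls s3://snowplow-kinesis-wg/ | head -n 20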

After reading the two URLs above, I set kinesis.in.stream-name = “snowplow-collector-good”, which is the same as the collector’s output stream, collector.sink.kinesis.stream.good = “snowplow-collector-good”.

Greatly appreciate any help! I feel I’m really close to getting both batch and real time working in parallel.

Thanks,
Tim

What is the value of the format setting in the collectors block of your config.yml?

If it is set to clj-tomcat, only files matching the Tomcat access log pattern (.*localhost_access_log.*.txt) will be moved. Set it to thrift and all files will be included.
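For reference, the relevant block would look something like this (a minimal sketch assuming the standard R89 config.yml layout; the comment is indicative only):

  collectors:
    format: thrift # raw Thrift records from the Scala Stream Collector via the Kinesis S3 sink
                   # (was clj-tomcat, which only stages *localhost_access_log*.txt files)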


Thank you!! I will give it a go!

P.S. I had clj-tomcat configured.

Hey Adam,

Your fix got the log files copied into the /processing S3 folder, but ultimately nothing ended up in Redshift, so something must not have liked the contents of the log files, even though no errors were thrown.

I appreciate that the batch and real-time pipelines weren’t originally meant to be interconnected, so this is a tough one!

I’ll dig through the archive folders and see if they give me any clues as to where it’s not working as expected.

Will come back with more info when I have it!

Can you share the output of snowplow-emr-etl-runner? Did anything make it into shredded-good?
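If it helps, a quick listing of the shredded and enriched buckets (paths taken from your config snippet above) would show where things stop:

  aws s3 ls s3://snowplow-wg/realtime/shredded/good/ --recursive | head -n 20
  aws s3 ls s3://snowplow-wg/realtime/enriched/bad/ --recursive | head -n 20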

Hi Adam,

I tried everything from the top and it worked!!

I believe the issue was that I wasn’t waiting long enough for the sink app to write the page view to the S3 bucket. It was writing a LOT of files, and nothing was hitting that collector except my test page views, hence my mistake.

For anyone’s benefit, here is more info:

  1. LZO files copied successfully from the Kinesis collector to the processing folder
  2. EMR cluster ran successfully
  3. The test pageview I did is in shredded/good/run=2017…/atomic-events/part-00007
	web	2017-09-04 07:22:22.829	2017-09-04 07:04:39.360	2017-09-04 07:04:39.337	page_view	8a4ad24a-20b1-4d18-a635-88c27f09ec97		mycljcoll	js-2.5.1	ssc-0.9.0-kinesis	spark-1.9.0-common-0.25.0		125.255.84.146	422957073	a36988b929feafe4	2	3ace9c05-a46e-4994-a219-2df132a5a234	AU	02	Sydney	2000	-33.8591	151.2002	New South Wales					https://crm.winning.com.au/commercial/view?customer_no=C00230664	View customer	https://crm.winning.com.au/commercial/aol	https	crm.winning.com.au	80	/commercial/view	customer_no=C00230664		https	crm.winning.com.au	80	/commercial/aol																																		Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.113 Safari/537.36	Chrome	Chrome	60.0.3112.113	Browser	WEBKIT	en-US	0	1	0	0	0	0	0	0	0	1	24	1666	1169	Mac OS X	Mac OS X	Apple Inc.	Australia/Sydney	Computer	0	2560	1440	UTF-8	1666	1320								Australia/Sydney							8973e6a4-69bd-4605-9dc7-3ef530e1e182	2017-09-04 07:04:39.360	com.snowplowanalytics.snowplow	page_view	jsonschema	1-0-0		

Thank you so much for your help. Between R90 dropping emr-etl-loader and figuring out the real-time pipeline with R89, I clearly confused myself and made a rookie mistake!

Hi @timgriffinau - glad you got it working! You are right - there is a lot of flux currently with the new Snowplow releases. Unfortunately this is probably the new normal - we are evolving the platform much faster now than we were in 2016…


Hi @alex,

That was definitely not a criticism! The community and RFC process are great - it’s one of the best-supported and most flexible platforms I’ve worked with.

Keep up the great work.


Thanks for the kind words @timgriffinau! Coming up quite soon is a major overhaul of our open-source docs, which are showing their age now… This should help to simplify things for the open-source community.
