"Not a file" error from "[enrich] spark: Enrich Raw Events"

Hi everyone,

We’re new to Snowplow and just set up our first cluster today. It’s a basic configuration: a CloudFront Collector, a handful of requests to the tracking pixel, basic enrichment with EmrEtlRunner, and no custom shredders or schemas.

EmrEtlRunner ran fine the first time, but has been failing on the [enrich] spark: Enrich Raw Events step ever since its second run. We found the following error message in the logs:

19/09/04 02:58:09 ERROR FileFormatWriter: Aborting job null.
java.io.IOException: Not a file: hdfs://ip-172-31-54-192.us-west-2.compute.internal:8020/local/snowplow/raw-events/archive/run=2019-09-03-17-44-30
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:288)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:194)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
     etc...

This is how we run EmrEtlRunner:

./snowplow-emr-etl-runner run \
 -c config.yml \
 -r iglu_resolver.json \
 -d \
 -t targets/ \
 -f enrich

This is the output we’re seeing:

uri:classloader:/gems/avro-1.8.1/lib/avro/schema.rb:350: warning: constant ::Fixnum is deprecated
uri:classloader:/gems/json-schema-2.7.0/lib/json-schema/util/array_set.rb:18: warning: constant ::Fixnum is deprecated
D, [2019-09-03T19:42:46.130888 #83618] DEBUG -- : Initializing EMR jobflow
D, [2019-09-03T19:42:47.788460 #83618] DEBUG -- : EMR jobflow j-K0VX7J06FRQK started, waiting for jobflow to complete...
I, [2019-09-03T19:58:51.315069 #83618]  INFO -- : No RDB Loader logs
F, [2019-09-03T19:58:51.530948 #83618] FATAL -- :
    Snowplow::EmrEtlRunner::EmrExecutionError (EMR jobflow j-K0VX7J06FRQK failed, check Amazon EMR console and Hadoop logs for details (help: https://github.com/snowplow/snowplow/wiki/Troubleshooting-jobs-on-Elastic-MapReduce). Data files not archived.
    j-K0VX7J06FRQK: TERMINATING [STEP_FAILURE] ~ elapsed time n/a [2019-09-03 19:52:20 -0700 - ]
     - 1. Elasticity Setup Hadoop Debugging: COMPLETED ~ 00:00:28 [2019-09-03 19:52:22 -0700 - 2019-09-03 19:52:51 -0700]
     - 2. [enrich] s3-dist-cp: Raw S3 -> Raw HDFS: COMPLETED ~ 00:03:27 [2019-09-03 19:52:53 -0700 - 2019-09-03 19:56:20 -0700]
     - 3. [enrich] spark: Enrich Raw Events: FAILED ~ 00:01:50 [2019-09-03 19:56:22 -0700 - 2019-09-03 19:58:13 -0700]
     - 4. [cleanup] Empty Raw HDFS: CANCELLED ~ elapsed time n/a [ - ]
      etc...

And this is our configuration file:

aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: <%= ENV['AWS_SNOWPLOW_ACCESS_KEY'] %>
  secret_access_key: <%= ENV['AWS_SNOWPLOW_SECRET_KEY'] %>
  s3:
    region: us-west-2
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3://snowplow-output/log
      encrypted: false # Whether the buckets below are encrypted using server side encryption (SSE-S3)
      raw:
        in:                  # This is a YAML array of one or more in buckets - you MUST use hyphens before each entry in the array, as below
          - s3://snowplow-collector-logs         # e.g. s3://my-old-collector-bucket
        processing: s3://snowplow-collector-logs/processing
        archive: s3://snowplow-collector-logs/archive    # e.g. s3://my-archive-bucket/raw
      enriched:
        good: s3://snowplow-output/enriched/good      # e.g. s3://my-out-bucket/enriched/good
        bad: s3://snowplow-output/enriched/bad        # e.g. s3://my-out-bucket/enriched/bad
        errors: s3://snowplow-output/enriched/errors     # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://snowplow-output/enriched/archive    # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
      shredded:
        good: s3://snowplow-output/shredded/good       # e.g. s3://my-out-bucket/shredded/good
        bad: s3://snowplow-output/shredded/bad        # e.g. s3://my-out-bucket/shredded/bad
        errors: s3://snowplow-output/shredded/errors     # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://snowplow-output/shredded/archive    # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
    consolidate_shredded_output: false # Whether to combine files when copying from hdfs to s3
  emr:
    ami_version: 5.9.0
    region: us-west-2        # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole     # Created using $ aws emr create-default-roles
    placement: us-west-2a     # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id:  # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: snowplow-emr
    security_configuration:  # Specify your EMR security configuration if needed. Leave blank otherwise
    bootstrap: []           # Set this to specify custom bootstrap actions. Leave empty otherwise
    software:
      hbase:                # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual:              # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      job_name: Snowplow ETL # Give your job a name
      master_instance_type: m1.medium
      core_instance_count: 1
      core_instance_type: m1.medium
      core_instance_ebs:    # Optional. Attach an EBS volume to each core instance.
        volume_size: 100    # Gigabytes
        volume_type: "gp2"
        volume_iops: 400    # Optional. Will only be used if volume_type is "io1"
        ebs_optimized: false # Optional. Will default to true
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m1.medium
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
      spark:
        maximizeResourceAllocation: "true"
    additional_info:        # Optional JSON string for selecting additional features
collectors:
  format: cloudfront # For example: 'clj-tomcat' for the Clojure Collector, 'thrift' for Thrift records, 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs or 'ndjson/urbanairship.connect/v1' for UrbanAirship Connect events
enrich:
  versions:
    spark_enrich: 1.17.0 # Version of the Spark Enrichment process
  continue_on_unexpected_error: false # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: NONE # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
  versions:
    rdb_loader: 0.14.0
    rdb_shredder: 0.13.1        # Version of the Spark Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
monitoring:
  tags: {} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
  #snowplow:
    #method: get
    #protocol: http
    #port: 80
    #app_id: ADD HERE # e.g. snowplow
    #collector: ADD HERE # e.g. d3rkrsqld9gmqf.cloudfront.net

Any help on next steps for troubleshooting this problem would be extremely helpful. Thank you!

@ondrej, we stopped using the CloudFront collector internally a long time ago, so I’m speculating a bit here. My guess is that the failure is caused by no raw events (collector log files) being moved during the 2nd step (s3-dist-cp: Raw S3 -> Raw HDFS), hence the error “Not a file: hdfs://ip-172-31-54-192.us-west-2.compute.internal:8020/local/snowplow/raw-events/archive/run=2019-09-03-17-44-30”.

Your CLI command (and the logs) indicate that you are running from the enrich step. Why did you skip the staging step? The staging step is what moves the raw files into place. You need to remove the -f option unless you are recovering from a failed enrich step and you actually do have raw files in the raw:processing bucket (i.e. the files in s3://snowplow-collector-logs/processing are not 0 bytes).
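
A quick way to check whether there is anything real in there (rather than just zero-byte marker files) would be something along these lines — the path is taken from your config, so adjust it if yours differs:

aws s3 ls s3://snowplow-collector-logs/processing/ --recursive --human-readable --summarize

The --summarize flag prints a total object count and size at the end, which makes empty or marker-only prefixes easy to spot.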

Hi Ihor, thanks for the reply.

I’m skipping the staging step because without -f enrich, I get the following error: There seems to be an ongoing run of EmrEtlRunner: Cannot safely add staging step to jobflow, s3://snowplow-collector-logs/processing/ is not empty. That error made sense, since EmrEtlRunner previously failed on the enrich step as well. The s3://snowplow-collector-logs/processing folder does have files inside (an archive folder, archive_$folder$ and processing_$folder$ marker files, and a .gz file with the logs created by CloudFront).

I could start from scratch again (empty all the buckets and create a couple of new CloudFront logs) to see if the problem reproduces, but before I do, would you recommend anything else? I started with the CloudFront Collector as the simplest collector to get a feel for Snowplow, but it sounds like the Clojure and Scala Stream collectors might be worth moving on to sooner rather than later.
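
If I do wipe everything, I’m assuming a few aws s3 rm calls along these lines would do it (paths taken from our config; I’d double-check them before actually running anything):

aws s3 rm s3://snowplow-collector-logs/processing/ --recursive
aws s3 rm s3://snowplow-output/enriched/ --recursive
aws s3 rm s3://snowplow-output/shredded/ --recursive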

@ondrej,

I wouldn’t expect an archive folder inside the processing bucket. It looks like you are archiving raw events into the very same bucket where you stage them. That leads to the same events being picked up recursively, and to the false assumption that (new) files have been staged.

Your raw:in bucket is the same bucket as raw:processing. This is likely to cause exactly this behaviour, as per Common configuration · snowplow/snowplow Wiki · GitHub.
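
As a rough sketch, the raw section of your buckets config would end up looking something like the below — the snowplow-etl and snowplow-archive bucket names are just placeholders for whatever separate buckets you create:

      raw:
        in:
          - s3://snowplow-collector-logs            # only CloudFront access logs land here
        processing: s3://snowplow-etl/processing    # separate bucket, only ever touched by EmrEtlRunner
        archive: s3://snowplow-archive/raw          # separate bucket for archived raw events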

Separating raw:in and raw:processing into their own dedicated buckets (rather than just prefixes within the same bucket) did the trick. Thanks, Ihor!