Unable to recover bad events due to a missing schema

We have some bad events caused by a missing schema, and we have not been able to recover them.

Our current architecture looks like this:

Scala Collector > Kinesis > S3 Collector > S3 > EmrEtlRunner > Redshift

We tried to recover the events using Hadoop Event Recovery. The job took approximately 9 hours to run on EMR and moved 10 days' worth of bad records (~200 GB) to s3/recovered. The files appeared to be processed correctly and showed up in s3/recovered.

However, when we ran EmrEtlRunner again with `--skip staging` and `processing` set to `s3/recovered`, the job failed at the "Elasticity S3DistCp Step: Enriched HDFS -> S3" step with the following message:

```text
Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://ip-10-0-4-101.eu-west-1.compute.internal:8020/tmp/e0bb72ce-4b9a-43d3-b341-19132d6f921c/files
```

It looks like the files were not enriched at all, so nothing was available for S3DistCp to copy. There were no bad records or errors in S3.


We also tried to run a python script similar to what is found here:
https://discourse.snowplowanalytics.com/t/python-script-to-reprocess-bad-rows/216/2

By the way, this was much faster than Hadoop Event Recovery (a few minutes vs. 9 hours), and the processed files also look fine. However, when we ran EmrEtlRunner, the job hung at the Enrich step for 12 hours. EMR monitoring showed 500 containers pending the whole time, so the job apparently made no progress at all. HDFS usage was also quite low (~3%).

Again, no bad records or errors in S3.

We canceled the execution and are running out of alternatives. Do you have any idea how we could process these bad records?

We also tried different instance settings for the EMR cluster.

This is our config file:
```yaml
aws:
  access_key_id: {{ aws_access_key_id }}
  secret_access_key: {{ aws_secret_access_key }}
  s3:
    region:            eu-west-1
    buckets:
      assets:          s3://snowplow-hosted-assets
      jsonpath_assets: s3://{{ environment }}-snowplow/jsonpaths
      log:             s3n://{{ environment }}-snowplow/etl-logs
      encrypted:       false
      raw:
        in:
          - s3a://{{ environment }}-snowplow/in
        processing:    s3a://{{ environment }}-snowplow/etl-processing
        archive:       s3a://{{ environment }}-snowplow/archive-raw
      enriched:
        good:          s3a://{{ environment }}-snowplow/data-enriched/good
        bad:           s3a://{{ environment }}-snowplow/data-enriched/bad
        errors:        s3a://{{ environment }}-snowplow/data-enriched/errors
        archive:       s3a://{{ environment }}-snowplow/data-enriched/archive
      shredded:
        good:          s3a://{{ environment }}-snowplow/data-shredded/good
        bad:           s3a://{{ environment }}-snowplow/data-shredded/bad
        errors:        s3a://{{ environment }}-snowplow/data-shredded/errors
        archive:       s3a://{{ environment }}-snowplow/data-shredded/archive
  emr:
    ami_version:       5.9.0
    region:            eu-west-1
    jobflow_role:      role-{{ environment }}-platform-snowplow-ec2
    service_role:      role-{{ environment }}-platform-snowplow-emr
    placement:
    ec2_subnet_id:     {{ ec2_subnet_id }}
    ec2_key_name:      {{ ec2_key_name }}
    bootstrap: []
    software:
      hbase:
      lingual:
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
      spark:
        maximizeResourceAllocation: "true"
    jobflow:
      job_name:                 Snowplow ETL
      master_instance_type:     m4.large
      core_instance_count:      3
      core_instance_type:       r4.xlarge
      task_instance_count:      0
      task_instance_type:       m4.large
      task_instance_bid:        0.020
      core_instance_ebs:
        volume_size:            200
        volume_type:            "io1"
        volume_iops:            400
        ebs_optimized:          false
    bootstrap_failure_tries:    3
collectors:
  format:                       thrift
enrich:
  versions:
    spark_enrich:               1.16.0
  continue_on_unexpected_error: false
  output_compression:           NONE
storage:
  versions:
    rdb_loader:                 0.14.0
    rdb_shredder:               0.13.1
    hadoop_elasticsearch:       0.1.0 
monitoring:
  tags: {
    "Name": "emr-service-snowplowetl"
  }
  logging:
    level:     INFO
  snowplow:
    method:    get
    app_id:    snowplow
    collector: snplw.co.uk
```

@Andre_Sionek, the problem seems to be `InvalidInputException: Input path does not exist`.

You stated “processing= s3/recovered”, but your configuration file has “s3a://{{ environment }}-snowplow/etl-processing”. There is a clear mismatch (unless I misunderstood), which leaves EmrEtlRunner unable to find the raw (recovered) files to enrich.
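A quick way to catch this kind of mismatch is to compare the location the recovery job wrote to with `raw.processing` in the config. The sketch below does that in JavaScript; `normalizeS3Uri` and `sameS3Location` are hypothetical helpers, the bucket names are placeholders, and treating `s3://`, `s3n://` and `s3a://` URIs as the same location is an assumption that only holds when they name the same bucket and prefix.

```javascript
// Hypothetical helpers: compare two S3 locations, ignoring the URI scheme
// (s3://, s3n://, s3a://) and any trailing slashes.
function normalizeS3Uri(uri) {
  return uri.trim().replace(/^s3[an]?:\/\//, "s3://").replace(/\/+$/, "");
}

function sameS3Location(a, b) {
  return normalizeS3Uri(a) === normalizeS3Uri(b);
}

// Placeholder values: where the recovery output went vs. what
// EmrEtlRunner reads from aws.s3.buckets.raw.processing.
const recoveredOutput = "s3n://acme-snowplow/recovered/";
const rawProcessing = "s3a://acme-snowplow/etl-processing";

if (!sameS3Location(recoveredOutput, rawProcessing)) {
  console.log("Mismatch: raw.processing does not point at the recovered files");
}
```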

Also, 200 GB sounds like too much to process in one go. I would advise breaking it up into smaller chunks. Are you sure you recovered only the events you intended to?
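One way to break the work up, reusing the S3DistCp `--groupBy` approach from the playbook later in this thread, is to generate one pattern per day and run one recovery job per pattern. This is a sketch only: `dailyGroupByPatterns` is a hypothetical helper, and it assumes the bad-row folders follow the `run=YYYY-MM-DD-...` naming that the playbook's regex expects.

```javascript
// Hypothetical helper: one S3DistCp --groupBy pattern per day, so the
// recovery can run as several smaller jobs instead of one large pass.
function dailyGroupByPatterns(startDate, days) {
  const start = new Date(`${startDate}T00:00:00Z`); // parse as UTC midnight
  const patterns = [];
  for (let i = 0; i < days; i++) {
    const day = new Date(start.getTime() + i * 24 * 60 * 60 * 1000);
    const ymd = day.toISOString().slice(0, 10); // "YYYY-MM-DD"
    patterns.push(`.*(run)=${ymd}.*`);
  }
  return patterns;
}

// Yields patterns for 2020-01-30, 2020-01-31 and 2020-02-01.
for (const pattern of dailyGroupByPatterns("2020-01-30", 3)) {
  console.log(pattern);
}
```

Each pattern would replace the `--groupBy` value in a separate run, ideally with its own output prefix so the chunks can be checked and enriched one at a time.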

Hello @ihor, thanks for the quick response! I wasn’t very clear, sorry about that.

This is the config for the normal EMR job. For the recovery run, we changed the processing path to s3a://{{ environment }}-snowplow/recovered. When the first step runs, I do see data being loaded into HDFS in EMR monitoring.

Filtering the events didn’t work for us in Hadoop Recovery: all output files were empty (0 bytes). So we wrote a simple passthrough function that recovers all events from bad to recovered.
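For comparison, a passthrough script like the one described might be as small as the sketch below. It assumes the same `process(event, errors)` contract as the filtering function later in this thread, where returning the event keeps the row and returning `null` drops it. Note that a passthrough also sends rows that failed for other reasons back through enrichment, where they would likely fail again.

```javascript
// Passthrough recovery script (sketch): keep every bad row's original
// event, regardless of which errors it failed with.
function process(event, errors) {
  return event;
}

// Hypothetical bad row, for illustration only.
const sampleEvent = "raw-collector-payload";
console.log(process(sampleEvent, ["any error message"]) === sampleEvent); // true
```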

The job is also failing after enrichment, when copying from HDFS to S3.

The files created by Hadoop Recovery are also not compressed, while the files in the `in` folder are LZO-compressed. So the 200 GB is uncompressed data; compressed, it would be about 20-30 GB.

@Andre_Sionek, it’s been a very long time since I last ran Hadoop Recovery, as we switched to Stream Enrich a while ago. However, your scenario is the simplest one. I can only guess that your own function didn’t produce the right data.

For Hadoop Recovery, your recovery function is as simple as:

```javascript
function process(event, errors) {
    for (var i = 0; i < errors.length; i++) {
        if (! /Could not find schema with key/.test(errors[i])) {
            return null;
        }
    }
    return event;
}
```
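To sanity-check the function before launching a cluster, it can be exercised locally with Node.js, and the same source can be base64-encoded for the `--script` argument of the playbook. The sample row and error strings below are hypothetical; only the `Could not find schema with key` prefix comes from this thread. The resulting string will not be byte-identical to the one in the playbook if line endings differ (that one appears to use CRLF), but it encodes the same function.

```javascript
// The recovery function from above, unchanged.
function process(event, errors) {
  for (var i = 0; i < errors.length; i++) {
    if (! /Could not find schema with key/.test(errors[i])) {
      return null;
    }
  }
  return event;
}

// Hypothetical bad row and error messages.
const rawEvent = "raw-collector-payload";
const missingSchema = ["Could not find schema with key iglu:com.acme/click/jsonschema/1-0-0"];
const mixed = missingSchema.concat(["hypothetical unrelated error"]);

console.log(process(rawEvent, missingSchema)); // raw-collector-payload (kept)
console.log(process(rawEvent, mixed));         // null (dropped)
console.log(process(rawEvent, []));            // raw-collector-payload (no errors, kept)

// Base64-encode the function source for the playbook's --script argument.
const source = process.toString();
const encoded = Buffer.from(source, "utf8").toString("base64");
console.log(encoded);
```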

Your Dataflow Runner playbook would look something like this (remove all comments before use and validate the JSON):

```json
{
  "schema": "iglu:com.snowplowanalytics.dataflowrunner/PlaybookConfig/avro/1-0-1", # Do not change.
  "data": {
    "region": "<your_region>", # Must be the same region in which you are creating the EMR cluster.
    "credentials": {
      "accessKeyId": "<aws-access-key-id>", # Environment variable or hardcoded value.
      "secretAccessKey": "<aws-secret-access-key>" # Environment variable or hardcoded value.
    },
    "steps": [
      {
        "type": "CUSTOM_JAR",
        "name": "Combine Months",
        "actionOnFailure": "CANCEL_AND_WAIT",
        "jar": "/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
        "arguments": [
          "--src",
          "s3n://path/to/enriched/bad/", # Find this in your pipeline repo,
                                         # in snowplow/config.yml, under aws.s3.buckets.enriched.bad.
                                         # Don't forget the trailing slash.
          "--dest",
          "hdfs:///local/monthly/",
          "--groupBy",
          ".*(run)=2020-0[1-6].*", # This will only select bad rows from runs in the period as per RegEx.
          "--targetSize",
          "128",
          "--outputCodec",
          "none"
        ]
      },
      {
        "type": "CUSTOM_JAR",
        "name": "Fix up bad rows",
        "actionOnFailure": "CANCEL_AND_WAIT",
        "jar": "s3://snowplow-hosted-assets-eu-central-1/3-enrich/hadoop-event-recovery/snowplow-hadoop-event-recovery-0.2.0.jar",
                  # Make sure to change the name of the bucket with the appropriate region.
                  # It must be the same region in which you are creating the cluster.
        "arguments": [
          "com.snowplowanalytics.hadoop.scalding.SnowplowEventRecoveryJob",
          "--hdfs",
          "--input",
          "hdfs:///local/monthly/*",
          "--output",
          "hdfs:///local/recovery/",
          "--inputFormat",
          "bad",
          "--script",
          "ZnVuY3Rpb24gcHJvY2VzcyhldmVudCwgZXJyb3JzKSB7DQogICAgZm9yICh2YXIgaSA9IDA7IGkgPCBlcnJvcnMubGVuZ3RoOyBpKyspIHsNCiAgICAgICAgaWYgKCEgL0NvdWxkIG5vdCBmaW5kIHNjaGVtYSB3aXRoIGtleS8udGVzdChlcnJvcnNbaV0pKSB7DQogICAgICAgICAgICByZXR1cm4gbnVsbDsNCiAgICAgICAgfQ0KICAgIH0NCiAgICByZXR1cm4gZXZlbnQ7DQp9"
        ]
      },
      {
        "type": "CUSTOM_JAR",
        "name": "Back to S3",
        "actionOnFailure": "CANCEL_AND_WAIT",
        "jar": "/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
        "arguments": [
          "--src",
          "hdfs:///local/recovery/",
          "--dest",
          "s3n://path/to/recovered/"
        ]
      }
    ],
    "tags": [ # Optional section.
      {
        "key": "job",
        "value": "recovery"
      }
    ]
  }
}
```
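The `--groupBy` regex in the first step decides which bad-row files are copied into HDFS. The JavaScript below approximates that selection so the pattern can be checked against a few keys before running the job; the keys are hypothetical, and anchoring the pattern with `^` and `$` is an assumption meant to mimic a whole-path match in S3DistCp's Java regex handling. As we understand the S3DistCp documentation, because the only capture group is the literal `run`, every selected file shares the same group key and is concatenated into output files of roughly `--targetSize` MiB.

```javascript
// Approximation of the playbook's --groupBy pattern, anchored so the whole
// key has to match.
const groupBy = /^.*(run)=2020-0[1-6].*$/;

// Hypothetical keys under the enriched bad bucket.
const keys = [
  "s3n://path/to/enriched/bad/run=2020-01-15-03-00-12/part-00000",
  "s3n://path/to/enriched/bad/run=2020-06-30-23-10-05/part-00003",
  "s3n://path/to/enriched/bad/run=2020-07-01-00-00-00/part-00000",
  "s3n://path/to/enriched/bad/run=2019-12-31-22-00-00/part-00001",
];

const selected = keys.filter((key) => groupBy.test(key));

for (const key of keys) {
  // The first capture group is the group label S3DistCp would use.
  const match = groupBy.exec(key);
  console.log(match ? `selected (group "${match[1]}")` : "skipped", key);
}
```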