Running Hadoop Event Recovery with Dataflow Runner [tutorial]


#1

Hadoop Event Recovery (see the documentation) lets you fix up Snowplow bad rows and prepare them for reprocessing by writing your own custom JavaScript, which is executed on each bad row.

This tutorial walks you through setting up and running a recovery job with Dataflow Runner. As an example we use one common recovery scenario: some of your events failed validation because you forgot to upload a particular schema.

Refresher on Snowplow bad rows

Snowplow bad rows look like this:

{
  "line": "2015-06-09\t09:56:09\tNRT12\t831\t211.14.8.250\tGET...",
  "errors": [{
    "level": "error",
    "message": "Could not find schema with key iglu:com.acme/myevent/jsonschema/1-0-0 in any repository"
  }]
}

This is the structure that Hadoop Event Recovery will be working on.

Writing our function

The Hadoop Event Recovery jar will extract the “line” string (containing the original raw event) and an array of all the error messages which describe why the line failed validation, and pass them to your JavaScript function.
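To make that contract concrete, here is a minimal sketch of the extraction step. It assumes each bad row is a JSON object shaped like the example above, and that only the message of each error is passed on. The function name extractRecoveryInputs is hypothetical. This is an approximation for illustration, not the jar's actual code.

```javascript
// Hypothetical sketch: NOT the Hadoop Event Recovery jar's real implementation.
// It mimics what the jar hands to process(): the raw "line" string plus
// an array of error message strings.
function extractRecoveryInputs(badRowJson) {
  const row = JSON.parse(badRowJson);
  const errors = Array.isArray(row.errors) ? row.errors : [];
  // Keep only the message text; also accept errors stored as plain strings.
  const messages = errors.map(function (e) {
    return typeof e === 'string' ? e : e.message;
  });
  return { line: row.line, errors: messages };
}

// Usage with a shortened bad row like the one shown above.
const badRow = JSON.stringify({
  line: '2015-06-09\t09:56:09\tNRT12\t831\t211.14.8.250\tGET...',
  errors: [{
    level: 'error',
    message: 'Could not find schema with key iglu:com.acme/myevent/jsonschema/1-0-0 in any repository'
  }]
});
const inputs = extractRecoveryInputs(badRow);
console.log(inputs.errors[0]);
```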

We need to write a JavaScript function called process which accepts two arguments: the raw line string and the array of error messages. The function must always return either null (signifying that the bad row should be ignored, not recovered) or a string, which will be used as the recovered raw line. Your JavaScript can define other top-level functions besides process. We also provide several built-in JavaScript functions for you to call; for more detail, check out the documentation.

Remember, we want to recover events where the failure message involved a missing schema. The JavaScript function looks like this:

function process(event, errors) {
    for (var i = 0; i < errors.length; i++) {
        if (! /Could not find schema with key/.test(errors[i])) {
            return null;
        }
    }
    return event;
}

We only want to reprocess events that failed solely because of the missing schema. So we return null (meaning that we won't recover the event) if any of the error messages is not a missing-schema error. Otherwise we return the original raw line, which should now pass validation because we have uploaded the schema.
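Because process is plain JavaScript, its logic can be exercised outside Hadoop, for example in Node.js. The sketch below copies the function above and runs it on three error arrays. The second, non-schema error message is made up for illustration. The only change is the binding: the function is assigned to a variable named recover, because declaring a top-level process in Node would shadow Node's global process object.

```javascript
// The tutorial's recovery function, bound to "recover" to avoid shadowing
// Node's global "process" object. The logic is unchanged.
var recover = function process(event, errors) {
    for (var i = 0; i < errors.length; i++) {
        if (! /Could not find schema with key/.test(errors[i])) {
            return null;
        }
    }
    return event;
};

var raw = '2015-06-09\t09:56:09\tNRT12\t831\t211.14.8.250\tGET...';
var onlySchema = ['Could not find schema with key iglu:com.acme/myevent/jsonschema/1-0-0 in any repository'];
var mixed = onlySchema.concat(['Hypothetical unrelated validation error']);

console.log(recover(raw, onlySchema) === raw); // true: the row is recovered
console.log(recover(raw, mixed));              // null: the row is skipped
console.log(recover(raw, []) === raw);         // true: an empty array also passes
```

Note the last case: a row with no error messages would be returned unchanged. That is probably harmless here, since every bad row is expected to carry at least one error.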

(Testing the recovery JavaScript function is tricky, especially if you use some of our helper functions, which will be undefined if you test in the browser console. You could try temporarily replacing them with standard JavaScript functions. Ultimately, though, the most reliable test is to run the job and check whether the function produces the expected result. It might be a good idea to first run a version of the function that never returns null. Instead of return null in the example above, return a useful debug message such as return 'Error not found.', so that the rows which would have been skipped show up in the output.)
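A debug variant along those lines might look like the sketch below. Instead of dropping a non-matching row, it returns a marker string followed by the first offending error message. The 'DEBUG not recovered: ' prefix is a hypothetical choice. Like the original function, it sticks to ES5 syntax and standard functions only. In the script you actually encode and upload, declare it as function process(event, errors). The variable binding here only avoids shadowing Node's global process object.

```javascript
// Debug variant: never returns null. Rows that the production function would
// skip come back as a marker string, so they are visible in the recovered output.
// "DEBUG not recovered: " is a hypothetical tag. Do NOT reprocess this output.
var recoverDebug = function process(event, errors) {
    for (var i = 0; i < errors.length; i++) {
        if (! /Could not find schema with key/.test(errors[i])) {
            return 'DEBUG not recovered: ' + errors[i];
        }
    }
    return event;
};

console.log(recoverDebug('raw\tline', ['Hypothetical unrelated error']));
// prints "DEBUG not recovered: Hypothetical unrelated error"
```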

Next we use base64encode.org to encode our script:

ZnVuY3Rpb24gcHJvY2VzcyhldmVudCwgZXJyb3JzKSB7DQogICAgZm9yICh2YXIgaSA9IDA7IGkgPCBlcnJvcnMubGVuZ3RoOyBpKyspIHsNCiAgICAgICAgaWYgKCEgL0NvdWxkIG5vdCBmaW5kIHNjaGVtYSB3aXRoIGtleS8udGVzdChlcnJvcnNbaV0pKSB7DQogICAgICAgICAgICByZXR1cm4gbnVsbDsNCiAgICAgICAgfQ0KICAgIH0NCiAgICByZXR1cm4gZXZlbnQ7DQp9
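If you would rather not paste your script into a website, Node.js can do the same encoding locally with its built-in Buffer. The script text is inlined below to keep the sketch self-contained. In practice you might read it from a file of your choosing. The string above was produced from text with Windows (CRLF) line endings, so the sketch joins the lines with \r\n. Encoding the same function with LF line endings would give a different, but equally valid, string.

```javascript
// Base64-encode a recovery script locally with Node's built-in Buffer,
// as an alternative to base64encode.org.
function encodeScript(source) {
  return Buffer.from(source, 'utf8').toString('base64');
}

// Decoding is handy for checking what a --script argument actually contains.
function decodeScript(encoded) {
  return Buffer.from(encoded, 'base64').toString('utf8');
}

var script = [
  'function process(event, errors) {',
  '    for (var i = 0; i < errors.length; i++) {',
  '        if (! /Could not find schema with key/.test(errors[i])) {',
  '            return null;',
  '        }',
  '    }',
  '    return event;',
  '}'
].join('\r\n'); // CRLF, to match the encoded string above

console.log(encodeScript(script));
```

With the CRLF joins shown, this prints exactly the encoded string above.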

Running the Hadoop Event Recovery job

Next up, we need to write our Dataflow Runner job. That has two components:

  • an EMR config file (stub)
  • a playbook (stub).

EMR cluster config file

# Remove all comments before use and validate JSON

{
  "schema": "iglu:com.snowplowanalytics.dataflowrunner/ClusterConfig/avro/1-1-0", # Do not change
  "data": {
    "name": "com.example Recovery", # No underscores ('_') allowed in EMR cluster names
    "logUri": "s3n://logs/", # Find this in your pipeline repo, in snowplow/config.yml, under aws.s3.buckets.log. 
                             # Don't forget the trailing slash.
    "region": "eu-central-1", # Change this to the same region where your main pipeline is. 
                              # Find it in your pipeline repo, in snowplow/config.yml, under aws.s3.region.
    "credentials": {
      "accessKeyId": "{{secret "aws-access-key-id"}}", # Environment variable or hardcoded value.
      "secretAccessKey": "{{secret "aws-secret-access-key"}}" # Environment variable or hardcoded value.
    },
    "roles": {
      "jobflow": "EMR_EC2_DefaultRole",
      "service": "EMR_DefaultRole"
    },
    "ec2": { # Copy the details from the emr section of snowplow/config.yml in your pipeline repo.
      "amiVersion": "4.8.2", 
      "keyName": "snowplow-com-example-key",
      "location": {
        "vpc": {
          "subnetId": "subnet-4d8e7610"
        }
      },
      "instances": {
        "master": {
          "type": "m4.large"
        },
        "core": {
          "type": "m3.xlarge",
          "count": 1
        },
        "task": {
          "type": "m4.large",
          "count": 0,
          "bid": "0.015"
        }
      }
    },
    "tags": [ # Optional section.
      {
        "key": "client",
        "value": "com.example"
      },
      {
        "key": "job",
        "value": "recovery"
      }
    ],
    "bootstrapActionConfigs": [
      {
        "name": "Elasticity Bootstrap Action",
        "scriptBootstrapAction": {
          "path": "s3://snowplow-hosted-assets-eu-central-1/common/emr/snowplow-ami4-bootstrap-0.2.0.sh",
                  # Make sure to change the name of the bucket with the appropriate region.
                  # It must be the same region in which you are creating the cluster.
          "args": [ "1.5" ]
        }
      }
    ],
    "applications": [ "Hadoop"]
  }
}
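Removing the # comments by hand is error-prone. The sketch below strips them with a small scanner that treats # as a comment only outside JSON string literals. It then runs JSON.parse as a basic validity check. It is an illustration, not a Dataflow Runner feature, and the function names are hypothetical. The {{secret "..."}} placeholders contain nested double quotes, so they are not strict JSON on their own. For the check only, the sketch therefore swaps each {{...}} expression for a dummy string.

```javascript
// Strip "#" comments from an annotated config.
// A "#" only starts a comment when it appears outside a JSON string literal.
function stripHashComments(text) {
  let out = '';
  let inString = false;
  let escaped = false;
  for (let i = 0; i < text.length; i++) {
    const ch = text[i];
    if (inString) {
      out += ch;
      if (escaped) escaped = false;
      else if (ch === '\\') escaped = true;
      else if (ch === '"') inString = false;
    } else if (ch === '"') {
      inString = true;
      out += ch;
    } else if (ch === '#') {
      // Skip to the end of the line; the newline itself is kept.
      while (i + 1 < text.length && text[i + 1] !== '\n') i++;
    } else {
      out += ch;
    }
  }
  return out;
}

// Rough validity check: replace template expressions such as
// {{secret "aws-access-key-id"}} with a dummy value, then JSON.parse.
// Throws a SyntaxError if the remaining JSON is invalid.
function checkJson(stripped) {
  return JSON.parse(stripped.replace(/\{\{[^}]*\}\}/g, 'TEMPLATE'));
}

const sample = [
  '# Remove all comments before use and validate JSON',
  '{',
  '  "schema": "iglu:com.snowplowanalytics.dataflowrunner/ClusterConfig/avro/1-1-0", # Do not change',
  '  "data": {',
  '    "accessKeyId": "{{secret "aws-access-key-id"}}", # Environment variable or hardcoded value.',
  '    "applications": [ "Hadoop"]',
  '  }',
  '}'
].join('\n');
const clean = stripHashComments(sample);
console.log(checkJson(clean).data.applications[0]); // Hadoop
```

For a real file you would read the annotated config with fs.readFileSync and write the stripped text (templates intact) with fs.writeFileSync. The file names are up to you.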

Dataflow Runner playbook

# Remove all comments before use and validate JSON

{
  "schema": "iglu:com.snowplowanalytics.dataflowrunner/PlaybookConfig/avro/1-0-1", # Do not change.
  "data": {
    "region": "eu-central-1", # Must be the same region in which you are creating the EMR cluster.
    "credentials": {
      "accessKeyId": "{{secret "aws-access-key-id"}}", # Environment variable or hardcoded value.
      "secretAccessKey": "{{secret "aws-secret-access-key"}}" # Environment variable or hardcoded value.
    },
    "steps": [
      {
        "type": "CUSTOM_JAR",
        "name": "Combine Months",
        "actionOnFailure": "CANCEL_AND_WAIT",
        "jar": "/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
        "arguments": [
          "--src",
          "s3n://path/to/enriched/bad/", # Find this in your pipeline repo,
                                         # in snowplow/config.yml, under aws.s3.buckets.enriched.bad.
                                         # Don't forget the trailing slash.
          "--dest",
          "hdfs:///local/monthly/",
          "--groupBy",
          ".*(run)=2017-0[1-9].*", # This will only select bad rows from runs in the period 1 Jan 2017 to 30 Sep 2017.
          "--targetSize",
          "128",
          "--outputCodec",
          "lzo"
        ]
      },
      {
        "type": "CUSTOM_JAR",
        "name": "Fix up bad rows",
        "actionOnFailure": "CANCEL_AND_WAIT",
        "jar": "s3://snowplow-hosted-assets-eu-central-1/3-enrich/hadoop-event-recovery/snowplow-hadoop-event-recovery-0.2.0.jar", 
                  # Make sure to change the name of the bucket with the appropriate region.
                  # It must be the same region in which you are creating the cluster.
        "arguments": [
          "com.snowplowanalytics.hadoop.scalding.SnowplowEventRecoveryJob",
          "--hdfs",
          "--input",
          "hdfs:///local/monthly/*",
          "--output",
          "hdfs:///local/recovery/",
          "--inputFormat",
          "bad",
          "--script",
        "ZnVuY3Rpb24gcHJvY2VzcyhldmVudCwgZXJyb3JzKSB7DQogICAgZm9yICh2YXIgaSA9IDA7IGkgPCBlcnJvcnMubGVuZ3RoOyBpKyspIHsNCiAgICAgICAgaWYgKCEgL0NvdWxkIG5vdCBmaW5kIHNjaGVtYSB3aXRoIGtleS8udGVzdChlcnJvcnNbaV0pKSB7DQogICAgICAgICAgICByZXR1cm4gbnVsbDsNCiAgICAgICAgfQ0KICAgIH0NCiAgICByZXR1cm4gZXZlbnQ7DQp9"
        ]
      },
      {
        "type": "CUSTOM_JAR",
        "name": "Back to S3",
        "actionOnFailure": "CANCEL_AND_WAIT",
        "jar": "/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
        "arguments": [
          "--src",
          "hdfs:///local/recovery/",
          "--dest",
          "s3n://path/to/recovered/" 
              # Create this bucket in S3 before running the job. Make the naming 
              # consistent with your other snowplow buckets.
              # For example, if your enriched/good bucket is named "s3n://sp-com-example-965698660934-5-batch-output/main/enriched/good",
              # name the recovery bucket something like "s3n://sp-com-example-965698660934-5-201710-recovery-job/recovered".
              # 201710 refers to the year and month when the recovery took place.
              # Don't forget the trailing slash.
        ]
      }
    ],
    "tags": [ # Optional section.
      {
        "key": "client",
        "value": "com.example"
      },
      {
        "key": "job",
        "value": "recovery"
      }
    ]
  }
}
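Several comments in the two configs insist that regions line up. The playbook region must match the cluster region, and the hosted-assets buckets for the bootstrap script and the recovery jar must be in the cluster's region. A small check over the two parsed configs can catch a mismatch before launch. The function name findRegionProblems is hypothetical. The sketch assumes the region-suffixed bucket naming used in this example (snowplow-hosted-assets-<region>). If your bucket is named differently, adapt the pattern.

```javascript
// Hypothetical pre-flight check for the region rules stated in the config comments.
// Takes the already-parsed (comment-free) cluster config and playbook objects.
function findRegionProblems(cluster, playbook) {
  const region = cluster.data.region;
  const problems = [];
  if (playbook.data.region !== region) {
    problems.push('playbook region ' + playbook.data.region + ' != cluster region ' + region);
  }
  // Walk every string value in both configs and inspect hosted-assets bucket names.
  const visit = function (value) {
    if (typeof value === 'string') {
      const m = /s3:\/\/snowplow-hosted-assets-([a-z0-9-]+)\//.exec(value);
      if (m && m[1] !== region) {
        problems.push(value + ' is not in ' + region);
      }
    } else if (value !== null && typeof value === 'object') {
      Object.keys(value).forEach(function (k) { visit(value[k]); });
    }
  };
  visit(cluster);
  visit(playbook);
  return problems;
}

// Trimmed-down configs: the jar deliberately points at the wrong region.
const cluster = { data: { region: 'eu-central-1', bootstrapActionConfigs: [{ scriptBootstrapAction: {
  path: 's3://snowplow-hosted-assets-eu-central-1/common/emr/snowplow-ami4-bootstrap-0.2.0.sh' } }] } };
const playbook = { data: { region: 'eu-central-1', steps: [{
  jar: 's3://snowplow-hosted-assets-eu-west-1/3-enrich/hadoop-event-recovery/snowplow-hadoop-event-recovery-0.2.0.jar' }] } };
console.log(findRegionProblems(cluster, playbook)); // reports the eu-west-1 jar
```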

For more information on how to control the range of input bad rows using the --groupBy argument, please see the wiki page.
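Before launching, it can be worth checking which run folders the --groupBy pattern will pick up. The sketch below tests the pattern from the "Combine Months" step against some hypothetical S3 keys using JavaScript's RegExp. It assumes s3-dist-cp matches the pattern against the whole file path. Because of the leading and trailing .*, that is equivalent to a plain search here. Java and JavaScript regex syntax agree for a pattern this simple.

```javascript
// The --groupBy pattern from the "Combine Months" step.
const groupBy = '.*(run)=2017-0[1-9].*';
// Anchor it so the whole path must match (assumed s3-dist-cp behaviour).
const wholePath = new RegExp('^(?:' + groupBy + ')$');

// Hypothetical enriched/bad keys; replace them with real ones from your bucket.
const keys = [
  's3n://path/to/enriched/bad/run=2016-12-31-03-00-00/part-00000',
  's3n://path/to/enriched/bad/run=2017-01-02-03-00-00/part-00000',
  's3n://path/to/enriched/bad/run=2017-09-30-03-00-00/part-00001',
  's3n://path/to/enriched/bad/run=2017-10-01-03-00-00/part-00000'
];

keys.forEach(function (key) {
  console.log((wholePath.test(key) ? 'selected  ' : 'ignored   ') + key);
});
```

Only the January and September 2017 runs are selected, consistent with the comment in the playbook.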

Run the job in Dataflow Runner

$ ./path/to/dataflow-runner run-transient --emr-config ./path/to/emr-config.json --emr-playbook ./path/to/playbook.json

(Your AWS credentials should be set as environment variables when you launch Dataflow Runner.)
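If you launch the job from a script, a small Node.js wrapper can refuse to start when credentials are missing. It uses child_process.spawnSync with only the subcommand and flags shown above. The environment variable names AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are the standard AWS ones and are an assumption here. Adjust them to whatever your configs actually read. The helper names are hypothetical.

```javascript
const { spawnSync } = require('child_process');

// Assumed variable names; match them to the credentials your configs read.
const REQUIRED_ENV = ['AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'];

// Returns the names of required variables that are unset or empty.
function missingEnv(env) {
  return REQUIRED_ENV.filter(function (name) { return !env[name]; });
}

// Same subcommand and flags as the command line above.
function runTransientArgs(emrConfigPath, playbookPath) {
  return ['run-transient', '--emr-config', emrConfigPath, '--emr-playbook', playbookPath];
}

function launch(runnerPath, emrConfigPath, playbookPath) {
  const missing = missingEnv(process.env);
  if (missing.length > 0) {
    throw new Error('Missing environment variables: ' + missing.join(', '));
  }
  // Inherit stdio so Dataflow Runner's output appears as it runs.
  const result = spawnSync(runnerPath, runTransientArgs(emrConfigPath, playbookPath),
                           { stdio: 'inherit' });
  if (result.error) throw result.error; // e.g. the binary was not found
  return result.status;
}

// Example (not executed here):
// launch('./path/to/dataflow-runner', './path/to/emr-config.json', './path/to/playbook.json');
```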

For more options, check out the Dataflow Runner documentation.

Start this job and then keep an eye on it in the EMR console. If any step fails, inspect the logs (find them in the cluster details in the EMR console) to see why.

Once the job has completed, check the recovered bucket on S3 to see whether everything worked out as expected. If you are returning a debug message instead of null, you should see it here. Before kicking off a ‘production’ run of the job, make sure to restore the null returns in your JavaScript function (then re-encode it and update the --script argument) and to empty the recovered bucket.

Re-processing the recovered events

Before we re-process our recovered events

Assuming the job completes successfully, we now have the fixed-up raw events available in s3://my-recovery-bucket/recovered for reprocessing. We now need to process them through the Snowplow batch pipeline.

Before we start, double-check that the missing schemas are now available from one of the Iglu registries in your resolver! Otherwise this event recovery process will be unsuccessful.

Next, we need to ensure that no other jobs are running - we don’t want a conflict between our recovery job and our regular batch pipeline runs. If you have the regular job running on a frequent cron, it’s a good idea to disable it for the duration. Don’t forget to re-enable it afterwards.

Processing the recovered events through Snowplow

We are now ready to kick off the batch pipeline.

Create a copy of your regular config.yml file, calling it config-recovery.yml or similar. In this copy, update the aws.s3.buckets.raw section of the EmrEtlRunner configuration so that it looks something like this:

raw:
  in:
    - s3://does-not-exist # The "in" section will be ignored
  processing: s3://my-recovery-bucket/recovered # Replace with your own recovery bucket

Now, you can run EmrEtlRunner using your new config-recovery.yml and with the --skip staging option, since the data is already in the processing bucket. The reason for treating the recovery bucket as the processing location and skipping staging is explained in the documentation.

This pipeline run should complete without problems. Take note of the run= ID of the recovery run (as seen in the archive folders) so that you can distinguish this run from regular pipeline runs in the future.

And that’s it! We’ve successfully recovered Snowplow events which ended up in bad rows due to a missing schema.

If you have a different event recovery challenge, do please create a new Discourse thread and we’ll be happy to brainstorm it there!


Event recovery with Scala Stream Collector
#2

Thanks for this tutorial; it worked almost flawlessly. We had one problem: the recovered files are named like “part-0001”, “part-0002”. When we then started the pipeline with config-recovery.yml, the files were not picked up, because it looks for files matching the regex .*([0-9]+-[0-9]+-[0-9]+)-[0-9]+.* So we had to rename these files. I'm not aware of any EMR parameter to change this regex.
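One way to automate that renaming is sketched below. It builds new names containing a date-and-hour stamp of the shape YYYY-MM-DD-HH, which is enough for the quoted regex to match. The layout recovered.<stamp>.<original name> and the function name recoveredName are hypothetical choices. The actual copy or rename in S3 is left to your usual tooling. This only addresses the file-name regex mentioned above.

```javascript
// The file-name regex quoted in the reply above.
const expectedName = /.*([0-9]+-[0-9]+-[0-9]+)-[0-9]+.*/;

// Hypothetical naming scheme: recovered.<YYYY-MM-DD-HH>.<original name>, in UTC.
function recoveredName(partName, date) {
  const pad = function (n) { return String(n).padStart(2, '0'); };
  const stamp = date.getUTCFullYear() + '-' + pad(date.getUTCMonth() + 1) + '-' +
                pad(date.getUTCDate()) + '-' + pad(date.getUTCHours());
  return 'recovered.' + stamp + '.' + partName;
}

const when = new Date(Date.UTC(2017, 9, 5, 1)); // 2017-10-05 01:00 UTC (months are 0-based)
['part-00000', 'part-00001'].forEach(function (name) {
  const renamed = recoveredName(name, when);
  console.log(name, '->', renamed, expectedName.test(renamed));
});
```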