Using Hadoop Event Recovery to recover events with a missing schema [tutorial]

Our new Hadoop Event Recovery project (documentation) lets you fix up Snowplow bad rows and make them ready for reprocessing, by writing your own custom JavaScript to execute on each bad row.

While this is a powerful tool, using it can be quite involved. This tutorial walks you through one common use case for event recovery: where some of your events failed validation because you forgot to upload a particular schema. Let’s get started.

Refresher on Snowplow bad rows

Snowplow bad rows look like this:

{
  "line": "2015-06-09\t09:56:09\tNRT12\t831\t211.14.8.250\tGET...",
  "errors": [{
    "level": "error",
    "message": "Could not find schema with key iglu:com.acme/myevent/jsonschema/1-0-0 in any repository"
  }]
}

This is the structure that Hadoop Event Recovery will be working on.

Writing our function

The Hadoop Event Recovery jar will extract the “line” string (containing the original raw event) and an array of all the error messages which describe why the line failed validation, and pass them to your JavaScript function.

We need to write a JavaScript function called process which accepts two arguments: the raw line string and the error message array. The function should always return either null (signifying that the bad row should be ignored, not recovered) or a string which will be used as the new bad row. Your JavaScript can define other top-level functions besides process. We also provide several built-in JavaScript functions for you to call - for more detail check out the documentation.

Remember, we want to recover events where the failure message involved a missing schema. The JavaScript function looks like this:

function process(event, errors) {
    for (var i = 0; i < errors.length; i++) {
        if (! /Could not find schema with key/.test(errors[i])) {
            return null;
        }
    }
    return event;
}

We only want to reprocess events which failed solely because of the missing schema, so we return null (meaning that we won't recover the event) if any of the error messages is not a missing-schema error. Otherwise we return the original raw line, which should now pass validation because we have uploaded the schema.
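
Before encoding anything, it can be worth sanity-checking the function locally. Here is a minimal sketch of how you might exercise it with Node.js; the second error string is just an invented example of a non-schema failure:

// test-process.js - quick local check of the recovery function (illustrative only;
// declaring process here shadows Node's built-in process global, which is fine for
// a throwaway test script)
function process(event, errors) {
    for (var i = 0; i < errors.length; i++) {
        if (! /Could not find schema with key/.test(errors[i])) {
            return null;
        }
    }
    return event;
}

// A row that only failed because of a missing schema is returned unchanged
console.log(process(
    "2015-06-09\t09:56:09\tNRT12\t831\t211.14.8.250\tGET...",
    ["Could not find schema with key iglu:com.acme/myevent/jsonschema/1-0-0 in any repository"]
));

// A row with any other validation error is dropped (null)
console.log(process(
    "2015-06-09\t09:56:09\tNRT12\t831\t211.14.8.250\tGET...",
    ["Some other validation error"]
));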

Next we use base64encode.org to encode our script:

ZnVuY3Rpb24gcHJvY2VzcyhldmVudCwgZXJyb3JzKSB7DQogICAgZm9yICh2YXIgaSA9IDA7IGkgPCBlcnJvcnMubGVuZ3RoOyBpKyspIHsNCiAgICAgICAgaWYgKCEgL0NvdWxkIG5vdCBmaW5kIHNjaGVtYSB3aXRoIGtleS8udGVzdChlcnJvcnNbaV0pKSB7DQogICAgICAgICAgICByZXR1cm4gbnVsbDsNCiAgICAgICAgfQ0KICAgIH0NCiAgICByZXR1cm4gZXZlbnQ7DQp9
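
If you would rather not paste the script into a website, the same encoding can be produced locally. A small Node.js helper along these lines should work, assuming you saved the function to a file called process.js:

// encode.js - print the Base64 encoding of the recovery script
// (illustrative helper; assumes the process() function was saved as process.js)
var fs = require('fs');

var script = fs.readFileSync('process.js');   // raw bytes of the script file
console.log(script.toString('base64'));       // paste this value into the --script argument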

Running the Hadoop Event Recovery job

Now we're ready to run the job using the AWS CLI:

$ aws emr create-cluster --applications Name=Hadoop --ec2-attributes '{
    "InstanceProfile":"EMR_EC2_DefaultRole",
    "AvailabilityZone":"{{...}}",
    "EmrManagedSlaveSecurityGroup":"{{...}}",
    "EmrManagedMasterSecurityGroup":"{{...}}"
}' --service-role EMR_DefaultRole --enable-debugging --release-label emr-4.3.0 --log-uri 's3n://{{path to logs}}' --steps '[
{
    "Args":[
        "--src",
        "s3n://{{my-output-bucket/enriched/bad}}/",
        "--dest",
        "hdfs:///local/monthly/",
        "--groupBy",
        ".*(run)=2014.*",
        "--targetSize",
        "128",
        "--outputCodec",
        "lzo"
    ],
    "Type":"CUSTOM_JAR",
    "ActionOnFailure":"TERMINATE_CLUSTER",
    "Jar":"/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
    "Name":"Combine Months"
},
{
    "Args":[
        "com.snowplowanalytics.hadoop.scalding.SnowplowEventRecoveryJob",
        "--hdfs",
        "--input",
        "hdfs:///local/monthly/*",
        "--output",
        "hdfs:///local/recovery/",
        "--inputFormat",
        "bad",
        "--script",
        "VHlwZSAob3IgcGFzdGUpIGhlcmUuLi5mdW5jdGlvbiBwcm9jZXNzKGV2ZW50LCBlcnJvcnMpIHsNCglmb3IgKHZhciBpID0gMDsgaSA8IGVycm9ycy5sZW5ndGg7IGkrKykgew0KCQlpZiAoISAvQ291bGQgbm90IGZpbmQgc2NoZW1hIHdpdGgga2V5Ly50ZXN0KGVycm9yc1tpXSkpIHsNCgkJCXJldHVybiBudWxsOw0KCQl9DQoJfQ0KCXJldHVybiBldmVudDsNCn0="
    ],
    "Type":"CUSTOM_JAR",
    "ActionOnFailure":"CONTINUE",
    "Jar":"s3://snowplow-hosted-assets/3-enrich/hadoop-event-recovery/snowplow-hadoop-event-recovery-0.2.0.jar",
    "Name":"Fix up bad rows"
},
{
    "Args":[
        "--src",
        "hdfs:///local/recovery/",
        "--dest",
        "s3n://{{my-recovery-bucket/recovered}}"
    ],
    "Type":"CUSTOM_JAR",
    "ActionOnFailure":"TERMINATE_CLUSTER",
    "Jar":"/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
    "Name":"Back to S3"
}
]' --name 'MyCluster' --instance-groups '[
    {
        "InstanceCount":1,
        "InstanceGroupType":"MASTER",
        "InstanceType":"m1.medium",
        "Name":"MASTER"
    },
    {
        "InstanceCount":2,
        "InstanceGroupType":"CORE",
        "InstanceType":"m1.medium",
        "Name":"CORE"
    }
]'

There are a couple of things to note about this command. First, the placeholders in curly brackets should be replaced with actual S3 paths. Second, the --groupBy argument’s value of .*(run)=2014.* means that only bad rows from 2014 will be considered for recovery. For more information on how to control the range of input bad rows using the --groupBy argument, please see the wiki page.
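
To make the --groupBy behaviour concrete: roughly speaking, the pattern is matched against the paths under --src, so you can check locally which run folders a given pattern will select (the folder names below are invented examples):

// Illustrative check of which run= folders a given --groupBy pattern will pick up
var groupBy = /.*(run)=2014.*/;

console.log(groupBy.test("enriched/bad/run=2014-11-03-12-15-00/part-00000"));  // true  - included
console.log(groupBy.test("enriched/bad/run=2015-06-09-09-56-09/part-00000"));  // false - skipped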

Start this job and then keep an eye on it in the EMR console.

Before we re-process our recovered events

Assuming the job completes successfully, we now have the fixed-up raw events available in s3://my-recovery-bucket/recovered for reprocessing. We now need to process them through the Snowplow batch pipeline.

Before we start, double check that the missing schemas are now available from one of the Iglu registries in your resolver! Otherwise this event recovery process will be unsuccessful.

Next, we need to ensure that no other jobs are running - we don’t want a conflict between our recovery job and our regular batch pipeline runs. If you have the regular job running on a frequent cron, it’s a good idea to disable it for the duration. Don’t forget to re-enable it afterwards.

Processing the recovered events through Snowplow

We are now ready to kick off the batch pipeline.

Create a copy of your regular config.yml file, calling it config-recovery.yml or similar. You need to update the aws.s3.buckets.raw section of your EmrEtlRunner configuration file so it looks something like this:

raw:
  in:
    - s3://does-not-exist # The "in" section will be ignored
  processing: s3://{{my-recovery-bucket/recovered}}

Now, you can run EmrEtlRunner using your new config-recovery.yml and with the --skip staging option, since the data is already in the processing bucket. The reason for treating the recovery bucket as the processing location and skipping staging is explained in the documentation.

This pipeline run should complete fine - take a note of the run= ID of the recovery run (as seen in the archive folders) so that you can distinguish this run from regular pipeline runs in the future.

And that’s it! We’ve successfully recovered Snowplow events which ended up in bad rows due to a missing schema.

If you have a different event recovery challenge, do please create a new Discourse thread and we’ll be happy to brainstorm it there!


@alex - Thank you for the detailed post.

For others who may follow this tutorial word for word, the proper base64encoded function to reprocess events with missing schemas is below:

ZnVuY3Rpb24gcHJvY2VzcyhldmVudCwgZXJyb3JzKSB7DQogICAgZm9yICh2YXIgaSA9IDA7IGkgPCBlcnJvcnMubGVuZ3RoOyBpKyspIHsNCiAgICAgICAgaWYgKCEgL0NvdWxkIG5vdCBmaW5kIHNjaGVtYSB3aXRoIGtleS8udGVzdChlcnJvcnNbaV0pKSB7DQogICAgICAgICAgICByZXR1cm4gbnVsbDsNCiAgICAgICAgfQ0KICAgIH0NCiAgICByZXR1cm4gZXZlbnQ7DQp9

Thanks @digitaltouch - I’ve fixed the original!

Sorry for bumping up this old thread, but I did not want to create a separate one.
I’m trying to fire off the EMR job like this:

aws emr create-cluster --applications Name=Hadoop --ec2-attributes '{
"InstanceProfile":"EMR_EC2_DefaultRole",
"SubnetId": "subnet-xxxxxx",
"EmrManagedSlaveSecurityGroup":"sg-xxxxxx",
"EmrManagedMasterSecurityGroup":"sg-xxxxxx",
"ServiceAccessSecurityGroup":"sg-xxxxxx"
}' --service-role EMR_DefaultRole --enable-debugging --release-label emr-4.5.0 --log-uri 's3://my-snowplow-etl/logs/' --steps '[
{
"Args":[
    "--src",
    "s3://my-snowplow-data/enriched/bad",
    "--dest",
    "hdfs:///local/monthly/",
    "--groupBy",
    ".*(run)=2017.*",
    "--targetSize",
    "128",
    "--outputCodec",
    "lzo"
],
"Type":"CUSTOM_JAR",
"ActionOnFailure":"TERMINATE_CLUSTER",
"Jar":"/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
"Name":"Combine Months"
},
{
"Args":[
    "com.snowplowanalytics.hadoop.scalding.SnowplowEventRecoveryJob",
    "--hdfs",
    "--input",
    "hdfs:///local/monthly/*",
    "--output",
    "hdfs:///local/recovery/",
    "--inputFormat",
    "bad",
    "--script", "ZnVuY3Rpb24gcHJvY2VzcyhldmVudCwgZXJyb3JzKSB7DQogICAgZm9yICh2YXIgaSA9IDA7IGkgPCBlcnJvcnMubGVuZ3RoOyBpKyspIHsNCiAgICAgICAgaWYgKCEgL0NvdWxkIG5vdCBmaW5kIHNjaGVtYSB3aXRoIGtleS8udGVzdChlcnJvcnNbaV0pKSB7DQogICAgICAgICAgICByZXR1cm4gbnVsbDsNCiAgICAgICAgfQ0KICAgIH0NCiAgICByZXR1cm4gZXZlbnQ7DQp9"
],
"Type":"CUSTOM_JAR",
"ActionOnFailure":"CONTINUE",
"Jar":"s3://snowplow-hosted-assets/3-enrich/hadoop-event-recovery/snowplow-hadoop-event-recovery-0.2.0.jar",
"Name":"Fix up bad rows"
},
{
"Args":[
    "--src",
    "hdfs:///local/recovery/",
    "--dest",
    "s3://my-snowplow-data/recovered"
],
"Type":"CUSTOM_JAR",
"ActionOnFailure":"TERMINATE_CLUSTER",
"Jar":"/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
"Name":"Back to S3"
}
]' --name 'snowplow ETL bad rows' --instance-groups '[
{
    "InstanceCount":1,
    "InstanceGroupType":"MASTER",
    "InstanceType":"c4.large",
    "Name":"MASTER"
},
{
    "InstanceCount":2,
    "InstanceGroupType":"CORE",
    "InstanceType":"c4.large",
    "Name":"CORE"
}
]'

No matter what script I try, the ‘Fix up bad rows’ step simply fails with no error message in the logs. The job takes 0 seconds and does not produce any output whatsoever. I tried downloading all the ETL logs but could not find any error message. Tried different EMR releases (4.3.0, 4.4.0, 4.5.0) to no avail. Any ideas?

Anyone? Have a working setup?

Do you have a directory with the correct timestamp (for example “run=2017.02.02”) in your “s3://my-snowplow-data/enriched/bad” bucket? I would also test the script first on a small amount of events rather than all of them!
If not, you can change the --groupBy regex to fit your needs.

I did the recovering last week and had it running after some failures, if you need more help, let me know.

Hi, tclass.

My regexp is definitely correct and matches directories inside the bad bucket. I’ve also tested with just supplying 1 directory for processing like this:

    "--groupBy",
    ".*(run)=2017-05-28-22-01-48",

Same issue:

There’s no output in the EMR logs. Could it have something to do with my ec2 config? I’m running the job in my VPC. Although, the same security groups are used for the main snowplow pipeline and it runs smoothly.

I haven’t seen the Combine Months step fail before as it doesn’t really do anything particularly complicated other than move data from S3 to HDFS. One thing I have noticed is that in the args for the Combine Months task you have used the s3:// prefix rather than the s3n:// prefix - have you tried using the s3n:// prefix instead?

Yes, same result with s3n://

A few things I did differently:

  • for the buckets you define, I have a / at the end - don’t know if that can lead to any problems
  • I added a "KeyName": "snowplow" in the ec2-attributes field. I just added it because the normal cluster also runs with these keynames; I’m not sure what the keyname does.
  • there is no newline after your "--script" argument - shouldn’t matter, but who knows

this is the configuration I used with some things in brackets that I have to adjust next time:
https://pastebin.com/UMdJtcfy

Yeah, the trailing slash apparently does matter. “KeyName” just provides the option to insert your pubkey into the EMR instances, so it’s optional. Got it to work with the 4.5.0 release; 4.3.0 did not work:

Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /tmp/hadoop-yarn/staging/hadoop/.staging/job_1496058522298_0001/job.jar could only be replicated to 0 nodes instead of minReplication (=1).  There are 0 datanode(s) running and no node(s) are excluded in this operation.

Thanks for your help. Got the ‘Combine Months’ step working. The next error was due to hadoop instances running out of memory. Found this after digging into etl log bucket:

Container killed on request. Exit code is 143

Increased the instance size:

 Master: Terminated 1 c4.xlarge
 Core: Terminated 2 c4.2xlarge

which should be plenty, but received exit code 143 again. Could be the javascript itself failing - do you have methods for debugging?

yes there are, at the end of this thread: Process bad rows from Elasticsearch and form them into good rows
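
One simple trick (a rough sketch, not necessarily exactly what that thread does) is to catch any exception inside process and return it as the row, so it shows up in the recovery output on S3 instead of the step dying silently:

// Debugging variant of process() (illustrative only): surface any exception as the
// returned string so it appears in the job output. Switch back to the real function
// before running an actual recovery.
function process(event, errors) {
    try {
        for (var i = 0; i < errors.length; i++) {
            if (! /Could not find schema with key/.test(errors[i])) {
                return null;
            }
        }
        return event;
    } catch (e) {
        return "RECOVERY SCRIPT ERROR: " + e;
    }
}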


Wow thanks! That’s perfect.

Tested out my JavaScript with Rhino and it seems to work. But I cannot seem to get step 3 to do anything at all. Tried many different JavaScript functions, same error. Seems the issue could lie somewhere else. Any ideas?

I thought maybe the cluster could be running out of space or memory, as c4 instances have EBS storage only. Switched to c3 instances, still same issue. Interestingly, EMR console does not display any data in the monitoring section.

There should be a Log URI section in your EMR job; this is where EMR puts all the logfiles. Maybe the job doesn’t log anything for this step, but sometimes you can see a hint in the other logfiles.

Yes, the only thing of interest I found in the full logs is this:

2017-05-31 13:16:50,034 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: After Scheduling: PendingReds:0 ScheduledMaps:0 ScheduledReds:0 AssignedMaps:0 AssignedReds:4 CompletedMaps:1 CompletedReds:4 ContAlloc:6 ContRel:0 HostLocal:1 RackLocal:0
2017-05-31 13:16:50,034 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: Diagnostics report from attempt_1496236471024_0001_r_000001_0: Container killed by the ApplicationMaster.
Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143

2017-05-31 13:16:51,036 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Received completed container container_1496236471024_0001_01_000003
2017-05-31 13:16:51,037 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Received completed container container_1496236471024_0001_01_000006
2017-05-31 13:16:51,037 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: Diagnostics report from attempt_1496236471024_0001_r_000000_0: Container killed by the ApplicationMaster.
Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143

2017-05-31 13:16:51,037 INFO [RMCommunicator Allocator] org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator: Received completed container container_1496236471024_0001_01_000007
2017-05-31 13:16:51,037 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: Diagnostics report from attempt_1496236471024_0001_r_000003_0: Container killed by the ApplicationMaster.
Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143

Which could indicate a heap/memory issue, but I’m now using really beefy EC2 instances and it’s still the same.

How beefy the cluster needs to be always depends on how much data we’re talking about. Can you try the job on just one folder with a few MBs of bad rows? Otherwise I have no idea what it could be, but I would try to narrow down the range of possible problems - if it’s an OOM, it might just be too many events.

Yeah, I tried with c3.4xlarge instances and the bad row folder weighs 32MB, so it can’t be a resource issue. If it was a script issue, I suppose I’d have some output/errors from it and the job would at least take some time. It runs for 0 seconds with all the example JS from the tutorial, my own JS, and even some gibberish encoded text, so I’m lost at this point.