Re-running the enrichment bad logs

Hello There,

A couple of weeks ago we made some changes to the Snowplow pageview events for our Node.js application. We were trying to override the platform with a custom value, but we later found out that the platform can't be overridden because it is an enum. The problem now is that for the last 15 days none of our events have reached our Redshift table; almost a million records have gone to the bad enrichment folder instead.

My question: I want to process all of those bad rows with the platform value fixed, and re-run those events, since they are already archived in the enrichment folder. Can you suggest the best way to re-process these bad events?

Hey @birju1100 - check out this tutorial: Using Hadoop Event Recovery to recover events with a missing schema [tutorial]
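
In short: you write a small JavaScript function, process(event, errors), which receives each bad row's original tab-separated event plus its error messages, and returns either a fixed-up event line or null to drop the row. Roughly this shape (a skeleton only; helpers like tsvToArray and arrayToTsv are provided by Hadoop Event Recovery at runtime, and the regex is a placeholder):

function process(event, errors) {
    // Only recover rows whose errors you recognise; returning null
    // drops the row from the recovered output.
    for (var i = 0; i < errors.length; i++) {
        if (!/some error you expect/.test(errors[i])) {
            return null;
        }
    }
    var fields = tsvToArray(event);
    // ...fix up the offending fields here...
    return arrayToTsv(fields);
}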

Thanks @alex,

I followed the doc and ran the job, but nothing came out.

This is my recovery.js:

function process(event, errors) {
    for (var i = 0; i < errors.length; i++) {
        var err = errors[i];
        if (!isMissingSchemaError(err)) {
            return null;
        } else {
            var fields = tsvToArray(event);
            if (fields[5] == 'GET') {
                var querystring = parseQuerystring(fields[11]);
                querystring['p'] = 'mob';
                fields[11] = buildQuerystring(querystring);
                return arrayToTsv(fields);
            } else {
                return null;
            }
        }
    }
}

function isMissingSchemaError(err) {
    return /Could not find schema with key/.test(err);
}

I am changing the platform value to 'mob'.

Can you help me debug the issue? And can you tell me how I can debug the JavaScript?

Hi @birju1100 - is there a reason why you are checking that this is a missing schema error before proceeding? Surely you should be checking for an “invalid platform” error…
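
For example, something like this (a sketch only; the exact error wording below is an assumption, so copy the real message out of one of your bad rows first):

function isInvalidPlatformError(err) {
    // Assumed wording - verify it against a real bad row before relying on it
    return /not a supported tracking platform/.test(err);
}

As for debugging the JavaScript: the helper functions only exist inside the Hadoop job, but you can stub them out and run your process function locally under Node.js. A rough harness, with deliberately simplified stubs (assumptions, not the real implementations), using the script you posted above:

// test-recovery.js - run with: node test-recovery.js
// Simplified stand-ins for the Hadoop Event Recovery helpers:
function tsvToArray(event)  { return event.split('\t'); }
function arrayToTsv(fields) { return fields.join('\t'); }
function parseQuerystring(qs) {
    var out = {};
    qs.split('&').forEach(function (pair) {
        var kv = pair.split('=');
        out[kv[0]] = kv[1];
    });
    return out;
}
function buildQuerystring(obj) {
    return Object.keys(obj).map(function (k) {
        return k + '=' + obj[k];
    }).join('&');
}

// Your current script, copied from above:
function isMissingSchemaError(err) {
    return /Could not find schema with key/.test(err);
}
function process(event, errors) {
    for (var i = 0; i < errors.length; i++) {
        var err = errors[i];
        if (!isMissingSchemaError(err)) {
            return null;
        } else {
            var fields = tsvToArray(event);
            if (fields[5] == 'GET') {
                var querystring = parseQuerystring(fields[11]);
                querystring['p'] = 'mob';
                fields[11] = buildQuerystring(querystring);
                return arrayToTsv(fields);
            } else {
                return null;
            }
        }
    }
}

// A made-up 12-field row purely to exercise the code path (field 5 is
// the request method, field 11 the querystring, as your script assumes):
var sample = ['f0', 'f1', 'f2', 'f3', 'f4', 'GET',
              'f6', 'f7', 'f8', 'f9', 'f10', 'e=pv&p=web'].join('\t');

console.log(process(sample, ['[web] is not a supported tracking platform']));
// Prints null: the missing-schema check rejects the row, which is why
// nothing came out of your job.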

Hey @alex,

It works now, thanks. The problem was in the script: I did not put the return outside the for loop. I have changed my script to this:

function process(event, errors) {
    var fields = tsvToArray(event);
    if (fields[5] == 'GET') {
        var querystring = parseQuerystring(fields[11]);
        querystring['p'] = 'mob';
        fields[11] = buildQuerystring(querystring);
    }
    return arrayToTsv(fields);
}

So it's working now.

Hello @alex,

 #!/usr/bin/env bash
 main() {
   aws emr create-cluster --applications Name=Hadoop --ec2-attributes '{
     "InstanceProfile":"EMR_EC2_DefaultRole",
     "AvailabilityZone":"eu-west-1b"
 }' --service-role EMR_DefaultRole --enable-debugging --release-label emr-4.3.0 --log-uri 's3n://company-snowplow-troubleshoot/recovery/logs/cluster/' --steps '[
   {
     "Args":[
         "--src",
         "s3n://company-snowplow-efritin/logs/enriched/bad/",
         "--dest",
         "hdfs:///local/monthly/",
         "--groupBy",
         ".*(run)=2016-08.*",
         "--targetSize",
         "128",
         "--outputCodec",
         "lzo"
     ],
     "Type":"CUSTOM_JAR",
     "ActionOnFailure":"TERMINATE_CLUSTER",
     "Jar":"/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
     "Name":"Combine Months"
   },
   {
     "Args":[
         "com.snowplowanalytics.hadoop.scalding.SnowplowEventRecoveryJob",
         "--hdfs",
         "--input",
         "hdfs:///local/monthly/*",
         "--output",
         "hdfs:///local/recovery/",
         "--inputFormat",
         "bad",
         "--script",
         "ZnVuY3Rpb24gcHJvY2VzcyhldmVudCwgZXJyb3JzKSB7CiAgdmFyIGZpZWxkcyA9IHRzdlRvQXJyYXkoZXZlbnQpOwogICAgaWYgKGZpZWxkc1s1XSA9PSAnR0VUJykgewogICAgICAgIHZhciBxdWVyeXN0cmluZyA9IHBhcnNlUXVlcnlzdHJpbmcoZmllbGRzWzExXSk7CiAgICAgICAgcXVlcnlzdHJpbmdbJ3AnXSA9ICdtb2InOwogICAgICAgIGZpZWxkc1sxMV0gPSBidWlsZFF1ZXJ5c3RyaW5nKHF1ZXJ5c3RyaW5nKTsKICAgIH0KICAgIHJldHVybiBhcnJheVRvVHN2KGZpZWxkcyk7Cn0KCg=="    ],
     "Type":"CUSTOM_JAR",
     "ActionOnFailure":"CONTINUE",
     "Jar":"s3://snowplow-hosted-assets/3-enrich/hadoop-event-recovery/snowplow-hadoop-event-recovery-0.2.0.jar",
     "Name":"Fix up bad rows"
   },
   {
     "Args":[
         "--src",
         "hdfs:///local/recovery/",
         "--dest",
         "s3n://company-snowplow-troubleshoot/logs/raw/processing/"
     ],
     "Type":"CUSTOM_JAR",
     "ActionOnFailure":"TERMINATE_CLUSTER",
     "Jar":"/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
     "Name":"Back to S3"
   }
   ]' --name 'Snowplow-Recovery-Cluster' --instance-groups '[
     {
         "InstanceCount":1,
         "InstanceGroupType":"MASTER",
         "InstanceType":"m1.medium",
         "Name":"MASTER"
     },
     {
         "InstanceCount":2,
         "InstanceGroupType":"CORE",
         "InstanceType":"m3.xlarge",
         "Name":"CORE"
     }
  ]'
 }

 main "$@"

This is my EMR configuration. When I run it, the "Fix up bad rows" step fails; this is the log:

2016-08-17 18:36:59,456 INFO cascading.flow.hadoop.util.HadoopUtil (main): resolving application jar from found main method on: com.snowplowanalytics.hadoop.scalding.JobRunner$
2016-08-17 18:36:59,459 INFO cascading.flow.hadoop.planner.HadoopPlanner (main): using application jar: /mnt/var/lib/hadoop/steps/s-2G3U750KH6Q9K/snowplow-hadoop-event-recovery-0.2.0.jar
2016-08-17 18:36:59,478 INFO cascading.property.AppProps (main): using app.id: 04799887A3E1448CB011342C972A9FE3
2016-08-17 18:36:59,692 INFO org.apache.hadoop.conf.Configuration.deprecation (main): mapred.used.genericoptionsparser is deprecated. Instead, use mapreduce.client.genericoptionsparser.used
2016-08-17 18:36:59,756 INFO org.apache.hadoop.conf.Configuration.deprecation (main): mapred.output.compress is deprecated. Instead, use mapreduce.output.fileoutputformat.compress
2016-08-17 18:36:59,770 WARN cascading.util.Version (main): found multiple 'cascading/version.properties' files on the CLASSPATH. Please check your dependencies: file:/mnt/var/lib/hadoop/steps/s-2G3U750KH6Q9K/tmp/hadoop-unjar2414297679156794865/cascading/version.properties,jar:file:/mnt/var/lib/hadoop/steps/s-2G3U750KH6Q9K/snowplow-hadoop-event-recovery-0.2.0.jar!/cascading/version.properties, using first returned
2016-08-17 18:37:00,117 INFO cascading.util.Version (flow com.snowplowanalytics.hadoop.scalding.SnowplowEventRecoveryJob): Concurrent, Inc - Cascading 2.7.0
2016-08-17 18:37:00,121 INFO cascading.flow.Flow (flow com.snowplowanalytics.hadoop.scalding.SnowplowEventRecoveryJob): [com.snowplowanalytics....] starting
2016-08-17 18:37:00,122 INFO cascading.flow.Flow (flow com.snowplowanalytics.hadoop.scalding.SnowplowEventRecoveryJob): [com.snowplowanalytics....]  source: Hfs["TextLine[['offset', 'line']->[ALL]]"]["hdfs:/local/monthly/*"]
2016-08-17 18:37:00,122 INFO cascading.flow.Flow (flow com.snowplowanalytics.hadoop.scalding.SnowplowEventRecoveryJob): [com.snowplowanalytics....]  sink: Hfs["TextDelimited[['altered']]"]["hdfs:/local/recovery"]
2016-08-17 18:37:00,122 INFO cascading.flow.Flow (flow com.snowplowanalytics.hadoop.scalding.SnowplowEventRecoveryJob): [com.snowplowanalytics....]  parallel execution is enabled: true
2016-08-17 18:37:00,122 INFO cascading.flow.Flow (flow com.snowplowanalytics.hadoop.scalding.SnowplowEventRecoveryJob): [com.snowplowanalytics....]  starting jobs: 1
2016-08-17 18:37:00,123 INFO cascading.flow.Flow (flow com.snowplowanalytics.hadoop.scalding.SnowplowEventRecoveryJob): [com.snowplowanalytics....]  allocating threads: 1
2016-08-17 18:37:00,124 INFO cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] starting step: (1/1) hdfs:/local/recovery
2016-08-17 18:37:00,291 INFO org.apache.hadoop.yarn.client.RMProxy (pool-4-thread-1): Connecting to ResourceManager at ip-10-73-164-48.eu-west-1.compute.internal/10.73.164.48:8032
2016-08-17 18:37:00,743 INFO org.apache.hadoop.yarn.client.RMProxy (pool-4-thread-1): Connecting to ResourceManager at ip-10-73-164-48.eu-west-1.compute.internal/10.73.164.48:8032
2016-08-17 18:37:01,775 INFO amazon.emr.metrics.MetricsSaver (pool-4-thread-1): MetricsConfigRecord disabledInCluster: false instanceEngineCycleSec: 60 clusterEngineCycleSec: 60 disableClusterEngine: false maxMemoryMb: 3072 maxInstanceCount: 500 lastModified: 1471458713864 
2016-08-17 18:37:01,775 INFO amazon.emr.metrics.MetricsSaver (pool-4-thread-1): Created MetricsSaver j-2QPHXXGPPETKV:i-d2cf1c59:RunJar:08173 period:60 /mnt/var/em/raw/i-d2cf1c59_20160817_RunJar_08173_raw.bin
2016-08-17 18:37:02,807 INFO com.hadoop.compression.lzo.GPLNativeCodeLoader (pool-4-thread-1): Loaded native gpl library
2016-08-17 18:37:02,824 INFO com.hadoop.compression.lzo.LzoCodec (pool-4-thread-1): Successfully loaded & initialized native-lzo library [hadoop-lzo rev 02f444f0932ea7710dcc4bcdc1aa7ca55adf48c9]
2016-08-17 18:37:02,943 INFO org.apache.hadoop.mapred.FileInputFormat (pool-4-thread-1): Total input paths to process : 10
2016-08-17 18:37:03,163 INFO org.apache.hadoop.mapreduce.JobSubmitter (pool-4-thread-1): number of splits:10
2016-08-17 18:37:03,723 INFO org.apache.hadoop.mapreduce.JobSubmitter (pool-4-thread-1): Submitting tokens for job: job_1471458701193_0002
2016-08-17 18:37:04,037 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl (pool-4-thread-1): Submitted application application_1471458701193_0002
2016-08-17 18:37:04,096 INFO org.apache.hadoop.mapreduce.Job (pool-4-thread-1): The url to track the job: http://ip-10-73-164-48.eu-west-1.compute.internal:20888/proxy/application_1471458701193_0002/
2016-08-17 18:37:04,097 INFO cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] submitted hadoop job: job_1471458701193_0002
2016-08-17 18:37:04,098 INFO cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] tracking url: http://ip-10-73-164-48.eu-west-1.compute.internal:20888/proxy/application_1471458701193_0002/
2016-08-17 18:37:30,317 INFO cascading.util.Update (UpdateRequestTimer): newer Cascading release available: 2.7.1
2016-08-17 18:48:40,069 WARN cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] hadoop job job_1471458701193_0002 state at FAILED
2016-08-17 18:48:40,070 WARN cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] failure info: Task failed task_1471458701193_0002_m_000001
Job failed as tasks failed. failedMaps:1 failedReduces:0

2016-08-17 18:48:40,106 WARN cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] task completion events identify failed tasks
2016-08-17 18:48:40,106 WARN cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] task completion events count: 10
2016-08-17 18:48:40,108 WARN cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] event = Task Id : attempt_1471458701193_0002_m_000009_0, Status : FAILED
2016-08-17 18:48:40,109 WARN cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] event = Task Id : attempt_1471458701193_0002_m_000001_0, Status : FAILED
2016-08-17 18:48:40,109 WARN cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] event = Task Id : attempt_1471458701193_0002_m_000009_1, Status : FAILED
2016-08-17 18:48:40,109 WARN cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] event = Task Id : attempt_1471458701193_0002_m_000002_0, Status : FAILED
2016-08-17 18:48:40,109 WARN cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] event = Task Id : attempt_1471458701193_0002_m_000007_0, Status : FAILED
2016-08-17 18:48:40,109 WARN cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] event = Task Id : attempt_1471458701193_0002_m_000001_1, Status : FAILED
2016-08-17 18:48:40,109 WARN cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] event = Task Id : attempt_1471458701193_0002_m_000002_1, Status : FAILED
2016-08-17 18:48:40,110 WARN cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] event = Task Id : attempt_1471458701193_0002_m_000008_0, Status : SUCCEEDED
2016-08-17 18:48:40,110 WARN cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] event = Task Id : attempt_1471458701193_0002_m_000007_1, Status : FAILED
2016-08-17 18:48:40,110 WARN cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] event = Task Id : attempt_1471458701193_0002_m_000009_2, Status : FAILED
2016-08-17 18:48:40,119 INFO cascading.flow.Flow (flow com.snowplowanalytics.hadoop.scalding.SnowplowEventRecoveryJob): [com.snowplowanalytics....] stopping all jobs
2016-08-17 18:48:40,119 INFO cascading.flow.FlowStep (flow com.snowplowanalytics.hadoop.scalding.SnowplowEventRecoveryJob): [com.snowplowanalytics....] stopping: (1/1) hdfs:/local/recovery
2016-08-17 18:48:40,121 INFO cascading.flow.Flow (flow com.snowplowanalytics.hadoop.scalding.SnowplowEventRecoveryJob): [com.snowplowanalytics....] stopped all jobs
2016-08-17 18:48:40,131 INFO cascading.tap.hadoop.util.Hadoop18TapUtil (flow com.snowplowanalytics.hadoop.scalding.SnowplowEventRecoveryJob): deleting temp path hdfs:/local/recovery/_temporary

Can you help me figure out what I am missing? For reference, this is the script I base64-encoded in the --script argument:

function process(event, errors) {
    var fields = tsvToArray(event);
    if (fields[5] == 'GET') {
        var querystring = parseQuerystring(fields[11]);
        querystring['p'] = 'mob';
        fields[11] = buildQuerystring(querystring);
    }
    return arrayToTsv(fields);
}
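
One thing I also want to verify is that the --script value decodes back to exactly this function. A quick Node.js check (recovery.js is assumed to be the file containing the script above; paste the real base64 value in):

// check-encoding.js - run with: node check-encoding.js
var fs = require('fs');
// Paste the --script value from the EMR step here:
var encoded = '<the base64 --script value>';
var decoded = Buffer.from(encoded, 'base64').toString('utf8');
console.log(decoded === fs.readFileSync('recovery.js', 'utf8') ? 'match' : 'MISMATCH');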

@alex any luck?