Python script to reprocess "bad" rows

We wanted to share a quick Python script we use to reformat the “bad” event JSON back into a format that EmrEtlRunner can process. We use it in place of Hadoop-Bad-Rows when we are testing, or when we just feel like a “drag and drop” workflow instead of booting an EMR job.

#!/usr/bin/env python
import sys
import tempfile
import shutil
import os
import os.path
import json
import uuid

rootdir = sys.argv[1]
newdir = sys.argv[2]

# walk the directory and sub-directories
for subdir, dirs, files in os.walk(rootdir):
    for file in files:
        # filter out system files
        if not file.startswith('.'):
            filename = os.path.join(subdir, file)
            # create a temporary file for the recovered lines
            tmp = tempfile.NamedTemporaryFile(delete=False)
            new_file_name = os.path.join(newdir, file + '.' + str(uuid.uuid4()))
            with open(filename) as f:
                print('Reading: ' + filename)
                with open(tmp.name, 'w') as ftmp:
                    for line in f:
                        bad_json = json.loads(line)
                        # each bad row stores the original raw event in "line"
                        ftmp.write(bad_json["line"] + '\n')
                shutil.move(tmp.name, new_file_name)
            print('Finished: ' + filename)

Thanks for sharing this @digitaltouch! Could you maybe share with the community some of the common scenarios where you want to extract and re-process bad rows?

Absolutely. I should have been more detailed to begin with.

We recently had a webhook whose payload changed unannounced, and we did not have the additionalProperties flag set in the schema. The webhook events immediately started failing validation. We were able to identify this quickly by downloading the bad bucket, walking the directories, and parsing the files. After a simple upload back to the processing bucket, we were able to run EmrEtlRunner with --skip staging.

We also had a wild idea for an alerting system using S3 -> Lambda -> SNS to alert our team to check in on Kibana.

We can parse the text files output to the bad bucket and check whether a file's size exceeds a certain threshold (generally a sign of a large number of bad events). Lambda is very cost-efficient. Thoughts?
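A minimal sketch of that S3 -> Lambda -> SNS idea, assuming a hypothetical SNS topic ARN and a 1 MB threshold (both placeholders, not values from this thread). The size check is kept as a pure function so it can be tested without AWS credentials:

```python
import json
import os

# hypothetical threshold: alert when a bad-rows file exceeds 1 MB
SIZE_THRESHOLD_BYTES = 1 * 1024 * 1024
SNS_TOPIC_ARN = os.environ.get('SNS_TOPIC_ARN', 'arn:aws:sns:placeholder:bad-rows-alerts')

def should_alert(object_size, threshold=SIZE_THRESHOLD_BYTES):
    """A large bad-rows file is generally a sign of many bad events."""
    return object_size > threshold

def handler(event, context):
    # S3 put notifications carry the object size in the event payload
    import boto3  # imported here so the pure logic above is testable offline
    sns = boto3.client('sns')
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        size = record['s3']['object']['size']
        if should_alert(size):
            sns.publish(
                TopicArn=SNS_TOPIC_ARN,
                Subject='Bad rows threshold exceeded',
                Message=json.dumps({'bucket': bucket, 'key': key, 'size': size}),
            )
```

Wiring this up means adding an S3 event notification on the bad bucket that triggers the function; the team then gets an SNS ping telling them to check Kibana.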

Thanks for sharing this @digitaltouch!

The alerting idea with Lambda is cool. I don’t know if you are loading your bad rows into Elasticsearch - if you are, then you could maybe try out this Elasticsearch alerting module to achieve something similar:

@digitaltouch where in your flow did you fix the bad rows before they were reprocessed? Or was this handled simply by adding the missing flag in your schema and then having all bad rows reprocess?

@travisdevitt

This particular case was solved by adding the missing flag in the schema and reprocessing the bad rows. In our experience, “bad” rows are caused by incorrect schemas more often than not.

Do you have some examples of data that was malformed and was solved using a process other than a schema revision or changing the collector code?

Not yet, but I’m curious how Snowplowers would deal with a situation where their schema was correct but the data they sent was malformed, in which case the rows themselves might need to be altered with a script.
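One way to sketch that script: parse each bad row, patch the raw event stored in its "line" field, and emit the corrected event for re-upload. The fix_line transform below is a hypothetical placeholder (here, collapsing a doubled tab) standing in for whatever correction the malformed data actually needs:

```python
import json

def fix_line(raw_event):
    # hypothetical transform: replace this with whatever your data needs,
    # e.g. renaming a field or stripping an invalid character
    return raw_event.replace('\t\t', '\t')

def recover_bad_rows(bad_lines):
    """Yield corrected raw events from an iterable of bad-row JSON strings."""
    for line in bad_lines:
        bad_json = json.loads(line)
        yield fix_line(bad_json["line"])
```

The corrected lines can then be written out exactly as in the script above and uploaded back to the processing bucket.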

Hey @travisdevitt - Snowplow R81 is going to support exactly that scenario, and there will be a Discourse tutorial on using the new functionality too. Stay tuned!


I’m not sure how the filenames generated by the Python script would be compatible with the s3-dist-cp regex: `.*([0-9]+-[0-9]+-[0-9]+)-[0-9]+.*`
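A quick sanity check, assuming that groupBy pattern: a plain `file + uuid` name need not match, while a name carrying an hourstamp does:

```python
import re

# the s3-dist-cp groupBy pattern under discussion
PATTERN = re.compile(r'.*([0-9]+-[0-9]+-[0-9]+)-[0-9]+.*')

def matches(name):
    """True if s3-dist-cp would pick this filename up."""
    return PATTERN.match(name) is not None

# hourstamped name (e.g. 2016-10-31-14) is grouped correctly;
# a name with no digit runs separated by hyphens is not
assert matches('part-00000.2016-10-31-14.9f1b')
assert not matches('part-00000.abcd')
```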

Modifying the script to generate valid hourstamps that are properly picked up by s3-dist-cp does the trick:

#!/usr/bin/env python
import sys
import os
import os.path
import json
import uuid

from datetime import datetime

rootdir = sys.argv[1]
newdir = sys.argv[2]

# walk the directory and sub directories
for subdir, dirs, files in os.walk(rootdir):
    for file in files:
        # filter out system files
        if not file.startswith('.'):
            filename = os.path.join(subdir, file)
            new_filename = os.path.join(newdir, "%s.%s.%s" % (file, datetime.now().strftime("%Y-%m-%d-%H"), uuid.uuid4()))
            print('Reading: ' + filename)
            with open(filename) as f:
                with open(new_filename, 'w') as nf:
                    for line in f:
                        bad_json = json.loads(line)
                        nf.write(bad_json["line"] + '\n')
                print('Finished: ' + new_filename)

Also looking forward to built-in support for bad row reprocessing in Snowplow CLI 🙂

Hi @rgabo,

We released Snowplow R81 with Hadoop Event Recovery a couple of days ago:


Thanks @christophe, will check!

On a similar note, I’ve modified the original script to extract the errors in a palatable format. May I present extract_errors.py:

#!/usr/bin/env python
import codecs
import sys
import os
import os.path
import json


from datetime import datetime

rootdir = sys.argv[1]
newdir = sys.argv[2]

# walk the directory and sub directories
for subdir, dirs, files in os.walk(rootdir):
    for file in files:
        # filter out system files
        if not file.startswith('.'):
            filename = os.path.join(subdir, file)
            new_filename = os.path.join(newdir, file)
            with open(filename) as f:
                print('Reading: ' + filename)
                with codecs.open(new_filename, 'w', 'utf-8') as nf:
                    for line in f:
                        bad_json = json.loads(line)
                        if bad_json["errors"]:
                            for error in bad_json["errors"]:
                                nf.write(error["message"] + '\n')
            print('Finished: ' + new_filename)

I’ve also added proper utf-8 encoding to the output.
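As a possible follow-up (a sketch, not part of the script above): once the messages are extracted, tallying them surfaces the dominant failure modes at a glance:

```python
from collections import Counter

def top_errors(messages, n=5):
    """Return the n most common error messages with their counts."""
    return Counter(messages).most_common(n)

# e.g. feed it every line of the files written by extract_errors.py
```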
