Correct/change data in thrift LZO files for reprocessing

We recently added a new field to one of our custom context tables. The developer who created the payload for this context thought at the time that the values of this field would fit into a numeric(3,2) type in Redshift. As it turned out, some of the values had 10 to 12 decimal places, so we are getting errors during enrichment such as:

remainder of division is not zero (1.100000023841858 / 0.01)
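
To illustrate the arithmetic behind the schema's multipleOf check (a quick sketch, not our actual enrichment code):

from decimal import Decimal

value = 1.100000023841858
print(Decimal(str(value)) % Decimal("0.01"))            # 2.3841858E-8 -> not a multiple of 0.01, fails validation
print(Decimal(str(round(value, 2))) % Decimal("0.01"))  # 0.00 -> passes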

The developer has now added a check to his JS code to make sure the values passed in requests to our collector will not cause this error.

The problem I am trying to solve now is to take all the raw archived events, change the value of this custom context field to one with 2 decimal places, save them again in the correct format, and reprocess them through our Snowplow pipeline. I believe the raw archived files are LZO-compressed Thrift files, and I am having a hard time figuring out a way to process those. I have tried following the suggestions in this post https://www.snowflake-analytics.com/blog/2016/12/13/decoding-snowplow-real-time-bad-rows-thrift with no success. The Python script I have written is having a hard time parsing the Thrift file. I have of course used lzop to decompress the .lzo first, so I am not sure what the issue is.

I am writing to find out if others have a suggestion on how to accomplish this task. I have searched Discourse and found topics where others have solutions for processing the same Thrift files, but in my case I need to correct a value before I process them again. Could I take the payloads from my enriched/bad/ S3 bucket and send them to the collector again in a new request? Could I somehow take the payloads from enriched/bad/, change the value of the new field to one with 2 decimal places, and then add them back to the enriched/good/ S3 bucket and have Snowplow pick them up from there?

I appreciate any help on this issue.

Thank you

Hi @dkeidel,

If the rows that were failing have ended up in bad (in S3 enriched/bad), the recommended approach is to use Hadoop Event Recovery to recover those events. The recovered data ends up in a recovery folder, which can then be rerun through the enrichment process and then loaded into Redshift via storage-loader.

In this circumstance, as you haven’t had a schema change, you should be able to filter down to the events that were causing this error and either transform them by reducing the precision of this field, or alternatively keep the value as-is and load it into Redshift with a new column of increased precision.


Thank you Mike for your reply.

  1. I see that Hadoop Event Recovery was part of the release we are using, which is r81. Will this version be fine to use, or should I upgrade?

  2. I would like to know a little more about your alternative option. Are
    you saying to keep the value as-is but load it into Redshift as a VARCHAR,
    for example? I am not clear on what you meant by “new column”.

  3. Lastly, can I also take the enriched/bad/ payloads and reprocess them by
    sending them in a POST request to our collector, or by adding them to the
    Kinesis stream directly? I really want to go the route of changing the
    values in these rows to ones with 2 decimal places and keeping the field
    as numeric(3,2).

I appreciate all your help. Thank you.

  1. You should be able to use the 0.2.0 version of Hadoop Event Recovery without any issues.

  2. You could modify the table such that the field in Redshift has increased precision - but doing Event Recovery is the better option here.

  3. You really don’t want to replay events through the collector - Hadoop Event Recovery allows you to modify these rows from enriched/bad without having to replay anything! You’ll need to write a function to filter to these events, and then you can transform them in whatever way you want (in this case reduce their precision by rounding). The function returns the modified rows, which are in turn written to S3 and fed into the EMR process. Sometimes this can feel like a bit more effort upfront, but the alternatives, such as replaying the data, will induce a significant headache.

Thank you Mike for the reply. I have taken your advice and spent the last day or so writing the JS and the aws emr command. However, I am running into issues and errors when running my enriched/bad/ rows through it.

Here is the JS that I wrote. Can you look it over and let me know if you see anything wrong with it:

function process(event, errors) {

    var failedPixelDensity = false;

    for (var i = 0; i < errors.length; i++) {
            var err = errors[i];
            if (isBadPixelDensityError(err)) {
                    failedPixelDensity = true;
            } else {
                    return null;
            }
    }

    if (failedPixelDensity) {
	// seems that what needs to be done is that you must return
	// a base64 encoded event since that is what you started with
	
	// first base64 decode
	// var decodedEvent = decodeBase64(event)

	// base64 decoding creates tab separated string
        var fields = tsvToArray(event);

	// create dictionary of all query parameters
	// not too sure about it being position 11, but can change that
	var querystringDict = parseQuerystring(fields[11]);

	// get the context portion since this is what we want to edit
	var cxParameter = querystringDict['cx'];

	// base64 decode that now
	var cxContexts = decodeBase64(cxParameter);

	// convert resulting JSON string to javascript object
	var cx = parseJson(cxContexts)

	// get pixel_density value

	// round the pixel density to two decimal places
	// and replace the current value with rounded one
	for (i = 0; i < cx.data.length; i++) {
		var pd = cx.data[i].data.pixel_density;
		if (pd) {
			// toFixed returns a string, so convert back to a number
			cx.data[i].data.pixel_density = parseFloat(pd.toFixed(2));
		}
	}

	// convert JSON object back to string and base64 encode it
	// and then replace the cx key with that new value
	querystringDict['cx'] = encodeBase64(stringifyJson(cx));

	// turn dictionary back into query string
	fields[postPosition] = buildQuerystring(querystringDict);


	// return tsv of the array
        return arrayToTsv(fields);
	

    } else {
        return null;
    }
}

function isBadPixelDensityError(err) {
    return /remainder of division is not zero/.test(err);
}

Here is an example row:

{"line":"CwBkAAAADTIzLjMwLjE4MS4yMTQKAMgAAAFaue28/QsA0gAAAAVVVEYtOAsA3AAAABFzc2MtMC43LjAta2luZXNpcwsBLAAAAGBNb3ppbGxhLzUuMCAoV2luZG93cyBOVCAxMC4wOyBXaW42NDsgeDY0OyBydjo1MS4wKSBHZWNrby8yMDEwMDEwMSBGaXJlZm94LzUxLjAuMSBXYXRlcmZveC81MS4wLjELATYAAAAzaHR0cHM6Ly9zZWFrYXlha2luZ3VzYS5zbXVnbXVnLmNvbS9DaGxvZUcvbi1Wd3NSWHovCwFAAAAAAi9pCwFKAAAEZ2U9c2Umc2VfY2E9VXBsb2FkJnNlX2FjPVVwbG9hZCUyMEVycm9yJnNlX2xhPUR1cGxpY2F0ZSUyMEZpbGUmc2VfcHI9RHVwbGljYXRlJTIwRmlsZSZ0dj1qcy0yLjQuMSZ0bmE9cHJpbWFyeSZhaWQ9V2Vic2l0ZSZwPXdlYiZ0ej1BbWVyaWNhJTJGTmV3X1lvcmsmbGFuZz1lbi1VUyZjcz1VVEYtOCZmX3BkZj0wJmZfcXQ9MCZmX3JlYWxwPTAmZl93bWE9MCZmX2Rpcj0wJmZfZmxhPTAmZl9qYXZhPTAmZl9nZWFycz0wJmZfYWc9MSZyZXM9NDI4OHgyNDEyJmNkPTI0JmNvb2tpZT0xJmVpZD04NTg5YWNiOC0wNDhmLTRhMDUtYTU2MS1jMGQwYmUyNmExODMmZHRtPTE0ODkxNzgwNTA0NTYmdnA9MjA1OHgxMDgzJmRzPTIwNTh4MTU2MiZ2aWQ9MjEyJmR1aWQ9MzcyMDAyMzRiODlmZTYwMCZmcD0yOTM1NzQ5MjcyJnVpZD1lcDVNUTduVENRTW9ZJnJlZnI9aHR0cHMlM0ElMkYlMkZzZWFrYXlha2luZ3VzYS5zbXVnbXVnLmNvbSUyRkNobG9lRyUyRm4tVndzUlh6JnVybD1odHRwcyUzQSUyRiUyRnNlYWtheWFraW5ndXNhLnNtdWdtdWcuY29tJTJGQ2hsb2VHJTJGbi1Wd3NSWHolMkYmY3g9ZXlKelkyaGxiV0VpT2lKcFoyeDFPbU52YlM1emJtOTNjR3h2ZDJGdVlXeDVkR2xqY3k1emJtOTNjR3h2ZHk5amIyNTBaWGgwY3k5cWMyOXVjMk5vWlcxaEx6RXRNQzB3SWl3aVpHRjBZU0k2VzNzaWMyTm9aVzFoSWpvaWFXZHNkVHBqYjIwdWMyMTFaMjExWnk5elkzSmxaVzR2YW5OdmJuTmphR1Z0WVM4eExUQXRNQ0lzSW1SaGRHRWlPbnNpYzJOeVpXVnVYMjVoYldVaU9pSlZjR3h2WVdSbGNpSjlmU3g3SW5OamFHVnRZU0k2SW1sbmJIVTZZMjl0TG5OdGRXZHRkV2N2ZFhObGNpOXFjMjl1YzJOb1pXMWhMekV0TUMwd0lpd2laR0YwWVNJNmV5SmhZM1JwYjI1ZmMyOTFjbU5sSWpvaVZYTmxjaUlzSW1selgyeHZaMmRsWkY5cGJpSTZkSEoxWlN3aWFYTmZkSEpwWVd4ZllXTmpiM1Z1ZENJNlptRnNjMlVzSW1selgzQmhjblJwWTJsd1lXNTBYMkZqWTI5MWJuUWlPbVpoYkhObGZYMHNleUp6WTJobGJXRWlPaUpwWjJ4MU9tTnZiUzV6YlhWbmJYVm5MMlJsZG1salpTOXFjMjl1YzJOb1pXMWhMekV0TUMweElpd2laR0YwWVNJNmV5SndhWGhsYkY5a1pXNXphWFI1SWpvd0xqZzVOVFV5TWpReE5UWXpOemszZlgxZGZRDwFeCwAAAAsAAAAbSG9zdDogc3RhdHMtbmV3LnNtdWdtdWcuY29tAAAAC0FjY2VwdDogKi8qAAAAIkFjY2VwdC1FbmNvZGluZzogZ3ppcCwgZGVmbGF0ZSwgYnIAAAAgQWNjZXB0LUxhbmd1YWdlOiBlbi1VUywgZW47cT0wLjUAAAirQ29va2llOiBvcHRpbWl6ZWx5U2VnbWVudHM9JTdCJTdEOyBvcHRpbWl6ZWx5RW5kVXNlcklkPW9ldTE0NTU5MTQ5MDE1NDNyMC4xNjgxOTc2OTk2NDkzMjk4NTsgb3B0aW1pemVseUJ1Y2tldHM9JTdCJTdEOyBfX3V0bWE9MjU2Mzc4ODg5LjkyOTAzMjMzMi4xNDU1OTE0OTAyLjE0ODg2NjIxODIuMTQ4OTE3Nzc5Mi4yMzQ7IF9fdXRtej0yNTYzNzg4ODkuMTQ4NTkxOTQ2Ny4yMTQuMTIudXRtY3NyPXNlYWtheWFraW5ndXNhLnNtdWdtdWcuY29tfHV0bWNjbj0ocmVmZXJyYWwpfHV0bWNtZD1yZWZlcnJhbHx1dG1jY3Q9L0x1bHVMaWx5L24tM3B3QjVTLzsgX2dhPUdBMS4yLjkyOTAzMjMzMi4xNDU1OTE0OTAyOyBfX2F1Yz0zMjE0YjZlYjE1MmZiNGE2NGNhMTY4MWRkZjU7IF9zcF9pZC4xYmQyPTM3MjAwMjM0Yjg5ZmU2MDAuMTQ1NTkxNDkwMy4yMTIuMTQ4OTE3ODA1MC4xNDg4NjYzMzc0OyBfX3FjYT1QMC0xODE0NzM0ODg4LTE0NTU5MTQ5MDI3Nzg7IHNwPTc2ZjNkZTNkLTlkMWMtNDZjYy04NDdkLTY2OTY0MTc2MTRhODsgc3A9NGNiMDAxNTEtZjI2ZC00MThkLWI3OTEtZDZiNjYyNzg3MTUzOyBpaHk9JTdCJTIyaGxpJTIyJTNBdHJ1ZSUyQyUyMmRmc2xvJTIyJTNBdHJ1ZSU3RDsgc3N0cmFjaz01ZTlmMDNlNi1mYjI0LTQ0ZGMtYTY3OC00Y2E1ZDU1Nzk1NmI7IF9jZWcucz1vbHl0amg7IF9jZWcudT1vbHl0amg7IGZibV81MTc1Njg4MDM2OD1iYXNlX2RvbWFpbj0uc211Z211Zy5jb207IF9tc3V1aWRfMDBscjV3emx5MD00QzY3MzBEQy00NjA5LTRFNjgtOEVERC1EMTA3MzhBN0EyNTI7IFNyZWZlcnJlcj1odHRwcyUzQSUyRiUyRnd3dy5zbXVnbXVnLmNvbSUyRjsgcmVtZW1iZXJLZWVwTWVMb2dnZWRJbj0xOyBpaG09JTdCJTIybCUyMiUzQSUyMjY1ZDY5NDNjNzQyY2ZlZjg0MjgxOGU2ZWFjNDMwZWZjJTIyJTJDJTIydCUyMiUzQSUyMjZkZjBjOGI3ZWMxODdjZjBkMDQxNDRhMzlhMDVkMDliYjU1MzRiOGElMjIlN0Q7IF9zdT03YzA3MTc2NDdjODEwYmZlMzAzN2QwZGIxMTc4MmQ1ODsgaXM9JTdCJTIyZXMlMjIlM0ElMjI2aElFSkd1ZU0xMm5RdDlCMFZaSzJDWllqT2FjOWtabnZIT2JjTlJWbnJQN2JGeSUyQnBnMFpmQSUzRCUzRCUyMiU3RDsgU01TRVNTPWJmZmVkYTRiNGU3ZDAxNWExYThkY2VlZGU4M2EyMzM1OyBvZD0lN0IlMj
JzdHJpbmclMjIlM0ElMjIlN0IlNUMlMjJTTiU1QyUyMiUzQSU1QiU1QyUyMlNlYWtheWFraW5ndXNhJTVDJTIyJTVEJTJDJTVDJTIyViU1QyUyMiUzQTElN0QlMjIlMkMlMjJ0aW1lJTIyJTNBMTQ4OTE3Nzc4OSUyQyUyMnNpZ25hdHVyZSUyMiUzQSUyMk5qRXhaV0ZrTnpWaE1qRTNPR1l4WVRKak9HUmlOamN4Wm1Sa1pETXpNR1poWVRSaE16RXhOQSUzRCUzRCUyMiUyQyUyMnZlcnNpb24lMjIlM0ExJTJDJTIyYWxnb3JpdGhtJTIyJTNBJTIyc2hhMSUyMiU3RDsgX191dG1iPTI1NjM3ODg4OS41LjEwLjE0ODkxNzc3OTI7IF9fdXRtYz0yNTYzNzg4ODk7IF9fdXRtdD0xOyBfc3Bfc2VzLjFiZDI9KjsgZmJzcl81MTc1Njg4MDM2OD1OZERvdnNVV0FheWFBQi1qamE0TVZtVXdqZFlTb3Z4a0hndWFDbkIzZVNnLmV5SmhiR2R2Y21sMGFHMGlPaUpJVFVGRExWTklRVEkxTmlJc0ltTnZaR1VpT2lKQlVVTmhjVXhDU25kbGRIUnJlV0ZtWDJWSlZGVlJSaTAxYzFBd2VIUm1hMUZqZDFCcGVYaEphMGxOUVdaNVEwbFNibEJmVUZKMFpXcFJSVUpXUm5CalZFeDBiMFJtT0dGdlFWUmtkalptTUZGd1RtNURObm93V1d4VlRqSjFhME5SV1hsRlMzaFJUakpVWkU1VGFuTTVUMk55U3pKU1VGQkRXRXhxU1VSTVMyVjFiMGhxZFVSemRHSjRjVWhXYUd0NldGOXdkeTB3WWpKbU5VSTRTRzl0TURsblVVcFZZVTVFVDNneGIydGpSMHRvUlc1M09VZDJNMkpTVjIxVmVITnZVbHBTUkdOaGVVNVBVRXhwZEVzM1owMVhZV0ZzTjJKSlZUSnJhWGRZVmxabVIzZDZhVTFVTFd3Mk4yVjNURVY0ZWxOT01WY3dlRUpEWTB4TFkzSnpSbFl4VW1kVU1tMHhjMjlaZEVaUFpWTXlTaTFWTW1wV1N6QlVUM2x1Y0RCc1UzSkZkMXBoZHpKek1FTlBNRGwxZERab2RsUmZOMWQyYjJaa2NGUmhNWFV4VEd3MFNsVjFSMU4yV0U1VFMxQk1RV2RZY3poSk9IaDRaSGxNT0VwblJWUXhZWFpoTkhwRVJVeGhhbHB6TFZwdWRrSnpUMkZGV0dFNFYzcFZXR3hRTUZraUxDSnBjM04xWldSZllYUWlPakUwT0RreE56YzNPVEVzSW5WelpYSmZhV1FpT2lJMU16QTBOall6T0RJaWZROyBfX2FzYz01MzBmNWYyMDE1YWI5ZTljZmRmYjU2MjM4YWUAAAA8UmVmZXJlcjogaHR0cHM6Ly9zZWFrYXlha2luZ3VzYS5zbXVnbXVnLmNvbS9DaGxvZUcvbi1Wd3NSWHovAAAAbFVzZXItQWdlbnQ6IE1vemlsbGEvNS4wIChXaW5kb3dzIE5UIDEwLjA7IFdpbjY0OyB4NjQ7IHJ2OjUxLjApIEdlY2tvLzIwMTAwMTAxIEZpcmVmb3gvNTEuMC4xIFdhdGVyZm94LzUxLjAuMQAAAB5YLUZvcndhcmRlZC1Gb3I6IDIzLjMwLjE4MS4yMTQAAAAVWC1Gb3J3YXJkZWQtUG9ydDogNDQzAAAAGFgtRm9yd2FyZGVkLVByb3RvOiBodHRwcwAAABZDb25uZWN0aW9uOiBrZWVwLWFsaXZlCwGQAAAAFXN0YXRzLW5ldy5zbXVnbXVnLmNvbQsBmgAAACQ1ZTlmMDNlNi1mYjI0LTQ0ZGMtYTY3OC00Y2E1ZDU1Nzk1NmILemkAAABBaWdsdTpjb20uc25vd3Bsb3dhbmFseXRpY3Muc25vd3Bsb3cvQ29sbGVjdG9yUGF5bG9hZC90aHJpZnQvMS0wLTAA","errors":[{"level":"error","message":"error: remainder of division is not zero (0.89552241563797 / 0.1)\n    level: \"error\"\n    schema: {\"loadingURI\":\"#\",\"pointer\":\"/properties/pixel_density\"}\n    instance: {\"pointer\":\"/pixel_density\"}\n    domain: \"validation\"\n    keyword: \"multipleOf\"\n    value: 0.89552241563797\n    divisor: 0.1\n"}],"failure_tstamp":"2017-03-10T21:31:23.795Z"}

My EMR cluster is failing at the Fix up bad rows step. I have tried the aws emr command above with the identity test from https://github.com/snowplow/snowplow/blob/master/3-enrich/hadoop-event-recovery/src/test/scala/com/snowplowanalytics/hadoop/scalding/SnowplowEventRecoveryJobSpec.scala and I did get raw rows recovered in my recovered S3 bucket. So I don’t think there is anything wrong with my aws emr command, but instead there might be something wrong with my JS.

I haven’t tested that function yet but the logic in it looks mostly pretty good, with the exception that the postPosition variable in the code snippet isn’t defined anywhere.

Thank you for catching that, Mike. I have made the correction here:

function process(event, errors) {

    var failedPixelDensity = false;

    for (var i = 0; i < errors.length; i++) {
            var err = errors[i];
            if (isBadPixelDensityError(err)) {
                    failedPixelDensity = true;
            } else {
                    return null;
            }
    }

    if (failedPixelDensity) {
	// seems that what needs to be done is that you must return
	// a base64 encoded event since that is what you started with
	
	// first base64 decode
	// var decodedEvent = decodeBase64(event)

	// base64 decoding creates tab separated string
        var fields = tsvToArray(event);

	// create dictionary of all query parameters
	// not too sure about it being position 11, but can change that
	var querystringDict = parseQuerystring(fields[11]);

	// get the context portion since this is what we want to edit
	var cxParameter = querystringDict['cx'];

	// base64 decode that now
	var cxContexts = decodeBase64(cxParameter);

	// convert resulting JSON string to javascript object
	var cx = parseJson(cxContexts)

	// get pixel_density value

	// round the pixel density to two decimal places
	// and replace the current value with rounded one
	for (i = 0; i < cx.data.length; i++) {
		var pd = cx.data[i].data.pixel_density;
		if (pd) {
			// toFixed returns a string, so convert back to a number
			cx.data[i].data.pixel_density = parseFloat(pd.toFixed(2));
		}
	}

	// convert JSON object back to string and base64 encode it
	// and then replace the cx key with that new value
	querystringDict['cx'] = encodeBase64(stringifyJson(cx));

	// turn dictionary back into query string
	fields[11] = buildQuerystring(querystringDict);


	// return tsv of the array
        return arrayToTsv(fields);
	

    } else {
        return null;
    }
}

function isBadPixelDensityError(err) {
    return /remainder of division is not zero/.test(err);
}

If you could test this function with the row I provided in the previous message I would love to hear what you find. Even after I made the change, my EMR cluster is still failing at the Fix up bad rows step.

Also, is there a way to log information from the JS when it is processing so that I can use the EMR console to see this logging information?

Update: I dug deeper into event-recovery/logs/j-KJSE7MML7QL1/hadoop-mapreduce/history/2017/03/16/000000/job_1489682963527_0002 through the EMR console and found an error message that might be related:

org.mozilla.javascript.EcmaError: TypeError: Cannot call method \"split\" of undefined (user-defined-script#12)\n\tat 

The issue might be when I call this function:

function parseQuerystring(qstr) {
  var query = {};
  var a = qstr.split('&');
  for (var i = 0; i < a.length; i++) {
    var b = a[i].split('=');
    query[decodeURIComponent(b[0])] = decodeURIComponent(b[1] || '');
  }
  return query;
}

What I pass into that function is fields[11]. I picked fields[11] since that is what all the provided examples show as the query parameters, but maybe in my row it is not. I am guessing here since I am not sure how to log this.

It could even be this function, since it also calls split:

function tsvToArray(event) {
  return event.split('\t', -1);
}

That JavaScript looks mostly correct, but I think the major issue, which I probably should have caught earlier from your Base64 example event, is that Hadoop Event Recovery doesn’t support Thrift records (could you confirm, @alex?) and may only support tab-delimited input formats, based on the issue here and the source here.

That’s correct, good catch @mike!

@mike - I see that the errors I was reporting from the split function (TypeError: Cannot call method “split” of undefined) triggered you to think of the file format. Glad that was helpful, but at the same time I spent the last 2 days working solidly on this :frowning: But thank you for all your help.

@alex or @mike - what is the workaround then? If there is no workaround, how can I do what I need to do, given that Hadoop Event Recovery is not an option since my rows are not TSV?

I am able to base64 decode my bad rows; is there a way to convert them to a TSV file?

Thank you

I think it might (?) be possible to do this. The approach I’d consider would be to:

  1. Filter your bad rows down based on error message.
  2. Base64 decode these rows, which will give you the CollectorPayload Thrift record; deserialize this and attempt to transform it into a TSV equivalent. (The filtering and decoding are sketched just after this list.)
  3. Use the above as an input to Hadoop Bad Rows and transform the pixel density field. Return the row as a TSV line.
  4. Run through Event Recovery.
  5. Load into Redshift.
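
The filtering and decoding might look something like the sketch below, assuming your bad rows are newline-delimited JSON files pulled down locally (the function name is just for illustration):

import base64
import json

def recoverable_payloads(bad_rows_path):
    """Yield the base64-decoded raw collector payloads (Thrift bytes) for rows
    that failed only on the pixel_density multipleOf check."""
    with open(bad_rows_path) as f:
        for line in f:
            row = json.loads(line)
            messages = [e["message"] for e in row["errors"]]
            if messages and all("remainder of division is not zero" in m for m in messages):
                yield base64.b64decode(row["line"])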

I was able to deserialize the Thrift records. I did this in Python using the thriftpy module.
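
For anyone following along, the deserialization can be sketched roughly like this (assuming the default binary protocol and that the collector-payload.thrift schema file from the snowplow repo is saved locally; thriftpy wants the module name to end in _thrift):

import thriftpy
from thriftpy.utils import deserialize

# Load the CollectorPayload struct definition from the schema file
collector = thriftpy.load("collector-payload.thrift", module_name="collector_payload_thrift")

def decode_payload(raw_bytes):
    """Deserialize raw Thrift bytes into a CollectorPayload object."""
    return deserialize(collector.CollectorPayload(), raw_bytes)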

I am now at the step where I need to transform this into a TSV equivalent. My question to you, @mike, is whether you have an example format that I can shoot for. I do not know where the tabs should go in a record, and if you could give me an example of what that should look like, I could probably produce a tab-separated file that I can run through Hadoop Event Recovery.

The other thing I was wondering: do you think I can fix the pixel density field in the Python script and keep the record in its deserialized Thrift form? In other words, can I send that form of the record (deserialized Thrift) into Snowplow at some stage, or do I have to send it through Hadoop Event Recovery?

Thank you

@mike - Over the past couple of days I have tried to continue down the road of reprocessing these records.

I was able to base64 decode the enriched/bad/ row, deserialize it, correct the pixel_density, thrift serialize it again, write it out as a binary file, lzo compress that, and finally create a splittable lzo index file.
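
The fix-and-reserialize step can be sketched roughly like this (GET events only; decode_payload is the hypothetical helper above, and I am assuming the cx parameter is base64url-encoded JSON):

import base64
import json
from urllib.parse import parse_qsl, urlencode

from thriftpy.utils import serialize

def fix_pixel_density(payload):
    """Round pixel_density to 2 decimal places inside the cx querystring
    parameter and return the payload re-serialized to Thrift bytes."""
    params = dict(parse_qsl(payload.querystring, keep_blank_values=True))
    if "cx" in params:
        padded = params["cx"] + "=" * (-len(params["cx"]) % 4)  # restore any stripped padding
        contexts = json.loads(base64.urlsafe_b64decode(padded))
        for ctx in contexts.get("data", []):
            pd = ctx.get("data", {}).get("pixel_density")
            if pd is not None:
                ctx["data"]["pixel_density"] = round(float(pd), 2)
        params["cx"] = base64.urlsafe_b64encode(json.dumps(contexts).encode()).decode()
        payload.querystring = urlencode(params)
    return serialize(payload)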

I then loaded the .lzo and .lzo.index files into a recovery S3 bucket. I modified the Ruby YAML file and ran EER (EmrEtlRunner). So I am trying to process it all the way from the beginning of my pipeline. After the EER job finishes I run the storage loader.

Nothing loads into Redshift, so I went back to look at the enriched/bad/ records and see the following:

{"line":"/id2418615610w==","errors":[{"level":"error","message":"Error deserializing raw event: Cannot read. Remote side has closed. Tried to read 2 bytes, but only got 0 bytes. (This is often indicative of an internal error on the server side. Please check your server logs.)"}],"failure_tstamp":"2017-03-22T02:15:19.702Z"}
{"line":"Z/ziAiglucomsnowplowanalyticssnowplow/CollectorPayload/thrift/1+0+0Juic=","errors":[{"level":"error","message":"Error deserializing raw event: Cannot read. Remote side has closed. Tried to read 2 bytes, but only got 0 bytes. (This is often indicative of an internal error on the server side. Please check your server logs.)"}],"failure_tstamp":"2017-03-22T02:15:20.059Z"}

It seems that maybe I am not creating the Thrift, LZO, or LZO index files correctly.

Do you think it is possible to reprocess records the way I am describing? Have you ever successfully reprocessed thrift records after first modifying them or know of anyone who has been successful doing that?

I appreciate all your help.

I’m not sure if this approach is possible.

I’ve written a bit of code below to convert the raw Thrift payload into one or more TSV events (in the Clojure collector format at the moment) which could then be provided as input to the recovery process. This should be relatively simple for GET (single event) but I haven’t tested this on POST (with multiple events) yet.

import json
from datetime import datetime
from urllib.parse import urlencode

def build_row(raw_payload, method, event_index=0):
    """Returns a single tab delimited row from a Thrift payload"""
    ts = datetime.fromtimestamp(raw_payload.timestamp/1000)
    if method == 'POST':
        # if POST decode the payload and select the event_index'ed event
        json_object = json.loads(raw_payload.body)
        event = json_object.get('data')[event_index]
        query_string = urlencode(event)
    else:
        query_string = raw_payload.querystring
        
    formatted = [
        ts.strftime('%Y-%m-%d'), # date
        ts.strftime('%H:%M:%S'), # time
        '-',
        '-',
        raw_payload.ipAddress, # client IP
        method,
        '????', # cs(Host) does this matter?
        raw_payload.path, #cs(uri-stem)
        '200',
        raw_payload.refererUri,
        raw_payload.userAgent,
        query_string, # if method=POST use raw_payload.body instead
        '-',
        '-',
        '-',
        '-',
        '-'
    ]
    
    return '\t'.join(formatted)


def thrift_to_tsv(raw_payload):
    """Returns a list of rows in TSV format."""
    if raw_payload.path == '/i':
        method = 'GET'
        return [build_row(raw_payload, method)]
    elif raw_payload.path == '/com.snowplowanalytics.snowplow/tp2':
        method = 'POST'
        rows = []
        event_index = 0
        json_object = json.loads(raw_payload.body)
        for event in json_object.get('data'):
            event_row = build_row(raw_payload, method, event_index)
            rows.append(event_row)
            event_index += 1
        return rows
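
Tying it together with the hypothetical recoverable_payloads and decode_payload helpers sketched earlier in this thread, the glue could look something like this (writing collector-format TSV lines to a local file to use as recovery input):

with open("recovery-input.tsv", "w") as out:
    for raw in recoverable_payloads("bad_rows.json"):
        payload = decode_payload(raw)
        for row in thrift_to_tsv(payload) or []:  # thrift_to_tsv returns None for other paths
            out.write(row + "\n")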

Thank you @mike! I will take a look at this. One other quick question: have you ever used the thriftpy Python module to write a payload out to a file? I am able to serialize the payload, but I don’t know if thriftpy has a way to write that out to a file. I have written the bytes to a file opened in ‘wb’ mode using Python, but I am not sure if that is the right way to do it.

I don’t think thriftpy offers a way out of the box to write it to a file, but writing the raw serialized bytes is probably the best way to do it (opening the file with ‘wb’ as you’ve mentioned).
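
i.e. something along these lines (payload being the thriftpy CollectorPayload object, and the file name just an example):

from thriftpy.utils import serialize

with open("payload-00001.bin", "wb") as f:  # example file name
    f.write(serialize(payload))             # raw Thrift bytes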

This conversation is continuing in this thread:

http://discourse.snowplow.io/t/writing-thrift-from-enriched-bad-rows/1067

Hi @dkeidel

I can suggest a slightly different approach. If you are using the real-time pipeline and keep backups of the raw data, you might consider picking up those backups, processing them, and putting them back onto the raw Kinesis stream. I have been fixing contextual information and unstructured event data with a Python script and no Thrift library; the only tricky part is mangling the Thrift records manually. I was able to reprocess about 3.2M events in 40 minutes.
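
The "put them back onto the raw stream" part is just batched Kinesis PutRecords calls, for example (the stream name and region are placeholders for your own setup):

import uuid
import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")  # placeholder region

def replay_batch(raw_payloads, stream_name="snowplow-raw-good"):  # placeholder stream name
    """Put fixed, re-serialized collector payloads back onto the raw Kinesis stream."""
    records = [{"Data": p, "PartitionKey": str(uuid.uuid4())} for p in raw_payloads]
    for i in range(0, len(records), 500):  # PutRecords accepts at most 500 records per call
        resp = kinesis.put_records(StreamName=stream_name, Records=records[i:i + 500])
        if resp["FailedRecordCount"]:
            print("Some records failed; retry them before moving on")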

@grzegorzewald - Thank you for the reply. I too have considered doing something like this, so I am happy to hear that it is possible.

Yes, we are using “Realtime” pipeline.

When you say you keep backups, are you referring to raw/archive/ on S3? I think that is what you mean. We do keep those, but they are in Thrift. It sounds like you are not decompressing or deserializing these backups, though.

Would you be willing to share your Python process with me?

@mike - I am attempting to convert the deserialized Thrift into a TSV format that I can use. I do this and save it to a file - just one record while testing.

I then run this through Hadoop Event Recovery as you suggested in a previous comment. Hadoop Event Recovery then outputs that to the directory I specify. When viewing the file it looks like this:

2017-03-10	12:56:42	-	-	24.186.156.106	GET	????	/i	200	http://www.alexkaplan.photography/Weddings	Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36	uid=gJLLpc4N5iYpw&vid=5&f_fla=1&cd=24&vp=2048x1023&f_qt=0&f_wma=0&cs=UTF-8&tz=America%2FNew_York&duid=adcaac695cac9ee2&tv=js-2.4.1&res=2048x1152&f_java=0&f_realp=0&fp=118507526&f_pdf=1&f_ag=0&cookie=1&f_gears=0&ds=2048x1023&lang=en-US&e=pv&dtm=1489179402212&url=http%3A%2F%2Fwww.alexkaplan.photography%2FWeddings&p=web&cx=eyJkYXRhIjogW3siZGF0YSI6IHsiaXNfdHJpYWxfYWNjb3VudCI6IGZhbHNlLCAiYWN0aW9uX3NvdXJjZSI6ICJVc2VyIiwgImlzX3BhcnRpY2lwYW50X2FjY291bnQiOiBmYWxzZSwgImlzX2xvZ2dlZF9pbiI6IHRydWV9LCAic2NoZW1hIjogImlnbHU6Y29tLnNtdWdtdWcvdXNlci9qc29uc2NoZW1hLzEtMC0wIn0sIHsiZGF0YSI6IHsicGl4ZWxfZGVuc2l0eSI6ICIxLjI1In0sICJzY2hlbWEiOiAiaWdsdTpjb20uc211Z211Zy9kZXZpY2UvanNvbnNjaGVtYS8xLTAtMSJ9XSwgInNjaGVtYSI6ICJpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5zbm93cGxvdy9jb250ZXh0cy9qc29uc2NoZW1hLzEtMC0wIn0=&tna=primary&eid=e2e2fde6-e647-48e7-b728-93a254b916b5&f_dir=0&aid=Website&page=Weddings%20-%20AlexKaplanPhoto	-	-	-	-	-

Its file name is part-00000. I change it to match the file name structure of the Cloudfront format.

I then put this file into another S3 bucket, point my YAML config file at it as the input for EER, and change the format in my YAML to format: tsv/com.amazon.aws.cloudfront/wd_access_log.

When I run EER it fails.

I look at the output in enriched/bad/ and see the following:

```{"line":"2017-03-10\t12:56:42\t-\t-\t24.186.156.106\tGET\t????\t/i\t200\thttp://www.alexkaplan.photography/Weddings\tMozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36\tuid=gJLLpc4N5iYpw&vid=5&f_fla=1&cd=24&vp=2048x1023&f_qt=0&f_wma=0&cs=UTF-8&tz=America%2FNew_York&duid=adcaac695cac9ee2&tv=js-2.4.1&res=2048x1152&f_java=0&f_realp=0&fp=118507526&f_pdf=1&f_ag=0&cookie=1&f_gears=0&ds=2048x1023&lang=en-US&e=pv&dtm=1489179402212&url=http%3A%2F%2Fwww.alexkaplan.photography%2FWeddings&p=web&cx=eyJkYXRhIjogW3siZGF0YSI6IHsiaXNfdHJpYWxfYWNjb3VudCI6IGZhbHNlLCAiYWN0aW9uX3NvdXJjZSI6ICJVc2VyIiwgImlzX3BhcnRpY2lwYW50X2FjY291bnQiOiBmYWxzZSwgImlzX2xvZ2dlZF9pbiI6IHRydWV9LCAic2NoZW1hIjogImlnbHU6Y29tLnNtdWdtdWcvdXNlci9qc29uc2NoZW1hLzEtMC0wIn0sIHsiZGF0YSI6IHsicGl4ZWxfZGVuc2l0eSI6ICIxLjI1In0sICJzY2hlbWEiOiAiaWdsdTpjb20uc211Z211Zy9kZXZpY2UvanNvbnNjaGVtYS8xLTAtMSJ9XSwgInNjaGVtYSI6ICJpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5zbm93cGxvdy9jb250ZXh0cy9qc29uc2NoZW1hLzEtMC0wIn0=&tna=primary&eid=e2e2fde6-e647-48e7-b728-93a254b916b5&f_dir=0&aid=Website&page=Weddings%20-%20AlexKaplanPhoto\t-\t-\t-\t-\t-","errors":[{"level":"error","message":"Access log TSV line contained 17 fields, expected 12, 15, 18, 19, or 23"}],"failure_tstamp":"2017-03-24T02:15:27.723Z"}```

It appears that there are 17 fields in the list you build, but it expects 12, 15, 18, 19, or 23. I am trying to map out what they should be, but that is a cumbersome task.
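
A quick local sanity check of the field count before uploading (this prints 17 for the row above):

with open("part-00000") as f:
    for line in f:
        print(len(line.rstrip("\n").split("\t")))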

My questions to you: first, am I doing all the steps that you do when you run this, and do you change the filename? Do you think your Python is missing a few fields, or have you successfully run a TSV through EER? Do you know where all the required fields might be listed?

Thank you