How to stream bad events into S3 using a Flink job

Is there a way to read the enriched bad events from Kinesis and store them in S3, with some schema, using a Flink job?
The bad events aren’t decodable with my Scala code. Could anyone help with what I should use, or share a sample if one has already been built?

Hi @Ashwini_Padhy,

We use the S3 Loader to sink data from Kinesis to S3; this applies to both the good and bad streams. There’s no need for a Flink job to do so.

If the task is to get the data to S3, that’s all you need to do.

If you need to run a Flink job over them for some other reason, they are decodable. Assuming you’re using R118 or later, they’re self-describing JSON; here’s documentation on the different types of bad event.
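Because new-format bad rows are self-describing JSON, their top-level "schema" field is an Iglu URI that names the bad row type. The sketch below splits such a URI into vendor, name, format and version using only the Scala standard library, so a job could route bad rows by type. It assumes you have already pulled the "schema" string out of each row with a JSON library of your choice; the class name IgluSchemaUri and the example URI are illustrative, not part of any Snowplow API.

```scala
// Sketch: split an Iglu schema URI (iglu:vendor/name/format/model-revision-addition)
// into its parts, e.g. to route new-format bad rows by their type.
final case class IgluSchemaUri(vendor: String, name: String, format: String, version: String)

object IgluSchemaUri {
  private val Pattern = "^iglu:([^/]+)/([^/]+)/([^/]+)/([0-9]+-[0-9]+-[0-9]+)$".r

  /** Returns None if the string is not a well-formed Iglu URI. */
  def parse(uri: String): Option[IgluSchemaUri] = uri match {
    case Pattern(vendor, name, format, version) => Some(IgluSchemaUri(vendor, name, format, version))
    case _                                      => None
  }

  def main(args: Array[String]): Unit = {
    // Illustrative value: in practice, read the top-level "schema" field of each
    // bad row with the JSON library of your choice.
    val schema = "iglu:com.snowplowanalytics.snowplow.badrows/schema_violations/jsonschema/2-0-0"
    parse(schema).foreach(k => println(s"bad row type: ${k.name} (${k.version})"))
    // prints: bad row type: schema_violations (2-0-0)
  }
}
```

Matching on the name (for example schema_violations) is one simple way to decide which rows a downstream job should handle.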

If you’re on a version before R118, it’s still possible to decode bad data, but it’s a lot of trouble, so we recommend upgrading to the latest version first. (This was the main motivation for that release :smiley:.)

Hope that’s helpful.

Sure, but the decision to upgrade hasn’t been made yet, so I want to understand how the decoding works. Which Thrift schema can be used in Scala? Any reference would be great.

I’d highly recommend using the new bad rows format, but if you’d like to use the old format, I wrote about how to decode it in Python (it’ll be quite similar in Scala): Decoding real-time bad records (Thrift) [tutorial]

I have tried it, but it doesn’t work for the line I get in the bad stream. I always get this error:
“ThriftGrammerError: Grammer error ‘<’ at line 7”

For anybody finding this thread who is using the new bad events format, this repo might be helpful:


Could you show an example of the bad row you’re trying to decode?

Old bad rows were JSON objects with only these three fields:

  • line
  • errors
  • failure_tstamp

See here. Only collector payloads are serialized with Thrift.
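To make the old format concrete, here is a hypothetical Scala model of a pre-R118 bad row with exactly those three fields. The class names are illustrative, the error values are taken from the sample posted later in this thread, and parsing the JSON itself is left to a library of your choice; only the line field holds Thrift data.

```scala
import java.time.Instant

// Hypothetical model of an old-format (pre-R118) bad row: exactly the three
// JSON fields listed above. Only `line` contains Thrift-serialized data.
final case class BadRowError(level: String, message: String)

final case class OldBadRow(
    line: String,              // base64-encoded, Thrift-serialized collector payload
    errors: List[BadRowError], // the reasons the event failed
    failureTstamp: Instant     // the "failure_tstamp" field
)

object OldBadRowExample {
  def main(args: Array[String]): Unit = {
    // Illustrative values; `line` is a placeholder, not real data.
    val row = OldBadRow(
      line = "<base64 payload>",
      errors = List(BadRowError("error", "error: instance value (\"category\") not found in enum ")),
      failureTstamp = Instant.parse("2021-02-22T18:30:06.276Z")
    )
    println(row.failureTstamp) // 2021-02-22T18:30:06.276Z
  }
}
```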

Yes, these are the JSON fields we have as well, and it’s the line field we are unable to decode in Scala. Is there a Thrift schema we should follow to decode it, or any reference? A sample is below:

{
"errors": [
  {
    "level": "error",
    "message": "error: instance value (\"category\") not found in enum "
  }
],
"failure_tstamp": "2021-02-22T18:30:06.276Z",
"line": "CwBkAAAADjE5Mi4xNDMuNTcuMjAyCgDIAAABd8sBcjwLANIAAAAFVVRGLTgLANwAAAASc3NjLTAuMTUuMC1raW5lc2lzCwEsAAAAbU1vemlsbGEvNS4wIChpUGhvbmU7IENQVSBpUGhvbmUgT1MgMTRfNCBsaWtlIE1hYyBPUyBYKSBBcHBsZVdlYktpdC82MDUuMS4xNSAoS0hUTUwsIGxpa2UgR2Vja28pIE1vYmlsZS8xNUUxNDgLATYAAAAhaHR0cHM6Ly9wdWJhZHMuZy5kb3VibGVjbGljay5uZXQvCwFAAAAAIy9jb20uc25vd3Bsb3dhbmFseXRpY3Muc25vd3Bsb3cvdHAyCwFUAAAKz3sic2NoZW1hIjoiaWdsdTpjb20uc25vd3Bsb3dhbmFseXRpY3Muc25vd3Bsb3cvcGF5bG9hZF9kYXRhL2pzb25zY2hlbWEvMS0wLTQiLCJkYXRhIjpbeyJlIjoidWUiLCJ1ZV9wciI6IntcInNjaGVtYVwiOlwiaWdsdTpjb20uc25vd3Bsb3dhbmFseXRpY3Muc25vd3Bsb3cvdW5zdHJ1Y3RfZXZlbnQvanNvbnNjaGVtYS8xLTAtMFwiLFwiZGF0YVwiOntcInNjaGVtYVwiOlwiaWdsdTpjb20uZ2xvYmVhbmRtYWlsL2NvbnRlbnRfaW1wcmVzc2lvbnMvanNvbnNjaGVtYS8xLTAtNVwiLFwiZGF0YVwiOntcImNvbnRlbnRcIjpbe1widG9wXCI6MixcImhlaWdodFwiOjE4MyxcIndpZHRoXCI6MzAwLFwiaWRcIjpcIjYzODk2Y2IwLWM2MWUtNDRiZi1iOGZmLWVmNWM3OGIwMjRlZVwiLFwiZmVhdHVyZVwiOlwicHJvbW8gYXBwIHM3MXg3MCBsb2FkZWRcIixcImxhYmVsXCI6XCJhIGxvb2sgYXQgdGhlIGZpcnN0IHRlbGwtYWxsIHRvIHNoYWtlIHRoZSByb3lhbHNcIixcInRvdGFsXCI6MSxcImFjdGlvblwiOlwiaW4tdmlld1wiLFwidHlwZVwiOlwic29waGktbWFya2V0aW5nLWNvbnRhaW5lclwiLFwidGltZXN0YW1wXCI6MTYxNDAxODYwNTAyMX1dfX19IiwidHYiOiJqcy0yLjE3LjAiLCJ0bmEiOiJzb3BoaVRhZyIsImFpZCI6Im5ld3MyNC13ZWJzaXRlIiwicCI6IndlYiIsInR6IjoiQWZyaWNhL0pvaGFubmVzYnVyZyIsImxhbmciOiJlbi1nYiIsImNzIjoiVVRGLTgiLCJyZXMiOiI0MTR4ODk2IiwiY2QiOiIzMiIsImNvb2tpZSI6IjEiLCJlaWQiOiI3Y2FkMjdlOC0xN2Y1LTQ5ODAtYmZkMi1jNzUyNTE2MDk4YzgiLCJkdG0iOiIxNjE0MDE4NjA1MDMzIiwiY28iOiJ7XCJzY2hlbWFcIjpcImlnbHU6Y29tLnNub3dwbG93YW5hbHl0aWNzLnNub3dwbG93L2NvbnRleHRzL2pzb25zY2hlbWEvMS0wLTBcIixcImRhdGFcIjpbe1wic2NoZW1hXCI6XCJpZ2x1OmNvbS5nbG9iZWFuZG1haWwvY29ubmVjdG9yL2pzb25zY2hlbWEvMS0wLTRcIixcImRhdGFcIjp7XCJwYWdlVmlld1VVSURcIjpcIjBhZTBmNmQ2LTQ5OWQtNDNlNy1hMjM4LWVkMWIyZTNkYWMyOVwifX0se1wic2NoZW1hXCI6XCJpZ2x1OmNvbS5nbG9iZWFuZG1haWwvZW52aXJvbm1lbnQvanNvbnNjaGVtYS8xLTAtOFwiLFwiZGF0YVwiOntcInN0YXRlXCI6XCJhY3RpdmVcIixcImVudmlyb25tZW50XCI6XCJwcm9kXCIsXCJjbGllbnRcIjpcIm1lZGlhMjRcIixcImNsaWVudENvbmZpZ1ZlcnNpb25
cIjpcIjEuNFwiLFwiaXNBZEJsb2NrXCI6ZmFsc2UsXCJ0cmFja2VyVmVyc2lvblwiOlwiMi4xLjhcIixcInNjcmVlbk9yaWVudGF0aW9uXCI6XCJsYW5kc2NhcGVcIn19LHtcInNjaGVtYVwiOlwiaWdsdTpjb20uZ2xvYmVhbmRtYWlsL3Zpc2l0b3IvanNvbnNjaGVtYS8xLTAtNlwiLFwiZGF0YVwiOntcInR5cGVcIjpcIlJlZ2lzdGVyZWRcIixcImlzTG9nZ2VkSW5cIjp0cnVlfX0se1wic2NoZW1hXCI6XCJpZ2x1OmNvbS5nbG9iZWFuZG1haWwvcGFnZS9qc29uc2NoZW1hLzEtMC0xMVwiLFwiZGF0YVwiOntcInR5cGVcIjpcImNhdGVnb3J5XCIsXCJicmVhZGNydW1iXCI6XCJzcG9ydDp0b3BzdG9yaWVzOlwiLFwic2VjdGlvbk5hbWVcIjpcIihub3QgcHJvdmlkZWQpXCJ9fSx7XCJzY2hlbWFcIjpcImlnbHU6Y29tLnNub3dwbG93YW5hbHl0aWNzLnNub3dwbG93L2NsaWVudF9zZXNzaW9uL2pzb25zY2hlbWEvMS0wLTFcIixcImRhdGFcIjp7XCJ1c2VySWRcIjpcIjAxM2QxMThjLTJmNTQtNGY1MS1hYzJkLTBhNGNhY2Q1ZGRlM1wiLFwic2Vzc2lvbklkXCI6XCJjMGEzZjIxOC1mM2VjLTQzYzYtYmUwZC0zNTJjMzFiNDdjNWFcIixcInNlc3Npb25JbmRleFwiOjY3MixcInByZXZpb3VzU2Vzc2lvbklkXCI6bnVsbCxcInN0b3JhZ2VNZWNoYW5pc21cIjpcIlNRTElURVwifX1dfSIsInZwIjoiMzAweDI1MCIsImRzIjoiMzAweDI1MCIsInZpZCI6IjE1OSIsInNpZCI6IjkwOTkzZWUyLTkwOGMtNDFkYS04YmZmLThhZTlhNjUzMGM2ZSIsImR1aWQiOiIxNjBmNDIzYy0yMjdjLTQ0YjYtOTI2Mi0zYzA3MjY1NjhiM2MiLCJ1aWQiOiJLM3d2bkhZQlRLZkpIZ2VDTExGNDNtR0FiMWQyIiwidXJsIjoiaHR0cHM6Ly9wdWJhZHMuZy5kb3VibGVjbGljay5uZXQvZ2FtcGFkL2Fkcz9jYXBzPWludGVyYWN0aXZlVmlkZW9faW5saW5lVmlkZW9fbXJhaWQxX21yYWlkMl9tcmFpZDNfdGhfYXV0b3BsYXlfbWVkaWF0aW9uX2F2X3Nka0FkbW9iQXBpRm9yQWRzX2RpX3RyYW5zcGFyZW50QmFja2dyb3VuZF9zZGtWaWRlb19hc29fc2Z2X2Rpbm1fZGltX25hdl9uYXZjX2N0X3Njcm9sbF9kaW5tb19nbHNfc2FpTWFjcm9fY3BjYmdfb21pZEVuYWJsZWRfZ2NhY2hlX2Fib2lfeFNlY29uZHMmZWlkPTMxODQ4MjQ2MSUyQzMxODQ5MjQ5NiZmb3JtYXQ9MzIweDUwX2FzJmpzPWFmbWEtc2RrLWktdjcuNjQuMCZwcmVxcz0xMTk1JnNlcV9udW09MTk4MyNjYXBzPWludGVyYWN0aXZlVmlkZW9faW5saW5lVmlkZW9fbXJhaWQxX21yYWlkMl9tcmFpZDNfdGhfYXV0b3BsYXlfbWVkaWF0aW9uX2F2X3Nka0FkbW9iQXBpRm9yQWRzX2RpX3RyYW5zcGFyZW50QmFja2dyb3VuZF9zZGtWaWRlb19hc29fc2Z2X2Rpbm1fZGltX25hdl9uYXZjX2N0X3Njcm9sbF9kaW5tb19nbHNfc2FpTWFjcm9fY3BjYmdfb21pZEVuYWJsZWRfZ2NhY2hlX2Fib2lfeFNlY29uZHMmZWlkPTMxODQ4MjQ2MSUyNTJDMzE4NDkyNDk2JmZvcm1hdD0zMjB4NTBfYXMmanM9YWZtYS1zZGstaS12Ny42NC4wJnB
yZXFzPTExOTUmc2VxX251bT0xOTgzIiwic3RtIjoiMTYxNDAxODYwNTM5MSJ9XX0PAV4LAAAADwAAAB9Ib3N0OiBlbmRwb2ludC5zb3BoaS5uZXdzMjQuY29tAAAAeVVzZXItQWdlbnQ6IE1vemlsbGEvNS4wIChpUGhvbmU7IENQVSBpUGhvbmUgT1MgMTRfNCBsaWtlIE1hYyBPUyBYKSBBcHBsZVdlYktpdC82MDUuMS4xNSAoS0hUTUwsIGxpa2UgR2Vja28pIE1vYmlsZS8xNUUxNDgAAAALQWNjZXB0OiAqLyoAAAAiQWNjZXB0LUVuY29kaW5nOiBnemlwLCBkZWZsYXRlLCBicgAAABZBY2NlcHQtTGFuZ3VhZ2U6IGVuLXVzAAAAKE9yaWdpbjogaHR0cHM6Ly9wdWJhZHMuZy5kb3VibGVjbGljay5uZXQAAAAqUmVmZXJlcjogaHR0cHM6Ly9wdWJhZHMuZy5kb3VibGVjbGljay5uZXQvAAAAH1gtRm9yd2FyZGVkLUZvcjogMTkyLjE0My41Ny4yMDIAAAArWC1Gb3J3YXJkZWQtSG9zdDogZW5kcG9pbnQuc29waGkubmV3czI0LmNvbQAAABVYLUZvcndhcmRlZC1Qb3J0OiA0NDMAAAAYWC1Gb3J3YXJkZWQtUHJvdG86IGh0dHBzAAAARFgtRm9yd2FyZGVkLVNlcnZlcjogdHJhZWZpay1leHRlcGaAAAAJDY3NThlNDEzLWViMjQtNDJhOC05NmU5LTBiY2JhNGE0YmMxZQt6aQAAAEFpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5zbm93cGxvdy9Db2xsZWN0b3JQYXlsb2FkL3RocmlmdC8xLTAtMAA="

}

@Ashwini_Padhy, line is base64 encoded. Additionally, if any custom data or contexts were tracked with base64 encoding enabled (the ue_px and cx properties), they will be base64 encoded as well. Those values are therefore encoded twice: once as part of the whole line, and again individually in ue_px and cx, if present.

In the example you shared, there is no ue_px or cx, so in this case you only need to base64 decode line.
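When ue_px or cx is present, it needs a second base64 decode after line has been decoded and the value extracted. A minimal sketch of that second step follows, assuming the tracker used the URL-safe base64 alphabet ('-' and '_'), possibly without '=' padding; if your tracker used the standard alphabet instead, swap in Base64.getDecoder. The object and method names are hypothetical.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Base64

object TrackerParamDecoding {
  // Second decoding step for a ue_px or cx value taken from an already-decoded
  // payload. Assumption: the tracker used the URL-safe alphabet ('-' and '_'),
  // possibly without '=' padding; Java's URL decoder accepts either form.
  def decodeTrackerParam(value: String): String =
    new String(Base64.getUrlDecoder.decode(value), UTF_8)

  def main(args: Array[String]): Unit = {
    // Illustrative round trip: encode a contexts-style JSON the way a cx value
    // might be encoded, then decode it again.
    val contexts = """{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[]}"""
    val cx = Base64.getUrlEncoder.withoutPadding.encodeToString(contexts.getBytes(UTF_8))
    println(decodeTrackerParam(cx) == contexts) // true
  }
}
```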

Yes, I know that line is base64 encoded, and I tried to decode it in Scala but couldn’t. Is there a Thrift package for Scala that I can use to decode it specifically?

Snowplow uses libthrift (Maven Repository: org.apache.thrift » libthrift), but really you can use any Thrift library you’d like, as they pretty much all implement the same underlying Thrift specification.
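Because libthrift is a plain Java artifact, a minimal sbt sketch adds it with a single % rather than %%, which would append a Scala suffix such as _2.12 that Java artifacts don’t have. The version shown is only an example; check Maven Central for the release you want.

```scala
// build.sbt (sketch): single % because libthrift is a Java library.
// 0.13.0 is only an example version.
libraryDependencies += "org.apache.thrift" % "libthrift" % "0.13.0"
```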

@Ashwini_Padhy

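import java.nio.charset.StandardCharsets.UTF_8
import java.util.Base64
val line = "<BASE64_ENCODED_STRING>" // the bad row's "line" value
println(new String(Base64.getDecoder.decode(line), UTF_8))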

If you replace <BASE64_ENCODED_STRING> with the encoded value of line, you will see your raw data: the readable collector payload values interleaved with Thrift’s binary field markers.
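As a worked example of what those binary markers look like, the sketch below decodes only the first 28 base64 characters of the sample’s line (21 bytes). Assuming the payload was written with Thrift’s binary protocol, which these bytes are consistent with, a field starts with a 1-byte type code (11 = string), a 2-byte field ID and, for strings, a 4-byte length followed by the bytes. The object and method names are hypothetical.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Base64

object FirstFieldExample {
  /** Reads one binary-protocol string field: (type code, field ID, value). */
  def decodeFirstField(base64: String): (Int, Int, String) = {
    val buf = ByteBuffer.wrap(Base64.getDecoder.decode(base64)) // big-endian by default
    val fieldType = buf.get().toInt    // 1 byte: Thrift type code (11 = string)
    val fieldId = buf.getShort().toInt // 2 bytes: field ID
    val length = buf.getInt()          // 4 bytes: string length in bytes
    val value = new Array[Byte](length)
    buf.get(value)
    (fieldType, fieldId, new String(value, UTF_8))
  }

  def main(args: Array[String]): Unit =
    // First 28 base64 characters of the sample's "line" value
    println(decodeFirstField("CwBkAAAADjE5Mi4xNDMuNTcuMjAy")) // (11,100,192.143.57.202)
}
```

Field 100 holding the client IP is what you would expect from the collector payload schema linked below.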

The Thrift schema can be found here.

How do I use this schema file to decode a collector payload?

@Ashwini_Padhy, when you decode line, you will see that the raw event is composed of the fields described by the Thrift schema. If events are sent via POST, you are most likely interested in the 10th field (body); if they are sent via GET, it’s the 9th field (querystring).
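Snowplow itself uses libthrift for this. As a dependency-free alternative, here is a minimal sketch that walks the top-level fields of a binary-protocol Thrift struct and keeps strings, i64 values and string lists, keyed by field ID. It assumes TBinaryProtocol (consistent with the sample’s first bytes) and assumes field IDs 330 (querystring) and 340 (body) from the collector payload schema; verify both against the schema file. All names here are hypothetical.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Base64

// Minimal reader for the top-level fields of a struct written with Thrift's
// binary protocol. Strings, i64 values and lists/sets of strings are kept;
// all other values are skipped. No schema validation is done.
object ThriftBinaryFields {
  // Binary-protocol type codes
  private final val TStop = 0
  private final val TBool = 2
  private final val TByte = 3
  private final val TDouble = 4
  private final val TI16 = 6
  private final val TI32 = 8
  private final val TI64 = 10
  private final val TString = 11
  private final val TStruct = 12
  private final val TMap = 13
  private final val TSet = 14
  private final val TList = 15

  /** Base64-decodes an old-format bad row's `line`, then reads its fields. */
  def fromBadRowLine(line: String): Map[Int, Any] = read(Base64.getDecoder.decode(line))

  /** Maps field ID to decoded value for the field types this sketch keeps. */
  def read(bytes: Array[Byte]): Map[Int, Any] = {
    val buf = ByteBuffer.wrap(bytes) // big-endian, as the binary protocol requires
    val fields = Map.newBuilder[Int, Any]
    var fieldType = buf.get() & 0xff
    while (fieldType != TStop) {
      val id = buf.getShort().toInt
      fieldType match {
        case TString => fields += (id -> readString(buf))
        case TI64    => fields += (id -> buf.getLong())
        case TList | TSet =>
          val elemType = buf.get() & 0xff
          val size = buf.getInt()
          if (elemType == TString) fields += (id -> List.fill(size)(readString(buf)))
          else (1 to size).foreach(_ => skip(buf, elemType))
        case other => skip(buf, other)
      }
      fieldType = buf.get() & 0xff
    }
    fields.result()
  }

  private def readString(buf: ByteBuffer): String = {
    val bytes = new Array[Byte](buf.getInt())
    buf.get(bytes)
    new String(bytes, UTF_8)
  }

  /** Advances the buffer past one value of the given type. */
  private def skip(buf: ByteBuffer, t: Int): Unit = t match {
    case TBool | TByte  => buf.get()
    case TI16           => buf.getShort()
    case TI32           => buf.getInt()
    case TDouble | TI64 => buf.getLong()
    case TString =>
      val len = buf.getInt()
      buf.position(buf.position() + len)
    case TStruct =>
      var ft = buf.get() & 0xff
      while (ft != TStop) { buf.getShort(); skip(buf, ft); ft = buf.get() & 0xff }
    case TMap =>
      val (kt, vt, n) = (buf.get() & 0xff, buf.get() & 0xff, buf.getInt())
      (1 to n).foreach { _ => skip(buf, kt); skip(buf, vt) }
    case TSet | TList =>
      val et = buf.get() & 0xff
      (1 to buf.getInt()).foreach(_ => skip(buf, et))
    case other => throw new IllegalArgumentException(s"Unknown Thrift type code $other")
  }
}

object ThriftBinaryFieldsExample {
  // Hypothetical helper: encodes one binary-protocol string field.
  private def stringField(id: Int, s: String): Array[Byte] = {
    val b = s.getBytes(UTF_8)
    ByteBuffer.allocate(7 + b.length).put(11.toByte).putShort(id.toShort).putInt(b.length).put(b).array()
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for a real `line`: field 100 (IP) and field 340 (body), then STOP.
    val bytes = (stringField(100, "192.143.57.202") ++ stringField(340, """{"e":"pv"}""")) :+ 0.toByte
    val fields = ThriftBinaryFields.fromBadRowLine(Base64.getEncoder.encodeToString(bytes))
    println(fields.get(340)) // Some({"e":"pv"})
  }
}
```

For a POST event you would then parse fields.get(340) as JSON; for GET, fields.get(330) is the raw querystring.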

How do I pull this file into a Scala 2.12 project? I have tried:
val snowplowDeps = Seq(
  "com.snowplowanalytics" %% "snowplow-scala-analytics-sdk" % "1.0.0",
  "com.snowplowanalytics" %% "iglu-core" % "1.0.0",
  "com.snowplowanalytics" %% "collector-payload-1" % "0.0.0"
)

but I get an error during the download:
[error] (update) sbt.librarymanagement.ResolveException: Error downloading com.snowplowanalytics:collector-payload-1_2.12:0.0.0

Hi @Ashwini_Padhy, it’s not an importable package like that; we haven’t released it as an asset for the simple reason that it’s only used internally.

Interacting with collector payloads isn’t part of the planned use of the pipeline, and to date there’s never really been a good reason for someone to need to do it (beyond forking the enrich job itself).

So from that point of view, what you’re asking about here is essentially not supported. Of course, we and the community are happy to point you to what might help, as in the replies above, but it feels as though we’ve reached a point where, beyond what’s already been provided, I’m not sure how much more help we can offer.

I suspect your use case can most likely be handled in a way that’s already designed into the product. The canonical way to deal with bad data is to fix the issue that caused it. Older versions of the pipeline didn’t provide good enough visibility into bad rows, so we built a new version that greatly improves on them.

If you’ve arrived at a point where you’re extracting the line from an old-format bad row and deserialising the payload, you’re basically re-implementing a solution to a solved problem, which is a boatload more effort than just updating your pipeline. So I think it would be more productive to describe what you’re trying to achieve: why isn’t upgrading a sufficient solution here?
