Mismatching ids after deduplication

Hello everyone,

I have an issue with the ids of duplicate events after the shredding step: they do not match between two related tables (“atomic.events” and “atomic.custom_event”).
Let me show some sample events to illustrate the issue.

The following events are sent from the tracker:

event_id, collector_tstamp
4da86136-f1f7-41c2-9084-fc15f8d1c579,2021-06-17 23:59:59.000
fff0bfb5-b95f-4dfe-92f5-374080d71ac0,2021-06-17 23:59:59.000
05aee854-828d-4dab-8b42-069e9d80c7f0,2021-06-17 23:59:59.000
a9b5c2b2-0556-4218-9437-11238eeca64f,2021-06-17 23:59:59.000

2aaa096d-d9df-4cf5-b82c-4dc63d2a7963,2021-06-18 00:00:01.000
fb920ec4-ad3e-41c4-a966-484938c9e6e8,2021-06-18 00:00:01.000
05aee854-828d-4dab-8b42-069e9d80c7f0,2021-06-18 00:00:01.000
a9b5c2b2-0556-4218-9437-11238eeca64f,2021-06-18 00:00:01.000

The duplicates differ only in the “dvce_sent_tstamp” field; all other fields are the same. These events were successfully enriched and sent to the shredder.
By the way, we’re not using the “Event fingerprint enrichment” in our enrichment step.
I expected the shredder to detect the duplicates and deduplicate them, so that there would be either 6 events (keep only the first copy of each event and delete the others) or 8 events (keep all of them). However, I receive a set of 12 different identifiers.

There are 8 records in the “atomic.events” table: 4 for the unique events and 4 for the duplicates:

event_id, collector_tstamp
4da86136-f1f7-41c2-9084-fc15f8d1c579,2021-06-17 23:59:59.000000
fff0bfb5-b95f-4dfe-92f5-374080d71ac0,2021-06-17 23:59:59.000000
33726166-4356-4cfe-a02c-baf049a676f5,2021-06-17 23:59:59.000000
7bc53840-9268-4ea4-8dcd-20b3458252a8,2021-06-17 23:59:59.000000
2aaa096d-d9df-4cf5-b82c-4dc63d2a7963,2021-06-18 00:00:01.000000
fb920ec4-ad3e-41c4-a966-484938c9e6e8,2021-06-18 00:00:01.000000
b0062a23-515c-4357-93c2-c7c51b866364,2021-06-18 00:00:01.000000
fd5f2337-e7b8-401e-8526-62194d32178e,2021-06-18 00:00:01.000000

There are 8 records in the “atomic.custom_event” table: 4 for the unique events and 4 for the duplicates:

root_id, root_tstamp
4da86136-f1f7-41c2-9084-fc15f8d1c579,2021-06-17 23:59:59.000000
fff0bfb5-b95f-4dfe-92f5-374080d71ac0,2021-06-17 23:59:59.000000
2a448b51-a45f-4b80-8953-575b7f37a816,2021-06-17 23:59:59.000000
6e861595-d4ec-4c0a-a3e4-6bcca8488a9f,2021-06-17 23:59:59.000000
2aaa096d-d9df-4cf5-b82c-4dc63d2a7963,2021-06-18 00:00:01.000000
fb920ec4-ad3e-41c4-a966-484938c9e6e8,2021-06-18 00:00:01.000000
88e6bd45-10b5-4b07-b1ac-6074d7e0929b,2021-06-18 00:00:01.000000
fb563b1e-015c-47b2-a69e-b503b9bd1764,2021-06-18 00:00:01.000000

Also, there are 4 records in the duplicates table, which point to the correct events from the input (original_event_id):

root_id, root_tstamp, original_event_id
16b72348-3414-4bbe-a900-b5ef41e9ec21,2021-06-17 23:59:59.000000,05aee854-828d-4dab-8b42-069e9d80c7f0
52257594-c68f-4da5-aef8-1bd1facfb438,2021-06-17 23:59:59.000000,a9b5c2b2-0556-4218-9437-11238eeca64f
d449aed8-9584-45e5-a1b7-8e583fd487c3,2021-06-18 00:00:01.000000,05aee854-828d-4dab-8b42-069e9d80c7f0
4145459f-112b-4672-bf82-3370e012195d,2021-06-18 00:00:01.000000,a9b5c2b2-0556-4218-9437-11238eeca64f

So both tables have new records, but their ids do not match, so there is no way for me to do an SQL join. What is more, those ids do not match anything in “atomic.duplicates”.

Could someone please explain what happens during deduplication? And what can I do to actually remove the duplicates?

Hi @Vasiliy_Pribytkov ,

Duplicates are a very tricky issue but I’ll try to give you a quick explanation of what you’re seeing.

There are two types of “duplicates”, which we call “natural” and “synthetic”.

Natural duplicates are duplicates in the true sense of the word. They are copies of the same event with some slight differences. Most commonly the difference is in when the event was sent / collected, since many trackers would re-try sending events when they are not acknowledged by the collector.

Synthetic duplicates are often completely different events, which happen to have the same event_id. A common source is bot traffic.

The strategies for dealing with duplicates are different, depending on what type they are. For natural duplicates, the solution is to keep only one copy and discard the rest. For synthetic ones, the solution is to assign a new event_id to each event.

In the shredding step, the data first goes through natural deduplication, and then through synthetic deduplication. This is described in more detail here.

How does the Shredder know which duplicates are natural and which ones are synthetic? It compares the payloads. Events with the same event_id and the same payload are natural duplicates. Events with the same event_id but different payloads are synthetic duplicates.

How does the Shredder compare payloads? As I noted earlier, the payloads even for natural duplicates are not identical. There are slight differences, usually in timestamps. So we need to compute an identifier for the payload, which excludes the fields that might reasonably be different. This is what the event_fingerprint enrichment is doing.
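To make the idea concrete, here is a minimal Python sketch of a payload identifier that ignores fields expected to differ between re-sends. It is only an illustration: the excluded field names, the MD5 choice, the `page_url` field, and the `dvce_sent_tstamp` values are assumptions made for the example, not the enrichment's actual configuration or your data.

```python
import hashlib

# Hypothetical list of fields to ignore; not taken from any real configuration.
EXCLUDED_FIELDS = {"event_id", "dvce_sent_tstamp", "collector_tstamp"}


def payload_fingerprint(event: dict) -> str:
    """Hash every field except the excluded ones, in a stable key order."""
    kept = sorted((k, str(v)) for k, v in event.items() if k not in EXCLUDED_FIELDS)
    canonical = "&".join(f"{k}={v}" for k, v in kept)
    return hashlib.md5(canonical.encode("utf-8")).hexdigest()


# Made-up payloads: a first send, a retry of the same event, and an unrelated
# event that happens to carry the same event_id.
first_send = {
    "event_id": "05aee854-828d-4dab-8b42-069e9d80c7f0",
    "dvce_sent_tstamp": "2021-06-17 23:59:58.000",
    "collector_tstamp": "2021-06-17 23:59:59.000",
    "page_url": "https://example.com/checkout",
}
retry = dict(
    first_send,
    dvce_sent_tstamp="2021-06-18 00:00:00.000",
    collector_tstamp="2021-06-18 00:00:01.000",
)
unrelated = dict(first_send, page_url="https://example.com/home")

same_payload = payload_fingerprint(first_send) == payload_fingerprint(retry)
different_payload = payload_fingerprint(first_send) != payload_fingerprint(unrelated)
print(same_payload, different_payload)  # True True
```

The retry hashes to the same value as the first send because only excluded fields changed, while the unrelated event hashes differently even though it shares the event_id.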

Since you have this enrichment turned off, the shredder uses a random UUID in place of the event_fingerprint. This way, all natural duplicates are considered to have unique payloads and they do not get deduplicated at that step. In the synthetic deduplication step, they all get assigned new event_id. A context that matches the new event_id to the old one is attached and ends up in your duplicates table.
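The effect of the missing fingerprint can be reproduced with a toy model of the two steps, using the eight event ids from your post. This is a sketch of the logic as described in this reply, not the Shredder's actual code; the `deduplicate` function and the `fp-...` fingerprint values are hypothetical.

```python
import uuid


def deduplicate(events, use_fingerprint_enrichment):
    """Toy model of natural, then synthetic deduplication.

    Each event is a dict with 'event_id' and, optionally, 'event_fingerprint'.
    Returns (surviving events, duplicate contexts).
    """
    # Without the enrichment, every event gets a throwaway random fingerprint.
    keyed = []
    for e in events:
        fp = e.get("event_fingerprint") if use_fingerprint_enrichment else None
        keyed.append((e["event_id"], fp or str(uuid.uuid4()), e))

    # Step 1, natural deduplication: keep one event per (event_id, fingerprint).
    seen = {}
    for event_id, fp, e in keyed:
        seen.setdefault((event_id, fp), e)

    # Step 2, synthetic deduplication: any event_id still shared by several
    # events is replaced with a fresh UUID; the old id is kept in a context.
    counts = {}
    for event_id, _fp in seen:
        counts[event_id] = counts.get(event_id, 0) + 1

    survivors, duplicate_contexts = [], []
    for (event_id, _fp), e in seen.items():
        if counts[event_id] > 1:
            new_id = str(uuid.uuid4())
            duplicate_contexts.append({"root_id": new_id, "original_event_id": event_id})
            survivors.append(dict(e, event_id=new_id))
        else:
            survivors.append(e)
    return survivors, duplicate_contexts


first_batch = [
    "4da86136-f1f7-41c2-9084-fc15f8d1c579",
    "fff0bfb5-b95f-4dfe-92f5-374080d71ac0",
    "05aee854-828d-4dab-8b42-069e9d80c7f0",
    "a9b5c2b2-0556-4218-9437-11238eeca64f",
]
second_batch = [
    "2aaa096d-d9df-4cf5-b82c-4dc63d2a7963",
    "fb920ec4-ad3e-41c4-a966-484938c9e6e8",
    "05aee854-828d-4dab-8b42-069e9d80c7f0",
    "a9b5c2b2-0556-4218-9437-11238eeca64f",
]
# Assumption: copies of the same event would share a fingerprint, because only
# dvce_sent_tstamp differs between them.
events = [{"event_id": i, "event_fingerprint": "fp-" + i} for i in first_batch + second_batch]

with_fp, ctx_with_fp = deduplicate(events, use_fingerprint_enrichment=True)
without_fp, ctx_without_fp = deduplicate(events, use_fingerprint_enrichment=False)
print(len(with_fp), len(ctx_with_fp))        # 6 0
print(len(without_fp), len(ctx_without_fp))  # 8 4
```

With fingerprints the model keeps 6 events and writes no duplicate contexts; without them it keeps all 8 events, gives 4 of them new ids, and writes 4 contexts, which matches the row counts you reported.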

As you can see from your last example, each of the duplicate events received a new event_id and that table tells you what the new event_id is.

The only unexplained bit is that the new event_id from the duplicates table does not match the event_id in the other tables you cite in your examples. Is it possible though that the events in your examples are completely unrelated to the duplicates? Can you please check if you can find the events with event_id IN ('16b72348-3414-4bbe-a900-b5ef41e9ec21', '52257594-c68f-4da5-aef8-1bd1facfb438', 'd449aed8-9584-45e5-a1b7-8e583fd487c3', '4145459f-112b-4672-bf82-3370e012195d') in atomic.events?
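The same check can be written as a join from the duplicates table back to the events table. Below is a small Python sketch that runs it in an in-memory SQLite database loaded with only the rows posted earlier in this thread. The table names `events` and `duplicates` are stand-ins for your real tables, and the query would need adapting to your warehouse.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE events (event_id TEXT, collector_tstamp TEXT);
    CREATE TABLE duplicates (root_id TEXT, root_tstamp TEXT, original_event_id TEXT);
    """
)

# Rows copied from the atomic.events sample in the original post.
conn.executemany(
    "INSERT INTO events VALUES (?, ?)",
    [
        ("4da86136-f1f7-41c2-9084-fc15f8d1c579", "2021-06-17 23:59:59.000000"),
        ("fff0bfb5-b95f-4dfe-92f5-374080d71ac0", "2021-06-17 23:59:59.000000"),
        ("33726166-4356-4cfe-a02c-baf049a676f5", "2021-06-17 23:59:59.000000"),
        ("7bc53840-9268-4ea4-8dcd-20b3458252a8", "2021-06-17 23:59:59.000000"),
        ("2aaa096d-d9df-4cf5-b82c-4dc63d2a7963", "2021-06-18 00:00:01.000000"),
        ("fb920ec4-ad3e-41c4-a966-484938c9e6e8", "2021-06-18 00:00:01.000000"),
        ("b0062a23-515c-4357-93c2-c7c51b866364", "2021-06-18 00:00:01.000000"),
        ("fd5f2337-e7b8-401e-8526-62194d32178e", "2021-06-18 00:00:01.000000"),
    ],
)

# Rows copied from the duplicates table sample in the original post.
conn.executemany(
    "INSERT INTO duplicates VALUES (?, ?, ?)",
    [
        ("16b72348-3414-4bbe-a900-b5ef41e9ec21", "2021-06-17 23:59:59.000000", "05aee854-828d-4dab-8b42-069e9d80c7f0"),
        ("52257594-c68f-4da5-aef8-1bd1facfb438", "2021-06-17 23:59:59.000000", "a9b5c2b2-0556-4218-9437-11238eeca64f"),
        ("d449aed8-9584-45e5-a1b7-8e583fd487c3", "2021-06-18 00:00:01.000000", "05aee854-828d-4dab-8b42-069e9d80c7f0"),
        ("4145459f-112b-4672-bf82-3370e012195d", "2021-06-18 00:00:01.000000", "a9b5c2b2-0556-4218-9437-11238eeca64f"),
    ],
)

# For each duplicates row, report whether its root_id exists as an event_id.
rows = conn.execute(
    """
    SELECT d.root_id, d.original_event_id, e.event_id IS NOT NULL AS found_in_events
    FROM duplicates AS d
    LEFT JOIN events AS e ON e.event_id = d.root_id
    ORDER BY d.root_id
    """
).fetchall()

unmatched = sum(1 for _root_id, _original_id, found in rows if not found)
print(unmatched)  # 4
```

If the ids were consistent, every row would come back with `found_in_events` equal to 1 and `original_event_id` would let you trace each re-assigned event to the id the tracker sent. With the sample rows from this thread, none of the four root_ids match.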

Thank you for such a detailed reply, @dilyan!
And special thanks for the details about the shredding step. I didn’t know Snowplow handles duplicates in that order.

This is what the event_fingerprint enrichment is doing.

It looks like it would be very helpful to enable this enrichment in our pipeline.

Since you have this enrichment turned off, the shredder uses a random UUID in place of the event_fingerprint

Is this generated UUID temporary, or should it be written to the database? Currently I see only “null” values in the event_fingerprint field.

event_id IN ('16b72348-3414-4bbe-a900-b5ef41e9ec21', '52257594-c68f-4da5-aef8-1bd1facfb438', 'd449aed8-9584-45e5-a1b7-8e583fd487c3', '4145459f-112b-4672-bf82-3370e012195d')

I ran that query, and there were no records with those identifiers in the whole table (about 31 billion records).

Yes, the generated UUID that stands in for the event_fingerprint is temporary and not saved anywhere.

On the missing duplicates, could you please share which version of RDB shredder you are using? It sounds a bit like a bug we fixed.

We’re using rdb_shredder: 0.13.0

Alright, then I definitely recommend you upgrade.

We fixed this bug in 0.16.0, this bug in 0.17.0 and this bug in 0.18.0.

This sounds very similar to what I’m experiencing.
Which version would you recommend upgrading to? Will we have any compatibility issues if we upgrade straight to 1.1.0 (I’m concerned because of the major version change)?

I’d recommend upgrading to 1.1.0.

You can follow the upgrade guide for 1.0.0 here and then upgrading to 1.1.0 should be easy.

As part of the upgrade to 1.1.0, we do recommend that you first follow the R35 Upgrade Guide, which will get you to version 0.19.0 where all the bugs I mentioned have been fixed.

Do let us know if you stumble upon any difficulties.

We decided to upgrade to 0.18.2 first and check whether the current issue is resolved.
Upgrading to the latest version seems a bit complicated, so we have postponed it, at least for now.

We have run into issues upgrading even to versions prior to 1.0.0. We tried upgrading to 0.18.2 and 0.16.0, and both attempts failed. Both versions crash at start with the same exception:

Warning: Skip remote jar s3://snowplow-hosted-assets-us-east-1/4-storage/rdb-shredder/snowplow-rdb-shredder-0.18.2.jar.
21/07/05 14:59:08 INFO RMProxy: Connecting to ResourceManager at ip-10-22-3-42.ec2.internal/
21/07/05 14:59:08 INFO Client: Requesting a new application from cluster with 5 NodeManagers
21/07/05 14:59:08 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (12288 MB per container)
21/07/05 14:59:08 INFO Client: Will allocate AM container, with 6143 MB memory including 558 MB overhead
21/07/05 14:59:08 INFO Client: Setting up container launch context for our AM
21/07/05 14:59:08 INFO Client: Setting up the launch environment for our AM container
21/07/05 14:59:08 INFO Client: Preparing resources for our AM container
21/07/05 14:59:10 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
21/07/05 14:59:12 INFO Client: Uploading resource file:/mnt/tmp/spark-5d5d9392-f101-4de1-a745-d202aed4db0c/__spark_libs__4078113607070151782.zip → hdfs://ip-10-22-3-42.ec2.internal:8020/user/hadoop/.sparkStaging/application_1625496705659_0006/__spark_libs__4078113607070151782.zip
21/07/05 14:59:15 WARN RoleMappings: Found no mappings configured with ‘fs.s3.authorization.roleMapping’, credentials resolution may not work as expected
21/07/05 14:59:16 INFO Client: Uploading resource s3://snowplow-hosted-assets-us-east-1/4-storage/rdb-shredder/snowplow-rdb-shredder-0.18.2.jar → hdfs://ip-10-22-3-42.ec2.internal:8020/user/hadoop/.sparkStaging/application_1625496705659_0006/snowplow-rdb-shredder-0.18.2.jar
21/07/05 14:59:16 INFO S3NativeFileSystem: Opening ‘s3://snowplow-hosted-assets-us-east-1/4-storage/rdb-shredder/snowplow-rdb-shredder-0.18.2.jar’ for reading
21/07/05 14:59:18 INFO Client: Uploading resource file:/mnt/tmp/spark-5d5d9392-f101-4de1-a745-d202aed4db0c/__spark_conf__1376634497366301087.zip → hdfs://ip-10-22-3-42.ec2.internal:8020/user/hadoop/.sparkStaging/application_1625496705659_0006/spark_conf.zip
21/07/05 14:59:18 WARN DFSClient: Caught exception
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1252)
at java.lang.Thread.join(Thread.java:1326)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:609)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:370)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:546)
21/07/05 14:59:18 INFO SecurityManager: Changing view acls to: hadoop
21/07/05 14:59:18 INFO SecurityManager: Changing modify acls to: hadoop
21/07/05 14:59:18 INFO SecurityManager: Changing view acls groups to:
21/07/05 14:59:18 INFO SecurityManager: Changing modify acls groups to:
21/07/05 14:59:18 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); groups with view permissions: Set(); users with modify permissions: Set(hadoop); groups with modify permissions: Set()
21/07/05 14:59:18 INFO Client: Submitting application application_1625496705659_0006 to ResourceManager
21/07/05 14:59:18 INFO YarnClientImpl: Submitted application application_1625496705659_0006
21/07/05 14:59:19 INFO Client: Application report for application_1625496705659_0006 (state: ACCEPTED)
21/07/05 14:59:19 INFO Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: default
start time: 1625497158351
final status: UNDEFINED
tracking URL: http://ip-10-22-3-42.ec2.internal:20888/proxy/application_1625496705659_0006/
user: hadoop
21/07/05 14:59:20 INFO Client: Application report for application_1625496705659_0006 (state: ACCEPTED)
21/07/05 14:59:21 INFO Client: Application report for application_1625496705659_0006 (state: ACCEPTED)
21/07/05 14:59:22 INFO Client: Application report for application_1625496705659_0006 (state: ACCEPTED)
21/07/05 14:59:23 INFO Client: Application report for application_1625496705659_0006 (state: FAILED)
21/07/05 14:59:23 INFO Client:
client token: N/A
diagnostics: Application application_1625496705659_0006 failed 1 times due to AM Container for appattempt_1625496705659_0006_000001 exited with exitCode: 15
For more detailed output, check application tracking page:http://ip-10-22-3-42.ec2.internal:8088/cluster/app/application_1625496705659_0006Then, click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_1625496705659_0006_01_000001
Exit code: 15
Stack trace: ExitCodeException exitCode=15:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
at org.apache.hadoop.util.Shell.run(Shell.java:479)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Container exited with a non-zero exit code 15
Failing this attempt. Failing the application.
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: default
start time: 1625497158351
final status: FAILED
tracking URL: http://ip-10-22-3-42.ec2.internal:8088/cluster/app/application_1625496705659_0006
user: hadoop
Exception in thread “main” org.apache.spark.SparkException: Application application_1625496705659_0006 finished with failed status
at org.apache.spark.deploy.yarn.Client.run(Client.scala:1104)
at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1150)
at org.apache.spark.deploy.yarn.Client.main(Client.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
21/07/05 14:59:23 INFO ShutdownHookManager: Shutdown hook called
21/07/05 14:59:23 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-5d5d9392-f101-4de1-a745-d202aed4db0c
Command exiting with ret ‘1’

The service started successfully after we reverted to 0.13.0.

Just a small update: we successfully upgraded to 0.16.0, which should be enough to resolve the issue described in this topic.