Not all events arrive at the Snowplow pipeline after being sent in bulk using the AsyncEmitter in Python

Hi,

I am trying to send a backlog of historical events stored in a database to our Snowplow streaming setup on GCP. I’m using Python to format the events locally and send them using the AsyncEmitter with the following settings:

from snowplow_tracker import AsyncEmitter

def failure(num, arr):
    # on_failure is called with the number of successfully sent events
    # and a list of the events that failed to send
    print(str(num) + " events sent successfully!")
    print("These events were not sent successfully:")
    for event_dict in arr:
        print(event_dict)

e = AsyncEmitter("ourSnowplowSetupOnGCP", protocol='https', thread_count=10, buffer_size=10, on_failure=failure)

And then I simply loop through the events, sending them one by one via a tracker that uses the AsyncEmitter:

for index, event in self.data.iterrows():
    tracker.send_event(event)  # this function first formats and sends the event using the track_struct_event() method

I ran a test with the above setup for 80 000 events, which were sent to our pipeline in around 20 s without any warnings or errors being raised on my end. However, only 700 of the sent events arrived in our BigQuery table. After analysing the backend, we saw no indication that any events were “lost”, not received, or still being processed by the pipeline.

Does anyone have any ideas on how to solve this issue? Or any experience sending events in bulk with the AsyncEmitter in Python?

Depending on how your test is set up, it is possible that your test completes and shuts down before the events have been sent. Do you have any Thread.sleep() (sorry, not sure what the Python equivalent is) steps in your test?

Edit: ah - time.sleep(0.5) will sleep your main thread for half a second
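
For example, something like this at the end of your script (just a rough sketch reusing the tracker and self.data from your post - the 30 seconds is an arbitrary number to test the theory, not a recommendation):

import time

for index, event in self.data.iterrows():
    tracker.send_event(event)

# Keep the main thread alive so the emitter's background threads
# have time to drain their queue before the program exits
time.sleep(30)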


You can also try e.flush() once you’ve “sent” them all.

Hello Miranda,

That might be the case. After inspecting the source code for the AsyncEmitter again, I see that the created threads are daemon threads:

        for i in range(thread_count):
            t = threading.Thread(target=self.consume)
            t.daemon = True
            t.start()

From the Python threading module docs:

A thread can be flagged as a “daemon thread”. The significance of this flag is that the entire Python program exits when only daemon threads are left. The initial value is inherited from the creating thread. The flag can be set through the daemon property or the daemon constructor argument. Daemon threads are abruptly stopped at shutdown. Their resources (such as open files, database transactions, etc.) may not be released properly.
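
For illustration, a minimal standalone example of that behaviour (nothing to do with the tracker):

import threading
import time

def worker():
    for i in range(10):
        print("working", i)
        time.sleep(0.5)

t = threading.Thread(target=worker)
t.daemon = True
t.start()

# The main thread exits almost immediately, so the daemon worker
# is killed long before it finishes its 10 iterations
print("main thread done")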

I will look into this further

It seems like the threads created by the AsyncEmitter were indeed being interrupted by the main thread exiting. I can now keep the threads running by using:

for thread in threading.enumerate():
    print(thread.name, 'is_alive =', thread.is_alive(), 'daemon =', thread.daemon)
    if thread.name == 'MainThread':
        continue
    thread.join()

But since the threads run the following method, they never terminate even once all events have been sent (… I think):

    def consume(self) -> None:
        while True:
            evts = self.queue.get()
            self.send_events(evts)
            self.queue.task_done()
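
One alternative I might try (just a sketch - it relies on the emitter’s internal queue attribute shown above, so it could break with tracker updates): flush the buffer and then block on the queue instead of joining the worker threads. Since consume() calls task_done(), queue.join() should return once every queued batch has been processed:

e.flush()       # push any remaining buffered events onto the queue
e.queue.join()  # block until every queued batch has been marked done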

I recently had a similar problem using the Java tracker. I had modified the built-in demo “simple-console” to send 10 000 events to my Micro pipeline. Like you, only a fraction of them arrived. Adding some sleep after flushing all the events solved it.

The final lines were something like this:

// Track some events
for (int i = 0; i < 10000; i++) {
    tracker.track(getPageViewEvent);
}

// This flushes all remaining events as well as shutting down the threadpool
emitter.close();

// Allow 5 seconds for events to be processed
Thread.sleep(5000);

Something similar might work for you.
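
In Python-tracker terms that would be roughly the following (a sketch, assuming the emitter e and tracker from the original post; I haven’t tested it against a real pipeline):

import time

for index, event in self.data.iterrows():
    tracker.send_event(event)

# Flush any events still sitting in the emitter's buffer
e.flush()

# Allow a few seconds for the daemon sender threads to finish
time.sleep(5)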


Are your test events all arriving now at least?

Hello again

Yes, when I force the threads to keep running using thread.join(), the events arrive as they should. This is essentially all I need, as this will be a one-time load - even if it is a bit inelegant. Thank you for the solution. 🙂

However, when I tested flushing after adding the events to the emitter (as Paul also suggested), without forcing the threads to continue running, I ran into the same issue - it just delays the main thread’s termination for the duration of the sleep.

This allows more events to be sent to the pipeline before the program exits, but the question then becomes: how long should I sleep() to allow all events to be sent? Or am I missing something about the effect flush has on the running threads of the AsyncEmitter?

Glad you’ve got the events!

Yeah, flush() won’t affect the running threads. It just avoids having to process the events in small batches. As for the sleep(), it’s kind of trial and error, but I would err on the side of a longer wait so that it still works if, e.g., you try again from a cold start on Monday morning.

This problem is quite artificial - sending 80000 events basically all at once. So I don’t know if more advanced thread handling is needed for the Python tracker in production. We’ll discuss it within the Trackers team.

Another mitigation I tried when I was playing with the Java tracker tests was to add short sleep()s after every 5 or so events; otherwise the event-sending threads were getting blocked from accessing the event queue. The structure of the Java tracker is quite different from the Python tracker, though, so I don’t know if this would help. I’m not so familiar with the Python tracker.

Welcome to the community btw 🙂


Just wanted to point out that the AsyncEmitter in Python does seem to have a synchronous version of flush: sync_flush(). Perhaps that could wait for all the events to be sent before returning (but I should mention that I haven’t tried it).
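
So something like this might be enough (untested sketch, using the emitter e from the original post):

# After the sending loop
e.sync_flush()  # should block until the buffer and queue are drained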

I’m glad to see this came to a positive outcome. I’ll just add, for anyone coming across it in the future, that this looks like an issue I’ve run into in testing, where the reason some of my events were failing was basically down to sending too many events at the same time (i.e. my script ran faster than my poor laptop’s network ports could handle - especially as I wasn’t willing to close my thousands of carefully curated and all totally necessary browser tabs).

As has been pointed out, they get retried if we wait long enough, but in my case I needed as consistent a throughput of data as possible to keep my tests as consistent as I could manage. I found that adding a few ms of sleep every n iterations stopped the events from failing in the first place. (I believe my most recent iteration with the Python tracker was a 2 ms delay every 100 events, but YMMV.)

It’s best to still flush and sleep for a second or so at the end with this approach, though, just to be safe on the last few events.
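
Roughly what my test loop looked like, adapted to the loop from the original post (a sketch - the numbers are just what worked on my machine):

import time

for i, (index, event) in enumerate(self.data.iterrows()):
    tracker.send_event(event)
    # Brief pause every 100 events so the network can keep up
    if i and i % 100 == 0:
        time.sleep(0.002)

# Still flush and give the last few events a moment to go out
e.flush()
time.sleep(1)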


You could also just use the built-in event loop functions get_event_loop(), set_event_loop(), and new_event_loop() - this is specifically for optimization.