Real-time reporting using AWS Lambda and DynamoDB: a tutorial to compute the number of players in a game level on the Snowplow event stream (1/2)

1. Setting the scene

CodeCombat is a brilliant, open source game that makes learning programming fun.

The CodeCombat team wants to know, at any one time, how many people are active in each game level so that they can surface that data in the game map: providing players with a sense of how many other players are playing the same levels as them, at the same time. (You can view the map here.)

In this tutorial, we’ll walk through how that was accomplished by building an analytics-on-write application on top of CodeCombat’s Snowplow event stream in Kinesis, using AWS Lambda and DynamoDB. This tutorial is written for data analysts and engineers who are used to working with data in an analytics-on-read world, via SQL or Spark on data in warehouses, to help them get familiar with processing data live from a Kinesis stream using an analytics-on-write, stream-processing toolset.

The example is deliberately chosen to illustrate how more complicated reporting (in this case, a count distinct on the number of users currently active in each level) can be accomplished in a relatively straightforward way using the excellent Lambda and DynamoDB toolset provided by AWS.

2. Kinesis, Lambda and DynamoDB in a nutshell

2.1 Kinesis

Amazon Kinesis is a platform for building large scale, distributed, real-time data processing applications. Data can be written onto a Kinesis stream as an ordered set of records. (Ordered based on the sequence with which each record is written to the stream.) Data can then be read off that stream by separate applications, where each application reads the records on the stream in the same order that those records were written to the stream.

Data persists in the stream for up to 7 days. Multiple applications can read data from the same stream. Because each application maintains its own cursor position (i.e. knows where it is on the stream, and so which record to process next), each application can consume the data at its own pace, independently of the other applications that might be writing to or reading from the stream. As a result, Kinesis streams become a very useful abstraction for breaking up complicated, real-time data processing tasks into simple, independent services. Often these services are called ‘microservices’, to highlight that each, considered in isolation, is simple.
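To make the idea of a cursor concrete, here is a minimal boto3 sketch that reads records directly from a (hypothetical) stream and keeps track of its own position via a shard iterator. In the rest of this tutorial Lambda will manage this plumbing for us, but it is useful to see what happens underneath:

import boto3

kinesis = boto3.client('kinesis')

# Find the shards of a hypothetical stream and start reading the first one
# from its oldest available record
shards = kinesis.describe_stream(StreamName='example-stream')['StreamDescription']['Shards']
iterator = kinesis.get_shard_iterator(
    StreamName='example-stream',
    ShardId=shards[0]['ShardId'],
    ShardIteratorType='TRIM_HORIZON'
)['ShardIterator']

# Records come back in the order they were written to the shard; the
# NextShardIterator is our "cursor" for the following read
response = kinesis.get_records(ShardIterator=iterator, Limit=100)
for record in response['Records']:
    print(record['Data'])
iterator = response['NextShardIterator']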

We use Kinesis streams extensively as part of the Snowplow Real Time event pipeline. Our collectors write data to a “raw stream”. We then have a separate process that reads from that raw stream, validates the data and enriches it, before writing it out to a new “enriched” stream suitable for processing by downstream data processing applications. In this tutorial, we will build a downstream data processing application that, as a first step, reads data from the Snowplow enriched event stream.

2.2 Lambda

If Kinesis provides a convenient way to flow data between multiple independent data processing applications, then Lambda provides a very convenient way to write and run those individual data processing applications.

Lambda enables you to write one or more functions that take individual data records from a Kinesis stream and do something with that data. Lambda then manages the process of executing those functions on each record in the Kinesis stream, so you, as a data engineer or data analyst, only need to worry about how you want that data processed, not how that code should be executed and scaled across your Kinesis stream(s). Code can be written in a variety of languages. (In this example we’re going to use Python.) It’s worth noting that Lambda can also be used to execute code on data arriving in a variety of other places (including S3 and API Gateway, for example). In this post we’re going to focus exclusively on Lambda as a convenient service for running stream-processing applications to process data in Kinesis.
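For example, when a Lambda function is subscribed to a Kinesis stream, it is invoked with a micro-batch of records, each with a base64-encoded payload. A minimal Python handler might look like this (a sketch only; the real handler we build below does rather more):

import base64

def lambda_handler(event, context):
    # Lambda passes a micro-batch of Kinesis records in event['Records'];
    # each payload is base64-encoded under record['kinesis']['data']
    for record in event['Records']:
        payload = base64.b64decode(record['kinesis']['data'])
        print(payload)
    return "Processed {} record(s)".format(len(event['Records']))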

2.3 DynamoDB

DynamoDB is a NoSQL database optimized for servicing applications that need very low latency responses at potentially massive scale.

For analysts used to doing analytics-on-read in SQL databases and data warehouses (e.g. Redshift), Dynamo is a very different beast to get used to. So it is worth highlighting, in advance, what makes Dynamo so different from the SQL databases that dominate the analytics-on-read space:

  1. DynamoDB is well suited to storing the state of entities
  • In this particular example we want to know the number of players that are active in each level at this point in time. So we’re going to use Dynamo to keep track of that number, for each level. This will be tracked in a LevelState table
  • When we’re processing event streams, state is often inferred by looking at the data across the stream. In this example, we might find out from one event that a player was in a particular level at a particular point in time. A minute later, we might receive another event indicating that the player has started a second level. It is only when viewing the second event in the context of having seen the first that we can infer that, at the point in time the player started the second level, she must have left the first level. (Therefore we need to reduce the count of active players in the first level and increase the count of active players in the second level.) To do this, we somehow need to persist the fact that the player was on the first level in the application’s “memory”, so that we can respond appropriately (by decrementing the count of active players in level 1) when we see the player in level 2. This state necessarily has to live outside of our Lambda functions, which only know about the specific event in Kinesis they are operating on. We’ll use DynamoDB to manage that persistence: at any point in time it will record the current state of each player.
  • This is a different use case from data warehouses, where typically many events or ‘facts’ are stored simultaneously as individual lines of data, and where each ‘event’ contains the state of the relevant entities at the point in time when it occurred. At query time it is then straightforward to process the full event stream in the data warehouse to compute the state of any entity at any point in time, by looking at the full stream before and after the particular point in time in question.
  2. DynamoDB is optimized to enable a large number of concurrent queries to either write to or read from the database simultaneously, and to return results in single-digit milliseconds
  • This is very important in stream processing applications, where a large Kinesis event stream might be processed across multiple shards in parallel, so very many write requests might be made against a single table per second.
  • This is very different from data warehouses, which are typically optimized to enable a smaller number of concurrent users (where each user is an analyst rather than a machine) to efficiently run complicated queries on the data, queries that might return in seconds or minutes rather than single-digit milliseconds.
  3. Sophisticated data processing does not occur in Dynamo itself
  • In our real-time processing app, the data processing occurs outside of Dynamo, in Lambda. Dynamo is used to maintain state, including acting as the store in which the end result is made available
  • This is totally different from a Redshift-like data warehouse, where a lot of heavy-duty computation would typically be done on the data in the warehouse, using the warehouse’s own querying and processing resources
  4. The Dynamo query API is optimized to handle multiple concurrent processes mutating data tables at the same time. It includes first-class support for a range of updates that do not require the application making the request to check for the existence or the value of a particular record prior to updating it, removing the need to read a value before performing an update (i.e. a mutation) on an existing data point. That includes (see the sketch after this list):
  • Incrementing data
  • Creating a new record if one does not already exist, or updating an existing one if it does
  • Conditional updates that will only commit if a particular statement is true
  5. DynamoDB tables can be set up to emit an event stream with an event per update. These are called DynamoDB Streams
  • This means that it is easy to write applications that respond to changes in entity state, by writing Lambda functions that respond to the DynamoDB stream.
  • Because these applications consume the Dynamo stream, they do not need to query the Dynamo table itself. Contrast this with a “standard” database, where, if you wanted to write an application that did something on a change in state, you would have to regularly poll the database for such a change.
  • By utilizing the DynamoDB stream, we can avoid performing any reads on the table at all (instead relying on the DynamoDB stream to keep us updated with any changes to the table). The DynamoDB stream effectively provides a cleaned, simplified view of the changes of state of any of the entities we’re tracking over time.
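To make point 4 concrete, here is a minimal boto3 sketch of the kind of “blind” writes DynamoDB supports. The table and attribute names here are purely illustrative:

import boto3
from botocore.exceptions import ClientError

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('ExampleCounters')  # hypothetical table

# Atomic increment: create the counter at 0 if it does not exist, then add 1,
# without ever reading the current value first
table.update_item(
    Key={'counterId': 'example'},
    UpdateExpression="set counterValue = if_not_exists(counterValue, :zero) + :one",
    ExpressionAttributeValues={':zero': 0, ':one': 1}
)

# Conditional update: only commits if our timestamp is at least as recent as
# the one already stored (or if no timestamp is stored at all)
try:
    table.update_item(
        Key={'counterId': 'example'},
        UpdateExpression="set lastUpdated = :ts",
        ConditionExpression="attribute_not_exists(lastUpdated) OR lastUpdated <= :ts",
        ExpressionAttributeValues={':ts': 1478556000000}
    )
except ClientError as e:
    if e.response['Error']['Code'] == "ConditionalCheckFailedException":
        pass  # a newer value is already stored; nothing to do
    else:
        raise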

3. The CodeCombat event stream

CodeCombat emits a rich event stream of c.100 different event types. Some of those events are emitted within levels, and some are not.

Events that are emitted within levels typically contain both a user ID field and a level_slug field, which indicates which level the event occurred in. A custom JavaScript enrichment is used to consolidate this information into a single derived context with the following schema:

{
   "$schema":"http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
   "description":"Schema for a level context",
   "self":{
      "vendor":"com.codecombat",
      "name":"level_context",
      "format":"jsonschema",
      "version":"1-0-0"
   },
   "type":"object",
   "properties":{
      "user_id":{
         "type":["string", "null"],
         "maxLength":24
      },
      "world_name":{
         "type":["string", "null"],
         "maxLength":128
      },
      "level_name":{
         "type":["string", "null"],
         "maxLength":128
      },
      "level_slug":{
         "type":["string", "null"],
         "maxLength":128
      },
      "level_session":{
         "type":["string", "null"],
         "maxLength":24
      }
   },
   "additionalProperties":false
}
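For illustration, once an enriched event has been transformed into JSON (we cover this below), the derived context surfaces under a contexts_com_codecombat_level_context_1 key. A simplified example, with all values made up:

{
   "collector_tstamp":"2016-11-07T09:15:23.000Z",
   "contexts_com_codecombat_level_context_1":[
      {
         "user_id":"581f9a1234567890abcdef12",
         "world_name":"dungeon",
         "level_name":"Dungeons of Kithgard",
         "level_slug":"dungeons-of-kithgard",
         "level_session":"581f9b1234567890abcdef34"
      }
   ]
}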

Our application is going to:

  • Consume the CodeCombat event stream
  • Check if an event contains both a user_id and a level_slug
  • If so, identify and record what level that particular player is in at the event time. The player state will be maintained in a dedicated PlayerState table in DynamoDB
  • Use the PlayerState table in Dynamo to compute a LevelState table, which will record how many players are active in each level. In practice, this will be computed using a Lambda that consumes the DynamoDB stream for the PlayerState table.
  • Use the PlayerState table in Dynamo to compute a Transitions table in Dynamo that will keep track of the different transitions that have occurred in the last time period

Let’s walk through the process of building our application.

4. Defining our PlayerState table in DynamoDB

Our PlayerState table is going to be pretty simple:

  • It will have a playerId field (a string), which will be the primary key for each record.
  • Each record will in addition contain two fields:
    • A levelId (string) field, which will record what level the player is on
    • A lastUpdated field, which is an integer. It will be set to the Unix timestamp (in milliseconds) at which the last recorded event occurred for that player in that level. This is important because we need to ensure that only events that happened after the most recent recorded event update the player state. (We don’t want a late-arriving event to overwrite an entry set based on a more recent event.) We also need a way to expire records that are more than a certain period old, so that if a user stops playing a level part way through, we effectively classify them as having dropped out of the level after a certain period of inactivity. (A sketch of creating this table follows.)
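A minimal boto3 sketch of creating such a table (the throughput values are illustrative; the stream is enabled here because we will consume it in a later section):

import boto3

client = boto3.client('dynamodb')

client.create_table(
    TableName='PlayerState',
    AttributeDefinitions=[{'AttributeName': 'playerId', 'AttributeType': 'S'}],
    KeySchema=[{'AttributeName': 'playerId', 'KeyType': 'HASH'}],
    ProvisionedThroughput={'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5},
    # Emit both old and new images so a downstream Lambda can see level changes
    StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'NEW_AND_OLD_IMAGES'}
)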

5. Writing a Lambda that records the player state in DynamoDB

Our Lambda function for setting the player-state is going to work as follows:

  1. Read a record from the Kinesis stream
  2. If the record contains both a user_id and a level_slug field, make a conditional update to the player-state table that sets the player state to the level ID if either
  • The player state is not set (i.e. there is no record for that player). This will be the case if the player was not active in any levels prior to this event occurring
  • The player state is set, but the timestamp on the current event is more recent than the timestamp that was used to set the current player state.

Note that with DynamoDB this is a single UpdateItem API call that needs to be made. Dynamo will do the hard work of figuring out whether a new record needs to be created, an existing record needs to be updated, or no change needs to be made, based on the playerId, levelId and timestamp values included in the event.

The basic Lambda function is as follows:

def lambda_handler(event, context):
    print("Received event: " + json.dumps(event, indent=2))
    
    records = get_records(event)

    for record in records:
        snowplow_event_json = None 
        
        try:
            snowplow_event_json = transform(record)
        except:
            print("Ignoring badly formatted record in stream (failed to parse with SP analytics SDK")
            continue
        
        if ENRICHMENT_KEY in snowplow_event_json:
            
            for user_level_context in snowplow_event_json[ENRICHMENT_KEY]:
            
                if 'user_id' in user_level_context and 'level_slug' in user_level_context:
                    player_id = user_level_context['user_id']
                    level_id = user_level_context['level_slug']
                    
                    utc_dt = datetime.strptime(snowplow_event_json['collector_tstamp'], '%Y-%m-%dT%H:%M:%S.%fZ')
                    timestamp = (utc_dt - datetime(1970, 1, 1)).total_seconds() * 1000 # seconds -> milliseconds
                    timestamp = int(timestamp) # remove fractional milliseconds (timestamp is now an integer)
                    
                    # sometimes these values are null, ignore those
                    if player_id is None or level_id is None:
                        print("{} is missing a player id ({}) or level id ({})".format(ENRICHMENT_KEY, player_id, level_id))
                    else:
                        now = timenow_millis()
                        print("player name = {}\ncurrent level = {}\ntimestamp = {}\ntime now = {}".format(player_id, level_id, timestamp, now))
                        update_player_level(player_id, level_id, now)
                else:
                    print("{} in unexpected format - cannot find 'user_id' or 'level_slug'".format(ENRICHMENT_KEY))
        
        else:
            print("Ignoring Snowplow JSON event without {} key".format(ENRICHMENT_KEY))

    return "Successfully processed {} Kinesis Records(s)".format(len(records))

The Lambda is run against a microbatch of multiple Snowplow events at a time. The first thing that the Lambda function does is process them using the get_records function:

records = get_records(event)

Our get_records function is defined as follows:

def get_records(update):
    data = []
    if "Records" in update:
        for record in update["Records"]:
            if "kinesis" in record and "data" in record['kinesis']:
                decoded_data = record['kinesis']['data'].decode('base64')
                data.append(decoded_data)
    return data

The Snowplow enriched events in Kinesis are base64-encoded, so the function above simply base64-decodes each event in our micro-batch and returns the decoded records.

Each event returned is then transformed into JSON using the Snowplow Python Analytics SDK. This makes the event very easy to process. (Note that for simplicity the full Python Analytics SDK is included in the Lambda.)
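The transform helper itself is not shown above; a minimal sketch of how it might wrap the SDK, assuming the SDK’s event_transformer module and a tab-separated enriched event string as input, would be:

import snowplow_analytics_sdk.event_transformer

def transform(record):
    # Convert a tab-separated Snowplow enriched event into a Python dict,
    # with contexts exposed under keys such as contexts_com_codecombat_level_context_1
    return snowplow_analytics_sdk.event_transformer.transform(record)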

A check is then made to see that the event contains the relevant context, which holds both the user ID and the level ID:

if ENRICHMENT_KEY in snowplow_event_json:

where

ENRICHMENT_KEY = "contexts_com_codecombat_level_context_1"

Assuming it has, a check is made to see if both the user_id and level_slug parameters are set in the context, and if so, the level is updated:

        if ENRICHMENT_KEY in snowplow_event_json:

            for user_level_context in snowplow_event_json[ENRICHMENT_KEY]:

                if 'user_id' in user_level_context and 'level_slug' in user_level_context:
                    player_id = user_level_context['user_id']
                    level_id = user_level_context['level_slug']

                    utc_dt = datetime.strptime(snowplow_event_json['collector_tstamp'], '%Y-%m-%dT%H:%M:%S.%fZ')
                    timestamp = (utc_dt - datetime(1970, 1, 1)).total_seconds() * 1000 # seconds -> milliseconds
                    timestamp = int(timestamp) # remove fractional milliseconds (timestamp is now an integer)

                    # sometimes these values are null, ignore those
                    if player_id is None or level_id is None:
                        print("{} is missing a player id ({}) or level id ({})".format(ENRICHMENT_KEY, player_id, level_id))
                    else:
                        now = timenow_millis()
                        print("player name = {}\ncurrent level = {}\ntimestamp = {}\ntime now = {}".format(player_id, level_id, timestamp, now))
                        update_player_level(player_id, level_id, now)
                else:
                    print("{} in unexpected format - cannot find 'user_id' or 'level_slug'".format(ENRICHMENT_KEY))

        else:
            print("Ignoring Snowplow JSON event without {} key".format(ENRICHMENT_KEY))

Remember that not all events will contain the user_id and the level_slug: there might be events recorded against users who have not logged into the game, and other events that occur outside of specific levels.

The actual work of updating the PlayerState table in DynamoDB is handled by the update_player_level function, defined as follows:

def update_player_level(player, level, timestamp):
    # write the record to dynamodb, IFF the update time we have is newer than the one in the database
    print("Changing level for {} to {} (if older than {})".format(player,level,timestamp))
    try:
        response = table.update_item(
            Key={
                'playerId': player,
            },
            UpdateExpression="set levelId = :level, lastUpdated = :timestamp",
            ConditionExpression="attribute_not_exists(lastUpdated) OR lastUpdated <= :timestamp",
            ExpressionAttributeValues={
                ':level': level,
                ':timestamp': timestamp
            },
            ReturnValues="UPDATED_NEW"
        )
        print("Level changed")
    except ClientError as e:
        if e.response['Error']['Code'] == "ConditionalCheckFailedException":
            print("Level change ignored - a newer record exists in the table")
        else:
            raise

This is the function where some of the DynamoDB magic occurs. The Dynamo UpdateItem API call lets us make the update conditional: it will only go through if one of the following holds:

  • The PlayerState for this player is already recorded in the PlayerState table, but our event is more recent than the last event that was used to set that state. This is important because we cannot guarantee that events will arrive in chronological order: if an older event is processed after a more recent event, the value of the PlayerState will remain unchanged. If the event we’re processing is more recent, the record in DynamoDB will be updated with the latest levelId and lastUpdated to reflect the most recent event.
  • The PlayerState for this player is not currently recorded. (E.g. because the player was previously inactive - maybe this is the first event recorded in a new session.) In this case a new record will be created in DynamoDB for that player.

The complete Lambda can be found here.

6. Writing a Lambda that computes the level state

Now we have a DynamoDB table that records the current state of each of our players. We can use the DynamoDB stream associated with this table to compute how many players are active in each level (which we’ll store in a second DynamoDB table). Our logic is very simple:

  • If we see a table update that a player has started a new level, we’ll increment the PlayerCount for that level
  • If we see a table update that a player has left a level, we’ll decrement the PlayerCount for that level

Our DynamoDB stream effectively provides a clean, easy-to-read event stream with a record each time a player enters a new level or leaves an old one. Note that the volume of data in it will be one or two orders of magnitude lower than the volume of data in the Snowplow enriched event stream: in that stream there may be, say, 100 events per player per level, whereas in the DynamoDB stream we’ll have only two corresponding records: one when the player entered the level, and another when the player left it.
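For illustration, a simplified, hypothetical DynamoDB stream record for a player moving between two levels looks something like this when it reaches our Lambda:

{
   "eventName":"MODIFY",
   "dynamodb":{
      "Keys":{"playerId":{"S":"581f9a1234567890abcdef12"}},
      "OldImage":{
         "playerId":{"S":"581f9a1234567890abcdef12"},
         "levelId":{"S":"dungeons-of-kithgard"},
         "lastUpdated":{"N":"1478509200000"}
      },
      "NewImage":{
         "playerId":{"S":"581f9a1234567890abcdef12"},
         "levelId":{"S":"gems-in-the-deep"},
         "lastUpdated":{"N":"1478509260000"}
      }
   }
}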

Our LevelState table in Dynamo is also very simple: it has a levelId field (a string) as the primary key, and a playerCount field (a number) recording how many players are currently active in that level.

Our Lambda function looks as follows:

def lambda_handler(event, context):
    print(json.dumps(event, indent=2))

    for record in event['Records']:
        print("***")
        levels = get_level_changes(record["dynamodb"])
        old_level = levels[0]
        new_level = levels[1]

        if old_level is None:
            print("previous level is undefined!")
        else:
            print("previous level: " + old_level)

        if new_level is None:
            print("current level is undefined!")
        else:
            print("current level: " + new_level)

        if old_level == new_level:
            print("Level unchanged")
        else:
            write_transition(old_level, new_level)

            if old_level is not None:
                decrement_level(old_level)

            if new_level is not None:
                increment_level(new_level)


    return 'Successfully processed {} records.'.format(len(event['Records']))

We start by using the get_level_changes function to identify what level the player has moved from, and to, from the DynamoDB stream record. This function is defined as follows:

def get_level_changes(record_change):
    old_level = None
    new_level = None

    if 'OldImage' in record_change:
        if 'levelId' in record_change['OldImage']:
            old_level = record_change['OldImage']['levelId']["S"]

    if 'NewImage' in record_change:
        if 'levelId' in record_change['NewImage']:
            new_level = record_change['NewImage']['levelId']["S"]

    return (old_level, new_level)

The function simply reads the relevant values from the DynamoDB stream.

Our LevelState table is updated via two simple functions, an increment_level function:

def increment_level(level):
    print("increase player count in " + level + " by one")
    response = table.update_item(
        Key={'levelId': level},
        UpdateExpression="set playerCount = if_not_exists(playerCount, :initial) + :val",
        ExpressionAttributeValues={':val': 1, ':initial': 0},
        ReturnValues="UPDATED_NEW"
    )

and a corresponding decrement_level function:

def decrement_level(level):
    print("reducing player count in " + level + " by one")
    response = table.update_item(
        Key={'levelId': level},
        UpdateExpression="set playerCount = if_not_exists(playerCount, :initial) - :val",
        ExpressionAttributeValues={':val': 1, ':initial': 1},
        ReturnValues="UPDATED_NEW"
    )

That’s it in terms of updating our LevelState table! The remaining write_transition function is used to update a separate table that keeps track of the transitions between levels. We’ll return to this function later.

7. Pruning the LevelState table

As set up above, the LevelState table will correctly populate as players move into new levels and move between one level and the next.

What happens, however, if a player stops playing a level? In that case no more events will be received from the player. Because no specific event occurs to indicate that a player has stopped playing a level, we need a way to decrement a level counter if we haven’t received an event from a player on that level in a specific period of time.

This is easily accomplished by writing an additional PrunePlayerLevel function. It periodically runs through the PlayerState table looking for players for whom no update has been received in a specific period of time (in this case, 5 minutes), and removes those records. This causes the associated DynamoDB stream to record that the player has left the particular level they were last seen on, so that our SetLevelState Lambda correctly decrements the corresponding level record.

Our PrunePlayerLevel lambda makes use of the following function:

def clean_mia_players():
    # delete player records with a last updated time of before now - DELETE_OLDER_THAN_SECS environment variable
    now = timenow_millis()
    prune_older_than = now - (prune_duration_secs * 1000)
    print("Removing records older than {} ({} seconds ago)".format(prune_older_than,prune_duration_secs))

    fe = Key('lastUpdated').lt(prune_older_than) & Attr('levelId').exists()
    pe = "#player, lastUpdated"
    ean = { "#player": "playerId", }
    esk = None

    players_pruned = 0

    response = table.scan(
        FilterExpression=fe,
        ProjectionExpression=pe,
        ExpressionAttributeNames=ean
    )

    for i in response['Items']:
        clean_mia_player(i['playerId'], i['lastUpdated'])
        players_pruned += 1

    while 'LastEvaluatedKey' in response:
        response = table.scan(
            ProjectionExpression=pe,
            FilterExpression=fe,
            ExpressionAttributeNames= ean,
            ExclusiveStartKey=response['LastEvaluatedKey']
            )

        for i in response['Items']:
            clean_mia_player(i['playerId'], i['lastUpdated'])
            players_pruned += 1

    return players_pruned

It works through each entry in the PlayerState table, checking for records where the lastUpdated field is less than the current time minus the timeout interval. It then prunes each of those records using the following clean_mia_player function:

def clean_mia_player(player, expected_age):
    print("{} hasn't been seen for a while (since {}, time now {} (age {}s))- marking as MIA".format(player, expected_age, timenow_millis(), (timenow_millis()-expected_age)/1000))
    try:
        response = table.delete_item(
            Key={
                'playerId': player,
            },
            ConditionExpression="attribute_not_exists(lastUpdated) OR lastUpdated <= :timestamp",
            ExpressionAttributeValues={
                ':timestamp': expected_age
            }
        )
    except ClientError as e:
        if e.response['Error']['Code'] == "ConditionalCheckFailedException":
            print("Player has returned - newer record exists in the table!")
        else:
            raise
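
The snippets above also rely on a couple of helpers that are not shown: timenow_millis and the prune_duration_secs setting read from the DELETE_OLDER_THAN_SECS environment variable mentioned in the comment. A minimal sketch of how these might be defined (the 300-second default matches the 5-minute window described above):

import os
import time

# How long a player can be silent before we treat them as having left their level
prune_duration_secs = int(os.environ.get('DELETE_OLDER_THAN_SECS', '300'))

def timenow_millis():
    # Current time as a Unix timestamp in milliseconds
    return int(time.time() * 1000)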

The complete Lambda can be found here.

Part 2 of this tutorial can be found here.
