Redshift automatic table migrations RFC

As part of our drive to make the Snowplow community more collaborative and widen our network of open source contributors, we will be regularly posting our proposals for new features, apps and libraries under the Request for Comments section of our forum. You are welcome to post your own proposals too!

This RFC proposes a new Redshift loading architecture in which table creation and migration are performed automatically by RDB Loader, instead of the current manual, error-prone process.

Projects that need to be changed

  • Iglu Server - defining new endpoints
  • Iglu Client - using new endpoints
  • Schema DDL - core algorithms for migrations and shredding
  • RDB Shredder - embed Schema DDL and use its shredding mechanism to convert JSON to CSV
  • RDB Loader - new discovery algorithm and table alterations
  • EmrEtlRunner - to reflect changes in Shredder and Loader

Background

The current schema-creation process, from the user’s perspective, includes the following steps:

  1. Design JSON Schema and upload it to Iglu Registry
  2. Run igluctl to generate JSON Paths and DDL
  3. Upload JSON Paths to specific S3 folder
  4. Create a table from generated DDL

The current schema-update process (e.g. an ADDITION, 1-0-1), from the user’s perspective, includes the following steps:

  1. Identify whether the new schema (business model) is an ADDITION, REVISION or MODEL change from the SchemaVer perspective
  2. Upload the new schema to Iglu Registry
  3. Generate new JSON Paths and migration script
  4. Check if the migration script does what they want (otherwise change it manually)
  5. Overwrite existing JSON Paths file on S3
  6. Apply the migration script via psql (or similar)

The goal of the new architecture is to reduce the creation process to its first step only (design the JSON Schema and upload it to an Iglu Registry) and the update process to its second step only (upload the new schema to the Iglu Registry).

There are three challenges that we need to address:

  1. JSON Paths (and DDL) files disposal
  2. Proper migrations
  3. Table alteration functionality (Loader should be able to perform DDL statements)

JSON Paths disposal

JSON Paths files are Redshift’s built-in mechanism for mapping source data in JSON format (RDB Shredder’s output in our case) onto the ordered list of columns in a table. In other words, in JSON, values are nested, addressed by keys and have no inherent order, while in a table columns must be ordered; JSON Paths make the two match.
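
For illustration, below is a minimal sketch of what such a mapping amounts to (the bucket, paths and column names are made up): a JSON Paths file is just an ordered list of JSON Path expressions, and the n-th expression feeds the n-th column of the target table.

```scala
// Hypothetical contents of s3://acme-jsonpaths/com.acme/event_1.json;
// the COPY command pairs the n-th path with the n-th column of the table.
object JsonPathsFileIllustration extends App {
  val jsonPathsFile: String =
    """{
      |  "jsonpaths": [
      |    "$.data.a",
      |    "$.data.b",
      |    "$.data.c"
      |  ]
      |}""".stripMargin

  // The target table must therefore declare its columns as (a, b, c),
  // in exactly this order.
  println(jsonPathsFile)
}
```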

There are multiple problems and requirements imposed by this mapping approach (e.g. JSON Paths files must live on S3), hence disposing of the whole Redshift JSON Paths approach was the first and easiest decision we had to make. Redshift supports at least three file formats that, unlike newline-delimited JSON (or Avro), do not require any mapping: Parquet, CSV and ORC. We chose CSV (see the reasoning in Appendix 1).

However, migration to CSV also brings some challenges and open questions.

  • We need a well-defined algorithm that maps JSON Schemas to “CSV Schemas” (intentionally, we’re not talking about Redshift - this must be a generic mechanism). We already have such an algorithm in igluctl
  • We need to implement the shredding algorithm as a per-value process (in igluctl it exists only per-schema)
  • The hardest challenge is to maintain state between schema versions (see the “Proper migrations” challenge)

Proper migrations

The problem

Our SchemaVers have only a partial ordering relationship by design.

In other words, given only a list with 1-0-0, 1-0-1 and 1-1-0 and no additional metadata, there’s no way to say whether 1-1-0 was added before or after 1-0-1. Users can add these schemas in any order they like, and it is absolutely valid to start tracking 1-0-1 years after 1-1-0 (or even 2-0-0) was added and track them together. SchemaVers are ordered only within a migration (ADDITION, REVISION or MODEL).

Effectively it means that if we have three JSON Schemas:

  • 1-0-0 with a and b properties
  • 1-0-1 with optional c
  • 1-1-0 with optional d (REVISION can include type change AND property additions)

We cannot say whether they must be converted into columns in a,b,c,d or a,b,d,c order - both can be valid. If RDB Shredder receives all three of these schemas, it won’t be able to tell which shredding order should be applied.

It is important, however, that 1-1-1 cannot go before 1-1-0 (there is an order within a migration), which means we cannot ignore 1-1-0 when we have encountered only 1-1-1.
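
To make the partial ordering concrete, here is a minimal sketch under a simplifying assumption: two versions are comparable only when they belong to the same MODEL-REVISION series. The real SchemaVer rules are richer, so treat this as an illustration rather than a specification.

```scala
// Simplified SchemaVer comparison: additions within the same MODEL-REVISION
// series are ordered; anything else is incomparable without extra metadata
// (such as the time the schema was added to the registry).
object SchemaVerOrdering extends App {
  final case class SchemaVer(model: Int, revision: Int, addition: Int)

  /** Some(true) if `a` precedes `b`, Some(false) if `b` precedes `a`,
    * None if no order can be deduced from the versions alone. */
  def precedes(a: SchemaVer, b: SchemaVer): Option[Boolean] =
    if (a.model == b.model && a.revision == b.revision && a.addition != b.addition)
      Some(a.addition < b.addition)
    else
      None

  println(precedes(SchemaVer(1, 1, 0), SchemaVer(1, 1, 1))) // Some(true)
  println(precedes(SchemaVer(1, 0, 1), SchemaVer(1, 1, 0))) // None - incomparable
}
```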

The solution

The obvious approach is to establish a relationship between different versions of one schema. This can be achieved in two ways:

  • Processing manifest - to store state for a pipeline, which can be used to deduce the order in which schemas were added to the datalake
  • Iglu registry - to get just an order in which schemas were added to the registry

With proper relationships we can get an order for schemas and hence an order for columns within a table.

The two solutions differ a lot, both philosophically and in implementation, and we chose Iglu Server. Using Iglu means that client software (either RDB Shredder or Loader) should be able to get the order in which schemas were added. The simplest implementation involves a new endpoint (probably /vendor/name/format/) listing schemas in the proper order.
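
As a rough illustration, a client call against such an endpoint could look like the sketch below; the exact URL shape and response format are assumptions, since the RFC only pins down that the endpoint lives at something like /vendor/name/format/ and returns versions in creation order.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

// Hypothetical client for the proposed "ordered versions" endpoint.
object OrderedVersionsClient {
  private val http = HttpClient.newHttpClient()

  /** Returns schema versions in the order they were added to the registry,
    * e.g. List("1-0-0", "1-1-0", "1-2-0", "1-0-1", "1-2-1"). */
  def fetch(registryUri: String, vendor: String, name: String): List[String] = {
    val uri = URI.create(s"$registryUri/$vendor/$name/jsonschema")
    val request = HttpRequest.newBuilder(uri).GET().build()
    val body = http.send(request, HttpResponse.BodyHandlers.ofString()).body()
    // Assume one version per line for simplicity; a real client would parse
    // whatever JSON payload Iglu Server ends up returning.
    body.linesIterator.map(_.trim).filter(_.nonEmpty).toList
  }
}
```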

For each run, RDB Shredder would generate in-memory JSON Paths getters with the following rules:

  1. Order properties within a single schema version alphabetically
  2. Order properties within a single schema version with NOT NULL (required) ones first
  3. All properties added in subsequent ordered schemas should follow the same two rules, but be appended to the end of the table

This is exactly the same algorithm we’re using in igluctl right now; a sketch of it follows below.
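
Here is a minimal sketch of that ordering, assuming each schema version has already been flattened into sets of required and optional property names (the FlatSchema name and shape below are a simplification, not the actual Schema DDL types):

```scala
// Column ordering: within each version, required (NOT NULL) properties come
// first, alphabetically, then optional ones, alphabetically; properties
// introduced by later versions are appended at the end, never interleaved.
object ColumnOrdering extends App {
  final case class FlatSchema(required: Set[String], optional: Set[String]) {
    def all: Set[String] = required ++ optional
  }

  private def orderWithin(schema: FlatSchema, fresh: Set[String]): List[String] =
    (schema.required & fresh).toList.sorted ++ (schema.optional & fresh).toList.sorted

  /** Versions must be supplied in registry creation order. */
  def columns(orderedVersions: List[FlatSchema]): List[String] =
    orderedVersions
      .foldLeft((List.empty[String], Set.empty[String])) { case ((acc, seen), schema) =>
        val added = orderWithin(schema, schema.all -- seen)
        (acc ++ added, seen ++ added.toSet)
      }
      ._1

  // 1-0-0 has a, b; 1-0-1 adds optional c; 1-1-0 adds optional d
  val v100 = FlatSchema(Set("a", "b"), Set.empty)
  val v101 = FlatSchema(Set("a", "b"), Set("c"))
  val v110 = FlatSchema(Set("a", "b"), Set("c", "d"))
  println(columns(List(v100, v101, v110))) // List(a, b, c, d)
}
```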

This also brings up an important point: Iglu Server should start inspecting schemas for their relations with other schemas and prohibit invalid migrations and patches when necessary, i.e. we need to make sure that a schema the user adds as an ADDITION is indeed an ADDITION. At the very least we need to filter out MODEL changes that a user tries to represent as ADDITIONs or REVISIONs. At the same time, we’d like to keep the ability to patch schemas in some situations - e.g. when the Server is used as part of a dev/Snowplow Mini environment, where patches are acceptable. This behavior should be enabled in the server’s config (because we know at deploy time whether it is desired).
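
As an illustration of the kind of check the Server could run, the sketch below accepts a candidate ADDITION only if it adds new optional properties without dropping properties or changing which ones are required; real SchemaVer validation also has to consider type and constraint changes, so treat this as a simplified example.

```scala
// Simplified ADDITION check over flattened schemas (property names only).
object AdditionCheck extends App {
  final case class FlatSchema(required: Set[String], optional: Set[String]) {
    def all: Set[String] = required ++ optional
  }

  /** An ADDITION must not remove properties and must not change the set of
    * required properties (no new required ones, no promotions). */
  def isValidAddition(previous: FlatSchema, candidate: FlatSchema): Boolean =
    previous.all.subsetOf(candidate.all) && previous.required == candidate.required

  val v100 = FlatSchema(Set("a", "b"), Set.empty)
  println(isValidAddition(v100, FlatSchema(Set("a", "b"), Set("c"))))       // true
  println(isValidAddition(v100, FlatSchema(Set("a", "b", "c"), Set.empty))) // false - new required property
}
```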

RDB Shredder Example

  1. RDB Shredder starts to process enriched data and encounters an event with com.acme/event/jsonschema/1-2-0
  2. RDB Shredder requests Iglu Server at com.acme/event/jsonschema and gets [1-0-0, 1-1-0, 1-2-0, 1-0-1, 1-2-1]
  3. RDB Shredder generates ordered in-memory JSON Paths getters
  4. RDB Shredder applies these getters to all encountered JSON entities, which effectively turns the JSON into CSV (sketched below), and saves all com.acme/event entities in the s3://shredded/run=2018-12-11-12-10-00/vendor=com.acme/name=event/model=1/ directory
  5. RDB Shredder writes a manifest (explained in the next section)
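
A minimal sketch of step 4, assuming the entity has already been flattened into a Map[String, String] (a stand-in for a real JSON AST) and ignoring escaping and nested paths:

```scala
// One getter per column, in the order produced by the column-ordering
// algorithm; missing (optional) properties become empty cells.
object ShredToCsv extends App {
  def toRow(columns: List[String], entity: Map[String, String]): String =
    columns.map(column => entity.getOrElse(column, "")).mkString(",")

  // Entity validated against 1-0-1 (no "d" yet), shredded into a,b,c,d columns
  println(toRow(List("a", "b", "c", "d"), Map("a" -> "1", "b" -> "x", "c" -> "y")))
  // prints: 1,x,y,
}
```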

Race condition

The number of columns in the CSV and in the table should always match. If a user adds a property (and schema) after RDB Shredder has started to work and before RDB Loader has started to load, RDB Loader would apply a migration at the beginning and the table would contain more columns than the source data. In order to avoid this, we can use a processing manifest, or at least a manifest file (in the format of a processing manifest record), written after RDB Shredder finishes its job and containing the exact set of schemas it used. RDB Loader can check that manifest and decide not to apply the newest migration yet.
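
A hedged sketch of such a manifest record and check is shown below; the field names and layout are assumptions, not a final format.

```scala
// The Shredder writes one record per run; the Loader reads it back and holds
// off any migration beyond the versions the Shredder actually used.
object ShreddingManifest extends App {
  final case class Record(
    runId: String,            // e.g. "run=2018-12-11-12-10-00"
    shreddedType: String,     // e.g. "com.acme/event/1"
    schemasUsed: List[String] // versions used for shredding, in creation order
  )

  /** True if the registry has gained versions the Shredder did not use -
    * in that case the Loader should not apply the newest migration yet. */
  def migrationAhead(record: Record, registryVersions: List[String]): Boolean =
    registryVersions.exists(version => !record.schemasUsed.contains(version))

  val record = Record("run=2018-12-11-12-10-00", "com.acme/event/1", List("1-0-0", "1-0-1"))
  println(migrationAhead(record, List("1-0-0", "1-0-1")))          // false
  println(migrationAhead(record, List("1-0-0", "1-0-1", "1-1-0"))) // true - hold off
}
```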

Table alteration functionality

The most common operation, table creation, is easy. We only need to check whether the table already exists and, if not, generate the DDL in memory and execute it.

For migrations we can simply count the number of columns and check whether it matches the number of expected JSON Paths getters. Without patches this should be a fairly safe assumption to work with.

In later versions we can start working on DDL parsers, which would check whether the table is identical to the expected definition. This would help us prevent mixed-up columns - an unlikely, but extremely nasty case to deal with (especially after Stream Enrich, where there’s no way to clean up a single load).
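
Below is a hedged sketch of the basic check and migration over plain JDBC; the PG_TABLE_DEF query is real Redshift, but the schema/table names, column type and overall shape are illustrative - a real implementation would generate proper CREATE TABLE / ALTER TABLE statements from Schema DDL.

```scala
import java.sql.Connection

// Count the table's current columns via PG_TABLE_DEF and append any missing
// ones; types and encodings are placeholders here.
object TableMigrationSketch {
  // NB: PG_TABLE_DEF only lists tables whose schema is on the search_path.
  def currentColumnCount(conn: Connection, schema: String, table: String): Int = {
    val stmt = conn.prepareStatement(
      "SELECT COUNT(*) FROM PG_TABLE_DEF WHERE schemaname = ? AND tablename = ?")
    stmt.setString(1, schema)
    stmt.setString(2, table)
    val rs = stmt.executeQuery()
    rs.next()
    rs.getInt(1)
  }

  /** Append the columns the table is missing, in the expected order. */
  def migrate(conn: Connection, table: String, expectedColumns: List[String]): Unit = {
    val current = currentColumnCount(conn, "atomic", table)
    expectedColumns.drop(current).foreach { column =>
      // Placeholder type - Schema DDL would supply the real definition
      conn.createStatement().execute(s"ALTER TABLE $table ADD COLUMN $column VARCHAR(4096)")
    }
  }
}
```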

RDB Loader Example

This runs after the RDB Shredder steps above.

  1. RDB Loader discovers data by inspecting S3
  2. RDB Loader fetches all schemas found in shredded data and generates an in-memory DDL
  3. RDB Loader compares the in-memory DDL with the current DDL fetched from PG_TABLE_DEF (the system table containing column definitions)
  4. RDB Loader performs a migration if the two definitions do not match
  5. RDB Loader loads the data

Other changes

This new algorithm implies the following changes in EmrEtlRunner:

  • Archiving should take the new paths into account
  • We want to make sure that RDB Shredder and Loader use exactly the same algorithm, which means their versions should be unified and it should not be possible to use shredder-0.16.0 with loader-0.15.0. This should be reflected in config.yml

We also need to make sure that Iglu Server cannot accept a schema with a wrong SchemaVer and that there are no gaps between SchemaVers. The first requirement is very important for consistency and can turn into a significant amount of work.

Summary

  • We implement common ground for the JSON-to-CSV transformation (including DDL generation and the shredding algorithm) in the Schema DDL library
  • Iglu Server gets an endpoint listing all versions of a particular schema ordered by creation date (1-0-0, 1-1-0, 1-0-1)
  • RDB Shredder produces data in CSV, using the shredding algorithm from Schema DDL
  • Every time RDB Loader launches, it checks whether the table contains the same number of columns and migrates it if not

Appendix 1. Binary columnar formats vs CSV

All available options have some pros and cons, mostly in terms of performance. However, it is worth noting that no matter which option we choose, any of them will be significantly faster than the current NDJSON format.

  • CSV - simple, human-readable, the de-facto standard in the analytics world, compatible with effectively any tool. Fastest to produce (which is the most important property for our use case)
  • Parquet - the most popular (IMO) columnar format. Supports schemas (not very important for our loading), is considerably more compact (mildly important given that our datalake consists of enriched data) and is significantly more efficient for querying (not important at all for our use case, but it opens up new opportunities for Redshift Spectrum)
  • ORC - a Parquet analogue. Seems to be more efficient, but at the same time surprisingly less popular

Avro won’t work because it is not columnar and we would have to keep JSON Paths anyway.

CSV is the simplest option and matches our needs perfectly (especially because it is faster to produce and works better with streaming). However, the two columnar options have a serious advantage if we consider adding Redshift Spectrum support - they’re extremely fast for querying data. With the current format, Spectrum is not an option at all.

In future we should be able to leave this decision to users: whether they want to use Spectrum or only care about load performance. RDB Shredder should then be able to produce whichever format the user chooses.

Appendix 2. Evolution of proposed approach

For this RFC we tried to change as few things as possible in the Iglu subsystem and ended up with just one new endpoint, which doesn’t change any aspects of the registry concept.

At the same time, there’s one unsolved problem - the proposed approach will work only for new pipelines, where we are entirely sure that no invalid SchemaVers have been uploaded and no legacy pre-igluctl tables have been deployed. If users with legacy tables want to use the new migrations, they will have to re-create all tables (which may not be a huge issue).

However, if we change the concept of a registry from just "serving schemas" to something that helps pipelines map different schema formats to each other, and start serving DDLs and JSON Paths (which is very well aligned with what igluctl does), then we can:

  • Upload custom JSON Paths to map JSONs to legacy tables
  • Upload custom DDLs with e.g. user-picked encodings or even column names - with the proposed solution we always have to rely on decisions made by Schema DDL