Materialization Scale-Out Strategies #2581
williamhbaker started this conversation in Ideas
This discussion describes the main categories that materialization connectors fall into and how each might operate when scaled up or down across more than one task shard. See the sections below for more detail on the proposed operation of each.
Not described in detail are the various at-least-once materializations that operate on non-transactional systems. My thinking is that scale-out should be fairly transparent for these, since they aren't trying to maintain any exactly-once delivery guarantees.
Summary of Speculative Requirements
This is a consolidated "wish-list" of capabilities that would be required to fully enable these scale-out strategies:

- Acknowledge / StartCommit for each task shard as part of a coordinated transaction, also for out-of-band coordination

Idempotent Shards with Document Counters
This is a new mode of operation that would be enabled by idempotent runtime transactions.
It's for systems that support an append-only mode of ingestion, where a "channel" is opened against a destination table per writer shard, and documents + an offset token are written atomically to this channel. The channel must provide some mechanism for fencing. These kinds of systems support exactly-once reading from Kafka topics etc., where there is a stable read offset to delineate progress.
With idempotent runtime transactions, a materialization connector would treat the stream of documents it receives as if they were read from a source with a stable offset, and be able to write data to these systems with exactly-once guarantees.
The term "channel" comes from Snowpipe Streaming, and the Snowpipe Streaming High Performance architecture would require this kind of materialization strategy. Other interesting applications are a potential exactly-once Kafka materialization with scale-out, the BigQuery storage write API, and file-based materializations (S3, GCS, etc.) using conditional writes with If-None-Match.

Even with idempotent transactions a materialization isn't exactly reading from a Kafka topic, so it needs to do some additional accounting to get the desired outcome.
Briefly, the strategy would work like this:
- The connector counts the documents it receives in Store for each binding, and records this count in its driver checkpoint
- Documents received in Store are discarded if their document count is less than the offset from the channel

It's possible that an entire transaction of documents might be written to the destination channel, but the connector crashes before committing to the recovery log. In a case like this the entire transaction would be re-sent to the connector, but it would effectively ignore all the documents. This extreme case shouldn't be very common, and would be quite inefficient, but it is technically possible.
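To make the counting concrete, here is a minimal sketch of the discard logic, assuming the channel reports a persisted offset (the count of documents durably written). The names `shouldWrite`, `docCount`, and `channelOffset` are illustrative, not part of any existing API.

```go
package main

import "fmt"

// shouldWrite reports whether a document at the given running count for a
// binding must be written, or discarded because the channel's persisted
// offset shows it was already durably written by a prior, partially
// acknowledged transaction.
func shouldWrite(docCount, channelOffset int64) bool {
	return docCount > channelOffset
}

func main() {
	// Suppose the channel reports 14 documents already persisted, and a
	// retried transaction re-sends documents counted 10 through 20.
	var written []int64
	channelOffset := int64(14)
	for count := int64(10); count <= 20; count++ {
		if shouldWrite(count, channelOffset) {
			written = append(written, count)
		}
	}
	fmt.Println(written) // counts 10..14 are discarded; 15..20 are written
}
```

In the crash-before-recovery-log-commit case described above, every re-sent document has a count at or below the channel offset, so the whole retried transaction is discarded.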
Channel Identifiers
Each binding gets its own channel for writing to each table, and each shard gets its own set of channels. It follows that the identifier for each channel needs to include the materialization name (or perhaps better, a globally unique task ID), the binding name, and a stable representation for the shard.
For the shard part, I can't think of any reason not to use the beginning key of the shard range. Alternatively both the start and end key of the range could be used, or some other kind of unique per-shard identifier. But it seems fine to just use the beginning key of the shard range.
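A channel identifier built along these lines might look like the following sketch. The separator and the hex encoding of the shard's begin key are assumptions for illustration, not a settled format.

```go
package main

import "fmt"

// channelID derives a stable channel identifier from the task name, the
// binding name, and the shard range's begin key, per the scheme described
// above. Format is illustrative.
func channelID(taskName, binding string, keyBegin uint32) string {
	return fmt.Sprintf("%s/%s/%08x", taskName, binding, keyBegin)
}

func main() {
	// A shard covering the upper half of the key range.
	fmt.Println(channelID("acmeCo/my-materialization", "events", 0x80000000))
}
```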
Potential for Data Inconsistency
A strategy like this is straightforward for a single task shard or a task with a fixed number of shards. But data inconsistencies could happen if the shard configuration changes and there are partially applied transactions:
Scaling Down: Two shards covering ranges [0, 5) and [5, 10) -> One shard covering [0, 10)

- The new shard writes a transaction's documents to the channel for [0, 10) and crashes before the driver checkpoint is committed

Scaling Up: One shard covering [0, 10) -> Two shards covering [0, 5) and [5, 10)

- The shard covering [0, 10) writes 14 documents to the channel
- The shard splits into [0, 5) and [5, 10) before the driver checkpoint is committed. Each range has half the documents
- The [0, 5) shard will see that the channel with starting range 0 has 14 documents. It will skip the first 14 documents received, and miss data
- The [5, 10) shard will create a new channel and duplicate the first 7 documents it receives

The mitigation for both of these scenarios is to require that once a transaction is prepared, that exact same transaction AND shard split must be used to finish the transaction, all the way to the point where the driver checkpoint is committed. Put another way, a change in the number of shards should only become active after any prior prepared transaction has been fully processed.
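The mitigation could be enforced with a check along these lines: a prepared transaction records the shard range it was prepared under, and a recovered shard refuses to resume it unless its own range matches exactly. The type and field names here are illustrative.

```go
package main

import "fmt"

// rng is a shard's covered key range, [begin, end).
type rng struct{ begin, end uint32 }

// preparedTxn is the durable record of a prepared-but-uncommitted
// transaction, including the shard range it was prepared under.
type preparedTxn struct {
	shardRange rng
	// ... staged documents, channel offsets, etc.
}

// canResume reports whether the current shard may finish the prepared
// transaction: only the exact same shard split that prepared it may do so.
func canResume(txn preparedTxn, current rng) bool {
	return txn.shardRange == current
}

func main() {
	prepared := preparedTxn{shardRange: rng{0x00000000, 0xffffffff}}

	// After a split, the new shard covers only half the range and must not
	// finish the old transaction under the new configuration.
	fmt.Println(canResume(prepared, rng{0x00000000, 0x7fffffff})) // false
	fmt.Println(canResume(prepared, rng{0x00000000, 0xffffffff})) // true
}
```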
Disabled bindings add an additional edge case to this: Typically an idempotent apply strategy would bypass the application of a disabled binding's data, and defer that until later if the binding is ever re-enabled. But doing this combined with a shard reconfiguration could result in similar data inconsistencies as described above. The practical mitigation here is probably just recognizing this as a thing that can happen and backfilling if it ever does, but I could see more strict enforcement like requiring that a binding have its backfill counter incremented if the task has multiple shards and the binding is going from disabled -> enabled. This "disabling a binding backfills it" paradigm is something we accept for SQL CDC captures so it doesn't seem like too extreme a restriction.
Post-Commit Apply with Coordinator
This is what we've generally thought of when we've considered materialization scale-out.
The idea is that there may be many writer shards, where each writer shard executes its own load queries, writes data files for Store to a persistent staging location, and records these files in its checkpoint. Then the "coordinator" shard (which can just be shard 0) runs a single query to apply all of the staged files to the destination, and cleans up after that succeeds.

For this to work the writer shards need to be able to tell the coordinator shard about their staged files, which will be in addition to the files the coordinator shard itself has staged.
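One way the per-shard bookkeeping could combine into a single view is by merge-patching each shard's checkpoint update into one combined checkpoint, keyed by shard. The sketch below assumes JSON checkpoints with an illustrative `stagedFiles` shape, and implements simplified RFC 7396 merge-patch semantics (omitting null-deletion for brevity).

```go
package main

import (
	"encoding/json"
	"fmt"
)

// mergePatch recursively merges patch into target: nested objects are merged
// key-by-key, and all other values replace whatever was there.
func mergePatch(target, patch map[string]any) map[string]any {
	for k, v := range patch {
		if pv, ok := v.(map[string]any); ok {
			tv, _ := target[k].(map[string]any)
			if tv == nil {
				tv = map[string]any{}
			}
			target[k] = mergePatch(tv, pv)
			continue
		}
		target[k] = v
	}
	return target
}

func main() {
	// Each shard reports only its own staged files, keyed by its begin key,
	// so the patches never conflict.
	shard0 := map[string]any{"stagedFiles": map[string]any{"00000000": []any{"s3://stage/a.json"}}}
	shard1 := map[string]any{"stagedFiles": map[string]any{"80000000": []any{"s3://stage/b.json"}}}

	combined := mergePatch(mergePatch(map[string]any{}, shard0), shard1)
	out, _ := json.Marshal(combined)
	fmt.Println(string(out)) // both shards' staged files appear in the combined checkpoint
}
```

Keying each shard's contribution by its begin key means the merge is order-independent, which matters since shards commit concurrently.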
As a speculative proposal, it may be sufficient for each shard to produce a driver checkpoint update in its StartedCommit message, which are merge-patched together across shards to form the complete driver checkpoint, with that combined checkpoint then made available in the Acknowledge message. Shard 0 would act on this combined checkpoint from Acknowledge by applying the staged files.

Authoritative Remote Store
The authoritative remote store strategy is used by OLTP database materializations like Postgres, and some OLAP systems like Redshift and MotherDuck. A runtime checkpoint is updated transactionally with storing new documents, and this checkpoint is the authoritative indicator of progress.
Currently we have this implemented as each shard having its own checkpoint, and fencing based on the key range covered by each shard. With the planned shuffled read system upgrades, it may not be possible for each shard to have its own checkpoint, and instead all shards will need to use the same runtime checkpoint.
Scale Out Option: Out of Band Coordination
One idea for this is to also use a writer and coordinator strategy, but for the communication to happen outside of the StartedCommit driver checkpoint / Acknowledge mechanism proposed in the "Post-Commit Apply with Coordinator" section. Writers would stage data in a way that is visible to the coordinator shard.

This gets somewhat messy for a system like Postgres, where we'd need to create actual tables to stage data in rather than session-local temporary tables, and manage their lifecycle. It would be cleaner for something like Redshift or MotherDuck, where data is staged to object storage.
For this to work a group of shards would need to know which other shards are participating in the transaction (maybe included in the connector's Open message), and for the current transaction each shard would need to know a common "transaction ID", which could be some random bits generated just for that transaction (maybe in the StartCommit message).

All shards of the materialization would then update a metadata table when receiving StartCommit with the transaction ID and whatever other data they need to communicate where their staged data is - maybe a list of files in object storage, or the name of a staging table. Shard 0 would poll the metadata table and, once all shards report in, execute a query to apply the complete transaction's data. This query would also update the checkpoints table with the full runtime checkpoint for the task.

Pragmatically I think this approach would be best for OLAP systems with staged files, where it's best to run a single query covering lots of files rather than many queries each with a subset of files. The coordination via metadata table pushes some complexity onto the connector, but avoids requiring all connectors to deal with new protocol interactions.
As an aside, this coordination mechanism could conceivably also be used for "Post-Commit Apply with Coordinator" materializations, instead of communication via checkpoints. The modification would be that the shards coordinate through the metadata table for the current transaction ID, but Shard 0 would build a complete driver checkpoint from this to be persisted and committed during acknowledgement, rather than transactionally updating a checkpoints table. I think it makes sense to have support for the post-commit apply pattern as part of the materialization protocol though, since it is generally the most efficient strategy for materializations to use and it avoids the need for any extra metadata tables.
Alternative: Document Counters
The basic idea for this would be to continue to use a kind of checkpoints table and update it transactionally with table updates per shard, but instead of a runtime checkpoint use a connector-specified representation of document counters for all bindings, similar to the strategy described in the "Idempotent Shards with Document Counters" section.
At its simplest, this would be an all-or-nothing statement of whether the transaction was applied to the target or not via new counts for all bindings, and would be used to discard Store requests that the connector determines have already been applied by comparing the persisted document counters vs. those from the driver checkpoint. If an updated document counter is persisted but the transaction is interrupted before the recovery log commit, the connector would know to discard all Store requests for a retried transaction.

An additional assumption here is that a transaction is idempotent with respect to the number of documents it contains and the keys it includes, regardless of the documents returned in the connector's Loaded responses. For example, a materialization with hard deletions might apply a deleted document in a transaction by deleting a row; in a subsequent retry of that transaction it would not retrieve a document from a load query since it was deleted, whereas prior to that it would have existed in the load query response.
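The all-or-nothing comparison might look like this sketch: the driver checkpoint carries the post-transaction document count per binding, and a retried transaction is a no-op when the persisted counters already match. The map-based representation is illustrative.

```go
package main

import "fmt"

// alreadyApplied reports whether the persisted per-binding document counters
// already match those recorded in the driver checkpoint, meaning the
// transaction's data reached the target and its Store requests can be
// discarded on retry.
func alreadyApplied(persisted, checkpoint map[string]int64) bool {
	for binding, want := range checkpoint {
		if persisted[binding] != want {
			return false
		}
	}
	return true
}

func main() {
	checkpoint := map[string]int64{"events": 120, "users": 35}

	// Counters were updated transactionally with the data, but the recovery
	// log commit didn't happen: the retried transaction is discarded.
	fmt.Println(alreadyApplied(map[string]int64{"events": 120, "users": 35}, checkpoint)) // true

	// The transaction never reached the target: it must be re-applied.
	fmt.Println(alreadyApplied(map[string]int64{"events": 100, "users": 28}, checkpoint)) // false
}
```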
An advantage of this strategy vs. OOB coordination is that it would not require separate staging tables for systems that don't use object storage. It would only be suitable if multiple concurrent writers are able to efficiently operate on the same table with non-overlapping keys, since each shard would run a separate query. For example, I could see this working well with Postgres as a destination, but not Redshift. So ultimately I am thinking we need to support both.