collection evolution and source-defined schema #1988
jgraettinger started this conversation in Ideas
Part 1: Vision
Framing the Problem
Estuary collections have schemas: a contract that details what valid
documents of the collection may or may not look like.
The platform enforces this contract whenever documents are written or read.
When teams first create a data flow, there’s a single understanding of schema.
Everybody agrees on it, data flows along, and stakeholders are happy.
This situation typically doesn’t last long.
Soon enough new requirements impact what data is represented or structured,
and rolling out changes to various data flows requires some kind of coordination.
The kind of coordination depends on key questions: which schema, and who owns it?
Source-Defined Schema
Most captures in Estuary are considered “source defined”.
There is an upstream system or application which has a strong opinion of schema.
The capture’s job is to follow along, updating collection schema
to reflect the evolving understanding of schema from the source.
Some source systems, such as databases, are able to tell the capture of schema changes as they happen.
Other systems cannot, and the platform can only infer a useful schema
from the actual documents which it encounters as the capture progresses.
Target-Defined Schema
Other data flows are "target defined". Here, a downstream system is authoritative
and source documents must be rejected if they’re invalid with respect to its schema.
For example, the Estuary HTTP Ingest capture will reject WebHook requests which are
invalid with respect to the destination collection's current schema.
A Matter of Perspective
Any data process that uses a source-defined schema for analysis or transformation
must necessarily place some expectations on that source
-- at least over the portions it interacts with --
which effectively reinterpret the source as being target-defined.
A SELECT query, for example, is an implicit declaration that a set of table columns ought to exist.
If they don't, or if their types or meaning have changed,
your query or DBT model will fail and you must fix it.
Or, worse, it will appear to work but produce incorrect data.
In any meaningful data flow, there exists the potential for
its components to disagree with respect to schema.
Tools in the data integrations space have historically ignored this concern,
choosing to propagate source schema changes downstream no matter what.
This approach just pushes the problem down onto applications or DBT models
which are ill-equipped to rectify or even detect the mismatch.
Robust data pipelines require that owners have tools to detect and resolve
disagreements early in the flow of data,
before "bad" data is propagated into downstream systems.
A Matter of History
Database Change Data Capture (CDC) leverages the database write-ahead log to extract changes.
While a database table can tell you its current schema,
CDC is concerned with historical events
-- transactions that have committed --
and a curiosity of CDC is that events being processed may have happened
under a different table schema than the one which exists now.
Such events are inexorably part of the table's history,
but are invalid with respect to its current schema,
and CDC tooling must account for this drift.
A related design goal of Estuary collections is that they’re a durable
record of change events which are captured once and may then be read many times.
Collection schema could change over time, but collections must still
usefully represent change events captured long ago under a much different schema.
Key Platform Features
The Estuary platform has several features to help manage
the domain concerns of source- vs target-defined schema and schema history.
Write vs Read Schema
Estuary collections have distinct write and read schemas.
A collection's write schema constrains new documents being added by a capture or derivation task.
If a task produces an invalid document,
the task is halted rather than allowing the invalid document into the collection.
To resolve this condition, either the write schema is updated to permit the document, or the data is fixed in the source system.
Collection read schemas constrain the existing documents of a collection,
which are being read by a derivation or materialization task.
If a task encounters a collection document which is invalid with respect to its read schema, the task is halted.
Resolution requires updating the read schema to allow the invalid document,
which is also an opportunity to inspect and correct the assumptions of the downstream flow
before it processes data that could violate those assumptions.
Write and read schemas may change at any time and are largely independent of one another.
Each may be very restrictive, or very permissive, as is required by the use case.
In most cases, schemas are managed by the platform and are a lower-level building block
for higher-level coordination and automation.
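As a hedged illustration of this split, a collection spec can declare both schemas side by side. The collection name, fields, and schemas below are invented for the example:

```yaml
collections:
  acmeCo/orders:
    key: [/id]
    # Contract for documents being written by a capture or derivation.
    writeSchema:
      type: object
      required: [id, total]
      properties:
        id: { type: integer }
        total: { type: number }
    # Contract applied when derivations or materializations read the collection.
    # It may be more permissive, to admit historical documents written under
    # older write schemas.
    readSchema:
      type: object
      required: [id]
      properties:
        id: { type: integer }
        total: { type: [number, string] }
```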
Auto Discovery
Estuary capture tasks periodically query source systems for current schemas,
and apply those schemas as updates to the write schema of bound collections.
Auto Inference
Every Estuary collection has an automatically inferred schema,
which is the most-restrictive schema that is valid for all
source-defined schemas encountered over the collection's life-cycle.
Conceptually, an inferred schema is like shrink wrap:
it's exactly the right size to accommodate and enclose all of the collection's historical schemas to-date.
As new schemas are encountered, the platform will enlarge or "widen"
the inferred schema as necessary to fit them.
Where possible, capture connectors inform the platform of updates to
source-defined schemas as they're observed within the source system.
In other cases, an upstream application dictates the schema and
the capture source system doesn't have a known schema available
(as is typical with MongoDB, for example).
The platform will additionally widen the inferred schema
as is required to accept every document actually written to the collection.
Inferred schemas do nothing on their own,
but are frequently referenced from within a collection's read schema.
Auto Discovery is the automated mechanism by which a collection's write schema is updated,
and Auto Inference is the automated mechanism by which a collection's read schema is updated.
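To make "widening" concrete, here is a hypothetical inferred schema before and after the platform observes a document whose total is a string rather than an integer. The documents and field names are invented for illustration:

```yaml
# Inferred schema after observing only {"id": 1, "total": 10}:
type: object
required: [id, total]
properties:
  id: { type: integer }
  total: { type: integer }
---
# After also observing {"id": 2, "total": "10.50"}, the shape is widened
# just enough to admit both documents:
type: object
required: [id, total]
properties:
  id: { type: integer }
  total: { type: [integer, string] }
```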
Column Migrations
The inferred column types of a collection commonly widen over time:
a field which was previously an integer may become numeric,
or a native float might become an arbitrary-precision number.
Wherever possible, Estuary materializations perform automatic migrations of
such widened types through an in-place cast of the existing column.
Materializations never "narrow" column types,
which would require a cast that could fail (from a number to an integer, for example).
Narrowing the type of an existing column requires a backfill of the materialization binding.
Auto Inference only widens and never narrows a collection's schema,
so such backfills can only ever be required due to a user-initiated change
which is outside of the bounds of schema automation provided by the platform.
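A sketch of such a widening and its column migration, with an invented field name and destination types; the concrete SQL types depend on the materialization connector:

```yaml
# Before: the field was discovered as an integer and materialized
# as an integer column (for example, BIGINT).
properties:
  total: { type: integer }
---
# After: the source now reports an arbitrary-precision numeric type.
# The widened column can be migrated with an in-place cast
# (for example, BIGINT -> NUMERIC); the reverse narrowing could fail,
# and requires a backfill instead.
properties:
  total: { type: number }
```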
Collection Reset
At times the intended structure of a collection has changed so dramatically that
it's necessary to start over, rather than updating the existing collection in-place.
Estuary collections offer a "reset" capability,
which is semantically identical to deleting and then re-creating the collection
but expressed as a single operation.
When a collection is reset,
its inferred schema is re-initialized and all historical data is logically dropped
(but remains in its storage bucket).
The configured collection key and any logical partitions may change during a reset.
Any capture or materialization tasks which bind the reset collection
as a target or source will automatically perform a backfill.
::: note
If a task binding is currently disabled,
an automatic backfill will occur in the future should it be re-enabled.
A backfill will similarly occur if the re-enabled collection
was manually deleted and then re-created, as separate operations.
:::
Integrity Checks
The Estuary control-plane ensures that:
Automated Handling of Failed Checks
If the key or types of a collection change, its capture may be configured to automatically reset it.
This is driven by the Auto Discovery process.
If a materialization has an incompatible column, the materialization may be configured to automatically backfill it.
TODO(johnny): Should we be? If we handle all widening cases,
and narrowing can only be user-initiated, shouldn't we require that they explicitly backfill?
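For reference, the knobs involved look roughly like the following task-spec fragments. The task names are invented, and the option names are assumed from current Flow specs rather than being defined by this proposal:

```yaml
captures:
  acmeCo/source-postgres:
    autoDiscover:
      addNewBindings: true
      # Reset collections whose key or logical partitions have changed.
      evolveIncompatibleCollections: true
    # endpoint and bindings omitted for brevity.
---
materializations:
  acmeCo/materialize-warehouse:
    # What to do when a collection's schema can no longer be applied to the
    # existing table: backfill, disableBinding, disableTask, or abort.
    onIncompatibleSchemaChange: backfill
    # endpoint and bindings omitted for brevity.
```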
Part 2: Implementation
Capture protocol: SourcedSchema
We'd add an extension of the capture protocol:
- SourcedSchema is applied alongside Captured documents to widen the current schema (a sketch of such a message follows this list).
- … doc::Shape with the write schema: … because those writes would cause a schema violation.
- … SourcedSchema says a key type has changed.
- With SourcedSchema, use cases like vector embeddings should "Just Work": {minItems: 1536, maxItems: 1536} constraints aren't modified if documents have 1536 items. (Implementation ref: #1946)
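For illustration only, a SourcedSchema event emitted by a connector might look something like the following. The message shape and field names here are assumptions, not the settled protocol:

```yaml
# Hypothetical rendering of a connector's SourcedSchema response.
sourcedSchema:
  binding: 0          # index of the capture binding this schema applies to
  schemaJson:
    type: object
    properties:
      id: { type: integer }
      embedding:
        type: array
        items: { type: number }
        minItems: 1536   # constraints like these should survive as-is
        maxItems: 1536
```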
An outcome that's achievable with SourcedSchema is that the platform can "tail" and propagate the schema changes of a capture source even if no actual rows have been captured from that source binding yet. For example, the table columns created by the materialization of a collection would quickly reflect its source table's schema, even if the table itself is empty.
One implementation call-out we've identified is that connectors should convey source field null-ability in their Discover responses, but not in the SourcedSchema events they emit. The rationale is that it's better to emit SourcedSchema with types other than null, and let document-based inference widening kick in if a document is observed where it is, in fact, null (which usually never happens).
flow://relaxed-write-schema
Inferred collections have historically used:
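(Presumably this refers to the common composition of the write and inferred schemas. The sketch below reconstructs that composition as an assumption, using the flow://write-schema and flow://inferred-schema references.)

```yaml
readSchema:
  allOf:
    - $ref: flow://write-schema
    - $ref: flow://inferred-schema
```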
This doesn't work for schema-full systems like PostgreSQL CDC, because the write schema is not a strict subset of every
historical source schema, as happens to be the case with (say) MongoDB.
Yet, we still want write schemas to be a comprehensive schema representing the current database write contract.
We should not, for example, strip away all non-key fields.
We do require certain JSON schema keywords which, today, are in the write schema:
- title / description
- reduce
- default
- Conditional validations (if, then, else)
- secret, contentEncoding, etc.

Some of these keywords could potentially be conveyed via SourcedSchema and projected into the flow://inferred-schema.
But not all of them: conditional validation keywords, in particular, cannot survive projection into doc::Shape and back.
Ergo, we'll need some other schema having these keywords which is composed with flow://inferred-schema to build a useful read schema.
We could update the capture protocol to have the connector emit such a schema.
However this is work for every connector to do, and an observation is that
this would (in practice) essentially be the current write schema,
but with validation keywords removed which might conflict with the inferred schema.
This leaves flow://relaxed-write-schema, a "special" schema which resolves to the write schema with its validation keywords (type, format, required, etc.) recursively removed.
It's ... a little weird, but it also appears to work well in practice,
resulting in essentially the same schema as a connector might produce.
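As a hedged sketch of that behavior, with an invented write schema: validation keywords are stripped recursively, while annotations such as description and default are retained.

```yaml
# A discovered write schema (invented for illustration):
type: object
required: [id]
properties:
  id: { type: integer, description: "Primary key" }
  total: { type: number, default: 0 }
---
# Roughly what flow://relaxed-write-schema would resolve to:
# the same schema with type, format, required, etc. recursively removed.
properties:
  id: { description: "Primary key" }
  total: { default: 0 }
```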
Another concern is the brittleness of taking a hack-saw to connector validation keywords and hoping to not break things.
I don't think this concern holds much merit, however, because use of flow://relaxed-write-schema implies that the connector opted in to such use, making it the connector's responsibility that its Discovered schemas work with flow://relaxed-write-schema in practice.
Generation ID
Generation IDs are a life-cycle identifier for a task or collection
which changes whenever that task or collection is deleted and recreated or is reset.
It allows the control plane to reconcile whether a state is "before reset" or "after reset".
Generation IDs already exist, but have been un-named to date.
They exist as a "busting" identifier which is suffixed onto
a collection's journal name template and/or task's shard ID template.
We would further surface this identifier so that it can be inspected
and compared when evaluating the before / after state of a task binding or transform.
Notably, bindings would examine the generation ID to understand if a backfill is required.
Tracking inactive bindings / transforms
This change was already motivated by a desire for better re-initialization of materialized tables.
We need to track inactive bindings & transforms which have been active in the past, for each task.
These would be keyed on the resource path of each binding.
Their retention is partially motivated by the need to compare the Generation ID of the source / target collection
to understand whether a backfill is required.
Resource Path Pointers
All materialization connectors must surface resource path pointers
in their Spec response.
We already do this for captures, and need to bring materializations to parity.
The validation crate must have these available consistently so that it can match
a current binding model to its last active or inactive binding specification.
It must be able to tell, prior to calling Validation, whether a backfill should occur
because the generation ID of the binding has changed.
Specifically, we should:
- resource_path_pointers can be omitted / computed if the resource_config_schema includes x-collection-name and x-schema-name annotations, which we already populate today for all materializations (see the sketch following this list).
- If x-collection-name is present, then resource_path_pointers are implied by the locations of these annotations.
- … resource_path_pointers.
- … resource_path_pointers.
- … resource_path_pointers.
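A sketch of what such annotations could look like within a materialization's resource_config_schema; the property names and the boolean-annotation form are assumptions for illustration:

```yaml
type: object
required: [table]
properties:
  table:
    type: string
    title: Table
    x-collection-name: true   # marks the collection-name component of the resource path
  schema:
    type: string
    title: Schema
    x-schema-name: true       # marks the schema-name component
```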