CDK: Add hook to BaseCaptureConnector to perform online connector state migrations without merge patch #3815

Closed
JustinASmith wants to merge 1 commit into main from js/cdk-migrate-connector-state


@JustinASmith JustinASmith commented Jan 22, 2026

Description:

This PR adds a new async method that allows connectors to migrate state across all bindings before processing begins. Running the migration first ensures it completes before individual binding tasks start, which prevents interleaved checkpoints from causing state conflicts. This is helpful for efforts like #3626, where we need to migrate the state of existing production captures to a dictionary-based representation so that resource subtasks and individual Shopify stores can each track their own state.

The method is called during the open phase and checkpoints the migrated state with merge_patch=False when modifications are made.
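
To make the ordering concrete, here is a minimal, self-contained sketch of the idea rather than the CDK's actual implementation. The names `migrate_state`, `open_capture`, and `run_binding`, and the plain-dict state shape, are all hypothetical stand-ins; protocol details such as when the checkpoint may be emitted are left out.

```python
import asyncio

# Hypothetical in-memory stand-in for connector state: binding key -> binding state.
State = dict[str, dict]


async def migrate_state(state: State) -> tuple[State, bool]:
    """Convert legacy scalar cursors into a dict-based form for every binding.

    Returns the (possibly) migrated state and whether anything changed.
    """
    migrated: State = {}
    changed = False
    for key, binding in state.items():
        cursor = binding.get("cursor")
        if isinstance(cursor, str):
            # Legacy form: a single scalar cursor. Wrap it in a dict.
            migrated[key] = {**binding, "cursor": {"default": cursor}}
            changed = True
        else:
            migrated[key] = binding
    return migrated, changed


async def open_capture(state: State, events: list[str]) -> State:
    # 1. Run the migration once, across all bindings, before any task starts.
    migrated, changed = await migrate_state(state)
    if changed:
        # Stand-in for a full-replacement checkpoint (merge_patch=False).
        events.append("checkpoint(full replace)")
        state = migrated

    # 2. Only now start the per-binding tasks, which see the migrated state.
    async def run_binding(key: str) -> None:
        events.append(f"start {key}")

    await asyncio.gather(*(run_binding(key) for key in state))
    return state


events: list[str] = []
legacy: State = {
    "orders": {"cursor": "2026-01-01"},
    "products": {"cursor": {"default": "2026-01-02"}},
}
final_state = asyncio.run(open_capture(legacy, events))
```

Because the migration checkpoint is recorded before any binding task is scheduled, no binding can emit a checkpoint against the old state shape.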

Workflow steps:

(How does one use this feature, and how has it changed)

Documentation links affected:

(list any documentation links that you created, or existing ones that you've identified as needing updates, along with a brief description)

Notes for reviewers:

Added unit tests with Claude Code to test the following cases:

  • TODO...

Add a new async method that allows connectors to migrate state across all
bindings before processing begins. This ensures migrations complete before
individual binding tasks start, preventing interleaved checkpoints from
causing state conflicts.

The method is called during the open phase and checkpoints the migrated
state with `merge_patch=False` when modifications are made.
@JustinASmith JustinASmith force-pushed the js/cdk-migrate-connector-state branch from b2a608e to 515bac1 Compare January 22, 2026 16:20

@Alex-Bair Alex-Bair left a comment

I briefly glanced over the changes here, and I can take a closer look when this is ready for review.

I know I suggested adding this hook into the CDK, and I think it would still be useful for situations where we'd want or need to use merge_patch=False checkpoints. But after working on a single-cursor-to-subtask state migration for source-outreach, I realized we can perform those types of migrations successfully with a merge patch checkpoint instead. The code snippet below shows how I was able to do it:

# Migrate from old single-cursor state to new subtask state format.
#
# JSON Merge Patch (RFC 7396) interprets null values as "delete this key".
# By including "cursor": None in the dict, we remove the old cursor field
# while adding the new realtime/lookback subtasks in a single checkpoint.
if isinstance(state.inc, ResourceState.Incremental) and isinstance(state.inc.cursor, datetime):
    old_cursor = state.inc.cursor
    migrated_inc_state: dict[str, ResourceState.Incremental | None] = {
        "cursor": None,  # Remove old cursor key via merge-patch
        REALTIME: ResourceState.Incremental(cursor=old_cursor),
        LOOKBACK: ResourceState.Incremental(cursor=old_cursor - LOOKBACK_LAG),
    }
    state.inc = migrated_inc_state
    log.info("Migrating incremental state.", {
        "migrated_inc_state": migrated_inc_state,
    })
    await task.checkpoint(
        ConnectorState(bindingStateV1={binding.stateKey: ResourceState(inc=migrated_inc_state)}),
    )

Can you see if specifying "cursor": None alongside the migrated subtask state in a single merge patch checkpoint works for your use case in source-shopify-native? I'm leaning towards avoiding merge_patch=False checkpoints as much as possible, since they rely on connector developers to preserve all of the other ConnectorState and ResourceState fields during the migration, like last_initialized, is_connector_initiated, etc. My thought is that if we don't yet need to expose the capability for individual connectors to emit checkpoints before opening bindings, it might be better to hold off on adding that capability and public interface until we need it.
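
For reference, the null-deletes-key behavior relied on above can be reproduced with a small standalone implementation of the RFC 7396 merge algorithm. This is an illustrative sketch operating on plain dicts, not the runtime's own merge code; the `realtime` and `lookback` key names and the timestamp values are assumptions made up for the example.

```python
from typing import Any


def json_merge_patch(target: Any, patch: Any) -> Any:
    """Apply a JSON Merge Patch (RFC 7396) and return the merged document."""
    if not isinstance(patch, dict):
        # A non-object patch replaces the target wholesale.
        return patch
    # A non-object target is treated as an empty object.
    result = dict(target) if isinstance(target, dict) else {}
    for key, value in patch.items():
        if value is None:
            # null means "delete this key" if it exists.
            result.pop(key, None)
        else:
            result[key] = json_merge_patch(result.get(key), value)
    return result


# Previously checkpointed state with a single cursor plus an unrelated field.
stored = {
    "inc": {"cursor": "2026-01-01T00:00:00Z"},
    "last_initialized": "2025-12-01T00:00:00Z",
}

# Migration patch: drop the old cursor and add the two subtask cursors.
patch = {
    "inc": {
        "cursor": None,
        "realtime": {"cursor": "2026-01-01T00:00:00Z"},
        "lookback": {"cursor": "2025-12-31T23:00:00Z"},
    },
}

merged = json_merge_patch(stored, patch)
```

Note that `last_initialized` survives the merge without appearing in the patch, which is the property that makes a merge patch checkpoint less error-prone than a full replacement here.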

Comment on lines +139 to +141
await self._checkpoint(migrated_state, merge_patch=False)
# Update the open request to use migrated state
open.state = migrated_state

Per the capture protocol lifecycle (described here), captures should respond with Response.Opened before sending a Response.Checkpoint. That means any migration logic that requires checkpointing should run after the line where the CDK sends Response.Opened:

await self._emit(Response(opened=opened))
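
A rough sketch of that ordering, using a hypothetical recording emitter instead of the real CDK classes (`RecordingEmitter`, `emit_opened`, `checkpoint`, and `open_then_migrate` are invented for illustration):

```python
import asyncio


class RecordingEmitter:
    """Hypothetical stand-in that records messages and enforces ordering."""

    def __init__(self) -> None:
        self.sent: list[str] = []
        self.opened = False

    async def emit_opened(self) -> None:
        self.opened = True
        self.sent.append("opened")

    async def checkpoint(self, state: dict) -> None:
        # Reject checkpoints that arrive before the Opened response.
        if not self.opened:
            raise RuntimeError("checkpoint emitted before opened")
        self.sent.append("checkpoint")


async def open_then_migrate(emitter: RecordingEmitter, state: dict) -> dict:
    # Respond with Opened first ...
    await emitter.emit_opened()
    # ... and only then checkpoint the migrated state.
    migrated = {**state, "migrated": True}
    await emitter.checkpoint(migrated)
    return migrated


emitter = RecordingEmitter()
result = asyncio.run(open_then_migrate(emitter, {"cursor": "2026-01-01"}))
```

Binding tasks would still need to start only after the migration checkpoint, so the migration step would sit between emitting Opened and spawning those tasks.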
