CDK: Add hook to BaseCaptureConnector to perform online connector state migrations without merge patch #3815
JustinASmith wants to merge 1 commit into main
Add a new async method that allows connectors to migrate state across all bindings before processing begins. This ensures migrations complete before individual binding tasks start, preventing interleaved checkpoints causing state conflicts. The method is called during the open phase and checkpoints the migrated state with `merge_patch=False` when modifications are made.
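For illustration, here is a minimal sketch of the kind of cross-binding migration such a hook might perform. The function name `migrate_binding_states`, the state shape (an `inc` entry holding a `cursor`), and the `store_id` key are all hypothetical and are not taken from the CDK. The function returns `None` when nothing changed, which is one assumed way to signal that no checkpoint is needed.

```python
import copy
from typing import Any, Optional


def migrate_binding_states(
    binding_states: dict[str, dict[str, Any]],
    store_id: str,
) -> Optional[dict[str, dict[str, Any]]]:
    """Return migrated states keyed by binding, or None when nothing changed.

    Hypothetical shapes: a legacy binding keeps one cursor directly under
    "inc"; the migrated form nests that cursor under a per-store key.
    """
    migrated = copy.deepcopy(binding_states)
    changed = False
    for state in migrated.values():
        inc = state.get("inc")
        # Legacy shape: a single cursor stored directly under "inc".
        if isinstance(inc, dict) and "cursor" in inc:
            state["inc"] = {store_id: inc}
            changed = True
    return migrated if changed else None


legacy = {
    "orders": {
        "last_initialized": "2024-01-01T00:00:00Z",
        "inc": {"cursor": "2024-06-01T00:00:00Z"},
    },
    # Already in the dictionary-based form, so it is left untouched.
    "products": {"inc": {"example-store": {"cursor": "2024-05-01T00:00:00Z"}}},
}

result = migrate_binding_states(legacy, "example-store")
second_pass = migrate_binding_states(result, "example-store")
```

Because every binding is migrated in one pass and the whole result is returned at once, the caller can emit a single checkpoint before any binding task starts. Running the function a second time on its own output returns `None`, so the sketch is idempotent.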
Alex-Bair left a comment
I briefly glanced over the changes here, and I can take a closer look when this is ready for review.
I know I suggested adding this hook to the CDK, and I think it would still be useful for situations where we'd want or need to use `merge_patch=False` checkpoints. But after working on a single-cursor-to-subtask state migration for source-outreach, I realized we can perform those types of migrations successfully with a merge patch checkpoint instead. The code snippet below shows how I was able to do it:
connectors/source-outreach/source_outreach/resources.py
Lines 45 to 64 in 90de3c7
Can you see if specifying `"cursor": None` alongside the migrated subtask state in a single merge patch checkpoint works for your use case in source-shopify-native? I'm leaning towards avoiding `merge_patch=False` checkpoints as much as possible, since they rely on connector developers to preserve all of the other ConnectorState and ResourceState fields during the migration, like last_initialized, is_connector_initiated, etc. My thought is that if we don't yet need to expose the capability for individual connectors to emit checkpoints before opening bindings, it might be better to hold off on adding that capability and public interface until we need it.
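To make the suggestion concrete, here is a small sketch of JSON merge patch semantics (RFC 7396), where a `None` (JSON `null`) value deletes a key and everything the patch does not mention is preserved. `apply_merge_patch` is a standalone helper written for this illustration, not a CDK function, and the state shape and the `subtask_a` key are hypothetical.

```python
from typing import Any


def apply_merge_patch(target: Any, patch: Any) -> Any:
    """Apply a JSON merge patch (RFC 7396) and return the new document."""
    if not isinstance(patch, dict):
        # A non-object patch replaces the target outright.
        return patch
    result = dict(target) if isinstance(target, dict) else {}
    for key, value in patch.items():
        if value is None:
            # null in a merge patch means "remove this key".
            result.pop(key, None)
        else:
            result[key] = apply_merge_patch(result.get(key), value)
    return result


# Hypothetical persisted state with a single cursor.
old_state = {
    "last_initialized": "2024-01-01T00:00:00Z",
    "inc": {"cursor": "2024-06-01T00:00:00Z"},
}

# One patch that removes the old cursor and adds the subtask state.
patch = {
    "inc": {
        "cursor": None,
        "subtask_a": {"cursor": "2024-06-01T00:00:00Z"},
    }
}

new_state = apply_merge_patch(old_state, patch)
```

Here `last_initialized` survives without the patch having to repeat it, which is the property that makes the merge patch approach less error-prone than replacing the whole state.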
    await self._checkpoint(migrated_state, merge_patch=False)
    # Update the open request to use migrated state
    open.state = migrated_state
Per the capture protocol lifecycle (described here), captures should respond with Response.Opened before sending a Response.Checkpoint. That means this migration logic, which requires checkpointing, should run after the line where the CDK sends Response.Opened.
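The ordering being asked for can be sketched as follows. This is not the CDK's code: `open_then_migrate`, the `emit` callback, and the message names `"opened"` and `"checkpoint"` are hypothetical stand-ins for however the CDK writes Response.Opened and Response.Checkpoint, and returning `None` from the migration to mean "no changes" is an assumption.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

State = dict[str, Any]
Emit = Callable[[str, Optional[dict[str, Any]]], Awaitable[None]]
Migrate = Callable[[State], Awaitable[Optional[State]]]


async def open_then_migrate(state: State, migrate: Migrate, emit: Emit) -> State:
    """Acknowledge the open first, then checkpoint migrated state if any."""
    # The Opened response must precede any Checkpoint response.
    await emit("opened", None)
    migrated = await migrate(state)
    if migrated is None:
        # Nothing changed, so no checkpoint is emitted.
        return state
    # Full-state replacement rather than a merge patch.
    await emit("checkpoint", {"state": migrated, "merge_patch": False})
    return migrated


emitted: list[str] = []


async def record(kind: str, payload: Optional[dict[str, Any]]) -> None:
    # Stand-in for writing a response to the runtime.
    emitted.append(kind)


async def wrap_cursor(state: State) -> Optional[State]:
    # Hypothetical migration: nest a bare cursor under a subtask key.
    if "cursor" not in state:
        return None
    return {"subtasks": {"default": {"cursor": state["cursor"]}}}


final_state = asyncio.run(open_then_migrate({"cursor": 5}, wrap_cursor, record))
```

Binding tasks would only be started with the returned state, so they never observe the pre-migration shape.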
Description:
This PR adds a new async method that allows connectors to migrate state across all bindings before processing begins. This ensures migrations complete before individual binding tasks start, preventing interleaved checkpoints causing state conflicts. This is helpful for efforts like #3626 where we need to migrate existing Production captures' states to a dictionary-based representation for effective resource sub-task and individual Shopify store state tracking.
The method is called during the open phase and checkpoints the migrated state with `merge_patch=False` when modifications are made.
Workflow steps:
(How does one use this feature, and how has it changed)
Documentation links affected:
(list any documentation links that you created, or existing ones that you've identified as needing updates, along with a brief description)
Notes for reviewers:
Added unit tests with Claude Code to test the following cases:
TODO...