Skip to content

Commit a7e7b45

Browse files
[cherry-pick] sui-kvstore: fix BigTableConnection init_watermark (#26225) (#26228)
## Summary Cherry-pick of #26225 onto `releases/sui-v1.70.0-release` to unblock testnet v1.70.1 rollout. Without this fix, `sui-kvstore-alt` in v1.70.1 cannot read its prior watermark from the existing Bigtable table and restarts ingestion from `checkpoint=0`, which the upstream fullnode has long pruned. The kvstore indexer fails indefinitely, which cascades: - `kv-store-testnet-d9532e5` Bigtable table receives zero new rows - `kv-rpc` has no current data to serve - `sui-indexer-alt-graphql`'s `ledger_grpc` + `consistent` pipelines stall - `https://graphql.testnet.sui.io/graphql/health` returns 500 "Watermark lag is too high" Observed live in testnet after the v1.70.1 deploy today — all 3 `sui-kvstore-testnet` replicas running for 89+ min with zero rows written. ## Test plan - [x] Clean cherry-pick (no conflicts) - [ ] Deploy to testnet and verify `sui-kvstore-testnet` replicas resume from their prior Bigtable watermark (not 0) - [ ] Verify `https://graphql.testnet.sui.io/graphql/health` returns 200 and `consistent`/`ledger_grpc` lag drops to normal 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Evan Wall <evan.wall@mystenlabs.com>
1 parent 397af76 commit a7e7b45

1 file changed

Lines changed: 63 additions & 1 deletion

File tree

crates/sui-kvstore/src/bigtable/store.rs

Lines changed: 63 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -71,7 +71,13 @@ impl Connection for BigTableConnection<'_> {
7171
pipeline_task: &str,
7272
_checkpoint_hi_inclusive: Option<u64>,
7373
) -> Result<Option<InitWatermark>> {
74-
self.delegate_to_reader_watermark(pipeline_task).await
74+
Ok(self
75+
.committer_watermark(pipeline_task)
76+
.await?
77+
.map(|w| InitWatermark {
78+
checkpoint_hi_inclusive: Some(w.checkpoint_hi_inclusive),
79+
reader_lo: None,
80+
}))
7581
}
7682

7783
async fn accepts_chain_id(
@@ -137,3 +143,59 @@ impl ConcurrentConnection for BigTableConnection<'_> {
137143
Ok(false)
138144
}
139145
}
146+
147+
#[cfg(test)]
mod tests {
    use super::*;

    use crate::testing::{create_tables, require_bigtable_emulator, BigTableEmulator, INSTANCE_ID};

    // Fixture values written as the committer watermark and checked on read-back.
    const PIPELINE: &str = "pipeline";
    const EPOCH_HI: u64 = 7;
    const CHECKPOINT_HI: u64 = 200;
    const TX_HI: u64 = 42;
    const TIMESTAMP_MS_HI: u64 = 99;

    /// Boot a local BigTable emulator, create the expected tables, and hand
    /// back the emulator (kept alive by the caller) alongside a store wired
    /// to it.
    async fn store_conn() -> (BigTableEmulator, BigTableStore) {
        require_bigtable_emulator();

        // `BigTableEmulator::start` blocks, so run it off the async runtime.
        let emulator = tokio::task::spawn_blocking(BigTableEmulator::start)
            .await
            .unwrap()
            .unwrap();

        create_tables(emulator.host(), INSTANCE_ID).await.unwrap();

        let client = BigTableClient::new_local(emulator.host().to_string(), INSTANCE_ID.into())
            .await
            .unwrap();
        let store = BigTableStore::new(client);

        (emulator, store)
    }

    #[tokio::test]
    async fn test_init_watermark_returns_existing_on_conflict() {
        let (_emulator, store) = store_conn().await;
        let mut conn = store.connect().await.unwrap();

        // Arrange: persist a committer watermark for the pipeline.
        conn.set_committer_watermark(
            PIPELINE,
            CommitterWatermark {
                epoch_hi_inclusive: EPOCH_HI,
                checkpoint_hi_inclusive: CHECKPOINT_HI,
                tx_hi: TX_HI,
                timestamp_ms_hi_inclusive: TIMESTAMP_MS_HI,
            },
        )
        .await
        .unwrap();

        // Act: initialize with a conflicting hint (`Some(0)`).
        let init = conn
            .init_watermark(PIPELINE, Some(0))
            .await
            .unwrap()
            .unwrap();

        // Assert: the stored committer watermark wins over the hint.
        assert_eq!(init.checkpoint_hi_inclusive, Some(CHECKPOINT_HI));
        // BigTable has no trailing-edge / reader watermark concept.
        assert_eq!(init.reader_lo, None);
    }
}

0 commit comments

Comments (0)