Skip to content

Commit bf2b0c0

Browse files
authored
[ENH] wal3::copy implemented using scan/AWS copy (#4803)
## Description of changes

This PR uses the `LogReader`'s standard `scan` method to select the fragments to copy. It then issues the fragment copies in parallel and writes the copied fragments to the target manifest as one flat list, without accounting for snapshots (the new manifest's snapshot list is left empty).

Alternatives considered:

- Potentially walk the snapshots and copy each snapshot over as-is. This is what was done before, but it could only prune to the boundaries of snapshot pointers in the root/manifest of the tree.
- Use the GC logic to prune. This was originally my intent, but it had a hidden downside: the GC would walk all old snapshots to determine what data it had to delete. Implementing a switch to skip this behavior essentially made two distinct functions in one, obviating the advantage of reusing the GC code.

## Test plan

Integration tests cover copy.

- [X] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust

## Documentation Changes

N/A
1 parent d7429b9 commit bf2b0c0

File tree

5 files changed

+53
-120
lines changed

5 files changed

+53
-120
lines changed

rust/wal3/src/copy.rs

Lines changed: 25 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -1,93 +1,10 @@
1+
use std::sync::Arc;
2+
13
use chroma_storage::Storage;
24
use setsum::Setsum;
35

4-
use crate::manifest::{unprefixed_snapshot_path, Manifest, Snapshot};
56
use crate::reader::LogReader;
6-
use crate::writer::copy_parquet;
7-
use crate::{Error, Fragment, LogPosition, LogWriterOptions, SnapshotPointer};
8-
9-
pub async fn copy_snapshot(
10-
storage: &Storage,
11-
options: &LogWriterOptions,
12-
reader: &LogReader,
13-
root: &SnapshotPointer,
14-
offset: LogPosition,
15-
target: &str,
16-
) -> Result<SnapshotPointer, Error> {
17-
let Some(snapshot) =
18-
Snapshot::load(&options.throttle_manifest, storage, &reader.prefix, root).await?
19-
else {
20-
return Err(Error::CorruptManifest(format!(
21-
"snapshot {} does not exist",
22-
root.setsum.hexdigest(),
23-
)));
24-
};
25-
let mut dropped = vec![];
26-
let mut snapshots = vec![];
27-
for snapshot in &snapshot.snapshots {
28-
if snapshot.limit > offset {
29-
snapshots.push(
30-
Box::pin(copy_snapshot(
31-
storage, options, reader, snapshot, offset, target,
32-
))
33-
.await?,
34-
);
35-
} else {
36-
dropped.push(snapshot.setsum);
37-
}
38-
}
39-
let mut fragments = vec![];
40-
for fragment in &snapshot.fragments {
41-
if fragment.limit > offset {
42-
fragments.push(copy_fragment(storage, options, reader, fragment, target).await?);
43-
} else {
44-
dropped.push(fragment.setsum);
45-
}
46-
}
47-
let dropped = dropped.iter().fold(Setsum::default(), |x, y| x + *y);
48-
let kept_snapshots = snapshots
49-
.iter()
50-
.fold(Setsum::default(), |x, y| x + y.setsum);
51-
let kept_fragments = fragments
52-
.iter()
53-
.fold(Setsum::default(), |x, y| x + y.setsum);
54-
if dropped + kept_snapshots + kept_fragments != root.setsum {
55-
// NOTE(rescrv): If you see this error you have to figure out where data is lost. This
56-
// will require writing a test case rather than trying to deduce it from the setsums.
57-
return Err(Error::CorruptManifest(
58-
"Copying failed because the setsum was not balanced".to_string(),
59-
));
60-
}
61-
let depth = snapshots.iter().map(|x| x.depth + 1).max().unwrap_or(0);
62-
let snapshot = Snapshot {
63-
path: unprefixed_snapshot_path(kept_snapshots + kept_fragments),
64-
depth,
65-
setsum: kept_snapshots + kept_fragments,
66-
writer: "copy task".to_string(),
67-
snapshots,
68-
fragments,
69-
};
70-
snapshot
71-
.install(&options.throttle_manifest, storage, target)
72-
.await
73-
}
74-
75-
pub async fn copy_fragment(
76-
storage: &Storage,
77-
options: &LogWriterOptions,
78-
reader: &LogReader,
79-
frag: &Fragment,
80-
target: &str,
81-
) -> Result<Fragment, Error> {
82-
copy_parquet(
83-
options,
84-
storage,
85-
&format!("{}/{}", reader.prefix, frag.path),
86-
&format!("{}/{}", target, frag.path),
87-
)
88-
.await?;
89-
Ok(frag.clone())
90-
}
7+
use crate::{prefixed_fragment_path, Error, Limits, LogPosition, LogWriterOptions, Manifest};
918

929
pub async fn copy(
9310
storage: &Storage,
@@ -96,35 +13,36 @@ pub async fn copy(
9613
offset: LogPosition,
9714
target: String,
9815
) -> Result<(), Error> {
99-
let manifest = reader
100-
.manifest()
101-
.await?
102-
.unwrap_or(Manifest::new_empty("copy task"));
103-
let mut snapshots = vec![];
104-
for snapshot in &manifest.snapshots {
105-
snapshots.push(copy_snapshot(storage, options, reader, snapshot, offset, &target).await?);
106-
}
107-
let mut fragments = vec![];
108-
for fragment in &manifest.fragments {
109-
fragments.push(copy_fragment(storage, options, reader, fragment, &target).await?);
16+
let fragments = reader.scan(offset, Limits::UNLIMITED).await?;
17+
let mut futures = vec![];
18+
for fragment in fragments.into_iter() {
19+
let target = &target;
20+
futures.push(async move {
21+
storage
22+
.copy(
23+
&prefixed_fragment_path(&reader.prefix, fragment.seq_no),
24+
&prefixed_fragment_path(target, fragment.seq_no),
25+
)
26+
.await
27+
.map(|_| fragment)
28+
});
11029
}
111-
let setsum = snapshots
30+
let fragments = futures::future::try_join_all(futures)
31+
.await
32+
.map_err(Arc::new)?;
33+
let setsum = fragments
11234
.iter()
11335
.map(|x| x.setsum)
114-
.fold(Setsum::default(), |x, y| x + y)
115-
+ fragments
116-
.iter()
117-
.map(|x| x.setsum)
118-
.fold(Setsum::default(), |x, y| x + y);
119-
let acc_bytes = snapshots.iter().map(|x| x.num_bytes).sum::<u64>()
120-
+ fragments.iter().map(|x| x.num_bytes).sum::<u64>();
36+
.fold(Setsum::default(), |x, y| x + y);
37+
let acc_bytes = fragments.iter().map(|x| x.num_bytes).sum::<u64>();
38+
let initial_offset = Some(fragments.iter().map(|f| f.start).min().unwrap_or(offset));
12139
let manifest = Manifest {
12240
setsum,
12341
acc_bytes,
12442
writer: "copy task".to_string(),
125-
snapshots,
43+
snapshots: vec![],
12644
fragments,
127-
initial_offset: manifest.initial_offset,
45+
initial_offset,
12846
};
12947
Manifest::initialize_from_manifest(options, storage, &target, manifest).await?;
13048
Ok(())

rust/wal3/src/lib.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,15 @@ where
502502

503503
////////////////////////////////////////// Fragment Paths //////////////////////////////////////////
504504

505+
pub fn prefixed_fragment_path(prefix: &str, fragment_seq_no: FragmentSeqNo) -> String {
506+
format!(
507+
"{}/log/Bucket={:016x}/FragmentSeqNo={:016x}.parquet",
508+
prefix,
509+
fragment_seq_no.bucket(),
510+
fragment_seq_no.0,
511+
)
512+
}
513+
505514
pub fn unprefixed_fragment_path(fragment_seq_no: FragmentSeqNo) -> String {
506515
format!(
507516
"log/Bucket={:016x}/FragmentSeqNo={:016x}.parquet",

rust/wal3/src/manifest.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ impl Snapshot {
118118
}
119119
for frag in self.fragments.iter() {
120120
calculated_setsum += frag.setsum;
121+
bytes_read += frag.num_bytes;
121122
}
122123
if calculated_setsum != self.setsum {
123124
return Err(ScrubError::CorruptManifest{

rust/wal3/src/reader.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,21 @@ fn ranges_overlap(lhs: (LogPosition, LogPosition), rhs: (LogPosition, LogPositio
2222
}
2323

2424
/// Limits allows encoding things like offset, timestamp, and byte size limits for the read.
25-
#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize)]
25+
#[derive(Copy, Clone, Debug, Default, serde::Deserialize, serde::Serialize)]
2626
pub struct Limits {
2727
pub max_files: Option<u64>,
2828
pub max_bytes: Option<u64>,
2929
pub max_records: Option<u64>,
3030
}
3131

32+
impl Limits {
33+
pub const UNLIMITED: Limits = Limits {
34+
max_files: None,
35+
max_bytes: None,
36+
max_records: None,
37+
};
38+
}
39+
3240
/// LogReader is a reader for the log.
3341
pub struct LogReader {
3442
options: LogReaderOptions,
@@ -757,7 +765,7 @@ mod tests {
757765
max_bytes: None,
758766
max_records: Some(100), // Would need data up to exactly offset 200
759767
};
760-
let result = LogReader::scan_from_manifest(&manifest, from, limits.clone());
768+
let result = LogReader::scan_from_manifest(&manifest, from, limits);
761769
assert!(
762770
result.is_some(),
763771
"Should succeed when request stays within manifest coverage"
@@ -836,11 +844,8 @@ mod tests {
836844
fragments: vec![],
837845
initial_offset: None,
838846
};
839-
let result_empty = LogReader::scan_from_manifest(
840-
&empty_manifest,
841-
LogPosition::from_offset(0),
842-
limits.clone(),
843-
);
847+
let result_empty =
848+
LogReader::scan_from_manifest(&empty_manifest, LogPosition::from_offset(0), limits);
844849
assert!(
845850
result_empty.is_none(),
846851
"Should return None for empty manifest"

rust/wal3/tests/test_k8s_integration_82_copy_then_update_dst.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ async fn test_k8s_integration_82_copy_then_update_dst() {
1515
Manifest::initialize(
1616
&LogWriterOptions::default(),
1717
&storage,
18-
"test_k8s_integration_80_copy_source",
18+
"test_k8s_integration_82_copy_then_update_dst_source",
1919
"init",
2020
)
2121
.await
@@ -29,7 +29,7 @@ async fn test_k8s_integration_82_copy_then_update_dst() {
2929
..LogWriterOptions::default()
3030
},
3131
Arc::clone(&storage),
32-
"test_k8s_integration_80_copy_source",
32+
"test_k8s_integration_82_copy_then_update_dst_source",
3333
"load and scrub writer",
3434
(),
3535
)
@@ -45,7 +45,7 @@ async fn test_k8s_integration_82_copy_then_update_dst() {
4545
let reader = LogReader::open(
4646
LogReaderOptions::default(),
4747
Arc::clone(&storage),
48-
"test_k8s_integration_80_copy_source".to_string(),
48+
"test_k8s_integration_82_copy_then_update_dst_source".to_string(),
4949
)
5050
.await
5151
.unwrap();
@@ -55,15 +55,15 @@ async fn test_k8s_integration_82_copy_then_update_dst() {
5555
&LogWriterOptions::default(),
5656
&reader,
5757
LogPosition::default(),
58-
"test_k8s_integration_80_copy_target".to_string(),
58+
"test_k8s_integration_82_copy_then_update_dst_target".to_string(),
5959
)
6060
.await
6161
.unwrap();
6262
// Scrub the copy.
6363
let copied = LogReader::open(
6464
LogReaderOptions::default(),
6565
Arc::clone(&storage),
66-
"test_k8s_integration_80_copy_target".to_string(),
66+
"test_k8s_integration_82_copy_then_update_dst_target".to_string(),
6767
)
6868
.await
6969
.unwrap();
@@ -82,7 +82,7 @@ async fn test_k8s_integration_82_copy_then_update_dst() {
8282
..LogWriterOptions::default()
8383
},
8484
Arc::clone(&storage),
85-
"test_k8s_integration_80_copy_target",
85+
"test_k8s_integration_82_copy_then_update_dst_target",
8686
"load and scrub writer",
8787
(),
8888
)

0 commit comments

Comments
 (0)