Skip to content

Commit c1654da

Browse files
authored
[HOTFIX] Wal3-gc related fixes from this weekend. (#5006)
## Description of changes

Included PRs in order:

- **[TST] Test for #4972 (#4983)**
- **[CLN] Fix dedup in get_collections_with_new_data. (#4974)**
- **[ENH] Implement three-phase garbage collection for WAL3 (#4984)**
- **[CLN] Remove err(Display) from wal3. (#4992)**
- **[ENH] Wire up garbage collector to do 3-phase GC. (#4987)**
- **[ENH] Do not materialize all fragments to delete. (#5004)**

## Test plan

CI through main and then CI on the hotfix branch.

## Documentation Changes

N/A
1 parent 9c76673 commit c1654da

File tree

27 files changed

+610
-181
lines changed

27 files changed

+610
-181
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go/pkg/log/server/server.go

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -183,6 +183,10 @@ func (s *logServer) ScrubLog(ctx context.Context, req *logservicepb.ScrubLogRequ
183183
return
184184
}
185185

186+
func (s *logServer) GarbageCollectPhase2(ctx context.Context, req *logservicepb.GarbageCollectPhase2Request) (res *logservicepb.GarbageCollectPhase2Response, err error) {
187+
return
188+
}
189+
186190
func NewLogServer(lr *repository.LogRepository) logservicepb.LogServiceServer {
187191
return &logServer{
188192
lr: lr,

idl/chromadb/proto/logservice.proto

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -144,6 +144,16 @@ message ScrubLogResponse {
144144
repeated string errors = 3;
145145
}
146146

147+
message GarbageCollectPhase2Request {
148+
oneof log_to_collect {
149+
string collection_id = 1;
150+
string dirty_log = 2;
151+
};
152+
}
153+
154+
message GarbageCollectPhase2Response {
155+
}
156+
147157
service LogService {
148158
rpc PushLogs(PushLogsRequest) returns (PushLogsResponse) {}
149159
rpc ScoutLogs(ScoutLogsRequest) returns (ScoutLogsResponse) {}
@@ -163,4 +173,6 @@ service LogService {
163173
rpc InspectLogState(InspectLogStateRequest) returns (InspectLogStateResponse) {}
164174
// This endpoint should route to the rust log service.
165175
rpc ScrubLog(ScrubLogRequest) returns (ScrubLogResponse) {}
176+
// This endpoint should route to the rust log service.
177+
rpc GarbageCollectPhase2(GarbageCollectPhase2Request) returns (GarbageCollectPhase2Response) {}
166178
}

k8s/distributed-chroma/Chart.yaml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,7 @@ apiVersion: v2
1616
name: distributed-chroma
1717
description: A helm chart for distributed Chroma
1818
type: application
19-
version: 0.1.47
19+
version: 0.1.48
2020
appVersion: "0.4.24"
2121
keywords:
2222
- chroma

k8s/distributed-chroma/values.yaml

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -135,3 +135,10 @@ garbageCollector:
135135
kube_namespace: "chroma"
136136
memberlist_name: "garbage-collection-service-memberlist"
137137
queue_size: 100
138+
log:
139+
grpc:
140+
host: "logservice.chroma"
141+
port: 50051
142+
connect_timeout_ms: 5000
143+
request_timeout_ms: 5000
144+
alt_host: "rust-log-service.chroma"

rust/garbage_collector/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -37,6 +37,7 @@ base64 = { workspace = true }
3737

3838
chroma-config = { workspace = true }
3939
chroma-error = { workspace = true }
40+
chroma-log = { workspace = true }
4041
chroma-segment = { workspace = true }
4142
chroma-system = { workspace = true }
4243
chroma-types = { workspace = true }

rust/garbage_collector/garbage_collector_config.yaml

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -32,3 +32,11 @@ memberlist_provider:
3232
kube_namespace: "chroma"
3333
memberlist_name: "garbage-collection-service-memberlist"
3434
queue_size: 100
35+
log:
36+
grpc:
37+
host: "logservice.chroma"
38+
port: 50051
39+
connect_timeout_ms: 5000
40+
request_timeout_ms: 5000
41+
alt_host: "rust-log-service.chroma"
42+
alt_host_threshold: "ffffffff-ffff-ffff-ffff-ffffffffffff"

rust/garbage_collector/src/config.rs

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,5 @@
11
use chroma_cache::CacheConfig;
2+
use chroma_log::config::LogConfig;
23
use chroma_storage::config::StorageConfig;
34
use chroma_system::DispatcherConfig;
45
use chroma_types::CollectionUuid;
@@ -60,6 +61,7 @@ pub(super) struct GarbageCollectorConfig {
6061
pub jemalloc_pprof_server_port: Option<u16>,
6162
#[serde(default)]
6263
pub disable_log_gc: bool,
64+
pub log: LogConfig,
6365
}
6466

6567
impl GarbageCollectorConfig {
@@ -88,7 +90,7 @@ impl GarbageCollectorConfig {
8890
let res = f.extract();
8991
match res {
9092
Ok(config) => config,
91-
Err(e) => panic!("Error loading config: {}", e),
93+
Err(e) => panic!("Error loading config from {path}: {}", e),
9294
}
9395
}
9496

rust/garbage_collector/src/garbage_collector_component.rs

Lines changed: 33 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -9,11 +9,12 @@ use chroma_config::{
99
assignment::assignment_policy::AssignmentPolicy, registry::Registry, Configurable,
1010
};
1111
use chroma_error::ChromaError;
12+
use chroma_log::Log;
1213
use chroma_memberlist::memberlist_provider::Memberlist;
1314
use chroma_storage::Storage;
1415
use chroma_sysdb::{CollectionToGcInfo, SysDb, SysDbConfig};
1516
use chroma_system::{
16-
wrap, Component, ComponentContext, ComponentHandle, Dispatcher, Handler, Orchestrator,
17+
wrap, Component, ComponentContext, ComponentHandle, Dispatcher, Handler, Orchestrator, System,
1718
TaskResult,
1819
};
1920
use chrono::{DateTime, Utc};
@@ -31,8 +32,9 @@ pub(crate) struct GarbageCollector {
3132
config: GarbageCollectorConfig,
3233
sysdb_client: SysDb,
3334
storage: Storage,
35+
logs: Log,
3436
dispatcher: Option<ComponentHandle<Dispatcher>>,
35-
system: Option<chroma_system::System>,
37+
system: Option<System>,
3638
assignment_policy: Box<dyn AssignmentPolicy>,
3739
memberlist: Memberlist,
3840
root_manager: RootManager,
@@ -64,6 +66,7 @@ impl GarbageCollector {
6466
config: GarbageCollectorConfig,
6567
sysdb_client: SysDb,
6668
storage: Storage,
69+
logs: Log,
6770
assignment_policy: Box<dyn AssignmentPolicy>,
6871
root_manager: RootManager,
6972
) -> Self {
@@ -73,6 +76,7 @@ impl GarbageCollector {
7376
config,
7477
sysdb_client,
7578
storage,
79+
logs,
7680
dispatcher: None,
7781
system: None,
7882
assignment_policy,
@@ -134,6 +138,7 @@ impl GarbageCollector {
134138
dispatcher.clone(),
135139
system.clone(),
136140
self.storage.clone(),
141+
self.logs.clone(),
137142
self.root_manager.clone(),
138143
cleanup_mode,
139144
self.config.min_versions_to_keep,
@@ -213,6 +218,7 @@ impl GarbageCollector {
213218
let truncate_dirty_log_task = wrap(
214219
Box::new(TruncateDirtyLogOperator {
215220
storage: self.storage.clone(),
221+
logs: self.logs.clone(),
216222
}),
217223
(),
218224
ctx.receiver(),
@@ -491,9 +497,9 @@ impl Handler<TaskResult<TruncateDirtyLogOutput, TruncateDirtyLogError>> for Garb
491497
}
492498

493499
#[async_trait]
494-
impl Configurable<GarbageCollectorConfig> for GarbageCollector {
500+
impl Configurable<(GarbageCollectorConfig, System)> for GarbageCollector {
495501
async fn try_from_config(
496-
config: &GarbageCollectorConfig,
502+
(config, system): &(GarbageCollectorConfig, System),
497503
registry: &Registry,
498504
) -> Result<Self, Box<dyn ChromaError>> {
499505
let sysdb_config = SysDbConfig::Grpc(config.sysdb_config.clone());
@@ -504,6 +510,8 @@ impl Configurable<GarbageCollectorConfig> for GarbageCollector {
504510
Box::<dyn AssignmentPolicy>::try_from_config(&config.assignment_policy, registry)
505511
.await?;
506512

513+
let logs = Log::try_from_config(&(config.log.clone(), system.clone()), registry).await?;
514+
507515
let root_manager_cache =
508516
chroma_cache::from_config_persistent(&config.root_cache_config).await?;
509517
let root_manager = RootManager::new(storage.clone(), root_manager_cache);
@@ -512,6 +520,7 @@ impl Configurable<GarbageCollectorConfig> for GarbageCollector {
512520
config.clone(),
513521
sysdb_client,
514522
storage,
523+
logs,
515524
assignment_policy,
516525
root_manager,
517526
))
@@ -527,6 +536,7 @@ mod tests {
527536

528537
use super::*;
529538
use crate::helper::ChromaGrpcClients;
539+
use chroma_log::config::LogConfig;
530540
use chroma_memberlist::memberlist_provider::Member;
531541
use chroma_storage::s3_config_for_localhost_with_bucket_name;
532542
use chroma_sysdb::{GetCollectionsOptions, GrpcSysDb, GrpcSysDbConfig};
@@ -724,6 +734,7 @@ mod tests {
724734
root_cache_config: Default::default(),
725735
jemalloc_pprof_server_port: None,
726736
disable_log_gc: false,
737+
log: LogConfig::default(),
727738
};
728739
let registry = Registry::new();
729740

@@ -760,15 +771,16 @@ mod tests {
760771
tokio::time::sleep(Duration::from_secs(1)).await;
761772

762773
// Run garbage collection
763-
let mut garbage_collector_component = GarbageCollector::try_from_config(&config, &registry)
764-
.await
765-
.unwrap();
774+
let system = System::new();
775+
let mut garbage_collector_component =
776+
GarbageCollector::try_from_config(&(config.clone(), system.clone()), &registry)
777+
.await
778+
.unwrap();
766779

767780
let dispatcher = Dispatcher::try_from_config(&config.dispatcher_config, &registry)
768781
.await
769782
.unwrap();
770783

771-
let system = System::new();
772784
let dispatcher_handle = system.start_component(dispatcher);
773785

774786
garbage_collector_component.set_dispatcher(dispatcher_handle);
@@ -847,6 +859,7 @@ mod tests {
847859
root_cache_config: Default::default(),
848860
jemalloc_pprof_server_port: None,
849861
disable_log_gc: false,
862+
log: LogConfig::default(),
850863
};
851864
let registry = Registry::new();
852865

@@ -868,16 +881,18 @@ mod tests {
868881
// Wait 1 second for cutoff time
869882
tokio::time::sleep(Duration::from_secs(1)).await;
870883

884+
let system = System::new();
885+
871886
// Run garbage collection
872-
let mut garbage_collector_component = GarbageCollector::try_from_config(&config, &registry)
873-
.await
874-
.unwrap();
887+
let mut garbage_collector_component =
888+
GarbageCollector::try_from_config(&(config.clone(), system.clone()), &registry)
889+
.await
890+
.unwrap();
875891

876892
let dispatcher = Dispatcher::try_from_config(&config.dispatcher_config, &registry)
877893
.await
878894
.unwrap();
879895

880-
let system = System::new();
881896
let dispatcher_handle = system.start_component(dispatcher);
882897

883898
garbage_collector_component.set_dispatcher(dispatcher_handle);
@@ -965,15 +980,16 @@ mod tests {
965980
registry: &Registry,
966981
tenant_id: String,
967982
) -> GarbageCollectResult {
968-
let mut garbage_collector_component = GarbageCollector::try_from_config(config, registry)
969-
.await
970-
.unwrap();
983+
let system = System::new();
984+
let mut garbage_collector_component =
985+
GarbageCollector::try_from_config(&(config.clone(), system.clone()), registry)
986+
.await
987+
.unwrap();
971988

972989
let dispatcher = Dispatcher::try_from_config(&config.dispatcher_config, registry)
973990
.await
974991
.unwrap();
975992

976-
let system = System::new();
977993
let mut dispatcher_handle = system.start_component(dispatcher);
978994

979995
garbage_collector_component.set_dispatcher(dispatcher_handle.clone());
@@ -1044,6 +1060,7 @@ mod tests {
10441060
root_cache_config: Default::default(),
10451061
jemalloc_pprof_server_port: None,
10461062
disable_log_gc: false,
1063+
log: LogConfig::default(),
10471064
};
10481065
let registry = Registry::new();
10491066

rust/garbage_collector/src/garbage_collector_orchestrator_v2.rs

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,7 @@ use crate::types::{
3333
use async_trait::async_trait;
3434
use chroma_blockstore::RootManager;
3535
use chroma_error::{ChromaError, ErrorCodes};
36+
use chroma_log::Log;
3637
use chroma_storage::Storage;
3738
use chroma_sysdb::{GetCollectionsOptions, SysDb};
3839
use chroma_system::{
@@ -62,6 +63,7 @@ pub struct GarbageCollectorOrchestrator {
6263
dispatcher: ComponentHandle<Dispatcher>,
6364
system: System,
6465
storage: Storage,
66+
logs: Log,
6567
root_manager: RootManager,
6668
result_channel: Option<Sender<Result<GarbageCollectorResponse, GarbageCollectorError>>>,
6769
cleanup_mode: CleanupMode,
@@ -96,6 +98,7 @@ impl GarbageCollectorOrchestrator {
9698
dispatcher: ComponentHandle<Dispatcher>,
9799
system: System,
98100
storage: Storage,
101+
logs: Log,
99102
root_manager: RootManager,
100103
cleanup_mode: CleanupMode,
101104
min_versions_to_keep: u32,
@@ -111,6 +114,7 @@ impl GarbageCollectorOrchestrator {
111114
dispatcher,
112115
system,
113116
storage,
117+
logs,
114118
root_manager,
115119
cleanup_mode,
116120
result_channel: None,
@@ -515,6 +519,7 @@ impl GarbageCollectorOrchestrator {
515519
CleanupMode::DryRun | CleanupMode::DryRunV2
516520
),
517521
storage: self.storage.clone(),
522+
logs: self.logs.clone(),
518523
}),
519524
DeleteUnusedLogsInput {
520525
collections_to_destroy,
@@ -1096,6 +1101,7 @@ mod tests {
10961101
use super::GarbageCollectorOrchestrator;
10971102
use chroma_blockstore::RootManager;
10981103
use chroma_cache::nop::NopCache;
1104+
use chroma_log::Log;
10991105
use chroma_storage::test_storage;
11001106
use chroma_sysdb::{GetCollectionsOptions, TestSysDb};
11011107
use chroma_system::{Dispatcher, Orchestrator, System};
@@ -1180,6 +1186,7 @@ mod tests {
11801186
0,
11811187
)
11821188
.unwrap();
1189+
let logs = Log::InMemory(chroma_log::in_memory_log::InMemoryLog::default());
11831190
let orchestrator = GarbageCollectorOrchestrator::new(
11841191
root_collection_id,
11851192
root_collection.version_file_path.unwrap(),
@@ -1190,6 +1197,7 @@ mod tests {
11901197
dispatcher_handle,
11911198
system.clone(),
11921199
storage,
1200+
logs,
11931201
root_manager,
11941202
crate::types::CleanupMode::Delete,
11951203
1,

0 commit comments

Comments
 (0)