Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions go/pkg/log/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ func (s *logServer) ScrubLog(ctx context.Context, req *logservicepb.ScrubLogRequ
return
}

func (s *logServer) GarbageCollectPhase2(ctx context.Context, req *logservicepb.GarbageCollectPhase2Request) (res *logservicepb.GarbageCollectPhase2Response, err error) {
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this return nil or an actual response?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's an empty response. I'm copying other empty responses here. Should we set the struct?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose it's implicitly converting nil to an empty response in this case. We can set the struct for clarity or leave it as is for consistency, whichever you prefer.

}

func NewLogServer(lr *repository.LogRepository) logservicepb.LogServiceServer {
return &logServer{
lr: lr,
Expand Down
12 changes: 12 additions & 0 deletions idl/chromadb/proto/logservice.proto
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,16 @@ message ScrubLogResponse {
repeated string errors = 3;
}

// Request message for the GarbageCollectPhase2 RPC of LogService.
// The target log is chosen through the log_to_collect oneof, so at most one
// of the fields below can be set on any given request.
message GarbageCollectPhase2Request {
oneof log_to_collect {
// Selects a log by collection ID, passed as a string.
// NOTE(review): presumably a collection UUID; confirm against callers.
string collection_id = 1;
// Selects a dirty log.
// NOTE(review): the expected contents of this string are not visible here; confirm.
string dirty_log = 2;
};
}

// Response message for the GarbageCollectPhase2 RPC of LogService.
// It declares no fields.
message GarbageCollectPhase2Response {
}

service LogService {
rpc PushLogs(PushLogsRequest) returns (PushLogsResponse) {}
rpc ScoutLogs(ScoutLogsRequest) returns (ScoutLogsResponse) {}
Expand All @@ -163,4 +173,6 @@ service LogService {
rpc InspectLogState(InspectLogStateRequest) returns (InspectLogStateResponse) {}
// This endpoint should route to the rust log service.
rpc ScrubLog(ScrubLogRequest) returns (ScrubLogResponse) {}
// This endpoint should route to the rust log service.
rpc GarbageCollectPhase2(GarbageCollectPhase2Request) returns (GarbageCollectPhase2Response) {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we rename this so that the name explains what this service does in more detail?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would you name it? It's phase2 of the process known as garbage collect.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From my understanding, it's truncating the manifest, so maybe TruncateManifest?

}
2 changes: 1 addition & 1 deletion k8s/distributed-chroma/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ apiVersion: v2
name: distributed-chroma
description: A helm chart for distributed Chroma
type: application
version: 0.1.47
version: 0.1.48
appVersion: "0.4.24"
keywords:
- chroma
Expand Down
7 changes: 7 additions & 0 deletions k8s/distributed-chroma/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,10 @@ garbageCollector:
kube_namespace: "chroma"
memberlist_name: "garbage-collection-service-memberlist"
queue_size: 100
log:
grpc:
host: "logservice.chroma"
port: 50051
connect_timeout_ms: 5000
request_timeout_ms: 5000
alt_host: "rust-log-service.chroma"
1 change: 1 addition & 0 deletions rust/garbage_collector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ base64 = { workspace = true }

chroma-config = { workspace = true }
chroma-error = { workspace = true }
chroma-log = { workspace = true }
chroma-segment = { workspace = true }
chroma-system = { workspace = true }
chroma-types = { workspace = true }
Expand Down
8 changes: 8 additions & 0 deletions rust/garbage_collector/garbage_collector_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,11 @@ memberlist_provider:
kube_namespace: "chroma"
memberlist_name: "garbage-collection-service-memberlist"
queue_size: 100
log:
grpc:
host: "logservice.chroma"
port: 50051
connect_timeout_ms: 5000
request_timeout_ms: 5000
alt_host: "rust-log-service.chroma"
alt_host_threshold: "ffffffff-ffff-ffff-ffff-ffffffffffff"
4 changes: 3 additions & 1 deletion rust/garbage_collector/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use chroma_cache::CacheConfig;
use chroma_log::config::LogConfig;
use chroma_storage::config::StorageConfig;
use chroma_system::DispatcherConfig;
use chroma_types::CollectionUuid;
Expand Down Expand Up @@ -60,6 +61,7 @@ pub(super) struct GarbageCollectorConfig {
pub jemalloc_pprof_server_port: Option<u16>,
#[serde(default)]
pub disable_log_gc: bool,
pub log: LogConfig,
}

impl GarbageCollectorConfig {
Expand Down Expand Up @@ -88,7 +90,7 @@ impl GarbageCollectorConfig {
let res = f.extract();
match res {
Ok(config) => config,
Err(e) => panic!("Error loading config: {}", e),
Err(e) => panic!("Error loading config from {path}: {}", e),
}
}

Expand Down
49 changes: 33 additions & 16 deletions rust/garbage_collector/src/garbage_collector_component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ use chroma_config::{
assignment::assignment_policy::AssignmentPolicy, registry::Registry, Configurable,
};
use chroma_error::ChromaError;
use chroma_log::Log;
use chroma_memberlist::memberlist_provider::Memberlist;
use chroma_storage::Storage;
use chroma_sysdb::{CollectionToGcInfo, SysDb, SysDbConfig};
use chroma_system::{
wrap, Component, ComponentContext, ComponentHandle, Dispatcher, Handler, Orchestrator,
wrap, Component, ComponentContext, ComponentHandle, Dispatcher, Handler, Orchestrator, System,
TaskResult,
};
use chrono::{DateTime, Utc};
Expand All @@ -31,8 +32,9 @@ pub(crate) struct GarbageCollector {
config: GarbageCollectorConfig,
sysdb_client: SysDb,
storage: Storage,
logs: Log,
dispatcher: Option<ComponentHandle<Dispatcher>>,
system: Option<chroma_system::System>,
system: Option<System>,
assignment_policy: Box<dyn AssignmentPolicy>,
memberlist: Memberlist,
root_manager: RootManager,
Expand Down Expand Up @@ -64,6 +66,7 @@ impl GarbageCollector {
config: GarbageCollectorConfig,
sysdb_client: SysDb,
storage: Storage,
logs: Log,
assignment_policy: Box<dyn AssignmentPolicy>,
root_manager: RootManager,
) -> Self {
Expand All @@ -73,6 +76,7 @@ impl GarbageCollector {
config,
sysdb_client,
storage,
logs,
dispatcher: None,
system: None,
assignment_policy,
Expand Down Expand Up @@ -134,6 +138,7 @@ impl GarbageCollector {
dispatcher.clone(),
system.clone(),
self.storage.clone(),
self.logs.clone(),
self.root_manager.clone(),
cleanup_mode,
self.config.min_versions_to_keep,
Expand Down Expand Up @@ -213,6 +218,7 @@ impl GarbageCollector {
let truncate_dirty_log_task = wrap(
Box::new(TruncateDirtyLogOperator {
storage: self.storage.clone(),
logs: self.logs.clone(),
}),
(),
ctx.receiver(),
Expand Down Expand Up @@ -491,9 +497,9 @@ impl Handler<TaskResult<TruncateDirtyLogOutput, TruncateDirtyLogError>> for Garb
}

#[async_trait]
impl Configurable<GarbageCollectorConfig> for GarbageCollector {
impl Configurable<(GarbageCollectorConfig, System)> for GarbageCollector {
async fn try_from_config(
config: &GarbageCollectorConfig,
(config, system): &(GarbageCollectorConfig, System),
registry: &Registry,
) -> Result<Self, Box<dyn ChromaError>> {
let sysdb_config = SysDbConfig::Grpc(config.sysdb_config.clone());
Expand All @@ -504,6 +510,8 @@ impl Configurable<GarbageCollectorConfig> for GarbageCollector {
Box::<dyn AssignmentPolicy>::try_from_config(&config.assignment_policy, registry)
.await?;

let logs = Log::try_from_config(&(config.log.clone(), system.clone()), registry).await?;

let root_manager_cache =
chroma_cache::from_config_persistent(&config.root_cache_config).await?;
let root_manager = RootManager::new(storage.clone(), root_manager_cache);
Expand All @@ -512,6 +520,7 @@ impl Configurable<GarbageCollectorConfig> for GarbageCollector {
config.clone(),
sysdb_client,
storage,
logs,
assignment_policy,
root_manager,
))
Expand All @@ -527,6 +536,7 @@ mod tests {

use super::*;
use crate::helper::ChromaGrpcClients;
use chroma_log::config::LogConfig;
use chroma_memberlist::memberlist_provider::Member;
use chroma_storage::s3_config_for_localhost_with_bucket_name;
use chroma_sysdb::{GetCollectionsOptions, GrpcSysDb, GrpcSysDbConfig};
Expand Down Expand Up @@ -724,6 +734,7 @@ mod tests {
root_cache_config: Default::default(),
jemalloc_pprof_server_port: None,
disable_log_gc: false,
log: LogConfig::default(),
};
let registry = Registry::new();

Expand Down Expand Up @@ -760,15 +771,16 @@ mod tests {
tokio::time::sleep(Duration::from_secs(1)).await;

// Run garbage collection
let mut garbage_collector_component = GarbageCollector::try_from_config(&config, &registry)
.await
.unwrap();
let system = System::new();
let mut garbage_collector_component =
GarbageCollector::try_from_config(&(config.clone(), system.clone()), &registry)
.await
.unwrap();

let dispatcher = Dispatcher::try_from_config(&config.dispatcher_config, &registry)
.await
.unwrap();

let system = System::new();
let dispatcher_handle = system.start_component(dispatcher);

garbage_collector_component.set_dispatcher(dispatcher_handle);
Expand Down Expand Up @@ -847,6 +859,7 @@ mod tests {
root_cache_config: Default::default(),
jemalloc_pprof_server_port: None,
disable_log_gc: false,
log: LogConfig::default(),
};
let registry = Registry::new();

Expand All @@ -868,16 +881,18 @@ mod tests {
// Wait 1 second for cutoff time
tokio::time::sleep(Duration::from_secs(1)).await;

let system = System::new();

// Run garbage collection
let mut garbage_collector_component = GarbageCollector::try_from_config(&config, &registry)
.await
.unwrap();
let mut garbage_collector_component =
GarbageCollector::try_from_config(&(config.clone(), system.clone()), &registry)
.await
.unwrap();

let dispatcher = Dispatcher::try_from_config(&config.dispatcher_config, &registry)
.await
.unwrap();

let system = System::new();
let dispatcher_handle = system.start_component(dispatcher);

garbage_collector_component.set_dispatcher(dispatcher_handle);
Expand Down Expand Up @@ -965,15 +980,16 @@ mod tests {
registry: &Registry,
tenant_id: String,
) -> GarbageCollectResult {
let mut garbage_collector_component = GarbageCollector::try_from_config(config, registry)
.await
.unwrap();
let system = System::new();
let mut garbage_collector_component =
GarbageCollector::try_from_config(&(config.clone(), system.clone()), registry)
.await
.unwrap();

let dispatcher = Dispatcher::try_from_config(&config.dispatcher_config, registry)
.await
.unwrap();

let system = System::new();
let mut dispatcher_handle = system.start_component(dispatcher);

garbage_collector_component.set_dispatcher(dispatcher_handle.clone());
Expand Down Expand Up @@ -1044,6 +1060,7 @@ mod tests {
root_cache_config: Default::default(),
jemalloc_pprof_server_port: None,
disable_log_gc: false,
log: LogConfig::default(),
};
let registry = Registry::new();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::types::{
use async_trait::async_trait;
use chroma_blockstore::RootManager;
use chroma_error::{ChromaError, ErrorCodes};
use chroma_log::Log;
use chroma_storage::Storage;
use chroma_sysdb::{GetCollectionsOptions, SysDb};
use chroma_system::{
Expand Down Expand Up @@ -62,6 +63,7 @@ pub struct GarbageCollectorOrchestrator {
dispatcher: ComponentHandle<Dispatcher>,
system: System,
storage: Storage,
logs: Log,
root_manager: RootManager,
result_channel: Option<Sender<Result<GarbageCollectorResponse, GarbageCollectorError>>>,
cleanup_mode: CleanupMode,
Expand Down Expand Up @@ -96,6 +98,7 @@ impl GarbageCollectorOrchestrator {
dispatcher: ComponentHandle<Dispatcher>,
system: System,
storage: Storage,
logs: Log,
root_manager: RootManager,
cleanup_mode: CleanupMode,
min_versions_to_keep: u32,
Expand All @@ -111,6 +114,7 @@ impl GarbageCollectorOrchestrator {
dispatcher,
system,
storage,
logs,
root_manager,
cleanup_mode,
result_channel: None,
Expand Down Expand Up @@ -515,6 +519,7 @@ impl GarbageCollectorOrchestrator {
CleanupMode::DryRun | CleanupMode::DryRunV2
),
storage: self.storage.clone(),
logs: self.logs.clone(),
}),
DeleteUnusedLogsInput {
collections_to_destroy,
Expand Down Expand Up @@ -1096,6 +1101,7 @@ mod tests {
use super::GarbageCollectorOrchestrator;
use chroma_blockstore::RootManager;
use chroma_cache::nop::NopCache;
use chroma_log::Log;
use chroma_storage::test_storage;
use chroma_sysdb::{GetCollectionsOptions, TestSysDb};
use chroma_system::{Dispatcher, Orchestrator, System};
Expand Down Expand Up @@ -1180,6 +1186,7 @@ mod tests {
0,
)
.unwrap();
let logs = Log::InMemory(chroma_log::in_memory_log::InMemoryLog::default());
let orchestrator = GarbageCollectorOrchestrator::new(
root_collection_id,
root_collection.version_file_path.unwrap(),
Expand All @@ -1190,6 +1197,7 @@ mod tests {
dispatcher_handle,
system.clone(),
storage,
logs,
root_manager,
crate::types::CleanupMode::Delete,
1,
Expand Down
13 changes: 7 additions & 6 deletions rust/garbage_collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,13 @@ pub async fn garbage_collector_service_entrypoint() -> Result<(), Box<dyn std::e
// Start a background task to periodically check for garbage.
// Garbage collector is a component that gets notified every
// gc_interval_mins to check for garbage.
let mut garbage_collector_component = GarbageCollector::try_from_config(&config, &registry)
.await
.map_err(|e| {
error!("Failed to create garbage collector component: {:?}", e);
e
})?;
let mut garbage_collector_component =
GarbageCollector::try_from_config(&(config, system.clone()), &registry)
.await
.map_err(|e| {
error!("Failed to create garbage collector component: {:?}", e);
e
})?;

garbage_collector_component.set_dispatcher(dispatcher_handle.clone());
garbage_collector_component.set_system(system.clone());
Expand Down
Loading
Loading