Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/pkg/sysdb/coordinator/model_db_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func convertCollectionToModel(collectionAndMetadataList []*dbmodel.CollectionAnd
IsDeleted: collectionAndMetadata.Collection.IsDeleted,
VersionFileName: collectionAndMetadata.Collection.VersionFileName,
CreatedAt: collectionAndMetadata.Collection.CreatedAt,
UpdatedAt: collectionAndMetadata.Collection.UpdatedAt.Unix(),
DatabaseId: types.MustParse(collectionAndMetadata.Collection.DatabaseID),
}
collection.Metadata = convertCollectionMetadataToModel(collectionAndMetadata.CollectionMetadata)
Expand Down
9 changes: 6 additions & 3 deletions go/pkg/sysdb/coordinator/table_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,9 +462,12 @@ func (tc *Catalog) GetCollections(ctx context.Context, collectionIDs []types.Uni
defer span.End()
}

ids := make([]string, 0, len(collectionIDs))
for _, id := range collectionIDs {
ids = append(ids, id.String())
ids := ([]string)(nil)
if collectionIDs != nil {
ids = make([]string, 0, len(collectionIDs))
for _, id := range collectionIDs {
ids = append(ids, id.String())
}
}

collectionAndMetadataList, err := tc.metaDomain.CollectionDb(ctx).GetCollections(ids, collectionName, tenantID, databaseName, limit, offset, includeSoftDeleted)
Expand Down
3 changes: 3 additions & 0 deletions go/pkg/sysdb/coordinator/table_catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func TestCatalog_GetCollections(t *testing.T) {
collectionName := "test_collection"

// create a mock collection and metadata list
now := time.Now()
name := "test_collection"
testKey := "test_key"
testValue := "test_value"
Expand All @@ -105,6 +106,7 @@ func TestCatalog_GetCollections(t *testing.T) {
ConfigurationJsonStr: &collectionConfigurationJsonStr,
Ts: types.Timestamp(1234567890),
DatabaseID: dbId.String(),
UpdatedAt: now,
},
CollectionMetadata: []*dbmodel.CollectionMetadata{
{
Expand Down Expand Up @@ -139,6 +141,7 @@ func TestCatalog_GetCollections(t *testing.T) {
Ts: types.Timestamp(1234567890),
Metadata: metadata,
DatabaseId: dbId,
UpdatedAt: now.Unix(),
},
}, collections)

Expand Down
24 changes: 15 additions & 9 deletions go/pkg/sysdb/grpc/collection_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,24 +167,30 @@ func (s *Server) GetCollections(ctx context.Context, req *coordinatorpb.GetColle

res := &coordinatorpb.GetCollectionsResponse{}

collectionIDs := []types.UniqueID{}
collectionIDs := ([]types.UniqueID)(nil)
parsedCollectionID, err := types.ToUniqueID(collectionID)
if err != nil {
log.Error("GetCollections failed. collection id format error", zap.Error(err), zap.Stringp("collection_id", collectionID), zap.Stringp("collection_name", collectionName))
return res, grpcutils.BuildInternalGrpcError(err.Error())
}
if parsedCollectionID != types.NilUniqueID() {
collectionIDs = append(collectionIDs, parsedCollectionID)
collectionIDs = []types.UniqueID{parsedCollectionID}
}

for _, id := range req.Ids {
parsedCollectionID, err := types.ToUniqueID(&id)
if err != nil {
log.Error("GetCollections failed. collection id format error", zap.Error(err), zap.Stringp("collection_id", &id), zap.Stringp("collection_name", collectionName))
return res, grpcutils.BuildInternalGrpcError(err.Error())
if req.IdsFilter != nil {
if collectionIDs == nil {
collectionIDs = make([]types.UniqueID, 0, len(req.IdsFilter.Ids))
}
if parsedCollectionID != types.NilUniqueID() {
collectionIDs = append(collectionIDs, parsedCollectionID)

for _, id := range req.IdsFilter.Ids {
parsedCollectionID, err := types.ToUniqueID(&id)
if err != nil {
log.Error("GetCollections failed. collection id format error", zap.Error(err), zap.Stringp("collection_id", &id), zap.Stringp("collection_name", collectionName))
return res, grpcutils.BuildInternalGrpcError(err.Error())
}
if parsedCollectionID != types.NilUniqueID() {
collectionIDs = append(collectionIDs, parsedCollectionID)
}
}
}

Expand Down
5 changes: 5 additions & 0 deletions go/pkg/sysdb/grpc/proto_model_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/chroma-core/chroma/go/pkg/types"
"github.com/pingcap/log"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
)

func convertCollectionMetadataToModel(collectionMetadata *coordinatorpb.UpdateMetadata) (*model.CollectionMetadata[model.CollectionMetadataValueType], error) {
Expand Down Expand Up @@ -55,6 +56,10 @@ func convertCollectionToProto(collection *model.Collection) *coordinatorpb.Colle
LastCompactionTimeSecs: collection.LastCompactionTimeSecs,
VersionFilePath: &collection.VersionFileName,
LineageFilePath: collection.LineageFileName,
UpdatedAt: &timestamppb.Timestamp{
Seconds: collection.UpdatedAt,
Nanos: 0,
},
}

if collection.RootCollectionID != nil {
Expand Down
8 changes: 5 additions & 3 deletions go/pkg/sysdb/metastore/db/dao/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (s *collectionDb) getCollections(ids []string, name *string, tenantID strin
if tenantID != "" {
query = query.Where("databases.tenant_id = ?", tenantID)
}
if ids != nil && len(ids) > 0 {
if ids != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small bug fix: this method should not return all collections when `ids` is an empty list (`[]`); only a nil `ids` should mean "no ID filter".

query = query.Where("collections.id IN ?", ids)
}
if name != nil {
Expand Down Expand Up @@ -219,6 +219,8 @@ func (s *collectionDb) getCollections(ids []string, name *string, tenantID strin
SizeBytesPostCompaction: r.SizeBytesPostCompaction,
LastCompactionTimeSecs: r.LastCompactionTimeSecs,
Tenant: r.Tenant,
UpdatedAt: *r.CollectionUpdatedAt,
CreatedAt: *r.CollectionCreatedAt,
}
if r.CollectionTs != nil {
col.Ts = *r.CollectionTs
Expand Down Expand Up @@ -336,9 +338,9 @@ func (s *collectionDb) GetCollectionSize(id string) (uint64, error) {

// GetSoftDeletedCollections delegates to getCollections, passing a pointer to
// true as the final argument and, when collectionID is non-nil, a one-element
// ID list containing that ID.
//
// NOTE(review): this span is diff-view residue. Each changed statement appears
// twice (what appears to be the pre-change line followed by its replacement),
// so the text does not compile exactly as shown.
func (s *collectionDb) GetSoftDeletedCollections(collectionID *string, tenantID string, databaseName string, limit int32) ([]*dbmodel.CollectionAndMetadata, error) {
isDeleted := true
// Apparent pre-change form: an empty but non-nil slice.
ids := []string{}
// Apparent post-change form: a nil slice. getCollections adds its
// "collections.id IN ?" clause whenever ids != nil, so nil is what leaves
// the ID filter off.
ids := ([]string)(nil)
if collectionID != nil {
// Apparent pre-change form: append onto the existing slice.
ids = append(ids, *collectionID)
// Apparent post-change form: build the one-element slice directly.
ids = []string{*collectionID}
}
return s.getCollections(ids, nil, tenantID, databaseName, &limit, nil, &isDeleted)
}
Expand Down
3 changes: 3 additions & 0 deletions idl/chromadb/proto/chroma.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ package chroma;

option go_package = "github.com/chroma-core/chroma/go/pkg/proto/coordinatorpb";

import "google/protobuf/timestamp.proto";

// Types here should mirror chromadb/types.py
enum Operation {
ADD = 0;
Expand Down Expand Up @@ -61,6 +63,7 @@ message Collection {
optional string version_file_path = 13;
optional string root_collection_id = 14;
optional string lineage_file_path = 15;
google.protobuf.Timestamp updated_at = 16;
}

message Database {
Expand Down
6 changes: 5 additions & 1 deletion idl/chromadb/proto/coordinator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -175,14 +175,18 @@ message GetCollectionResponse {
Collection collection = 1;
}

// CollectionIdsFilter wraps a list of collection IDs in its own message so a
// request can tell "no IDs filter provided" (the wrapper field is unset) apart
// from "an empty IDs filter provided" (the wrapper is set but ids is empty).
message CollectionIdsFilter {
repeated string ids = 1;
}

message GetCollectionsRequest {
optional string id = 1;
optional string name = 2;
string tenant = 4;
string database = 5;
optional int32 limit = 6;
optional int32 offset = 7;
repeated string ids = 8;
optional CollectionIdsFilter ids_filter = 8;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This wrapper type is unfortunately needed because otherwise we can't distinguish the case where no IDs filter is provided from the case where an empty IDs filter (`[]`) is provided.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But is it backward and forward compatible? For example, if an old sysdb receives the new type, it will error out trying to deserialize this field (and vice versa).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline: yes, it's OK as long as the change that introduced `GetCollectionsRequest.ids` doesn't get promoted to prod separately from this change.

optional bool include_soft_deleted = 9;
}

Expand Down
19 changes: 15 additions & 4 deletions rust/garbage_collector/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,22 @@ where
Ok(Duration::from_secs(secs))
}

#[derive(Debug, serde::Deserialize)]
#[derive(Debug, serde::Deserialize, Clone)]
pub(super) struct GarbageCollectorConfig {
pub(super) service_name: String,
pub(super) otel_endpoint: String,
#[serde(
rename = "relative_cutoff_time_seconds",
rename = "collection_soft_delete_grace_period_seconds",
deserialize_with = "deserialize_duration_from_seconds",
default = "GarbageCollectorConfig::default_collection_soft_delete_grace_period"
)]
pub(super) collection_soft_delete_grace_period: Duration,
#[serde(
rename = "version_relative_cutoff_time_seconds",
alias = "relative_cutoff_time_seconds",
deserialize_with = "deserialize_duration_from_seconds"
)]
pub(super) relative_cutoff_time: Duration,
pub(super) version_cutoff_time: Duration,
pub(super) max_collections_to_gc: u32,
pub(super) gc_interval_mins: u32,
pub(super) disallow_collections: Vec<String>,
Expand Down Expand Up @@ -69,6 +76,10 @@ impl GarbageCollectorConfig {
/// Supplies the default port number, 50055.
// NOTE(review): presumably referenced as a serde `default` for a port field; confirm against the struct definition.
fn default_port() -> u16 {
    const DEFAULT_PORT: u16 = 50055;
    DEFAULT_PORT
}

/// Supplies the fallback value for `collection_soft_delete_grace_period`
/// when the config omits it: one day.
fn default_collection_soft_delete_grace_period() -> Duration {
    const SECONDS_PER_DAY: u64 = 24 * 60 * 60;
    Duration::from_secs(SECONDS_PER_DAY)
}
}

#[cfg(test)]
Expand All @@ -83,7 +94,7 @@ mod tests {
assert_eq!(config.service_name, "garbage-collector");
assert_eq!(config.otel_endpoint, "http://otel-collector:4317");
assert_eq!(
config.relative_cutoff_time,
config.version_cutoff_time,
Duration::from_secs(12 * 60 * 60)
); // 12 hours
assert_eq!(config.max_collections_to_gc, 1000);
Expand Down
Loading
Loading