Skip to content

Commit b099250

Browse files
committed
[ENH]: soft delete databases, hard delete from garbage collector
1 parent 738703d commit b099250

File tree

12 files changed

+160
-36
lines changed

12 files changed

+160
-36
lines changed

go/pkg/sysdb/coordinator/coordinator.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package coordinator
22

33
import (
44
"context"
5+
"time"
56

67
"github.com/chroma-core/chroma/go/pkg/common"
78
"github.com/chroma-core/chroma/go/pkg/proto/coordinatorpb"
@@ -255,3 +256,13 @@ func (s *Coordinator) BatchGetCollectionVersionFilePaths(ctx context.Context, re
255256
func (s *Coordinator) BatchGetCollectionSoftDeleteStatus(ctx context.Context, req *coordinatorpb.BatchGetCollectionSoftDeleteStatusRequest) (*coordinatorpb.BatchGetCollectionSoftDeleteStatusResponse, error) {
256257
return s.catalog.BatchGetCollectionSoftDeleteStatus(ctx, req.CollectionIds)
257258
}
259+
260+
func (s *Coordinator) FinishDatabaseDeletion(ctx context.Context, req *coordinatorpb.FinishDatabaseDeletionRequest) (*coordinatorpb.FinishDatabaseDeletionResponse, error) {
261+
err := s.catalog.FinishDatabaseDeletion(ctx, time.Unix(req.CutoffTime.Seconds, int64(req.CutoffTime.Nanos)))
262+
if err != nil {
263+
return nil, err
264+
}
265+
266+
res := &coordinatorpb.FinishDatabaseDeletionResponse{}
267+
return res, nil
268+
}

go/pkg/sysdb/coordinator/table_catalog.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,10 +184,32 @@ func (tc *Catalog) DeleteDatabase(ctx context.Context, deleteDatabase *model.Del
184184
if len(databases) == 0 {
185185
return common.ErrDatabaseNotFound
186186
}
187-
err = tc.metaDomain.DatabaseDb(txCtx).Delete(databases[0].ID)
187+
err = tc.metaDomain.DatabaseDb(txCtx).SoftDelete(databases[0].ID)
188188
if err != nil {
189189
return err
190190
}
191+
192+
collections, err := tc.metaDomain.CollectionDb(txCtx).GetCollections(nil, nil, deleteDatabase.Tenant, deleteDatabase.Name, nil, nil)
193+
if err != nil {
194+
return err
195+
}
196+
197+
for _, collection := range collections {
198+
collectionID, err := types.Parse(collection.Collection.ID)
199+
if err != nil {
200+
return err
201+
}
202+
203+
err = tc.softDeleteCollection(txCtx, &model.DeleteCollection{
204+
ID: collectionID,
205+
TenantID: deleteDatabase.Tenant,
206+
DatabaseName: deleteDatabase.Name,
207+
})
208+
if err != nil {
209+
return err
210+
}
211+
}
212+
191213
return nil
192214
})
193215
}
@@ -2196,3 +2218,7 @@ func (tc *Catalog) GetVersionFileNamesForCollection(ctx context.Context, tenantI
21962218

21972219
return collectionEntry.VersionFileName, nil
21982220
}
2221+
2222+
func (tc *Catalog) FinishDatabaseDeletion(ctx context.Context, cutoffTime time.Time) error {
2223+
return tc.metaDomain.DatabaseDb(ctx).FinishDatabaseDeletion(cutoffTime)
2224+
}

go/pkg/sysdb/grpc/tenant_database_service.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,3 +158,11 @@ func (s *Server) GetLastCompactionTimeForTenant(ctx context.Context, req *coordi
158158
}
159159
return res, nil
160160
}
161+
162+
func (s *Server) FinishDatabaseDeletion(ctx context.Context, req *coordinatorpb.FinishDatabaseDeletionRequest) (*coordinatorpb.FinishDatabaseDeletionResponse, error) {
163+
res, err := s.coordinator.FinishDatabaseDeletion(ctx, req)
164+
if err != nil {
165+
return nil, grpcutils.BuildInternalGrpcError(err.Error())
166+
}
167+
return res, nil
168+
}

go/pkg/sysdb/grpc/tenant_database_service_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ func (suite *TenantDatabaseServiceTestSuite) TestServer_DeleteDatabase() {
145145
})
146146
suite.NoError(err)
147147

148-
// Check that associated collection was deleted
148+
// Check that associated collection was soft deleted
149149
var count int64
150150
var collections []*dbmodel.Collection
151151
suite.NoError(suite.db.Find(&collections).Count(&count).Error)

go/pkg/sysdb/metastore/db/dao/database.go

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package dao
22

33
import (
44
"errors"
5+
"time"
56

67
"github.com/chroma-core/chroma/go/pkg/common"
78
"github.com/chroma-core/chroma/go/pkg/sysdb/metastore/db/dbmodel"
@@ -95,13 +96,9 @@ func (s *databaseDb) Insert(database *dbmodel.Database) error {
9596
return err
9697
}
9798

98-
func (s *databaseDb) Delete(databaseID string) error {
99+
func (s *databaseDb) SoftDelete(databaseID string) error {
99100
return s.db.Transaction(func(tx *gorm.DB) error {
100-
if err := tx.Where("id = ?", databaseID).Delete(&dbmodel.Database{}).Error; err != nil {
101-
return err
102-
}
103-
104-
if err := tx.Where("database_id = ?", databaseID).Delete(&dbmodel.Collection{}).Error; err != nil {
101+
if err := tx.Table("databases").Where("id = ?", databaseID).Update("is_deleted", true).Error; err != nil {
105102
return err
106103
}
107104

@@ -121,3 +118,30 @@ func (s *databaseDb) GetDatabasesByTenantID(tenantID string) ([]*dbmodel.Databas
121118
}
122119
return databases, nil
123120
}
121+
122+
func (s *databaseDb) FinishDatabaseDeletion(cutoffTime time.Time) error {
123+
for {
124+
// Only hard delete databases that are soft deleted and have no collections
125+
databasesSubQuery := s.db.
126+
Table("databases d").
127+
Select("d.id").
128+
Joins("LEFT JOIN collections c ON c.database_id = d.id").
129+
Where("d.is_deleted = ?", true).
130+
Where("d.updated_at < ?", cutoffTime).
131+
Group("d.id").
132+
Having("COUNT(c.id) = 0").
133+
Limit(1000)
134+
135+
res := s.db.Table("databases").
136+
Where("id IN (?)", databasesSubQuery).
137+
Delete(&dbmodel.Database{})
138+
if res.Error != nil {
139+
return res.Error
140+
}
141+
if res.RowsAffected == 0 {
142+
break
143+
}
144+
}
145+
146+
return nil
147+
}

go/pkg/sysdb/metastore/db/dbmodel/database.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,6 @@ type IDatabaseDb interface {
2727
ListDatabases(limit *int32, offset *int32, tenantID string) ([]*Database, error)
2828
Insert(in *Database) error
2929
DeleteAll() error
30-
Delete(databaseID string) error
30+
SoftDelete(databaseID string) error
31+
FinishDatabaseDeletion(cutoffTime time.Time) error
3132
}

idl/chromadb/proto/coordinator.proto

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,14 @@ message DeleteDatabaseRequest {
4646

4747
message DeleteDatabaseResponse {}
4848

49+
message FinishDatabaseDeletionRequest {
50+
google.protobuf.Timestamp cutoff_time = 1;
51+
}
52+
53+
message FinishDatabaseDeletionResponse {
54+
uint64 num_deleted = 1;
55+
}
56+
4957
message CreateTenantRequest {
5058
string name = 2; // Names are globally unique
5159
}
@@ -496,6 +504,7 @@ service SysDB {
496504
rpc GetDatabase(GetDatabaseRequest) returns (GetDatabaseResponse) {}
497505
rpc ListDatabases(ListDatabasesRequest) returns (ListDatabasesResponse) {}
498506
rpc DeleteDatabase(DeleteDatabaseRequest) returns (DeleteDatabaseResponse) {}
507+
rpc FinishDatabaseDeletion(FinishDatabaseDeletionRequest) returns (FinishDatabaseDeletionResponse) {}
499508
rpc CreateTenant(CreateTenantRequest) returns (CreateTenantResponse) {}
500509
rpc GetTenant(GetTenantRequest) returns (GetTenantResponse) {}
501510
rpc CreateSegment(CreateSegmentRequest) returns (CreateSegmentResponse) {}

rust/garbage_collector/src/garbage_collector_orchestrator_v2.rs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use chroma_system::{
3434
use chroma_types::chroma_proto::{CollectionVersionFile, VersionListForCollection};
3535
use chroma_types::{
3636
BatchGetCollectionSoftDeleteStatusError, CollectionUuid, DeleteCollectionError,
37+
DeleteDatabaseError,
3738
};
3839
use chrono::{DateTime, Utc};
3940
use std::collections::{HashMap, HashSet};
@@ -158,6 +159,8 @@ pub enum GarbageCollectorError {
158159
UnparsableUuid(#[from] uuid::Error),
159160
#[error("Collection deletion failed: {0}")]
160161
CollectionDeletionFailed(#[from] DeleteCollectionError),
162+
#[error("Database deletion failed: {0}")]
163+
DeleteDatabaseFailed(#[from] DeleteDatabaseError),
161164
}
162165

163166
impl ChromaError for GarbageCollectorError {
@@ -716,6 +719,19 @@ impl GarbageCollectorOrchestrator {
716719

717720
self.num_pending_tasks -= 1;
718721
if self.num_pending_tasks == 0 {
722+
let tenant = self
723+
.tenant
724+
.clone()
725+
.ok_or(GarbageCollectorError::InvariantViolation(
726+
"Expected tenant to be set".to_string(),
727+
))?;
728+
let database_name =
729+
self.database_name
730+
.clone()
731+
.ok_or(GarbageCollectorError::InvariantViolation(
732+
"Expected database to be set".to_string(),
733+
))?;
734+
719735
for collection_id in self.soft_deleted_collections_to_gc.iter() {
720736
let graph =
721737
self.graph
@@ -758,16 +774,8 @@ impl GarbageCollectorOrchestrator {
758774
if are_all_children_in_fork_tree_also_soft_deleted {
759775
self.sysdb_client
760776
.finish_collection_deletion(
761-
self.tenant.clone().ok_or(
762-
GarbageCollectorError::InvariantViolation(
763-
"Expected tenant to be set".to_string(),
764-
),
765-
)?,
766-
self.database_name.clone().ok_or(
767-
GarbageCollectorError::InvariantViolation(
768-
"Expected database to be set".to_string(),
769-
),
770-
)?,
777+
tenant.clone(),
778+
database_name.clone(),
771779
*collection_id,
772780
)
773781
.await?;

rust/garbage_collector/tests/proptest_helpers/garbage_collector_reference.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -232,10 +232,6 @@ impl ReferenceStateMachine for ReferenceGarbageCollector {
232232
segments: segment_group,
233233
});
234234

235-
let _delete_collection_transition = alive_collection_id_strategy
236-
.clone()
237-
.prop_map(Transition::DeleteCollection);
238-
239235
let fork_collection_transition =
240236
alive_collection_id_strategy
241237
.clone()
@@ -277,11 +273,11 @@ impl ReferenceStateMachine for ReferenceGarbageCollector {
277273

278274
if alive_collection_ids.is_empty() {
279275
if state.root_collection_id.is_some() {
280-
// If all collections are deleted, we cannot create a new collection, so there is nothing further to do
276+
// If all collections are deleted, we cannot create a new collection
281277
return Just(Transition::NoOp).boxed();
282278
}
283279

284-
return prop_oneof![create_collection_transition,].boxed();
280+
return create_collection_transition.boxed();
285281
}
286282

287283
// While the garbage collector can technically run on any collection in a fork tree, we always run it on the root collection as the test fixture will call `ListCollectionsToGc()` which only returns the root collection.
@@ -293,10 +289,10 @@ impl ReferenceStateMachine for ReferenceGarbageCollector {
293289
});
294290

295291
return prop_oneof![
296-
2 => fork_collection_transition,
297-
3 => increment_collection_version_transition,
298-
2 => garbage_collect_transition,
299-
1 => delete_collection_transition,
292+
5 => increment_collection_version_transition,
293+
4 => fork_collection_transition,
294+
4 => garbage_collect_transition,
295+
3 => delete_collection_transition,
300296
]
301297
.boxed();
302298
}

rust/garbage_collector/tests/proptest_helpers/garbage_collector_under_test.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ define_thread_local_stats!(STATS);
3232
pub struct GarbageCollectorUnderTest {
3333
runtime: Arc<tokio::runtime::Runtime>,
3434
tenant: String,
35-
database: String,
35+
database_name: String,
3636
sysdb: SysDb,
3737
storage: Storage,
3838
root_manager: RootManager,
@@ -125,7 +125,7 @@ impl StateMachineTest for GarbageCollectorUnderTest {
125125
Self {
126126
runtime: ref_state.runtime.clone(),
127127
tenant: tenant_name,
128-
database: database_name,
128+
database_name,
129129
sysdb,
130130
storage: storage.clone(),
131131
root_manager,
@@ -192,7 +192,7 @@ impl StateMachineTest for GarbageCollectorUnderTest {
192192
.sysdb
193193
.create_collection(
194194
state.tenant.clone(),
195-
state.database.clone(),
195+
state.database_name.clone(),
196196
collection_id,
197197
format!("Collection {}", collection_id),
198198
segments,
@@ -211,7 +211,7 @@ impl StateMachineTest for GarbageCollectorUnderTest {
211211
.runtime
212212
.block_on(state.sysdb.delete_collection(
213213
state.tenant.clone(),
214-
state.database.clone(),
214+
state.database_name.clone(),
215215
collection_id,
216216
vec![],
217217
))

0 commit comments

Comments
 (0)