[ENH]: Persist a backfill record on log to trigger backfill #5897
Conversation
This stack of pull requests is managed by Graphite. Learn more about stacking.

Reviewer Checklist
Please leverage this checklist to ensure your code review is thorough before approving.
- Testing, Bugs, Errors, Logs, Documentation
- System Compatibility
- Quality
Persist `backfill_fn` log entry and execution path to automatically back-fill newly attached functions

This PR introduces a durable, end-to-end mechanism that guarantees all previously ingested records are re-processed whenever a user attaches a new function to an existing collection. A new log entry type (`backfill_fn`) carries that signal on the collection's log.

The change is additive and fully backward compatible: older binaries that do not understand the new enum will simply ignore it, so no wire-protocol or DB migrations are required. Normal operation is unaffected unless the new log entry is present.

Key Changes
• Added …

Affected Areas
• Protocol definitions (protobuf)
• …

This summary was automatically generated by @propel-code-bot
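The summary above notes that the new `backfill_fn` entry is meant to be ignored on the compaction path. Below is a minimal, self-contained sketch of that skip-during-materialization idea; the enum, the `materializable` helper, and the trimmed-down `OperationRecord` are illustrative stand-ins, not the actual definitions in `rust/types` or the compactor.

```rust
// Stand-in operation enum with the new marker variant persisted by attach_function.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Operation {
    Add,
    Update,
    Upsert,
    Delete,
    BackfillFn,
}

// Trimmed-down record for illustration only.
struct OperationRecord {
    id: String,
    operation: Operation,
}

/// Drop backfill markers so they are a no-op on the compaction path.
fn materializable(records: Vec<OperationRecord>) -> Vec<OperationRecord> {
    records
        .into_iter()
        .filter(|r| r.operation != Operation::BackfillFn)
        .collect()
}

fn main() {
    let records = vec![
        OperationRecord { id: "a".into(), operation: Operation::Add },
        OperationRecord { id: "backfill_id".into(), operation: Operation::BackfillFn },
    ];
    let kept = materializable(records);
    assert_eq!(kept.len(), 1);
    assert_eq!(kept[0].id, "a");
    assert_eq!(kept[0].operation, Operation::Add);
}
```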
```rust
self.log_client
    .push_logs(
        &tenant,
        collection_id,
        vec![OperationRecord {
            id: "backfill_id".to_string(),
            embedding: Some(fake_embedding),
            encoding: None,
            metadata: None,
            document: None,
            operation: Operation::BackfillFn,
        }],
    )
    .await?;
```
[CriticalError]
**Resource leak in error path**: The `start_backfill` function pushes a fake backfill record to the log, but if `push_logs` fails, the collection state may become inconsistent since the function was already registered in step 1. The sysdb update in step 3 won't execute, leaving the system in a partial state.
```rust
// Add cleanup or make this transactional
if let Err(e) = self.log_client.push_logs(...).await {
    // Rollback function registration or mark as failed
    self.sysdb_client.detach_function(attached_function_id).await?;
    return Err(e);
}
```
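To make the suggestion above concrete, here is a self-contained, synchronous sketch of the rollback-on-failure pattern. Every type and method name below is a stand-in (the real frontend clients are async and have different signatures); it only illustrates the ordering of steps 1-3 and the rollback.

```rust
struct SysDb { attached: Vec<String> }
struct Log { pushed: Vec<String> }

impl SysDb {
    fn attach_function(&mut self, id: &str) { self.attached.push(id.to_string()); }
    fn detach_function(&mut self, id: &str) { self.attached.retain(|a| a.as_str() != id); }
}

impl Log {
    fn push_logs(&mut self, record: &str, fail: bool) -> Result<(), String> {
        if fail {
            return Err("log service unavailable".to_string());
        }
        self.pushed.push(record.to_string());
        Ok(())
    }
}

fn start_backfill(sysdb: &mut SysDb, log: &mut Log, fail_push: bool) -> Result<(), String> {
    let attached_function_id = "fn-123";
    // Step 1: register the function.
    sysdb.attach_function(attached_function_id);
    // Step 2: push the backfill record; roll back step 1 if the push fails so
    // a failed push doesn't leave a half-attached function behind.
    if let Err(e) = log.push_logs("backfill_fn", fail_push) {
        sysdb.detach_function(attached_function_id);
        return Err(e);
    }
    // Step 3: the sysdb update would happen here.
    Ok(())
}

fn main() {
    let mut sysdb = SysDb { attached: vec![] };
    let mut log = Log { pushed: vec![] };
    assert!(start_backfill(&mut sysdb, &mut log, true).is_err());
    assert!(sysdb.attached.is_empty()); // the registration was rolled back
    assert!(log.pushed.is_empty());
}
```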
File: rust/frontend/src/impls/service_based_frontend.rs
Line: 1992
```rust
        collection_id,
        vec![OperationRecord {
            id: "backfill_id".to_string(),
            embedding: Some(fake_embedding),
            encoding: None,
            metadata: None,
            document: None,
            operation: Operation::BackfillFn,
        }],
    )
```
[BestPractice]
**Race condition: Non-idempotent backfill operation**: This code pushes a backfill record with a hardcoded ID `"backfill_id"`. If `start_backfill` is called multiple times (retry, duplicate request, etc.), it will push duplicate backfill records with the same ID, potentially triggering multiple backfill operations.
```rust
// Use unique ID per invocation
OperationRecord {
    id: format!("backfill_{}_{}", collection_id, Uuid::new_v4()),
    // ... rest of fields
}
```
Or add deduplication logic to check if backfill is already in progress before pushing the record.
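A runnable version of the unique-ID suggestion, assuming the `uuid` crate (with the `v4` feature) is available; `backfill_record_id` is a hypothetical helper, not an existing function.

```rust
use uuid::Uuid;

// Build a per-invocation record ID so retries can't collide.
fn backfill_record_id(collection_id: &str) -> String {
    format!("backfill_{}_{}", collection_id, Uuid::new_v4())
}

fn main() {
    let first = backfill_record_id("my-collection");
    let second = backfill_record_id("my-collection");
    // Two invocations never produce the same ID, so a retried or duplicated
    // request cannot be mistaken for the original backfill record.
    assert_ne!(first, second);
    println!("{first}\n{second}");
}
```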
File: rust/frontend/src/impls/service_based_frontend.rs
Line: 1991
rust/types/src/strategies.rs
Outdated
```rust
    }
    Operation::Delete => None,
    Operation::BackfillFn => {
        // TODO(tanujnay112): Can we even reach here with this in prod? Ask @jason.
```
Does this need to be done before PR?
```rust
    .ok_or_else(|| {
        AttachFunctionError::GetCollectionError(GetCollectionError::CollectionNotFound(
            collection_id.to_string(),
        ))
    })?;
let fake_embedding = vec![0.0; embedding_dim as usize];
// TODO(tanujnay112): Make this either a configurable or better yet a separate
// RPC to the logs service.
let num_fake_logs = 250;
```
[PerformanceOptimization]
**Performance issue**: Allocating 250 fake logs with full embedding vectors will consume significant memory. With `embedding_dim` potentially being 1536 (common for OpenAI embeddings), this creates `250 * 1536 * 8 bytes = ~3MB` per call. If multiple backfills run concurrently, this could exhaust memory.
Consider:
1. Making `num_fake_logs` smaller (e.g., 10-50)
2. Streaming logs in batches instead of allocating all at once
3. Using a smaller sentinel embedding for backfill markers
```rust
// Option 3: Use minimal embedding for backfill marker
let fake_embedding = vec![0.0; 1]; // Single sentinel value
```
File: rust/frontend/src/impls/service_based_frontend.rs
Line: 1984
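For reference, a small worked version of the arithmetic in that comment. Whether the allocation is roughly 3 MB or 1.5 MB depends on the element size assumed for the embedding values (8 vs. 4 bytes), which is an assumption here:

```rust
fn main() {
    let num_fake_logs = 250usize;
    let embedding_dim = 1536usize;
    // 250 * 1536 * 8 bytes = 3_072_000 bytes (~3 MB), the figure quoted above.
    let bytes_f64 = num_fake_logs * embedding_dim * std::mem::size_of::<f64>();
    // If the embeddings are stored as f32, the same allocation is about half that.
    let bytes_f32 = num_fake_logs * embedding_dim * std::mem::size_of::<f32>();
    println!("f64: {bytes_f64} bytes (~3.0 MB)");
    println!("f32: {bytes_f32} bytes (~1.5 MB)");
}
```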
```rust
            ctx,
        )
        .await;
    if self.has_backfill {
```
[CriticalError]
**Logic bug**: The condition `if self.has_backfill` only checks a boolean flag that's set to `true` if *any* MaterializeLogOutput has backfill. However, the code takes *all* materialized outputs (including non-backfill ones) and returns them as `RequireFunctionBackfill`.
This means if you have:
- 5 normal log records materialized
- 1 backfill log record materialized
All 6 records are sent to the backfill path, not just the 1 backfill record.
**Fix**: Filter the materialized outputs:
```rust
if self.has_backfill {
    let (backfill_outputs, normal_outputs): (Vec<_>, Vec<_>) =
        materialized.into_iter().partition(|o| o.result.has_backfill());
    if !backfill_outputs.is_empty() {
        self.terminate_with_result(
            Ok(RequireFunctionBackfill::new(backfill_outputs, collection_info).into()),
            ctx,
        ).await;
    }
    // Handle normal_outputs separately or in next iteration
}
```
File: rust/worker/src/execution/orchestration/log_fetch_orchestrator.rs
Line: 809
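As a standalone illustration of the `Iterator::partition` idiom the suggested fix relies on (plain integers stand in for `MaterializeLogOutput`, and the predicate stands in for a `has_backfill()` check):

```rust
fn main() {
    let outputs = vec![1, 2, 3, 4, 5, 6];
    // Split into "backfill" and "normal" groups in a single pass.
    let (backfill, normal): (Vec<_>, Vec<_>) =
        outputs.into_iter().partition(|o| o % 2 == 0);
    assert_eq!(backfill, vec![2, 4, 6]);
    assert_eq!(normal, vec![1, 3, 5]);
}
```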
Description of changes
Summarize the changes made by this PR.
This diff persists a backfill signal that goes from a call to `attach_function` to `run_backfilled_attached_function_workflow`, introduced by #5896. This diff achieves that by pushing a new `backfill_fn` log to the concerned input collection's logs. This log should effectively be a no-op on the compactor path, as `materialize_logs` makes sure to ignore these logs. In `compact.rs::compact()`, if the initial `run_fetch_logs` call detects the presence of this backfill log, `run_backfilled_attached_function_workflow` will be called. If this call succeeds, we return and ignore the usual compaction flow. If backfill was not needed, the usual pre-existing compaction + attached function execution is called.
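A minimal, self-contained sketch of that branch, using synchronous stubs and stand-in names (the real `compact.rs::compact()` is async and operates on real log and collection types):

```rust
// Stand-in for the result of the initial log fetch.
struct FetchedLogs {
    has_backfill: bool,
}

fn run_fetch_logs() -> FetchedLogs {
    FetchedLogs { has_backfill: true }
}

fn run_backfilled_attached_function_workflow() -> Result<(), String> {
    Ok(())
}

fn run_normal_compaction() -> Result<(), String> {
    Ok(())
}

fn compact() -> Result<(), String> {
    let fetched = run_fetch_logs();
    if fetched.has_backfill {
        // A backfill_fn log was found: run the backfill workflow and skip the
        // usual compaction flow for this round.
        return run_backfilled_attached_function_workflow();
    }
    // No backfill signal: fall through to the pre-existing compaction +
    // attached function execution.
    run_normal_compaction()
}

fn main() {
    compact().expect("compaction round failed");
}
```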
Test plan
How are these changes tested?
`pytest` for python, `yarn test` for js, `cargo test` for rust

Migration plan
Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?
Observability plan
What is the plan to instrument and monitor this change?
Documentation Changes
Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the docs section?