Skip to content

Commit 0bf375c

Browse files
authored
fix: make apply patch not fail the pipeline (#690)
With this change events with bad patches or data will just end up with null states instead of preventing the entire pipeline from processing more data.
1 parent 30534bf commit 0bf375c

File tree

2 files changed

+23
-7
lines changed

2 files changed

+23
-7
lines changed

pipeline/src/aggregator/model_instance_patch.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use datafusion::{
1717
},
1818
};
1919
use json_patch::PatchOperation;
20+
use tracing::warn;
2021

2122
use super::{bytes_value_at, u32_value_at, EventDataContainer};
2223

@@ -218,9 +219,17 @@ impl PartitionEvaluator for CeramicPatchEvaluator {
218219
} else {
219220
let patch = patches.value(i);
220221
resolved_patches.append_value(patch);
221-
let (data, model_version) = Self::apply_patch(patch, previous_state)?;
222-
new_states.append_value(data);
223-
model_versions.append_option(model_version.map(|mv| mv.to_bytes()));
222+
match Self::apply_patch(patch, previous_state) {
223+
Ok((data, model_version)) => {
224+
new_states.append_value(data);
225+
model_versions.append_option(model_version.map(|mv| mv.to_bytes()));
226+
}
227+
Err(err) => {
228+
warn!(%err, event_cid=?Cid::read_bytes(event_cids.value(i)), "failed to apply patch to model instance event");
229+
new_states.append_null();
230+
model_versions.append_null();
231+
}
232+
};
224233
}
225234
} else {
226235
// Unreachable when data is well formed.

pipeline/src/aggregator/model_patch.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use arrow::{
55
datatypes::DataType,
66
};
77
use arrow_schema::Field;
8+
use cid::Cid;
89
use datafusion::{
910
common::{
1011
cast::{as_binary_array, as_uint32_array},
@@ -16,6 +17,7 @@ use datafusion::{
1617
},
1718
};
1819
use json_patch::PatchOperation;
20+
use tracing::warn;
1921

2022
use super::{bytes_value_at, u32_value_at, EventDataContainer};
2123

@@ -168,10 +170,15 @@ impl PartitionEvaluator for CeramicPatchEvaluator {
168170
resolved_previous_states.append_value(&state);
169171
} else {
170172
resolved_previous_states.append_value(previous_state);
171-
new_states.append_value(CeramicPatchEvaluator::apply_patch(
172-
patches.value(i),
173-
previous_state,
174-
)?);
173+
match Self::apply_patch(patches.value(i), previous_state) {
174+
Ok(data) => {
175+
new_states.append_value(data);
176+
}
177+
Err(err) => {
178+
warn!(%err, event_cid=?Cid::read_bytes(event_cids.value(i)), "failed to apply patch to model event");
179+
new_states.append_null();
180+
}
181+
};
175182
}
176183
} else {
177184
// Unreachable when data is well formed.

0 commit comments

Comments
 (0)