Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 26 additions & 18 deletions async_batcher/scylladb/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,37 @@ class WriteOperation:
data: dict[str, Any] | None = None


def _validate_operation(op: WriteOperation) -> ValueError | None:
"""Validate a write operation before executing it.

Returns None if valid, or a ValueError describing the issue.
"""
if op.operation == "INSERT" and op.data is None:
return ValueError("data must be provided for INSERT operations")
if op.operation == "UPDATE" and (op.key is None or op.data is None):
return ValueError("key and data must be provided for UPDATE operations")
if op.operation == "DELETE" and op.key is None:
return ValueError("key must be provided for DELETE operations")
return None


class AsyncScyllaDbWriteBatcher(AsyncBatcher[WriteOperation, None]):
"""Batcher for ScyllaDB write operations."""

def process_batch(self, *, batch: list[WriteOperation]) -> list[None | Exception]:
results = []
def process_batch(self, batch: list[WriteOperation]) -> list[None | Exception]:
# Validate all operations before executing any
results: list[None | Exception] = [_validate_operation(op) for op in batch]
valid_ops = [(i, op) for i, (op, err) in enumerate(zip(batch, results)) if err is None]

if not valid_ops:
return results

with BatchQuery() as b:
for op in batch:
for _i, op in valid_ops:
if op.operation == "INSERT":
if op.data is None:
results.append(ValueError("data must be provided for INSERT operations"))
else:
op.model.batch(b).create(**op.data)
results.append(None)
op.model.batch(b).create(**op.data)
elif op.operation == "UPDATE":
if op.key is None or op.data is None:
results.append(ValueError("key and data must be provided for UPDATE operations"))
else:
op.model.objects(**op.key).batch(b).update(**op.data)
results.append(None)
op.model.objects(**op.key).batch(b).update(**op.data)
elif op.operation == "DELETE":
if op.key is None:
results.append(ValueError("key must be provided for DELETE operations"))
else:
op.model.objects(**op.key).batch(b).delete()
results.append(None)
op.model.objects(**op.key).batch(b).delete()
return results
Loading