feat(spans): Distribute span payload keys across Redis cluster#110593
feat(spans): Distribute span payload keys across Redis cluster#110593
Conversation
Spread span payload sets across Redis cluster nodes to eliminate
expensive colocated set merges (SMEMBERS+SADD) that block a single
node for large segments.
Instead of colocating all payload keys under {project_id:trace_id},
write them to {project_id:trace_id:span_id} so they shard across
nodes. A member-keys tracking set (span-buf:mk) indexes which
distributed keys belong to each segment.
Three-phase rollout:
- Phase 1 (distribute-payload-keys): Dual-write to both key formats,
read from colocated.
- Phase 2 (distribute-payload-keys-read): Dual-write continues,
flusher reads from distributed keys.
- Phase 3 (distribute-payload-keys-stop-colocated): Stop colocated
writes.
5249883 to
7b770e9
Compare
| project_id_bytes, _, _ = parse_segment_key(key) | ||
| project_id = int(project_id_bytes) | ||
| project_id_int = int(project_id_bytes) | ||
| try: | ||
| project = Project.objects.get_from_cache(id=project_id) | ||
| project = Project.objects.get_from_cache(id=project_id_int) | ||
| except Project.DoesNotExist: | ||
| logger.warning( | ||
| "Project does not exist for segment with dropped spans", | ||
| extra={"project_id": project_id}, | ||
| extra={"project_id": project_id_int}, | ||
| ) | ||
| else: | ||
| track_outcome( | ||
| org_id=project.organization_id, | ||
| project_id=project_id, | ||
| project_id=project_id_int, |
There was a problem hiding this comment.
renaming since I'm declaring project_id above as type bytes, so redeclaring as int will cause mypy errors.
evanh
left a comment
There was a problem hiding this comment.
I think this all makes sense. Nice job!
| scan_key_to_segment[key] = key | ||
| cursors[key] = 0 | ||
|
|
||
| self._distributed_payload_keys_map = {} |
There was a problem hiding this comment.
Why did you make _distributed_payload_keys_map an attribute of the class if you are supposed to manage it only in this method or within an iteration to flush?
Having it as a mutable attribute is a liability as it can easily cause race conditions we can prevent. It is not given that we will only call process_spans or flush segment from the same thread.
IF you need it only through an iteration of the flush method please make it a local variable. You can pass it back from load_segment_data
| project_id_bytes, _, _ = parse_segment_key(key) | ||
| project_id = int(project_id_bytes) | ||
| project_id_int = int(project_id_bytes) | ||
| try: | ||
| project = Project.objects.get_from_cache(id=project_id) | ||
| project = Project.objects.get_from_cache(id=project_id_int) | ||
| except Project.DoesNotExist: | ||
| logger.warning( | ||
| "Project does not exist for segment with dropped spans", | ||
| extra={"project_id": project_id}, | ||
| extra={"project_id": project_id_int}, | ||
| ) | ||
| else: | ||
| track_outcome( | ||
| org_id=project.organization_id, | ||
| project_id=project_id, | ||
| project_id=project_id_int, |
| read_distributed_payloads = options.get("spans.buffer.read-distributed-payloads") | ||
| write_distributed_payloads = options.get("spans.buffer.write-distributed-payloads") |
There was a problem hiding this comment.
When you are adding multiple ways to perform some operation in a very complex method, adding more branches make things even more complex and hard to follow.
A safer way to proceed is to break the method into multiple. Have a common interface between the two options and use the main method to switch between one implementation and the other.
|
|
||
| for key, sub_span_ids in zip(segment_keys, mk_results): | ||
| project_id, trace_id, _ = parse_segment_key(key) | ||
| pat = f"{project_id.decode('ascii')}:{trace_id.decode('ascii')}" |
| project_id, trace_id, _ = parse_segment_key(key) | ||
| pat = f"{project_id.decode('ascii')}:{trace_id.decode('ascii')}" | ||
| distributed_keys: list[bytes] = [] | ||
| for sub_span_id in sub_span_ids: |
Spread span payload sets across Redis cluster nodes to avoid concentrated large traces on a single node.
Instead of merging all payloads under {project_id:trace_id}, write them to {project_id:trace_id:span_id} so they shard across nodes. A member-keys tracking set (span-buf:mk) indexes which distributed keys belong to each segment.
Three-phase rollout (similar to the ZSET to SET change):