|
2 | 2 |
|
3 | 3 | import itertools |
4 | 4 | from collections.abc import Sequence |
| 5 | +from typing import cast |
5 | 6 | from unittest import mock |
6 | 7 |
|
7 | 8 | import orjson |
8 | 9 | import pytest |
| 10 | +import sentry_kafka_schemas |
| 11 | +from sentry_kafka_schemas.schema_types.ingest_spans_v1 import SpanEvent |
9 | 12 | from sentry_redis_tools.clients import StrictRedis |
10 | 13 |
|
| 14 | +from sentry.conf.types.kafka_definition import Topic, get_topic_codec |
11 | 15 | from sentry.spans.buffer import FlushedSegment, OutputSpan, Span, SpansBuffer |
| 16 | +from sentry.spans.consumers.process.factory import SPANS_CODEC, validate_span_event |
| 17 | +from sentry.spans.consumers.process_segments.types import attribute_value |
12 | 18 | from sentry.spans.segment_key import SegmentKey |
13 | 19 | from sentry.testutils.helpers.options import override_options |
14 | 20 |
|
|
32 | 38 | "spans.buffer.evalsha-latency-threshold": 100, |
33 | 39 | "spans.buffer.debug-traces": [], |
34 | 40 | "spans.buffer.evalsha-cumulative-logger-enabled": True, |
| 41 | + "spans.process-segments.schema-validation": 1.0, |
35 | 42 | "spans.buffer.zero-copy-dest-threshold-bytes": 0, |
36 | 43 | "spans.buffer.done-flush-conditional-zrem": False, |
37 | 44 | "spans.buffer.write-distributed-payloads": False, |
@@ -1646,3 +1653,69 @@ def test_distributed_transition_write_then_read() -> None: |
1646 | 1653 | assert len(rv[seg_key].spans) == 2 |
1647 | 1654 | buf.done_flush_segments(rv) |
1648 | 1655 | assert_clean_distributed(buf.client) |
| 1656 | + |
| 1657 | + |
| 1658 | +def _get_schema_examples(): |
| 1659 | + """Load all ingest-spans schema examples for parametrization.""" |
| 1660 | + examples = [] |
| 1661 | + for ex in sentry_kafka_schemas.iter_examples("ingest-spans"): |
| 1662 | + examples.append(pytest.param(ex.load(), id=ex.path.stem)) |
| 1663 | + return examples |
| 1664 | + |
| 1665 | + |
| 1666 | +@pytest.mark.parametrize("example", _get_schema_examples()) |
| 1667 | +def test_schema_examples(buffer: SpansBuffer, example: dict) -> None: |
| 1668 | + """ |
| 1669 | + Feed official ingest-spans schema examples through the buffer pipeline |
| 1670 | + to verify they are handled without errors. |
| 1671 | + """ |
| 1672 | + # Replicate the parsing logic from process_batch() in factory.py |
| 1673 | + segment_id = cast(str | None, attribute_value(example, "sentry.segment.id")) |
| 1674 | + validate_span_event(cast(SpanEvent, example), segment_id) |
| 1675 | + |
| 1676 | + payload = orjson.dumps(example) |
| 1677 | + |
| 1678 | + span = Span( |
| 1679 | + trace_id=example["trace_id"], |
| 1680 | + span_id=example["span_id"], |
| 1681 | + parent_span_id=example.get("parent_span_id"), |
| 1682 | + segment_id=segment_id, |
| 1683 | + project_id=example["project_id"], |
| 1684 | + payload=payload, |
| 1685 | + end_timestamp=cast(float, example["end_timestamp"]), |
| 1686 | + is_segment_span=bool(example.get("parent_span_id") is None or example.get("is_segment")), |
| 1687 | + ) |
| 1688 | + |
| 1689 | + process_spans([span], buffer, now=0) |
| 1690 | + assert_ttls(buffer.client) |
| 1691 | + |
| 1692 | + # Flush past both root-timeout (10s) and timeout (60s) |
| 1693 | + rv = buffer.flush_segments(now=61) |
| 1694 | + |
| 1695 | + assert len(rv) == 1 |
| 1696 | + segment = list(rv.values())[0] |
| 1697 | + assert len(segment.spans) == 1 |
| 1698 | + |
| 1699 | + output_span = segment.spans[0] |
| 1700 | + assert output_span.payload["span_id"] == example["span_id"] |
| 1701 | + assert output_span.payload["trace_id"] == example["trace_id"] |
| 1702 | + |
| 1703 | + # Verify top-level keys are preserved (except is_segment and attributes |
| 1704 | + # which the buffer modifies) |
| 1705 | + for key in example: |
| 1706 | + if key in ("is_segment", "attributes"): |
| 1707 | + continue |
| 1708 | + assert key in output_span.payload, f"Key {key!r} missing from output payload" |
| 1709 | + |
| 1710 | + # Validate that the output span still conforms to the ingest-spans schema. |
| 1711 | + # It's not explicitly written anywhere that the spans schema in |
| 1712 | + # buffered-segments is the same one as the input schema, but right now |
| 1713 | + # that's what it is. |
| 1714 | + SPANS_CODEC.validate(cast(SpanEvent, output_span.payload)) |
| 1715 | + |
| 1716 | + # Validate that the assembled segment conforms to the buffered-segments schema |
| 1717 | + buffered_segments_codec = get_topic_codec(Topic.BUFFERED_SEGMENTS) |
| 1718 | + buffered_segments_codec.validate({"spans": [span.payload for span in segment.spans]}) |
| 1719 | + |
| 1720 | + buffer.done_flush_segments(rv) |
| 1721 | + assert_clean(buffer.client) |
0 commit comments