Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `opentelemetry-util-http` no longer contains an instrumentation entrypoint and will not be loaded
automatically by the auto instrumentor.
([#745](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/745))
- `opentelemetry-instrumentation-pika` Bugfix use properties.headers. It will prevent the header injection from raising.
([#740](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/740))

## [1.6.0-0.25b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.6.0-0.25b0) - 2021-10-13
### Added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
MessagingOperationValues,
SpanAttributes,
)
from opentelemetry.trace import Tracer
from opentelemetry.trace import SpanKind, Tracer
from opentelemetry.trace.span import Span


Expand Down Expand Up @@ -40,16 +40,15 @@ def decorated_callback(
body: bytes,
) -> Any:
if not properties:
properties = BasicProperties()
if properties.headers is None:
properties.headers = {}
properties = BasicProperties(headers={})
ctx = propagate.extract(properties.headers, getter=_pika_getter)
if not ctx:
ctx = context.get_current()
span = _get_span(
tracer,
channel,
properties,
span_kind=SpanKind.CONSUMER,
task_name=task_name,
ctx=ctx,
operation=MessagingOperationValues.RECEIVE,
Expand All @@ -74,12 +73,13 @@ def decorated_function(
mandatory: bool = False,
) -> Any:
if not properties:
properties = BasicProperties()
properties = BasicProperties(headers={})
ctx = context.get_current()
span = _get_span(
tracer,
channel,
properties,
span_kind=SpanKind.PRODUCER,
task_name="(temporary)",
ctx=ctx,
operation=None,
Expand All @@ -104,6 +104,7 @@ def _get_span(
channel: Channel,
properties: BasicProperties,
task_name: str,
span_kind: SpanKind,
ctx: context.Context,
operation: Optional[MessagingOperationValues] = None,
) -> Optional[Span]:
Expand All @@ -113,7 +114,9 @@ def _get_span(
return None
task_name = properties.type if properties.type else task_name
span = tracer.start_span(
context=ctx, name=_generate_span_name(task_name, operation)
context=ctx,
name=_generate_span_name(task_name, operation),
kind=span_kind,
)
if span.is_recording():
_enrich_span(span, channel, properties, task_name, operation)
Expand Down
136 changes: 131 additions & 5 deletions instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,15 @@
# limitations under the License.
from unittest import TestCase, mock

from pika.channel import Channel
from pika.spec import Basic, BasicProperties

from opentelemetry.instrumentation.pika import utils
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import Span, Tracer
from opentelemetry.semconv.trace import (
MessagingOperationValues,
SpanAttributes,
)
from opentelemetry.trace import Span, SpanKind, Tracer


class TestUtils(TestCase):
Expand All @@ -32,12 +38,15 @@ def test_get_span(
channel = mock.MagicMock()
properties = mock.MagicMock()
task_name = "test.test"
span_kind = mock.MagicMock(spec=SpanKind)
get_value.return_value = None
ctx = mock.MagicMock()
_ = utils._get_span(tracer, channel, properties, task_name, ctx)
_ = utils._get_span(
tracer, channel, properties, task_name, span_kind, ctx
)
generate_span_name.assert_called_once()
tracer.start_span.assert_called_once_with(
context=ctx, name=generate_span_name.return_value
context=ctx, name=generate_span_name.return_value, kind=span_kind
)
enrich_span.assert_called_once()

Expand All @@ -54,9 +63,12 @@ def test_get_span_suppressed(
channel = mock.MagicMock()
properties = mock.MagicMock()
task_name = "test.test"
span_kind = mock.MagicMock(spec=SpanKind)
get_value.return_value = True
ctx = mock.MagicMock()
span = utils._get_span(tracer, channel, properties, task_name, ctx)
span = utils._get_span(
tracer, channel, properties, task_name, span_kind, ctx
)
self.assertEqual(span, None)
generate_span_name.assert_not_called()
enrich_span.assert_not_called()
Expand Down Expand Up @@ -158,3 +170,117 @@ def test_enrich_span_unique_connection() -> None:
),
],
)

@mock.patch("opentelemetry.instrumentation.pika.utils._get_span")
@mock.patch("opentelemetry.propagate.extract")
@mock.patch("opentelemetry.trace.use_span")
def test_decorate_callback(
self,
use_span: mock.MagicMock,
extract: mock.MagicMock,
get_span: mock.MagicMock,
) -> None:
callback = mock.MagicMock()
mock_task_name = "mock_task_name"
tracer = mock.MagicMock()
channel = mock.MagicMock(spec=Channel)
method = mock.MagicMock(spec=Basic.Deliver)
properties = mock.MagicMock()
mock_body = b"mock_body"
decorated_callback = utils._decorate_callback(
callback, tracer, mock_task_name
)
retval = decorated_callback(channel, method, properties, mock_body)
extract.assert_called_once_with(
properties.headers, getter=utils._pika_getter
)
get_span.assert_called_once_with(
tracer,
channel,
properties,
span_kind=SpanKind.CONSUMER,
task_name=mock_task_name,
ctx=extract.return_value,
operation=MessagingOperationValues.RECEIVE,
)
use_span.assert_called_once_with(
get_span.return_value, end_on_exit=True
)
callback.assert_called_once_with(
channel, method, properties, mock_body
)
self.assertEqual(retval, callback.return_value)

@mock.patch("opentelemetry.instrumentation.pika.utils._get_span")
@mock.patch("opentelemetry.propagate.inject")
@mock.patch("opentelemetry.context.get_current")
@mock.patch("opentelemetry.trace.use_span")
def test_decorate_basic_publish(
self,
use_span: mock.MagicMock,
get_current: mock.MagicMock,
inject: mock.MagicMock,
get_span: mock.MagicMock,
) -> None:
callback = mock.MagicMock()
tracer = mock.MagicMock()
channel = mock.MagicMock(spec=Channel)
method = mock.MagicMock(spec=Basic.Deliver)
properties = mock.MagicMock()
mock_body = b"mock_body"
decorated_basic_publish = utils._decorate_basic_publish(
callback, channel, tracer
)
retval = decorated_basic_publish(
channel, method, mock_body, properties
)
get_current.assert_called_once()
get_span.assert_called_once_with(
tracer,
channel,
properties,
span_kind=SpanKind.PRODUCER,
task_name="(temporary)",
ctx=get_current.return_value,
operation=None,
)
use_span.assert_called_once_with(
get_span.return_value, end_on_exit=True
)
get_span.return_value.is_recording.assert_called_once()
inject.assert_called_once_with(properties.headers)
callback.assert_called_once_with(
channel, method, mock_body, properties, False
)
self.assertEqual(retval, callback.return_value)

@mock.patch("opentelemetry.instrumentation.pika.utils._get_span")
@mock.patch("opentelemetry.propagate.inject")
@mock.patch("opentelemetry.context.get_current")
@mock.patch("opentelemetry.trace.use_span")
@mock.patch("pika.spec.BasicProperties.__new__")
def test_decorate_basic_publish_no_properties(
self,
basic_properties: mock.MagicMock,
use_span: mock.MagicMock,
get_current: mock.MagicMock,
inject: mock.MagicMock,
get_span: mock.MagicMock,
) -> None:
callback = mock.MagicMock()
tracer = mock.MagicMock()
channel = mock.MagicMock(spec=Channel)
method = mock.MagicMock(spec=Basic.Deliver)
mock_body = b"mock_body"
decorated_basic_publish = utils._decorate_basic_publish(
callback, channel, tracer
)
retval = decorated_basic_publish(channel, method, body=mock_body)
basic_properties.assert_called_once_with(BasicProperties, headers={})
get_current.assert_called_once()
use_span.assert_called_once_with(
get_span.return_value, end_on_exit=True
)
get_span.return_value.is_recording.assert_called_once()
inject.assert_called_once_with(basic_properties.return_value.headers)
self.assertEqual(retval, callback.return_value)