
Celery instrumentation doesn't propagate baggage #784

@homholueng


Describe your environment

  • Platform: Darwin-20.3.0-x86_64-i386-64bit
  • Python Version: Python 3.6.7
  • Installed Dependencies:
aiocontextvars==0.2.2
amqp==2.6.1
appnope==0.1.2
backcall==0.2.0
billiard==3.6.4.0
black==21.9b0
celery==4.4.7
click==8.0.3
contextvars==2.4
dataclasses==0.8
decorator==5.1.0
Deprecated==1.2.13
googleapis-common-protos==1.53.0
grpcio==1.41.1
immutables==0.16
importlib-metadata==4.8.1
ipython==7.0.0
ipython-genutils==0.2.0
jedi==0.18.0
kombu==4.6.11
mypy-extensions==0.4.3
opentelemetry-api==1.6.2
opentelemetry-exporter-jaeger==1.6.2
opentelemetry-exporter-jaeger-proto-grpc==1.6.2
opentelemetry-exporter-jaeger-thrift==1.6.2
opentelemetry-instrumentation==0.25b2
opentelemetry-instrumentation-celery==0.25b2
opentelemetry-sdk==1.6.2
opentelemetry-semantic-conventions==0.25b2
parso==0.8.2
pathspec==0.9.0
pexpect==4.8.0
pickleshare==0.7.5
platformdirs==2.4.0
prompt-toolkit==2.0.10
protobuf==3.19.1
ptyprocess==0.7.0
Pygments==2.10.0
pytz==2021.3
regex==2021.10.23
simplegeneric==0.8.1
six==1.16.0
thrift==0.15.0
tomli==1.2.2
traitlets==4.3.3
typed-ast==1.4.3
typing-extensions==3.10.0.2
vine==1.3.0
wcwidth==0.2.5
wrapt==1.13.2
zipp==3.6.0

Steps to reproduce

Code Sample:

from opentelemetry import trace, baggage
from opentelemetry.context import attach, detach
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor

from celery import Celery
from celery.signals import worker_process_init


@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    trace.set_tracer_provider(TracerProvider())
    span_processor = BatchSpanProcessor(ConsoleSpanExporter())
    trace.get_tracer_provider().add_span_processor(span_processor)
    CeleryInstrumentor().instrument()


app = Celery("my_tasks", broker="amqp://localhost")


@app.task
def add(x, y):
    print("### project_id: %s" % baggage.get_baggage("project_id"))
    return x + y


if __name__ == "__main__":
    trace.set_tracer_provider(TracerProvider())
    span_processor = BatchSpanProcessor(ConsoleSpanExporter())
    trace.get_tracer_provider().add_span_processor(span_processor)
    CeleryInstrumentor().instrument()
    ctx = baggage.set_baggage("project_id", "project-1")
    # attach() returns a token; detach() expects that token, not the context
    token = attach(ctx)
    add.delay(42, 50)
    detach(token)

  1. Start the Celery worker with the command: celery -A my_tasks worker
  2. Send a task with the command: python my_tasks.py

What is the expected behavior?

The worker should output:

[2021-10-31 16:45:38,867: WARNING/ForkPoolWorker-7] ### project_id: project-1

What is the actual behavior?

The actual worker output:

[2021-10-31 16:45:38,867: WARNING/ForkPoolWorker-7] ### project_id: None

Additional context

Clue 1

The baggage header is present in the RabbitMQ message:

image

Clue 2

Digging into the instrumentation code, I found that the baggage has already been extracted on the worker side.

image
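
Taken together, the two clues suggest why the task still prints None: extracting a context from the message headers yields a new context object, while a lookup without an explicit context reads whatever context is currently active. The sketch below is a simplified, stdlib-only model of that behaviour, not the OpenTelemetry implementation. The names `_CURRENT`, `extract_model`, and `get_baggage_model` are hypothetical and exist only for illustration.

```python
from contextvars import ContextVar

# Hypothetical stand-in for the runtime's "current context" slot.
_CURRENT = ContextVar("current_context", default={})


def extract_model(headers):
    """Build a new context dict from a 'baggage' header.

    Note that this does NOT make the new context current.
    """
    context = {}
    for item in headers.get("baggage", "").split(","):
        if "=" in item:
            key, value = item.split("=", 1)
            context[key.strip()] = value.strip()
    return context


def get_baggage_model(name, context=None):
    """Look up a baggage entry in the given context, or in the current one."""
    if context is None:
        context = _CURRENT.get()
    return context.get(name)


headers = {"baggage": "project_id=project-1"}
extracted = extract_model(headers)

# Implicit lookup: the extracted context was never made current.
implicit = get_baggage_model("project_id")
# Explicit lookup: the value is available when the context is passed in.
explicit = get_baggage_model("project_id", context=extracted)
print(implicit, explicit)
```

In this model the implicit lookup misses the value while the explicit one finds it, which matches the observed worker output. The real `baggage.get_baggage` also accepts an optional `context` argument, so passing the extracted context explicitly is one way to confirm the diagnosis.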

Try to modify the code

Based on clue 1 and clue 2, I tried modifying the opentelemetry.instrumentation.celery.__init__.CeleryInstrumentor._trace_prerun function as follows:

    def _trace_prerun(self, *args, **kwargs):
        task = utils.retrieve_task(kwargs)
        task_id = utils.retrieve_task_id(kwargs)

        if task is None or task_id is None:
            return

        request = task.request
        tracectx = extract(request, getter=celery_getter) or None
        
        ### add this line
        attach(tracectx)
        ### add this line

        logger.debug("prerun signal start task_id=%s", task_id)

        operation_name = f"{_TASK_RUN}/{task.name}"
        span = self._tracer.start_span(
            operation_name, context=tracectx, kind=trace.SpanKind.CONSUMER
        )

        activation = trace.use_span(span, end_on_exit=True)
        activation.__enter__()  # pylint: disable=E1101
        utils.attach_span(task, task_id, (span, activation))

With this change, the worker output is as expected:

[2021-10-31 16:55:26,745: WARNING/ForkPoolWorker-7] ### project_id: project-1


Labels: bug (Something isn't working)
