Closed
Labels: bug (Something isn't working)
Description
Describe your environment
- Platform: Darwin-20.3.0-x86_64-i386-64bit
- Python Version: Python 3.6.7
- Installed Dependencies:
aiocontextvars==0.2.2
amqp==2.6.1
appnope==0.1.2
backcall==0.2.0
billiard==3.6.4.0
black==21.9b0
celery==4.4.7
click==8.0.3
contextvars==2.4
dataclasses==0.8
decorator==5.1.0
Deprecated==1.2.13
googleapis-common-protos==1.53.0
grpcio==1.41.1
immutables==0.16
importlib-metadata==4.8.1
ipython==7.0.0
ipython-genutils==0.2.0
jedi==0.18.0
kombu==4.6.11
mypy-extensions==0.4.3
opentelemetry-api==1.6.2
opentelemetry-exporter-jaeger==1.6.2
opentelemetry-exporter-jaeger-proto-grpc==1.6.2
opentelemetry-exporter-jaeger-thrift==1.6.2
opentelemetry-instrumentation==0.25b2
opentelemetry-instrumentation-celery==0.25b2
opentelemetry-sdk==1.6.2
opentelemetry-semantic-conventions==0.25b2
parso==0.8.2
pathspec==0.9.0
pexpect==4.8.0
pickleshare==0.7.5
platformdirs==2.4.0
prompt-toolkit==2.0.10
protobuf==3.19.1
ptyprocess==0.7.0
Pygments==2.10.0
pytz==2021.3
regex==2021.10.23
simplegeneric==0.8.1
six==1.16.0
thrift==0.15.0
tomli==1.2.2
traitlets==4.3.3
typed-ast==1.4.3
typing-extensions==3.10.0.2
vine==1.3.0
wcwidth==0.2.5
wrapt==1.13.2
zipp==3.6.0
Steps to reproduce
Code Sample:
from opentelemetry import trace, baggage
from opentelemetry.context import attach, detach
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from celery import Celery
from celery.signals import worker_process_init


@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    # Set up tracing inside each worker process.
    trace.set_tracer_provider(TracerProvider())
    span_processor = BatchSpanProcessor(ConsoleSpanExporter())
    trace.get_tracer_provider().add_span_processor(span_processor)
    CeleryInstrumentor().instrument()


app = Celery("my_tasks", broker="amqp://localhost")


@app.task
def add(x, y):
    print("### project_id: %s" % baggage.get_baggage("project_id"))
    return x + y


if __name__ == "__main__":
    # Set up tracing on the producer side.
    trace.set_tracer_provider(TracerProvider())
    span_processor = BatchSpanProcessor(ConsoleSpanExporter())
    trace.get_tracer_provider().add_span_processor(span_processor)
    CeleryInstrumentor().instrument()

    ctx = baggage.set_baggage("project_id", "project-1")
    # detach() expects the token returned by attach(), not the context itself.
    token = attach(ctx)
    add.delay(42, 50)
    detach(token)

- Start the Celery worker with the command:
celery -A my_tasks worker
- Send the task with the command:
python my_tasks.py
What is the expected behavior?
The worker should output:
[2021-10-31 16:45:38,867: WARNING/ForkPoolWorker-7] ### project_id: project-1
What is the actual behavior?
The actual worker output:
[2021-10-31 16:45:38,867: WARNING/ForkPoolWorker-7] ### project_id: None
Additional context
Clue1
The baggage header is present in the RabbitMQ message.
Clue2
Digging into the instrumentation code, I found that the baggage is extracted from the message on the worker side.
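The two clues are consistent with each other if extracting a context and making it the current context are separate steps. The sketch below models that with the standard library only. It is not the OpenTelemetry implementation; `_CURRENT`, `extract`, `attach`, `detach`, and `get_baggage` here are simplified stand-ins written for illustration.

```python
import contextvars
from types import MappingProxyType

# Stand-in for the runtime's "current context" slot (hypothetical name).
_CURRENT = contextvars.ContextVar("current_context", default=MappingProxyType({}))


def get_baggage(name, context=None):
    """Look up an entry in `context`, or in the current context if omitted."""
    ctx = _CURRENT.get() if context is None else context
    return ctx.get(name)


def extract(headers):
    """Build a new context from message headers without touching the current one."""
    return MappingProxyType(dict(headers))


def attach(context):
    """Make `context` current; return a token that restores the previous one."""
    return _CURRENT.set(context)


def detach(token):
    """Restore the context that was current before the matching attach()."""
    _CURRENT.reset(token)


extracted = extract({"project_id": "project-1"})

explicit = get_baggage("project_id", extracted)  # explicit context: value found
implicit_before = get_baggage("project_id")      # current context is still empty

token = attach(extracted)
implicit_during = get_baggage("project_id")      # visible while attached
detach(token)
implicit_after = get_baggage("project_id")       # gone again after detach

print(explicit, implicit_before, implicit_during, implicit_after)
```

In this model, a lookup that passes the extracted context explicitly finds the value, while a lookup without a context argument, like the one in the task body, only sees it between `attach` and `detach`.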
Attempted code change
Based on Clue1 and Clue2, I modified the opentelemetry.instrumentation.celery.__init__.CeleryInstrumentor._trace_prerun function as follows:
def _trace_prerun(self, *args, **kwargs):
    task = utils.retrieve_task(kwargs)
    task_id = utils.retrieve_task_id(kwargs)

    if task is None or task_id is None:
        return

    request = task.request
    tracectx = extract(request, getter=celery_getter) or None

    ### add this line
    attach(tracectx)
    ### add this line

    logger.debug("prerun signal start task_id=%s", task_id)

    operation_name = f"{_TASK_RUN}/{task.name}"
    span = self._tracer.start_span(
        operation_name, context=tracectx, kind=trace.SpanKind.CONSUMER
    )

    activation = trace.use_span(span, end_on_exit=True)
    activation.__enter__()  # pylint: disable=E1101
    utils.attach_span(task, task_id, (span, activation))

With this change, the worker output is as expected:
[2021-10-31 16:55:26,745: WARNING/ForkPoolWorker-7] ### project_id: project-1

