Add support for multiple pipelines in OpenTelemetryBuilder with Activity#735
Add support for multiple pipelines in OpenTelemetryBuilder with Activity#735cijothomas merged 6 commits intoopen-telemetry:masterfrom cijothomas:cijothomas/activityprocessingpipeline
Conversation
|
|
||
| namespace OpenTelemetry.Trace.Export.Internal | ||
| { | ||
| internal class BroadcastActivityProcessor : ActivityProcessor, IDisposable |
There was a problem hiding this comment.
The name is misleading since "broadcast" normally means there is no need to specify the target.
Probably borrow from Java/Python:
- Java MultiSpanExporter
- Python SynchronousMultiSpanProcessor
| { | ||
| internal class BroadcastActivityProcessor : ActivityProcessor, IDisposable | ||
| { | ||
| private readonly IEnumerable<ActivityProcessor> processors; |
There was a problem hiding this comment.
Potential perf concern:
IEnumerable<T> implies a new enumerator object creation on the hot path (each span will result in a foreach (var processor in this.processors).
Probably worth considering array or list (which maintains the count and can be accessed via index).
| throw new ArgumentException($"{nameof(processors)} collection is empty"); | ||
| } | ||
|
|
||
| this.processors = processors; |
There was a problem hiding this comment.
Should this be a shallow copy? What if the passed in processors got updated later?
| using OpenTelemetry.Trace.Export; | ||
| using Xunit; | ||
|
|
||
| namespace OpenTelemetry.Tests.Impl.Trace.Config |
There was a problem hiding this comment.
nit: "Impl" -> "Implementation"
There was a problem hiding this comment.
Yes. we need to rearrange this quite a lot. "impl" is used in many other places as well. - i'll do it as a separate PR where we will do consistent namespace. If product is namespace, then test would be namespace.test. something like that. (currently its a mix with no consistency)
| } | ||
| } | ||
|
|
||
| public override void OnStart(Activity activity) |
There was a problem hiding this comment.
nit: Probably more common to have Start before End.
| return Task.WhenAll(tasks); | ||
| } | ||
|
|
||
| public void Dispose() |
There was a problem hiding this comment.
This is different from what I've normally seen, worth double checking https://docs.microsoft.com/en-us/dotnet/standard/garbage-collection/implementing-dispose#implement-the-dispose-pattern.
There was a problem hiding this comment.
IMO because it's internal it's fine as-is. That being said, OpenTelemetrySdk isn't calling shutdown or dispose on the ActivityProcessor it is maintaining. That's a problem. We need to flush out spans & perform cleanup on app shutdown right? I think OpenTelemetrySdk.Dispose should call ActivityProcessor.Dispose which should invoke ActivityProcessor.ShutdownAsync? We might want to look at using IAsyncDisposable for that.
There was a problem hiding this comment.
There are few things we need to address:
- Improvements to SimpleSpanProcessor. #674
SimpleSpanProcessorneeds to be a synchronous call (e.g. when the control is returned to the caller, the span got processed - current it is not guaranteed). - The life-cycle management of processors/exporters, related to this comment. For example, if we have the same processor/exporter registered twice, would we flush the exporter on the first
OnEnd, or maintain a refcount and callOnEndduringDispose? DoesOnEndneed to be ref-counted as well? - We need to have a deterministic flush semantic. By default I would imagine we want to flush traces/logs before app shutdown, metrics is something debatable. And we need to give option to the user whether they want to wait indefinitely or have a timeout or no wait at all.
There was a problem hiding this comment.
Agree with all of the above - But my goal with this PR is to just mimic the existing behavior with Spans (this pr was made with pure 'copy paste & minor adjustments' ). I'd prefer to get the parity 1st, then fix underlying issues.
Let me know if this approach is okay?
There was a problem hiding this comment.
Yeah, these are not blockers for this PR.
There was a problem hiding this comment.
IMO because it's
internalit's fine as-is. That being said,OpenTelemetrySdkisn't calling shutdown or dispose on theActivityProcessorit is maintaining. That's a problem. We need to flush out spans & perform cleanup on app shutdown right? I think OpenTelemetrySdk.Dispose should call ActivityProcessor.Dispose which should invoke ActivityProcessor.ShutdownAsync? We might want to look at using IAsyncDisposable for that.
Yes this is needed. Modified OTSDK to call dispose on the Activityprocessor it has.
There was a problem hiding this comment.
Will follow up on why Span was not calling Shutdown in SimpleProcessor! (Was doing it right for batching one). Anyway - both will be fixed in next PR.
| tasks.Add(processor.ShutdownAsync(cancellationToken)); | ||
| } | ||
|
|
||
| return Task.WhenAll(tasks); |
There was a problem hiding this comment.
The cancellationToken is passed to all ShutdownAsync but if one misbehaves this line can wait forever. Is this relying on cancellation by the caller?
There was a problem hiding this comment.
The current code is vulnerable to one misbehaving processor. Not just in shutdown but even in start/stop as well. We need to address it properly.
…b.com/cijothomas/opentelemetry-dotnet into cijothomas/activityprocessingpipeline
|
Merging this as this makes Activity processing pipeline as same features as Span pipeline. |
Towards 1c in #684
Changes
Add ability to have multiple processing pipeline, exposed via
AddProcessorPipelineonOpenTelemetryBuilder. (this capability already existed in Span based pipeline)Checklist
For significant contributions please make sure you have completed the following items:
The PR will automatically request reviews from code owners and trigger CI build and tests.