1414
1515import abc
1616import atexit
17+ import concurrent .futures
1718import logging
1819import os
19- from typing import Any , Optional , cast
20+ import threading
21+ from typing import Any , Callable , Optional , Tuple , Union , cast
2022
2123from opentelemetry .sdk .environment_variables import (
2224 OTEL_PYTHON_LOG_EMITTER_PROVIDER ,
2729from opentelemetry .trace import get_current_span
2830from opentelemetry .trace .span import TraceFlags
2931from opentelemetry .util ._providers import _load_provider
32+ from opentelemetry .util ._time import _time_ns
3033from opentelemetry .util .types import Attributes
3134
3235_logger = logging .getLogger (__name__ )
@@ -112,6 +115,135 @@ def force_flush(self, timeout_millis: int = 30000):
112115 """
113116
114117
118+ # Temporary fix until https://github.com/PyCQA/pylint/issues/4098 is resolved
119+ # pylint:disable=no-member
120+ class SynchronousMultiLogProcessor (LogProcessor ):
121+ """Implementation of class:`LogProcessor` that forwards all received
122+ events to a list of log processors sequentially.
123+
124+ The underlying log processors are called in sequential order as they were
125+ added.
126+ """
127+
128+ def __init__ (self ):
129+ # use a tuple to avoid race conditions when adding a new log and
130+ # iterating through it on "emit".
131+ self ._log_processors = () # type: Tuple[LogProcessor, ...]
132+ self ._lock = threading .Lock ()
133+
134+ def add_log_processor (self , log_processor : LogProcessor ) -> None :
135+ """Adds a Logprocessor to the list of log processors handled by this instance"""
136+ with self ._lock :
137+ self ._log_processors = self ._log_processors + (log_processor ,)
138+
139+ def emit (self , log_data : LogData ) -> None :
140+ for lp in self ._log_processors :
141+ lp .emit (log_data )
142+
143+ def shutdown (self ) -> None :
144+ """Shutdown the log processors one by one"""
145+ for lp in self ._log_processors :
146+ lp .shutdown ()
147+
148+ def force_flush (self , timeout_millis : int = 30000 ) -> bool :
149+ """Force flush the log processors one by one
150+
151+ Args:
152+ timeout_millis: The maximum amount of time to wait for logs to be
153+ exported. If the first n log processors exceeded the timeout
154+ then remaining log processors will not be flushed.
155+
156+ Returns:
157+ True if all the log processors flushes the logs within timeout,
158+ False otherwise.
159+ """
160+ deadline_ns = _time_ns () + timeout_millis * 1000000
161+ for lp in self ._log_processors :
162+ current_ts = _time_ns ()
163+ if current_ts >= deadline_ns :
164+ return False
165+
166+ if not lp .force_flush ((deadline_ns - current_ts ) // 1000000 ):
167+ return False
168+
169+ return True
170+
171+
172+ class ConcurrentMultiLogProcessor (LogProcessor ):
173+ """Implementation of :class:`LogProcessor` that forwards all received
174+ events to a list of log processors in parallel.
175+
176+ Calls to the underlying log processors are forwarded in parallel by
177+ submitting them to a thread pool executor and waiting until each log
178+ processor finished its work.
179+
180+ Args:
181+ max_workers: The number of threads managed by the thread pool executor
182+ and thus defining how many log processors can work in parallel.
183+ """
184+
185+ def __init__ (self , max_workers : int = 2 ):
186+ # use a tuple to avoid race conditions when adding a new log and
187+ # iterating through it on "emit".
188+ self ._log_processors = () # type: Tuple[LogProcessor, ...]
189+ self ._lock = threading .Lock ()
190+ self ._executor = concurrent .futures .ThreadPoolExecutor (
191+ max_workers = max_workers
192+ )
193+
194+ def add_log_processor (self , log_processor : LogProcessor ):
195+ with self ._lock :
196+ self ._log_processors = self ._log_processors + (log_processor ,)
197+
198+ def _submit_and_wait (
199+ self ,
200+ func : Callable [[LogProcessor ], Callable [..., None ]],
201+ * args : Any ,
202+ ** kwargs : Any ,
203+ ):
204+ futures = []
205+ for lp in self ._log_processors :
206+ future = self ._executor .submit (func (lp ), * args , ** kwargs )
207+ futures .append (future )
208+ for future in futures :
209+ future .result ()
210+
211+ def emit (self , log_data : LogData ):
212+ self ._submit_and_wait (lambda lp : lp .emit , log_data )
213+
214+ def shutdown (self ):
215+ self ._submit_and_wait (lambda lp : lp .shutdown )
216+
217+ def force_flush (self , timeout_millis : int = 30000 ) -> bool :
218+ """Force flush the log processors in parallel.
219+
220+ Args:
221+ timeout_millis: The maximum amount of time to wait for logs to be
222+ exported.
223+
224+ Returns:
225+ True if all the log processors flushes the logs within timeout,
226+ False otherwise.
227+ """
228+ futures = []
229+ for lp in self ._log_processors :
230+ future = self ._executor .submit (lp .force_flush , timeout_millis )
231+ futures .append (future )
232+
233+ done_futures , not_done_futures = concurrent .futures .wait (
234+ futures , timeout_millis / 1e3
235+ )
236+
237+ if not_done_futures :
238+ return False
239+
240+ for future in done_futures :
241+ if not future .result ():
242+ return False
243+
244+ return True
245+
246+
115247class OTLPHandler (logging .Handler ):
116248 """A handler class which writes logging records, in OTLP format, to
117249 a network destination or file.
@@ -155,36 +287,49 @@ def flush(self) -> None:
155287
156288
157289class LogEmitter :
158- # TODO: Add multi_log_processor
159290 def __init__ (
160291 self ,
161292 resource : Resource ,
293+ multi_log_processor : Union [
294+ SynchronousMultiLogProcessor , ConcurrentMultiLogProcessor
295+ ],
162296 instrumentation_info : InstrumentationInfo ,
163297 ):
164298 self ._resource = resource
299+ self ._multi_log_processor = multi_log_processor
165300 self ._instrumentation_info = instrumentation_info
166301
167302 @property
168303 def resource (self ):
169304 return self ._resource
170305
171306 def emit (self , record : LogRecord ):
172- # TODO: multi_log_processor.emit
173- pass
307+ """Emits the :class:`LogData` by associating :class:`LogRecord`
308+ and instrumentation info.
309+ """
310+ log_data = LogData (record , self ._instrumentation_info )
311+ self ._multi_log_processor .emit (log_data )
174312
313+ # TODO: Should this flush everything in pipeline?
314+ # Prior discussion https://github.com/open-telemetry/opentelemetry-python/pull/1916#discussion_r659945290
175315 def flush (self ):
176- # TODO: multi_log_processor.force_flush
177- pass
316+ """Ensure all logging output has been flushed."""
317+ self . _multi_log_processor . force_flush ()
178318
179319
180320class LogEmitterProvider :
181- # TODO: Add multi_log_processor
182321 def __init__ (
183322 self ,
184323 resource : Resource = Resource .create (),
185324 shutdown_on_exit : bool = True ,
325+ multi_log_processor : Union [
326+ SynchronousMultiLogProcessor , ConcurrentMultiLogProcessor
327+ ] = None ,
186328 ):
187329 self ._resource = resource
330+ self ._multi_log_processor = (
331+ multi_log_processor or SynchronousMultiLogProcessor ()
332+ )
188333 self ._at_exit_handler = None
189334 if shutdown_on_exit :
190335 self ._at_exit_handler = atexit .register (self .shutdown )
@@ -200,6 +345,7 @@ def get_log_emitter(
200345 ) -> LogEmitter :
201346 return LogEmitter (
202347 self ._resource ,
348+ self ._multi_log_processor ,
203349 InstrumentationInfo (
204350 instrumenting_module_name , instrumenting_module_verison
205351 ),
@@ -210,11 +356,11 @@ def add_log_processor(self, log_processor: LogProcessor):
210356
211357 The log processors are invoked in the same order they are registered.
212358 """
213- # TODO: multi_log_processor.add_log_processor.
359+ self . _multi_log_processor . add_log_processor ( log_processor )
214360
215361 def shutdown (self ):
216362 """Shuts down the log processors."""
217- # TODO: multi_log_processor. shutdown
363+ self . _multi_log_processor . shutdown ()
218364 if self ._at_exit_handler is not None :
219365 atexit .unregister (self ._at_exit_handler )
220366 self ._at_exit_handler = None
@@ -230,7 +376,7 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
230376 True if all the log processors flushes the logs within timeout,
231377 False otherwise.
232378 """
233- # TODO: multi_log_processor. force_flush
379+ return self . _multi_log_processor . force_flush ( timeout_millis )
234380
235381
236382_LOG_EMITTER_PROVIDER = None
0 commit comments