Skip to content

Commit 9b222f2

Browse files
authored
Add ForceFlush for all LogRecordExporters and SpanExporters. (#2000)
1 parent 3a09d53 commit 9b222f2

46 files changed

Lines changed: 680 additions & 116 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ Increment the:
2121
[#2060](https://github.com/open-telemetry/opentelemetry-cpp/pull/2060)
2222
* [BUILD] Restore detfault value of `OPENTELEMETRY_INSTALL` to `ON` when it's on
2323
top level.[#2062](https://github.com/open-telemetry/opentelemetry-cpp/pull/2062)
24+
* [EXPORTERS]Add `ForceFlush` for `LogRecordExporter` and `SpanExporter`
25+
[#2000](https://github.com/open-telemetry/opentelemetry-cpp/pull/2000)
2426

2527
Important changes:
2628

@@ -44,6 +46,12 @@ Important changes:
4446
* As a result, a behavior change for GRPC SSL is possible,
4547
because the endpoint scheme now takes precedence.
4648
Please verify configuration settings for the GRPC endpoint.
49+
* [EXPORTERS]Add `ForceFlush` for `LogRecordExporter` and `SpanExporter`
50+
[#2000](https://github.com/open-telemetry/opentelemetry-cpp/pull/2000)
51+
* `LogRecordExporter` and `SpanExporter` add a new virtual function
52+
`ForceFlush`, and if users implement any customized `LogRecordExporter` and
53+
`SpanExporter`, they should also implement this function.There should be no
54+
influence if users only use factory to create exporters.
4755

4856
## [1.8.3] 2023-03-06
4957

examples/otlp/grpc_log_main.cc

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@
1111
# include "opentelemetry/sdk/trace/tracer_provider_factory.h"
1212
# include "opentelemetry/trace/provider.h"
1313

14+
// sdk::TracerProvider and sdk::LoggerProvider is just used to call ForceFlush and prevent to cancel
15+
// running exportings when destroy and shutdown exporters.It's optional to users.
16+
# include "opentelemetry/sdk/logs/logger_provider.h"
17+
# include "opentelemetry/sdk/trace/tracer_provider.h"
18+
1419
# include <string>
1520

1621
# ifdef BAZEL_BUILD
@@ -42,6 +47,14 @@ void InitTracer()
4247

4348
void CleanupTracer()
4449
{
50+
// We call ForceFlush to prevent to cancel running exportings, It's optional.
51+
opentelemetry::nostd::shared_ptr<opentelemetry::trace::TracerProvider> provider =
52+
trace::Provider::GetTracerProvider();
53+
if (provider)
54+
{
55+
static_cast<trace_sdk::TracerProvider *>(provider.get())->ForceFlush();
56+
}
57+
4558
std::shared_ptr<opentelemetry::trace::TracerProvider> none;
4659
trace::Provider::SetTracerProvider(none);
4760
}
@@ -59,6 +72,14 @@ void InitLogger()
5972

6073
void CleanupLogger()
6174
{
75+
// We call ForceFlush to prevent to cancel running exportings, It's optional.
76+
opentelemetry::nostd::shared_ptr<logs::LoggerProvider> provider =
77+
logs::Provider::GetLoggerProvider();
78+
if (provider)
79+
{
80+
static_cast<logs_sdk::LoggerProvider *>(provider.get())->ForceFlush();
81+
}
82+
6283
nostd::shared_ptr<logs::LoggerProvider> none;
6384
opentelemetry::logs::Provider::SetLoggerProvider(none);
6485
}

examples/otlp/grpc_main.cc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66
#include "opentelemetry/sdk/trace/tracer_provider_factory.h"
77
#include "opentelemetry/trace/provider.h"
88

9+
// sdk::TracerProvider is just used to call ForceFlush and prevent to cancel running exportings when
10+
// destroy and shutdown exporters.It's optional to users.
11+
#include "opentelemetry/sdk/trace/tracer_provider.h"
12+
913
#ifdef BAZEL_BUILD
1014
# include "examples/common/foo_library/foo_library.h"
1115
#else
@@ -32,6 +36,14 @@ void InitTracer()
3236

3337
void CleanupTracer()
3438
{
39+
// We call ForceFlush to prevent to cancel running exportings, It's optional.
40+
opentelemetry::nostd::shared_ptr<opentelemetry::trace::TracerProvider> provider =
41+
trace::Provider::GetTracerProvider();
42+
if (provider)
43+
{
44+
static_cast<trace_sdk::TracerProvider *>(provider.get())->ForceFlush();
45+
}
46+
3547
std::shared_ptr<opentelemetry::trace::TracerProvider> none;
3648
trace::Provider::SetTracerProvider(none);
3749
}

examples/otlp/http_log_main.cc

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@
1313
# include "opentelemetry/sdk/trace/tracer_provider_factory.h"
1414
# include "opentelemetry/trace/provider.h"
1515

16+
// sdk::TracerProvider and sdk::LoggerProvider is just used to call ForceFlush and prevent to cancel
17+
// running exportings when destroy and shutdown exporters.It's optional to users.
18+
# include "opentelemetry/sdk/logs/logger_provider.h"
19+
# include "opentelemetry/sdk/trace/tracer_provider.h"
20+
21+
# include <iostream>
1622
# include <string>
1723

1824
# ifdef BAZEL_BUILD
@@ -32,11 +38,27 @@ namespace internal_log = opentelemetry::sdk::common::internal_log;
3238
namespace
3339
{
3440

35-
opentelemetry::exporter::otlp::OtlpHttpExporterOptions opts;
41+
opentelemetry::exporter::otlp::OtlpHttpExporterOptions trace_opts;
3642
void InitTracer()
3743
{
44+
if (trace_opts.url.size() > 9)
45+
{
46+
if (trace_opts.url.substr(trace_opts.url.size() - 8) == "/v1/logs")
47+
{
48+
trace_opts.url = trace_opts.url.substr(0, trace_opts.url.size() - 8) + "/v1/traces";
49+
}
50+
else if (trace_opts.url.substr(trace_opts.url.size() - 9) == "/v1/logs/")
51+
{
52+
trace_opts.url = trace_opts.url.substr(0, trace_opts.url.size() - 9) + "/v1/traces";
53+
}
54+
else
55+
{
56+
trace_opts.url = opentelemetry::exporter::otlp::GetOtlpDefaultHttpTracesEndpoint();
57+
}
58+
}
59+
std::cout << "Using " << trace_opts.url << " to export trace spans." << std::endl;
3860
// Create OTLP exporter instance
39-
auto exporter = otlp::OtlpHttpExporterFactory::Create(opts);
61+
auto exporter = otlp::OtlpHttpExporterFactory::Create(trace_opts);
4062
auto processor = trace_sdk::SimpleSpanProcessorFactory::Create(std::move(exporter));
4163
std::shared_ptr<opentelemetry::trace::TracerProvider> provider =
4264
trace_sdk::TracerProviderFactory::Create(std::move(processor));
@@ -46,13 +68,22 @@ void InitTracer()
4668

4769
void CleanupTracer()
4870
{
71+
// We call ForceFlush to prevent to cancel running exportings, It's optional.
72+
opentelemetry::nostd::shared_ptr<opentelemetry::trace::TracerProvider> provider =
73+
trace::Provider::GetTracerProvider();
74+
if (provider)
75+
{
76+
static_cast<trace_sdk::TracerProvider *>(provider.get())->ForceFlush();
77+
}
78+
4979
std::shared_ptr<opentelemetry::trace::TracerProvider> none;
5080
trace::Provider::SetTracerProvider(none);
5181
}
5282

5383
opentelemetry::exporter::otlp::OtlpHttpLogRecordExporterOptions logger_opts;
5484
void InitLogger()
5585
{
86+
std::cout << "Using " << logger_opts.url << " to export log records." << std::endl;
5687
logger_opts.console_debug = true;
5788
// Create OTLP exporter instance
5889
auto exporter = otlp::OtlpHttpLogRecordExporterFactory::Create(logger_opts);
@@ -65,6 +96,14 @@ void InitLogger()
6596

6697
void CleanupLogger()
6798
{
99+
// We call ForceFlush to prevent to cancel running exportings, It's optional.
100+
opentelemetry::nostd::shared_ptr<logs::LoggerProvider> provider =
101+
logs::Provider::GetLoggerProvider();
102+
if (provider)
103+
{
104+
static_cast<logs_sdk::LoggerProvider *>(provider.get())->ForceFlush();
105+
}
106+
68107
std::shared_ptr<logs::LoggerProvider> none;
69108
opentelemetry::logs::Provider::SetLoggerProvider(none);
70109
}
@@ -83,26 +122,26 @@ int main(int argc, char *argv[])
83122
{
84123
if (argc > 1)
85124
{
86-
opts.url = argv[1];
125+
trace_opts.url = argv[1];
87126
logger_opts.url = argv[1];
88127
if (argc > 2)
89128
{
90-
std::string debug = argv[2];
91-
opts.console_debug = debug != "" && debug != "0" && debug != "no";
129+
std::string debug = argv[2];
130+
trace_opts.console_debug = debug != "" && debug != "0" && debug != "no";
92131
}
93132

94133
if (argc > 3)
95134
{
96135
std::string binary_mode = argv[3];
97136
if (binary_mode.size() >= 3 && binary_mode.substr(0, 3) == "bin")
98137
{
99-
opts.content_type = opentelemetry::exporter::otlp::HttpRequestContentType::kBinary;
138+
trace_opts.content_type = opentelemetry::exporter::otlp::HttpRequestContentType::kBinary;
100139
logger_opts.content_type = opentelemetry::exporter::otlp::HttpRequestContentType::kBinary;
101140
}
102141
}
103142
}
104143

105-
if (opts.console_debug)
144+
if (trace_opts.console_debug)
106145
{
107146
internal_log::GlobalLogHandler::SetLogLevel(internal_log::LogLevel::Debug);
108147
}

examples/otlp/http_main.cc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
#include "opentelemetry/sdk/trace/tracer_provider_factory.h"
99
#include "opentelemetry/trace/provider.h"
1010

11+
// sdk::TracerProvider is just used to call ForceFlush and prevent to cancel running exportings when
12+
// destroy and shutdown exporters.It's optional to users.
13+
#include "opentelemetry/sdk/trace/tracer_provider.h"
14+
1115
#include <string>
1216

1317
#ifdef BAZEL_BUILD
@@ -38,6 +42,14 @@ void InitTracer()
3842

3943
void CleanupTracer()
4044
{
45+
// We call ForceFlush to prevent to cancel running exportings, It's optional.
46+
opentelemetry::nostd::shared_ptr<opentelemetry::trace::TracerProvider> provider =
47+
trace::Provider::GetTracerProvider();
48+
if (provider)
49+
{
50+
static_cast<trace_sdk::TracerProvider *>(provider.get())->ForceFlush();
51+
}
52+
4153
std::shared_ptr<opentelemetry::trace::TracerProvider> none;
4254
trace::Provider::SetTracerProvider(none);
4355
}

exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_record_exporter.h

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,17 @@
77
# include "nlohmann/json.hpp"
88
# include "opentelemetry/common/spin_lock_mutex.h"
99
# include "opentelemetry/ext/http/client/http_client_factory.h"
10+
# include "opentelemetry/nostd/shared_ptr.h"
1011
# include "opentelemetry/nostd/type_traits.h"
1112
# include "opentelemetry/sdk/logs/exporter.h"
1213
# include "opentelemetry/sdk/logs/recordable.h"
1314

1415
# include <time.h>
16+
# include <atomic>
17+
# include <condition_variable>
18+
# include <cstddef>
1519
# include <iostream>
20+
# include <mutex>
1621

1722
OPENTELEMETRY_BEGIN_NAMESPACE
1823
namespace exporter
@@ -90,6 +95,14 @@ class ElasticsearchLogRecordExporter final : public opentelemetry::sdk::logs::Lo
9095
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>>
9196
&records) noexcept override;
9297

98+
/**
99+
* Force flush the exporter.
100+
* @param timeout an option timeout, default to max.
101+
* @return return true when all data are exported, and false when timeout
102+
*/
103+
bool ForceFlush(
104+
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;
105+
93106
/**
94107
* Shutdown this exporter.
95108
* @param timeout The maximum time to wait for the shutdown method to return
@@ -108,6 +121,18 @@ class ElasticsearchLogRecordExporter final : public opentelemetry::sdk::logs::Lo
108121
std::shared_ptr<ext::http::client::HttpClient> http_client_;
109122
mutable opentelemetry::common::SpinLockMutex lock_;
110123
bool isShutdown() const noexcept;
124+
125+
# ifdef ENABLE_ASYNC_EXPORT
126+
struct SynchronizationData
127+
{
128+
std::atomic<std::size_t> session_counter_;
129+
std::atomic<std::size_t> finished_session_counter_;
130+
std::condition_variable force_flush_cv;
131+
std::mutex force_flush_cv_m;
132+
std::recursive_mutex force_flush_m;
133+
};
134+
nostd::shared_ptr<SynchronizationData> synchronization_data_;
135+
# endif
111136
};
112137
} // namespace logs
113138
} // namespace exporter

exporters/elasticsearch/src/es_log_record_exporter.cc

Lines changed: 64 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -290,9 +290,19 @@ class AsyncResponseHandler : public http_client::EventHandler
290290
# endif
291291

292292
ElasticsearchLogRecordExporter::ElasticsearchLogRecordExporter()
293-
: options_{ElasticsearchExporterOptions()},
294-
http_client_{ext::http::client::HttpClientFactory::Create()}
295-
{}
293+
: options_{ElasticsearchExporterOptions()}, http_client_
294+
{
295+
ext::http::client::HttpClientFactory::Create()
296+
}
297+
# ifdef ENABLE_ASYNC_EXPORT
298+
, synchronization_data_(new SynchronizationData())
299+
# endif
300+
{
301+
# ifdef ENABLE_ASYNC_EXPORT
302+
synchronization_data_->finished_session_counter_.store(0);
303+
synchronization_data_->session_counter_.store(0);
304+
# endif
305+
}
296306

297307
ElasticsearchLogRecordExporter::ElasticsearchLogRecordExporter(
298308
const ElasticsearchExporterOptions &options)
@@ -343,10 +353,12 @@ sdk::common::ExportResult ElasticsearchLogRecordExporter::Export(
343353

344354
# ifdef ENABLE_ASYNC_EXPORT
345355
// Send the request
346-
std::size_t span_count = records.size();
347-
auto handler = std::make_shared<AsyncResponseHandler>(
356+
synchronization_data_->session_counter_.fetch_add(1, std::memory_order_release);
357+
std::size_t span_count = records.size();
358+
auto synchronization_data = synchronization_data_;
359+
auto handler = std::make_shared<AsyncResponseHandler>(
348360
session,
349-
[span_count](opentelemetry::sdk::common::ExportResult result) {
361+
[span_count, synchronization_data](opentelemetry::sdk::common::ExportResult result) {
350362
if (result != opentelemetry::sdk::common::ExportResult::kSuccess)
351363
{
352364
OTEL_INTERNAL_LOG_ERROR("[ES Log Exporter] ERROR: Export "
@@ -358,6 +370,9 @@ sdk::common::ExportResult ElasticsearchLogRecordExporter::Export(
358370
OTEL_INTERNAL_LOG_DEBUG("[ES Log Exporter] Export " << span_count
359371
<< " trace span(s) success");
360372
}
373+
374+
synchronization_data->finished_session_counter_.fetch_add(1, std::memory_order_release);
375+
synchronization_data->force_flush_cv.notify_all();
361376
return true;
362377
},
363378
options_.console_debug_);
@@ -401,6 +416,49 @@ sdk::common::ExportResult ElasticsearchLogRecordExporter::Export(
401416
# endif
402417
}
403418

419+
bool ElasticsearchLogRecordExporter::ForceFlush(std::chrono::microseconds timeout) noexcept
420+
{
421+
# ifdef ENABLE_ASYNC_EXPORT
422+
std::lock_guard<std::recursive_mutex> lock_guard{synchronization_data_->force_flush_m};
423+
std::size_t running_counter =
424+
synchronization_data_->session_counter_.load(std::memory_order_acquire);
425+
// ASAN will report chrono: runtime error: signed integer overflow: A + B cannot be represented
426+
// in type 'long int' here. So we reset timeout to meet signed long int limit here.
427+
timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout(
428+
timeout, std::chrono::microseconds::zero());
429+
430+
std::chrono::steady_clock::duration timeout_steady =
431+
std::chrono::duration_cast<std::chrono::steady_clock::duration>(timeout);
432+
if (timeout_steady <= std::chrono::steady_clock::duration::zero())
433+
{
434+
timeout_steady = std::chrono::steady_clock::duration::max();
435+
}
436+
437+
std::unique_lock<std::mutex> lk_cv(synchronization_data_->force_flush_cv_m);
438+
// Wait for all the sessions to finish
439+
while (timeout_steady > std::chrono::steady_clock::duration::zero())
440+
{
441+
if (synchronization_data_->finished_session_counter_.load(std::memory_order_acquire) >=
442+
running_counter)
443+
{
444+
break;
445+
}
446+
447+
std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now();
448+
if (std::cv_status::no_timeout != synchronization_data_->force_flush_cv.wait_for(
449+
lk_cv, std::chrono::seconds{options_.response_timeout_}))
450+
{
451+
break;
452+
}
453+
timeout_steady -= std::chrono::steady_clock::now() - start_timepoint;
454+
}
455+
456+
return timeout_steady > std::chrono::steady_clock::duration::zero();
457+
# else
458+
return true;
459+
# endif
460+
}
461+
404462
bool ElasticsearchLogRecordExporter::Shutdown(std::chrono::microseconds /* timeout */) noexcept
405463
{
406464
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);

exporters/etw/CMakeLists.txt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,12 @@ target_include_directories(
1111
set_target_properties(opentelemetry_exporter_etw PROPERTIES EXPORT_NAME
1212
etw_exporter)
1313

14-
target_link_libraries(opentelemetry_exporter_etw
15-
INTERFACE opentelemetry_api nlohmann_json::nlohmann_json)
14+
target_link_libraries(
15+
opentelemetry_exporter_etw INTERFACE opentelemetry_api opentelemetry_trace
16+
nlohmann_json::nlohmann_json)
17+
if(WITH_LOGS_PREVIEW)
18+
target_link_libraries(opentelemetry_exporter_etw INTERFACE opentelemetry_logs)
19+
endif()
1620
if(nlohmann_json_clone)
1721
add_dependencies(opentelemetry_exporter_etw nlohmann_json::nlohmann_json)
1822
endif()

0 commit comments

Comments
 (0)