diff --git a/Framework/Core/include/Framework/DataProcessingDevice.h b/Framework/Core/include/Framework/DataProcessingDevice.h index 89d8dd90d9f20..0d00b99e6a03a 100644 --- a/Framework/Core/include/Framework/DataProcessingDevice.h +++ b/Framework/Core/include/Framework/DataProcessingDevice.h @@ -74,7 +74,6 @@ struct DataProcessorContext { AlgorithmSpec::ErrorCallback* error = nullptr; std::function* errorHandling = nullptr; - int* errorCount = nullptr; }; /// A device actually carrying out all the DPL @@ -124,7 +123,6 @@ class DataProcessingDevice : public FairMQDevice /// Completed actions std::vector mCompleted; - int mErrorCount; uint64_t mLastSlowMetricSentTimestamp = 0; /// The timestamp of the last time we sent slow metrics uint64_t mLastMetricFlushedTimestamp = 0; /// The timestamp of the last time we actually flushed metrics uint64_t mBeginIterationTimestamp = 0; /// The timestamp of when the current ConditionalRun was started diff --git a/Framework/Core/include/Framework/DataProcessingStats.h b/Framework/Core/include/Framework/DataProcessingStats.h index b4d9fb5c26210..c5bde7ed5598e 100644 --- a/Framework/Core/include/Framework/DataProcessingStats.h +++ b/Framework/Core/include/Framework/DataProcessingStats.h @@ -25,6 +25,7 @@ struct DataProcessingStats { int minLatency = 0; int maxLatency = 0; }; + std::atomic errorCount = 0; std::atomic pendingInputs = 0; std::atomic incomplete = 0; std::atomic inputParts = 0; diff --git a/Framework/Core/src/CommonServices.cxx b/Framework/Core/src/CommonServices.cxx index fa56c952c2509..40d074f894701 100644 --- a/Framework/Core/src/CommonServices.cxx +++ b/Framework/Core/src/CommonServices.cxx @@ -480,6 +480,7 @@ auto sendRelayerMetrics(ServiceRegistry& registry, DataProcessingStats& stats) - monitoring.send(Metric{(int)relayerStats.droppedIncomingMessages, "dropped_incoming_messages"}.addTag(Key::Subsystem, Value::DPL)); monitoring.send(Metric{(int)relayerStats.relayedMessages, "relayed_messages"}.addTag(Key::Subsystem, Value::DPL)); + monitoring.send(Metric{(int)stats.errorCount, "errors"}.addTag(Key::Subsystem, Value::DPL)); monitoring.send(Metric{(int)stats.pendingInputs, "inputs/relayed/pending"}.addTag(Key::Subsystem, Value::DPL)); monitoring.send(Metric{(int)stats.incomplete, "inputs/relayed/incomplete"}.addTag(Key::Subsystem, Value::DPL)); monitoring.send(Metric{(int)stats.inputParts, "inputs/relayed/total"}.addTag(Key::Subsystem, Value::DPL)); diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index da5982055957d..139f4d309eb69 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -94,8 +94,7 @@ DataProcessingDevice::DataProcessingDevice(DeviceSpec const& spec, ServiceRegist mError{spec.algorithm.onError}, mConfigRegistry{nullptr}, mAllocator{&mTimingInfo, ®istry, spec.outputs}, - mServiceRegistry{registry}, - mErrorCount{0} + mServiceRegistry{registry} { /// FIXME: move erro handling to a service? if (mError != nullptr) { @@ -427,7 +426,6 @@ void DataProcessingDevice::fillContext(DataProcessorContext& context) context.error = &mError; /// Callback for the error handling context.errorHandling = &mErrorHandling; - context.errorCount = &mErrorCount; } void DataProcessingDevice::PreRun() @@ -684,9 +682,8 @@ void DataProcessingDevice::handleData(DataProcessorContext& context, FairMQParts return results; }; - auto reportError = [& registry = *context.registry, &context](const char* message) { - context.errorCount++; - registry.get().send(Metric{*context.errorCount, "errors"}.addTag(Key::Subsystem, Value::DPL)); + auto reportError = [®istry = *context.registry, &context](const char* message) { + registry.get().errorCount++; }; auto handleValidMessages = [&parts, &context = context, &relayer = *context.relayer, &reportError](std::vector const& types) { @@ -782,9 +779,8 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context, // should work just fine. std::vector currentSetOfInputs; - auto reportError = [& registry = *context.registry, &context](const char* message) { - context.errorCount++; - registry.get().send(Metric{*context.errorCount, "errors"}.addTag(Key::Subsystem, Value::DPL)); + auto reportError = [®istry = *context.registry, &context](const char* message) { + registry.get().errorCount++; }; // For the moment we have a simple "immediately dispatch" policy for stuff @@ -1077,8 +1073,7 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context, void DataProcessingDevice::error(const char* msg) { LOG(ERROR) << msg; - mErrorCount++; - mServiceRegistry.get().send(Metric{mErrorCount, "errors"}.addTag(Key::Subsystem, Value::DPL)); + mServiceRegistry.get().errorCount++; } } // namespace o2::framework