diff --git a/Framework/Core/CMakeLists.txt b/Framework/Core/CMakeLists.txt index 84e2d22b88bb7..eed8197f3b23e 100644 --- a/Framework/Core/CMakeLists.txt +++ b/Framework/Core/CMakeLists.txt @@ -31,6 +31,7 @@ o2_add_library(Framework src/CompletionPolicyHelpers.cxx src/ComputingResourceHelpers.cxx src/ConfigContext.cxx + src/ControlService.cxx src/DispatchPolicy.cxx src/ConfigParamStore.cxx src/ConfigParamsHelper.cxx @@ -83,10 +84,10 @@ o2_add_library(Framework src/TableBuilder.cxx src/TableConsumer.cxx src/TableTreeHelpers.cxx + src/TextDriverClient.cxx src/DataInputDirector.cxx src/DataOutputDirector.cxx src/Task.cxx - src/TextControlService.cxx src/Variant.cxx src/WorkflowCustomizationHelpers.cxx src/WorkflowHelpers.cxx diff --git a/Framework/Core/include/Framework/CommonServices.h b/Framework/Core/include/Framework/CommonServices.h index 6c26583efedd8..b9c210ed49d0a 100644 --- a/Framework/Core/include/Framework/CommonServices.h +++ b/Framework/Core/include/Framework/CommonServices.h @@ -47,6 +47,7 @@ struct CommonServices { return [](InitContext&, void* service) -> void* { return service; }; } + static ServiceSpec driverClientSpec(); static ServiceSpec monitoringSpec(); static ServiceSpec infologgerContextSpec(); static ServiceSpec infologgerSpec(); diff --git a/Framework/Core/include/Framework/ControlService.h b/Framework/Core/include/Framework/ControlService.h index 0fcfdcc696b42..3b27f44177263 100644 --- a/Framework/Core/include/Framework/ControlService.h +++ b/Framework/Core/include/Framework/ControlService.h @@ -11,10 +11,16 @@ #define O2_FRAMEWORK_CONTROLSERVICE_H_ #include "Framework/ServiceHandle.h" +#include +#include namespace o2::framework { +struct ServiceRegistry; +struct DeviceState; +struct DriverClient; + enum struct StreamingState : int; /// Kind of request we want to issue to control @@ -33,16 +39,26 @@ class ControlService public: constexpr static ServiceKind service_kind = ServiceKind::Global; + ControlService(ServiceRegistry& registry, DeviceState& deviceState); /// Compatibility with old API. void readyToQuit(bool all) { this->readyToQuit(all ? QuitRequest::All : QuitRequest::Me); } /// Signal control that we are potentially ready to quit some / all /// dataprocessor. - virtual void readyToQuit(QuitRequest kind) = 0; + void readyToQuit(QuitRequest kind); /// Signal that we are done with the current stream - virtual void endOfStream() = 0; + void endOfStream(); /// Report the current streaming state of a given device - virtual void notifyStreamingState(StreamingState state) = 0; + void notifyStreamingState(StreamingState state); + + private: + bool mOnce = false; + ServiceRegistry& mRegistry; + DeviceState& mDeviceState; + DriverClient& mDriverClient; + std::mutex mMutex; }; +bool parseControl(std::string const& s, std::smatch& match); + } // namespace o2::framework -#endif // O2_FRAMEWORK_ROOTFILESERVICE_H_ +#endif // O2_FRAMEWORK_CONTROLSERVICE_H_ diff --git a/Framework/Core/include/Framework/DriverClient.h b/Framework/Core/include/Framework/DriverClient.h new file mode 100644 index 0000000000000..5c1c2f05f23d0 --- /dev/null +++ b/Framework/Core/include/Framework/DriverClient.h @@ -0,0 +1,34 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#ifndef O2_FRAMEWORK_DRIVERCLIENT_H_ +#define O2_FRAMEWORK_DRIVERCLIENT_H_ + +#include "Framework/ServiceHandle.h" +#include + +namespace o2::framework +{ + +/// A service API to communicate with the driver +class DriverClient +{ + public: + constexpr static ServiceKind service_kind = ServiceKind::Global; + + /// Report some message to the Driver + virtual void tell(const char* msg) = 0; + + /// Act on some @a event notified by the driver + virtual void observe(const char* event, std::function callback) = 0; +}; + +} // namespace o2::framework + +#endif // O2_FRAMEWORK_DRIVERCLIENT_H_ diff --git a/Framework/Core/include/Framework/TextControlService.h b/Framework/Core/include/Framework/TextControlService.h deleted file mode 100644 index fd2e207cb479c..0000000000000 --- a/Framework/Core/include/Framework/TextControlService.h +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. -#ifndef O2_FRAMEWORK_TEXTCONTROLSERVICE_H_ -#define O2_FRAMEWORK_TEXTCONTROLSERVICE_H_ - -#include "Framework/ServiceHandle.h" -#include "Framework/ControlService.h" - -#include -#include -#include - -namespace o2::framework -{ - -class ServiceRegistry; -class DeviceState; - -/// A service that data processors can use to talk to control and ask for -/// their own state change or others. -class TextControlService : public ControlService -{ - public: - TextControlService(ServiceRegistry& registry, DeviceState& deviceState); - /// Tell the control that I am ready to quit. This will be - /// done by printing (only once) - /// - /// CONTROL_ACTION: READY_TO_QUIT_ME - /// - /// or - /// - /// CONTROL_ACTION: READY_TO_QUIT_ALL - /// - /// depending on the value of \param all. - /// - /// It's up to the driver to actually react on that and terminate the - /// child. - void readyToQuit(QuitRequest all = QuitRequest::Me) final; - - void endOfStream() final; - - void notifyStreamingState(StreamingState state) final; - - private: - bool mOnce = false; - ServiceRegistry& mRegistry; - DeviceState& mDeviceState; - std::mutex mMutex; -}; - -bool parseControl(std::string const& s, std::smatch& match); - -} // namespace o2::framework -#endif // O2_FRAMEWORK_TEXTCONTROLSERVICE_H_ diff --git a/Framework/Core/src/CommonServices.cxx b/Framework/Core/src/CommonServices.cxx index f92114b1de2bd..fa56c952c2509 100644 --- a/Framework/Core/src/CommonServices.cxx +++ b/Framework/Core/src/CommonServices.cxx @@ -9,7 +9,8 @@ // or submit itself to any jurisdiction. #include "Framework/CommonServices.h" #include "Framework/ParallelContext.h" -#include "Framework/TextControlService.h" +#include "Framework/ControlService.h" +#include "Framework/DriverClient.h" #include "Framework/CallbackService.h" #include "Framework/TimesliceIndex.h" #include "Framework/ServiceRegistry.h" @@ -23,6 +24,7 @@ #include "Framework/EndOfStreamContext.h" #include "Framework/Tracing.h" #include "Framework/Monitoring.h" +#include "TextDriverClient.h" #include "../src/DataProcessingStatus.h" #include @@ -216,13 +218,38 @@ o2::framework::ServiceSpec CommonServices::configurationSpec() ServiceKind::Global}; } +o2::framework::ServiceSpec CommonServices::driverClientSpec() +{ + return ServiceSpec{ + "driverClient", + [](ServiceRegistry& services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle { + return ServiceHandle{TypeIdHelpers::uniqueId(), + new TextDriverClient(services, state)}; + }, + noConfiguration(), + nullptr, + nullptr, + nullptr, + nullptr, + nullptr, + nullptr, + nullptr, + nullptr, + nullptr, + nullptr, + nullptr, + nullptr, + nullptr, + ServiceKind::Global}; +} + o2::framework::ServiceSpec CommonServices::controlSpec() { return ServiceSpec{ "control", [](ServiceRegistry& services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle { return ServiceHandle{TypeIdHelpers::uniqueId(), - new TextControlService(services, state)}; + new ControlService(services, state)}; }, noConfiguration(), nullptr, @@ -540,6 +567,7 @@ std::vector CommonServices::defaultServices(int numThreads) { std::vector specs{ timesliceIndex(), + driverClientSpec(), monitoringSpec(), infologgerContextSpec(), infologgerSpec(), diff --git a/Framework/Core/src/TextControlService.cxx b/Framework/Core/src/ControlService.cxx similarity index 74% rename from Framework/Core/src/TextControlService.cxx rename to Framework/Core/src/ControlService.cxx index 15f15cd565774..ebdb2c65bf0a9 100644 --- a/Framework/Core/src/TextControlService.cxx +++ b/Framework/Core/src/ControlService.cxx @@ -7,7 +7,8 @@ // In applying this license CERN does not waive the privileges and immunities // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. -#include "Framework/TextControlService.h" +#include "Framework/ControlService.h" +#include "Framework/DriverClient.h" #include "Framework/DeviceSpec.h" #include "Framework/DeviceState.h" #include "Framework/ServiceRegistry.h" @@ -22,21 +23,22 @@ namespace o2::framework { -TextControlService::TextControlService(ServiceRegistry& registry, DeviceState& deviceState) +ControlService::ControlService(ServiceRegistry& registry, DeviceState& deviceState) : mRegistry{registry}, - mDeviceState{deviceState} + mDeviceState{deviceState}, + mDriverClient{registry.get()} { } // This will send an end of stream to all the devices downstream. -void TextControlService::endOfStream() +void ControlService::endOfStream() { std::scoped_lock lock(mMutex); mDeviceState.streaming = StreamingState::EndOfStreaming; } // All we do is to printout -void TextControlService::readyToQuit(QuitRequest what) +void ControlService::readyToQuit(QuitRequest what) { std::scoped_lock lock(mMutex); if (mOnce == true) { @@ -46,27 +48,27 @@ void TextControlService::readyToQuit(QuitRequest what) switch (what) { case QuitRequest::All: mDeviceState.quitRequested = true; - LOG(INFO) << "CONTROL_ACTION: READY_TO_QUIT_ALL"; + mDriverClient.tell("CONTROL_ACTION: READY_TO_QUIT_ALL"); break; case QuitRequest::Me: mDeviceState.quitRequested = true; - LOG(INFO) << "CONTROL_ACTION: READY_TO_QUIT_ME"; + mDriverClient.tell("CONTROL_ACTION: READY_TO_QUIT_ME"); break; } } -void TextControlService::notifyStreamingState(StreamingState state) +void ControlService::notifyStreamingState(StreamingState state) { std::scoped_lock lock(mMutex); switch (state) { case StreamingState::Idle: - LOG(INFO) << "CONTROL_ACTION: NOTIFY_STREAMING_STATE IDLE"; + mDriverClient.tell("CONTROL_ACTION: NOTIFY_STREAMING_STATE IDLE"); break; case StreamingState::Streaming: - LOG(INFO) << "CONTROL_ACTION: NOTIFY_STREAMING_STATE STREAMING"; + mDriverClient.tell("CONTROL_ACTION: NOTIFY_STREAMING_STATE STREAMING"); break; case StreamingState::EndOfStreaming: - LOG(INFO) << "CONTROL_ACTION: NOTIFY_STREAMING_STATE EOS"; + mDriverClient.tell("CONTROL_ACTION: NOTIFY_STREAMING_STATE EOS"); break; default: throw std::runtime_error("Unknown streaming state"); diff --git a/Framework/Core/src/TextDriverClient.cxx b/Framework/Core/src/TextDriverClient.cxx new file mode 100644 index 0000000000000..86d7009223d23 --- /dev/null +++ b/Framework/Core/src/TextDriverClient.cxx @@ -0,0 +1,25 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#include "TextDriverClient.h" +#include "Framework/Logger.h" + +namespace o2::framework +{ + +TextDriverClient::TextDriverClient(ServiceRegistry& registry, DeviceState& deviceState) +{ +} + +void TextDriverClient::tell(const char* msg) +{ + LOG(INFO) << msg; +} + +} // namespace o2::framework diff --git a/Framework/Core/src/TextDriverClient.h b/Framework/Core/src/TextDriverClient.h new file mode 100644 index 0000000000000..7abdc58c170f1 --- /dev/null +++ b/Framework/Core/src/TextDriverClient.h @@ -0,0 +1,38 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#ifndef O2_FRAMEWORK_TEXTDRIVERCLIENT_H_ +#define O2_FRAMEWORK_TEXTDRIVERCLIENT_H_ + +#include "Framework/DriverClient.h" + +namespace o2::framework +{ + +struct ServiceRegistry; +struct DeviceState; + +/// A text based way of communicating with the driver. +class TextDriverClient : public DriverClient +{ + public: + constexpr static ServiceKind service_kind = ServiceKind::Global; + + TextDriverClient(ServiceRegistry& registry, DeviceState& deviceState); + + /// The text based client simply sends a message on stdout which is + /// (potentially) captured by the driver. + void tell(const char* msg) override; + /// Half duplex communication + void observe(const char* event, std::function callback) override{}; +}; + +} // namespace o2::framework + +#endif // O2_FRAMEWORK_TEXTDRIVERCLIENT_H_ diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index f25258c3c61ff..386bd56c672b6 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -32,7 +32,7 @@ #include "Framework/SimpleRawDeviceService.h" #define O2_SIGNPOST_DEFINE_CONTEXT #include "Framework/Signpost.h" -#include "Framework/TextControlService.h" +#include "Framework/ControlService.h" #include "Framework/CallbackService.h" #include "Framework/WorkflowSpec.h" #include "Framework/Monitoring.h"