Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Framework/Core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ o2_add_library(Framework
src/CompletionPolicyHelpers.cxx
src/ComputingResourceHelpers.cxx
src/ConfigContext.cxx
src/ControlService.cxx
src/DispatchPolicy.cxx
src/ConfigParamStore.cxx
src/ConfigParamsHelper.cxx
Expand Down Expand Up @@ -83,10 +84,10 @@ o2_add_library(Framework
src/TableBuilder.cxx
src/TableConsumer.cxx
src/TableTreeHelpers.cxx
src/TextDriverClient.cxx
src/DataInputDirector.cxx
src/DataOutputDirector.cxx
src/Task.cxx
src/TextControlService.cxx
src/Variant.cxx
src/WorkflowCustomizationHelpers.cxx
src/WorkflowHelpers.cxx
Expand Down
1 change: 1 addition & 0 deletions Framework/Core/include/Framework/CommonServices.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ struct CommonServices {
return [](InitContext&, void* service) -> void* { return service; };
}

static ServiceSpec driverClientSpec();
static ServiceSpec monitoringSpec();
static ServiceSpec infologgerContextSpec();
static ServiceSpec infologgerSpec();
Expand Down
24 changes: 20 additions & 4 deletions Framework/Core/include/Framework/ControlService.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,16 @@
#define O2_FRAMEWORK_CONTROLSERVICE_H_

#include "Framework/ServiceHandle.h"
#include <regex>
#include <mutex>

namespace o2::framework
{

struct ServiceRegistry;
struct DeviceState;
struct DriverClient;

enum struct StreamingState : int;

/// Kind of request we want to issue to control
Expand All @@ -33,16 +39,26 @@ class ControlService
public:
constexpr static ServiceKind service_kind = ServiceKind::Global;

ControlService(ServiceRegistry& registry, DeviceState& deviceState);
/// Compatibility with old API.
void readyToQuit(bool all) { this->readyToQuit(all ? QuitRequest::All : QuitRequest::Me); }
/// Signal control that we are potentially ready to quit some / all
/// dataprocessor.
virtual void readyToQuit(QuitRequest kind) = 0;
void readyToQuit(QuitRequest kind);
/// Signal that we are done with the current stream
virtual void endOfStream() = 0;
void endOfStream();
/// Report the current streaming state of a given device
virtual void notifyStreamingState(StreamingState state) = 0;
void notifyStreamingState(StreamingState state);

private:
bool mOnce = false;
ServiceRegistry& mRegistry;
DeviceState& mDeviceState;
DriverClient& mDriverClient;
std::mutex mMutex;
};

bool parseControl(std::string const& s, std::smatch& match);

} // namespace o2::framework
#endif // O2_FRAMEWORK_ROOTFILESERVICE_H_
#endif // O2_FRAMEWORK_CONTROLSERVICE_H_
34 changes: 34 additions & 0 deletions Framework/Core/include/Framework/DriverClient.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright CERN and copyright holders of ALICE O2. This software is
// distributed under the terms of the GNU General Public License v3 (GPL
// Version 3), copied verbatim in the file "COPYING".
//
// See http://alice-o2.web.cern.ch/license for full licensing information.
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#ifndef O2_FRAMEWORK_DRIVERCLIENT_H_
#define O2_FRAMEWORK_DRIVERCLIENT_H_

#include "Framework/ServiceHandle.h"
#include <functional>

namespace o2::framework
{

/// A service API to communicate with the driver
class DriverClient
{
public:
constexpr static ServiceKind service_kind = ServiceKind::Global;

/// Report some message to the Driver
virtual void tell(const char* msg) = 0;

/// Act on some @a event notified by the driver
virtual void observe(const char* event, std::function<void(char const*)> callback) = 0;
};

} // namespace o2::framework

#endif // O2_FRAMEWORK_DRIVERCLIENT_H_
61 changes: 0 additions & 61 deletions Framework/Core/include/Framework/TextControlService.h

This file was deleted.

32 changes: 30 additions & 2 deletions Framework/Core/src/CommonServices.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
// or submit itself to any jurisdiction.
#include "Framework/CommonServices.h"
#include "Framework/ParallelContext.h"
#include "Framework/TextControlService.h"
#include "Framework/ControlService.h"
#include "Framework/DriverClient.h"
#include "Framework/CallbackService.h"
#include "Framework/TimesliceIndex.h"
#include "Framework/ServiceRegistry.h"
Expand All @@ -23,6 +24,7 @@
#include "Framework/EndOfStreamContext.h"
#include "Framework/Tracing.h"
#include "Framework/Monitoring.h"
#include "TextDriverClient.h"
#include "../src/DataProcessingStatus.h"

#include <Configuration/ConfigurationInterface.h>
Expand Down Expand Up @@ -216,13 +218,38 @@ o2::framework::ServiceSpec CommonServices::configurationSpec()
ServiceKind::Global};
}

o2::framework::ServiceSpec CommonServices::driverClientSpec()
{
return ServiceSpec{
"driverClient",
[](ServiceRegistry& services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
return ServiceHandle{TypeIdHelpers::uniqueId<DriverClient>(),
new TextDriverClient(services, state)};
},
noConfiguration(),
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
nullptr,
ServiceKind::Global};
}

o2::framework::ServiceSpec CommonServices::controlSpec()
{
return ServiceSpec{
"control",
[](ServiceRegistry& services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
return ServiceHandle{TypeIdHelpers::uniqueId<ControlService>(),
new TextControlService(services, state)};
new ControlService(services, state)};
},
noConfiguration(),
nullptr,
Expand Down Expand Up @@ -540,6 +567,7 @@ std::vector<ServiceSpec> CommonServices::defaultServices(int numThreads)
{
std::vector<ServiceSpec> specs{
timesliceIndex(),
driverClientSpec(),
monitoringSpec(),
infologgerContextSpec(),
infologgerSpec(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#include "Framework/TextControlService.h"
#include "Framework/ControlService.h"
#include "Framework/DriverClient.h"
#include "Framework/DeviceSpec.h"
#include "Framework/DeviceState.h"
#include "Framework/ServiceRegistry.h"
Expand All @@ -22,21 +23,22 @@
namespace o2::framework
{

TextControlService::TextControlService(ServiceRegistry& registry, DeviceState& deviceState)
ControlService::ControlService(ServiceRegistry& registry, DeviceState& deviceState)
: mRegistry{registry},
mDeviceState{deviceState}
mDeviceState{deviceState},
mDriverClient{registry.get<DriverClient>()}
{
}

// This will send an end of stream to all the devices downstream.
void TextControlService::endOfStream()
void ControlService::endOfStream()
{
std::scoped_lock lock(mMutex);
mDeviceState.streaming = StreamingState::EndOfStreaming;
}

// All we do is to printout
void TextControlService::readyToQuit(QuitRequest what)
void ControlService::readyToQuit(QuitRequest what)
{
std::scoped_lock lock(mMutex);
if (mOnce == true) {
Expand All @@ -46,27 +48,27 @@ void TextControlService::readyToQuit(QuitRequest what)
switch (what) {
case QuitRequest::All:
mDeviceState.quitRequested = true;
LOG(INFO) << "CONTROL_ACTION: READY_TO_QUIT_ALL";
mDriverClient.tell("CONTROL_ACTION: READY_TO_QUIT_ALL");
break;
case QuitRequest::Me:
mDeviceState.quitRequested = true;
LOG(INFO) << "CONTROL_ACTION: READY_TO_QUIT_ME";
mDriverClient.tell("CONTROL_ACTION: READY_TO_QUIT_ME");
break;
}
}

void TextControlService::notifyStreamingState(StreamingState state)
void ControlService::notifyStreamingState(StreamingState state)
{
std::scoped_lock lock(mMutex);
switch (state) {
case StreamingState::Idle:
LOG(INFO) << "CONTROL_ACTION: NOTIFY_STREAMING_STATE IDLE";
mDriverClient.tell("CONTROL_ACTION: NOTIFY_STREAMING_STATE IDLE");
break;
case StreamingState::Streaming:
LOG(INFO) << "CONTROL_ACTION: NOTIFY_STREAMING_STATE STREAMING";
mDriverClient.tell("CONTROL_ACTION: NOTIFY_STREAMING_STATE STREAMING");
break;
case StreamingState::EndOfStreaming:
LOG(INFO) << "CONTROL_ACTION: NOTIFY_STREAMING_STATE EOS";
mDriverClient.tell("CONTROL_ACTION: NOTIFY_STREAMING_STATE EOS");
break;
default:
throw std::runtime_error("Unknown streaming state");
Expand Down
25 changes: 25 additions & 0 deletions Framework/Core/src/TextDriverClient.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright CERN and copyright holders of ALICE O2. This software is
// distributed under the terms of the GNU General Public License v3 (GPL
// Version 3), copied verbatim in the file "COPYING".
//
// See http://alice-o2.web.cern.ch/license for full licensing information.
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#include "TextDriverClient.h"
#include "Framework/Logger.h"

namespace o2::framework
{

TextDriverClient::TextDriverClient(ServiceRegistry& registry, DeviceState& deviceState)
{
}

void TextDriverClient::tell(const char* msg)
{
LOG(INFO) << msg;
}

} // namespace o2::framework
38 changes: 38 additions & 0 deletions Framework/Core/src/TextDriverClient.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright CERN and copyright holders of ALICE O2. This software is
// distributed under the terms of the GNU General Public License v3 (GPL
// Version 3), copied verbatim in the file "COPYING".
//
// See http://alice-o2.web.cern.ch/license for full licensing information.
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#ifndef O2_FRAMEWORK_TEXTDRIVERCLIENT_H_
#define O2_FRAMEWORK_TEXTDRIVERCLIENT_H_

#include "Framework/DriverClient.h"

namespace o2::framework
{

struct ServiceRegistry;
struct DeviceState;

/// A text based way of communicating with the driver.
class TextDriverClient : public DriverClient
{
public:
constexpr static ServiceKind service_kind = ServiceKind::Global;

TextDriverClient(ServiceRegistry& registry, DeviceState& deviceState);

/// The text based client simply sends a message on stdout which is
/// (potentially) captured by the driver.
void tell(const char* msg) override;
/// Half duplex communication
void observe(const char* event, std::function<void(char const*)> callback) override{};
};

} // namespace o2::framework

#endif // O2_FRAMEWORK_TEXTDRIVERCLIENT_H_
2 changes: 1 addition & 1 deletion Framework/Core/src/runDataProcessing.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
#include "Framework/SimpleRawDeviceService.h"
#define O2_SIGNPOST_DEFINE_CONTEXT
#include "Framework/Signpost.h"
#include "Framework/TextControlService.h"
#include "Framework/ControlService.h"
#include "Framework/CallbackService.h"
#include "Framework/WorkflowSpec.h"
#include "Framework/Monitoring.h"
Expand Down