Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Framework/Core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ o2_add_library(Framework
src/Task.cxx
src/TextControlService.cxx
src/Variant.cxx
src/WorkflowCustomizationHelpers.cxx
src/WorkflowHelpers.cxx
src/WorkflowSerializationHelpers.cxx
src/WorkflowSpec.cxx
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#ifndef FRAMEWORK_CHANNELCONFIGURATIONPOLICYHELPERS_H
#define FRAMEWORK_CHANNELCONFIGURATIONPOLICYHELPERS_H
#ifndef O2_FRAMEWORK_CHANNELCONFIGURATIONPOLICYHELPERS_H_
#define O2_FRAMEWORK_CHANNELCONFIGURATIONPOLICYHELPERS_H_

#include "Framework/ChannelSpec.h"

Expand All @@ -19,6 +19,9 @@ namespace o2::framework

struct FairMQChannelConfigSpec {
int64_t rateLogging;
int64_t recvBufferSize;
int64_t sendBufferSize;
std::string ipcPrefix;
};

/// A set of helpers for common ChannelConfigurationPolicy behaviors
Expand Down
10 changes: 8 additions & 2 deletions Framework/Core/include/Framework/ChannelSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ struct InputChannelSpec {
std::string hostname;
unsigned short port;
ChannelProtocol protocol = ChannelProtocol::Network;
size_t rateLogging = 60;
size_t rateLogging = 0;
size_t recvBufferSize = 1000;
size_t sendBufferSize = 1000;
std::string ipcPrefix = ".";
};

/// This describes an output channel. Output channels are semantically
Expand All @@ -64,7 +67,10 @@ struct OutputChannelSpec {
unsigned short port;
size_t listeners;
ChannelProtocol protocol = ChannelProtocol::Network;
size_t rateLogging = 60;
size_t rateLogging = 0;
size_t recvBufferSize = 1000;
size_t sendBufferSize = 1000;
std::string ipcPrefix = ".";
};

} // namespace o2::framework
Expand Down
25 changes: 25 additions & 0 deletions Framework/Core/include/Framework/WorkflowCustomizationHelpers.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright CERN and copyright holders of ALICE O2. This software is
// distributed under the terms of the GNU General Public License v3 (GPL
// Version 3), copied verbatim in the file "COPYING".
//
// See http://alice-o2.web.cern.ch/license for full licensing information.
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#ifndef O2_FRAMEWORK_WORKFLOWCUSTOMIZATIONHELPERS_H_
#define O2_FRAMEWORK_WORKFLOWCUSTOMIZATIONHELPERS_H_

#include "Framework/ConfigParamSpec.h"
#include <vector>

namespace o2::framework
{

struct WorkflowCustomizationHelpers {
static std::vector<ConfigParamSpec> requiredWorkflowOptions();
};

} // namespace o2::framework

#endif // O2_FRAMEWORK_WORKFLOWCUSTOMIZATIONHELPERS_H_
35 changes: 6 additions & 29 deletions Framework/Core/include/Framework/runDataProcessing.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "Framework/BoostOptionsRetriever.h"
#include "Framework/CustomWorkflowTerminationHook.h"
#include "Framework/CommonServices.h"
#include "Framework/WorkflowCustomizationHelpers.h"
#include "Framework/Logger.h"

#include <unistd.h>
Expand Down Expand Up @@ -75,6 +76,9 @@ void defaultConfiguration(std::vector<o2::framework::ServiceSpec>& services)
services = o2::framework::CommonServices::defaultServices();
}

/// Workflow options which are required by DPL in order to work.
std::vector<o2::framework::ConfigParamSpec> requiredWorkflowOptions();

void defaultConfiguration(o2::framework::OnWorkflowTerminationHook& hook)
{
hook = [](const char*) {};
Expand Down Expand Up @@ -131,35 +135,8 @@ int main(int argc, char** argv)
// The default policy is a catch all pub/sub setup to be consistent with the past.
std::vector<o2::framework::ConfigParamSpec> workflowOptions;
UserCustomizationsHelper::userDefinedCustomization(workflowOptions, 0);
workflowOptions.push_back(ConfigParamSpec{"readers", VariantType::Int64, 1ll, {"number of parallel readers to use"}});
workflowOptions.push_back(ConfigParamSpec{"pipeline", VariantType::String, "", {"override default pipeline size"}});
workflowOptions.push_back(ConfigParamSpec{"clone", VariantType::String, "", {"clone processors from a template"}});
workflowOptions.push_back(ConfigParamSpec{"workflow-suffix", VariantType::String, "", {"suffix to add to all dataprocessors"}});

// options for AOD rate limiting
workflowOptions.push_back(ConfigParamSpec{"aod-memory-rate-limit", VariantType::Int64, 0LL, {"Rate limit AOD processing based on memory"}});

// options for AOD writer
workflowOptions.push_back(ConfigParamSpec{"aod-writer-json", VariantType::String, "", {"Name of the json configuration file"}});
workflowOptions.push_back(ConfigParamSpec{"aod-writer-resfile", VariantType::String, "", {"Default name of the output file"}});
workflowOptions.push_back(ConfigParamSpec{"aod-writer-resmode", VariantType::String, "RECREATE", {"Creation mode of the result files: NEW, CREATE, RECREATE, UPDATE"}});
workflowOptions.push_back(ConfigParamSpec{"aod-writer-ntfmerge", VariantType::Int, -1, {"Number of time frames to merge into one file"}});
workflowOptions.push_back(ConfigParamSpec{"aod-writer-keep", VariantType::String, "", {"Comma separated list of ORIGIN/DESCRIPTION/SUBSPECIFICATION:treename:col1/col2/..:filename"}});

workflowOptions.push_back(ConfigParamSpec{"fairmq-rate-logging", VariantType::Int, 60, {"Rate logging for FairMQ channels"}});

workflowOptions.push_back(ConfigParamSpec{"forwarding-policy",
VariantType::String,
"dangling",
{"Which messages to forward."
" dangling: dangling outputs,"
" all: all messages"}});
workflowOptions.push_back(ConfigParamSpec{"forwarding-destination",
VariantType::String,
"file",
{"Destination for forwarded messages."
" file: write to file,"
" fairmq: send to output proxy"}});
auto requiredWorkflowOptions = WorkflowCustomizationHelpers::requiredWorkflowOptions();
workflowOptions.insert(std::end(workflowOptions), std::begin(requiredWorkflowOptions), std::end(requiredWorkflowOptions));

std::vector<CompletionPolicy> completionPolicies;
UserCustomizationsHelper::userDefinedCustomization(completionPolicies, 0);
Expand Down
10 changes: 8 additions & 2 deletions Framework/Core/src/ChannelConfigurationPolicy.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,15 @@ namespace o2::framework
std::vector<ChannelConfigurationPolicy> ChannelConfigurationPolicy::createDefaultPolicies(ConfigContext const& configContext)
{
ChannelConfigurationPolicy defaultPolicy;
FairMQChannelConfigSpec spec;
spec.rateLogging = configContext.options().get<int>("fairmq-rate-logging");
spec.recvBufferSize = configContext.options().get<int>("fairmq-recv-buffer-size");
spec.sendBufferSize = configContext.options().get<int>("fairmq-send-buffer-size");
spec.ipcPrefix = configContext.options().get<std::string>("fairmq-ipc-prefix");

defaultPolicy.match = ChannelConfigurationPolicyHelpers::matchAny;
defaultPolicy.modifyInput = ChannelConfigurationPolicyHelpers::pullInput({configContext.options().get<int>("fairmq-rate-logging")});
defaultPolicy.modifyOutput = ChannelConfigurationPolicyHelpers::pushOutput({configContext.options().get<int>("fairmq-rate-logging")});
defaultPolicy.modifyInput = ChannelConfigurationPolicyHelpers::pullInput(spec);
defaultPolicy.modifyOutput = ChannelConfigurationPolicyHelpers::pushOutput(spec);

return {defaultPolicy};
}
Expand Down
18 changes: 18 additions & 0 deletions Framework/Core/src/ChannelConfigurationPolicyHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ ChannelConfigurationPolicyHelpers::InputChannelModifier ChannelConfigurationPoli
channel.method = ChannelMethod::Connect;
channel.type = ChannelType::Sub;
channel.rateLogging = spec.rateLogging;
channel.recvBufferSize = spec.recvBufferSize;
channel.recvBufferSize = spec.sendBufferSize;
channel.ipcPrefix = spec.ipcPrefix;
};
}

Expand All @@ -50,6 +53,9 @@ ChannelConfigurationPolicyHelpers::OutputChannelModifier ChannelConfigurationPol
channel.method = ChannelMethod::Bind;
channel.type = ChannelType::Pub;
channel.rateLogging = spec.rateLogging;
channel.recvBufferSize = spec.recvBufferSize;
channel.recvBufferSize = spec.sendBufferSize;
channel.ipcPrefix = spec.ipcPrefix;
};
}

Expand All @@ -59,6 +65,9 @@ ChannelConfigurationPolicyHelpers::InputChannelModifier ChannelConfigurationPoli
channel.method = ChannelMethod::Connect;
channel.type = ChannelType::Pull;
channel.rateLogging = spec.rateLogging;
channel.recvBufferSize = spec.recvBufferSize;
channel.recvBufferSize = spec.sendBufferSize;
channel.ipcPrefix = spec.ipcPrefix;
};
}

Expand All @@ -68,6 +77,9 @@ ChannelConfigurationPolicyHelpers::OutputChannelModifier ChannelConfigurationPol
channel.method = ChannelMethod::Bind;
channel.type = ChannelType::Push;
channel.rateLogging = spec.rateLogging;
channel.recvBufferSize = spec.recvBufferSize;
channel.recvBufferSize = spec.sendBufferSize;
channel.ipcPrefix = spec.ipcPrefix;
};
}

Expand All @@ -77,6 +89,9 @@ ChannelConfigurationPolicyHelpers::InputChannelModifier ChannelConfigurationPoli
channel.method = ChannelMethod::Connect;
channel.type = ChannelType::Pair;
channel.rateLogging = spec.rateLogging;
channel.recvBufferSize = spec.recvBufferSize;
channel.recvBufferSize = spec.sendBufferSize;
channel.ipcPrefix = spec.ipcPrefix;
};
}

Expand All @@ -86,6 +101,9 @@ ChannelConfigurationPolicyHelpers::OutputChannelModifier ChannelConfigurationPol
channel.method = ChannelMethod::Bind;
channel.type = ChannelType::Pair;
channel.rateLogging = spec.rateLogging;
channel.recvBufferSize = spec.recvBufferSize;
channel.recvBufferSize = spec.sendBufferSize;
channel.ipcPrefix = spec.ipcPrefix;
};
}

Expand Down
4 changes: 2 additions & 2 deletions Framework/Core/src/ChannelSpecHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ std::string ChannelSpecHelpers::channelUrl(OutputChannelSpec const& channel)
{
switch (channel.protocol) {
case ChannelProtocol::IPC:
return fmt::format("ipc://{}/{}_{},transport=shmem", getTmpFolder(), channel.hostname, channel.port);
return fmt::format("ipc://{}/{}_{},transport=shmem", channel.ipcPrefix, channel.hostname, channel.port);
default:
return channel.method == ChannelMethod::Bind ? fmt::format("tcp://*:{}", channel.port)
: fmt::format("tcp://{}:{}", channel.hostname, channel.port);
Expand All @@ -78,7 +78,7 @@ std::string ChannelSpecHelpers::channelUrl(InputChannelSpec const& channel)
{
switch (channel.protocol) {
case ChannelProtocol::IPC:
return fmt::format("ipc://{}/{}_{},transport=shmem", getTmpFolder(), channel.hostname, channel.port);
return fmt::format("ipc://{}/{}_{},transport=shmem", channel.ipcPrefix, channel.hostname, channel.port);
default:
return channel.method == ChannelMethod::Bind ? fmt::format("tcp://*:{}", channel.port)
: fmt::format("tcp://{}:{}", channel.hostname, channel.port);
Expand Down
39 changes: 17 additions & 22 deletions Framework/Core/src/DeviceSpecHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

#include <uv.h>
#include <iostream>
#include <fmt/format.h>

#include <sys/time.h>
#include <sys/resource.h>
Expand Down Expand Up @@ -245,32 +246,26 @@ struct ExpirationHandlerHelpers {
/// FIXME: support shared memory
std::string DeviceSpecHelpers::inputChannel2String(const InputChannelSpec& channel)
{
std::string result;

if (!channel.name.empty()) {
result += "name=" + channel.name + ",";
}
result += std::string("type=") + ChannelSpecHelpers::typeAsString(channel.type);
result += std::string(",method=") + ChannelSpecHelpers::methodAsString(channel.method);
result += std::string(",address=") + ChannelSpecHelpers::channelUrl(channel);
result += std::string(",rateLogging=" + std::to_string(channel.rateLogging));

return result;
return fmt::format("{}type={},method={},address={},rateLogging={},recvBufferSize={},sendBufferSize={}",
channel.name.empty() ? "" : "name=" + channel.name + ",",
ChannelSpecHelpers::typeAsString(channel.type),
ChannelSpecHelpers::methodAsString(channel.method),
ChannelSpecHelpers::channelUrl(channel),
channel.rateLogging,
channel.recvBufferSize,
channel.sendBufferSize);
}

std::string DeviceSpecHelpers::outputChannel2String(const OutputChannelSpec& channel)
{
std::string result;

if (!channel.name.empty()) {
result += "name=" + channel.name + ",";
}
result += std::string("type=") + ChannelSpecHelpers::typeAsString(channel.type);
result += std::string(",method=") + ChannelSpecHelpers::methodAsString(channel.method);
result += std::string(",address=") + ChannelSpecHelpers::channelUrl(channel);
result += std::string(",rateLogging=" + std::to_string(channel.rateLogging));

return result;
return fmt::format("{}type={},method={},address={},rateLogging={},recvBufferSize={},sendBufferSize={}",
channel.name.empty() ? "" : "name=" + channel.name + ",",
ChannelSpecHelpers::typeAsString(channel.type),
ChannelSpecHelpers::methodAsString(channel.method),
ChannelSpecHelpers::channelUrl(channel),
channel.rateLogging,
channel.recvBufferSize,
channel.sendBufferSize);
}

void DeviceSpecHelpers::processOutEdgeActions(std::vector<DeviceSpec>& devices,
Expand Down
65 changes: 65 additions & 0 deletions Framework/Core/src/WorkflowCustomizationHelpers.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright CERN and copyright holders of ALICE O2. This software is
// distributed under the terms of the GNU General Public License v3 (GPL
// Version 3), copied verbatim in the file "COPYING".
//
// See http://alice-o2.web.cern.ch/license for full licensing information.
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#include "Framework/WorkflowCustomizationHelpers.h"
#include <string>
#include <cstdlib>
#include <unistd.h>

namespace
{
std::string defaultIPCFolder()
{
/// Find out a place where we can write the sockets
char const* channelPrefix = getenv("TMPDIR");
if (channelPrefix) {
return std::string(channelPrefix);
}
return access("/tmp", W_OK) == 0 ? "/tmp" : ".";
}
} // namespace

namespace o2::framework
{

std::vector<ConfigParamSpec> WorkflowCustomizationHelpers::requiredWorkflowOptions()
{
return std::vector<ConfigParamSpec>{{ConfigParamSpec{"readers", VariantType::Int64, 1ll, {"number of parallel readers to use"}},
ConfigParamSpec{"pipeline", VariantType::String, "", {"override default pipeline size"}},
ConfigParamSpec{"clone", VariantType::String, "", {"clone processors from a template"}},
ConfigParamSpec{"workflow-suffix", VariantType::String, "", {"suffix to add to all dataprocessors"}},

// options for AOD rate limiting
ConfigParamSpec{"aod-memory-rate-limit", VariantType::Int64, 0LL, {"Rate limit AOD processing based on memory"}},

// options for AOD writer
ConfigParamSpec{"aod-writer-json", VariantType::String, "", {"Name of the json configuration file"}},
ConfigParamSpec{"aod-writer-resfile", VariantType::String, "", {"Default name of the output file"}},
ConfigParamSpec{"aod-writer-resmode", VariantType::String, "RECREATE", {"Creation mode of the result files: NEW, CREATE, RECREATE, UPDATE"}},
ConfigParamSpec{"aod-writer-ntfmerge", VariantType::Int, -1, {"Number of time frames to merge into one file"}},
ConfigParamSpec{"aod-writer-keep", VariantType::String, "", {"Comma separated list of ORIGIN/DESCRIPTION/SUBSPECIFICATION:treename:col1/col2/..:filename"}},

ConfigParamSpec{"fairmq-rate-logging", VariantType::Int, 0, {"Rate logging for FairMQ channels"}},
ConfigParamSpec{"fairmq-recv-buffer-size", VariantType::Int, 1000, {"recvBufferSize option for FairMQ channels"}},
ConfigParamSpec{"fairmq-send-buffer-size", VariantType::Int, 1000, {"sendBufferSize option for FairMQ channels"}},
/// Find out a place where we can write the sockets
ConfigParamSpec{"fairmq-ipc-prefix", VariantType::String, defaultIPCFolder(), {"Prefix for FairMQ channels location"}},

ConfigParamSpec{"forwarding-policy", VariantType::String, "dangling", {"Which messages to forward."
" dangling: dangling outputs,"
" all: all messages"}},
ConfigParamSpec{"forwarding-destination",
VariantType::String,
"file",
{"Destination for forwarded messages."
" file: write to file,"
" fairmq: send to output proxy"}}}};
}
} // namespace o2::framework
17 changes: 2 additions & 15 deletions Framework/Core/test/Mocking.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "Framework/WorkflowSpec.h"
#include "Framework/DataSpecUtils.h"
#include "Framework/SimpleOptionsRetriever.h"
#include "Framework/WorkflowCustomizationHelpers.h"
#include "../src/WorkflowHelpers.h"

std::unique_ptr<o2::framework::ConfigContext> makeEmptyConfigContext()
Expand All @@ -20,21 +21,7 @@ std::unique_ptr<o2::framework::ConfigContext> makeEmptyConfigContext()
// FIXME: Ugly... We need to fix ownership and make sure the ConfigContext
// either owns or shares ownership of the registry.
std::vector<std::unique_ptr<ParamRetriever>> retrievers;
static std::vector<ConfigParamSpec> specs = {
ConfigParamSpec{"forwarding-policy",
VariantType::String,
"dangling",
{""}},
ConfigParamSpec{"forwarding-destination",
VariantType::String,
"file",
{"what to do with dangling outputs. file: write to file, fairmq: send to output proxy"}},
ConfigParamSpec{"fairmq-rate-logging",
VariantType::Int,
60,
{"rateLogging"}},
};
specs.push_back(ConfigParamSpec{"aod-memory-rate-limit", VariantType::String, "0", {"rate"}});
static std::vector<ConfigParamSpec> specs = WorkflowCustomizationHelpers::requiredWorkflowOptions();
auto store = std::make_unique<ConfigParamStore>(specs, std::move(retrievers));
store->preload();
store->activate();
Expand Down