diff --git a/Framework/Core/CMakeLists.txt b/Framework/Core/CMakeLists.txt index fa4c2fc5fdc5a..06f5de71e4740 100644 --- a/Framework/Core/CMakeLists.txt +++ b/Framework/Core/CMakeLists.txt @@ -98,6 +98,7 @@ o2_add_library(Framework src/Task.cxx src/TextControlService.cxx src/Variant.cxx + src/WorkflowCustomizationHelpers.cxx src/WorkflowHelpers.cxx src/WorkflowSerializationHelpers.cxx src/WorkflowSpec.cxx diff --git a/Framework/Core/include/Framework/ChannelConfigurationPolicyHelpers.h b/Framework/Core/include/Framework/ChannelConfigurationPolicyHelpers.h index ece802e0dd547..4f7df3f87f877 100644 --- a/Framework/Core/include/Framework/ChannelConfigurationPolicyHelpers.h +++ b/Framework/Core/include/Framework/ChannelConfigurationPolicyHelpers.h @@ -7,8 +7,8 @@ // In applying this license CERN does not waive the privileges and immunities // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. -#ifndef FRAMEWORK_CHANNELCONFIGURATIONPOLICYHELPERS_H -#define FRAMEWORK_CHANNELCONFIGURATIONPOLICYHELPERS_H +#ifndef O2_FRAMEWORK_CHANNELCONFIGURATIONPOLICYHELPERS_H_ +#define O2_FRAMEWORK_CHANNELCONFIGURATIONPOLICYHELPERS_H_ #include "Framework/ChannelSpec.h" @@ -19,6 +19,9 @@ namespace o2::framework struct FairMQChannelConfigSpec { int64_t rateLogging; + int64_t recvBufferSize; + int64_t sendBufferSize; + std::string ipcPrefix; }; /// A set of helpers for common ChannelConfigurationPolicy behaviors diff --git a/Framework/Core/include/Framework/ChannelSpec.h b/Framework/Core/include/Framework/ChannelSpec.h index d8fcb406c2258..c8d256673aaba 100644 --- a/Framework/Core/include/Framework/ChannelSpec.h +++ b/Framework/Core/include/Framework/ChannelSpec.h @@ -48,7 +48,10 @@ struct InputChannelSpec { std::string hostname; unsigned short port; ChannelProtocol protocol = ChannelProtocol::Network; - size_t rateLogging = 60; + size_t rateLogging = 0; + size_t recvBufferSize = 1000; + size_t sendBufferSize = 1000; + std::string ipcPrefix = "."; }; /// This describes an output channel. Output channels are semantically @@ -64,7 +67,10 @@ struct OutputChannelSpec { unsigned short port; size_t listeners; ChannelProtocol protocol = ChannelProtocol::Network; - size_t rateLogging = 60; + size_t rateLogging = 0; + size_t recvBufferSize = 1000; + size_t sendBufferSize = 1000; + std::string ipcPrefix = "."; }; } // namespace o2::framework diff --git a/Framework/Core/include/Framework/WorkflowCustomizationHelpers.h b/Framework/Core/include/Framework/WorkflowCustomizationHelpers.h new file mode 100644 index 0000000000000..edf4758adeb4f --- /dev/null +++ b/Framework/Core/include/Framework/WorkflowCustomizationHelpers.h @@ -0,0 +1,25 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#ifndef O2_FRAMEWORK_WORKFLOWCUSTOMIZATIONHELPERS_H_ +#define O2_FRAMEWORK_WORKFLOWCUSTOMIZATIONHELPERS_H_ + +#include "Framework/ConfigParamSpec.h" +#include + +namespace o2::framework +{ + +struct WorkflowCustomizationHelpers { + static std::vector requiredWorkflowOptions(); +}; + +} // namespace o2::framework + +#endif // O2_FRAMEWORK_WORKFLOWCUSTOMIZATIONHELPERS_H_ diff --git a/Framework/Core/include/Framework/runDataProcessing.h b/Framework/Core/include/Framework/runDataProcessing.h index d79e67bd42b38..3e97f6b4bc575 100644 --- a/Framework/Core/include/Framework/runDataProcessing.h +++ b/Framework/Core/include/Framework/runDataProcessing.h @@ -20,6 +20,7 @@ #include "Framework/BoostOptionsRetriever.h" #include "Framework/CustomWorkflowTerminationHook.h" #include "Framework/CommonServices.h" +#include "Framework/WorkflowCustomizationHelpers.h" #include "Framework/Logger.h" #include @@ -75,6 +76,9 @@ void defaultConfiguration(std::vector& services) services = o2::framework::CommonServices::defaultServices(); } +/// Workflow options which are required by DPL in order to work. +std::vector requiredWorkflowOptions(); + void defaultConfiguration(o2::framework::OnWorkflowTerminationHook& hook) { hook = [](const char*) {}; @@ -131,35 +135,8 @@ int main(int argc, char** argv) // The default policy is a catch all pub/sub setup to be consistent with the past. std::vector workflowOptions; UserCustomizationsHelper::userDefinedCustomization(workflowOptions, 0); - workflowOptions.push_back(ConfigParamSpec{"readers", VariantType::Int64, 1ll, {"number of parallel readers to use"}}); - workflowOptions.push_back(ConfigParamSpec{"pipeline", VariantType::String, "", {"override default pipeline size"}}); - workflowOptions.push_back(ConfigParamSpec{"clone", VariantType::String, "", {"clone processors from a template"}}); - workflowOptions.push_back(ConfigParamSpec{"workflow-suffix", VariantType::String, "", {"suffix to add to all dataprocessors"}}); - - // options for AOD rate limiting - workflowOptions.push_back(ConfigParamSpec{"aod-memory-rate-limit", VariantType::Int64, 0LL, {"Rate limit AOD processing based on memory"}}); - - // options for AOD writer - workflowOptions.push_back(ConfigParamSpec{"aod-writer-json", VariantType::String, "", {"Name of the json configuration file"}}); - workflowOptions.push_back(ConfigParamSpec{"aod-writer-resfile", VariantType::String, "", {"Default name of the output file"}}); - workflowOptions.push_back(ConfigParamSpec{"aod-writer-resmode", VariantType::String, "RECREATE", {"Creation mode of the result files: NEW, CREATE, RECREATE, UPDATE"}}); - workflowOptions.push_back(ConfigParamSpec{"aod-writer-ntfmerge", VariantType::Int, -1, {"Number of time frames to merge into one file"}}); - workflowOptions.push_back(ConfigParamSpec{"aod-writer-keep", VariantType::String, "", {"Comma separated list of ORIGIN/DESCRIPTION/SUBSPECIFICATION:treename:col1/col2/..:filename"}}); - - workflowOptions.push_back(ConfigParamSpec{"fairmq-rate-logging", VariantType::Int, 60, {"Rate logging for FairMQ channels"}}); - - workflowOptions.push_back(ConfigParamSpec{"forwarding-policy", - VariantType::String, - "dangling", - {"Which messages to forward." - " dangling: dangling outputs," - " all: all messages"}}); - workflowOptions.push_back(ConfigParamSpec{"forwarding-destination", - VariantType::String, - "file", - {"Destination for forwarded messages." - " file: write to file," - " fairmq: send to output proxy"}}); + auto requiredWorkflowOptions = WorkflowCustomizationHelpers::requiredWorkflowOptions(); + workflowOptions.insert(std::end(workflowOptions), std::begin(requiredWorkflowOptions), std::end(requiredWorkflowOptions)); std::vector completionPolicies; UserCustomizationsHelper::userDefinedCustomization(completionPolicies, 0); diff --git a/Framework/Core/src/ChannelConfigurationPolicy.cxx b/Framework/Core/src/ChannelConfigurationPolicy.cxx index 514cdc60f5fd3..3facdf12ed57e 100644 --- a/Framework/Core/src/ChannelConfigurationPolicy.cxx +++ b/Framework/Core/src/ChannelConfigurationPolicy.cxx @@ -17,9 +17,15 @@ namespace o2::framework std::vector ChannelConfigurationPolicy::createDefaultPolicies(ConfigContext const& configContext) { ChannelConfigurationPolicy defaultPolicy; + FairMQChannelConfigSpec spec; + spec.rateLogging = configContext.options().get("fairmq-rate-logging"); + spec.recvBufferSize = configContext.options().get("fairmq-recv-buffer-size"); + spec.sendBufferSize = configContext.options().get("fairmq-send-buffer-size"); + spec.ipcPrefix = configContext.options().get("fairmq-ipc-prefix"); + defaultPolicy.match = ChannelConfigurationPolicyHelpers::matchAny; - defaultPolicy.modifyInput = ChannelConfigurationPolicyHelpers::pullInput({configContext.options().get("fairmq-rate-logging")}); - defaultPolicy.modifyOutput = ChannelConfigurationPolicyHelpers::pushOutput({configContext.options().get("fairmq-rate-logging")}); + defaultPolicy.modifyInput = ChannelConfigurationPolicyHelpers::pullInput(spec); + defaultPolicy.modifyOutput = ChannelConfigurationPolicyHelpers::pushOutput(spec); return {defaultPolicy}; } diff --git a/Framework/Core/src/ChannelConfigurationPolicyHelpers.cxx b/Framework/Core/src/ChannelConfigurationPolicyHelpers.cxx index 89dbfbe71f87d..9169edccd61bb 100644 --- a/Framework/Core/src/ChannelConfigurationPolicyHelpers.cxx +++ b/Framework/Core/src/ChannelConfigurationPolicyHelpers.cxx @@ -41,6 +41,9 @@ ChannelConfigurationPolicyHelpers::InputChannelModifier ChannelConfigurationPoli channel.method = ChannelMethod::Connect; channel.type = ChannelType::Sub; channel.rateLogging = spec.rateLogging; + channel.recvBufferSize = spec.recvBufferSize; + channel.recvBufferSize = spec.sendBufferSize; + channel.ipcPrefix = spec.ipcPrefix; }; } @@ -50,6 +53,9 @@ ChannelConfigurationPolicyHelpers::OutputChannelModifier ChannelConfigurationPol channel.method = ChannelMethod::Bind; channel.type = ChannelType::Pub; channel.rateLogging = spec.rateLogging; + channel.recvBufferSize = spec.recvBufferSize; + channel.recvBufferSize = spec.sendBufferSize; + channel.ipcPrefix = spec.ipcPrefix; }; } @@ -59,6 +65,9 @@ ChannelConfigurationPolicyHelpers::InputChannelModifier ChannelConfigurationPoli channel.method = ChannelMethod::Connect; channel.type = ChannelType::Pull; channel.rateLogging = spec.rateLogging; + channel.recvBufferSize = spec.recvBufferSize; + channel.recvBufferSize = spec.sendBufferSize; + channel.ipcPrefix = spec.ipcPrefix; }; } @@ -68,6 +77,9 @@ ChannelConfigurationPolicyHelpers::OutputChannelModifier ChannelConfigurationPol channel.method = ChannelMethod::Bind; channel.type = ChannelType::Push; channel.rateLogging = spec.rateLogging; + channel.recvBufferSize = spec.recvBufferSize; + channel.recvBufferSize = spec.sendBufferSize; + channel.ipcPrefix = spec.ipcPrefix; }; } @@ -77,6 +89,9 @@ ChannelConfigurationPolicyHelpers::InputChannelModifier ChannelConfigurationPoli channel.method = ChannelMethod::Connect; channel.type = ChannelType::Pair; channel.rateLogging = spec.rateLogging; + channel.recvBufferSize = spec.recvBufferSize; + channel.recvBufferSize = spec.sendBufferSize; + channel.ipcPrefix = spec.ipcPrefix; }; } @@ -86,6 +101,9 @@ ChannelConfigurationPolicyHelpers::OutputChannelModifier ChannelConfigurationPol channel.method = ChannelMethod::Bind; channel.type = ChannelType::Pair; channel.rateLogging = spec.rateLogging; + channel.recvBufferSize = spec.recvBufferSize; + channel.recvBufferSize = spec.sendBufferSize; + channel.ipcPrefix = spec.ipcPrefix; }; } diff --git a/Framework/Core/src/ChannelSpecHelpers.cxx b/Framework/Core/src/ChannelSpecHelpers.cxx index 7c04beecb19ea..5a15a05a7367b 100644 --- a/Framework/Core/src/ChannelSpecHelpers.cxx +++ b/Framework/Core/src/ChannelSpecHelpers.cxx @@ -67,7 +67,7 @@ std::string ChannelSpecHelpers::channelUrl(OutputChannelSpec const& channel) { switch (channel.protocol) { case ChannelProtocol::IPC: - return fmt::format("ipc://{}/{}_{},transport=shmem", getTmpFolder(), channel.hostname, channel.port); + return fmt::format("ipc://{}/{}_{},transport=shmem", channel.ipcPrefix, channel.hostname, channel.port); default: return channel.method == ChannelMethod::Bind ? fmt::format("tcp://*:{}", channel.port) : fmt::format("tcp://{}:{}", channel.hostname, channel.port); @@ -78,7 +78,7 @@ std::string ChannelSpecHelpers::channelUrl(InputChannelSpec const& channel) { switch (channel.protocol) { case ChannelProtocol::IPC: - return fmt::format("ipc://{}/{}_{},transport=shmem", getTmpFolder(), channel.hostname, channel.port); + return fmt::format("ipc://{}/{}_{},transport=shmem", channel.ipcPrefix, channel.hostname, channel.port); default: return channel.method == ChannelMethod::Bind ? fmt::format("tcp://*:{}", channel.port) : fmt::format("tcp://{}:{}", channel.hostname, channel.port); diff --git a/Framework/Core/src/DeviceSpecHelpers.cxx b/Framework/Core/src/DeviceSpecHelpers.cxx index cb4c76e217373..89ed2328e2ca8 100644 --- a/Framework/Core/src/DeviceSpecHelpers.cxx +++ b/Framework/Core/src/DeviceSpecHelpers.cxx @@ -36,6 +36,7 @@ #include #include +#include #include #include @@ -245,32 +246,26 @@ struct ExpirationHandlerHelpers { /// FIXME: support shared memory std::string DeviceSpecHelpers::inputChannel2String(const InputChannelSpec& channel) { - std::string result; - - if (!channel.name.empty()) { - result += "name=" + channel.name + ","; - } - result += std::string("type=") + ChannelSpecHelpers::typeAsString(channel.type); - result += std::string(",method=") + ChannelSpecHelpers::methodAsString(channel.method); - result += std::string(",address=") + ChannelSpecHelpers::channelUrl(channel); - result += std::string(",rateLogging=" + std::to_string(channel.rateLogging)); - - return result; + return fmt::format("{}type={},method={},address={},rateLogging={},recvBufferSize={},sendBufferSize={}", + channel.name.empty() ? "" : "name=" + channel.name + ",", + ChannelSpecHelpers::typeAsString(channel.type), + ChannelSpecHelpers::methodAsString(channel.method), + ChannelSpecHelpers::channelUrl(channel), + channel.rateLogging, + channel.recvBufferSize, + channel.sendBufferSize); } std::string DeviceSpecHelpers::outputChannel2String(const OutputChannelSpec& channel) { - std::string result; - - if (!channel.name.empty()) { - result += "name=" + channel.name + ","; - } - result += std::string("type=") + ChannelSpecHelpers::typeAsString(channel.type); - result += std::string(",method=") + ChannelSpecHelpers::methodAsString(channel.method); - result += std::string(",address=") + ChannelSpecHelpers::channelUrl(channel); - result += std::string(",rateLogging=" + std::to_string(channel.rateLogging)); - - return result; + return fmt::format("{}type={},method={},address={},rateLogging={},recvBufferSize={},sendBufferSize={}", + channel.name.empty() ? "" : "name=" + channel.name + ",", + ChannelSpecHelpers::typeAsString(channel.type), + ChannelSpecHelpers::methodAsString(channel.method), + ChannelSpecHelpers::channelUrl(channel), + channel.rateLogging, + channel.recvBufferSize, + channel.sendBufferSize); } void DeviceSpecHelpers::processOutEdgeActions(std::vector& devices, diff --git a/Framework/Core/src/WorkflowCustomizationHelpers.cxx b/Framework/Core/src/WorkflowCustomizationHelpers.cxx new file mode 100644 index 0000000000000..4d187933bce5e --- /dev/null +++ b/Framework/Core/src/WorkflowCustomizationHelpers.cxx @@ -0,0 +1,65 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#include "Framework/WorkflowCustomizationHelpers.h" +#include +#include +#include + +namespace +{ +std::string defaultIPCFolder() +{ + /// Find out a place where we can write the sockets + char const* channelPrefix = getenv("TMPDIR"); + if (channelPrefix) { + return std::string(channelPrefix); + } + return access("/tmp", W_OK) == 0 ? "/tmp" : "."; +} +} // namespace + +namespace o2::framework +{ + +std::vector WorkflowCustomizationHelpers::requiredWorkflowOptions() +{ + return std::vector{{ConfigParamSpec{"readers", VariantType::Int64, 1ll, {"number of parallel readers to use"}}, + ConfigParamSpec{"pipeline", VariantType::String, "", {"override default pipeline size"}}, + ConfigParamSpec{"clone", VariantType::String, "", {"clone processors from a template"}}, + ConfigParamSpec{"workflow-suffix", VariantType::String, "", {"suffix to add to all dataprocessors"}}, + + // options for AOD rate limiting + ConfigParamSpec{"aod-memory-rate-limit", VariantType::Int64, 0LL, {"Rate limit AOD processing based on memory"}}, + + // options for AOD writer + ConfigParamSpec{"aod-writer-json", VariantType::String, "", {"Name of the json configuration file"}}, + ConfigParamSpec{"aod-writer-resfile", VariantType::String, "", {"Default name of the output file"}}, + ConfigParamSpec{"aod-writer-resmode", VariantType::String, "RECREATE", {"Creation mode of the result files: NEW, CREATE, RECREATE, UPDATE"}}, + ConfigParamSpec{"aod-writer-ntfmerge", VariantType::Int, -1, {"Number of time frames to merge into one file"}}, + ConfigParamSpec{"aod-writer-keep", VariantType::String, "", {"Comma separated list of ORIGIN/DESCRIPTION/SUBSPECIFICATION:treename:col1/col2/..:filename"}}, + + ConfigParamSpec{"fairmq-rate-logging", VariantType::Int, 0, {"Rate logging for FairMQ channels"}}, + ConfigParamSpec{"fairmq-recv-buffer-size", VariantType::Int, 1000, {"recvBufferSize option for FairMQ channels"}}, + ConfigParamSpec{"fairmq-send-buffer-size", VariantType::Int, 1000, {"sendBufferSize option for FairMQ channels"}}, + /// Find out a place where we can write the sockets + ConfigParamSpec{"fairmq-ipc-prefix", VariantType::String, defaultIPCFolder(), {"Prefix for FairMQ channels location"}}, + + ConfigParamSpec{"forwarding-policy", VariantType::String, "dangling", {"Which messages to forward." + " dangling: dangling outputs," + " all: all messages"}}, + ConfigParamSpec{"forwarding-destination", + VariantType::String, + "file", + {"Destination for forwarded messages." + " file: write to file," + " fairmq: send to output proxy"}}}}; +} +} // namespace o2::framework diff --git a/Framework/Core/test/Mocking.h b/Framework/Core/test/Mocking.h index c89b324910fa0..8a8577fdbad84 100644 --- a/Framework/Core/test/Mocking.h +++ b/Framework/Core/test/Mocking.h @@ -12,6 +12,7 @@ #include "Framework/WorkflowSpec.h" #include "Framework/DataSpecUtils.h" #include "Framework/SimpleOptionsRetriever.h" +#include "Framework/WorkflowCustomizationHelpers.h" #include "../src/WorkflowHelpers.h" std::unique_ptr makeEmptyConfigContext() @@ -20,21 +21,7 @@ std::unique_ptr makeEmptyConfigContext() // FIXME: Ugly... We need to fix ownership and make sure the ConfigContext // either owns or shares ownership of the registry. std::vector> retrievers; - static std::vector specs = { - ConfigParamSpec{"forwarding-policy", - VariantType::String, - "dangling", - {""}}, - ConfigParamSpec{"forwarding-destination", - VariantType::String, - "file", - {"what to do with dangling outputs. file: write to file, fairmq: send to output proxy"}}, - ConfigParamSpec{"fairmq-rate-logging", - VariantType::Int, - 60, - {"rateLogging"}}, - }; - specs.push_back(ConfigParamSpec{"aod-memory-rate-limit", VariantType::String, "0", {"rate"}}); + static std::vector specs = WorkflowCustomizationHelpers::requiredWorkflowOptions(); auto store = std::make_unique(specs, std::move(retrievers)); store->preload(); store->activate();