Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#ifndef CLUSTERNATIVEHELPER_H
#define CLUSTERNATIVEHELPER_H
/// @file ClusterNativeHelper.h
/// @brief Helper class to read the binary format of TPC ClusterNative
/// @since 2019-01-23
/// @author Matthias Richter

#ifndef CLUSTERNATIVEHELPER_H
#define CLUSTERNATIVEHELPER_H

#include "DataFormatsTPC/ClusterNative.h"
#include "DataFormatsTPC/ClusterGroupAttribute.h"
#include "DataFormatsTPC/Constants.h"
Expand Down
173 changes: 173 additions & 0 deletions DataFormats/Detectors/TPC/include/DataFormatsTPC/WorkflowHelper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// Copyright CERN and copyright holders of ALICE O2. This software is
// distributed under the terms of the GNU General Public License v3 (GPL
// Version 3), copied verbatim in the file "COPYING".
//
// See http://alice-o2.web.cern.ch/license for full licensing information.
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

/// @file WorkflowHelper.h
/// @brief Helper class to obtain TPC clusters / digits / labels from DPL
/// @author David Rohr

#ifndef WORKFLOWHELPER_H
#define WORKFLOWHELPER_H

#include <memory>
#include "Framework/ProcessingContext.h"
#include "Framework/DataRefUtils.h"
#include <Framework/InputRecord.h>
#include "Framework/InputRecordWalker.h"
#include "DataFormatsTPC/TrackTPC.h"
#include "SimulationDataFormat/MCCompLabel.h"
#include "SimulationDataFormat/MCTruthContainer.h"
#include "SimulationDataFormat/ConstMCTruthContainer.h"
#include "DataFormatsTPC/Constants.h"
#include "DataFormatsTPC/Digit.h"
#include "DataFormatsTPC/TPCSectorHeader.h"
#include "DataFormatsTPC/ClusterGroupAttribute.h"
#include "DataFormatsTPC/ClusterNative.h"
#include "DataFormatsTPC/ClusterNativeHelper.h"

// NOTE: The DataFormatsTPC package does not have all required dependencies for these includes.
// The users of these headers should add them by themselves, in order to avoid making
// DataFormatsTPC depend on the Framework.

namespace o2
{
namespace tpc
{
namespace internal
{
struct InputRef {
o2::framework::DataRef data;
o2::framework::DataRef labels;
};
struct getWorkflowTPCInput_ret_internal {
std::map<int, InputRef> inputrefs;
std::vector<o2::dataformats::ConstMCLabelContainerView> mcInputs;
std::vector<gsl::span<const char>> inputs;
std::array<int, constants::MAXSECTOR> inputDigitsMCIndex;
std::vector<o2::dataformats::ConstMCLabelContainerView> inputDigitsMC;
std::unique_ptr<ClusterNative[]> clusterBuffer;
ClusterNativeHelper::ConstMCLabelContainerViewWithBuffer clustersMCBuffer;
};
struct getWorkflowTPCInput_ret {
getWorkflowTPCInput_ret_internal internal;
std::array<gsl::span<const o2::tpc::Digit>, constants::MAXSECTOR> inputDigits;
std::array<const o2::dataformats::ConstMCLabelContainerView*, constants::MAXSECTOR> inputDigitsMCPtrs;
ClusterNativeAccess clusterIndex;
};
} // namespace internal

static auto getWorkflowTPCInput(o2::framework::ProcessingContext& pc, int verbosity = 0, bool do_mcLabels = false, bool do_clusters = true, unsigned long tpcSectorMask = 0xFFFFFFFFF, bool do_digits = false)
{
auto retVal = std::make_unique<internal::getWorkflowTPCInput_ret>();

if (do_clusters && do_digits) {
throw std::invalid_argument("Currently cannot process both clusters and digits");
}

if (do_mcLabels) {
std::vector<o2::framework::InputSpec> filter = {
{"check", o2::framework::ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, "DIGITSMCTR"}, o2::framework::Lifetime::Timeframe},
{"check", o2::framework::ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, "CLNATIVEMCLBL"}, o2::framework::Lifetime::Timeframe},
};
unsigned long recvMask = 0;
for (auto const& ref : o2::framework::InputRecordWalker(pc.inputs(), filter)) {
auto const* sectorHeader = o2::framework::DataRefUtils::getHeader<TPCSectorHeader*>(ref);
if (sectorHeader == nullptr) {
// FIXME: think about error policy
LOG(ERROR) << "sector header missing on header stack";
return retVal;
}
const int sector = sectorHeader->sector();
if (sector < 0) {
continue;
}
if (recvMask & sectorHeader->sectorBits) {
throw std::runtime_error("can only have one MC data set per sector");
}
recvMask |= sectorHeader->sectorBits;
retVal->internal.inputrefs[sector].labels = ref;
if (do_digits) {
retVal->internal.inputDigitsMCIndex[sector] = retVal->internal.inputDigitsMC.size();
retVal->internal.inputDigitsMC.emplace_back(o2::dataformats::ConstMCLabelContainerView(pc.inputs().get<gsl::span<char>>(ref)));
}
}
if (recvMask != tpcSectorMask) {
throw std::runtime_error("Incomplete set of MC labels received");
}
if (do_digits) {
for (unsigned int i = 0; i < constants::MAXSECTOR; i++) {
if (verbosity >= 1) {
LOG(INFO) << "GOT MC LABELS FOR SECTOR " << i << " -> " << retVal->internal.inputDigitsMC[retVal->internal.inputDigitsMCIndex[i]].getNElements();
}
retVal->inputDigitsMCPtrs[i] = &retVal->internal.inputDigitsMC[retVal->internal.inputDigitsMCIndex[i]];
}
}
}

if (do_clusters || do_digits) {
std::vector<o2::framework::InputSpec> filter = {
{"check", o2::framework::ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, "DIGITS"}, o2::framework::Lifetime::Timeframe},
{"check", o2::framework::ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, "CLUSTERNATIVE"}, o2::framework::Lifetime::Timeframe},
};
unsigned long recvMask = 0;
for (auto const& ref : o2::framework::InputRecordWalker(pc.inputs(), filter)) {
auto const* sectorHeader = o2::framework::DataRefUtils::getHeader<TPCSectorHeader*>(ref);
if (sectorHeader == nullptr) {
throw std::runtime_error("sector header missing on header stack");
}
const int sector = sectorHeader->sector();
if (sector < 0) {
continue;
}
if (recvMask & sectorHeader->sectorBits) {
throw std::runtime_error("can only have one cluster data set per sector");
}
recvMask |= sectorHeader->sectorBits;
retVal->internal.inputrefs[sector].data = ref;
if (do_digits) {
retVal->inputDigits[sector] = pc.inputs().get<gsl::span<o2::tpc::Digit>>(ref);
if (verbosity >= 1) {
LOG(INFO) << "GOT DIGITS SPAN FOR SECTOR " << sector << " -> " << retVal->inputDigits[sector].size();
}
}
}
if (recvMask != tpcSectorMask) {
throw std::runtime_error("Incomplete set of clusters/digits received");
}

for (auto const& refentry : retVal->internal.inputrefs) {
auto& sector = refentry.first;
auto& ref = refentry.second.data;
if (do_clusters) {
if (ref.payload == nullptr) {
// skip zero-length message
continue;
}
if (refentry.second.labels.header != nullptr && refentry.second.labels.payload != nullptr) {
retVal->internal.mcInputs.emplace_back(o2::dataformats::ConstMCLabelContainerView(pc.inputs().get<gsl::span<char>>(refentry.second.labels)));
}
retVal->internal.inputs.emplace_back(gsl::span(ref.payload, o2::framework::DataRefUtils::getPayloadSize(ref)));
}
if (verbosity > 1) {
LOG(INFO) << "received " << *(ref.spec) << ", size " << o2::framework::DataRefUtils::getPayloadSize(ref) << " for sector " << sector;
}
}
}

if (do_clusters) {
memset(&retVal->clusterIndex, 0, sizeof(retVal->clusterIndex));
ClusterNativeHelper::Reader::fillIndex(retVal->clusterIndex, retVal->internal.clusterBuffer, retVal->internal.clustersMCBuffer, retVal->internal.inputs, retVal->internal.mcInputs, [&tpcSectorMask](auto& index) { return tpcSectorMask & (1ul << index); });
}

return retVal;
}

} // namespace tpc
} // namespace o2
#endif // WORKFLOWHELPER_H
15 changes: 15 additions & 0 deletions DataFormats/Detectors/TPC/src/WorkflowHelper.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright CERN and copyright holders of ALICE O2. This software is
// distributed under the terms of the GNU General Public License v3 (GPL
// Version 3), copied verbatim in the file "COPYING".
//
// See http://alice-o2.web.cern.ch/license for full licensing information.
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

/// @file WorkflowHelper.cxx
/// @author David Rohr

// Dummy file for the workflow helper, does not include WorkflowHelper.h,
// since DataFormats/TPC does not pull in all requred framework dependenecies.
105 changes: 3 additions & 102 deletions Detectors/GlobalTrackingWorkflow/src/TPCITSMatchingSpec.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include <vector>

#include "Framework/ConfigParamRegistry.h"
#include "Framework/InputRecordWalker.h"
#include "GlobalTrackingWorkflow/TPCITSMatchingSpec.h"
#include "ReconstructionDataFormats/TrackTPCITS.h"
#include "SimulationDataFormat/MCCompLabel.h"
Expand All @@ -25,8 +24,7 @@
#include "DataFormatsITSMFT/ROFRecord.h"
#include "DataFormatsTPC/TrackTPC.h"
#include "DataFormatsTPC/ClusterNative.h"
#include "DataFormatsTPC/ClusterNativeHelper.h"
#include "DataFormatsTPC/TPCSectorHeader.h"
#include "DataFormatsTPC/WorkflowHelper.h"
#include "DetectorsBase/GeometryManager.h"
#include "DetectorsBase/Propagator.h"
#include "ITSMFTBase/DPLAlpideParam.h"
Expand Down Expand Up @@ -115,104 +113,7 @@ void TPCITSMatchingDPL::run(ProcessingContext& pc)

const auto clusTPCShmap = pc.inputs().get<gsl::span<unsigned char>>("clusTPCshmap");

//---------------------------->> TPC Clusters loading >>------------------------------------------
int operation = 0;
uint64_t activeSectors = 0;
std::bitset<o2::tpc::constants::MAXSECTOR> validSectors = 0;
std::map<int, DataRef> datarefs;
std::vector<InputSpec> filter = {
{"check", ConcreteDataTypeMatcher{"TPC", "CLUSTERNATIVE"}, Lifetime::Timeframe},
};
for (auto const& ref : InputRecordWalker(pc.inputs(), filter)) {
auto const* sectorHeader = DataRefUtils::getHeader<o2::tpc::TPCSectorHeader*>(ref);
if (sectorHeader == nullptr) {
// FIXME: think about error policy
LOG(ERROR) << "sector header missing on header stack";
throw std::runtime_error("sector header missing on header stack");
}
int sector = sectorHeader->sector();
std::bitset<o2::tpc::constants::MAXSECTOR> sectorMask(sectorHeader->sectorBits);
LOG(INFO) << "Reading TPC cluster data, sector mask is " << sectorMask;
if ((validSectors & sectorMask).any()) {
// have already data for this sector, this should not happen in the current
// sequential implementation, for parallel path merged at the tracker stage
// multiple buffers need to be handled
throw std::runtime_error("can only have one data set per sector");
}
activeSectors |= sectorHeader->activeSectors;
validSectors |= sectorMask;
datarefs[sector] = ref;
}

auto printInputLog = [&validSectors, &activeSectors](auto& r, const char* comment, auto& s) {
LOG(INFO) << comment << " " << *(r.spec) << ", size " << DataRefUtils::getPayloadSize(r) //
<< " for sector " << s //
<< std::endl //
<< " input status: " << validSectors //
<< std::endl //
<< " active sectors: " << std::bitset<o2::tpc::constants::MAXSECTOR>(activeSectors);
};

if (activeSectors == 0 || (activeSectors & validSectors.to_ulong()) != activeSectors) {
// not all sectors available
// Since we expect complete input, this should not happen (why does the bufferization considered for TPC CA tracker? Ask Matthias)
throw std::runtime_error("Did not receive TPC clusters data for all sectors");
}
//------------------------------------------------------------------------------
std::vector<gsl::span<const char>> clustersTPC;

for (auto const& refentry : datarefs) {
auto& sector = refentry.first;
auto& ref = refentry.second;
clustersTPC.emplace_back(ref.payload, DataRefUtils::getPayloadSize(ref));
printInputLog(ref, "received", sector);
}

// Just print TPC clusters status
{
// make human readable information from the bitfield
std::string bitInfo;
auto nActiveBits = validSectors.count();
if (((uint64_t)0x1 << nActiveBits) == validSectors.to_ulong() + 1) {
// sectors 0 to some upper bound are active
bitInfo = "0-" + std::to_string(nActiveBits - 1);
} else {
int rangeStart = -1;
int rangeEnd = -1;
for (size_t sector = 0; sector < validSectors.size(); sector++) {
if (validSectors.test(sector)) {
if (rangeStart < 0) {
if (rangeEnd >= 0) {
bitInfo += ",";
}
bitInfo += std::to_string(sector);
if (nActiveBits == 1) {
break;
}
rangeStart = sector;
}
rangeEnd = sector;
} else {
if (rangeStart >= 0 && rangeEnd > rangeStart) {
bitInfo += "-" + std::to_string(rangeEnd);
}
rangeStart = -1;
}
}
if (rangeStart >= 0 && rangeEnd > rangeStart) {
bitInfo += "-" + std::to_string(rangeEnd);
}
}
LOG(INFO) << "running matching for sector(s) " << bitInfo;
}

o2::tpc::ClusterNativeAccess clusterIndex;
std::unique_ptr<o2::tpc::ClusterNative[]> clusterBuffer;
memset(&clusterIndex, 0, sizeof(clusterIndex));
o2::tpc::ClusterNativeHelper::ConstMCLabelContainerViewWithBuffer dummyMCOutput;
std::vector<o2::tpc::ClusterNativeHelper::ConstMCLabelContainerView> dummyMCInput;
o2::tpc::ClusterNativeHelper::Reader::fillIndex(clusterIndex, clusterBuffer, dummyMCOutput, clustersTPC, dummyMCInput);
//----------------------------<< TPC Clusters loading <<------------------------------------------
const auto& inputsTPCclusters = o2::tpc::getWorkflowTPCInput(pc);

//
MCLabelsTr lblITS;
Expand Down Expand Up @@ -243,7 +144,7 @@ void TPCITSMatchingDPL::run(ProcessingContext& pc)
mMatching.setITSClusterROFRecInp(clusITSROF);
mMatching.setTPCTracksInp(tracksTPC);
mMatching.setTPCTrackClusIdxInp(tracksTPCClRefs);
mMatching.setTPCClustersInp(&clusterIndex);
mMatching.setTPCClustersInp(&inputsTPCclusters->clusterIndex);
mMatching.setTPCClustersSharingMap(clusTPCShmap);

if (mUseMC) {
Expand Down
Loading