diff --git a/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx b/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx index 327358e2424ec..55dd973759ec9 100644 --- a/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx +++ b/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx @@ -25,8 +25,6 @@ #include "Framework/ChannelInfo.h" #include "Framework/Logger.h" -#include - #include #if __has_include() #include @@ -147,6 +145,18 @@ static inline auto extractOriginalsTuple(framework::pack, ProcessingConte return std::make_tuple(extractTypedOriginal(pc)...); } +void AODJAlienReaderHelpers::dumpFileMetrics(Monitoring& monitoring, TFile* currentFile, int tfPerFile, int tfRead) +{ + std::string monitoringInfo(fmt::format("lfn={},size={},total_tf={},read_tf={},read_bytes={},read_calls={}", currentFile->GetPath(), currentFile->GetSize(), tfPerFile, tfRead, currentFile->GetBytesRead(), currentFile->GetReadCalls())); +#if __has_include() + auto alienFile = dynamic_cast(currentFile); + if (alienFile) { + monitoringInfo += fmt::format(",se={}", alienFile->GetSE()); + } +#endif + monitoring.send(Metric{monitoringInfo, "aod-file-read-info"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL)); +} + AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback() { auto callback = AlgorithmSpec{adaptStateful([](ConfigParamRegistry const& options, @@ -159,7 +169,7 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback() monitoring.flushBuffer(); if (!options.isSet("aod-file")) { - LOGP(ERROR, "No input file defined!"); + LOGP(FATAL, "No input file defined!"); throw std::runtime_error("Processing is stopped!"); } @@ -199,24 +209,12 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback() numTF, watchdog, didir](Monitoring& monitoring, DataAllocator& outputs, ControlService& control, DeviceSpec const& device) { - // check if RuntimeLimit is reached - if (!watchdog->update()) { - LOGP(INFO, "Run time exceeds run time limit of {} seconds!", watchdog->runTimeLimit); - LOGP(INFO, "Stopping reader {} after time frame {}.", device.inputTimesliceId, watchdog->numberTimeFrames - 1); - monitoring.flushBuffer(); - didir->closeInputFiles(); - control.endOfStream(); - control.readyToQuit(QuitRequest::Me); - return; - } - // Each parallel reader device.inputTimesliceId reads the files fileCounter*device.maxInputTimeslices+device.inputTimesliceId // the TF to read is numTF assert(device.inputTimesliceId < device.maxInputTimeslices); uint64_t timeFrameNumber = 0; int fcnt = (*fileCounter * device.maxInputTimeslices) + device.inputTimesliceId; int ntf = *numTF + 1; - monitoring.send(Metric{(uint64_t)ntf, "tf-sent"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL)); static int currentFileCounter = -1; static int filesProcessed = 0; if (currentFileCounter != *fileCounter) { @@ -225,11 +223,25 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback() } // loop over requested tables - TTree* tr = nullptr; bool first = true; static size_t totalSizeUncompressed = 0; static size_t totalSizeCompressed = 0; - static size_t totalReadCalls = 0; + static TFile* currentFile = nullptr; + static int tfCurrentFile = -1; + + // check if RuntimeLimit is reached + if (!watchdog->update()) { + LOGP(INFO, "Run time exceeds run time limit of {} seconds!", watchdog->runTimeLimit); + LOGP(INFO, "Stopping reader {} after time frame {}.", device.inputTimesliceId, watchdog->numberTimeFrames - 1); + if (currentFile) { + dumpFileMetrics(monitoring, currentFile, tfCurrentFile, ntf); + } + monitoring.flushBuffer(); + didir->closeInputFiles(); + control.endOfStream(); + control.readyToQuit(QuitRequest::Me); + return; + } for (auto route : requestedTables) { @@ -238,11 +250,12 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback() auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec); // create a TreeToTable object - auto info = didir->getFileFolder(dh, fcnt, ntf); - size_t before = 0; - tr = didir->getDataTree(dh, fcnt, ntf); + TTree* tr = didir->getDataTree(dh, fcnt, ntf); if (!tr) { if (first) { + // dump metrics of file which is done for reading + dumpFileMetrics(monitoring, currentFile, tfCurrentFile, ntf); + // check if there is a next file to read fcnt += device.maxInputTimeslices; if (didir->atEnd(fcnt)) { @@ -278,7 +291,6 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback() // add branches to read // fill the table - auto colnames = getColumnNames(dh); if (colnames.size() == 0) { totalSizeCompressed += tr->GetZipBytes(); @@ -293,35 +305,19 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback() } } t2t.fill(tr); - if (info.file) { - totalReadCalls += info.file->GetReadCalls() - before; - static std::string currentFileRead = ""; - std::string nextFileRead = info.file->GetPath(); - if (currentFileRead != nextFileRead) { - currentFileRead = nextFileRead; - std::string monitoringInfo(currentFileRead); - monitoringInfo += ","; - monitoringInfo += std::to_string(info.file->GetSize()); -#if __has_include() - auto alienFile = dynamic_cast(info.file); - if (alienFile) { - monitoringInfo += ","; - monitoringInfo += alienFile->GetSE(); - } -#endif - monitoring.send(Metric{monitoringInfo, "aod-file-read-info"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL)); - LOGP(INFO, "File read info: {}", monitoringInfo); - // TODO extend to publish at the end of the file (or on each TF?) the sizes read *per file* - } - } monitoring.send(Metric{(double)ps.GetReadCalls(), "aod-tree-read-calls"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL)); delete tr; + // needed for metrics dumping (upon next file read, or terminate due to watchdog) + auto info = didir->getFileFolder(dh, fcnt, ntf); + currentFile = info.file; + tfCurrentFile = didir->getTimeFramesInFile(dh, fcnt); + first = false; } + monitoring.send(Metric{(uint64_t)ntf, "tf-sent"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL)); monitoring.send(Metric{(uint64_t)totalSizeUncompressed / 1000, "aod-bytes-read-uncompressed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL)); monitoring.send(Metric{(uint64_t)totalSizeCompressed / 1000, "aod-bytes-read-compressed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL)); - monitoring.send(Metric{(uint64_t)totalReadCalls, "aod-total-read-calls"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL)); // save file number and time frame *fileCounter = (fcnt - device.inputTimesliceId) / device.maxInputTimeslices; diff --git a/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.h b/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.h index 8ef579bd68edf..613073c763f29 100644 --- a/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.h +++ b/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.h @@ -14,6 +14,7 @@ #include "Framework/TableBuilder.h" #include "Framework/AlgorithmSpec.h" #include "Framework/Logger.h" +#include #include namespace o2::framework::readers @@ -21,6 +22,7 @@ namespace o2::framework::readers struct AODJAlienReaderHelpers { static AlgorithmSpec rootFileReaderCallback(); + static void dumpFileMetrics(o2::monitoring::Monitoring& monitoring, TFile* currentFile, int tfPerFile, int tfRead); }; } // namespace o2::framework::readers diff --git a/Framework/Core/include/Framework/DataInputDirector.h b/Framework/Core/include/Framework/DataInputDirector.h index 1a7a3e5af5ee1..ba9fc1007c09c 100644 --- a/Framework/Core/include/Framework/DataInputDirector.h +++ b/Framework/Core/include/Framework/DataInputDirector.h @@ -69,6 +69,7 @@ struct DataInputDescriptor { uint64_t getTimeFrameNumber(int counter, int numTF); FileAndFolder getFileFolder(int counter, int numTF); + int getTimeFramesInFile(int counter); void closeInputFile(); bool isAlienSupportOn() { return mAlienSupport; } @@ -114,6 +115,7 @@ struct DataInputDirector { TTree* getDataTree(header::DataHeader dh, int counter, int numTF); uint64_t getTimeFrameNumber(header::DataHeader dh, int counter, int numTF); FileAndFolder getFileFolder(header::DataHeader dh, int counter, int numTF); + int getTimeFramesInFile(header::DataHeader dh, int counter); private: std::string minputfilesFile; diff --git a/Framework/Core/src/DataInputDirector.cxx b/Framework/Core/src/DataInputDirector.cxx index e5143a10637d8..fb84ae510a5e0 100644 --- a/Framework/Core/src/DataInputDirector.cxx +++ b/Framework/Core/src/DataInputDirector.cxx @@ -164,6 +164,11 @@ FileAndFolder DataInputDescriptor::getFileFolder(int counter, int numTF) return fileAndFolder; } +int DataInputDescriptor::getTimeFramesInFile(int counter) +{ + return mfilenames.at(counter)->numberOfTimeFrames; +} + void DataInputDescriptor::closeInputFile() { if (mcurrentFile) { @@ -526,6 +531,17 @@ FileAndFolder DataInputDirector::getFileFolder(header::DataHeader dh, int counte return didesc->getFileFolder(counter, numTF); } +int DataInputDirector::getTimeFramesInFile(header::DataHeader dh, int counter) +{ + auto didesc = getDataInputDescriptor(dh); + // if NOT match then use defaultDataInputDescriptor + if (!didesc) { + didesc = mdefaultDataInputDescriptor; + } + + return didesc->getTimeFramesInFile(counter); +} + uint64_t DataInputDirector::getTimeFrameNumber(header::DataHeader dh, int counter, int numTF) { auto didesc = getDataInputDescriptor(dh); diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index 54b1aa7bf78b4..f25258c3c61ff 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -1324,8 +1324,7 @@ int runStateMachine(DataProcessorSpecs const& workflow, performanceMetrics.push_back("arrow-bytes-delta"); performanceMetrics.push_back("aod-bytes-read-uncompressed"); performanceMetrics.push_back("aod-bytes-read-compressed"); - performanceMetrics.push_back("aod-total-read-calls"); - performanceMetrics.push_back("aod-file-read-path"); + performanceMetrics.push_back("aod-file-read-info"); ResourcesMonitoringHelper::dumpMetricsToJSON(metricsInfos, driverInfo.metrics, deviceSpecs, performanceMetrics); } // This is a clean exit. Before we do so, if required,