diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 319e142fac81..737bce88c47b 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -146,6 +146,10 @@ # include #endif +#if USE_PARQUET +# include +#endif + #include /// A minimal file used when the server is run without installation @@ -286,6 +290,7 @@ namespace ServerSetting extern const ServerSettingsUInt64 primary_index_cache_size; extern const ServerSettingsDouble primary_index_cache_size_ratio; extern const ServerSettingsBool use_legacy_mongodb_integration; + extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_size; } } @@ -2234,6 +2239,10 @@ try if (dns_cache_updater) dns_cache_updater->start(); +#if USE_PARQUET + ParquetFileMetaDataCache::instance()->setMaxSizeInBytes(server_settings[ServerSetting::input_format_parquet_metadata_cache_max_size]); +#endif + /// Set current database name before loading tables and databases because /// system logs may copy global context. std::string default_database = server_settings[ServerSetting::default_database].toString(); diff --git a/src/Processors/Formats/IInputFormat.h b/src/Processors/Formats/IInputFormat.h index 900d4edc39f1..d4416f80ba17 100644 --- a/src/Processors/Formats/IInputFormat.h +++ b/src/Processors/Formats/IInputFormat.h @@ -68,7 +68,7 @@ class IInputFormat : public SourceWithKeyCondition void needOnlyCount() { need_only_count = true; } /// Set additional info/key/id related to underlying storage of the ReadBuffer - virtual void setStorageRelatedUniqueKey(const ServerSettings & /* server_settings */, const Settings & /*settings*/, const String & /*key*/) {} + virtual void setStorageRelatedUniqueKey(const Settings & /*settings*/, const String & /*key*/) {} protected: ReadBuffer & getReadBuffer() const { chassert(in); return *in; } diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp index 2132b37e4139..2578c4e184c5 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp @@ -31,6 +31,7 @@ #include #include #include +#include #include namespace ProfileEvents @@ -522,15 +523,6 @@ static std::vector getHyperrectangleForRowGroup(const parquet::FileMetaDa return hyperrectangle; } -ParquetFileMetaDataCache::ParquetFileMetaDataCache(UInt64 max_size_bytes) - : CacheBase(max_size_bytes) {} - -ParquetFileMetaDataCache * ParquetFileMetaDataCache::instance(UInt64 max_size_bytes) -{ - static ParquetFileMetaDataCache instance(max_size_bytes); - return &instance; -} - std::shared_ptr ParquetBlockInputFormat::readMetadataFromFile() { createArrowFileIfNotCreated(); @@ -547,7 +539,7 @@ std::shared_ptr ParquetBlockInputFormat::getFileMetaData( return readMetadataFromFile(); } - auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance(metadata_cache.max_size_bytes)->getOrSet( + auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance()->getOrSet( metadata_cache.key, [&]() { @@ -834,11 +826,10 @@ void ParquetBlockInputFormat::initializeIfNeeded() } } -void ParquetBlockInputFormat::setStorageRelatedUniqueKey(const ServerSettings & server_settings, const Settings & settings, const String & key_) +void ParquetBlockInputFormat::setStorageRelatedUniqueKey(const Settings & settings, const String & key_) { metadata_cache.key = key_; metadata_cache.use_cache = settings[Setting::input_format_parquet_use_metadata_cache]; - metadata_cache.max_size_bytes = server_settings[ServerSetting::input_format_parquet_metadata_cache_max_size]; } void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_batch_idx) diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h index d9a8d82e5cd6..a1b7eade4303 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h @@ -2,7 +2,6 @@ #include "config.h" #if USE_PARQUET -#include #include #include #include @@ -73,7 +72,7 @@ class ParquetBlockInputFormat : public IInputFormat size_t getApproxBytesReadForChunk() const override { return previous_approx_bytes_read_for_chunk; } - void setStorageRelatedUniqueKey(const ServerSettings & server_settings, const Settings & settings, const String & key_) override; + void setStorageRelatedUniqueKey(const Settings & settings, const String & key_) override; private: Chunk read() override; @@ -350,8 +349,9 @@ class ParquetBlockInputFormat : public IInputFormat { String key; bool use_cache = false; - UInt64 max_size_bytes{0}; - } metadata_cache; + }; + + Cache metadata_cache; }; class ParquetSchemaReader : public ISchemaReader @@ -370,16 +370,6 @@ class ParquetSchemaReader : public ISchemaReader std::shared_ptr metadata; }; -class ParquetFileMetaDataCache : public CacheBase -{ -public: - static ParquetFileMetaDataCache * instance(UInt64 max_size_bytes); - void clear() {} - -private: - ParquetFileMetaDataCache(UInt64 max_size_bytes); -}; - } #endif diff --git a/src/Processors/Formats/Impl/ParquetFileMetaDataCache.cpp b/src/Processors/Formats/Impl/ParquetFileMetaDataCache.cpp new file mode 100644 index 000000000000..85808f90543b --- /dev/null +++ b/src/Processors/Formats/Impl/ParquetFileMetaDataCache.cpp @@ -0,0 +1,20 @@ +#include + +#ifdef USE_PARQUET + +namespace DB +{ + +ParquetFileMetaDataCache::ParquetFileMetaDataCache() + : CacheBase(0) +{} + +ParquetFileMetaDataCache * ParquetFileMetaDataCache::instance() +{ + static ParquetFileMetaDataCache instance; + return &instance; +} + +} + +#endif diff --git a/src/Processors/Formats/Impl/ParquetFileMetaDataCache.h b/src/Processors/Formats/Impl/ParquetFileMetaDataCache.h new file mode 100644 index 000000000000..fb5fc1bb0217 --- /dev/null +++ b/src/Processors/Formats/Impl/ParquetFileMetaDataCache.h @@ -0,0 +1,30 @@ +#pragma once + +#include "config.h" + +#if USE_PARQUET + +namespace parquet +{ + +class FileMetaData; + +} + +#include + +namespace DB +{ + +class ParquetFileMetaDataCache : public CacheBase +{ +public: + static ParquetFileMetaDataCache * instance(); + +private: + ParquetFileMetaDataCache(); +}; + +} + +#endif diff --git a/src/Processors/Formats/Impl/ParquetMetadataInputFormat.cpp b/src/Processors/Formats/Impl/ParquetMetadataInputFormat.cpp index 8264b565e39a..aef18523f483 100644 --- a/src/Processors/Formats/Impl/ParquetMetadataInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetMetadataInputFormat.cpp @@ -22,8 +22,17 @@ #include #include "ArrowBufferedStreams.h" #include +#include +#include +#include +namespace ProfileEvents +{ +extern const Event ParquetMetaDataCacheHits; +extern const Event ParquetMetaDataCacheMisses; +} + namespace DB { @@ -32,6 +41,11 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; } +namespace Setting +{ +extern const SettingsBool input_format_parquet_use_metadata_cache; +} + static NamesAndTypesList getHeaderForParquetMetadata() { NamesAndTypesList names_and_types{ @@ -129,10 +143,35 @@ void checkHeader(const Block & header) static std::shared_ptr getFileMetadata( ReadBuffer & in, const FormatSettings & format_settings, - std::atomic & is_stopped) + std::atomic & is_stopped, + ParquetMetadataInputFormat::Cache metadata_cache) { - auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true); - return parquet::ReadMetaData(arrow_file); + // in-memory cache is not implemented for local file operations, only for remote files + // there is a chance the user sets `input_format_parquet_use_metadata_cache=1` for a local file operation + // and the cache_key won't be set. Therefore, we also need to check for metadata_cache.key + if (!metadata_cache.use_cache || metadata_cache.key.empty()) + { + auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true); + return parquet::ReadMetaData(arrow_file); + } + + auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance()->getOrSet( + metadata_cache.key, + [&]() + { + auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true); + return parquet::ReadMetaData(arrow_file); + } + ); + + if (loaded) + ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheMisses); + else + ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheHits); + + return parquet_file_metadata; + + } ParquetMetadataInputFormat::ParquetMetadataInputFormat(ReadBuffer & in_, Block header_, const FormatSettings & format_settings_) @@ -147,7 +186,7 @@ Chunk ParquetMetadataInputFormat::read() if (done) return res; - auto metadata = getFileMetadata(*in, format_settings, is_stopped); + auto metadata = getFileMetadata(*in, format_settings, is_stopped, metadata_cache); const auto & header = getPort().getHeader(); auto names_and_types = getHeaderForParquetMetadata(); @@ -486,6 +525,12 @@ void ParquetMetadataInputFormat::resetParser() done = false; } +void ParquetMetadataInputFormat::setStorageRelatedUniqueKey(const Settings & settings, const String & key_) +{ + metadata_cache.key = key_; + metadata_cache.use_cache = settings[Setting::input_format_parquet_use_metadata_cache]; +} + ParquetMetadataSchemaReader::ParquetMetadataSchemaReader(ReadBuffer & in_) : ISchemaReader(in_) { diff --git a/src/Processors/Formats/Impl/ParquetMetadataInputFormat.h b/src/Processors/Formats/Impl/ParquetMetadataInputFormat.h index 5d2d89898596..5cd3f4173bf5 100644 --- a/src/Processors/Formats/Impl/ParquetMetadataInputFormat.h +++ b/src/Processors/Formats/Impl/ParquetMetadataInputFormat.h @@ -62,6 +62,14 @@ class ParquetMetadataInputFormat : public IInputFormat void resetParser() override; + void setStorageRelatedUniqueKey(const Settings & settings, const String & key_) override; + + struct Cache + { + String key; + bool use_cache = false; + }; + private: Chunk read() override; @@ -78,6 +86,8 @@ class ParquetMetadataInputFormat : public IInputFormat const FormatSettings format_settings; bool done = false; std::atomic is_stopped{0}; + + Cache metadata_cache; }; class ParquetMetadataSchemaReader : public ISchemaReader diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index b01566d00ca3..669173d36e3d 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -415,7 +415,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade input_format->needOnlyCount(); if (!object_info->getPath().empty()) - input_format->setStorageRelatedUniqueKey(context_->getServerSettings(), context_->getSettingsRef(), object_info->getPath() + ":" + object_info->metadata->etag); + input_format->setStorageRelatedUniqueKey(context_->getSettingsRef(), object_info->getPath() + ":" + object_info->metadata->etag); builder.init(Pipe(input_format)); diff --git a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference index 51fdf048b8ac..f5c1b1de44a4 100644 --- a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference +++ b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.reference @@ -1,3 +1,5 @@ 10 10 10 +10 +10 diff --git a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql index 6882acac2ae7..6153ad30b332 100644 --- a/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql +++ b/tests/queries/0_stateless/03299_parquet_object_storage_metadata_cache.sql @@ -16,6 +16,10 @@ SELECT COUNT(*) FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet) SETTINGS input_format_parquet_use_metadata_cache=1, optimize_count_from_files=0, log_comment='test_03262_parquet_metadata_cache'; +SELECT COUNT(*) +FROM s3(s3_conn, filename = 'test_03262_*', format = ParquetMetadata) +SETTINGS input_format_parquet_use_metadata_cache=1, log_comment='test_03262_parquet_metadata_format_metadata_cache'; + SYSTEM FLUSH LOGS; SELECT ProfileEvents['ParquetMetaDataCacheHits'] @@ -25,4 +29,11 @@ AND type = 'QueryFinish' ORDER BY event_time desc LIMIT 1; +SELECT ProfileEvents['ParquetMetaDataCacheHits'] +FROM system.query_log +where log_comment = 'test_03262_parquet_metadata_format_metadata_cache' +AND type = 'QueryFinish' +ORDER BY event_time desc +LIMIT 1; + DROP TABLE t_parquet_03262;