Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6111,8 +6111,12 @@ Possible values:
)", EXPERIMENTAL) \
DECLARE(Bool, use_object_storage_list_objects_cache, false, R"(
Cache the list of objects returned by list objects calls in object storage
)", EXPERIMENTAL) \
DECLARE(Bool, object_storage_remote_initiator, false, R"(
Execute request to object storage as remote on one of object_storage_cluster nodes.
)", EXPERIMENTAL) \
\

/* ####################################################### */ \
/* ############ END OF EXPERIMENTAL FEATURES ############# */ \
/* ####################################################### */ \
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
// Altinity Antalya modifications atop of 25.2
{"object_storage_cluster", "", "", "New setting"},
{"object_storage_max_nodes", 0, 0, "New setting"},
{"object_storage_remote_initiator", false, false, "New setting."},
{"use_iceberg_metadata_files_cache", true, true, "New setting"},
{"iceberg_timestamp_ms", 0, 0, "New setting."},
{"iceberg_snapshot_id", 0, 0, "New setting."},
Expand Down
6 changes: 6 additions & 0 deletions src/IO/ReadBufferFromS3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,12 @@ Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t attempt, si
log, "Read S3 object. Bucket: {}, Key: {}, Version: {}, Offset: {}",
bucket, key, version_id.empty() ? "Latest" : version_id, range_begin);
}
else
{
LOG_TEST(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that a case where we request whole object?

Copy link
Author

@ianton-ru ianton-ru May 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Iceberg metadata for example

:) CREATE DATABASE datalake ENGINE = Iceberg('http://rest:8181/v1', 'minio', 'minio123') SETTINGS catalog_type = 'rest', storage_endpoint = 'http://minio:9000/warehouse', warehouse = 'iceberg'
:) SELECT * FROM datalake.`iceberg.bids`
Query id: d1bb9862-c077-403f-9843-94fd28173760

   ┌───────────────────datetime─┬─symbol─┬────bid─┬────ask─┐
1. │ 2019-08-09 08:35:00.000000 │ AAPL   │ 198.23 │ 195.45 │
2. │ 2019-08-09 08:35:00.000000 │ AAPL   │ 198.25 │  198.5 │
3. │ 2019-08-07 08:35:00.000000 │ AAPL   │ 195.23 │ 195.28 │
4. │ 2019-08-07 08:35:00.000000 │ AAPL   │ 195.22 │ 195.28 │
5. │ 2019-08-09 08:35:00.000000 │ AAPL   │ 198.23 │ 195.45 │
6. │ 2019-08-09 08:35:00.000000 │ AAPL   │ 198.25 │  198.5 │
   └────────────────────────────┴────────┴────────┴────────┘

:) select ProfileEvents['S3GetObject'] from system.query_log where type='QueryFinish' and query_id='d1bb9862-c077-403f-9843-94fd28173760'

   ┌─arrayElement⋯GetObject')─┐
1. │                        8 │
   └──────────────────────────┘
...

grep "Read S3 object" /var/log/clickhouse-server/clickhouse-server.log

2025.05.07 22:38:10.414791 [ 80 ] {d1bb9862-c077-403f-9843-94fd28173760} <Test> ReadBufferFromS3: Read S3 object. Bucket: warehouse, Key: data/metadata/00003-ad725ef4-c28e-4ed4-aa4b-2e2aae0716d4.metadata.json, Version: Latest
2025.05.07 22:38:10.416600 [ 80 ] {d1bb9862-c077-403f-9843-94fd28173760} <Test> ReadBufferFromS3: Read S3 object. Bucket: warehouse, Key: data/metadata/snap-182060351258856937-0-ff436521-29e9-4437-be5b-eb60f209baa9.avro, Version: Latest
2025.05.07 22:38:10.418360 [ 80 ] {d1bb9862-c077-403f-9843-94fd28173760} <Test> ReadBufferFromS3: Read S3 object. Bucket: warehouse, Key: data/metadata/ff436521-29e9-4437-be5b-eb60f209baa9-m0.avro, Version: Latest
2025.05.07 22:38:10.420138 [ 80 ] {d1bb9862-c077-403f-9843-94fd28173760} <Test> ReadBufferFromS3: Read S3 object. Bucket: warehouse, Key: data/metadata/6f3e6993-47c9-4556-b70c-c6c48d2ced6f-m0.avro, Version: Latest
2025.05.07 22:38:10.421658 [ 80 ] {d1bb9862-c077-403f-9843-94fd28173760} <Test> ReadBufferFromS3: Read S3 object. Bucket: warehouse, Key: data/metadata/f0de1c43-e367-4e3d-8c9d-4076d8fb0cbd-m0.avro, Version: Latest
2025.05.07 22:38:10.426911 [ 767 ] {d1bb9862-c077-403f-9843-94fd28173760} <Test> ReadBufferFromS3: Read S3 object. Bucket: warehouse, Key: data/data/datetime_day=2019-08-09/00000-0-ff436521-29e9-4437-be5b-eb60f209baa9.parquet, Version: Latest, Range: 0-1643
2025.05.07 22:38:10.427003 [ 762 ] {d1bb9862-c077-403f-9843-94fd28173760} <Test> ReadBufferFromS3: Read S3 object. Bucket: warehouse, Key: data/data/datetime_day=2019-08-09/00000-0-6f3e6993-47c9-4556-b70c-c6c48d2ced6f.parquet, Version: Latest, Range: 0-1643
2025.05.07 22:38:10.427050 [ 770 ] {d1bb9862-c077-403f-9843-94fd28173760} <Test> ReadBufferFromS3: Read S3 object. Bucket: warehouse, Key: data/data/datetime_day=2019-08-07/00000-0-f0de1c43-e367-4e3d-8c9d-4076d8fb0cbd.parquet, Version: Latest, Range: 0-1635

I added this for consistency, all requests count in ProfileEvents['S3GetObject'], but in logs only part of requests.

log, "Read S3 object. Bucket: {}, Key: {}, Version: {}",
bucket, key, version_id.empty() ? "Latest" : version_id);
}

ProfileEvents::increment(ProfileEvents::S3GetObject);
if (client_ptr->isClientForDisk())
Expand Down
82 changes: 79 additions & 3 deletions src/Storages/IStorageCluster.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#include <Storages/IStorageCluster.h>

#include <pcg_random.hpp>
#include <Common/randomSeed.h>

#include <Common/Exception.h>
#include <Core/Settings.h>
#include <Core/QueryProcessingStage.h>
Expand All @@ -13,6 +16,7 @@
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Parsers/queryToString.h>
#include <Planner/Utils.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Sources/RemoteSource.h>
#include <QueryPipeline/narrowPipe.h>
Expand All @@ -22,6 +26,9 @@
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageDictionary.h>
#include <Storages/StorageDistributed.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Storages/extractTableFunctionArgumentsFromSelectQuery.h>

#include <memory>
#include <string>
Expand All @@ -36,6 +43,7 @@ namespace Setting
extern const SettingsBool skip_unavailable_shards;
extern const SettingsNonZeroUInt64 max_parallel_replicas;
extern const SettingsUInt64 object_storage_max_nodes;
extern const SettingsBool object_storage_remote_initiator;
}

namespace ErrorCodes
Expand Down Expand Up @@ -93,15 +101,16 @@ void IStorageCluster::read(

storage_snapshot->check(column_names);

updateBeforeRead(context);
auto cluster = getClusterImpl(context, cluster_name_from_settings, context->getSettingsRef()[Setting::object_storage_max_nodes]);
const auto & settings = context->getSettingsRef();

auto cluster = getClusterImpl(context, cluster_name_from_settings, settings[Setting::object_storage_max_nodes]);

/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)

Block sample_block;
ASTPtr query_to_send = query_info.query;

if (context->getSettingsRef()[Setting::allow_experimental_analyzer])
if (settings[Setting::allow_experimental_analyzer])
{
sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_info.query, context, SelectQueryOptions(processed_stage));
}
Expand All @@ -114,6 +123,17 @@ void IStorageCluster::read(

updateQueryToSendIfNeeded(query_to_send, storage_snapshot, context);

if (settings[Setting::object_storage_remote_initiator])
{
auto storage_and_context = convertToRemote(cluster, context, cluster_name_from_settings, query_to_send);
auto src_distributed = std::dynamic_pointer_cast<StorageDistributed>(storage_and_context.storage);
auto modified_query_info = query_info;
modified_query_info.cluster = src_distributed->getCluster();
auto new_storage_snapshot = storage_and_context.storage->getStorageSnapshot(storage_snapshot->metadata, storage_and_context.context);
storage_and_context.storage->read(query_plan, column_names, new_storage_snapshot, modified_query_info, storage_and_context.context, processed_stage, max_block_size, num_streams);
return;
}

RestoreQualifiedNamesVisitor::Data data;
data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_info.query->as<ASTSelectQuery &>(), 0));
data.remote_table.database = context->getCurrentDatabase();
Expand Down Expand Up @@ -141,6 +161,62 @@ void IStorageCluster::read(
query_plan.addStep(std::move(reading));
}

IStorageCluster::RemoteCallVariables IStorageCluster::convertToRemote(
ClusterPtr cluster,
ContextPtr context,
const std::string & cluster_name_from_settings,
ASTPtr query_to_send)
{
auto host_addresses = cluster->getShardsAddresses();
if (host_addresses.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty cluster {}", cluster_name_from_settings);

static pcg64 rng(randomSeed());
size_t shard_num = rng() % host_addresses.size();
auto shard_addresses = host_addresses[shard_num];
/// After getClusterImpl each shard must have exactly 1 replica
if (shard_addresses.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of shard {} in cluster {} is not equal 1", shard_num, cluster_name_from_settings);
auto host_name = shard_addresses[0].toString();

LOG_INFO(log, "Choose remote initiator '{}'", host_name);

bool secure = shard_addresses[0].secure == Protocol::Secure::Enable;
std::string remote_function_name = secure ? "remoteSecure" : "remote";

/// Clean object_storage_remote_initiator setting to avoid infinite remote call
auto new_context = Context::createCopy(context);
new_context->setSetting("object_storage_remote_initiator", false);

auto * select_query = query_to_send->as<ASTSelectQuery>();
if (!select_query)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query");

auto query_settings = select_query->settings();
if (query_settings)
{
auto & settings_ast = query_settings->as<ASTSetQuery &>();
if (settings_ast.changes.removeSetting("object_storage_remote_initiator") && settings_ast.changes.empty())
{
select_query->setExpression(ASTSelectQuery::Expression::SETTINGS, {});
}
}

ASTTableExpression * table_expression = extractTableExpressionASTPtrFromSelectQuery(query_to_send);
if (!table_expression)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table expression");

auto remote_query = makeASTFunction(remote_function_name, std::make_shared<ASTLiteral>(host_name), table_expression->table_function);

table_expression->table_function = remote_query;

auto remote_function = TableFunctionFactory::instance().get(remote_query, new_context);

auto storage = remote_function->execute(query_to_send, new_context, remote_function_name);

return RemoteCallVariables{storage, new_context};
}

SinkToStoragePtr IStorageCluster::write(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
Expand Down
13 changes: 12 additions & 1 deletion src/Storages/IStorageCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,20 @@ class IStorageCluster : public IStorage
virtual String getClusterName(ContextPtr /* context */) const { return getOriginalClusterName(); }

protected:
virtual void updateBeforeRead(const ContextPtr &) {}
virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {}

struct RemoteCallVariables
{
StoragePtr storage;
ContextPtr context;
};

RemoteCallVariables convertToRemote(
ClusterPtr cluster,
ContextPtr context,
const std::string & cluster_name_from_settings,
ASTPtr query_to_send);

virtual void readFallBackToPure(
QueryPlan & /* query_plan */,
const Names & /* column_names */,
Expand Down
17 changes: 16 additions & 1 deletion src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
return std::nullopt;
}

std::optional<String> tryGetSamplePathFromMetadata() const override
{
if (!current_metadata)
return std::nullopt;
auto data_files = current_metadata->getDataFiles();
if (!data_files.empty())
return data_files[0];
return std::nullopt;
}

std::optional<size_t> totalRows() override
{
if (!current_metadata)
Expand Down Expand Up @@ -468,7 +478,12 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration,
createDynamicStorage(type);
}

virtual void assertInitialized() const override { return getImpl().assertInitialized(); }
void assertInitialized() const override { return getImpl().assertInitialized(); }

std::optional<String> tryGetSamplePathFromMetadata() const override
{
return getImpl().tryGetSamplePathFromMetadata();
}

private:
inline StorageObjectStorage::Configuration & getImpl() const
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class DeltaLakeMetadata final : public IDataLakeMetadata

DeltaLakeMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_);

Strings getDataFiles() const override { return data_files; }

NamesAndTypesList getTableSchema() const override { return schema; }

DeltaLakePartitionColumns getPartitionColumns() const { return partition_columns; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ bool DeltaLakeMetadataDeltaKernel::update(const ContextPtr &)
return table_snapshot->update();
}

Strings DeltaLakeMetadataDeltaKernel::getDataFiles() const
{
throwNotImplemented("getDataFiles()");
}

ObjectIterator DeltaLakeMetadataDeltaKernel::iterate(
const ActionsDAG * filter_dag,
FileProgressCallback callback,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class DeltaLakeMetadataDeltaKernel final : public IDataLakeMetadata

bool update(const ContextPtr & context) override;

Strings getDataFiles() const override;

NamesAndTypesList getTableSchema() const override;

NamesAndTypesList getReadSchema() const override;
Expand Down
6 changes: 3 additions & 3 deletions src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,19 @@ HudiMetadata::HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserv
{
}

Strings HudiMetadata::getDataFiles(const ActionsDAG *) const
Strings HudiMetadata::getDataFiles() const
{
if (data_files.empty())
data_files = getDataFilesImpl();
return data_files;
}

ObjectIterator HudiMetadata::iterate(
const ActionsDAG * filter_dag,
const ActionsDAG * /* filter_dag */,
FileProgressCallback callback,
size_t /* list_batch_size */) const
{
return createKeysIterator(getDataFiles(filter_dag), object_storage, callback);
return createKeysIterator(getDataFiles(), object_storage, callback);
}

}
3 changes: 2 additions & 1 deletion src/Storages/ObjectStorage/DataLakes/HudiMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class HudiMetadata final : public IDataLakeMetadata, private WithContext

HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_);

Strings getDataFiles() const override;

NamesAndTypesList getTableSchema() const override { return {}; }

bool operator ==(const IDataLakeMetadata & other) const override
Expand Down Expand Up @@ -49,7 +51,6 @@ class HudiMetadata final : public IDataLakeMetadata, private WithContext
mutable Strings data_files;

Strings getDataFilesImpl() const;
Strings getDataFiles(const ActionsDAG * filter_dag) const;
};

}
3 changes: 3 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ class IDataLakeMetadata : boost::noncopyable

virtual bool operator==(const IDataLakeMetadata & other) const = 0;

/// List all data files.
/// For better parallelization, iterate() method should be used.
virtual Strings getDataFiles() const = 0;
/// Return iterator to `data files`.
using FileProgressCallback = std::function<void(FileProgress)>;
virtual ObjectIterator iterate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ ManifestFilePtr IcebergMetadata::getManifestFile(const String & filename, Int64
return create_fn();
}

Strings IcebergMetadata::getDataFiles(const ActionsDAG * filter_dag) const
Strings IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag) const
{
if (!relevant_snapshot)
return {};
Expand Down Expand Up @@ -716,7 +716,7 @@ ObjectIterator IcebergMetadata::iterate(
FileProgressCallback callback,
size_t /* list_batch_size */) const
{
return createKeysIterator(getDataFiles(filter_dag), object_storage, callback);
return createKeysIterator(getDataFilesImpl(filter_dag), object_storage, callback);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ class IcebergMetadata : public IDataLakeMetadata, private WithContext
const Poco::JSON::Object::Ptr & metadata_object,
IcebergMetadataFilesCachePtr cache_ptr);

/// Get data files. On first request it reads manifest_list file and iterates through manifest files to find all data files.
/// All subsequent calls when the same data snapshot is relevant will return saved list of files (because it cannot be changed
/// without changing metadata file). Drops on every snapshot update.
Strings getDataFiles() const override { return getDataFilesImpl(nullptr); }

/// Get table schema parsed from metadata.
NamesAndTypesList getTableSchema() const override
{
Expand Down Expand Up @@ -118,7 +123,7 @@ class IcebergMetadata : public IDataLakeMetadata, private WithContext

void updateState(const ContextPtr & local_context, bool metadata_file_changed);

Strings getDataFiles(const ActionsDAG * filter_dag) const;
Strings getDataFilesImpl(const ActionsDAG * filter_dag) const;

void updateSnapshot();

Expand Down
12 changes: 8 additions & 4 deletions src/Storages/ObjectStorage/StorageObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ StorageObjectStorage::StorageObjectStorage(
bool distributed_processing_,
ASTPtr partition_by_,
bool is_table_function_,
bool lazy_init)
bool lazy_init,
std::optional<std::string> sample_path_)
: IStorage(table_id_)
, configuration(configuration_)
, object_storage(object_storage_)
Expand Down Expand Up @@ -130,7 +131,7 @@ StorageObjectStorage::StorageObjectStorage(
if (!do_lazy_init)
do_init();

std::string sample_path;
std::string sample_path = sample_path_.value_or("");
ColumnsDescription columns{columns_};
resolveSchemaAndFormat(columns, object_storage, configuration, format_settings, sample_path, context);
configuration->check(context);
Expand Down Expand Up @@ -352,6 +353,11 @@ std::optional<ColumnsDescription> StorageObjectStorage::Configuration::tryGetTab
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method tryGetTableStructureFromMetadata is not implemented for basic configuration");
}

std::optional<String> StorageObjectStorage::Configuration::tryGetSamplePathFromMetadata() const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method tryGetSamplePathFromMetadata is not implemented for basic configuration");
}

void StorageObjectStorage::read(
QueryPlan & query_plan,
const Names & column_names,
Expand Down Expand Up @@ -522,9 +528,7 @@ ColumnsDescription StorageObjectStorage::resolveSchemaFromData(

auto table_structure = configuration->tryGetTableStructureFromMetadata();
if (table_structure)
{
return table_structure.value();
}
}

ObjectInfos read_keys;
Expand Down
Loading
Loading