Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"least_greatest_legacy_null_behavior", true, false, "New setting"},
{"object_storage_cluster", "", "", "New setting"},
{"object_storage_max_nodes", 0, 0, "New setting"},
{"input_format_parquet_use_metadata_cache", false, false, "New setting"},
}
},
{"24.11",
Expand Down
15 changes: 9 additions & 6 deletions src/Databases/Iceberg/DatabaseIceberg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <Storages/ConstraintsDescription.h>
#include <Storages/StorageNull.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
#include <Storages/ObjectStorage/StorageObjectStorageCluster.h>

#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/Context.h>
Expand All @@ -37,6 +38,7 @@ namespace DatabaseIcebergSetting
extern const DatabaseIcebergSettingsString storage_endpoint;
extern const DatabaseIcebergSettingsString oauth_server_uri;
extern const DatabaseIcebergSettingsBool vended_credentials;
extern const DatabaseIcebergSettingsString object_storage_cluster;
}
namespace Setting
{
Expand Down Expand Up @@ -235,19 +237,20 @@ StoragePtr DatabaseIceberg::tryGetTable(const String & name, ContextPtr context_
/// no table structure in table definition AST.
StorageObjectStorage::Configuration::initialize(*configuration, args, context_, /* with_table_structure */false, std::move(storage_settings));

return std::make_shared<StorageObjectStorage>(
auto cluster_name = settings[DatabaseIcebergSetting::object_storage_cluster].value;

return std::make_shared<StorageObjectStorageCluster>(
cluster_name,
configuration,
configuration->createObjectStorage(context_, /* is_readonly */ false),
context_,
StorageID(getDatabaseName(), name),
/* columns */columns,
/* constraints */ConstraintsDescription{},
/* comment */"",
getFormatSettings(context_),
LoadingStrictnessLevel::CREATE,
/* distributed_processing */false,
/* partition_by */nullptr,
/* lazy_init */true);
/* format_settings */ getFormatSettings(context_),
/* mode */ LoadingStrictnessLevel::CREATE,
/* partition_by */nullptr);
}

DatabaseTablesIteratorPtr DatabaseIceberg::getTablesIterator(
Expand Down
1 change: 1 addition & 0 deletions src/Databases/Iceberg/DatabaseIcebergSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace ErrorCodes
DECLARE(String, warehouse, "", "Warehouse name inside the catalog", 0) \
DECLARE(String, auth_header, "", "Authorization header of format 'Authorization: <scheme> <auth_info>'", 0) \
DECLARE(String, storage_endpoint, "", "Object storage endpoint", 0) \
DECLARE(String, object_storage_cluster, "", "Cluster for distributed requests", 0) \

#define LIST_OF_DATABASE_ICEBERG_SETTINGS(M, ALIAS) \
DATABASE_ICEBERG_RELATED_SETTINGS(M, ALIAS)
Expand Down
2 changes: 1 addition & 1 deletion src/Parsers/FunctionSecretArgumentsFinder.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class FunctionSecretArgumentsFinder
}
else if ((function->name() == "s3") || (function->name() == "cosn") || (function->name() == "oss") ||
(function->name() == "deltaLake") || (function->name() == "hudi") || (function->name() == "iceberg") ||
(function->name() == "gcs"))
(function->name() == "icebergS3") || (function->name() == "gcs"))
{
/// s3('url', 'aws_access_key_id', 'aws_secret_access_key', ...)
findS3FunctionSecretArguments(/* is_cluster_function= */ false);
Expand Down
4 changes: 2 additions & 2 deletions src/Storages/IStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,12 @@ class IStorage : public std::enable_shared_from_this<IStorage>, public TypePromo
/// Update storage metadata. Used in ALTER or initialization of Storage.
/// Metadata object is multiversion, so this method can be called without
/// any locks.
void setInMemoryMetadata(const StorageInMemoryMetadata & metadata_)
virtual void setInMemoryMetadata(const StorageInMemoryMetadata & metadata_)
{
metadata.set(std::make_unique<StorageInMemoryMetadata>(metadata_));
}

void setVirtuals(VirtualColumnsDescription virtuals_)
virtual void setVirtuals(VirtualColumnsDescription virtuals_)
{
virtuals.set(std::make_unique<VirtualColumnsDescription>(std::move(virtuals_)));
}
Expand Down
37 changes: 32 additions & 5 deletions src/Storages/IStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ namespace Setting
extern const SettingsBool skip_unavailable_shards;
}

namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}

IStorageCluster::IStorageCluster(
const String & cluster_name_,
const StorageID & table_id_,
Expand Down Expand Up @@ -73,13 +78,21 @@ void IStorageCluster::read(
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t /*max_block_size*/,
size_t /*num_streams*/)
size_t max_block_size,
size_t num_streams)
{
auto cluster_name_ = getClusterName(context);

if (cluster_name_.empty())
{
readFallBackToPure(query_plan, column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
return;
}

storage_snapshot->check(column_names);

updateBeforeRead(context);
auto cluster = getCluster(context);
auto cluster = getClusterImpl(context, cluster_name_);

/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)

Expand Down Expand Up @@ -126,6 +139,20 @@ void IStorageCluster::read(
query_plan.addStep(std::move(reading));
}

SinkToStoragePtr IStorageCluster::write(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
bool async_insert)
{
auto cluster_name_ = getClusterName(context);

if (cluster_name_.empty())
return writeFallBackToPure(query, metadata_snapshot, context, async_insert);

throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method write is not supported by storage {}", getName());
}

void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
createExtension(nullptr);
Expand Down Expand Up @@ -196,9 +223,9 @@ ContextPtr ReadFromCluster::updateSettings(const Settings & settings)
return new_context;
}

ClusterPtr IStorageCluster::getCluster(ContextPtr context) const
ClusterPtr IStorageCluster::getClusterImpl(ContextPtr context, const String & cluster_name_)
{
return context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
return context->getCluster(cluster_name_)->getClusterWithReplicasAsShards(context->getSettingsRef());
}

}
39 changes: 36 additions & 3 deletions src/Storages/IStorageCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,16 @@ class IStorageCluster : public IStorage
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t /*max_block_size*/,
size_t /*num_streams*/) override;
size_t max_block_size,
size_t num_streams) override;

ClusterPtr getCluster(ContextPtr context) const;
SinkToStoragePtr write(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
bool async_insert) override;

ClusterPtr getCluster(ContextPtr context) const { return getClusterImpl(context, cluster_name); }
/// Query is needed for pruning by virtual columns (_file, _path)
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const = 0;

Expand All @@ -43,11 +49,38 @@ class IStorageCluster : public IStorage
bool supportsOptimizationToSubcolumns() const override { return false; }
bool supportsTrivialCountOptimization(const StorageSnapshotPtr &, ContextPtr) const override { return true; }

const String & getOriginalClusterName() const { return cluster_name; }
virtual String getClusterName(ContextPtr /* context */) const { return getOriginalClusterName(); }

protected:
virtual void updateBeforeRead(const ContextPtr &) {}
virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {}

virtual void readFallBackToPure(
QueryPlan & /* query_plan */,
const Names & /* column_names */,
const StorageSnapshotPtr & /* storage_snapshot */,
SelectQueryInfo & /* query_info */,
ContextPtr /* context */,
QueryProcessingStage::Enum /* processed_stage */,
size_t /* max_block_size */,
size_t /* num_streams */)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method readFallBackToPure is not supported by storage {}", getName());
}

virtual SinkToStoragePtr writeFallBackToPure(
const ASTPtr & /*query*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
ContextPtr /*context*/,
bool /*async_insert*/)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method writeFallBackToPure is not supported by storage {}", getName());
}

private:
static ClusterPtr getClusterImpl(ContextPtr context, const String & cluster_name_);

LoggerPtr log;
String cluster_name;
};
Expand Down
22 changes: 16 additions & 6 deletions src/Storages/ObjectStorage/Azure/Configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ void StorageAzureConfiguration::fromNamedCollection(const NamedCollection & coll

String connection_url;
String container_name;
std::optional<String> account_name;
std::optional<String> account_key;

if (collection.has("connection_string"))
connection_url = collection.get<String>("connection_string");
Expand Down Expand Up @@ -173,14 +171,10 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,

std::unordered_map<std::string_view, size_t> engine_args_to_idx;


String connection_url = checkAndGetLiteralArgument<String>(engine_args[0], "connection_string/storage_account_url");
String container_name = checkAndGetLiteralArgument<String>(engine_args[1], "container");
blob_path = checkAndGetLiteralArgument<String>(engine_args[2], "blobpath");

std::optional<String> account_name;
std::optional<String> account_key;

auto is_format_arg = [] (const std::string & s) -> bool
{
return s == "auto" || FormatFactory::instance().getAllFormats().contains(Poco::toLower(s));
Expand Down Expand Up @@ -444,6 +438,22 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
}
}

ASTPtr StorageAzureConfiguration::createArgsWithAccessData() const
{
auto arguments = std::make_shared<ASTExpressionList>();

arguments->children.push_back(std::make_shared<ASTLiteral>(connection_params.endpoint.storage_account_url));
arguments->children.push_back(std::make_shared<ASTIdentifier>(connection_params.endpoint.container_name));
arguments->children.push_back(std::make_shared<ASTLiteral>(blob_path));
if (account_name && account_key)
{
arguments->children.push_back(std::make_shared<ASTLiteral>(*account_name));
arguments->children.push_back(std::make_shared<ASTLiteral>(*account_key));
}

return arguments;
}

}

#endif
4 changes: 4 additions & 0 deletions src/Storages/ObjectStorage/Azure/Configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,17 @@ class StorageAzureConfiguration : public StorageObjectStorage::Configuration
ContextPtr context,
bool with_structure) override;

ASTPtr createArgsWithAccessData() const override;

protected:
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;
void fromAST(ASTs & args, ContextPtr context, bool with_structure) override;

std::string blob_path;
std::vector<String> blobs_paths;
AzureBlobStorage::ConnectionParams connection_params;
std::optional<std::string> account_name;
std::optional<std::string> account_key;
};

}
Expand Down
4 changes: 4 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
BaseStorageConfiguration::setPartitionColumns(current_metadata->getPartitionColumns());
}
}

updated = true;
}

std::optional<ColumnsDescription> tryGetTableStructureFromMetadata() const override
Expand Down Expand Up @@ -114,6 +116,8 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
private:
DataLakeMetadataPtr current_metadata;

bool updated = false;

ReadFromFormatInfo prepareReadingFromFormat(
ObjectStoragePtr object_storage,
const Strings & requested_columns,
Expand Down
7 changes: 7 additions & 0 deletions src/Storages/ObjectStorage/HDFS/Configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,13 @@ void StorageHDFSConfiguration::addStructureAndFormatToArgsIfNeeded(
}
}

ASTPtr StorageHDFSConfiguration::createArgsWithAccessData() const
{
auto arguments = std::make_shared<ASTExpressionList>();
arguments->children.push_back(std::make_shared<ASTLiteral>(url + path));
return arguments;
}

}

#endif
2 changes: 2 additions & 0 deletions src/Storages/ObjectStorage/HDFS/Configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ class StorageHDFSConfiguration : public StorageObjectStorage::Configuration
ContextPtr context,
bool with_structure) override;

ASTPtr createArgsWithAccessData() const override;

private:
void fromNamedCollection(const NamedCollection &, ContextPtr context) override;
void fromAST(ASTs & args, ContextPtr, bool /* with_structure */) override;
Expand Down
30 changes: 27 additions & 3 deletions src/Storages/ObjectStorage/S3/Configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,11 +363,11 @@ void StorageS3Configuration::fromAST(ASTs & args, ContextPtr context, bool with_

if (engine_args_to_idx.contains("format"))
{
format = checkAndGetLiteralArgument<String>(args[engine_args_to_idx["format"]], "format");
auto format_ = checkAndGetLiteralArgument<String>(args[engine_args_to_idx["format"]], "format");
/// Set format to configuration only of it's not 'auto',
/// because we can have default format set in configuration.
if (format != "auto")
format = format;
if (format_ != "auto")
format = format_;
}

if (engine_args_to_idx.contains("structure"))
Expand Down Expand Up @@ -585,6 +585,30 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
}
}

ASTPtr StorageS3Configuration::createArgsWithAccessData() const
{
auto arguments = std::make_shared<ASTExpressionList>();

arguments->children.push_back(std::make_shared<ASTLiteral>(url.uri_str));
if (auth_settings[S3AuthSetting::no_sign_request])
{
arguments->children.push_back(std::make_shared<ASTLiteral>("NOSIGN"));
}
else
{
arguments->children.push_back(std::make_shared<ASTLiteral>(auth_settings[S3AuthSetting::access_key_id].value));
arguments->children.push_back(std::make_shared<ASTLiteral>(auth_settings[S3AuthSetting::secret_access_key].value));
if (!auth_settings[S3AuthSetting::session_token].value.empty())
arguments->children.push_back(std::make_shared<ASTLiteral>(auth_settings[S3AuthSetting::session_token].value));
if (format != "auto")
arguments->children.push_back(std::make_shared<ASTLiteral>(format));
if (!compression_method.empty())
arguments->children.push_back(std::make_shared<ASTLiteral>(compression_method));
}

return arguments;
}

}

#endif
2 changes: 2 additions & 0 deletions src/Storages/ObjectStorage/S3/Configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ class StorageS3Configuration : public StorageObjectStorage::Configuration
ContextPtr context,
bool with_structure) override;

ASTPtr createArgsWithAccessData() const override;

private:
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;
void fromAST(ASTs & args, ContextPtr context, bool with_structure) override;
Expand Down
Loading
Loading