Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5924,6 +5924,15 @@ Allow to create database with Engine=MaterializedPostgreSQL(...).
/** Experimental feature for moving data between shards. */ \
DECLARE(Bool, allow_experimental_query_deduplication, false, R"(
Experimental data deduplication for SELECT queries based on part UUIDs
)", EXPERIMENTAL) \
DECLARE(String, object_storage_cluster, "", R"(
Cluster to make distributed requests to object storages with alternative syntax.
)", EXPERIMENTAL) \
DECLARE(UInt64, object_storage_max_nodes, 0, R"(
Limit for hosts used for request in object storage cluster table functions - azureBlobStorageCluster, s3Cluster, hdfsCluster, etc.
Possible values:
- Positive integer.
- 0 — All hosts in cluster.
)", EXPERIMENTAL) \
\
/* ####################################################### */ \
Expand Down
2 changes: 2 additions & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"http_response_headers", "", "", "New setting."},
{"parallel_replicas_index_analysis_only_on_coordinator", true, true, "Index analysis done only on replica-coordinator and skipped on other replicas. Effective only with enabled parallel_replicas_local_plan"}, // enabling it was moved to 24.10
{"least_greatest_legacy_null_behavior", true, false, "New setting"},
{"object_storage_cluster", "", "", "New setting"},
{"object_storage_max_nodes", 0, 0, "New setting"},
}
},
{"24.11",
Expand Down
16 changes: 12 additions & 4 deletions src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
cluster_name_, table_id_, getLogger(fmt::format("{}({})", configuration_->getEngineName(), table_id_.table_name)))
, configuration{configuration_}
, object_storage(object_storage_)
, cluster_name_in_settings(false)
{
ColumnsDescription columns{columns_};
std::string sample_path;
Expand Down Expand Up @@ -105,10 +106,17 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
configuration->getEngineName());
}

ASTPtr cluster_name_arg = args.front();
args.erase(args.begin());
configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context, /*with_structure=*/true);
args.insert(args.begin(), cluster_name_arg);
if (cluster_name_in_settings)
{
configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context, /*with_structure=*/true);
}
else
{
ASTPtr cluster_name_arg = args.front();
args.erase(args.begin());
configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context, /*with_structure=*/true);
args.insert(args.begin(), cluster_name_arg);
}
}

RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
Expand Down
3 changes: 3 additions & 0 deletions src/Storages/ObjectStorage/StorageObjectStorageCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ class StorageObjectStorageCluster : public IStorageCluster

String getPathSample(StorageInMemoryMetadata metadata, ContextPtr context);

/// Sets the flag consulted by updateQueryToSendIfNeeded(): when true, the engine argument list is
/// passed to addStructureAndFormatToArgsIfNeeded() as-is; when false, the first argument is treated
/// as the cluster name and is temporarily removed before that call and re-inserted afterwards.
/// NOTE(review): presumably set when the cluster comes from the `object_storage_cluster` setting — confirm against callers.
void setClusterNameInSettings(bool cluster_name_in_settings_) { cluster_name_in_settings = cluster_name_in_settings_; }

private:
void updateQueryToSendIfNeeded(
ASTPtr & query,
Expand All @@ -39,6 +41,7 @@ class StorageObjectStorageCluster : public IStorageCluster
const StorageObjectStorage::ConfigurationPtr configuration;
const ObjectStoragePtr object_storage;
NamesAndTypesList virtual_columns;
bool cluster_name_in_settings;
};

}
97 changes: 0 additions & 97 deletions src/TableFunctions/TableFunctionObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,17 +130,6 @@ void registerTableFunctionObjectStorage(TableFunctionFactory & factory)
{
UNUSED(factory);
#if USE_AWS_S3
factory.registerFunction<TableFunctionObjectStorage<S3Definition, StorageS3Configuration>>(
{
.documentation =
{
.description=R"(The table function can be used to read the data stored on AWS S3.)",
.examples{{"s3", "SELECT * FROM s3(url, access_key_id, secret_access_key)", ""}
},
.categories{"DataLake"}},
.allow_readonly = false
});

factory.registerFunction<TableFunctionObjectStorage<GCSDefinition, StorageS3Configuration>>(
{
.documentation =
Expand Down Expand Up @@ -173,38 +162,6 @@ void registerTableFunctionObjectStorage(TableFunctionFactory & factory)
.allow_readonly = false
});
#endif

#if USE_AZURE_BLOB_STORAGE
factory.registerFunction<TableFunctionObjectStorage<AzureDefinition, StorageAzureConfiguration>>(
{
.documentation =
{
.description=R"(The table function can be used to read the data stored on Azure Blob Storage.)",
.examples{
{
"azureBlobStorage",
"SELECT * FROM azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, "
"[account_name, account_key, format, compression, structure])", ""
}}
},
.allow_readonly = false
});
#endif
#if USE_HDFS
factory.registerFunction<TableFunctionObjectStorage<HDFSDefinition, StorageHDFSConfiguration>>(
{
.documentation =
{
.description=R"(The table function can be used to read the data stored on HDFS virtual filesystem.)",
.examples{
{
"hdfs",
"SELECT * FROM hdfs(url, format, compression, structure])", ""
}}
},
.allow_readonly = false
});
#endif
}

#if USE_AZURE_BLOB_STORAGE
Expand Down Expand Up @@ -256,29 +213,6 @@ void registerTableFunctionIceberg(TableFunctionFactory & factory)
.examples{{"iceberg", "SELECT * FROM iceberg(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
factory.registerFunction<TableFunctionIcebergS3>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store.)",
.examples{{"icebergS3", "SELECT * FROM icebergS3(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});

#endif
#if USE_AZURE_BLOB_STORAGE
factory.registerFunction<TableFunctionIcebergAzure>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on Azure object store.)",
.examples{{"icebergAzure", "SELECT * FROM icebergAzure(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
#endif
#if USE_HDFS
factory.registerFunction<TableFunctionIcebergHDFS>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on HDFS virtual filesystem.)",
.examples{{"icebergHDFS", "SELECT * FROM icebergHDFS(url)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
#endif
factory.registerFunction<TableFunctionIcebergLocal>(
{.documentation
Expand All @@ -290,42 +224,11 @@ void registerTableFunctionIceberg(TableFunctionFactory & factory)
#endif


#if USE_AWS_S3
#if USE_PARQUET
/// Registers TableFunctionDeltaLake in the given table function factory.
/// The registration carries the user-facing documentation (description, one `deltaLake` usage example,
/// the "DataLake" category) and sets allow_readonly to false.
/// Compiled only inside the enclosing USE_AWS_S3 / USE_PARQUET preprocessor guards.
void registerTableFunctionDeltaLake(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionDeltaLake>(
{.documentation
= {.description = R"(The table function can be used to read the DeltaLake table stored on object store.)",
.examples{{"deltaLake", "SELECT * FROM deltaLake(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
}
#endif

/// Registers TableFunctionHudi in the given table function factory.
/// The registration carries the user-facing documentation (description, one `hudi` usage example,
/// the "DataLake" category) and sets allow_readonly to false.
/// Compiled only inside the enclosing USE_AWS_S3 preprocessor guard.
void registerTableFunctionHudi(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionHudi>(
{.documentation
= {.description = R"(The table function can be used to read the Hudi table stored on object store.)",
.examples{{"hudi", "SELECT * FROM hudi(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
}

#endif

/// Single entry point that registers the data lake table functions available in this build.
/// Each registration helper is called only when the build flag guarding it is enabled.
void registerDataLakeTableFunctions(TableFunctionFactory & factory)
{
/// Presumably keeps `factory` referenced when every guarded section below is compiled out — confirm UNUSED semantics.
UNUSED(factory);
#if USE_AVRO
/// Iceberg table functions.
registerTableFunctionIceberg(factory);
#endif
#if USE_AWS_S3
#if USE_PARQUET
/// DeltaLake registration is nested under both USE_AWS_S3 and USE_PARQUET.
registerTableFunctionDeltaLake(factory);
#endif
/// Hudi registration needs only USE_AWS_S3.
registerTableFunctionHudi(factory);
#endif
}
}
1 change: 1 addition & 0 deletions src/TableFunctions/TableFunctionObjectStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ StoragePtr TableFunctionObjectStorageCluster<Definition, Configuration>::execute
auto configuration = Base::getConfiguration();

ColumnsDescription columns;

if (configuration->structure != "auto")
columns = parseColumnsListFromString(configuration->structure, context);
else if (!Base::structure_hint.empty())
Expand Down
2 changes: 0 additions & 2 deletions src/TableFunctions/TableFunctionObjectStorageCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ namespace DB

class Context;

class StorageS3Settings;
class StorageAzureBlobSettings;
class StorageS3Configuration;
class StorageAzureConfiguration;

Expand Down
Loading
Loading