diff --git a/src/Interpreters/Cluster.cpp b/src/Interpreters/Cluster.cpp
index 00c67cb8a37c..7366efa37642 100644
--- a/src/Interpreters/Cluster.cpp
+++ b/src/Interpreters/Cluster.cpp
@@ -717,9 +717,9 @@ void Cluster::initMisc()
     }
 }
 
-std::unique_ptr<Cluster> Cluster::getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard) const
+std::unique_ptr<Cluster> Cluster::getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts) const
 {
-    return std::unique_ptr<Cluster>{ new Cluster(ReplicasAsShardsTag{}, *this, settings, max_replicas_from_shard)};
+    return std::unique_ptr<Cluster>{ new Cluster(ReplicasAsShardsTag{}, *this, settings, max_replicas_from_shard, max_hosts)};
 }
 
 std::unique_ptr<Cluster> Cluster::getClusterWithSingleShard(size_t index) const
@@ -768,7 +768,7 @@ void shuffleReplicas(std::vector<Cluster::Address> & replicas, const Settings &
 }
 
 
-Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard)
+Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts)
 {
     if (from.addresses_with_failover.empty())
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster is empty");
@@ -811,9 +811,6 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
             info.pool = std::make_shared<ConnectionPoolWithFailover>(ConnectionPoolPtrs{pool}, settings[Setting::load_balancing]);
             info.per_replica_pools = {std::move(pool)};
 
-            addresses_with_failover.emplace_back(Addresses{address});
-
-            slot_to_shard.insert(std::end(slot_to_shard), info.weight, shards_info.size());
             shards_info.emplace_back(std::move(info));
         }
     };
@@ -835,6 +832,23 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
     secret = from.secret;
     name = from.name;
 
+    if (max_hosts > 0 && shards_info.size() > max_hosts)
+    {
+        pcg64_fast gen{randomSeed()};
+        std::shuffle(shards_info.begin(), shards_info.end(), gen);
+        shards_info.resize(max_hosts);
+
+        shard_num = 0;
+        for (auto & shard_info : shards_info)
+            shard_info.shard_num = ++shard_num;
+    }
+
+    for (size_t i = 0; i < shards_info.size(); ++i)
+    {
+        addresses_with_failover.emplace_back(shards_info[i].local_addresses);
+        slot_to_shard.insert(std::end(slot_to_shard), shards_info[i].weight, i);
+    }
+
     initMisc();
 }
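
The heart of the change is the new tail of the ReplicasAsShardsTag constructor above: once every replica has been expanded into its own single-host shard, the shard list is shuffled and truncated to max_hosts, shard numbers are reassigned, and only then are addresses_with_failover and slot_to_shard rebuilt. Both structures index into shards_info, which is why their population had to move out of the per-address lambda: they can only be filled after the final shard subset is known. A minimal self-contained sketch of that selection step, with a simplified Shard standing in for Cluster::ShardInfo and std::mt19937_64 substituted for the pcg64_fast/randomSeed() pair the patch uses:

    #include <algorithm>
    #include <cstdint>
    #include <random>
    #include <string>
    #include <vector>

    /// Simplified stand-in for Cluster::ShardInfo (the real struct also
    /// carries connection pools, host names, and locality flags).
    struct Shard
    {
        uint32_t shard_num = 0;
        uint32_t weight = 1;
        std::vector<std::string> addresses;
    };

    /// Sketch of the selection step: keep a random subset of at most
    /// max_hosts shards, renumber them from 1, then rebuild the
    /// weight-based slot mapping. max_hosts == 0 means "no limit".
    void limitHosts(std::vector<Shard> & shards, std::vector<size_t> & slot_to_shard, size_t max_hosts)
    {
        if (max_hosts > 0 && shards.size() > max_hosts)
        {
            std::mt19937_64 gen{std::random_device{}()};
            std::shuffle(shards.begin(), shards.end(), gen);
            shards.resize(max_hosts);

            uint32_t shard_num = 0;
            for (auto & shard : shards)
                shard.shard_num = ++shard_num;
        }

        /// Each shard occupies `weight` consecutive slots pointing back at
        /// its index, mirroring how the constructor rebuilds slot_to_shard
        /// only after the final shard set is known.
        slot_to_shard.clear();
        for (size_t i = 0; i < shards.size(); ++i)
            slot_to_shard.insert(slot_to_shard.end(), shards[i].weight, i);
    }

diff --git a/src/Interpreters/Cluster.h b/src/Interpreters/Cluster.h
index e1868a18ad4f..b581963b08dc 100644
--- a/src/Interpreters/Cluster.h
+++ b/src/Interpreters/Cluster.h
@@ -266,7 +266,7 @@ class Cluster
     std::unique_ptr<Cluster> getClusterWithMultipleShards(const std::vector<size_t> & indices) const;
 
     /// Get a new Cluster that contains all servers (all shards with all replicas) from existing cluster as independent shards.
-    std::unique_ptr<Cluster> getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard = 0) const;
+    std::unique_ptr<Cluster> getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard = 0, size_t max_hosts = 0) const;
 
     /// Returns false if cluster configuration doesn't allow to use it for cross-replication.
     /// NOTE: true does not mean, that it's actually a cross-replication cluster.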
@@ -292,7 +292,7 @@ class Cluster
 
     /// For getClusterWithReplicasAsShards implementation
     struct ReplicasAsShardsTag {};
-    Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard);
+    Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts);
 
     void addShard(
         const Settings & settings,
diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp
index 526acc140c9c..acf615f7704b 100644
--- a/src/Server/TCPHandler.cpp
+++ b/src/Server/TCPHandler.cpp
@@ -35,7 +35,6 @@
 #include
 #include
 #include
-#include
 #include
 #include
 #include
diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp
index bbdcee5ac5d4..f965a63f4e10 100644
--- a/src/Storages/IStorageCluster.cpp
+++ b/src/Storages/IStorageCluster.cpp
@@ -35,6 +35,7 @@ namespace Setting
     extern const SettingsBool async_socket_for_remote;
     extern const SettingsBool skip_unavailable_shards;
     extern const SettingsNonZeroUInt64 max_parallel_replicas;
+    extern const SettingsUInt64 object_storage_max_nodes;
 }
 
 namespace ErrorCodes
@@ -103,7 +104,7 @@ void IStorageCluster::read(
     storage_snapshot->check(column_names);
 
     updateBeforeRead(context);
-    auto cluster = getClusterImpl(context, cluster_name_from_settings);
+    auto cluster = getClusterImpl(context, cluster_name_from_settings, context->getSettingsRef()[Setting::object_storage_max_nodes]);
 
     /// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)
 
@@ -247,9 +248,9 @@ ContextPtr ReadFromCluster::updateSettings(const Settings & settings)
     return new_context;
 }
 
-ClusterPtr IStorageCluster::getClusterImpl(ContextPtr context, const String & cluster_name_)
+ClusterPtr IStorageCluster::getClusterImpl(ContextPtr context, const String & cluster_name_, size_t max_hosts)
 {
-    return context->getCluster(cluster_name_)->getClusterWithReplicasAsShards(context->getSettingsRef());
+    return context->getCluster(cluster_name_)->getClusterWithReplicasAsShards(context->getSettingsRef(), /* max_replicas_from_shard */ 0, max_hosts);
 }
 
 }
diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h
index 89155e4041e5..f26381fa7699 100644
--- a/src/Storages/IStorageCluster.h
+++ b/src/Storages/IStorageCluster.h
@@ -39,6 +39,7 @@ class IStorageCluster : public IStorage
         bool async_insert) override;
 
     ClusterPtr getCluster(ContextPtr context) const { return getClusterImpl(context, cluster_name); }
 
+    /// Query is needed for pruning by virtual columns (_file, _path)
     virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(
         const ActionsDAG::Node * predicate,
@@ -82,7 +83,7 @@ class IStorageCluster : public IStorage
     }
 
 private:
-    static ClusterPtr getClusterImpl(ContextPtr context, const String & cluster_name_);
+    static ClusterPtr getClusterImpl(ContextPtr context, const String & cluster_name_, size_t max_hosts = 0);
 
     LoggerPtr log;
     String cluster_name;
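
Note that this excerpt wires the new query-level setting through IStorageCluster but does not include the hunk that defines object_storage_max_nodes itself; a definition in src/Core/Settings.cpp must exist for the extern declaration above to link. Presumably it follows the usual DECLARE pattern for a UInt64 setting, along these lines (the description text is illustrative, not quoted from the patch):

    DECLARE(UInt64, object_storage_max_nodes, 0, R"(
    Maximum number of cluster hosts used to serve a query over object storage
    (s3Cluster and similar). 0 - no limit.
    )", 0) \

Because both new parameters default to 0 ("no limit"), existing callers of getClusterWithReplicasAsShards and getClusterImpl compile and behave unchanged; only the object-storage read path passes a real limit.
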
diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py
index 57a3a384c20c..9514f510ab49 100644
--- a/tests/integration/test_s3_cluster/test.py
+++ b/tests/integration/test_s3_cluster/test.py
@@ -72,7 +72,7 @@ def started_cluster():
         "s0_0_0",
         main_configs=["configs/cluster.xml", "configs/named_collections.xml"],
         user_configs=["configs/users.xml"],
-        macros={"replica": "node1", "shard": "shard1"},
+        macros={"replica": "replica1", "shard": "shard1"},
         with_minio=True,
         with_zookeeper=True,
     )
@@ -916,5 +916,167 @@ def test_hive_partitioning(started_cluster, allow_experimental_analyzer):
     )
     cluster_optimized_traffic = int(cluster_optimized_traffic)
     assert cluster_optimized_traffic == optimized_traffic
-
+    node.query("SET allow_experimental_analyzer = DEFAULT")
+
+
+def test_distributed_s3_table_engine(started_cluster):
+    node = started_cluster.instances["s0_0_0"]
+
+    resp_def = node.query(
+        """
+        SELECT * FROM s3Cluster(
+            'cluster_simple',
+            'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
+            'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)
+        """
+    )
+
+    node.query("DROP TABLE IF EXISTS single_node")
+    node.query(
+        """
+        CREATE TABLE single_node
+        (name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64))))
+        ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV')
+        """
+    )
+    query_id_engine_single_node = str(uuid.uuid4())
+    resp_engine_single_node = node.query(
+        """
+        SELECT * FROM single_node ORDER BY (name, value, polygon)
+        """,
+        query_id=query_id_engine_single_node
+    )
+    assert resp_def == resp_engine_single_node
+
+    node.query("DROP TABLE IF EXISTS distributed")
+    node.query(
+        """
+        CREATE TABLE distributed
+        (name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64))))
+        ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV')
+        SETTINGS object_storage_cluster='cluster_simple'
+        """
+    )
+    query_id_engine_distributed = str(uuid.uuid4())
+    resp_engine_distributed = node.query(
+        """
+        SELECT * FROM distributed ORDER BY (name, value, polygon)
+        """,
+        query_id=query_id_engine_distributed
+    )
+    assert resp_def == resp_engine_distributed
+
+    node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'")
+
+    hosts_engine_single_node = node.query(
+        f"""
+        SELECT uniq(hostname)
+        FROM clusterAllReplicas('cluster_simple', system.query_log)
+        WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_single_node}'
+        """
+    )
+    assert int(hosts_engine_single_node) == 1
+    hosts_engine_distributed = node.query(
+        f"""
+        SELECT uniq(hostname)
+        FROM clusterAllReplicas('cluster_simple', system.query_log)
+        WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_distributed}'
+        """
+    )
+    assert int(hosts_engine_distributed) == 3
+
+
+def test_cluster_hosts_limit(started_cluster):
+    node = started_cluster.instances["s0_0_0"]
+
+    query_id_def = str(uuid.uuid4())
+    resp_def = node.query(
+        """
+        SELECT * FROM s3Cluster(
+            'cluster_simple',
+            'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
+            'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)
+        """,
+        query_id=query_id_def
+    )
+
+    # object_storage_max_nodes is greater than the number of hosts in the cluster
+    query_id_4_hosts = str(uuid.uuid4())
+    resp_4_hosts = node.query(
+        """
+        SELECT * FROM s3Cluster(
+            'cluster_simple',
+            'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
+            'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)
+        SETTINGS object_storage_max_nodes=4
+        """,
+        query_id=query_id_4_hosts
+    )
+    assert resp_def == resp_4_hosts
+
+    # object_storage_max_nodes is equal to the number of hosts in the cluster
+    query_id_3_hosts = str(uuid.uuid4())
+    resp_3_hosts = node.query(
+        """
+        SELECT * FROM s3Cluster(
+            'cluster_simple',
+            'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
+            'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)
+        SETTINGS object_storage_max_nodes=3
+        """,
+        query_id=query_id_3_hosts
+    )
+    assert resp_def == resp_3_hosts
+
+    # object_storage_max_nodes is less than the number of hosts in the cluster
+    query_id_2_hosts = str(uuid.uuid4())
+    resp_2_hosts = node.query(
+        """
+        SELECT * FROM s3Cluster(
+            'cluster_simple',
+            'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
+            'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)
+        SETTINGS object_storage_max_nodes=2
+        """,
+        query_id=query_id_2_hosts
+    )
+    assert resp_def == resp_2_hosts
+
+    node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'")
+
+    # query_id != initial_query_id filters out the initiator's own log entry,
+    # so only the secondary (worker) queries are counted
+    hosts_def = node.query(
+        f"""
+        SELECT uniq(hostname)
+        FROM clusterAllReplicas('cluster_simple', system.query_log)
+        WHERE type='QueryFinish' AND initial_query_id='{query_id_def}' AND query_id!='{query_id_def}'
+        """
+    )
+    assert int(hosts_def) == 3
+
+    hosts_4 = node.query(
+        f"""
+        SELECT uniq(hostname)
+        FROM clusterAllReplicas('cluster_simple', system.query_log)
+        WHERE type='QueryFinish' AND initial_query_id='{query_id_4_hosts}' AND query_id!='{query_id_4_hosts}'
+        """
+    )
+    assert int(hosts_4) == 3
+
+    hosts_3 = node.query(
+        f"""
+        SELECT uniq(hostname)
+        FROM clusterAllReplicas('cluster_simple', system.query_log)
+        WHERE type='QueryFinish' AND initial_query_id='{query_id_3_hosts}' AND query_id!='{query_id_3_hosts}'
+        """
+    )
+    assert int(hosts_3) == 3
+
+    hosts_2 = node.query(
+        f"""
+        SELECT uniq(hostname)
+        FROM clusterAllReplicas('cluster_simple', system.query_log)
+        WHERE type='QueryFinish' AND initial_query_id='{query_id_2_hosts}' AND query_id!='{query_id_2_hosts}'
+        """
+    )
+    assert int(hosts_2) == 2
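
Taken together, the two tests pin down the intended semantics: the result set is independent of the limit, while the number of participating hosts is min(limit, cluster size), with 0 meaning unlimited. A tiny self-contained check of that arithmetic against the values asserted above (expectedHosts is a hypothetical helper, not part of the patch):

    #include <algorithm>
    #include <cassert>
    #include <cstddef>

    /// Hypothetical helper capturing the host-count semantics the
    /// integration tests assert: 0 means "no limit", and any positive
    /// limit is effectively clamped to the cluster size.
    static std::size_t expectedHosts(std::size_t cluster_size, std::size_t max_nodes)
    {
        return max_nodes == 0 ? cluster_size : std::min(max_nodes, cluster_size);
    }

    int main()
    {
        assert(expectedHosts(3, 0) == 3); // default: every host participates
        assert(expectedHosts(3, 4) == 3); // limit above cluster size: no effect
        assert(expectedHosts(3, 3) == 3); // limit equal to cluster size: no effect
        assert(expectedHosts(3, 2) == 2); // limit below cluster size: random subset of 2
    }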