-
Notifications
You must be signed in to change notification settings - Fork 13
Setting object_storage_max_nodes #677
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
0bfcd65
5b70ae4
0db6d89
08a0ebd
10cbf30
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -717,9 +717,9 @@ void Cluster::initMisc() | |
| } | ||
| } | ||
|
|
||
| std::unique_ptr<Cluster> Cluster::getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard) const | ||
| std::unique_ptr<Cluster> Cluster::getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts) const | ||
| { | ||
| return std::unique_ptr<Cluster>{ new Cluster(ReplicasAsShardsTag{}, *this, settings, max_replicas_from_shard)}; | ||
| return std::unique_ptr<Cluster>{ new Cluster(ReplicasAsShardsTag{}, *this, settings, max_replicas_from_shard, max_hosts)}; | ||
| } | ||
|
|
||
| std::unique_ptr<Cluster> Cluster::getClusterWithSingleShard(size_t index) const | ||
|
|
@@ -768,7 +768,7 @@ void shuffleReplicas(std::vector<Cluster::Address> & replicas, const Settings & | |
|
|
||
| } | ||
|
|
||
| Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard) | ||
| Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts) | ||
| { | ||
| if (from.addresses_with_failover.empty()) | ||
| throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster is empty"); | ||
|
|
@@ -811,9 +811,6 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti | |
| info.pool = std::make_shared<ConnectionPoolWithFailover>(ConnectionPoolPtrs{pool}, settings[Setting::load_balancing]); | ||
| info.per_replica_pools = {std::move(pool)}; | ||
|
|
||
| addresses_with_failover.emplace_back(Addresses{address}); | ||
|
|
||
| slot_to_shard.insert(std::end(slot_to_shard), info.weight, shards_info.size()); | ||
| shards_info.emplace_back(std::move(info)); | ||
| } | ||
| }; | ||
|
|
@@ -835,6 +832,23 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti | |
| secret = from.secret; | ||
| name = from.name; | ||
|
|
||
| if (max_hosts > 0 && shards_info.size() > max_hosts) | ||
| { | ||
| pcg64_fast gen{randomSeed()}; | ||
| std::shuffle(shards_info.begin(), shards_info.end(), gen); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the reasoning behind the shuffle?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Without shuffle nodes from list begin will be chosen. First nodes will be loaded, last nodes will be empty. |
||
| shards_info.resize(max_hosts); | ||
|
|
||
| shard_num = 0; | ||
| for (auto & shard_info : shards_info) | ||
| shard_info.shard_num = ++shard_num; | ||
| } | ||
|
|
||
| for (size_t i = 0; i < shards_info.size(); ++i) | ||
| { | ||
| addresses_with_failover.emplace_back(shards_info[i].local_addresses); | ||
| slot_to_shard.insert(std::end(slot_to_shard), shards_info[i].weight, i); | ||
| } | ||
|
|
||
| initMisc(); | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -72,7 +72,7 @@ def started_cluster(): | |
| "s0_0_0", | ||
| main_configs=["configs/cluster.xml", "configs/named_collections.xml"], | ||
| user_configs=["configs/users.xml"], | ||
| macros={"replica": "node1", "shard": "shard1"}, | ||
| macros={"replica": "replica1", "shard": "shard1"}, | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why change this?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just for consistency with the other two nodes below — they have |
||
| with_minio=True, | ||
| with_zookeeper=True, | ||
| ) | ||
|
|
@@ -916,5 +916,167 @@ def test_hive_partitioning(started_cluster, allow_experimental_analyzer): | |
| ) | ||
| cluster_optimized_traffic = int(cluster_optimized_traffic) | ||
| assert cluster_optimized_traffic == optimized_traffic | ||
|
|
||
| node.query("SET allow_experimental_analyzer = DEFAULT") | ||
|
|
||
|
|
||
| def test_distributed_s3_table_engine(started_cluster): | ||
| node = started_cluster.instances["s0_0_0"] | ||
|
|
||
| resp_def = node.query( | ||
| """ | ||
| SELECT * from s3Cluster( | ||
| 'cluster_simple', | ||
| 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', | ||
| 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon) | ||
| """ | ||
| ) | ||
|
|
||
| node.query("DROP TABLE IF EXISTS single_node"); | ||
| node.query( | ||
| """ | ||
| CREATE TABLE single_node | ||
| (name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))) | ||
| ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV') | ||
| """ | ||
| ) | ||
| query_id_engine_single_node = str(uuid.uuid4()) | ||
| resp_engine_single_node = node.query( | ||
| """ | ||
| SELECT * FROM single_node ORDER BY (name, value, polygon) | ||
| """, | ||
| query_id = query_id_engine_single_node | ||
| ) | ||
| assert resp_def == resp_engine_single_node | ||
|
|
||
| node.query("DROP TABLE IF EXISTS distributed"); | ||
| node.query( | ||
| """ | ||
| CREATE TABLE distributed | ||
| (name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))) | ||
| ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV') | ||
| SETTINGS object_storage_cluster='cluster_simple' | ||
| """ | ||
| ) | ||
| query_id_engine_distributed = str(uuid.uuid4()) | ||
| resp_engine_distributed = node.query( | ||
| """ | ||
| SELECT * FROM distributed ORDER BY (name, value, polygon) | ||
| """, | ||
| query_id = query_id_engine_distributed | ||
| ) | ||
| assert resp_def == resp_engine_distributed | ||
|
|
||
| node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'") | ||
|
|
||
| hosts_engine_single_node = node.query( | ||
| f""" | ||
| SELECT uniq(hostname) | ||
| FROM clusterAllReplicas('cluster_simple', system.query_log) | ||
| WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_single_node}' | ||
| """ | ||
| ) | ||
| assert int(hosts_engine_single_node) == 1 | ||
| hosts_engine_distributed = node.query( | ||
| f""" | ||
| SELECT uniq(hostname) | ||
| FROM clusterAllReplicas('cluster_simple', system.query_log) | ||
| WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_distributed}' | ||
| """ | ||
| ) | ||
| assert int(hosts_engine_distributed) == 3 | ||
|
|
||
|
|
||
| def test_cluster_hosts_limit(started_cluster): | ||
| node = started_cluster.instances["s0_0_0"] | ||
|
|
||
| query_id_def = str(uuid.uuid4()) | ||
| resp_def = node.query( | ||
| """ | ||
| SELECT * from s3Cluster( | ||
| 'cluster_simple', | ||
| 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', | ||
| 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon) | ||
| """, | ||
| query_id = query_id_def | ||
| ) | ||
|
|
||
| # object_storage_max_nodes is greater than number of hosts in cluster | ||
| query_id_4_hosts = str(uuid.uuid4()) | ||
| resp_4_hosts = node.query( | ||
| """ | ||
| SELECT * from s3Cluster( | ||
| 'cluster_simple', | ||
| 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', | ||
| 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon) | ||
| SETTINGS object_storage_max_nodes=4 | ||
| """, | ||
| query_id = query_id_4_hosts | ||
| ) | ||
| assert resp_def == resp_4_hosts | ||
|
|
||
| # object_storage_max_nodes is equal number of hosts in cluster | ||
| query_id_3_hosts = str(uuid.uuid4()) | ||
| resp_3_hosts = node.query( | ||
| """ | ||
| SELECT * from s3Cluster( | ||
| 'cluster_simple', | ||
| 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', | ||
| 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon) | ||
| SETTINGS object_storage_max_nodes=3 | ||
| """, | ||
| query_id = query_id_3_hosts | ||
| ) | ||
| assert resp_def == resp_3_hosts | ||
|
|
||
| # object_storage_max_nodes is less than number of hosts in cluster | ||
| query_id_2_hosts = str(uuid.uuid4()) | ||
| resp_2_hosts = node.query( | ||
| """ | ||
| SELECT * from s3Cluster( | ||
| 'cluster_simple', | ||
| 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', | ||
| 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon) | ||
| SETTINGS object_storage_max_nodes=2 | ||
| """, | ||
| query_id = query_id_2_hosts | ||
| ) | ||
| assert resp_def == resp_2_hosts | ||
|
|
||
| node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'") | ||
|
|
||
| hosts_def = node.query( | ||
| f""" | ||
| SELECT uniq(hostname) | ||
| FROM clusterAllReplicas('cluster_simple', system.query_log) | ||
| WHERE type='QueryFinish' AND initial_query_id='{query_id_def}' AND query_id!='{query_id_def}' | ||
| """ | ||
| ) | ||
| assert int(hosts_def) == 3 | ||
|
|
||
| hosts_4 = node.query( | ||
| f""" | ||
| SELECT uniq(hostname) | ||
| FROM clusterAllReplicas('cluster_simple', system.query_log) | ||
| WHERE type='QueryFinish' AND initial_query_id='{query_id_4_hosts}' AND query_id!='{query_id_4_hosts}' | ||
| """ | ||
| ) | ||
| assert int(hosts_4) == 3 | ||
|
|
||
| hosts_3 = node.query( | ||
| f""" | ||
| SELECT uniq(hostname) | ||
| FROM clusterAllReplicas('cluster_simple', system.query_log) | ||
| WHERE type='QueryFinish' AND initial_query_id='{query_id_3_hosts}' AND query_id!='{query_id_3_hosts}' | ||
| """ | ||
| ) | ||
| assert int(hosts_3) == 3 | ||
|
|
||
| hosts_2 = node.query( | ||
| f""" | ||
| SELECT uniq(hostname) | ||
| FROM clusterAllReplicas('cluster_simple', system.query_log) | ||
| WHERE type='QueryFinish' AND initial_query_id='{query_id_2_hosts}' AND query_id!='{query_id_2_hosts}' | ||
| """ | ||
| ) | ||
| assert int(hosts_2) == 2 | ||
Uh oh!
There was an error while loading. Please reload this page.