26 changes: 20 additions & 6 deletions src/Interpreters/Cluster.cpp
@@ -717,9 +717,9 @@ void Cluster::initMisc()
}
}

-std::unique_ptr<Cluster> Cluster::getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard) const
+std::unique_ptr<Cluster> Cluster::getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts) const
{
-    return std::unique_ptr<Cluster>{ new Cluster(ReplicasAsShardsTag{}, *this, settings, max_replicas_from_shard)};
+    return std::unique_ptr<Cluster>{ new Cluster(ReplicasAsShardsTag{}, *this, settings, max_replicas_from_shard, max_hosts)};
}

std::unique_ptr<Cluster> Cluster::getClusterWithSingleShard(size_t index) const
@@ -768,7 +768,7 @@ void shuffleReplicas(std::vector<Cluster::Address> & replicas, const Settings &

}

-Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard)
+Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts)
{
if (from.addresses_with_failover.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster is empty");
@@ -811,9 +811,6 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
info.pool = std::make_shared<ConnectionPoolWithFailover>(ConnectionPoolPtrs{pool}, settings[Setting::load_balancing]);
info.per_replica_pools = {std::move(pool)};

-addresses_with_failover.emplace_back(Addresses{address});
-
-slot_to_shard.insert(std::end(slot_to_shard), info.weight, shards_info.size());
shards_info.emplace_back(std::move(info));
}
};
@@ -835,6 +832,23 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
secret = from.secret;
name = from.name;

+if (max_hosts > 0 && shards_info.size() > max_hosts)
+{
+    pcg64_fast gen{randomSeed()};
+    std::shuffle(shards_info.begin(), shards_info.end(), gen);

Collaborator: What's the reasoning behind the shuffle?

Author: Without the shuffle, nodes from the beginning of the list would always be chosen: the first nodes would carry all the load while the last nodes stayed empty. With the shuffle, the load ends up more or less equal for every node, independent of its place in the list.
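
For illustration, a minimal standalone sketch of the same pick-a-random-subset logic, with std::mt19937 and std::random_device standing in for ClickHouse's internal pcg64_fast and randomSeed(), and ShardInfo reduced to a toy struct:

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <random>
#include <string>
#include <vector>

struct ShardInfo
{
    std::size_t shard_num = 0;
    std::string host;
};

int main()
{
    std::vector<ShardInfo> shards_info = {{1, "node1"}, {2, "node2"}, {3, "node3"}, {4, "node4"}};
    const std::size_t max_hosts = 2;

    if (max_hosts > 0 && shards_info.size() > max_hosts)
    {
        // Shuffle first so every node is equally likely to be picked;
        // truncating without a shuffle would always select the same prefix of the list.
        std::mt19937 gen{std::random_device{}()};
        std::shuffle(shards_info.begin(), shards_info.end(), gen);
        shards_info.resize(max_hosts);

        // Re-number the survivors so shard_num stays contiguous, as the patch does.
        std::size_t shard_num = 0;
        for (auto & shard : shards_info)
            shard.shard_num = ++shard_num;
    }

    for (const auto & shard : shards_info)
        std::cout << shard.shard_num << " -> " << shard.host << '\n';  // e.g. "1 -> node3", "2 -> node1"
}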

+    shards_info.resize(max_hosts);
+
+    shard_num = 0;
+    for (auto & shard_info : shards_info)
+        shard_info.shard_num = ++shard_num;
+}
+
+for (size_t i = 0; i < shards_info.size(); ++i)
+{
+    addresses_with_failover.emplace_back(shards_info[i].local_addresses);
+    slot_to_shard.insert(std::end(slot_to_shard), shards_info[i].weight, i);
+}

initMisc();
}

4 changes: 2 additions & 2 deletions src/Interpreters/Cluster.h
@@ -266,7 +266,7 @@ class Cluster
std::unique_ptr<Cluster> getClusterWithMultipleShards(const std::vector<size_t> & indices) const;

/// Get a new Cluster that contains all servers (all shards with all replicas) from existing cluster as independent shards.
-std::unique_ptr<Cluster> getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard = 0) const;
+std::unique_ptr<Cluster> getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard = 0, size_t max_hosts = 0) const;

/// Returns false if cluster configuration doesn't allow to use it for cross-replication.
/// NOTE: true does not mean, that it's actually a cross-replication cluster.
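
The doc comment above captures the idea; as a mental model (plain standard-library types, not the actual Cluster internals), every replica of every shard becomes its own single-replica shard, and max_hosts then caps how many survive. A minimal sketch under those assumptions:

#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

// Toy model: each inner vector is one shard's replica set.
using ClusterLayout = std::vector<std::vector<std::string>>;

// Every replica becomes an independent single-replica "shard";
// max_hosts == 0 means "no limit", otherwise at most max_hosts survive.
std::vector<std::string> replicasAsShards(const ClusterLayout & cluster, std::size_t max_hosts = 0)
{
    std::vector<std::string> flat;
    for (const auto & shard : cluster)
        for (const auto & replica : shard)
            flat.push_back(replica);
    if (max_hosts > 0 && flat.size() > max_hosts)
        flat.resize(max_hosts);  // the real implementation shuffles before truncating
    return flat;
}

int main()
{
    ClusterLayout cluster = {{"s1r1", "s1r2"}, {"s2r1", "s2r2"}};
    for (const auto & host : replicasAsShards(cluster, 3))
        std::cout << host << '\n';  // three of the four replicas, each acting as its own shard
}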
@@ -292,7 +292,7 @@ class Cluster

/// For getClusterWithReplicasAsShards implementation
struct ReplicasAsShardsTag {};
-Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard);
+Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts);

void addShard(
const Settings & settings,
1 change: 0 additions & 1 deletion src/Server/TCPHandler.cpp
@@ -35,7 +35,6 @@
#include <Parsers/ASTInsertQuery.h>
#include <Server/TCPServer.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
-#include <Storages/ObjectStorage/StorageObjectStorageCluster.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Poco/Net/NetException.h>
#include <Poco/Net/SocketAddress.h>
7 changes: 4 additions & 3 deletions src/Storages/IStorageCluster.cpp
@@ -35,6 +35,7 @@ namespace Setting
extern const SettingsBool async_socket_for_remote;
extern const SettingsBool skip_unavailable_shards;
extern const SettingsNonZeroUInt64 max_parallel_replicas;
+extern const SettingsUInt64 object_storage_max_nodes;
}

namespace ErrorCodes
@@ -103,7 +104,7 @@ void IStorageCluster::read(
storage_snapshot->check(column_names);

updateBeforeRead(context);
-auto cluster = getClusterImpl(context, cluster_name_from_settings);
+auto cluster = getClusterImpl(context, cluster_name_from_settings, context->getSettingsRef()[Setting::object_storage_max_nodes]);

/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)

@@ -247,9 +248,9 @@ ContextPtr ReadFromCluster::updateSettings(const Settings & settings)
return new_context;
}

-ClusterPtr IStorageCluster::getClusterImpl(ContextPtr context, const String & cluster_name_)
+ClusterPtr IStorageCluster::getClusterImpl(ContextPtr context, const String & cluster_name_, size_t max_hosts)
{
-return context->getCluster(cluster_name_)->getClusterWithReplicasAsShards(context->getSettingsRef());
+return context->getCluster(cluster_name_)->getClusterWithReplicasAsShards(context->getSettingsRef(), /* max_replicas_from_shard */ 0, max_hosts);
}

}
3 changes: 2 additions & 1 deletion src/Storages/IStorageCluster.h
@@ -39,6 +39,7 @@ class IStorageCluster : public IStorage
bool async_insert) override;

ClusterPtr getCluster(ContextPtr context) const { return getClusterImpl(context, cluster_name); }

/// Query is needed for pruning by virtual columns (_file, _path)
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(
const ActionsDAG::Node * predicate,
@@ -82,7 +83,7 @@ class IStorageCluster : public IStorage
}

private:
-static ClusterPtr getClusterImpl(ContextPtr context, const String & cluster_name_);
+static ClusterPtr getClusterImpl(ContextPtr context, const String & cluster_name_, size_t max_hosts = 0);

LoggerPtr log;
String cluster_name;
166 changes: 164 additions & 2 deletions tests/integration/test_s3_cluster/test.py
@@ -72,7 +72,7 @@ def started_cluster():
"s0_0_0",
main_configs=["configs/cluster.xml", "configs/named_collections.xml"],
user_configs=["configs/users.xml"],
macros={"replica": "node1", "shard": "shard1"},
macros={"replica": "replica1", "shard": "shard1"},

Collaborator: Why change this?

Author: Just for unification with the other two nodes below, which use the "replica": "replicaX" macro.

with_minio=True,
with_zookeeper=True,
)
@@ -916,5 +916,167 @@ def test_hive_partitioning(started_cluster, allow_experimental_analyzer):
    )
    cluster_optimized_traffic = int(cluster_optimized_traffic)
    assert cluster_optimized_traffic == optimized_traffic

    node.query("SET allow_experimental_analyzer = DEFAULT")


def test_distributed_s3_table_engine(started_cluster):
    node = started_cluster.instances["s0_0_0"]

    resp_def = node.query(
        """
        SELECT * FROM s3Cluster(
            'cluster_simple',
            'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
            'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)
        """
    )

    node.query("DROP TABLE IF EXISTS single_node")
    node.query(
        """
        CREATE TABLE single_node
        (name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64))))
        ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV')
        """
    )
    query_id_engine_single_node = str(uuid.uuid4())
    resp_engine_single_node = node.query(
        """
        SELECT * FROM single_node ORDER BY (name, value, polygon)
        """,
        query_id=query_id_engine_single_node,
    )
    assert resp_def == resp_engine_single_node

    node.query("DROP TABLE IF EXISTS distributed")
    node.query(
        """
        CREATE TABLE distributed
        (name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64))))
        ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV')
        SETTINGS object_storage_cluster='cluster_simple'
        """
    )
    query_id_engine_distributed = str(uuid.uuid4())
    resp_engine_distributed = node.query(
        """
        SELECT * FROM distributed ORDER BY (name, value, polygon)
        """,
        query_id=query_id_engine_distributed,
    )
    assert resp_def == resp_engine_distributed

    node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'")

    hosts_engine_single_node = node.query(
        f"""
        SELECT uniq(hostname)
        FROM clusterAllReplicas('cluster_simple', system.query_log)
        WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_single_node}'
        """
    )
    assert int(hosts_engine_single_node) == 1
    hosts_engine_distributed = node.query(
        f"""
        SELECT uniq(hostname)
        FROM clusterAllReplicas('cluster_simple', system.query_log)
        WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_distributed}'
        """
    )
    assert int(hosts_engine_distributed) == 3


def test_cluster_hosts_limit(started_cluster):
    node = started_cluster.instances["s0_0_0"]

    query_id_def = str(uuid.uuid4())
    resp_def = node.query(
        """
        SELECT * FROM s3Cluster(
            'cluster_simple',
            'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
            'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)
        """,
        query_id=query_id_def,
    )

    # object_storage_max_nodes is greater than the number of hosts in the cluster
    query_id_4_hosts = str(uuid.uuid4())
    resp_4_hosts = node.query(
        """
        SELECT * FROM s3Cluster(
            'cluster_simple',
            'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
            'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)
        SETTINGS object_storage_max_nodes=4
        """,
        query_id=query_id_4_hosts,
    )
    assert resp_def == resp_4_hosts

    # object_storage_max_nodes is equal to the number of hosts in the cluster
    query_id_3_hosts = str(uuid.uuid4())
    resp_3_hosts = node.query(
        """
        SELECT * FROM s3Cluster(
            'cluster_simple',
            'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
            'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)
        SETTINGS object_storage_max_nodes=3
        """,
        query_id=query_id_3_hosts,
    )
    assert resp_def == resp_3_hosts

    # object_storage_max_nodes is less than the number of hosts in the cluster
    query_id_2_hosts = str(uuid.uuid4())
    resp_2_hosts = node.query(
        """
        SELECT * FROM s3Cluster(
            'cluster_simple',
            'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
            'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)
        SETTINGS object_storage_max_nodes=2
        """,
        query_id=query_id_2_hosts,
    )
    assert resp_def == resp_2_hosts

    node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'")

    hosts_def = node.query(
        f"""
        SELECT uniq(hostname)
        FROM clusterAllReplicas('cluster_simple', system.query_log)
        WHERE type='QueryFinish' AND initial_query_id='{query_id_def}' AND query_id!='{query_id_def}'
        """
    )
    assert int(hosts_def) == 3

    hosts_4 = node.query(
        f"""
        SELECT uniq(hostname)
        FROM clusterAllReplicas('cluster_simple', system.query_log)
        WHERE type='QueryFinish' AND initial_query_id='{query_id_4_hosts}' AND query_id!='{query_id_4_hosts}'
        """
    )
    assert int(hosts_4) == 3

    hosts_3 = node.query(
        f"""
        SELECT uniq(hostname)
        FROM clusterAllReplicas('cluster_simple', system.query_log)
        WHERE type='QueryFinish' AND initial_query_id='{query_id_3_hosts}' AND query_id!='{query_id_3_hosts}'
        """
    )
    assert int(hosts_3) == 3

    hosts_2 = node.query(
        f"""
        SELECT uniq(hostname)
        FROM clusterAllReplicas('cluster_simple', system.query_log)
        WHERE type='QueryFinish' AND initial_query_id='{query_id_2_hosts}' AND query_id!='{query_id_2_hosts}'
        """
    )
    assert int(hosts_2) == 2
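
Taken together, the four scenarios above reduce to a single capping rule. A minimal sketch of that rule as a hypothetical helper (effectiveHosts is illustrative only, not part of the ClickHouse codebase):

#include <cassert>
#include <cstddef>

// 0 means "no limit"; otherwise at most max_nodes hosts participate.
std::size_t effectiveHosts(std::size_t cluster_size, std::size_t max_nodes)
{
    return (max_nodes == 0 || max_nodes >= cluster_size) ? cluster_size : max_nodes;
}

int main()
{
    assert(effectiveHosts(3, 0) == 3);  // default: all three hosts run subqueries
    assert(effectiveHosts(3, 4) == 3);  // limit above cluster size: all three hosts
    assert(effectiveHosts(3, 3) == 3);  // limit equal to cluster size: all three hosts
    assert(effectiveHosts(3, 2) == 2);  // limit below cluster size: only two hosts
}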