Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,7 @@ where
};

query_router.update_pool_settings(&pool.settings);
query_router.set_default_role();

// Our custom protocol loop.
// We expect the client to either start a transaction with regular queries
Expand Down
5 changes: 5 additions & 0 deletions src/query_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,11 @@ impl QueryRouter {
self.active_shard
}

/// Set active_role as the default_role specified in the pool.
pub fn set_default_role(&mut self) {
self.active_role = self.pool_settings.default_role;
}

/// Get the current desired server role we should be talking to.
pub fn role(&self) -> Option<Role> {
self.active_role
Expand Down
36 changes: 35 additions & 1 deletion tests/ruby/load_balancing_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,41 @@
end
end
end

context "when all replicas are down " do
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction", "random", "debug", {"default_role" => "replica"}) }

it "unbans them automatically to prevent false positives in health checks that could make all replicas unavailable" do
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
failed_count = 0
number_of_replicas = processes[:replicas].length

# Take down all replicas
processes[:replicas].each(&:take_down)

(number_of_replicas + 1).times do |n|
conn.async_exec("SELECT 1 + 2")
rescue
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
failed_count += 1
end

expect(failed_count).to eq(number_of_replicas + 1)
failed_count = 0

# Ban_time is configured to 60 so this reset will only work
# if the replicas are unbanned automatically
processes[:replicas].each(&:reset)

number_of_replicas.times do
conn.async_exec("SELECT 1 + 2")
rescue
conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
failed_count += 1
end
expect(failed_count).to eq(0)
end
end
end

describe "Least Outstanding Queries Load Balancing" do
Expand Down Expand Up @@ -161,4 +196,3 @@
end
end
end