feat(transport): Dynamic load balancing #341
Attempt to add a feature to Tonic so that client-side load balancing can be done over a group of endpoints that changes dynamically over time. An external service responsible for discovering new endpoints and monitoring their health could decide to add or remove an endpoint from the load balancing group in the Tonic client. modified: examples/Cargo.toml new file: examples/src/load_balance_with_discovery/client.rs new file: examples/src/load_balance_with_discovery/server.rs modified: tonic/src/transport/channel/endpoint.rs modified: tonic/src/transport/channel/mod.rs modified: tonic/src/transport/mod.rs modified: tonic/src/transport/service/discover.rs modified: tonic/src/transport/service/mod.rs
examples/Cargo.toml
Outdated
| futures = { version = "0.3", default-features = false, features = ["alloc"] } | ||
| async-stream = "0.2" | ||
| tower = "0.3" | ||
| tower = { version="0.3"} |
| tower = { version="0.3"} | |
| tower = "0.3" |
|
This seems like an interesting start. I am a bit cautious about adding more complex types instead of just exposing the ability to pass in your own discover. I'd rather expose some sort of discover or channel hook than an endpoint manager. What do you think? |
|
I agree that passing discover would be cleaner/better. For the time being I just wanted to keep it as close to the existing implementation as possible. I looked into passing discover directly, as there is already a method balance in channel::mod.rs that does exactly that. If we wanted to expose Discover directly we would need to make Connection public or find some other solution that converts from Endpoint to Connection. Ultimately, I am not entirely sure how to solve this conundrum :) |
|
@dawid-nowak could we have a |
|
I think we are in agreement that ideally that would be the case: expose a Discover that produces the stream of Change events. Is that correct? |
|
@dawid-nowak yeah, so I think instead of exposing a This way we can implement discover ourselves by taking the stream of the channel change events and mapping them to our internal connection type. So internally we would implement a new discover type similar to Hopefully that makes a bit of sense, I am totally happy to sketch something out a bit more :) |
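The mapping idea in the comment above can be sketched with the standard library alone. This is a simplified, synchronous model and not Tonic's actual code: `Change` is a local stand-in for tower's change event, and `Endpoint`, `Connection`, `DynamicDiscover`, and `discover_channel` are hypothetical names chosen for illustration.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Local stand-in for a tower-style change event (assumption, not the real type).
#[derive(Debug, PartialEq)]
pub enum Change<K, V> {
    Insert(K, V),
    Remove(K),
}

/// Hypothetical public endpoint description; only carries a URI here.
#[derive(Debug, Clone, PartialEq)]
pub struct Endpoint {
    pub uri: String,
}

/// Hypothetical internal connection type that stays private to the library.
#[derive(Debug, PartialEq)]
pub struct Connection {
    pub target: String,
}

impl Connection {
    fn new(endpoint: Endpoint) -> Self {
        Connection { target: endpoint.uri }
    }
}

/// Discover-like adapter: reads endpoint changes from the user-facing
/// channel and maps each one onto the internal connection type.
pub struct DynamicDiscover<K> {
    changes: Receiver<Change<K, Endpoint>>,
}

impl<K> DynamicDiscover<K> {
    /// Returns the next mapped change, or `None` when nothing is pending
    /// (or every sender has been dropped).
    pub fn next_change(&mut self) -> Option<Change<K, Connection>> {
        match self.changes.try_recv().ok()? {
            Change::Insert(key, endpoint) => {
                Some(Change::Insert(key, Connection::new(endpoint)))
            }
            Change::Remove(key) => Some(Change::Remove(key)),
        }
    }
}

/// Builds the sender handed to the user and the adapter kept internally.
pub fn discover_channel<K>() -> (Sender<Change<K, Endpoint>>, DynamicDiscover<K>) {
    let (tx, rx) = channel();
    (tx, DynamicDiscover { changes: rx })
}
```

In this model the user only ever touches `Endpoint` and the sender, so the connection type does not need to become public.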
|
yeah, that makes sense, |
|
basically I think we can do something similar to this https://docs.rs/tonic/0.2.1/tonic/transport/channel/struct.Channel.html#method.balance_list. Where we have:
    fn balance_channel::<K>() -> (Sender<Change<K, Endpoint>>, Self)
This then would allow the user to:
    let (tx, channel) = Channel::balance_channel();
    tx.send(("some_key", some_endpoint)).await.unwrap();
    channel.call(...).await;
We then implement a new discover type that is similar to |
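To make the proposed shape concrete, here is a small standard-library model of a channel whose endpoint set is driven by change events. It is only a sketch under assumptions: `BalancedChannel`, its `pick` method, and the local `Change` enum are hypothetical, endpoints are plain URI strings, and round-robin selection is swapped in for whatever balancing strategy the real implementation uses.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Local stand-in for a tower-style change event (assumption).
#[derive(Debug)]
pub enum Change<K, V> {
    Insert(K, V),
    Remove(K),
}

/// Hypothetical channel model: endpoints are just URI strings.
pub struct BalancedChannel<K> {
    changes: Receiver<Change<K, String>>,
    endpoints: BTreeMap<K, String>,
    next: usize,
}

impl<K: Ord> BalancedChannel<K> {
    /// Mirrors the shape discussed above: a sender half plus the channel.
    pub fn balance_channel() -> (Sender<Change<K, String>>, Self) {
        let (tx, rx) = channel();
        let this = BalancedChannel {
            changes: rx,
            endpoints: BTreeMap::new(),
            next: 0,
        };
        (tx, this)
    }

    /// Applies all pending changes, then picks an endpoint round-robin.
    /// Returns `None` while the group is empty.
    pub fn pick(&mut self) -> Option<&str> {
        while let Ok(change) = self.changes.try_recv() {
            match change {
                Change::Insert(key, uri) => {
                    self.endpoints.insert(key, uri);
                }
                Change::Remove(key) => {
                    self.endpoints.remove(&key);
                }
            }
        }
        if self.endpoints.is_empty() {
            return None;
        }
        let index = self.next % self.endpoints.len();
        self.next = self.next.wrapping_add(1);
        self.endpoints.values().nth(index).map(String::as_str)
    }
}
```

A caller would keep the sender in whatever component watches service discovery and send `Change::Insert` or `Change::Remove` as endpoints come and go, while request code only calls `pick`.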
Take 2 after code review. Changing the main interface to take a stream of tower Change<K, Endpoint>, which will get converted to Change<K, Connection>. modified: examples/src/load_balance_with_discovery/client.rs modified: tonic/src/transport/channel/endpoint.rs modified: tonic/src/transport/channel/mod.rs modified: tonic/src/transport/mod.rs modified: tonic/src/transport/service/discover.rs modified: tonic/src/transport/service/mod.rs
|
Take 2. I think we are getting closer. |
* Converting balance_list to use balance_channel internally. modified: examples/Cargo.toml modified: tonic/src/transport/channel/mod.rs modified: tonic/src/transport/service/discover.rs modified: tonic/src/transport/service/mod.rs
…th_dynamic_discovery Conflicts: tonic/src/transport/service/discover.rs
* keeping fmt happy Changes to be committed: modified: tonic/src/transport/channel/mod.rs modified: tonic/src/transport/service/discover.rs
LucioFranco
left a comment
Looking great! I suggested a few changes, let me know if you have any questions :)
Changes to be committed: modified: examples/src/load_balance_with_discovery/client.rs modified: tonic/src/transport/channel/mod.rs modified: tonic/src/transport/service/discover.rs
* Removing Unpin for K Changes to be committed: modified: tonic/src/transport/channel/mod.rs modified: tonic/src/transport/service/discover.rs
LucioFranco
left a comment
looks great! left a few comments, I think we are almost there!
examples/Cargo.toml
Outdated
| path = "src/load_balance/server.rs" | ||
|
|
||
| [[bin]] | ||
| name = "load-balance-client-discovery" |
What do you think about naming this dynamic-load-balance? Might be a bit more clear
tonic/src/transport/channel/mod.rs
Outdated
| Request, Response, | ||
| }; | ||
| use hyper::client::connect::Connection as HyperConnection; | ||
| use std::hash::Hash; |
minor nit: can we move this import into the one below?
tonic/src/transport/channel/mod.rs
Outdated
| .unwrap_or(DEFAULT_BUFFER_SIZE); | ||
| let (channel, mut tx) = Self::balance_channel(DEFAULT_BUFFER_SIZE); | ||
| list.for_each(|endpoint| { | ||
| let _res = tx.try_send(Change::Insert(endpoint.uri.clone(), endpoint)); |
We should probably just unwrap here: if there is an error we want to know, because that would mean there is a bug in this code!
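The point of this comment can be illustrated with a bounded standard-library channel. This is a sketch only: `seed_silently` and `seed_checked` are hypothetical helpers, and `std::sync::mpsc::sync_channel` stands in for the tokio channel used in the diff. When the buffer is smaller than the list, ignoring the result of `try_send` drops endpoints without any signal, while propagating the error makes the problem visible.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Queues URIs into a bounded channel and ignores `try_send` failures,
/// as in the `let _res = ...` line under review. Returns how many URIs
/// actually made it into the buffer.
pub fn seed_silently(capacity: usize, uris: &[&str]) -> usize {
    let (tx, rx) = sync_channel(capacity);
    for uri in uris {
        // A full buffer is swallowed here, so the endpoint is simply lost.
        let _res = tx.try_send(uri.to_string());
    }
    rx.try_iter().count()
}

/// Same seeding, but the first failure is returned to the caller.
pub fn seed_checked(capacity: usize, uris: &[&str]) -> Result<usize, TrySendError<String>> {
    let (tx, rx) = sync_channel(capacity);
    for uri in uris {
        tx.try_send(uri.to_string())?;
    }
    Ok(rx.try_iter().count())
}
```

Calling `unwrap()` on each `try_send` result, as suggested, has the same effect as `seed_checked(...).unwrap()`: a full or closed channel becomes a loud failure instead of a quietly shorter endpoint list.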
tonic/src/transport/channel/mod.rs
Outdated
| /// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints. | ||
| pub fn balance_channel<K>( | ||
| capacity: usize, | ||
| ) -> (Self, tokio::sync::mpsc::Sender<Change<K, Endpoint>>) |
might want to just import this type, it's making this signature a bit long 😄
| use super::super::service; | ||
| use super::connection::Connection; | ||
| use crate::transport::Endpoint; | ||
| use std::hash::Hash; |
| Option<Pin<Box<dyn Future<Output = Result<Connection, crate::Error>> + Send + 'static>>>, | ||
| i: usize, | ||
| pub(crate) struct DynamicServiceStream<K: Hash + Eq + Clone> { | ||
| changes: tokio::sync::mpsc::Receiver<Change<K, Endpoint>>, |
same here, can we import this type?
|
|
||
| #[tokio::main] | ||
| async fn main() -> Result<(), Box<dyn std::error::Error>> { | ||
| let e1 = Endpoint::from_static("http://[::1]:50051").timeout(std::time::Duration::from_secs(1)); |
just me playing around, removed
modified: examples/Cargo.toml modified: examples/README.md renamed: examples/src/load_balance_with_discovery/client.rs -> examples/src/dynamic_load_balance/client.rs renamed: examples/src/load_balance_with_discovery/server.rs -> examples/src/dynamic_load_balance/server.rs modified: tonic/src/transport/channel/mod.rs modified: tonic/src/transport/service/discover.rs
|
🤘 |
* examples: update to `tracing` 0.1.14, use `#[instrument]` Now that `tracing-attributes`'s `#[instrument]` macro plays nicely with `async-trait`, we can update the tracing example to use `instrument`. This lets us simplify the events emitted in the example. Signed-off-by: Eliza Weisman <eliza@buoyant.io> * feat(transport): Dynamic load balancing (#341) * Fix typo (#356) Co-authored-by: Dawid Nowak <nowakd@gmail.com> Co-authored-by: Paulo Duarte <paulo@duarteweb.com>
Attempt to add a feature to Tonic so client side load balancing can be done over a
group of endpoints that change dynamically over time.
Motivation
An external service which is responsible for discovering new endpoints
and monitoring their health could make a decision to add/remove an
endpoint from the load balancing group in the Tonic client. At the moment, Tonic only allows load balancing across a fixed list of endpoints.
Solution
The solution is to expose a new method on the Channel interface called balance_with_manager, which takes a trait EventManager defined in Transport. EventManager will hold lists of endpoints that need to be added to or removed from the balance service in tower.
The approach is very similar to the existing Channel::balance_list.
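A rough sketch of what a manager-based design like this could look like, using only the standard library. Everything here is an assumption made for illustration: the method names `take_added` and `take_removed`, the `StaticManager` type, and the `sync_group` helper are hypothetical and are not taken from the patch, and endpoints are modelled as plain URI strings.

```rust
use std::collections::BTreeSet;

/// Hypothetical shape of the manager trait: it hands out the endpoints
/// that should be added to or removed from the balancing group.
pub trait EventManager {
    /// Endpoints that should join the group since the last call.
    fn take_added(&mut self) -> Vec<String>;
    /// Endpoints that should leave the group since the last call.
    fn take_removed(&mut self) -> Vec<String>;
}

/// Simple in-memory manager that an external discovery service could fill.
#[derive(Default)]
pub struct StaticManager {
    added: Vec<String>,
    removed: Vec<String>,
}

impl StaticManager {
    pub fn add(&mut self, uri: &str) {
        self.added.push(uri.to_string());
    }

    pub fn remove(&mut self, uri: &str) {
        self.removed.push(uri.to_string());
    }
}

impl EventManager for StaticManager {
    fn take_added(&mut self) -> Vec<String> {
        std::mem::take(&mut self.added)
    }

    fn take_removed(&mut self) -> Vec<String> {
        std::mem::take(&mut self.removed)
    }
}

/// Applies the manager's pending additions and removals to the group.
pub fn sync_group<M: EventManager>(manager: &mut M, group: &mut BTreeSet<String>) {
    for uri in manager.take_added() {
        group.insert(uri);
    }
    for uri in manager.take_removed() {
        group.remove(&uri);
    }
}
```

In this model the balancer would call `sync_group` periodically, which is the polling-style coupling that the review discussion weighs against passing a stream of change events.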
Comments
Comments on how to improve it are welcome :)