Skip to content

Commit f1f8b72

Browse files
haphungwcoryan
andauthored
cleanup(pubsub)!: remove explicit nack (#4549)
Simplify the pubsub subscriber API by removing the explicit `nack` method and replacing it with an idiomatic `Drop` implementation. Update unit tests and documentation accordingly. Fixes #4514 --------- Co-authored-by: Carlos O'Ryan <coryan@google.com>
1 parent a20d765 commit f1f8b72

3 files changed

Lines changed: 10 additions & 29 deletions

File tree

src/pubsub/src/subscriber/builder.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,9 @@ impl StreamingPull {
6464

6565
/// Sets the ack deadline to use for the stream.
6666
///
67-
/// This value represents how long the application has to ack or nack an
68-
/// incoming message. Note that this value is independent of the deadline
67+
/// This value represents how long the application has to ack an
68+
/// incoming message. If the handler is dropped without being acked,
69+
/// it is nacked. Note that this value is independent of the
6970
/// configured on the server-side subscription.
7071
///
7172
/// If the server does not hear back from the client within this deadline
@@ -95,7 +96,7 @@ impl StreamingPull {
9596
/// Flow control settings for the maximum number of outstanding messages.
9697
///
9798
/// The server will stop sending messages to a client when this many
98-
/// messages are outstanding (i.e. that have not been acked or nacked).
99+
/// messages are outstanding (i.e. that have not been acked).
99100
///
100101
/// The server resumes sending messages when the outstanding message count
101102
/// drops below this value.
@@ -122,7 +123,7 @@ impl StreamingPull {
122123
/// Flow control settings for the maximum number of outstanding bytes.
123124
///
124125
/// The server will stop sending messages to a client when this many bytes
125-
/// of messages are outstanding (i.e. that have not been acked or nacked).
126+
/// of messages are outstanding (i.e. that have not been acked).
126127
///
127128
/// The server resumes sending messages when the outstanding byte count
128129
/// drops below this value.

src/pubsub/src/subscriber/handler.rs

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -40,16 +40,6 @@ impl Handler {
4040
Handler::AtLeastOnce(h) => h.ack(),
4141
}
4242
}
43-
44-
/// Rejects the message associated with this handler.
45-
///
46-
/// The message will be removed from this `Subscriber`'s lease management.
47-
/// The service will redeliver this message, possibly to another client.
48-
pub fn nack(self) {
49-
match self {
50-
Handler::AtLeastOnce(h) => h.nack(),
51-
}
52-
}
5343
}
5444

5545
#[derive(Debug)]
@@ -91,16 +81,6 @@ impl AtLeastOnce {
9181
}
9282
}
9383

94-
/// Rejects the message associated with this handler.
95-
///
96-
/// The message will be removed from this `Subscriber`'s lease management.
97-
/// The service will redeliver this message, possibly to another client.
98-
pub fn nack(mut self) {
99-
if let Some(inner) = self.inner.take() {
100-
inner.nack();
101-
}
102-
}
103-
10484
#[cfg(test)]
10585
pub(crate) fn ack_id(&self) -> &str {
10686
self.inner
@@ -148,7 +128,7 @@ mod tests {
148128
let h = Handler::AtLeastOnce(AtLeastOnce::new(test_id(1), ack_tx));
149129
assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
150130

151-
h.nack();
131+
drop(h);
152132
let ack = ack_rx.try_recv()?;
153133
assert_eq!(ack, AckResult::Nack(test_id(1)));
154134

@@ -174,7 +154,7 @@ mod tests {
174154
let h = AtLeastOnce::new(test_id(1), ack_tx);
175155
assert_eq!(ack_rx.try_recv(), Err(TryRecvError::Empty));
176156

177-
h.nack();
157+
drop(h);
178158
let ack = ack_rx.try_recv()?;
179159
assert_eq!(ack, AckResult::Nack(test_id(1)));
180160

src/pubsub/src/subscriber/session.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,8 @@ impl Session {
132132

133133
/// Returns the next message received on this subscription.
134134
///
135-
/// The message data is returned along with a [Handler] for acknowledging
136-
/// (ack) or rejecting (nack) the message.
135+
/// Returns the message data along with a [Handler] for acknowledging (ack) the message.
136+
/// Dropping the [Handler] without acknowledging it will reject (nack) the message.
137137
///
138138
/// If the underlying stream encounters a permanent error, an `Error` is
139139
/// returned instead.
@@ -469,7 +469,7 @@ mod tests {
469469
let Some((_, Handler::AtLeastOnce(h))) = session.next().await.transpose()? else {
470470
anyhow::bail!("expected message {i}")
471471
};
472-
h.nack();
472+
drop(h);
473473
}
474474
// Take a long time to process some messages
475475
let mut hold = Vec::new();

0 commit comments

Comments
 (0)