Skip to content

Commit 0b93c10

Browse files
committed
[AIT-258] feat: add Realtime mutable message support
- Updated `ConnectionManager` and `MessageQueue` to process `PublishResult` during acknowledgments (ACK/NACK). - Extended `send_protocol_message` to return `PublishResult` for publish tracking. - Bumped default `protocol_version` to 5. - Added tests for message update, delete, append operations, and PublishResult handling.
1 parent 1723f5d commit 0b93c10

File tree

6 files changed

+560
-21
lines changed

6 files changed

+560
-21
lines changed

ably/realtime/connectionmanager.py

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import logging
55
from collections import deque
66
from datetime import datetime
7+
from itertools import zip_longest
78
from typing import TYPE_CHECKING
89

910
import httpx
@@ -13,6 +14,7 @@
1314
from ably.types.connectiondetails import ConnectionDetails
1415
from ably.types.connectionerrors import ConnectionErrors
1516
from ably.types.connectionstate import ConnectionEvent, ConnectionState, ConnectionStateChange
17+
from ably.types.operations import PublishResult
1618
from ably.types.tokendetails import TokenDetails
1719
from ably.util.eventemitter import EventEmitter
1820
from ably.util.exceptions import AblyException, IncompatibleClientIdException
@@ -29,7 +31,7 @@ class PendingMessage:
2931

3032
def __init__(self, message: dict):
3133
self.message = message
32-
self.future: asyncio.Future | None = None
34+
self.future: asyncio.Future[PublishResult] | None = None
3335
action = message.get('action')
3436

3537
# Messages that require acknowledgment: MESSAGE, PRESENCE, ANNOTATION, OBJECT
@@ -58,15 +60,22 @@ def count(self) -> int:
5860
"""Return the number of pending messages"""
5961
return len(self.messages)
6062

61-
def complete_messages(self, serial: int, count: int, err: AblyException | None = None) -> None:
63+
def complete_messages(
64+
self,
65+
serial: int,
66+
count: int,
67+
res: list[PublishResult] | None,
68+
err: AblyException | None = None
69+
) -> None:
6270
"""Complete messages based on serial and count from ACK/NACK
6371
6472
Args:
6573
serial: The msgSerial of the first message being acknowledged
6674
count: The number of messages being acknowledged
75+
res: List of PublishResult objects for each message acknowledged, or None if not available
6776
err: Error from NACK, or None for successful ACK
6877
"""
69-
log.debug(f'MessageQueue.complete_messages(): serial={serial}, count={count}, err={err}')
78+
log.debug(f'MessageQueue.complete_messages(): serial={serial}, count={count}, res={res}, err={err}')
7079

7180
if not self.messages:
7281
log.warning('MessageQueue.complete_messages(): called on empty queue')
@@ -87,12 +96,17 @@ def complete_messages(self, serial: int, count: int, err: AblyException | None =
8796
completed_messages = self.messages[:num_to_complete]
8897
self.messages = self.messages[num_to_complete:]
8998

90-
for msg in completed_messages:
99+
# Default res to empty list if None
100+
res_list = res if res is not None else []
101+
for (msg, publish_result) in zip_longest(completed_messages, res_list):
91102
if msg.future and not msg.future.done():
92103
if err:
93104
msg.future.set_exception(err)
94105
else:
95-
msg.future.set_result(None)
106+
# If publish_result is None, return empty PublishResult
107+
if publish_result is None:
108+
publish_result = PublishResult()
109+
msg.future.set_result(publish_result)
96110

97111
def complete_all_messages(self, err: AblyException) -> None:
98112
"""Complete all pending messages with an error"""
@@ -199,7 +213,7 @@ async def close_impl(self) -> None:
199213

200214
self.notify_state(ConnectionState.CLOSED)
201215

202-
async def send_protocol_message(self, protocol_message: dict) -> None:
216+
async def send_protocol_message(self, protocol_message: dict) -> PublishResult | None:
203217
"""Send a protocol message and optionally track it for acknowledgment
204218
205219
Args:
@@ -233,12 +247,14 @@ async def send_protocol_message(self, protocol_message: dict) -> None:
233247
if state_should_queue:
234248
self.queued_messages.appendleft(pending_message)
235249
if pending_message.ack_required:
236-
await pending_message.future
250+
return await pending_message.future
237251
return None
238252

239253
return await self._send_protocol_message_on_connected_state(pending_message)
240254

241-
async def _send_protocol_message_on_connected_state(self, pending_message: PendingMessage) -> None:
255+
async def _send_protocol_message_on_connected_state(
256+
self, pending_message: PendingMessage
257+
) -> PublishResult | None:
242258
if self.state == ConnectionState.CONNECTED and self.transport:
243259
# Add to pending queue before sending (for messages being resent from queue)
244260
if pending_message.ack_required and pending_message not in self.pending_message_queue.messages:
@@ -253,7 +269,7 @@ async def _send_protocol_message_on_connected_state(self, pending_message: Pendi
253269
AblyException("No active transport", 500, 50000)
254270
)
255271
if pending_message.ack_required:
256-
await pending_message.future
272+
return await pending_message.future
257273
return None
258274

259275
def send_queued_messages(self) -> None:
@@ -449,15 +465,18 @@ def on_heartbeat(self, id: str | None) -> None:
449465
self.__ping_future.set_result(None)
450466
self.__ping_future = None
451467

452-
def on_ack(self, serial: int, count: int) -> None:
468+
def on_ack(
469+
self, serial: int, count: int, res: list[PublishResult] | None
470+
) -> None:
453471
"""Handle ACK protocol message from server
454472
455473
Args:
456474
serial: The msgSerial of the first message being acknowledged
457475
count: The number of messages being acknowledged
476+
res: List of PublishResult objects for each message acknowledged, or None if not available
458477
"""
459-
log.debug(f'ConnectionManager.on_ack(): serial={serial}, count={count}')
460-
self.pending_message_queue.complete_messages(serial, count)
478+
log.debug(f'ConnectionManager.on_ack(): serial={serial}, count={count}, res={res}')
479+
self.pending_message_queue.complete_messages(serial, count, res)
461480

462481
def on_nack(self, serial: int, count: int, err: AblyException | None) -> None:
463482
"""Handle NACK protocol message from server
@@ -471,7 +490,7 @@ def on_nack(self, serial: int, count: int, err: AblyException | None) -> None:
471490
err = AblyException('Unable to send message; channel not responding', 50001, 500)
472491

473492
log.error(f'ConnectionManager.on_nack(): serial={serial}, count={count}, err={err}')
474-
self.pending_message_queue.complete_messages(serial, count, err)
493+
self.pending_message_queue.complete_messages(serial, count, None, err)
475494

476495
def deactivate_transport(self, reason: AblyException | None = None):
477496
# RTN19a: Before disconnecting, requeue any pending messages

0 commit comments

Comments
 (0)