Skip to content

Commit 99a2c94

Browse files
committed
[AIT-98] feat: realtime edits and deletes
1 parent 31969a8 commit 99a2c94

File tree

11 files changed

+756
-94
lines changed

11 files changed

+756
-94
lines changed

lib/src/main/java/io/ably/lib/realtime/ChannelBase.java

Lines changed: 100 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.ably.lib.types.DeltaExtras;
3232
import io.ably.lib.types.ErrorInfo;
3333
import io.ably.lib.types.Message;
34+
import io.ably.lib.types.MessageAction;
3435
import io.ably.lib.types.MessageAnnotations;
3536
import io.ably.lib.types.MessageDecodeException;
3637
import io.ably.lib.types.MessageOperation;
@@ -46,6 +47,7 @@
4647
import io.ably.lib.types.UpdateDeleteResult;
4748
import io.ably.lib.util.CollectionUtils;
4849
import io.ably.lib.util.EventEmitter;
50+
import io.ably.lib.util.Listeners;
4951
import io.ably.lib.util.Log;
5052
import io.ably.lib.util.ReconnectionStrategy;
5153
import io.ably.lib.util.StringUtils;
@@ -1123,7 +1125,7 @@ public synchronized void publish(Message[] messages, CompletionListener listener
11231125
case suspended:
11241126
throw AblyException.fromErrorInfo(new ErrorInfo("Unable to publish in failed or suspended state", 400, 40000));
11251127
default:
1126-
connectionManager.send(msg, queueMessages, listener);
1128+
connectionManager.send(msg, queueMessages, Listeners.fromCompletionListener(listener));
11271129
}
11281130
}
11291131

@@ -1206,103 +1208,89 @@ public void getMessageAsync(String serial, Callback<Message> callback) {
12061208
}
12071209

12081210
/**
1209-
* Updates an existing message using patch semantics.
1210-
* <p>
1211-
* Non-null fields in the provided message (name, data, extras) will replace the corresponding
1212-
* fields in the existing message, while null fields will be left unchanged.
1211+
* Asynchronously updates an existing message.
12131212
*
12141213
* @param message A {@link Message} object containing the fields to update and the serial identifier.
1215-
* Only non-null fields will be applied to the existing message.
1216-
* @param operation operation metadata such as clientId, description, or metadata in the version field
1217-
* @throws AblyException If the update operation fails.
1218-
* @return A {@link UpdateDeleteResult} containing the updated message version serial.
1214+
* <p>
1215+
* This callback is invoked on a background thread.
12191216
*/
1220-
public UpdateDeleteResult updateMessage(Message message, MessageOperation operation) throws AblyException {
1221-
return messageEditsMixin.updateMessage(ably.http, message, operation);
1217+
public void updateMessage(Message message) throws AblyException {
1218+
updateMessage(message, null, null);
12221219
}
12231220

12241221
/**
1225-
* Updates an existing message using patch semantics.
1226-
* <p>
1227-
* Non-null fields in the provided message (name, data, extras) will replace the corresponding
1228-
* fields in the existing message, while null fields will be left unchanged.
1222+
* Asynchronously updates an existing message.
12291223
*
12301224
* @param message A {@link Message} object containing the fields to update and the serial identifier.
1231-
* Only non-null fields will be applied to the existing message.
1232-
* @throws AblyException If the update operation fails.
1233-
* @return A {@link UpdateDeleteResult} containing the updated message version serial.
1225+
* @param operation operation metadata such as clientId, description, or metadata in the version field
1226+
* <p>
1227+
* This callback is invoked on a background thread.
12341228
*/
1235-
public UpdateDeleteResult updateMessage(Message message) throws AblyException {
1236-
return updateMessage(message, null);
1229+
public void updateMessage(Message message, MessageOperation operation) throws AblyException {
1230+
updateMessage(message, operation, null);
12371231
}
12381232

12391233
/**
12401234
* Asynchronously updates an existing message.
12411235
*
12421236
* @param message A {@link Message} object containing the fields to update and the serial identifier.
12431237
* @param operation operation metadata such as clientId, description, or metadata in the version field
1244-
* @param callback A callback to be notified of the outcome of this operation.
1238+
* @param listener A callback to be notified of the outcome of this operation.
12451239
* <p>
12461240
* This callback is invoked on a background thread.
12471241
*/
1248-
public void updateMessageAsync(Message message, MessageOperation operation, Callback<UpdateDeleteResult> callback) {
1249-
messageEditsMixin.updateMessageAsync(ably.http, message, operation, callback);
1242+
public void updateMessage(Message message, MessageOperation operation, Callback<UpdateDeleteResult> listener) throws AblyException {
1243+
Log.v(TAG, "updateMessage(Message); channel = " + this.name + "; serial = " + message.serial);
1244+
updateDeleteImpl(message, operation, MessageAction.MESSAGE_UPDATE, listener);
12501245
}
12511246

12521247
/**
12531248
* Asynchronously updates an existing message.
12541249
*
12551250
* @param message A {@link Message} object containing the fields to update and the serial identifier.
1256-
* @param callback A callback to be notified of the outcome of this operation.
1251+
* @param listener A callback to be notified of the outcome of this operation.
12571252
* <p>
12581253
* This callback is invoked on a background thread.
12591254
*/
1260-
public void updateMessageAsync(Message message, Callback<UpdateDeleteResult> callback) {
1261-
updateMessageAsync(message, null, callback);
1255+
public void updateMessage(Message message, Callback<UpdateDeleteResult> listener) throws AblyException {
1256+
updateMessage(message, null, listener);
12621257
}
12631258

12641259
/**
1265-
* Marks a message as deleted.
1266-
* <p>
1267-
* This operation does not remove the message from history; it marks it as deleted
1268-
* while preserving the full message history. The deleted message can still be
1269-
* retrieved and will have its action set to MESSAGE_DELETE.
1260+
* Asynchronously marks a message as deleted.
12701261
*
1271-
* @param message A {@link Message} message containing the serial identifier.
1272-
* @param operation operation metadata such as clientId, description, or metadata in the version field
1273-
* @throws AblyException If the delete operation fails.
1274-
* @return A {@link UpdateDeleteResult} containing the deleted message version serial.
1262+
* @param message A {@link Message} object containing the serial identifier and operation metadata.
1263+
* <p>
1264+
* This callback is invoked on a background thread.
12751265
*/
1276-
public UpdateDeleteResult deleteMessage(Message message, MessageOperation operation) throws AblyException {
1277-
return messageEditsMixin.deleteMessage(ably.http, message, operation);
1266+
public void deleteMessage(Message message) throws AblyException {
1267+
deleteMessage(message, null, null);
12781268
}
12791269

12801270
/**
1281-
* Marks a message as deleted.
1282-
* <p>
1283-
* This operation does not remove the message from history; it marks it as deleted
1284-
* while preserving the full message history. The deleted message can still be
1285-
* retrieved and will have its action set to MESSAGE_DELETE.
1271+
* Asynchronously marks a message as deleted.
12861272
*
1287-
* @param message A {@link Message} message containing the serial identifier.
1288-
* @throws AblyException If the delete operation fails.
1289-
* @return A {@link UpdateDeleteResult} containing the deleted message version serial.
1273+
* @param message A {@link Message} object containing the serial identifier and operation metadata.
1274+
* @param operation operation metadata such as clientId, description, or metadata in the version field
1275+
* <p>
1276+
* This callback is invoked on a background thread.
12901277
*/
1291-
public UpdateDeleteResult deleteMessage(Message message) throws AblyException {
1292-
return deleteMessage(message, null);
1278+
public void deleteMessage(Message message, MessageOperation operation) throws AblyException {
1279+
deleteMessage(message, operation, null);
12931280
}
12941281

12951282
/**
12961283
* Asynchronously marks a message as deleted.
12971284
*
12981285
* @param message A {@link Message} object containing the serial identifier and operation metadata.
12991286
* @param operation operation metadata such as clientId, description, or metadata in the version field
1300-
* @param callback A callback to be notified of the outcome of this operation.
1287+
* @param listener A callback to be notified of the outcome of this operation.
13011288
* <p>
13021289
* This callback is invoked on a background thread.
13031290
*/
1304-
public void deleteMessageAsync(Message message, MessageOperation operation, Callback<UpdateDeleteResult> callback) {
1305-
messageEditsMixin.deleteMessageAsync(ably.http, message, operation, callback);
1291+
public void deleteMessage(Message message, MessageOperation operation, Callback<UpdateDeleteResult> listener) throws AblyException {
1292+
Log.v(TAG, "deleteMessage(Message); channel = " + this.name + "; serial = " + message.serial);
1293+
updateDeleteImpl(message, operation, MessageAction.MESSAGE_DELETE, listener);
13061294
}
13071295

13081296
/**
@@ -1313,44 +1301,45 @@ public void deleteMessageAsync(Message message, MessageOperation operation, Call
13131301
* <p>
13141302
* This callback is invoked on a background thread.
13151303
*/
1316-
public void deleteMessageAsync(Message message, Callback<UpdateDeleteResult> callback) {
1317-
deleteMessageAsync(message, null, callback);
1304+
public void deleteMessage(Message message, Callback<UpdateDeleteResult> callback) throws AblyException {
1305+
deleteMessage(message, null, callback);
13181306
}
13191307

13201308
/**
1321-
* Appends message text to the end of the message.
1309+
* Asynchronously appends message text to the end of the message.
13221310
*
13231311
* @param message A {@link Message} object containing the serial identifier and data to append.
1324-
* @param operation operation details such as clientId, description, or metadata
1325-
* @return A {@link UpdateDeleteResult} containing the updated message version serial.
1326-
* @throws AblyException If the append operation fails.
1312+
* <p>
1313+
* This callback is invoked on a background thread.
13271314
*/
1328-
public UpdateDeleteResult appendMessage(Message message, MessageOperation operation) throws AblyException {
1329-
return messageEditsMixin.appendMessage(ably.http, message, operation);
1315+
public void appendMessage(Message message) throws AblyException {
1316+
appendMessage(message, null, null);
13301317
}
13311318

13321319
/**
1333-
* Appends message text to the end of the message.
1320+
* Asynchronously appends message text to the end of the message.
13341321
*
13351322
* @param message A {@link Message} object containing the serial identifier and data to append.
1336-
* @return A {@link UpdateDeleteResult} containing the updated message version serial.
1337-
* @throws AblyException If the append operation fails.
1323+
* @param operation operation details such as clientId, description, or metadata
1324+
* <p>
1325+
* This callback is invoked on a background thread.
13381326
*/
1339-
public UpdateDeleteResult appendMessage(Message message) throws AblyException {
1340-
return appendMessage(message, null);
1327+
public void appendMessage(Message message, MessageOperation operation) throws AblyException {
1328+
appendMessage(message, operation, null);
13411329
}
13421330

13431331
/**
13441332
* Asynchronously appends message text to the end of the message.
13451333
*
13461334
* @param message A {@link Message} object containing the serial identifier and data to append.
13471335
* @param operation operation details such as clientId, description, or metadata
1348-
* @param callback A callback to be notified of the outcome of this operation.
1336+
* @param listener A callback to be notified of the outcome of this operation.
13491337
* <p>
13501338
* This callback is invoked on a background thread.
13511339
*/
1352-
public void appendMessageAsync(Message message, MessageOperation operation, Callback<UpdateDeleteResult> callback) {
1353-
messageEditsMixin.appendMessageAsync(ably.http, message, operation, callback);
1340+
public void appendMessage(Message message, MessageOperation operation, Callback<UpdateDeleteResult> listener) throws AblyException {
1341+
Log.v(TAG, "appendMessage(Message); channel = " + this.name + "; serial = " + message.serial);
1342+
updateDeleteImpl(message, operation, MessageAction.MESSAGE_APPEND, listener);
13541343
}
13551344

13561345
/**
@@ -1361,8 +1350,50 @@ public void appendMessageAsync(Message message, MessageOperation operation, Call
13611350
* <p>
13621351
* This callback is invoked on a background thread.
13631352
*/
1364-
public void appendMessageAsync(Message message, Callback<UpdateDeleteResult> callback) {
1365-
appendMessageAsync(message, null, callback);
1353+
public void appendMessage(Message message, Callback<UpdateDeleteResult> callback) throws AblyException {
1354+
appendMessage(message, null, callback);
1355+
}
1356+
1357+
private void updateDeleteImpl(
1358+
Message message,
1359+
MessageOperation operation,
1360+
MessageAction action,
1361+
Callback<UpdateDeleteResult> listener
1362+
) throws AblyException {
1363+
if (message.serial == null || message.serial.isEmpty()) {
1364+
throw AblyException.fromErrorInfo(new ErrorInfo("Message serial cannot be empty", 400, 40003));
1365+
}
1366+
ConnectionManager connectionManager = ably.connection.connectionManager;
1367+
ConnectionManager.State connectionState = connectionManager.getConnectionState();
1368+
boolean queueMessages = ably.options.queueMessages;
1369+
if (!connectionManager.isActive() || (connectionState.queueEvents && !queueMessages)) {
1370+
throw AblyException.fromErrorInfo(connectionState.defaultErrorInfo);
1371+
}
1372+
boolean connected = (connectionState.sendEvents);
1373+
1374+
Message updatedMessage = new Message(message.name, message.data, message.extras);
1375+
updatedMessage.serial = message.serial;
1376+
updatedMessage.action = action;
1377+
updatedMessage.version = new MessageVersion();
1378+
if (operation != null) {
1379+
updatedMessage.version.clientId = operation.clientId;
1380+
updatedMessage.version.description = operation.description;
1381+
updatedMessage.version.metadata = operation.metadata;
1382+
}
1383+
1384+
try {
1385+
ably.auth.checkClientId(message, true, connected);
1386+
updatedMessage.encode(options);
1387+
} catch (AblyException e) {
1388+
if (listener != null) {
1389+
listener.onError(e.errorInfo);
1390+
}
1391+
return;
1392+
}
1393+
1394+
ProtocolMessage msg = new ProtocolMessage(Action.message, this.name);
1395+
msg.messages = new Message[] { updatedMessage };
1396+
connectionManager.send(msg, queueMessages, Listeners.toPublishResultListener(listener));
13661397
}
13671398

13681399
/**
@@ -1681,7 +1712,7 @@ public void once(ChannelState state, ChannelStateListener listener) {
16811712
*/
16821713
public void sendProtocolMessage(ProtocolMessage protocolMessage, CompletionListener listener) throws AblyException {
16831714
ConnectionManager connectionManager = ably.connection.connectionManager;
1684-
connectionManager.send(protocolMessage, ably.options.queueMessages, listener);
1715+
connectionManager.send(protocolMessage, ably.options.queueMessages, Listeners.fromCompletionListener(listener));
16851716
}
16861717

16871718
private static final String TAG = Channel.class.getName();

lib/src/main/java/io/ably/lib/realtime/Presence.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import io.ably.lib.types.PresenceMessage;
1616
import io.ably.lib.types.PresenceSerializer;
1717
import io.ably.lib.types.ProtocolMessage;
18+
import io.ably.lib.types.PublishResult;
19+
import io.ably.lib.util.Listeners;
1820
import io.ably.lib.util.Log;
1921
import io.ably.lib.util.StringUtils;
2022

@@ -120,9 +122,9 @@ public synchronized PresenceMessage[] get(String clientId, boolean wait) throws
120122
return get(new Param(GET_WAITFORSYNC, String.valueOf(wait)), new Param(GET_CLIENTID, clientId));
121123
}
122124

123-
void addPendingPresence(PresenceMessage presenceMessage, CompletionListener listener) {
125+
void addPendingPresence(PresenceMessage presenceMessage, Callback<PublishResult> listener) {
124126
synchronized(channel) {
125-
final QueuedPresence queuedPresence = new QueuedPresence(presenceMessage,listener);
127+
final QueuedPresence queuedPresence = new QueuedPresence(presenceMessage, Listeners.unwrap(listener));
126128
pendingPresence.add(queuedPresence);
127129
}
128130
}
@@ -763,7 +765,7 @@ public void updatePresence(PresenceMessage msg, CompletionListener listener) thr
763765
ProtocolMessage message = new ProtocolMessage(ProtocolMessage.Action.presence, channel.name);
764766
message.presence = new PresenceMessage[] { msg };
765767
ConnectionManager connectionManager = ably.connection.connectionManager;
766-
connectionManager.send(message, ably.options.queueMessages, listener);
768+
connectionManager.send(message, ably.options.queueMessages, Listeners.fromCompletionListener(listener));
767769
break;
768770
default:
769771
throw AblyException.fromErrorInfo(new ErrorInfo("Unable to enter presence channel in detached or failed state", 400, 91001));
@@ -892,7 +894,7 @@ private void sendQueuedMessages() {
892894
pendingPresence.clear();
893895

894896
try {
895-
connectionManager.send(message, queueMessages, listener);
897+
connectionManager.send(message, queueMessages, Listeners.fromCompletionListener(listener));
896898
} catch(AblyException e) {
897899
Log.e(TAG, "sendQueuedMessages(): Unexpected exception sending message", e);
898900
if(listener != null)

0 commit comments

Comments
 (0)