3131
3232package org .opensearch .cluster .coordination ;
3333
34+ import java .util .Locale ;
35+ import java .util .Optional ;
3436import org .apache .logging .log4j .LogManager ;
3537import org .apache .logging .log4j .Logger ;
3638import org .apache .logging .log4j .message .ParameterizedMessage ;
4749import org .opensearch .core .common .io .stream .NamedWriteableRegistry ;
4850import org .opensearch .core .common .io .stream .StreamInput ;
4951import org .opensearch .core .transport .TransportResponse ;
52+ import org .opensearch .gateway .remote .ClusterMetadataManifest ;
53+ import org .opensearch .gateway .remote .RemoteClusterStateService ;
5054import org .opensearch .threadpool .ThreadPool ;
5155import org .opensearch .transport .BytesTransportRequest ;
5256import org .opensearch .transport .TransportChannel ;
@@ -74,6 +78,7 @@ public class PublicationTransportHandler {
7478 private static final Logger logger = LogManager .getLogger (PublicationTransportHandler .class );
7579
7680 public static final String PUBLISH_STATE_ACTION_NAME = "internal:cluster/coordination/publish_state" ;
81+ public static final String PUBLISH_REMOTE_STATE_ACTION_NAME = "internal:cluster/coordination/publish_remote_state" ;
7782 public static final String COMMIT_STATE_ACTION_NAME = "internal:cluster/coordination/commit_state" ;
7883
7984 private final TransportService transportService ;
@@ -97,16 +102,19 @@ public class PublicationTransportHandler {
97102 private final TransportRequestOptions stateRequestOptions = TransportRequestOptions .builder ()
98103 .withType (TransportRequestOptions .Type .STATE )
99104 .build ();
105+ private final RemoteClusterStateService remoteClusterStateService ;
100106
101107 public PublicationTransportHandler (
102108 TransportService transportService ,
103109 NamedWriteableRegistry namedWriteableRegistry ,
104110 Function <PublishRequest , PublishWithJoinResponse > handlePublishRequest ,
105- BiConsumer <ApplyCommitRequest , ActionListener <Void >> handleApplyCommit
111+ BiConsumer <ApplyCommitRequest , ActionListener <Void >> handleApplyCommit ,
112+ RemoteClusterStateService remoteClusterStateService
106113 ) {
107114 this .transportService = transportService ;
108115 this .namedWriteableRegistry = namedWriteableRegistry ;
109116 this .handlePublishRequest = handlePublishRequest ;
117+ this .remoteClusterStateService = remoteClusterStateService ;
110118
111119 transportService .registerRequestHandler (
112120 PUBLISH_STATE_ACTION_NAME ,
@@ -117,6 +125,15 @@ public PublicationTransportHandler(
117125 (request , channel , task ) -> channel .sendResponse (handleIncomingPublishRequest (request ))
118126 );
119127
128+ transportService .registerRequestHandler (
129+ PUBLISH_REMOTE_STATE_ACTION_NAME ,
130+ ThreadPool .Names .GENERIC ,
131+ false ,
132+ false ,
133+ RemotePublishRequest ::new ,
134+ (request , channel , task ) -> channel .sendResponse (handleIncomingRemotePublishRequest (request ))
135+ );
136+
120137 transportService .registerRequestHandler (
121138 COMMIT_STATE_ACTION_NAME ,
122139 ThreadPool .Names .GENERIC ,
@@ -211,6 +228,44 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
211228 }
212229 }
213230
231+ private PublishWithJoinResponse handleIncomingRemotePublishRequest (RemotePublishRequest request ) throws IOException {
232+ final Optional <ClusterMetadataManifest > manifestOptional = remoteClusterStateService .getClusterMetadataManifestByTermVersion (request .getClusterName (), request .getClusterUUID (), request .term , request .version );
233+ if (manifestOptional .isPresent () == false ) {
234+ throw new IllegalStateException (
235+ String .format (Locale .ROOT , "Manifest is not present for term - %s version - %s" , request .term , request .version )
236+ );
237+ }
238+ ClusterMetadataManifest manifest = manifestOptional .get ();
239+ boolean applyFullState = false ;
240+ final ClusterState lastSeen = lastSeenClusterState .get ();
241+ if (lastSeen == null ) {
242+ logger .debug ("Diff cannot be applied as there is no last cluster state" );
243+ applyFullState = true ;
244+ } else if (manifest .getDiffManifest () == null ) {
245+ logger .debug ("There is no diff in the manifest" );
246+ applyFullState = true ;
247+ } else if (manifest .getDiffManifest ().getFromStateUUID ().equals (lastSeen .stateUUID ()) == false ) {
248+ logger .debug ("Last cluster state not compatible with the diff" );
249+ applyFullState = true ;
250+ }
251+
252+ if (applyFullState == true ) {
253+ ClusterState clusterState = remoteClusterStateService .getClusterStateForManifest (request .getClusterName (), manifest , transportService .getLocalNode ().getId ());
254+ logger .debug ("Downloaded full cluster state [{}]" , clusterState );
255+ fullClusterStateReceivedCount .incrementAndGet ();
256+ final PublishWithJoinResponse response = acceptState (clusterState );
257+ lastSeenClusterState .set (clusterState );
258+ return response ;
259+ } else {
260+ ClusterState clusterState = remoteClusterStateService .getClusterStateUsingDiff (request .getClusterName (), manifest , lastSeenClusterState .get (), transportService .getLocalNode ().getId ());
261+ logger .debug ("Downloaded full cluster state from diff [{}]" , clusterState );
262+ compatibleClusterStateDiffReceivedCount .incrementAndGet ();
263+ final PublishWithJoinResponse response = acceptState (clusterState );
264+ lastSeenClusterState .compareAndSet (lastSeen , clusterState );
265+ return response ;
266+ }
267+ }
268+
214269 private PublishWithJoinResponse acceptState (ClusterState incomingState ) {
215270 // if the state is coming from the current node, use original request instead (see currentPublishRequestToSelf for explanation)
216271 if (transportService .getLocalNode ().equals (incomingState .nodes ().getClusterManagerNode ())) {
@@ -224,8 +279,8 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState) {
224279 return handlePublishRequest .apply (new PublishRequest (incomingState ));
225280 }
226281
227- public PublicationContext newPublicationContext (ClusterChangedEvent clusterChangedEvent ) {
228- final PublicationContext publicationContext = new PublicationContext (clusterChangedEvent );
282+ public PublicationContext newPublicationContext (ClusterChangedEvent clusterChangedEvent , boolean isRemoteStateEnabled ) {
283+ final PublicationContext publicationContext = new PublicationContext (clusterChangedEvent , isRemoteStateEnabled );
229284
230285 // Build the serializations we expect to need now, early in the process, so that an error during serialization fails the publication
231286 // straight away. This isn't watertight since we send diffs on a best-effort basis and may fall back to sending a full state (and
@@ -270,12 +325,14 @@ public class PublicationContext {
270325 private final boolean sendFullVersion ;
271326 private final Map <Version , BytesReference > serializedStates = new HashMap <>();
272327 private final Map <Version , BytesReference > serializedDiffs = new HashMap <>();
328+ private final boolean sendRemoteState ;
273329
274- PublicationContext (ClusterChangedEvent clusterChangedEvent ) {
330+ PublicationContext (ClusterChangedEvent clusterChangedEvent , boolean isRemoteStateEnabled ) {
275331 discoveryNodes = clusterChangedEvent .state ().nodes ();
276332 newState = clusterChangedEvent .state ();
277333 previousState = clusterChangedEvent .previousState ();
278334 sendFullVersion = previousState .getBlocks ().disableStatePersistence ();
335+ sendRemoteState = isRemoteStateEnabled ;
279336 }
280337
281338 void buildDiffAndSerializeStates () {
@@ -339,7 +396,9 @@ public void onFailure(Exception e) {
339396 } else {
340397 responseActionListener = listener ;
341398 }
342- if (sendFullVersion || previousState .nodes ().nodeExists (destination ) == false ) {
399+ if (sendRemoteState && destination .isRemoteStateNode ()) {
400+ sendRemoteClusterState (destination , publishRequest .getAcceptedState (), responseActionListener );
401+ } else if (sendFullVersion || previousState .nodes ().nodeExists (destination ) == false ) {
343402 logger .trace ("sending full cluster state version [{}] to [{}]" , newState .version (), destination );
344403 sendFullClusterState (destination , responseActionListener );
345404 } else {
@@ -384,6 +443,43 @@ public String executor() {
384443 );
385444 }
386445
446+ private void sendRemoteClusterState (DiscoveryNode destination , ClusterState clusterState , ActionListener <PublishWithJoinResponse > listener ) {
447+ try {
448+ final RemotePublishRequest remotePublishRequest = new RemotePublishRequest (discoveryNodes .getLocalNode (), clusterState .term (), clusterState .getVersion (), clusterState .getClusterName ().value (), clusterState .metadata ().clusterUUID ());
449+ final Consumer <TransportException > transportExceptionHandler = exp -> {
450+ logger .debug (() -> new ParameterizedMessage ("failed to send remote cluster state to {}" , destination ), exp );
451+ listener .onFailure (exp );
452+ };
453+ final TransportResponseHandler <PublishWithJoinResponse > responseHandler = new TransportResponseHandler <
454+ PublishWithJoinResponse >() {
455+
456+ @ Override
457+ public PublishWithJoinResponse read (StreamInput in ) throws IOException {
458+ return new PublishWithJoinResponse (in );
459+ }
460+
461+ @ Override
462+ public void handleResponse (PublishWithJoinResponse response ) {
463+ listener .onResponse (response );
464+ }
465+
466+ @ Override
467+ public void handleException (TransportException exp ) {
468+ transportExceptionHandler .accept (exp );
469+ }
470+
471+ @ Override
472+ public String executor () {
473+ return ThreadPool .Names .GENERIC ;
474+ }
475+ };
476+ transportService .sendRequest (destination , PUBLISH_REMOTE_STATE_ACTION_NAME , remotePublishRequest , stateRequestOptions , responseHandler );
477+ } catch (Exception e ) {
478+ logger .warn (() -> new ParameterizedMessage ("error sending remote cluster state to {}" , destination ), e );
479+ listener .onFailure (e );
480+ }
481+ }
482+
387483 private void sendFullClusterState (DiscoveryNode destination , ActionListener <PublishWithJoinResponse > listener ) {
388484 BytesReference bytes = serializedStates .get (destination .getVersion ());
389485 if (bytes == null ) {
0 commit comments