3333package org .opensearch .cluster .coordination ;
3434
3535import org .apache .logging .log4j .LogManager ;
36+ import org .apache .logging .log4j .Logger ;
3637import org .apache .logging .log4j .core .LoggerContext ;
3738import org .apache .logging .log4j .core .config .Configuration ;
3839import org .apache .logging .log4j .core .config .LoggerConfig ;
40+ import org .apache .logging .log4j .message .ParameterizedMessage ;
3941import org .opensearch .action .admin .cluster .health .ClusterHealthResponse ;
42+ import org .opensearch .action .admin .cluster .settings .ClusterUpdateSettingsRequest ;
43+ import org .opensearch .cluster .ClusterState ;
44+ import org .opensearch .cluster .ClusterStateListener ;
4045import org .opensearch .cluster .NodeConnectionsService ;
4146import org .opensearch .cluster .metadata .IndexMetadata ;
47+ import org .opensearch .cluster .service .ClusterApplierService ;
4248import org .opensearch .cluster .service .ClusterService ;
4349import org .opensearch .common .SuppressForbidden ;
4450import org .opensearch .common .settings .Settings ;
51+ import org .opensearch .common .unit .TimeValue ;
52+ import org .opensearch .core .index .Index ;
53+ import org .opensearch .core .index .shard .ShardId ;
54+ import org .opensearch .env .NodeEnvironment ;
55+ import org .opensearch .index .IndexSettings ;
4556import org .opensearch .index .MockEngineFactoryPlugin ;
57+ import org .opensearch .index .store .IndexStoreListener ;
4658import org .opensearch .indices .recovery .RecoverySettings ;
59+ import org .opensearch .plugins .IndexStorePlugin ;
4760import org .opensearch .plugins .Plugin ;
4861import org .opensearch .tasks .Task ;
4962import org .opensearch .test .InternalSettingsPlugin ;
6275import org .junit .After ;
6376import org .junit .Before ;
6477
78+ import java .lang .reflect .Method ;
79+ import java .util .ArrayList ;
6580import java .util .Arrays ;
6681import java .util .Collection ;
82+ import java .util .HashMap ;
6783import java .util .List ;
84+ import java .util .Map ;
85+ import java .util .Optional ;
86+ import java .util .Set ;
87+ import java .util .concurrent .CountDownLatch ;
88+ import java .util .concurrent .Executors ;
89+ import java .util .concurrent .ScheduledExecutorService ;
6890import java .util .concurrent .TimeUnit ;
6991import java .util .concurrent .atomic .AtomicBoolean ;
7092
7193import static org .opensearch .cluster .coordination .FollowersChecker .FOLLOWER_CHECK_ACTION_NAME ;
7294import static org .hamcrest .Matchers .is ;
7395
7496/**
75- Check https://github.com/opensearch-project/OpenSearch/issues/4874 and
76- https://github.com/opensearch-project/OpenSearch/pull/15521 for context
97+ * Check https://github.com/opensearch-project/OpenSearch/issues/4874 and
98+ * https://github.com/opensearch-project/OpenSearch/pull/15521 for context
7799 */
78100@ ClusterScope (scope = Scope .TEST , numDataNodes = 0 )
79101@ SuppressForbidden (reason = "Pending fix: https://github.com/opensearch-project/OpenSearch/issues/18972" )
@@ -82,16 +104,32 @@ public class NodeJoinLeftIT extends OpenSearchIntegTestCase {
82104 private TestLogsAppender testLogsAppender ;
83105 private String clusterManager ;
84106 private String redNodeName ;
107+ private Settings nodeSettings ;
85108 private LoggerContext loggerContext ;
86109
87110 @ Override
88111 protected Collection <Class <? extends Plugin >> nodePlugins () {
89- return Arrays .asList (
112+ List < Class <? extends Plugin >> plugins = Arrays .asList (
90113 MockTransportService .TestPlugin .class ,
91114 MockFSIndexStore .TestPlugin .class ,
92115 InternalSettingsPlugin .class ,
93116 MockEngineFactoryPlugin .class
94117 );
118+
119+ if (requiresTestIndexStoreListener ()) {
120+ plugins = new ArrayList <>(plugins );
121+ plugins .add (TestIndexStoreListenerPlugin .class );
122+ }
123+ return plugins ;
124+ }
125+
126+ private boolean requiresTestIndexStoreListener () {
127+ try {
128+ Method testMethod = getClass ().getMethod (getTestName ());
129+ return testMethod .isAnnotationPresent (RequiresTestIndexStoreListener .class );
130+ } catch (NoSuchMethodException e ) {
131+ return false ;
132+ }
95133 }
96134
97135 @ Override
@@ -107,7 +145,12 @@ protected void beforeIndexDeletion() throws Exception {
107145 public void setUp () throws Exception {
108146 super .setUp ();
109147 // Add any other specific messages you want to capture
110- List <String > messagesToCapture = Arrays .asList ("failed to join" , "IllegalStateException" );
148+ List <String > messagesToCapture = new ArrayList <String >() {
149+ {
150+ add ("failed to join" );
151+ add ("IllegalStateException" );
152+ }
153+ };
111154 testLogsAppender = new TestLogsAppender (messagesToCapture );
112155 loggerContext = (LoggerContext ) LogManager .getContext (false );
113156 Configuration config = loggerContext .getConfiguration ();
@@ -116,7 +159,7 @@ public void setUp() throws Exception {
116159 loggerContext .updateLoggers ();
117160
118161 String indexName = "test" ;
119- final Settings nodeSettings = Settings .builder ()
162+ this . nodeSettings = Settings .builder ()
120163 .put (RecoverySettings .INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING .getKey (), "100ms" )
121164 .put (NodeConnectionsService .CLUSTER_NODE_RECONNECT_INTERVAL_SETTING .getKey (), "10s" )
122165 .put (FollowersChecker .FOLLOWER_CHECK_TIMEOUT_SETTING .getKey (), "200ms" )
@@ -125,7 +168,7 @@ public void setUp() throws Exception {
125168 .put (NodeConnectionsService .CLUSTER_NODE_RECONNECT_INTERVAL_SETTING .getKey (), "100ms" )
126169 .build ();
127170 // start a 3 node cluster with 1 cluster-manager
128- this .clusterManager = internalCluster ().startNode (nodeSettings );
171+ this .clusterManager = internalCluster ().startClusterManagerOnlyNode (nodeSettings );
129172 internalCluster ().startNode (Settings .builder ().put ("node.attr.color" , "blue" ).put (nodeSettings ).build ());
130173 this .redNodeName = internalCluster ().startNode (Settings .builder ().put ("node.attr.color" , "red" ).put (nodeSettings ).build ());
131174
@@ -308,6 +351,173 @@ public void testClusterStabilityWhenDisconnectDuringSlowNodeLeftTask() throws Ex
308351 assertTrue ("Expected log was not found within the timeout period" , logFound );
309352 }
310353
354+ @ RequiresTestIndexStoreListener
355+ public void testClusterStabilityWhenClusterStatePublicationLagsOnShardCleanup () throws Exception {
356+ additionalSetupForLagDuringDataMigration ();
357+ try {
358+ TestIndexStoreListener .DELAY_SHARD_ASSIGNMENT = true ;
359+ logger .info ("Moving all shards to red nodes" );
360+ client ().admin ()
361+ .indices ()
362+ .prepareUpdateSettings ("*" )
363+ .setSettings (Settings .builder ().put ("index.routing.allocation.include.color" , "red" ))
364+ .get ();
365+ validateNodeDropDueToPublicationLag ();
366+ TestIndexStoreListener .DELAY_SHARD_ASSIGNMENT = false ;
367+ validateClusterRecovery ();
368+ } finally {
369+ TestIndexStoreListener .DELAY_SHARD_ASSIGNMENT = false ;
370+ }
371+ }
372+
373+ public void testClusterStabilityWhenClusterStatePublicationLagsWithLongRunningListenerOnApplierThread () throws Exception {
374+ additionalSetupForLagDuringDataMigration ();
375+ Map <ClusterApplierService , ClusterStateListener > redNodeListeners = new HashMap <>();
376+ try {
377+ // Setup listeners only on red data nodes
378+ for (String nodeName : internalCluster ().getNodeNames ()) {
379+ Settings nodeSettings = internalCluster ().getInstance (Settings .class , nodeName );
380+ if ("red" .equals (nodeSettings .get ("node.attr.color" )) && nodeSettings .getAsBoolean ("node.data" , true )) {
381+ ClusterApplierService applierService = internalCluster ().getInstance (ClusterService .class , nodeName )
382+ .getClusterApplierService ();
383+ ClusterStateListener listener = createDelayListener (applierService );
384+ redNodeListeners .put (applierService , listener );
385+ applierService .addListener (listener );
386+ }
387+ }
388+ logger .info ("Moving all shards to red nodes" );
389+ client ().admin ()
390+ .indices ()
391+ .prepareUpdateSettings ("*" )
392+ .setSettings (Settings .builder ().put ("index.routing.allocation.include.color" , "red" ))
393+ .get ();
394+ validateNodeDropDueToPublicationLag ();
395+ } finally {
396+ // Cleanup listeners
397+ redNodeListeners .forEach (ClusterApplierService ::removeListener );
398+ }
399+ validateClusterRecovery ();
400+ }
401+
402+ private void additionalSetupForLagDuringDataMigration () {
403+ internalCluster ().startClusterManagerOnlyNodes (2 , nodeSettings );
404+ internalCluster ().startDataOnlyNode (Settings .builder ().put ("node.attr.color" , "red" ).put (nodeSettings ).build ());
405+ internalCluster ().startDataOnlyNode (Settings .builder ().put ("node.attr.color" , "red" ).put (nodeSettings ).build ());
406+ internalCluster ().startDataOnlyNode (Settings .builder ().put ("node.attr.color" , "blue" ).put (nodeSettings ).build ());
407+ internalCluster ().startDataOnlyNode (Settings .builder ().put ("node.attr.color" , "blue" ).put (nodeSettings ).build ());
408+ internalCluster ().client ().admin ().cluster ().prepareHealth ().setWaitForNodes ("9" ).get ();
409+ internalCluster ().client ()
410+ .admin ()
411+ .indices ()
412+ .prepareCreate ("index-1" )
413+ .setSettings (
414+ Settings .builder ()
415+ .put (IndexMetadata .INDEX_ROUTING_INCLUDE_GROUP_SETTING .getKey () + "color" , "blue" )
416+ .put (IndexMetadata .SETTING_NUMBER_OF_SHARDS , 1 )
417+ .put (IndexMetadata .SETTING_NUMBER_OF_REPLICAS , 1 )
418+ )
419+ .get ();
420+ internalCluster ().client ().admin ().cluster ().prepareHealth ().setWaitForGreenStatus ().get ();
421+ ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest ();
422+ Settings settings = Settings .builder ()
423+ .put ("cluster.follower_lag.timeout" , "5s" )
424+ .put ("cluster.publish.timeout" , "15s" )
425+ .put ("cluster.routing.allocation.cluster_concurrent_recoveries" , 4 )
426+ .build ();
427+ settingsRequest .transientSettings (settings );
428+ internalCluster ().client ().admin ().cluster ().updateSettings (settingsRequest ).actionGet ();
429+ // Introducing a delay of 3sec on cluster manager applier thread to ensure join request from peer finder is received during
430+ // node-left
431+ ClusterService clusterManagerClsService = internalCluster ().getInstance (ClusterService .class , clusterManager );
432+ clusterManagerClsService .addStateApplier (event -> {
433+ if (event .nodesRemoved ()) {
434+ logger .info ("Adding a 3 sec delay on cluster manager applier thread" );
435+ CountDownLatch latch = new CountDownLatch (1 );
436+ ScheduledExecutorService executor = Executors .newSingleThreadScheduledExecutor ();
437+ executor .schedule (() -> { latch .countDown (); }, 3 , TimeUnit .SECONDS );
438+ try {
439+ latch .await ();
440+ } catch (InterruptedException e ) {
441+ logger .info ("Interrupted while waiting for cluster manager applier delay" );
442+ Thread .currentThread ().interrupt ();
443+ }
444+ executor .shutdown ();
445+ }
446+ });
447+ testLogsAppender .addMessagesToCapture (Set .of ("Sleeping for 30 seconds" , "NodeRemovalClusterStateTaskExecutor" , "reason: lagging" ));
448+ testLogsAppender .clearCapturedLogs ();
449+ }
450+
451+ private void validateNodeDropDueToPublicationLag () {
452+ ClusterHealthResponse clusterHealthResponse = internalCluster ().client ()
453+ .admin ()
454+ .cluster ()
455+ .prepareHealth ()
456+ .setWaitForNodes ("<9" )
457+ .setTimeout (TimeValue .timeValueSeconds (120 ))
458+ .get ();
459+ logger .info ("Cluster health {}" , clusterHealthResponse );
460+ assertFalse ("Cluster didn't have a node drop" , clusterHealthResponse .isTimedOut ());
461+ boolean logFound ;
462+ logFound = testLogsAppender .waitForLog ("Sleeping for 30 seconds" , 30 , TimeUnit .SECONDS );
463+ assertTrue ("Expected log for delay in shard cleanup was not found within the timeout period" , logFound );
464+ logFound = testLogsAppender .waitForLog (
465+ "Tasks batched with key: org.opensearch.cluster.coordination.NodeRemovalClusterStateTaskExecutor" ,
466+ 30 ,
467+ TimeUnit .SECONDS
468+ ) && testLogsAppender .waitForLog ("reason: lagging" , 30 , TimeUnit .SECONDS );
469+ assertTrue ("Expected log for node removal due to publication lag was not found within the timeout period" , logFound );
470+ // assert that join requests fail with the right exception
471+ logFound = testLogsAppender .waitForLog ("failed to join" , 30 , TimeUnit .SECONDS )
472+ && testLogsAppender .waitForLog (
473+ "IllegalStateException[cannot make a new connection as disconnect to node" ,
474+ 30 ,
475+ TimeUnit .SECONDS
476+ );
477+ assertTrue ("Expected log for join request failure was not found within the timeout period" , logFound );
478+ }
479+
480+ private void validateClusterRecovery () {
481+ logger .info ("Checking if cluster is stable after long running thread" );
482+
483+ ClusterHealthResponse response = internalCluster ().client ()
484+ .admin ()
485+ .cluster ()
486+ .prepareHealth ()
487+ .setWaitForGreenStatus ()
488+ .setWaitForNodes ("9" )
489+ .setTimeout (TimeValue .timeValueSeconds (60 ))
490+ .get ();
491+ logger .info ("Cluster health response after removing delay: {}" , response );
492+ assertFalse ("Cluster health response: " + response .toString (), response .isTimedOut ());
493+ assertEquals ("Not all shards are active after moving shards from blue to red nodes" , 3 , response .getActiveShards ());
494+ // Assert that all shards are migrated to new node-type (red nodes)
495+ ClusterState clusterState = client ().admin ().cluster ().prepareState ().get ().getState ();
496+ assertTrue (
497+ "All shards should be migrated to red nodes" ,
498+ clusterState .getRoutingTable ()
499+ .allShards ()
500+ .stream ()
501+ .allMatch (shard -> clusterState .nodes ().get (shard .currentNodeId ()).getAttributes ().get ("color" ).equals ("red" ))
502+ );
503+ }
504+
505+ private ClusterStateListener createDelayListener (ClusterApplierService applierService ) {
506+ return event -> applierService .runOnApplierThread ("NodeJoinLeftIT" , clusterState -> {
507+ logger .info ("Sleeping for 30 seconds" );
508+ CountDownLatch latch = new CountDownLatch (1 );
509+ ScheduledExecutorService executor = Executors .newSingleThreadScheduledExecutor ();
510+ executor .schedule (() -> { latch .countDown (); }, 30 , TimeUnit .SECONDS );
511+ try {
512+ latch .await ();
513+ } catch (InterruptedException e ) {
514+ logger .info ("Interrupted while waiting for cluster state applier" );
515+ Thread .currentThread ().interrupt ();
516+ }
517+ executor .shutdown ();
518+ }, (source , e ) -> logger .error (() -> new ParameterizedMessage ("{} unexpected error in listener wait" , source ), e ));
519+ }
520+
311521 public void testRestartDataNode () throws Exception {
312522
313523 Settings redNodeDataPathSettings = internalCluster ().dataPathSettings (redNodeName );
@@ -360,4 +570,54 @@ public void messageReceived(
360570 handler .messageReceived (request , channel , task );
361571 }
362572 }
573+
574+ public static class TestIndexStoreListenerPlugin extends Plugin implements IndexStorePlugin {
575+ @ Override
576+ public Optional <IndexStoreListener > getIndexStoreListener () {
577+ return Optional .of (new TestIndexStoreListener ());
578+ }
579+ }
580+
581+ public static class TestIndexStoreListener implements IndexStoreListener {
582+
583+ private static final Logger logger = LogManager .getLogger (TestIndexStoreListener .class );
584+ private static volatile boolean DELAY_SHARD_ASSIGNMENT = false ;
585+ private static final int SHARD_DELETE_DELAY_SECONDS = 30 ;
586+
587+ public TestIndexStoreListener () {}
588+
589+ @ Override
590+ public void beforeShardPathDeleted (ShardId shardId , IndexSettings indexSettings , NodeEnvironment env ) {
591+ if (DELAY_SHARD_ASSIGNMENT ) {
592+ logger .info (
593+ "{}: Sleeping for {} seconds before deleting data for shard: {}" ,
594+ Thread .currentThread ().getName (),
595+ SHARD_DELETE_DELAY_SECONDS ,
596+ shardId
597+ );
598+ // Add slow operation to simulate delay
599+ CountDownLatch latch = new CountDownLatch (1 );
600+ ScheduledExecutorService executor = Executors .newSingleThreadScheduledExecutor ();
601+ executor .schedule (() -> {
602+ logger .info (
603+ "{}: Done sleeping for {} sec before deleting data for shard: {}" ,
604+ Thread .currentThread ().getName (),
605+ SHARD_DELETE_DELAY_SECONDS ,
606+ shardId
607+ );
608+ latch .countDown ();
609+ }, SHARD_DELETE_DELAY_SECONDS , TimeUnit .SECONDS );
610+ try {
611+ latch .await ();
612+ } catch (InterruptedException e ) {
613+ logger .info ("Interrupted while waiting for shard deletion delay" );
614+ Thread .currentThread ().interrupt ();
615+ }
616+ executor .shutdown ();
617+ }
618+ }
619+
620+ @ Override
621+ public void beforeIndexPathDeleted (Index index , IndexSettings indexSettings , NodeEnvironment env ) {}
622+ }
363623}
0 commit comments