3737import org .elasticsearch .threadpool .ThreadPool ;
3838
3939import java .io .Closeable ;
40+ import java .util .ArrayList ;
4041import java .util .Collection ;
4142import java .util .Collections ;
4243import java .util .HashMap ;
4748import java .util .stream .Collectors ;
4849import java .util .stream .Stream ;
4950
51+ import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertAcked ;
5052import static org .hamcrest .Matchers .anyOf ;
5153import static org .hamcrest .Matchers .contains ;
5254import static org .hamcrest .Matchers .empty ;
5355import static org .hamcrest .Matchers .equalTo ;
5456
5557@ ESIntegTestCase .ClusterScope (scope = ESIntegTestCase .Scope .TEST )
56- public class RetentionLeaseSyncIT extends ESIntegTestCase {
58+ public class RetentionLeaseIT extends ESIntegTestCase {
5759
5860 public static final class RetentionLeaseSyncIntervalSettingPlugin extends Plugin {
5961
@@ -68,7 +70,7 @@ public List<Setting<?>> getSettings() {
6870 protected Collection <Class <? extends Plugin >> nodePlugins () {
6971 return Stream .concat (
7072 super .nodePlugins ().stream (),
71- Stream .of (RetentionLeaseBackgroundSyncIT . RetentionLeaseSyncIntervalSettingPlugin .class ))
73+ Stream .of (RetentionLeaseSyncIntervalSettingPlugin .class ))
7274 .collect (Collectors .toList ());
7375 }
7476
@@ -205,9 +207,62 @@ public void testRetentionLeasesSyncOnExpiration() throws Exception {
205207 }
206208 }
207209
208- @ AwaitsFix (bugUrl = "https://github.com/elastic/elasticsearch/issues/38487" )
210+ public void testBackgroundRetentionLeaseSync () throws Exception {
211+ final int numberOfReplicas = 2 - scaledRandomIntBetween (0 , 2 );
212+ internalCluster ().ensureAtLeastNumDataNodes (1 + numberOfReplicas );
213+ final Settings settings = Settings .builder ()
214+ .put ("index.number_of_shards" , 1 )
215+ .put ("index.number_of_replicas" , numberOfReplicas )
216+ .put (IndexService .RETENTION_LEASE_SYNC_INTERVAL_SETTING .getKey (), "1s" )
217+ .build ();
218+ createIndex ("index" , settings );
219+ ensureGreen ("index" );
220+ final String primaryShardNodeId = clusterService ().state ().routingTable ().index ("index" ).shard (0 ).primaryShard ().currentNodeId ();
221+ final String primaryShardNodeName = clusterService ().state ().nodes ().get (primaryShardNodeId ).getName ();
222+ final IndexShard primary = internalCluster ()
223+ .getInstance (IndicesService .class , primaryShardNodeName )
224+ .getShardOrNull (new ShardId (resolveIndex ("index" ), 0 ));
225+ // we will add multiple retention leases and expect to see them synced to all replicas
226+ final int length = randomIntBetween (1 , 8 );
227+ final Map <String , RetentionLease > currentRetentionLeases = new HashMap <>(length );
228+ final List <String > ids = new ArrayList <>(length );
229+ for (int i = 0 ; i < length ; i ++) {
230+ final String id = randomValueOtherThanMany (currentRetentionLeases .keySet ()::contains , () -> randomAlphaOfLength (8 ));
231+ ids .add (id );
232+ final long retainingSequenceNumber = randomLongBetween (0 , Long .MAX_VALUE );
233+ final String source = randomAlphaOfLength (8 );
234+ final CountDownLatch latch = new CountDownLatch (1 );
235+ // put a new lease
236+ currentRetentionLeases .put (
237+ id ,
238+ primary .addRetentionLease (id , retainingSequenceNumber , source , ActionListener .wrap (latch ::countDown )));
239+ latch .await ();
240+ // now renew all existing leases; we expect to see these synced to the replicas
241+ for (int j = 0 ; j <= i ; j ++) {
242+ currentRetentionLeases .put (
243+ ids .get (j ),
244+ primary .renewRetentionLease (
245+ ids .get (j ),
246+ randomLongBetween (currentRetentionLeases .get (ids .get (j )).retainingSequenceNumber (), Long .MAX_VALUE ),
247+ source ));
248+ }
249+ assertBusy (() -> {
250+ // check all retention leases have been synced to all replicas
251+ for (final ShardRouting replicaShard : clusterService ().state ().routingTable ().index ("index" ).shard (0 ).replicaShards ()) {
252+ final String replicaShardNodeId = replicaShard .currentNodeId ();
253+ final String replicaShardNodeName = clusterService ().state ().nodes ().get (replicaShardNodeId ).getName ();
254+ final IndexShard replica = internalCluster ()
255+ .getInstance (IndicesService .class , replicaShardNodeName )
256+ .getShardOrNull (new ShardId (resolveIndex ("index" ), 0 ));
257+ assertThat (replica .getRetentionLeases (), equalTo (primary .getRetentionLeases ()));
258+ }
259+ });
260+ }
261+ }
262+
209263 public void testRetentionLeasesSyncOnRecovery () throws Exception {
210- final int numberOfReplicas = 1 ;
264+ final int numberOfReplicas = 2 - scaledRandomIntBetween (0 , 2 );
265+ internalCluster ().ensureAtLeastNumDataNodes (1 + numberOfReplicas );
211266 /*
212267 * We effectively disable the background sync to ensure that the retention leases are not synced in the background so that the only
213268 * source of retention leases on the replicas would be from the commit point and recovery.
@@ -217,10 +272,9 @@ public void testRetentionLeasesSyncOnRecovery() throws Exception {
217272 .put ("index.number_of_replicas" , 0 )
218273 .put (IndexService .RETENTION_LEASE_SYNC_INTERVAL_SETTING .getKey (), TimeValue .timeValueHours (24 ))
219274 .build ();
220- createIndex ("index" , settings );
275+ // when we increase the number of replicas below we want to exclude the replicas from being allocated so that they do not recover
276+ assertAcked (prepareCreate ("index" , 1 ).setSettings (settings ));
221277 ensureYellow ("index" );
222- // exclude the replicas from being allocated
223- allowNodes ("index" , 1 );
224278 final AcknowledgedResponse response = client ().admin ()
225279 .indices ()
226280 .prepareUpdateSettings ("index" ).setSettings (Settings .builder ().put ("index.number_of_replicas" , numberOfReplicas ).build ())
@@ -261,11 +315,6 @@ public void testRetentionLeasesSyncOnRecovery() throws Exception {
261315 .getShardOrNull (new ShardId (resolveIndex ("index" ), 0 ));
262316 final Map <String , RetentionLease > retentionLeasesOnReplica = RetentionLeases .toMap (replica .getRetentionLeases ());
263317 assertThat (retentionLeasesOnReplica , equalTo (currentRetentionLeases ));
264-
265- // check retention leases have been committed on the replica
266- final RetentionLeases replicaCommittedRetentionLeases = RetentionLeases .decodeRetentionLeases (
267- replica .acquireLastIndexCommit (false ).getIndexCommit ().getUserData ().get (Engine .RETENTION_LEASES ));
268- assertThat (currentRetentionLeases , equalTo (RetentionLeases .toMap (replicaCommittedRetentionLeases )));
269318 }
270319 }
271320
0 commit comments