Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -394,11 +394,33 @@ private static IndexMetadata indexMetadata(final Client client, final String ind
return clusterStateResponse.getState().metadata().index(index);
}

private void assertNoInFlightRecoveryOrRetentionLeaseSync() throws Exception {
assertBusy(() -> {
final var tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get();
final var relevant = tasksResponse.getTasks()
.stream()
.filter(
t -> "retention_lease_sync".equals(t.getAction())
|| "internal:index/shard/recovery/start_recovery".equals(t.getAction())
|| t.getAction().startsWith("indices:admin/seq_no/retention_lease_sync")
)
.toList();

assertTrue("Expected no in-flight recovery/retention-lease tasks but found: " + relevant, relevant.isEmpty());
});
}

public void testCreateSplitIndex() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);
final String longSyncInterval = "1h";
Version version = VersionUtils.randomIndexCompatibleVersion(random());
prepareCreate("source").setSettings(
Settings.builder().put(indexSettings()).put("number_of_shards", 1).put("index.version.created", version)
Settings.builder()
.put(indexSettings())
.put("number_of_shards", 1)
.put("index.version.created", version)
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), longSyncInterval)
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), longSyncInterval)
).get();
final int docs = randomIntBetween(0, 128);
for (int i = 0; i < docs; i++) {
Expand All @@ -409,7 +431,16 @@ public void testCreateSplitIndex() throws Exception {
// to the require._name below.
ensureGreen();
// relocate all shards to one node such that we can merge it.
client().admin().indices().prepareUpdateSettings("source").setSettings(Settings.builder().put("index.blocks.write", true)).get();
client().admin()
.indices()
.prepareUpdateSettings("source")
.setSettings(
Settings.builder()
.put("index.blocks.write", true)
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), longSyncInterval)
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), longSyncInterval)
)
.get();
ensureGreen();

final IndicesStatsResponse sourceStats = client().admin().indices().prepareStats("source").setSegments(true).get();
Expand All @@ -421,8 +452,8 @@ public void testCreateSplitIndex() throws Exception {
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none"))
.get();
boolean success = false;
try {

final boolean createWithReplicas = randomBoolean();
assertAcked(
client().admin()
Expand All @@ -434,6 +465,8 @@ public void testCreateSplitIndex() throws Exception {
.put("index.number_of_replicas", createWithReplicas ? 1 : 0)
.put("index.number_of_shards", 2)
.putNull("index.blocks.write")
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), longSyncInterval)
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), longSyncInterval)
.build()
)
.get()
Expand Down Expand Up @@ -496,6 +529,7 @@ public void testCreateSplitIndex() throws Exception {
assertHitCount(client().prepareSearch("source").setSize(size).setQuery(new TermsQueryBuilder("foo", "bar")).get(), docs);
GetSettingsResponse target = client().admin().indices().prepareGetSettings("target").get();
assertEquals(version, target.getIndexToSettings().get("target").getAsVersion("index.version.created", null));
success = true;
} finally {
// clean up
client().admin()
Expand All @@ -505,85 +539,130 @@ public void testCreateSplitIndex() throws Exception {
Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), (String) null)
)
.get();
if (success) {
if (client().admin().indices().prepareExists("source").get().isExists()) {
ensureGreen("source");
}
if (client().admin().indices().prepareExists("target").get().isExists()) {
ensureGreen("target");
}
assertNoInFlightRecoveryOrRetentionLeaseSync();
}
}

}

public void testCreateSplitWithIndexSort() throws Exception {
SortField expectedSortField = new SortedSetSortField("id", true, SortedSetSelector.Type.MAX);
expectedSortField.setMissingValue(SortedSetSortField.STRING_FIRST);
Sort expectedIndexSort = new Sort(expectedSortField);
internalCluster().ensureAtLeastNumDataNodes(2);
prepareCreate("source").setSettings(
Settings.builder()
.put(indexSettings())
.put("sort.field", "id")
.put("sort.order", "desc")
.put("number_of_shards", 2)
.put("number_of_replicas", 0)
).setMapping("id", "type=keyword,doc_values=true").get();
for (int i = 0; i < 20; i++) {
client().prepareIndex("source")
.setId(Integer.toString(i))
.setSource("{\"foo\" : \"bar\", \"id\" : " + i + "}", MediaTypeRegistry.JSON)
.get();
}
// ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node
// if we change the setting too quickly we will end up with one replica unassigned which can't be assigned anymore due
// to the require._name below.
ensureGreen();

flushAndRefresh();
assertSortedSegments("source", expectedIndexSort);
final String longSyncInterval = "1h";
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none"))
.get();
boolean success = false;
try {
prepareCreate("source").setSettings(
Settings.builder()
.put(indexSettings())
.put("sort.field", "id")
.put("sort.order", "desc")
.put("number_of_shards", 2)
.put("number_of_replicas", 0)
.put(org.opensearch.index.IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), longSyncInterval)
.put(org.opensearch.index.IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), longSyncInterval)
).setMapping("id", "type=keyword,doc_values=true").get();
for (int i = 0; i < 20; i++) {
client().prepareIndex("source")
.setId(Integer.toString(i))
.setSource("{\"foo\" : \"bar\", \"id\" : " + i + "}", MediaTypeRegistry.JSON)
.get();
}

client().admin().indices().prepareUpdateSettings("source").setSettings(Settings.builder().put("index.blocks.write", true)).get();
ensureYellow();
ensureGreen("source");
flushAndRefresh();
assertSortedSegments("source", expectedIndexSort);

// check that index sort cannot be set on the target index
IllegalArgumentException exc = expectThrows(
IllegalArgumentException.class,
() -> client().admin()
client().admin()
.indices()
.prepareResizeIndex("source", "target")
.setResizeType(ResizeType.SPLIT)
.prepareUpdateSettings("source")
.setSettings(
Settings.builder()
.put("index.number_of_replicas", 0)
.put("index.number_of_shards", 4)
.put("index.sort.field", "foo")
.build()
.put("index.blocks.write", true)
.put(org.opensearch.index.IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), longSyncInterval)
.put(org.opensearch.index.IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), longSyncInterval)
)
.get()
);
assertThat(exc.getMessage(), containsString("can't override index sort when resizing an index"));
.get();
ensureGreen("source");

// check that the index sort order of `source` is correctly applied to the `target`
assertAcked(
// check that index sort cannot be set on the target index
IllegalArgumentException exc = expectThrows(
IllegalArgumentException.class,
() -> client().admin()
.indices()
.prepareResizeIndex("source", "target")
.setResizeType(ResizeType.SPLIT)
.setSettings(
Settings.builder()
.put("index.number_of_replicas", 0)
.put("index.number_of_shards", 4)
.put("index.sort.field", "foo")
.build()
)
.get()
);
assertThat(exc.getMessage(), containsString("can't override index sort when resizing an index"));

// check that the index sort order of `source` is correctly applied to the `target`
assertAcked(
client().admin()
.indices()
.prepareResizeIndex("source", "target")
.setResizeType(ResizeType.SPLIT)
.setSettings(
Settings.builder()
.put("index.number_of_replicas", 0)
.put("index.number_of_shards", 4)
.putNull("index.blocks.write")
.put(org.opensearch.index.IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), longSyncInterval)
.put(org.opensearch.index.IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), longSyncInterval)
.build()
)
.get()
);
ensureGreen("target");
flushAndRefresh();
GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings("target").execute().actionGet();
assertEquals(settingsResponse.getSetting("target", "index.sort.field"), "id");
assertEquals(settingsResponse.getSetting("target", "index.sort.order"), "desc");
assertSortedSegments("target", expectedIndexSort);

// ... and that the index sort is also applied to updates
for (int i = 20; i < 40; i++) {
client().prepareIndex("target").setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", MediaTypeRegistry.JSON).get();
}
flushAndRefresh();
assertSortedSegments("target", expectedIndexSort);
success = true;
} finally {
if (success) {
if (client().admin().indices().prepareExists("source").get().isExists()) {
ensureGreen("source");
}
if (client().admin().indices().prepareExists("target").get().isExists()) {
ensureGreen("target");
}
assertNoInFlightRecoveryOrRetentionLeaseSync();
}
client().admin()
.indices()
.prepareResizeIndex("source", "target")
.setResizeType(ResizeType.SPLIT)
.setSettings(
Settings.builder()
.put("index.number_of_replicas", 0)
.put("index.number_of_shards", 4)
.putNull("index.blocks.write")
.build()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), (String) null)
)
.get()
);
ensureGreen();
flushAndRefresh();
GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings("target").execute().actionGet();
assertEquals(settingsResponse.getSetting("target", "index.sort.field"), "id");
assertEquals(settingsResponse.getSetting("target", "index.sort.order"), "desc");
assertSortedSegments("target", expectedIndexSort);

// ... and that the index sort is also applied to updates
for (int i = 20; i < 40; i++) {
client().prepareIndex("target").setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", MediaTypeRegistry.JSON).get();
.get();
}
flushAndRefresh();
assertSortedSegments("target", expectedIndexSort);
}
}
Loading