@@ -79,18 +79,6 @@ public void finalizeReindex(EntityReindexContext context, boolean reindexSuccess
// Remove any null or blank aliases
aliasesToAttach.removeIf(alias -> alias == null || alias.isBlank());

// Add aliases to new index FIRST before deleting old indices
// This ensures the alias always points to a valid index during the swap
if (!aliasesToAttach.isEmpty()) {
searchClient.addAliases(stagedIndex, aliasesToAttach);
}
LOG.info(
"Promoted staged index '{}' to serve entity '{}' (aliases: {}).",
stagedIndex,
entityType,
aliasesToAttach);

// Now it's safe to delete old indices
Set<String> allEntityIndices = searchClient.listIndicesByPrefix(canonicalIndex);
for (String oldIndex : allEntityIndices) {
if (oldIndex.equals(stagedIndex)) {
@@ -118,15 +106,20 @@ public void finalizeReindex(EntityReindexContext context, boolean reindexSuccess
}
}

// Only delete activeIndex if it's not the same as canonicalIndex which is now an alias as
// added it above
if (activeIndex != null
&& !activeIndex.equals(canonicalIndex)
&& searchClient.indexExists(activeIndex)) {
if (activeIndex != null && searchClient.indexExists(activeIndex)) {
searchClient.deleteIndex(activeIndex);
LOG.info(
"Deleted previously active index '{}' for entity '{}'.", activeIndex, entityType);
}

if (!aliasesToAttach.isEmpty()) {
searchClient.addAliases(stagedIndex, aliasesToAttach);
}
LOG.info(
"Promoted staged index '{}' to serve entity '{}' (aliases: {}).",
stagedIndex,
entityType,
aliasesToAttach);
} catch (Exception ex) {
LOG.error(
"Failed to promote staged index '{}' for entity '{}'.", stagedIndex, entityType, ex);
@@ -110,15 +110,4 @@ public interface IndexManagementClient {
* @return set of indices that start with the prefix
*/
Set<String> listIndicesByPrefix(String prefix);

/**
* Wait for an index to become ready (shards allocated and available for queries).
* This method polls the index health until it reaches at least YELLOW status
* or the timeout is reached.
*
* @param indexName the name of the index to wait for
* @param timeoutSeconds the maximum time to wait in seconds
* @return true if the index became ready, false if timeout was reached
*/
boolean waitForIndexReady(String indexName, int timeoutSeconds);
}
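The waitForIndexReady contract removed here (poll index health until at least YELLOW, bounded by a timeout) has a single call site visible in this diff: the block removed from createIndexes() in the next hunk. Condensed, with names and the 30-second timeout taken from that removed block, the usage looked roughly like this; logging is simplified and the types are illustrative.

```java
// Usage sketch only, condensed from the call site removed from createIndexes() below.
interface IndexReadiness {
  /** Returns true once the index reaches at least YELLOW health, false on timeout. */
  boolean waitForIndexReady(String indexName, int timeoutSeconds);
}

class ReindexPreflightSketch {
  void awaitStagedIndex(IndexReadiness searchClient, String stagedIndex) {
    if (stagedIndex != null && !stagedIndex.isEmpty()) {
      boolean indexReady = searchClient.waitForIndexReady(stagedIndex, 30);
      if (!indexReady) {
        // The removed code only warned and proceeded with the reindex anyway.
        System.err.println(
            "Staged index '" + stagedIndex + "' did not become ready within timeout. Proceeding anyway.");
      }
    }
  }
}
```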
@@ -269,15 +269,6 @@ public void createIndexes() {
Set<String> parentAliases =
new HashSet<>(listOrEmpty(context.getParentAliases(entityType)));

if (stagedIndex != null && !stagedIndex.isEmpty()) {
boolean indexReady = searchClient.waitForIndexReady(stagedIndex, 30);
if (!indexReady) {
LOG.warn(
"Staged index '{}' did not become ready within timeout. Proceeding anyway.",
stagedIndex);
}
}

EntityReindexContext entityReindexContext =
EntityReindexContext.builder()
.entityType(entityType)
@@ -244,11 +244,6 @@ public Set<String> listIndicesByPrefix(String prefix) {
return indexManager.listIndicesByPrefix(prefix);
}

@Override
public boolean waitForIndexReady(String indexName, int timeoutSeconds) {
return indexManager.waitForIndexReady(indexName, timeoutSeconds);
}

@Override
public void updateIndex(IndexMapping indexMapping, String indexMappingContent) {
indexManager.updateIndex(indexMapping, indexMappingContent);
@@ -2,8 +2,6 @@

import es.co.elastic.clients.elasticsearch.ElasticsearchClient;
import es.co.elastic.clients.elasticsearch._types.ElasticsearchException;
import es.co.elastic.clients.elasticsearch.cluster.HealthRequest;
import es.co.elastic.clients.elasticsearch.cluster.HealthResponse;
import es.co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
import es.co.elastic.clients.elasticsearch.indices.DeleteIndexRequest;
import es.co.elastic.clients.elasticsearch.indices.DeleteIndexResponse;
@@ -338,45 +336,4 @@ public Set<String> listIndicesByPrefix(String prefix) {
}
return indices;
}

@Override
public boolean waitForIndexReady(String indexName, int timeoutSeconds) {
if (!isClientAvailable) {
LOG.error("ElasticSearch client is not available. Cannot wait for index.");
return false;
}

LOG.info("Waiting for index '{}' to become ready (timeout: {}s)", indexName, timeoutSeconds);

try {
HealthRequest healthRequest =
HealthRequest.of(
h ->
h.index(indexName)
.waitForStatus(es.co.elastic.clients.elasticsearch._types.HealthStatus.Yellow)
.timeout(t -> t.time(timeoutSeconds + "s")));

HealthResponse healthResponse = client.cluster().health(healthRequest);

boolean isReady =
healthResponse.status() == es.co.elastic.clients.elasticsearch._types.HealthStatus.Green
|| healthResponse.status()
== es.co.elastic.clients.elasticsearch._types.HealthStatus.Yellow;

if (isReady) {
LOG.info("Index '{}' is ready with status: {}", indexName, healthResponse.status());
} else {
LOG.warn(
"Index '{}' not ready after {}s, status: {}",
indexName,
timeoutSeconds,
healthResponse.status());
}

return isReady;
} catch (Exception e) {
LOG.error("Failed to wait for index '{}' readiness: {}", indexName, e.getMessage(), e);
return false;
}
}
}
@@ -218,11 +218,6 @@ public Set<String> listIndicesByPrefix(String prefix) {
return indexManager.listIndicesByPrefix(prefix);
}

@Override
public boolean waitForIndexReady(String indexName, int timeoutSeconds) {
return indexManager.waitForIndexReady(indexName, timeoutSeconds);
}

@Override
public void updateIndex(IndexMapping indexMapping, String indexMappingContent) {
indexManager.updateIndex(indexMapping, indexMappingContent);
@@ -12,11 +12,8 @@
import org.openmetadata.search.IndexMapping;
import org.openmetadata.service.search.IndexManagementClient;
import os.org.opensearch.client.opensearch.OpenSearchClient;
import os.org.opensearch.client.opensearch._types.HealthStatus;
import os.org.opensearch.client.opensearch._types.OpenSearchException;
import os.org.opensearch.client.opensearch._types.mapping.TypeMapping;
import os.org.opensearch.client.opensearch.cluster.HealthRequest;
import os.org.opensearch.client.opensearch.cluster.HealthResponse;
import os.org.opensearch.client.opensearch.indices.CreateIndexRequest;
import os.org.opensearch.client.opensearch.indices.CreateIndexResponse;
import os.org.opensearch.client.opensearch.indices.DeleteIndexRequest;
@@ -412,44 +409,4 @@ public Set<String> listIndicesByPrefix(String prefix) {
}
return indices;
}

@Override
public boolean waitForIndexReady(String indexName, int timeoutSeconds) {
if (!isClientAvailable) {
LOG.error("OpenSearch client is not available. Cannot wait for index.");
return false;
}

LOG.info("Waiting for index '{}' to become ready (timeout: {}s)", indexName, timeoutSeconds);

try {
HealthRequest healthRequest =
HealthRequest.of(
h ->
h.index(indexName)
.waitForStatus(HealthStatus.Yellow)
.timeout(t -> t.time(timeoutSeconds + "s")));

HealthResponse healthResponse = client.cluster().health(healthRequest);

boolean isReady =
healthResponse.status() == HealthStatus.Green
|| healthResponse.status() == HealthStatus.Yellow;

if (isReady) {
LOG.info("Index '{}' is ready with status: {}", indexName, healthResponse.status());
} else {
LOG.warn(
"Index '{}' not ready after {}s, status: {}",
indexName,
timeoutSeconds,
healthResponse.status());
}

return isReady;
} catch (Exception e) {
LOG.error("Failed to wait for index '{}' readiness: {}", indexName, e.getMessage(), e);
return false;
}
}
}
@@ -24,12 +24,10 @@
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.http.client.HttpResponseException;
@@ -819,8 +817,7 @@ void delete_logicalSuiteWithPipeline(TestInfo test) throws IOException {
}

@Test
void get_listTestSuiteFromSearchWithPagination(TestInfo testInfo)
throws IOException, InterruptedException {
void get_listTestSuiteFromSearchWithPagination(TestInfo testInfo) throws IOException {
if (supportsSearchIndex) {
Random rand = new Random();
int tablesNum = rand.nextInt(3) + 3;
@@ -829,7 +826,7 @@ void get_listTestSuiteFromSearchWithPagination(TestInfo testInfo)
TestSuiteResourceTest testSuiteResourceTest = new TestSuiteResourceTest();

List<Table> tables = new ArrayList<>();
List<UUID> createdTestSuiteIds = new ArrayList<>();
Map<String, TestSuite> testSuites = new HashMap<>();

for (int i = 0; i < tablesNum; i++) {
CreateTable tableReq =
@@ -850,66 +847,9 @@ void get_listTestSuiteFromSearchWithPagination(TestInfo testInfo)
testSuiteResourceTest.createRequest(table.getFullyQualifiedName());
TestSuite testSuite =
testSuiteResourceTest.createBasicTestSuite(createTestSuite, ADMIN_AUTH_HEADERS);
createdTestSuiteIds.add(testSuite.getId());
}

Thread.sleep(3000);

Map<String, String> queryParams = new HashMap<>();
queryParams.put("sortField", "id");
queryParams.put("sortOrder", "asc");

ResultList<TestSuite> allTestSuites =
listEntitiesFromSearch(queryParams, 1000, 0, ADMIN_AUTH_HEADERS);

List<TestSuite> createdTestSuites =
allTestSuites.getData().stream()
.filter(ts -> createdTestSuiteIds.contains(ts.getId()))
.sorted((a, b) -> a.getId().compareTo(b.getId()))
.collect(Collectors.toList());

assertEquals(
tablesNum,
createdTestSuites.size(),
"Expected to find all created test suites in search results");

List<UUID> sortedCreatedIds =
createdTestSuites.stream().map(TestSuite::getId).collect(Collectors.toList());
for (int i = 0; i < createdTestSuites.size() - 1; i++) {
assertTrue(
createdTestSuites.get(i).getId().compareTo(createdTestSuites.get(i + 1).getId()) < 0,
"Test suites should be sorted by ID in ascending order");
}

Set<UUID> foundTestSuiteIds = new HashSet<>();
for (int limit = 1; limit <= Math.min(5, tablesNum); limit++) {
foundTestSuiteIds.clear();
int offset = 0;
int pageCount = 0;
int maxPages = 10000; // Safety limit to prevent infinite loops

while (pageCount < maxPages) {
ResultList<TestSuite> page =
listEntitiesFromSearch(queryParams, limit, offset, ADMIN_AUTH_HEADERS);

if (page.getData().isEmpty()) {
break;
}

page.getData().stream()
.filter(ts -> createdTestSuiteIds.contains(ts.getId()))
.forEach(ts -> foundTestSuiteIds.add(ts.getId()));

offset += limit;
pageCount++;
}

assertEquals(
new HashSet<>(createdTestSuiteIds),
foundTestSuiteIds,
String.format(
"All created test suites should be found when paginating with limit %d", limit));
testSuites.put(table.getFullyQualifiedName(), testSuite);
}
validateEntityListFromSearchWithPagination(new HashMap<>(), testSuites.size());
}
}
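The inline pagination assertions removed above are replaced by a single call to validateEntityListFromSearchWithPagination, whose implementation sits in the shared resource-test base class and is not part of this diff. As a rough sketch of the check such a helper is expected to perform (page through the search listing and confirm the expected number of distinct entities comes back), assuming it mirrors the removed inline logic:

```java
// Rough sketch only: the real validateEntityListFromSearchWithPagination helper may differ
// in signature and coverage; PageFetcher stands in for listEntitiesFromSearch.
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SearchPaginationCheckSketch {

  /** Supplies one page of search results for a given limit/offset. */
  interface PageFetcher<T> {
    List<T> fetch(int limit, int offset) throws Exception;
  }

  /** Pages through the listing and asserts that exactly expectedTotal distinct items are seen. */
  static <T> void validatePagination(PageFetcher<T> fetcher, int limit, int expectedTotal)
      throws Exception {
    Set<T> seen = new HashSet<>();
    int offset = 0;
    int pageCount = 0;
    int maxPages = 10_000; // safety bound, mirroring the removed inline test

    while (pageCount < maxPages) {
      List<T> page = fetcher.fetch(limit, offset);
      if (page.isEmpty()) {
        break;
      }
      seen.addAll(page);
      offset += limit;
      pageCount++;
    }

    if (seen.size() != expectedTotal) {
      throw new AssertionError(
          "Expected " + expectedTotal + " entities from search pagination, found " + seen.size());
    }
  }
}
```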
