Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ See the [CONTRIBUTING guide](./CONTRIBUTING.md#Changelog) for instructions on ho
### Features
### Enhancements
### Bug Fixes
* Reset datasource metadata when it fails to update in postIndex and postDelete, to force a refresh from the primary index shard. ([#761](https://github.com/opensearch-project/geospatial/pull/761))
### Infrastructure
### Documentation
### Maintenance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,13 @@ public String getIndexName(final String datasourceName) {
}

/**
 * Checks whether the metadata for the given datasource has passed its expiration date.
 *
 * Unknown datasources fall back to {@code DatasourceMetadata.EMPTY_METADATA}, whose
 * expiration date determines the result (so the method never NPEs on a missing name).
 * Expired datasources are logged at WARN with both timestamps to aid debugging.
 *
 * @param datasourceName name of the datasource to check
 * @return true if the datasource's expiration date is strictly before the current instant
 */
public boolean isExpired(final String datasourceName) {
    final Instant expirationDate = getMetadata().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getExpirationDate();
    final Instant now = Instant.now();
    final boolean isExpired = expirationDate.isBefore(now);
    if (isExpired) {
        log.warn("Datasource {} is expired. Expiration date is {} and now is {}.", datasourceName, expirationDate, now);
    }
    return isExpired;
}

public boolean has(final String datasourceName) {
Expand All @@ -83,23 +89,27 @@ public Map<String, Object> getGeoData(final String indexName, final String ip) {
}

private Map<String, DatasourceMetadata> getMetadata() {
if (metadata != null) {
return metadata;
// Use a local variable to hold the reference of the metadata in case another thread set the metadata as null,
// and we unexpectedly return the null. Using this local variable we ensure we return a non-null value.
Map<String, DatasourceMetadata> currentMetadata = metadata;
if (currentMetadata != null) {
return currentMetadata;
}
synchronized (this) {
if (metadata != null) {
return metadata;
currentMetadata = metadata;
if (currentMetadata != null) {
return currentMetadata;
}
Map<String, DatasourceMetadata> tempData = new ConcurrentHashMap<>();
currentMetadata = new ConcurrentHashMap<>();
try {
datasourceDao.getAllDatasources()
.stream()
.forEach(datasource -> tempData.put(datasource.getName(), new DatasourceMetadata(datasource)));
for (Datasource datasource : datasourceDao.getAllDatasources()) {
currentMetadata.put(datasource.getName(), new DatasourceMetadata(datasource));
}
} catch (IndexNotFoundException e) {
log.debug("Datasource has never been created");
}
metadata = tempData;
return metadata;
metadata = currentMetadata;
return currentMetadata;
}
}

Expand All @@ -112,9 +122,26 @@ private void remove(final String datasourceName) {
getMetadata().remove(datasourceName);
}

/**
 * Invalidates the entire cached datasource metadata map.
 *
 * Setting {@code metadata} to null makes the next metadata read rebuild the cache
 * from the datasource system index, discarding any state that may have diverged
 * after a failed index or delete operation.
 */
private void clearMetadata() {
    log.info("Resetting all datasource metadata to force a refresh from the primary index shard.");
    metadata = null;
}

/**
 * Invoked when indexing a datasource document threw an exception.
 *
 * The cache cannot be updated from the failed operation, so the whole metadata
 * cache is dropped; it will be rebuilt lazily from the primary index shard on
 * the next access, keeping the cache consistent with the index.
 */
@Override
public void postIndex(ShardId shardId, Engine.Index index, Exception ex) {
    log.error("Skipped updating datasource metadata for datasource {} due to an indexing exception.", index.id(), ex);
    clearMetadata();
}

@Override
public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult result) {
if (Engine.Result.Type.FAILURE.equals(result.getResultType())) {
log.error(
"Skipped updating datasource metadata for datasource {} because the indexing result was a failure.",
index.id(),
result.getFailure()
);
clearMetadata();
return;
}

Expand All @@ -124,14 +151,28 @@ public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult re
parser.nextToken();
Datasource datasource = Datasource.PARSER.parse(parser, null);
put(datasource);
log.info("Updated datasource metadata for datasource {} successfully.", index.id());
} catch (IOException e) {
log.error("IOException occurred updating datasource metadata for datasource {} ", index.id(), e);
clearMetadata();
}
}

/**
 * Invoked when deleting a datasource document threw an exception.
 *
 * Mirrors the failed-index handling: the cached metadata may no longer match the
 * index contents, so the cache is dropped and rebuilt lazily from the primary
 * index shard on the next access.
 */
@Override
public void postDelete(ShardId shardId, Engine.Delete delete, Exception ex) {
    log.error("Skipped updating datasource metadata for datasource {} due to an exception.", delete.id(), ex);
    clearMetadata();
}

@Override
public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResult result) {
if (result.getResultType().equals(Engine.Result.Type.FAILURE)) {
log.error(
"Skipped updating datasource metadata for datasource {} because the delete result was a failure.",
delete.id(),
result.getFailure()
);
clearMetadata();
return;
}
remove(delete.id());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,16 @@ private void validateDatasourceIsInAvailableState(final String datasourceName) {
throw new IllegalStateException("datasource does not exist");
}

if (DatasourceState.AVAILABLE.equals(ip2GeoCachedDao.getState(datasourceName)) == false) {
throw new IllegalStateException("datasource is not in an available state");
final DatasourceState currentState = ip2GeoCachedDao.getState(datasourceName);
if (DatasourceState.AVAILABLE.equals(currentState) == false) {
throw new IllegalStateException(
String.format(
Locale.ROOT,
"datasource %s is not in an available state, current state is %s.",
datasourceName,
currentState.name()
)
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,17 @@ public void testGetGeoData_whenCalled_thenReturnGeoData() {
}

@SneakyThrows
public void testPostIndex_whenFailed_thenNoUpdate() {
when(datasourceDao.getAllDatasources()).thenReturn(Arrays.asList());
public void testPostIndex_whenFailed_thenResetMetadataToForcePullDataFromIndex() {
Datasource datasource = randomDatasource();

// At the beginning we don't have the new datasource in the system index and the cache metadata
when(datasourceDao.getAllDatasources()).thenReturn(Arrays.asList());
// Verify we don't have the new datasource
assertFalse(ip2GeoCachedDao.has(datasource.getName()));

// Mock the new datasource is added to the system index
when(datasourceDao.getAllDatasources()).thenReturn(Arrays.asList(datasource));

ShardId shardId = mock(ShardId.class);
Engine.Index index = mock(Engine.Index.class);
BytesReference bytesReference = BytesReference.bytes(datasource.toXContent(XContentFactory.jsonBuilder(), null));
Expand All @@ -147,10 +154,35 @@ public void testPostIndex_whenFailed_thenNoUpdate() {
ip2GeoCachedDao.postIndex(shardId, index, result);

// Verify
assertTrue(ip2GeoCachedDao.has(datasource.getName()));
assertEquals(datasource.currentIndexName(), ip2GeoCachedDao.getIndexName(datasource.getName()));
assertEquals(datasource.getState(), ip2GeoCachedDao.getState(datasource.getName()));
}

@SneakyThrows
public void testPostIndex_whenException_thenResetMetadataToForcePullDataFromIndex() {
Datasource datasource = randomDatasource();

// At the beginning we don't have the new datasource in the system index and the cache metadata
when(datasourceDao.getAllDatasources()).thenReturn(Arrays.asList());
// Verify we don't have the new datasource
assertFalse(ip2GeoCachedDao.has(datasource.getName()));
assertTrue(ip2GeoCachedDao.isExpired(datasource.getName()));
assertNull(ip2GeoCachedDao.getIndexName(datasource.getName()));
assertNull(ip2GeoCachedDao.getState(datasource.getName()));

// Mock the new datasource is added to the system index
when(datasourceDao.getAllDatasources()).thenReturn(Arrays.asList(datasource));

ShardId shardId = mock(ShardId.class);
Engine.Index index = mock(Engine.Index.class);
BytesReference bytesReference = BytesReference.bytes(datasource.toXContent(XContentFactory.jsonBuilder(), null));
when(index.source()).thenReturn(bytesReference);

// Run
ip2GeoCachedDao.postIndex(shardId, index, new Exception());

// Verify
assertTrue(ip2GeoCachedDao.has(datasource.getName()));
assertEquals(datasource.currentIndexName(), ip2GeoCachedDao.getIndexName(datasource.getName()));
assertEquals(datasource.getState(), ip2GeoCachedDao.getState(datasource.getName()));
}

@SneakyThrows
Expand All @@ -175,8 +207,38 @@ public void testPostIndex_whenSucceed_thenUpdate() {
assertEquals(datasource.getState(), ip2GeoCachedDao.getState(datasource.getName()));
}

public void testPostDelete_whenFailed_thenNoUpdate() {
public void testPostDelete_whenFailed_thenResetMetadataToForcePullDataFromIndex() {
Datasource datasource = randomDatasource();

// At the beginning we don't have the new datasource in the system index and the cache metadata
when(datasourceDao.getAllDatasources()).thenReturn(Arrays.asList());
// Verify we don't have the new datasource
assertFalse(ip2GeoCachedDao.has(datasource.getName()));

// Mock the new datasource is added to the system index
when(datasourceDao.getAllDatasources()).thenReturn(Arrays.asList(datasource));

ShardId shardId = mock(ShardId.class);
Engine.Delete index = mock(Engine.Delete.class);
Engine.DeleteResult result = mock(Engine.DeleteResult.class);
when(result.getResultType()).thenReturn(Engine.Result.Type.FAILURE);

// Run
ip2GeoCachedDao.postDelete(shardId, index, result);

// Verify
assertTrue(ip2GeoCachedDao.has(datasource.getName()));
}

public void testPostDelete_whenException_thenResetMetadataToForcePullDataFromIndex() {
Datasource datasource = randomDatasource();

// At the beginning we don't have the new datasource in the system index and the cache metadata
when(datasourceDao.getAllDatasources()).thenReturn(Arrays.asList());
// Verify we don't have the new datasource
assertFalse(ip2GeoCachedDao.has(datasource.getName()));

// Mock the new datasource is added to the system index
when(datasourceDao.getAllDatasources()).thenReturn(Arrays.asList(datasource));

ShardId shardId = mock(ShardId.class);
Expand Down
Loading