diff --git a/CHANGELOG.md b/CHANGELOG.md index 0910086e..d4e03e13 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ See the [CONTRIBUTING guide](./CONTRIBUTING.md#Changelog) for instructions on ho ### Features ### Enhancements ### Bug Fixes +* Reset datasource metadata when failed to update it in postIndex and postDelete to force refresh it from the primary index shard. ([#761](https://github.com/opensearch-project/geospatial/pull/761)) ### Infrastructure ### Documentation ### Maintenance diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/dao/Ip2GeoCachedDao.java b/src/main/java/org/opensearch/geospatial/ip2geo/dao/Ip2GeoCachedDao.java index 9f229377..fef328dc 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/dao/Ip2GeoCachedDao.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/dao/Ip2GeoCachedDao.java @@ -63,7 +63,13 @@ public String getIndexName(final String datasourceName) { } public boolean isExpired(final String datasourceName) { - return getMetadata().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getExpirationDate().isBefore(Instant.now()); + final Instant expirationDate = getMetadata().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getExpirationDate(); + final Instant now = Instant.now(); + final boolean isExpired = expirationDate.isBefore(now); + if (isExpired) { + log.warn("Datasource {} is expired. Expiration date is {} and now is {}.", datasourceName, expirationDate, now); + } + return isExpired; } public boolean has(final String datasourceName) { @@ -83,23 +89,27 @@ public Map getGeoData(final String indexName, final String ip) { } private Map getMetadata() { - if (metadata != null) { - return metadata; + // Use a local variable to hold the reference of the metadata in case another thread set the metadata as null, + // and we unexpectedly return the null. Using this local variable we ensure we return a non-null value. + Map currentMetadata = metadata; + if (currentMetadata != null) { + return currentMetadata; } synchronized (this) { - if (metadata != null) { - return metadata; + currentMetadata = metadata; + if (currentMetadata != null) { + return currentMetadata; } - Map tempData = new ConcurrentHashMap<>(); + currentMetadata = new ConcurrentHashMap<>(); try { - datasourceDao.getAllDatasources() - .stream() - .forEach(datasource -> tempData.put(datasource.getName(), new DatasourceMetadata(datasource))); + for (Datasource datasource : datasourceDao.getAllDatasources()) { + currentMetadata.put(datasource.getName(), new DatasourceMetadata(datasource)); + } } catch (IndexNotFoundException e) { log.debug("Datasource has never been created"); } - metadata = tempData; - return metadata; + metadata = currentMetadata; + return currentMetadata; } } @@ -112,9 +122,26 @@ private void remove(final String datasourceName) { getMetadata().remove(datasourceName); } + private void clearMetadata() { + log.info("Resetting all datasource metadata to force a refresh from the primary index shard."); + metadata = null; + } + + @Override + public void postIndex(ShardId shardId, Engine.Index index, Exception ex) { + log.error("Skipped updating datasource metadata for datasource {} due to an indexing exception.", index.id(), ex); + clearMetadata(); + } + @Override public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult result) { if (Engine.Result.Type.FAILURE.equals(result.getResultType())) { + log.error( + "Skipped updating datasource metadata for datasource {} because the indexing result was a failure.", + index.id(), + result.getFailure() + ); + clearMetadata(); return; } @@ -124,14 +151,28 @@ public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult re parser.nextToken(); Datasource datasource = Datasource.PARSER.parse(parser, null); put(datasource); + log.info("Updated datasource metadata for datasource {} successfully.", index.id()); } catch (IOException e) { log.error("IOException occurred updating datasource metadata for datasource {} ", index.id(), e); + clearMetadata(); } } + @Override + public void postDelete(ShardId shardId, Engine.Delete delete, Exception ex) { + log.error("Skipped updating datasource metadata for datasource {} due to an exception.", delete.id(), ex); + clearMetadata(); + } + @Override public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResult result) { if (result.getResultType().equals(Engine.Result.Type.FAILURE)) { + log.error( + "Skipped updating datasource metadata for datasource {} because the delete result was a failure.", + delete.id(), + result.getFailure() + ); + clearMetadata(); return; } remove(delete.id()); diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java index 03f90e88..44c04e1b 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java @@ -178,8 +178,16 @@ private void validateDatasourceIsInAvailableState(final String datasourceName) { throw new IllegalStateException("datasource does not exist"); } - if (DatasourceState.AVAILABLE.equals(ip2GeoCachedDao.getState(datasourceName)) == false) { - throw new IllegalStateException("datasource is not in an available state"); + final DatasourceState currentState = ip2GeoCachedDao.getState(datasourceName); + if (DatasourceState.AVAILABLE.equals(currentState) == false) { + throw new IllegalStateException( + String.format( + Locale.ROOT, + "datasource %s is not in an available state, current state is %s.", + datasourceName, + currentState.name() + ) + ); } } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/dao/Ip2GeoCachedDaoTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/dao/Ip2GeoCachedDaoTests.java index d130d593..bcf3cde9 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/dao/Ip2GeoCachedDaoTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/dao/Ip2GeoCachedDaoTests.java @@ -132,10 +132,17 @@ public void testGetGeoData_whenCalled_thenReturnGeoData() { } @SneakyThrows - public void testPostIndex_whenFailed_thenNoUpdate() { - when(datasourceDao.getAllDatasources()).thenReturn(Arrays.asList()); + public void testPostIndex_whenFailed_thenResetMetadataToForcePullDataFromIndex() { Datasource datasource = randomDatasource(); + // At the beginning we don't have the new datasource in the system index and the cache metadata + when(datasourceDao.getAllDatasources()).thenReturn(Arrays.asList()); + // Verify we don't have the new datasource + assertFalse(ip2GeoCachedDao.has(datasource.getName())); + + // Mock the new datasource is added to the system index + when(datasourceDao.getAllDatasources()).thenReturn(Arrays.asList(datasource)); + ShardId shardId = mock(ShardId.class); Engine.Index index = mock(Engine.Index.class); BytesReference bytesReference = BytesReference.bytes(datasource.toXContent(XContentFactory.jsonBuilder(), null)); @@ -147,10 +154,35 @@ public void testPostIndex_whenFailed_thenNoUpdate() { ip2GeoCachedDao.postIndex(shardId, index, result); // Verify + assertTrue(ip2GeoCachedDao.has(datasource.getName())); + assertEquals(datasource.currentIndexName(), ip2GeoCachedDao.getIndexName(datasource.getName())); + assertEquals(datasource.getState(), ip2GeoCachedDao.getState(datasource.getName())); + } + + @SneakyThrows + public void testPostIndex_whenException_thenResetMetadataToForcePullDataFromIndex() { + Datasource datasource = randomDatasource(); + + // At the beginning we don't have the new datasource in the system index and the cache metadata + when(datasourceDao.getAllDatasources()).thenReturn(Arrays.asList()); + // Verify we don't have the new datasource assertFalse(ip2GeoCachedDao.has(datasource.getName())); - assertTrue(ip2GeoCachedDao.isExpired(datasource.getName())); - assertNull(ip2GeoCachedDao.getIndexName(datasource.getName())); - assertNull(ip2GeoCachedDao.getState(datasource.getName())); + + // Mock the new datasource is added to the system index + when(datasourceDao.getAllDatasources()).thenReturn(Arrays.asList(datasource)); + + ShardId shardId = mock(ShardId.class); + Engine.Index index = mock(Engine.Index.class); + BytesReference bytesReference = BytesReference.bytes(datasource.toXContent(XContentFactory.jsonBuilder(), null)); + when(index.source()).thenReturn(bytesReference); + + // Run + ip2GeoCachedDao.postIndex(shardId, index, new Exception()); + + // Verify + assertTrue(ip2GeoCachedDao.has(datasource.getName())); + assertEquals(datasource.currentIndexName(), ip2GeoCachedDao.getIndexName(datasource.getName())); + assertEquals(datasource.getState(), ip2GeoCachedDao.getState(datasource.getName())); } @SneakyThrows @@ -175,8 +207,38 @@ public void testPostIndex_whenSucceed_thenUpdate() { assertEquals(datasource.getState(), ip2GeoCachedDao.getState(datasource.getName())); } - public void testPostDelete_whenFailed_thenNoUpdate() { + public void testPostDelete_whenFailed_thenResetMetadataToForcePullDataFromIndex() { + Datasource datasource = randomDatasource(); + + // At the beginning we don't have the new datasource in the system index and the cache metadata + when(datasourceDao.getAllDatasources()).thenReturn(Arrays.asList()); + // Verify we don't have the new datasource + assertFalse(ip2GeoCachedDao.has(datasource.getName())); + + // Mock the new datasource is added to the system index + when(datasourceDao.getAllDatasources()).thenReturn(Arrays.asList(datasource)); + + ShardId shardId = mock(ShardId.class); + Engine.Delete index = mock(Engine.Delete.class); + Engine.DeleteResult result = mock(Engine.DeleteResult.class); + when(result.getResultType()).thenReturn(Engine.Result.Type.FAILURE); + + // Run + ip2GeoCachedDao.postDelete(shardId, index, result); + + // Verify + assertTrue(ip2GeoCachedDao.has(datasource.getName())); + } + + public void testPostDelete_whenException_thenResetMetadataToForcePullDataFromIndex() { Datasource datasource = randomDatasource(); + + // At the beginning we don't have the new datasource in the system index and the cache metadata + when(datasourceDao.getAllDatasources()).thenReturn(Arrays.asList()); + // Verify we don't have the new datasource + assertFalse(ip2GeoCachedDao.has(datasource.getName())); + + // Mock the new datasource is added to the system index when(datasourceDao.getAllDatasources()).thenReturn(Arrays.asList(datasource)); ShardId shardId = mock(ShardId.class);