From f958869c9174d8308a55bb9a5a7802eb8476e776 Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Mon, 9 Jun 2025 14:59:18 +0530 Subject: [PATCH 1/3] Initialize FileCache for warm index on node boot-up/restart Signed-off-by: Sandeep Kumawat --- .../WarmIndexSegmentReplicationIT.java | 194 +++++++++++++++--- .../remotestore/WritableWarmIT.java | 10 - .../common/cache/RemovalReason.java | 3 +- .../org/opensearch/env/NodeEnvironment.java | 26 ++- .../opensearch/env/NodeRepurposeCommand.java | 4 +- .../index/store/CompositeDirectory.java | 28 ++- .../filecache/AggregateFileCacheStats.java | 5 + .../store/remote/filecache/FileCache.java | 60 +++--- .../remote/filecache/FileCacheCleaner.java | 45 +++- .../remote/filecache/FileCacheFactory.java | 6 +- .../store/remote/utils/FileTypeUtils.java | 1 + .../store/remote/utils/cache/LRUCache.java | 20 ++ .../remote/utils/cache/SegmentedCache.java | 7 + .../main/java/org/opensearch/node/Node.java | 2 +- .../index/store/CompositeDirectoryTests.java | 50 +++++ .../filecache/FileCacheCleanerTests.java | 180 +++++++++++++++- .../remote/filecache/FileCacheTests.java | 122 +++++++++++ .../opensearch/test/InternalTestCluster.java | 6 + 18 files changed, 675 insertions(+), 94 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexSegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexSegmentReplicationIT.java index d48f0eaead04c..2f0b1e5b07f16 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexSegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexSegmentReplicationIT.java @@ -20,9 +20,9 @@ import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.SegmentInfos; -import org.apache.lucene.tests.util.LuceneTestCase; import org.apache.lucene.tests.util.TestUtil; import 
org.apache.lucene.util.BytesRef; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.stats.ClusterStatsResponse; import org.opensearch.action.admin.indices.alias.Alias; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; @@ -46,6 +46,8 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand; +import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.opensearch.common.Priority; import org.opensearch.common.action.ActionFuture; import org.opensearch.common.lease.Releasable; import org.opensearch.common.settings.Settings; @@ -54,6 +56,7 @@ import org.opensearch.common.util.set.Sets; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.index.IndexModule; @@ -64,8 +67,11 @@ import org.opensearch.index.codec.CodecService; import org.opensearch.index.engine.EngineConfig; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardPath; import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; +import org.opensearch.index.store.remote.filecache.AggregateFileCacheStats; import org.opensearch.index.store.remote.filecache.FileCache; +import org.opensearch.index.store.remote.filecache.FileCacheStats; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.node.Node; import org.opensearch.search.sort.SortOrder; @@ -75,11 +81,13 @@ import org.opensearch.test.junit.annotations.TestLogging; import org.opensearch.test.transport.MockTransportService; import org.opensearch.transport.TransportService; +import 
org.opensearch.transport.client.Client; import org.opensearch.transport.client.Requests; import org.junit.After; import org.junit.Before; import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; @@ -111,7 +119,6 @@ /** * This class runs Segment Replication Integ test suite with partial locality indices (warm indices). */ -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/18157") @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) @ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) public class WarmIndexSegmentReplicationIT extends SegmentReplicationBaseIT { @@ -164,19 +171,11 @@ protected boolean warmIndexSegmentReplicationEnabled() { @After public void teardown() throws Exception { - assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); - for (String nodeName : internalCluster().getNodeNames()) { - FileCache fileCache = internalCluster().getInstance(Node.class, nodeName).fileCache(); - if (fileCache != null) { - fileCache.clear(); - } - } clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get(); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/17526") public void testRestartPrimary_NoReplicas() throws Exception { - final String primary = internalCluster().startDataAndWarmNodes(1).get(0); + final String primary = internalCluster().startWarmOnlyNodes(1).get(0); createIndex(INDEX_NAME); ensureYellow(INDEX_NAME); @@ -188,11 +187,57 @@ public void testRestartPrimary_NoReplicas() throws Exception { } else { refresh(INDEX_NAME); } - FileCache fileCache = internalCluster().getInstance(Node.class, primary).fileCache(); + Long initialSize = internalCluster().getInstance(Node.class, primary).fileCache().size(); internalCluster().restartNode(primary); ensureYellow(INDEX_NAME); + Long currentSize = 
internalCluster().getInstance(Node.class, primary).fileCache().size(); assertDocCounts(1, primary); - fileCache.prune(); + assertTrue(initialSize != 0); + assertTrue(currentSize != 0); + if (currentSize < initialSize) { + fail("FileCache did not initialise correctly"); + } + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); + } + + public void testRestartPrimaryAndReplicaWithDocuments() throws Exception { + final String primary = internalCluster().startWarmOnlyNodes(1).get(0); + createIndex(INDEX_NAME); + ensureYellow(INDEX_NAME); + final String replica = internalCluster().startWarmOnlyNodes(1).get(0); + ensureGreen(INDEX_NAME); + + assertEquals(getNodeContainingPrimaryShard().getName(), primary); + + client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + client().prepareIndex(INDEX_NAME).setId("2").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + if (randomBoolean()) { + flush(INDEX_NAME); + } else { + refresh(INDEX_NAME); + } + FileCache fileCache = internalCluster().getInstance(Node.class, primary).fileCache(); + waitForSearchableDocs(2, primary, replica); + logger.info("---> restarting primary node"); + internalCluster().restartNode(primary); + + // check replica is promoted to primary. 
+ assertEquals(getNodeContainingPrimaryShard().getName(), replica); + ensureGreen(INDEX_NAME); + + client().prepareIndex(INDEX_NAME).setId("3").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + waitForSearchableDocs(3, primary, replica); + + logger.info("---> restarting replica node"); + internalCluster().restartNode(replica); + assertEquals(getNodeContainingPrimaryShard().getName(), primary); + ensureGreen(INDEX_NAME); + FileCache replicaFileCache = internalCluster().getInstance(Node.class, replica).fileCache(); + assertTrue("FileCache should not be empty", replicaFileCache.size() > 0); + + AggregateFileCacheStats aggregateStats = replicaFileCache.fileCacheStats(); + FileCacheStats blockFileStats = aggregateStats.getBlockFileCacheStats(); + assertTrue(blockFileStats.getTotal() > 0); } public void testPrimaryStopped_ReplicaPromoted() throws Exception { @@ -210,7 +255,6 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception { // index another doc but don't refresh, we will ensure this is searchable once replica is promoted. client().prepareIndex(INDEX_NAME).setId("2").setSource("bar", "baz").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); - FileCache fileCache1 = internalCluster().getInstance(Node.class, primary).fileCache(); // stop the primary node - we only have one shard on here. 
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary)); ensureYellowAndNoInitializingShards(INDEX_NAME); @@ -234,7 +278,6 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception { refresh(INDEX_NAME); waitForSearchableDocs(4, nodeC, replica); verifyStoreContent(); - fileCache1.prune(); } public void testRestartPrimary() throws Exception { @@ -250,7 +293,6 @@ public void testRestartPrimary() throws Exception { client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); refresh(INDEX_NAME); - FileCache fileCache = internalCluster().getInstance(Node.class, primary).fileCache(); waitForSearchableDocs(initialDocCount, replica, primary); internalCluster().restartNode(primary); ensureGreen(INDEX_NAME); @@ -260,7 +302,6 @@ public void testRestartPrimary() throws Exception { flushAndRefresh(INDEX_NAME); waitForSearchableDocs(initialDocCount, replica, primary); verifyStoreContent(); - fileCache.prune(); } public void testCancelPrimaryAllocation() throws Exception { @@ -417,7 +458,7 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { * the new primary starts indexing from the correct maxSeqNo and replays the correct count of docs * from xlog. */ - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/17527") + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/18157") public void testReplicationPostDeleteAndForceMerge() throws Exception { final String primary = internalCluster().startDataAndWarmNodes(1).get(0); createIndex(INDEX_NAME); @@ -456,7 +497,6 @@ public void testReplicationPostDeleteAndForceMerge() throws Exception { client().prepareIndex(INDEX_NAME).setId(String.valueOf(i)).setSource("foo", "bar").get(); } // Drop the primary and wait until replica is promoted. 
- FileCache fileCache1 = internalCluster().getInstance(Node.class, primary).fileCache(); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary)); ensureYellowAndNoInitializingShards(INDEX_NAME); @@ -475,7 +515,6 @@ public void testReplicationPostDeleteAndForceMerge() throws Exception { client().prepareIndex(INDEX_NAME).setId(String.valueOf(expectedMaxSeqNo + 1)).setSource("another", "doc").get(); refresh(INDEX_NAME); assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount + 1); - fileCache1.clear(); } public void testScrollWithConcurrentIndexAndSearch() throws Exception { @@ -703,7 +742,6 @@ public void testNodeDropWithOngoingReplication() throws Exception { // Refresh, this should trigger round of segment replication refresh(INDEX_NAME); blockFileCopy.countDown(); - FileCache fileCache = internalCluster().getInstance(Node.class, primaryNode).fileCache(); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode)); ensureYellow(INDEX_NAME); assertBusy(() -> { assertDocCounts(docCount, replicaNode); }); @@ -719,7 +757,6 @@ public void testNodeDropWithOngoingReplication() throws Exception { .allocationId() .getId(); assertEquals(currentAllocationID, replicaAllocationId); - fileCache.prune(); } public void testCancellation() throws Exception { @@ -890,7 +927,6 @@ public void testDropPrimaryDuringReplication() throws Exception { waitForDocs(initialDocCount, indexer); refresh(INDEX_NAME); // don't wait for replication to complete, stop the primary immediately. 
- FileCache fileCache = internalCluster().getInstance(Node.class, primaryNode).fileCache(); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode)); ensureYellow(INDEX_NAME); @@ -906,7 +942,6 @@ public void testDropPrimaryDuringReplication() throws Exception { flushAndRefresh(INDEX_NAME); waitForSearchableDocs(initialDocCount + 1, dataNodes); verifyStoreContent(); - fileCache.prune(); } } @@ -967,7 +1002,7 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception { assertNotEquals(replicaAfterFailure.routingEntry().allocationId().getId(), replicaShard.routingEntry().allocationId().getId()); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/17527") + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/18157") public void testPressureServiceStats() throws Exception { final String primaryNode = internalCluster().startDataAndWarmNodes(1).get(0); createIndex(INDEX_NAME); @@ -1019,7 +1054,6 @@ public void testPressureServiceStats() throws Exception { assertTrue(replicaNode_service.nodeStats().getShardStats().isEmpty()); // drop the primary, this won't hand off pressure stats between old/new primary. 
- FileCache fileCache = internalCluster().getInstance(Node.class, primaryNode).fileCache(); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode)); ensureYellowAndNoInitializingShards(INDEX_NAME); @@ -1063,7 +1097,6 @@ public void testPressureServiceStats() throws Exception { final SegmentReplicationShardStats stats = shardStatsSet.stream().findFirst().get(); assertEquals(0, stats.getCheckpointsBehindCount()); }); - fileCache.prune(); } } @@ -1627,8 +1660,6 @@ public void testReplicaAlreadyAtCheckpoint() throws Exception { client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", randomInt()).get(); refresh(INDEX_NAME); waitForSearchableDocs(1, primaryNode, replicaNode, replicaNode2); - - FileCache fileCache = internalCluster().getInstance(Node.class, primaryNode).fileCache(); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode)); ensureYellowAndNoInitializingShards(INDEX_NAME); IndexShard replica_1 = getIndexShard(replicaNode, INDEX_NAME); @@ -1658,7 +1689,110 @@ public void testReplicaAlreadyAtCheckpoint() throws Exception { assertEquals(0L, replicationStats.maxReplicationLag); assertEquals(0L, replicationStats.totalBytesBehind); }); - fileCache.prune(); } + public void testShardPathDeletionWhenWarmIndexRelocate() throws Exception { + InternalTestCluster internalTestCluster = internalCluster(); + internalTestCluster.startClusterManagerOnlyNode(); + String primary = internalTestCluster.startWarmOnlyNodes(1).get(0); + + createIndex(INDEX_NAME); + ensureYellow(INDEX_NAME); + + String secondWarmNode = internalTestCluster.startWarmOnlyNodes(1).get(0); + ensureGreen(INDEX_NAME); + + String thirdWarmNode = internalTestCluster.startWarmOnlyNodes(1).get(0); + + client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").get(); + client().prepareIndex(INDEX_NAME).setId("2").setSource("foo", "bar").get(); + + // Ingesting docs again before force merge + if (randomBoolean()) { + flush(INDEX_NAME); + } else { + 
refresh(INDEX_NAME); + } + ensureGreen(); + + final String customDataPath = IndexMetadata.INDEX_DATA_PATH_SETTING.get( + client().admin().indices().prepareGetSettings(INDEX_NAME).get().getIndexToSettings().get(INDEX_NAME) + ); + final Index index = resolveIndex(INDEX_NAME); + assertWarmIndexDirectoryExistence(primary, index, true, customDataPath); + assertWarmIndexDirectoryExistence(secondWarmNode, index, true, customDataPath); + assertWarmIndexDirectoryExistence(thirdWarmNode, index, false, customDataPath); + + logger.info("Relocating shard from {} to {}", primary, thirdWarmNode); + final Client client = client(); + client.admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(INDEX_NAME, 0, primary, thirdWarmNode)) + .execute() + .actionGet(); + + ClusterHealthResponse clusterHealthResponse = client.admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(new TimeValue(5, TimeUnit.MINUTES)) + .execute() + .actionGet(); + + if (clusterHealthResponse.isTimedOut()) { + logger.warn("Cluster health check timed out after relocation"); + } + assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + + assertWarmIndexDirectoryExistence(primary, index, false, customDataPath); + assertWarmIndexDirectoryExistence(secondWarmNode, index, true, customDataPath); + assertWarmIndexDirectoryExistence(thirdWarmNode, index, true, customDataPath); + + assertDocCounts(2, secondWarmNode, thirdWarmNode); + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); + assertBusy(() -> { + assertWarmIndexDirectoryExistence(secondWarmNode, index, false, customDataPath); + assertWarmIndexDirectoryExistence(thirdWarmNode, index, false, customDataPath); + }); + } + + private void assertWarmIndexDirectoryExistence(String nodeName, Index index, boolean exists, String customDataPath) throws Exception { + logger.debug("Checking warm index directory existence for node: 
{}, expected exists: {}", nodeName, exists); + + final Node node = internalCluster().getInstance(Node.class, nodeName); + final ShardId shardId = new ShardId(index, 0); + + final ShardPath shardPath = ShardPath.loadShardPath(logger, node.getNodeEnvironment(), shardId, customDataPath); + if (shardPath == null) { + logger.debug("ShardPath is null for node: {}", nodeName); + assertFalse(exists); + return; + } + + final Path shardStatePath = shardPath.getShardStatePath(); + final Path shardDataPath = shardPath.getDataPath(); + + assertBusy(() -> { + if (exists) { + logger.debug("Verifying shard paths exist for node: {}", nodeName); + assertTrue(Files.exists(shardStatePath)); + assertTrue(Files.exists(shardDataPath)); + } else { + logger.debug("Verifying shard paths do not exist for node: {}", nodeName); + assertEquals(false, Files.exists(shardStatePath)); + assertEquals(false, Files.exists(shardDataPath)); + } + }); + + final Path cacheDataPath = node.getNodeEnvironment().fileCacheNodePath().fileCachePath.resolve(index.getUUID()); + final Path indicesCacheDataPath = node.getNodeEnvironment().fileCacheNodePath().indicesPath.resolve(index.getUUID()); + + assertBusy(() -> { + logger.info("Checking cache paths for node: {}", nodeName); + assertEquals("cache data path should not exists", false, Files.exists(cacheDataPath)); + assertTrue("indices cache path should " + (exists ? 
"exist" : "not exist"), Files.exists(indicesCacheDataPath) == exists); + }, 30, TimeUnit.SECONDS); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/WritableWarmIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/WritableWarmIT.java index 6674bbb5afd24..9ae3ae5b7a451 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/WritableWarmIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/WritableWarmIT.java @@ -189,20 +189,10 @@ public void testFullFileAndFileCacheStats() throws ExecutionException, Interrupt Settings settings = Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name()) .build(); assertAcked(client().admin().indices().prepareCreate(INDEX_NAME_2).setSettings(settings).get()); - // Verify from the cluster settings if the data locality is partial - GetIndexResponse getIndexResponse = client().admin() - .indices() - .getIndex(new GetIndexRequest().indices(INDEX_NAME_2).includeDefaults(true)) - .get(); - - Settings indexSettings = getIndexResponse.settings().get(INDEX_NAME_2); - assertEquals(IndexModule.DataLocalityType.PARTIAL.name(), indexSettings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey())); - // Ingesting docs again before force merge indexBulk(INDEX_NAME_2, NUM_DOCS_IN_BULK); flushAndRefresh(INDEX_NAME_2); diff --git a/server/src/main/java/org/opensearch/common/cache/RemovalReason.java b/server/src/main/java/org/opensearch/common/cache/RemovalReason.java index 514b84a7823ca..ecc441e7390f1 100644 --- a/server/src/main/java/org/opensearch/common/cache/RemovalReason.java +++ b/server/src/main/java/org/opensearch/common/cache/RemovalReason.java @@ -21,5 +21,6 @@ public enum RemovalReason { INVALIDATED, EVICTED, EXPLICIT, - CAPACITY + CAPACITY, + RESTARTED // This is used by testing framework 
to close the CachedIndexInput during node restart. } diff --git a/server/src/main/java/org/opensearch/env/NodeEnvironment.java b/server/src/main/java/org/opensearch/env/NodeEnvironment.java index 9560bfc547480..9d7596cf4ea69 100644 --- a/server/src/main/java/org/opensearch/env/NodeEnvironment.java +++ b/server/src/main/java/org/opensearch/env/NodeEnvironment.java @@ -389,7 +389,7 @@ public NodeEnvironment(Settings settings, Environment environment, IndexStoreLis } if (DiscoveryNode.isWarmNode(settings) == false) { - ensureNoFileCacheData(fileCacheNodePath); + ensureNoFileCacheData(fileCacheNodePath, settings); } this.nodeMetadata = loadNodeMetadata(settings, logger, nodePaths); @@ -1204,8 +1204,8 @@ private void ensureNoShardData(final NodePath[] nodePaths) throws IOException { /** * Throws an exception if cache exists on a non-warm node. */ - private void ensureNoFileCacheData(final NodePath fileCacheNodePath) throws IOException { - List cacheDataPaths = collectFileCacheDataPath(fileCacheNodePath); + private void ensureNoFileCacheData(final NodePath fileCacheNodePath, final Settings settings) throws IOException { + List cacheDataPaths = collectFileCacheDataPath(fileCacheNodePath, settings); if (cacheDataPaths.isEmpty() == false) { final String message = String.format( Locale.ROOT, @@ -1278,12 +1278,22 @@ private static boolean isIndexMetadataPath(Path path) { * Collect the path containing cache data in the indicated cache node path. * The returned paths will point to the shard data folder. */ - public static List collectFileCacheDataPath(NodePath fileCacheNodePath) throws IOException { + public static List collectFileCacheDataPath(NodePath fileCacheNodePath, Settings settings) throws IOException { // Structure is: ///... 
List indexSubPaths = new ArrayList<>(); - Path fileCachePath = fileCacheNodePath.fileCachePath; - if (Files.isDirectory(fileCachePath)) { - try (DirectoryStream indexStream = Files.newDirectoryStream(fileCachePath)) { + // Process file cache path + processDirectory(fileCacheNodePath.fileCachePath, indexSubPaths); + if (DiscoveryNode.isDedicatedWarmNode(settings)) { + // Process /... path only for warm nodes. + processDirectory(fileCacheNodePath.indicesPath, indexSubPaths); + } + + return indexSubPaths; + } + + private static void processDirectory(Path directoryPath, List indexSubPaths) throws IOException { + if (Files.isDirectory(directoryPath)) { + try (DirectoryStream indexStream = Files.newDirectoryStream(directoryPath)) { for (Path indexPath : indexStream) { if (Files.isDirectory(indexPath)) { try (Stream shardStream = Files.list(indexPath)) { @@ -1293,8 +1303,6 @@ public static List collectFileCacheDataPath(NodePath fileCacheNodePath) th } } } - - return indexSubPaths; } /** diff --git a/server/src/main/java/org/opensearch/env/NodeRepurposeCommand.java b/server/src/main/java/org/opensearch/env/NodeRepurposeCommand.java index 1d68829ececfd..2712e5ab0b386 100644 --- a/server/src/main/java/org/opensearch/env/NodeRepurposeCommand.java +++ b/server/src/main/java/org/opensearch/env/NodeRepurposeCommand.java @@ -137,7 +137,7 @@ private void processNoClusterManagerRepurposeNode( if (repurposeSearch) { terminal.println(Terminal.Verbosity.VERBOSE, "Collecting file cache data paths"); - fileCacheDataPaths = NodeEnvironment.collectFileCacheDataPath(fileCacheNodePath); + fileCacheDataPaths = NodeEnvironment.collectFileCacheDataPath(fileCacheNodePath, env.settings()); fileCachePaths = uniqueParentPaths(fileCacheDataPaths, indexMetadataPaths); } @@ -227,7 +227,7 @@ private void processClusterManagerRepurposeNode( if (repurposeSearch) { terminal.println(Terminal.Verbosity.VERBOSE, "Collecting file cache data paths"); - fileCacheDataPaths = 
NodeEnvironment.collectFileCacheDataPath(fileCacheNodePath); + fileCacheDataPaths = NodeEnvironment.collectFileCacheDataPath(fileCacheNodePath, env.settings()); fileCachePaths = uniqueParentPaths(fileCacheDataPaths); } diff --git a/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java b/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java index e2da49479d42f..99eb1db04b296 100644 --- a/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/CompositeDirectory.java @@ -25,11 +25,13 @@ import org.opensearch.index.store.remote.filecache.CachedFullFileIndexInput; import org.opensearch.index.store.remote.filecache.CachedIndexInput; import org.opensearch.index.store.remote.filecache.FileCache; +import org.opensearch.index.store.remote.filecache.FileCache.RestoredCachedIndexInput; import org.opensearch.index.store.remote.utils.FileTypeUtils; import org.opensearch.index.store.remote.utils.TransferManager; import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.util.Arrays; @@ -285,7 +287,27 @@ public IndexInput openInput(String name, IOContext context) throws IOException { } // Return directly from the FileCache (via TransferManager) if complete file is present Path key = getFilePath(name); - CachedIndexInput indexInput = fileCache.get(key); + + CachedIndexInput indexInput = fileCache.compute(key, (path, cachedIndexInput) -> { + // If entry exists and is not closed, use it + if (cachedIndexInput != null && cachedIndexInput.isClosed() == false) { + return cachedIndexInput; + } + + // If entry is closed but file exists locally, create new IndexInput from local + if (cachedIndexInput != null && cachedIndexInput.isClosed() && Files.exists(key)) { + try { + assert cachedIndexInput instanceof RestoredCachedIndexInput; + return new 
CachedFullFileIndexInput(fileCache, key, localDirectory.openInput(name, IOContext.DEFAULT)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + // Return null to fall back to remote store block download/existing block reuse. + return null; + }); + if (indexInput != null) { logger.trace("Composite Directory[{}]: Complete file {} found in FileCache", this::toString, () -> name); try { @@ -306,7 +328,8 @@ public IndexInput openInput(String name, IOContext context) throws IOException { if (uploadedSegmentMetadata == null) { throw new NoSuchFileException("File " + name + " not found in directory"); } - // TODO : Refactor FileInfo and OnDemandBlockSnapshotIndexInput to more generic names as they are not Remote Snapshot specific + // TODO : Refactor FileInfo and OnDemandBlockSnapshotIndexInput to more generic names as they are not Remote Snapshot + // specific BlobStoreIndexShardSnapshot.FileInfo fileInfo = new BlobStoreIndexShardSnapshot.FileInfo( name, new StoreFileMetadata(name, uploadedSegmentMetadata.getLength(), uploadedSegmentMetadata.getChecksum(), Version.LATEST), @@ -332,7 +355,6 @@ public void close() throws IOException { fileCache.remove(getFilePath(localFile)); } } - fileCache.prune(); localDirectory.close(); } diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/AggregateFileCacheStats.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/AggregateFileCacheStats.java index c5129a7b4c346..fc8de8c24350e 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/filecache/AggregateFileCacheStats.java +++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/AggregateFileCacheStats.java @@ -116,6 +116,11 @@ public long getCacheMisses() { return overallFileCacheStats.getCacheMisses(); } + // visible for testing. 
+ public FileCacheStats getBlockFileCacheStats() { + return blockFileCacheStats; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(Fields.AGGREGATE_FILE_CACHE); diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java index 92ee2d08f9d5f..718f79a74f9d7 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java +++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java @@ -28,8 +28,10 @@ import java.util.List; import java.util.function.BiFunction; import java.util.function.Predicate; +import java.util.stream.Stream; import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION; +import static org.opensearch.index.store.remote.utils.FileTypeUtils.INDICES_FOLDER_IDENTIFIER; /** * File Cache (FC) is introduced to solve the problem that the local disk cannot hold @@ -192,6 +194,11 @@ public void logCurrentState() { theCache.logCurrentState(); } + // To be used only in testing framework. + public void closeIndexInputReferences() { + theCache.closeIndexInputReferences(); + } + /** * Ensures that the PARENT breaker is not tripped when an entry is added to the cache * @param filePath the path key for which entry is added @@ -216,32 +223,29 @@ private void checkParentBreaker(Path filePath) { * directory within the provided file cache path. */ public void restoreFromDirectory(List fileCacheDataPaths) { - fileCacheDataPaths.stream() - .filter(Files::isDirectory) - .map(path -> path.resolve(LOCAL_STORE_LOCATION)) - .filter(Files::isDirectory) - .flatMap(dir -> { - try { - return Files.list(dir); - } catch (IOException e) { - throw new UncheckedIOException( - "Unable to process file cache directory. 
Please clear the file cache for node startup.", - e - ); - } - }) - .filter(Files::isRegularFile) - .forEach(path -> { - try { - put(path.toAbsolutePath(), new RestoredCachedIndexInput(Files.size(path))); - decRef(path.toAbsolutePath()); - } catch (IOException e) { - throw new UncheckedIOException( - "Unable to retrieve cache file details. Please clear the file cache for node startup.", - e - ); - } - }); + Stream.concat( + fileCacheDataPaths.stream() + .filter(Files::isDirectory) + .map(path -> path.resolve(LOCAL_STORE_LOCATION)) + .filter(Files::isDirectory), + fileCacheDataPaths.stream() + .filter(Files::isDirectory) + .map(path -> path.resolve(INDICES_FOLDER_IDENTIFIER)) + .filter(Files::isDirectory) + ).flatMap(dir -> { + try { + return Files.list(dir); + } catch (IOException e) { + throw new UncheckedIOException("Unable to process file cache directory. Please clear the file cache for node startup.", e); + } + }).filter(Files::isRegularFile).forEach(path -> { + try { + put(path.toAbsolutePath(), new RestoredCachedIndexInput(Files.size(path))); + decRef(path.toAbsolutePath()); + } catch (IOException e) { + throw new UncheckedIOException("Unable to retrieve cache file details. Please clear the file cache for node startup.", e); + } + }); } /** @@ -308,10 +312,10 @@ public AggregateFileCacheStats fileCacheStats() { * These entries are eligible for eviction so if nothing needs to reference * them they will be deleted when the disk-based local cache fills up. 
*/ - private static class RestoredCachedIndexInput implements CachedIndexInput { + public static class RestoredCachedIndexInput implements CachedIndexInput { private final long length; - private RestoredCachedIndexInput(long length) { + public RestoredCachedIndexInput(long length) { this.length = length; } diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java index 3cdd41b94a5e9..725ddaf89781c 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java +++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java @@ -26,6 +26,7 @@ import java.nio.file.Path; import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION; +import static org.opensearch.index.store.remote.utils.FileTypeUtils.INDICES_FOLDER_IDENTIFIER; /** * IndexStoreListener to clean up file cache when the index is deleted. 
The cached entries will be eligible @@ -55,29 +56,54 @@ public FileCacheCleaner(Provider fileCacheProvider) { public void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment nodeEnvironment) { if (indexSettings.isRemoteSnapshot()) { final ShardPath shardPath = ShardPath.loadFileCachePath(nodeEnvironment, shardId); - cleanupShardFileCache(shardPath); + cleanupShardFileCache(shardPath, false, true); deleteShardFileCacheDirectory(shardPath); + } else if (indexSettings.isWarmIndex()) { + try { + final ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnvironment, shardId, indexSettings.customDataPath()); + if (shardPath != null) { + cleanupShardFileCache(shardPath, true, false); + deleteShardFileCacheDirectory(shardPath); + } + } catch (IOException e) { + logger.error("failed to delete warm index shard file cache directory", e); + } } } /** * Cleans up the corresponding index file path entries from FileCache * - * @param shardPath the shard path + * @param isWarmIndex flag indicating if this is a warm index + * @param isRemoteSnapshot flag indicating if this is a remote snapshot */ - private void cleanupShardFileCache(ShardPath shardPath) { + private void cleanupShardFileCache(ShardPath shardPath, boolean isWarmIndex, boolean isRemoteSnapshot) { try { final FileCache fc = fileCacheProvider.get(); assert fc != null; - final Path localStorePath = shardPath.getDataPath().resolve(LOCAL_STORE_LOCATION); + + final Path localStorePath; + if (isWarmIndex) { + localStorePath = shardPath.getDataPath().resolve(INDICES_FOLDER_IDENTIFIER); + } else if (isRemoteSnapshot) { + localStorePath = shardPath.getDataPath().resolve(LOCAL_STORE_LOCATION); + } else { + return; + } + try (DirectoryStream ds = Files.newDirectoryStream(localStorePath)) { for (Path subPath : ds) { fc.remove(subPath.toRealPath()); } } } catch (IOException ioe) { + String operationType = isWarmIndex ?
"warm index" : "remote snapshot"; logger.error( - () -> new ParameterizedMessage("Error removing items from cache during shard deletion {}", shardPath.getShardId()), + () -> new ParameterizedMessage( + "Error removing items from cache during {} shard deletion {}", + operationType, + shardPath.getShardId() + ), ioe ); } @@ -112,6 +138,15 @@ public void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, Nod logger.error(() -> new ParameterizedMessage("Failed to delete cache path for index {}", index), e); } } + } else if (indexSettings.isWarmIndex()) { + final Path indicesPathInCache = nodeEnvironment.fileCacheNodePath().indicesPath.resolve(index.getUUID()); + if (Files.exists(indicesPathInCache)) { + try { + IOUtils.rm(indicesPathInCache); + } catch (IOException e) { + logger.error(() -> new ParameterizedMessage("Failed to delete indices path in cache for index {}", index), e); + } + } } } } diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheFactory.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheFactory.java index 9fe67dc67020a..0b7f4df66c1b3 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheFactory.java +++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheFactory.java @@ -55,7 +55,11 @@ private static SegmentedCache.Builder createDefaultBuild Path key = removalNotification.getKey(); if (removalReason != RemovalReason.REPLACED) { catchAsRuntimeException(value::close); - catchAsRuntimeException(() -> Files.deleteIfExists(key)); + // On RESTARTED removal, we close the IndexInput but preserve the files on disk as this scenario only occurs during + // tests + if (removalReason != RemovalReason.RESTARTED) { + catchAsRuntimeException(() -> Files.deleteIfExists(key)); + } } }); } diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/FileTypeUtils.java 
b/server/src/main/java/org/opensearch/index/store/remote/utils/FileTypeUtils.java index ca0e6652f5ea4..ecfd60c92c76c 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/FileTypeUtils.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/FileTypeUtils.java @@ -19,6 +19,7 @@ public class FileTypeUtils { public static String BLOCK_FILE_IDENTIFIER = "_block_"; + public static String INDICES_FOLDER_IDENTIFIER = "index"; public static boolean isTempFile(String name) { return name.endsWith(".tmp"); diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java index f1ba326f82445..53e630670fbd4 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java @@ -184,6 +184,24 @@ public void remove(K key) { } } + // To be used only in testing framework. 
+ public void closeIndexInputReferences() { + lock.lock(); + try { + int closedEntries = 0; + final Iterator> iterator = data.values().iterator(); + while (iterator.hasNext()) { + closedEntries++; + Node node = iterator.next(); + iterator.remove(); + listener.onRemoval(new RemovalNotification<>(node.key, node.value, RemovalReason.RESTARTED)); + } + logger.trace("Reference cleanup completed - Total entries: {}", closedEntries); + } finally { + lock.unlock(); + } + } + @Override public void clear() { lock.lock(); @@ -411,6 +429,8 @@ public void logCurrentState() { .append(entry.getValue().refCount) .append(" , Weight: ") .append(entry.getValue().weight) + .append(" , Pinned: ") + .append(entry.getValue().pinned) .append(" ]\n"); } if (allFiles.length() > 1) { diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/SegmentedCache.java b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/SegmentedCache.java index 562f7286df0fd..bb6261d9ef820 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/SegmentedCache.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/SegmentedCache.java @@ -244,6 +244,13 @@ public void logCurrentState() { } } + // To be used only in testing framework. 
+ public void closeIndexInputReferences() { + for (RefCountedCache cache : table) { + ((LRUCache) cache).closeIndexInputReferences(); + } + } + enum SingletonWeigher implements Weigher { INSTANCE; diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 024581122c8a9..5bcb7390b7eb1 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -2252,7 +2252,7 @@ private void initializeFileCache(Settings settings, CircuitBreaker circuitBreake this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity, circuitBreaker); fileCacheNodePath.fileCacheReservedSize = new ByteSizeValue(this.fileCache.capacity(), ByteSizeUnit.BYTES); - List fileCacheDataPaths = collectFileCacheDataPath(fileCacheNodePath); + List fileCacheDataPaths = collectFileCacheDataPath(fileCacheNodePath, settings); this.fileCache.restoreFromDirectory(fileCacheDataPaths); } diff --git a/server/src/test/java/org/opensearch/index/store/CompositeDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/CompositeDirectoryTests.java index 4010215e1b7b8..531bca97df662 100644 --- a/server/src/test/java/org/opensearch/index/store/CompositeDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/CompositeDirectoryTests.java @@ -19,7 +19,9 @@ import org.opensearch.core.common.breaker.NoopCircuitBreaker; import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; import org.opensearch.index.store.remote.file.OnDemandBlockSnapshotIndexInput; +import org.opensearch.index.store.remote.filecache.CachedIndexInput; import org.opensearch.index.store.remote.filecache.FileCache; +import org.opensearch.index.store.remote.filecache.FileCache.RestoredCachedIndexInput; import org.opensearch.index.store.remote.filecache.FileCacheFactory; import org.opensearch.index.store.remote.filecache.FileCachedIndexInput; import 
org.opensearch.index.store.remote.utils.FileTypeUtils; @@ -190,6 +192,54 @@ public void testAfterSyncToRemote() throws IOException { assertNull(fileCache.get(getFilePath(FILE_PRESENT_LOCALLY))); } + public void testOpenInputWithClosedCachedInput() throws Exception { + // Setup: Create a file and get it into cache + try (IndexOutput indexOutput = compositeDirectory.createOutput(NEW_FILE, IOContext.DEFAULT)) { + indexOutput.writeString("test data"); + } + + // Get the cached input and close it + Path key = getFilePath(NEW_FILE); + RestoredCachedIndexInput restoredCachedIndexInput = new RestoredCachedIndexInput(0); + CachedIndexInput cachedInput = fileCache.get(key); + cachedInput.close(); + // replace the original index input with RestoredCachedIndexInput + fileCache.put(key, restoredCachedIndexInput); + + // Verify that we can still open the file and get a valid input + IndexInput input = compositeDirectory.openInput(NEW_FILE, IOContext.DEFAULT); + assertNotNull(input); + assertTrue(input instanceof FileCachedIndexInput); + input.close(); + } + + public void testOpenInputAfterFileCacheEviction() throws IOException { + // First create and cache the file locally + try (IndexOutput indexOutput = compositeDirectory.createOutput(FILE_PRESENT_IN_REMOTE_ONLY, IOContext.DEFAULT)) { + indexOutput.writeString("test data"); + } + + // Clear the file cache + fileCache.clear(); + + // Should still be able to open input, now from remote + IndexInput input = compositeDirectory.openInput(FILE_PRESENT_IN_REMOTE_ONLY, IOContext.DEFAULT); + assertNotNull(input); + assertTrue(input instanceof OnDemandBlockSnapshotIndexInput); + input.close(); + } + + public void testOpenInputThrowsIOException() throws IOException { + // Use FILE_PRESENT_LOCALLY ("_1.cfe") which is already set up locally + // Corrupt the local file to cause IOException + Path filePath = getFilePath(NEW_FILE); + try (IndexOutput output = localDirectory.createOutput(NEW_FILE, IOContext.DEFAULT)) { + 
output.writeString("corrupted data"); + } + + assertThrows(IOException.class, () -> compositeDirectory.openInput(NEW_FILE, IOContext.DEFAULT)); + } + private void addFilesToDirectory(String[] files) throws IOException { for (String file : files) { IndexOutput indexOutput = compositeDirectory.createOutput(file, IOContext.DEFAULT); diff --git a/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheCleanerTests.java b/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheCleanerTests.java index e2a6a4011a6b7..84c8f179947fb 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheCleanerTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheCleanerTests.java @@ -8,16 +8,21 @@ package org.opensearch.index.store.remote.filecache; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.store.IndexInput; import org.opensearch.Version; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.AllocationId; import org.opensearch.common.settings.Settings; import org.opensearch.core.common.breaker.CircuitBreaker; import org.opensearch.core.common.breaker.NoopCircuitBreaker; import org.opensearch.core.index.shard.ShardId; import org.opensearch.env.NodeEnvironment; +import org.opensearch.gateway.WriteStateException; import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.ShardPath; +import org.opensearch.index.shard.ShardStateMetadata; import org.opensearch.test.OpenSearchTestCase; import org.hamcrest.MatcherAssert; import org.junit.After; @@ -26,10 +31,14 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.attribute.PosixFilePermissions; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import static org.opensearch.index.IndexModule.IS_WARM_INDEX_SETTING; import static 
org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION; +import static org.opensearch.index.store.remote.utils.FileTypeUtils.INDICES_FOLDER_IDENTIFIER; import static org.hamcrest.Matchers.equalTo; public class FileCacheCleanerTests extends OpenSearchTestCase { @@ -46,6 +55,28 @@ public class FileCacheCleanerTests extends OpenSearchTestCase { SETTINGS ); + private static final ShardId WARM_SHARD_0 = new ShardId("warm-index-0", "warm-uuid-0", 0); + private static final ShardId WARM_SHARD_1 = new ShardId("warm-index-1", "warm-uuid-1", 1); + + private static final Settings WARM_SETTINGS = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IS_WARM_INDEX_SETTING.getKey(), true) + .build(); + + private static final IndexSettings WARM_INDEX_SETTINGS_0 = new IndexSettings( + IndexMetadata.builder("warm-index-0").settings(WARM_SETTINGS).build(), + WARM_SETTINGS + ); + + private static final IndexSettings WARM_INDEX_SETTINGS_1 = new IndexSettings( + IndexMetadata.builder("warm-index-1").settings(WARM_SETTINGS).build(), + WARM_SETTINGS + ); + + private static final Logger logger = LogManager.getLogger(FileCache.class); + private final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache( 1024 * 1024, 1, @@ -61,7 +92,25 @@ public void setUpFileCache() throws IOException { cleaner = new FileCacheCleaner(() -> fileCache); files.put(SHARD_0, addFile(fileCache, env, SHARD_0)); files.put(SHARD_1, addFile(fileCache, env, SHARD_1)); - MatcherAssert.assertThat(fileCache.size(), equalTo(2L)); + + // add files in filecache for warm index shards. 
+ Path[] paths0 = env.availableShardPaths(WARM_SHARD_0); + Path[] paths1 = env.availableShardPaths(WARM_SHARD_1); + Path path1 = randomFrom(paths0); + Path path2 = randomFrom(paths1); + writeShardStateMetadata("warm-uuid-0", path1); + writeShardStateMetadata("warm-uuid-1", path2); + files.put(WARM_SHARD_0, addFileForWarmIndex(fileCache, env, WARM_SHARD_0)); + files.put(WARM_SHARD_1, addFileForWarmIndex(fileCache, env, WARM_SHARD_1)); + + MatcherAssert.assertThat(fileCache.size(), equalTo(4L)); + } + + private static void writeShardStateMetadata(String indexUUID, Path... paths) throws WriteStateException { + ShardStateMetadata.FORMAT.writeAndCleanup( + new ShardStateMetadata(true, indexUUID, AllocationId.newInitializing(), ShardStateMetadata.IndexDataLocation.LOCAL), + paths + ); } private static Path addFile(FileCache fileCache, NodeEnvironment env, ShardId shardId) throws IOException { @@ -93,6 +142,39 @@ public void close() { return file; } + private static Path addFileForWarmIndex(FileCache fileCache, NodeEnvironment env, ShardId shardId) throws IOException { + final ShardPath shardPath = ShardPath.loadShardPath(logger, env, shardId, ""); + final Path localStorePath = shardPath.getDataPath().resolve(INDICES_FOLDER_IDENTIFIER); + + logger.info("warm index local store location [{}]", localStorePath); + + Files.createDirectories(localStorePath); + final Path file = Files.createFile(localStorePath.resolve("file")); + + fileCache.put(file, new CachedIndexInput() { + @Override + public IndexInput getIndexInput() { + return null; + } + + @Override + public long length() { + return 1024; + } + + @Override + public boolean isClosed() { + return false; + } + + @Override + public void close() { + + } + }); + return file; + } + @After public void tearDownFileCache() { env.close(); @@ -101,15 +183,105 @@ public void tearDownFileCache() { public void testShardRemoved() { final Path cachePath = ShardPath.loadFileCachePath(env, SHARD_0).getDataPath(); 
assertTrue(Files.exists(cachePath)); - + // Initially 4 files exist in fileCache. + MatcherAssert.assertThat(fileCache.size(), equalTo(4L)); + // cleanup shard_0 files. cleaner.beforeShardPathDeleted(SHARD_0, INDEX_SETTINGS, env); - MatcherAssert.assertThat(fileCache.size(), equalTo(1L)); + // assert fileCache has 3 files. + MatcherAssert.assertThat(fileCache.size(), equalTo(3L)); assertNull(fileCache.get(files.get(SHARD_0))); assertFalse(Files.exists(files.get(SHARD_0))); assertTrue(Files.exists(files.get(SHARD_1))); assertFalse(Files.exists(cachePath)); } + public void testShardRemovedForWarmIndex() throws IOException { + final Path indexFilePath0 = ShardPath.loadShardPath(logger, env, WARM_SHARD_0, "").getDataPath(); + final Path indexFilePath1 = ShardPath.loadShardPath(logger, env, WARM_SHARD_1, "").getDataPath(); + assertTrue(Files.exists(indexFilePath0)); + // Initially 4 files exist in fileCache. + MatcherAssert.assertThat(fileCache.size(), equalTo(4L)); + // clean warm_shard_0 files + cleaner.beforeShardPathDeleted(WARM_SHARD_0, WARM_INDEX_SETTINGS_0, env); + MatcherAssert.assertThat(fileCache.size(), equalTo(3L)); + assertNull(fileCache.get(files.get(WARM_SHARD_0))); + assertFalse(Files.exists(files.get(WARM_SHARD_0))); + assertTrue(Files.exists(files.get(WARM_SHARD_1))); + assertFalse(Files.exists(indexFilePath0)); + + // clean warm_shard_1 files as well.
+ cleaner.beforeShardPathDeleted(WARM_SHARD_1, WARM_INDEX_SETTINGS_1, env); + MatcherAssert.assertThat(fileCache.size(), equalTo(2L)); + assertNull(fileCache.get(files.get(WARM_SHARD_1))); + assertFalse(Files.exists(files.get(WARM_SHARD_1))); + assertFalse(Files.exists(indexFilePath1)); + } + + public void testIndexRemovedForWarmIndexWhenShardPathDeletedFirst() throws IOException { + final Path indexFilePath = ShardPath.loadShardPath(logger, env, WARM_SHARD_0, "").getDataPath(); + assertTrue(Files.exists(indexFilePath)); + // assert filecache contains 4 files + MatcherAssert.assertThat(fileCache.size(), equalTo(4L)); + // clean shard path files first. + cleaner.beforeShardPathDeleted(WARM_SHARD_0, WARM_INDEX_SETTINGS_0, env); + // now clean the index path. + cleaner.beforeIndexPathDeleted(WARM_INDEX_SETTINGS_0.getIndex(), WARM_INDEX_SETTINGS_0, env); + // Assert that index path deleted and fileCache also doesn't have files for that index shard. + MatcherAssert.assertThat(fileCache.size(), equalTo(3L)); + assertFalse(Files.exists(indexFilePath)); + } + + public void testIndexNotRemovedForWarmIndexWhenShardPathNotDeletedFirst() throws IOException { + final Path indexFilePath = ShardPath.loadShardPath(logger, env, WARM_SHARD_0, "").getDataPath(); + assertTrue(Files.exists(indexFilePath)); + // assert filecache contains 4 files + MatcherAssert.assertThat(fileCache.size(), equalTo(4L)); + // Try to clean the index path without calling the beforeShardPathDeleted method first + cleaner.beforeIndexPathDeleted(WARM_INDEX_SETTINGS_0.getIndex(), WARM_INDEX_SETTINGS_0, env); + // Assert that the index path still exists and fileCache still has files for that index shard.
+ MatcherAssert.assertThat(fileCache.size(), equalTo(4L)); + assertTrue(Files.exists(indexFilePath)); + } + + public void testFileCacheNotClearedAndWithFileAlreadyDeleted() throws IOException { + // Delete the shard path to simulate IOException when trying to load shard path + Path shardPath = ShardPath.loadShardPath(logger, env, WARM_SHARD_0, "").getDataPath(); + Path shardFilePath = shardPath.resolve(INDICES_FOLDER_IDENTIFIER).resolve("file"); + assertTrue(Files.exists(shardFilePath)); + Files.delete(shardFilePath); + + // Initially 4 files exist in fileCache + MatcherAssert.assertThat(fileCache.size(), equalTo(4L)); + + // Try to clean up the deleted shard path + cleaner.beforeShardPathDeleted(WARM_SHARD_0, WARM_INDEX_SETTINGS_0, env); + + // Verify fileCache still contains all files as remove operation won't get executed. + MatcherAssert.assertThat(fileCache.size(), equalTo(4L)); + // Shard path is still deleted + assertFalse(Files.exists(shardPath)); + } + + public void testShardRemovedForWarmIndexWithIOExceptionOnDirectoryStream() throws IOException { + Path shardPath = ShardPath.loadShardPath(logger, env, WARM_SHARD_0, "").getDataPath(); + Path indicesPath = shardPath.resolve(INDICES_FOLDER_IDENTIFIER); + + // Make the indices directory non-readable to cause IOException during directory stream + Files.setPosixFilePermissions(indicesPath, new HashSet<>()); + + // Initially 4 files exist in fileCache + MatcherAssert.assertThat(fileCache.size(), equalTo(4L)); + + // Try to clean up with non-readable directory + cleaner.beforeShardPathDeleted(WARM_SHARD_0, WARM_INDEX_SETTINGS_0, env); + + // Restore permissions for cleanup + Files.setPosixFilePermissions(indicesPath, PosixFilePermissions.fromString("rwxrwxrwx")); + + // Verify fileCache still contains all files as operation failed + MatcherAssert.assertThat(fileCache.size(), equalTo(4L)); + } + public void testIndexRemoved() { final Path indexCachePath = 
env.fileCacheNodePath().fileCachePath.resolve(SHARD_0.getIndex().getUUID()); assertTrue(Files.exists(indexCachePath)); @@ -117,7 +289,7 @@ public void testIndexRemoved() { cleaner.beforeShardPathDeleted(SHARD_0, INDEX_SETTINGS, env); cleaner.beforeShardPathDeleted(SHARD_1, INDEX_SETTINGS, env); cleaner.beforeIndexPathDeleted(SHARD_0.getIndex(), INDEX_SETTINGS, env); - MatcherAssert.assertThat(fileCache.size(), equalTo(0L)); + MatcherAssert.assertThat(fileCache.size(), equalTo(2L)); assertFalse(Files.exists(indexCachePath)); } } diff --git a/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java b/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java index 7964077eb04ae..643caa85b5862 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java @@ -18,12 +18,14 @@ import org.opensearch.core.common.breaker.NoopCircuitBreaker; import org.opensearch.env.NodeEnvironment; import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory; +import org.opensearch.index.store.remote.utils.FileTypeUtils; import org.opensearch.test.OpenSearchTestCase; import org.junit.Before; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.attribute.PosixFilePermissions; import java.util.List; public class FileCacheTests extends OpenSearchTestCase { @@ -66,6 +68,18 @@ private void createFile(String indexName, String shardId, String fileName) throw Files.write(filePath, "test-data".getBytes()); } + @SuppressForbidden(reason = "creating a test file for cache") + private void createWarmIndexFile(String indexName, String shardId, String fileName) throws IOException { + Path folderPath = path.resolve(NodeEnvironment.INDICES_FOLDER) + .resolve(indexName) + .resolve(shardId) + .resolve(FileTypeUtils.INDICES_FOLDER_IDENTIFIER); + Path filePath 
= folderPath.resolve(fileName); + Files.createDirectories(folderPath); + Files.createFile(filePath); + Files.write(filePath, "test-data".getBytes()); + } + // test get method public void testGet() { FileCache fileCache = createFileCache(8 * MEGA_BYTES); @@ -357,6 +371,114 @@ public void testCacheRestore() throws IOException { assertEquals(0, fileCache.activeUsage()); } + public void testCloseIndexInputReferences() throws IOException { + FileCache fileCache = createFileCache(MEGA_BYTES); + // Add some entries to cache + int numEntries = 2; + Path tempDir = createTempDir(); + Path path1 = tempDir.resolve("test1.tmp"); + Path path2 = tempDir.resolve("test2.tmp"); + + // Create the files + Files.createFile(path1); + Files.createFile(path2); + + try { + fileCache.put(path1, new StubCachedIndexInput(8 * MEGA_BYTES)); + fileCache.incRef(path1); // Increase reference count + fileCache.put(path2, new StubCachedIndexInput(8 * MEGA_BYTES)); + fileCache.incRef(path2); // Increase reference count + // Verify initial state + assertEquals(numEntries, fileCache.size()); + // Close all references + fileCache.closeIndexInputReferences(); + // Verify cache is empty + assertEquals(0, fileCache.size()); + // Verify all entries are removed + assertNull(fileCache.get(path1)); + assertNull(fileCache.get(path2)); + // Verify path still exists + assertTrue(Files.exists(path1)); + assertTrue(Files.exists(path2)); + } finally { + Files.deleteIfExists(path1); + Files.deleteIfExists(path2); + } + } + + public void testRestoreFromEmptyDirectory() throws IOException { + FileCache fileCache = createFileCache(MEGA_BYTES); + Path emptyDir = createTempDir(); + fileCache.restoreFromDirectory(List.of(emptyDir)); + assertEquals(0, fileCache.usage()); + assertEquals(0, fileCache.activeUsage()); + } + + public void testRestoreWithNonExistentDirectory() { + FileCache fileCache = createFileCache(MEGA_BYTES); + Path nonExistentPath = path.resolve("non-existent"); + + 
fileCache.restoreFromDirectory(List.of(nonExistentPath)); + assertEquals(0, fileCache.usage()); + } + + public void testWarmIndexCacheRestore() throws IOException { + String indexName = "test-warm-index"; + String shardId = "0"; + createWarmIndexFile(indexName, shardId, "test.0"); + FileCache fileCache = createFileCache(MEGA_BYTES); + assertEquals(0, fileCache.usage()); + Path indicesCachePath = path.resolve(NodeEnvironment.INDICES_FOLDER).resolve(indexName).resolve(shardId); + fileCache.restoreFromDirectory(List.of(indicesCachePath)); + assertTrue(fileCache.usage() > 0); + assertEquals(0, fileCache.activeUsage()); + } + + public void testRestoreFromMultipleDirectories() throws IOException { + String index1 = "test-index-1"; + String index2 = "test-warm-index-2"; + String shardId = "0"; + + // Create files in both cache and indices directories + createFile(index1, shardId, "test1.0"); + createWarmIndexFile(index2, shardId, "test2.0"); + + FileCache fileCache = createFileCache(MEGA_BYTES); + Path cachePath = path.resolve(NodeEnvironment.CACHE_FOLDER).resolve(index1).resolve(shardId); + Path indicesPath = path.resolve(NodeEnvironment.INDICES_FOLDER).resolve(index2).resolve(shardId); + + fileCache.restoreFromDirectory(List.of(cachePath, indicesPath)); + assertTrue(fileCache.usage() > 0); + assertEquals(0, fileCache.activeUsage()); + } + + public void testRestoreWithInvalidWarmIndexFiles() throws IOException { + String indexName = "test-warm-index"; + String shardId = "0"; + + // Create a valid warm index file + createWarmIndexFile(indexName, shardId, "valid.0"); + + // Create an invalid/corrupt warm index file + Path invalidFilePath = path.resolve(NodeEnvironment.INDICES_FOLDER) + .resolve(indexName) + .resolve(shardId) + .resolve(FileTypeUtils.INDICES_FOLDER_IDENTIFIER) + .resolve("invalid.0"); + Files.createDirectories(invalidFilePath.getParent()); + Files.createFile(invalidFilePath); + // Make file unreadable + Files.setPosixFilePermissions(invalidFilePath, 
PosixFilePermissions.fromString("---------")); + + FileCache fileCache = createFileCache(MEGA_BYTES); + Path indicesPath = path.resolve(NodeEnvironment.INDICES_FOLDER).resolve(indexName).resolve(shardId); + + // Should handle the invalid file gracefully + fileCache.restoreFromDirectory(List.of(indicesPath)); + assertTrue("File cache should contain at least the valid file", fileCache.usage() > 0); + assertEquals("No files should be actively used", 0, fileCache.activeUsage()); + } + private void putAndDecRef(FileCache cache, int path, long indexInputSize) { final Path key = createPath(Integer.toString(path)); cache.put(key, new StubCachedIndexInput(indexInputSize)); diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index 24a195e0c1b78..38c5426401503 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -1982,6 +1982,12 @@ private void restartNode(NodeAndClient nodeAndClient, RestartCallback callback) assert Thread.holdsLock(this); logger.info("Restarting node [{}] ", nodeAndClient.name); + FileCache fileCache = nodeAndClient.node().fileCache(); + // Close IndexInput reference file to avoid file leaks during node restart + if (fileCache != null && WARM_NODE_PREDICATE.test(nodeAndClient)) { + fileCache.closeIndexInputReferences(); + } + if (activeDisruptionScheme != null) { activeDisruptionScheme.removeFromNode(nodeAndClient.name, this); } From 326bfe78d1746179ff3797790f7162fdf3a20d92 Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Tue, 17 Jun 2025 18:26:27 +0530 Subject: [PATCH 2/3] Empty commit Signed-off-by: Sandeep Kumawat From f6edc154ba5b64d33a4c11acff8c3b9fabb312cc Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Tue, 17 Jun 2025 22:48:43 +0530 Subject: [PATCH 3/3] Fix checks Signed-off-by: Sandeep Kumawat --- 
server/src/main/java/org/opensearch/env/NodeEnvironment.java | 5 +++++ .../opensearch/index/store/remote/utils/cache/LRUCache.java | 2 -- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/env/NodeEnvironment.java b/server/src/main/java/org/opensearch/env/NodeEnvironment.java index 9d7596cf4ea69..4e284f1fe5c3e 100644 --- a/server/src/main/java/org/opensearch/env/NodeEnvironment.java +++ b/server/src/main/java/org/opensearch/env/NodeEnvironment.java @@ -1291,6 +1291,11 @@ public static List collectFileCacheDataPath(NodePath fileCacheNodePath, Se return indexSubPaths; } + @Deprecated(forRemoval = true) + public static List collectFileCacheDataPath(NodePath fileCacheNodePath) throws IOException { + return collectFileCacheDataPath(fileCacheNodePath, Settings.EMPTY); + } + private static void processDirectory(Path directoryPath, List indexSubPaths) throws IOException { if (Files.isDirectory(directoryPath)) { try (DirectoryStream indexStream = Files.newDirectoryStream(directoryPath)) { diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java index 53e630670fbd4..5398ba03d863d 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java @@ -429,8 +429,6 @@ public void logCurrentState() { .append(entry.getValue().refCount) .append(" , Weight: ") .append(entry.getValue().weight) - .append(" , Pinned: ") - .append(entry.getValue().pinned) .append(" ]\n"); } if (allFiles.length() > 1) {