Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -189,20 +189,10 @@ public void testFullFileAndFileCacheStats() throws ExecutionException, Interrupt
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name())
.build();

assertAcked(client().admin().indices().prepareCreate(INDEX_NAME_2).setSettings(settings).get());

// Verify from the cluster settings if the data locality is partial
GetIndexResponse getIndexResponse = client().admin()
.indices()
.getIndex(new GetIndexRequest().indices(INDEX_NAME_2).includeDefaults(true))
.get();

Settings indexSettings = getIndexResponse.settings().get(INDEX_NAME_2);
assertEquals(IndexModule.DataLocalityType.PARTIAL.name(), indexSettings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey()));

// Ingesting docs again before force merge
indexBulk(INDEX_NAME_2, NUM_DOCS_IN_BULK);
flushAndRefresh(INDEX_NAME_2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ public enum RemovalReason {
INVALIDATED,
EVICTED,
EXPLICIT,
CAPACITY
CAPACITY,
RESTARTED // This is used by testing framework to close the CachedIndexInput during node restart.
}
31 changes: 22 additions & 9 deletions server/src/main/java/org/opensearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ public NodeEnvironment(Settings settings, Environment environment, IndexStoreLis
}

if (DiscoveryNode.isWarmNode(settings) == false) {
ensureNoFileCacheData(fileCacheNodePath);
ensureNoFileCacheData(fileCacheNodePath, settings);
}

this.nodeMetadata = loadNodeMetadata(settings, logger, nodePaths);
Expand Down Expand Up @@ -1204,8 +1204,8 @@ private void ensureNoShardData(final NodePath[] nodePaths) throws IOException {
/**
* Throws an exception if cache exists on a non-warm node.
*/
private void ensureNoFileCacheData(final NodePath fileCacheNodePath) throws IOException {
List<Path> cacheDataPaths = collectFileCacheDataPath(fileCacheNodePath);
private void ensureNoFileCacheData(final NodePath fileCacheNodePath, final Settings settings) throws IOException {
List<Path> cacheDataPaths = collectFileCacheDataPath(fileCacheNodePath, settings);
if (cacheDataPaths.isEmpty() == false) {
final String message = String.format(
Locale.ROOT,
Expand Down Expand Up @@ -1278,12 +1278,27 @@ private static boolean isIndexMetadataPath(Path path) {
* Collect the path containing cache data in the indicated cache node path.
* The returned paths will point to the shard data folder.
*/
public static List<Path> collectFileCacheDataPath(NodePath fileCacheNodePath) throws IOException {
public static List<Path> collectFileCacheDataPath(NodePath fileCacheNodePath, Settings settings) throws IOException {
// Structure is: <file cache path>/<index uuid>/<shard id>/...
List<Path> indexSubPaths = new ArrayList<>();
Path fileCachePath = fileCacheNodePath.fileCachePath;
if (Files.isDirectory(fileCachePath)) {
try (DirectoryStream<Path> indexStream = Files.newDirectoryStream(fileCachePath)) {
// Process file cache path
processDirectory(fileCacheNodePath.fileCachePath, indexSubPaths);
if (DiscoveryNode.isDedicatedWarmNode(settings)) {
// Process <indices>/... path only for warm nodes.
processDirectory(fileCacheNodePath.indicesPath, indexSubPaths);
}

return indexSubPaths;
}

@Deprecated(forRemoval = true)
public static List<Path> collectFileCacheDataPath(NodePath fileCacheNodePath) throws IOException {
return collectFileCacheDataPath(fileCacheNodePath, Settings.EMPTY);
}

private static void processDirectory(Path directoryPath, List<Path> indexSubPaths) throws IOException {
if (Files.isDirectory(directoryPath)) {
try (DirectoryStream<Path> indexStream = Files.newDirectoryStream(directoryPath)) {
for (Path indexPath : indexStream) {
if (Files.isDirectory(indexPath)) {
try (Stream<Path> shardStream = Files.list(indexPath)) {
Expand All @@ -1293,8 +1308,6 @@ public static List<Path> collectFileCacheDataPath(NodePath fileCacheNodePath) th
}
}
}

return indexSubPaths;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ private void processNoClusterManagerRepurposeNode(

if (repurposeSearch) {
terminal.println(Terminal.Verbosity.VERBOSE, "Collecting file cache data paths");
fileCacheDataPaths = NodeEnvironment.collectFileCacheDataPath(fileCacheNodePath);
fileCacheDataPaths = NodeEnvironment.collectFileCacheDataPath(fileCacheNodePath, env.settings());
fileCachePaths = uniqueParentPaths(fileCacheDataPaths, indexMetadataPaths);
}

Expand Down Expand Up @@ -227,7 +227,7 @@ private void processClusterManagerRepurposeNode(

if (repurposeSearch) {
terminal.println(Terminal.Verbosity.VERBOSE, "Collecting file cache data paths");
fileCacheDataPaths = NodeEnvironment.collectFileCacheDataPath(fileCacheNodePath);
fileCacheDataPaths = NodeEnvironment.collectFileCacheDataPath(fileCacheNodePath, env.settings());
fileCachePaths = uniqueParentPaths(fileCacheDataPaths);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import org.opensearch.index.store.remote.filecache.CachedFullFileIndexInput;
import org.opensearch.index.store.remote.filecache.CachedIndexInput;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCache.RestoredCachedIndexInput;
import org.opensearch.index.store.remote.utils.FileTypeUtils;
import org.opensearch.index.store.remote.utils.TransferManager;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.Arrays;
Expand Down Expand Up @@ -285,7 +287,27 @@ public IndexInput openInput(String name, IOContext context) throws IOException {
}
// Return directly from the FileCache (via TransferManager) if complete file is present
Path key = getFilePath(name);
CachedIndexInput indexInput = fileCache.get(key);

CachedIndexInput indexInput = fileCache.compute(key, (path, cachedIndexInput) -> {
// If entry exists and is not closed, use it
if (cachedIndexInput != null && cachedIndexInput.isClosed() == false) {
return cachedIndexInput;
}

// If entry is closed but file exists locally, create new IndexInput from local
if (cachedIndexInput != null && cachedIndexInput.isClosed() && Files.exists(key)) {
try {
assert cachedIndexInput instanceof RestoredCachedIndexInput;
return new CachedFullFileIndexInput(fileCache, key, localDirectory.openInput(name, IOContext.DEFAULT));
} catch (IOException e) {
throw new RuntimeException(e);
}
}

// Return null to fall back to remote store block download/existing block reuse.
return null;
});

if (indexInput != null) {
logger.trace("Composite Directory[{}]: Complete file {} found in FileCache", this::toString, () -> name);
try {
Expand All @@ -306,7 +328,8 @@ public IndexInput openInput(String name, IOContext context) throws IOException {
if (uploadedSegmentMetadata == null) {
throw new NoSuchFileException("File " + name + " not found in directory");
}
// TODO : Refactor FileInfo and OnDemandBlockSnapshotIndexInput to more generic names as they are not Remote Snapshot specific
// TODO : Refactor FileInfo and OnDemandBlockSnapshotIndexInput to more generic names as they are not Remote Snapshot
// specific
BlobStoreIndexShardSnapshot.FileInfo fileInfo = new BlobStoreIndexShardSnapshot.FileInfo(
name,
new StoreFileMetadata(name, uploadedSegmentMetadata.getLength(), uploadedSegmentMetadata.getChecksum(), Version.LATEST),
Expand All @@ -332,7 +355,6 @@ public void close() throws IOException {
fileCache.remove(getFilePath(localFile));
}
}
fileCache.prune();
localDirectory.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ public long getCacheMisses() {
return overallFileCacheStats.getCacheMisses();
}

// visible for testing.
public FileCacheStats getBlockFileCacheStats() {
return blockFileCacheStats;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.AGGREGATE_FILE_CACHE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Stream;

import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION;
import static org.opensearch.index.store.remote.utils.FileTypeUtils.INDICES_FOLDER_IDENTIFIER;

/**
* File Cache (FC) is introduced to solve the problem that the local disk cannot hold
Expand Down Expand Up @@ -192,6 +194,11 @@ public void logCurrentState() {
theCache.logCurrentState();
}

// To be used only in testing framework.
public void closeIndexInputReferences() {
theCache.closeIndexInputReferences();
}

/**
* Ensures that the PARENT breaker is not tripped when an entry is added to the cache
* @param filePath the path key for which entry is added
Expand All @@ -216,32 +223,29 @@ private void checkParentBreaker(Path filePath) {
* directory within the provided file cache path.
*/
public void restoreFromDirectory(List<Path> fileCacheDataPaths) {
fileCacheDataPaths.stream()
.filter(Files::isDirectory)
.map(path -> path.resolve(LOCAL_STORE_LOCATION))
.filter(Files::isDirectory)
.flatMap(dir -> {
try {
return Files.list(dir);
} catch (IOException e) {
throw new UncheckedIOException(
"Unable to process file cache directory. Please clear the file cache for node startup.",
e
);
}
})
.filter(Files::isRegularFile)
.forEach(path -> {
try {
put(path.toAbsolutePath(), new RestoredCachedIndexInput(Files.size(path)));
decRef(path.toAbsolutePath());
} catch (IOException e) {
throw new UncheckedIOException(
"Unable to retrieve cache file details. Please clear the file cache for node startup.",
e
);
}
});
Stream.concat(
fileCacheDataPaths.stream()
.filter(Files::isDirectory)
.map(path -> path.resolve(LOCAL_STORE_LOCATION))
.filter(Files::isDirectory),
fileCacheDataPaths.stream()
.filter(Files::isDirectory)
.map(path -> path.resolve(INDICES_FOLDER_IDENTIFIER))
.filter(Files::isDirectory)
).flatMap(dir -> {
try {
return Files.list(dir);
} catch (IOException e) {
throw new UncheckedIOException("Unable to process file cache directory. Please clear the file cache for node startup.", e);
}
}).filter(Files::isRegularFile).forEach(path -> {
try {
put(path.toAbsolutePath(), new RestoredCachedIndexInput(Files.size(path)));
decRef(path.toAbsolutePath());
} catch (IOException e) {
throw new UncheckedIOException("Unable to retrieve cache file details. Please clear the file cache for node startup.", e);
}
});
}

/**
Expand Down Expand Up @@ -308,10 +312,10 @@ public AggregateFileCacheStats fileCacheStats() {
* These entries are eligible for eviction so if nothing needs to reference
* them they will be deleted when the disk-based local cache fills up.
*/
private static class RestoredCachedIndexInput implements CachedIndexInput {
public static class RestoredCachedIndexInput implements CachedIndexInput {
private final long length;

private RestoredCachedIndexInput(long length) {
public RestoredCachedIndexInput(long length) {
this.length = length;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.nio.file.Path;

import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION;
import static org.opensearch.index.store.remote.utils.FileTypeUtils.INDICES_FOLDER_IDENTIFIER;

/**
* IndexStoreListener to clean up file cache when the index is deleted. The cached entries will be eligible
Expand Down Expand Up @@ -55,29 +56,54 @@ public FileCacheCleaner(Provider<FileCache> fileCacheProvider) {
public void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment nodeEnvironment) {
if (indexSettings.isRemoteSnapshot()) {
final ShardPath shardPath = ShardPath.loadFileCachePath(nodeEnvironment, shardId);
cleanupShardFileCache(shardPath);
cleanupShardFileCache(shardPath, false, true);
deleteShardFileCacheDirectory(shardPath);
} else if (indexSettings.isWarmIndex()) {
try {
final ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnvironment, shardId, indexSettings.customDataPath());
if (shardPath != null) {
cleanupShardFileCache(shardPath, true, false);
deleteShardFileCacheDirectory(shardPath);
}
} catch (IOException e) {
logger.error("failed to delete warm index shard file cache directory", e);
}
}
}

/**
* Cleans up the corresponding index file path entries from FileCache
*
* @param shardPath the shard path
* @param isWarmIndex flag indicating if this is a remote index
* @param isRemoteSnapshot flag indicating if this is a remote snapshot
*/
private void cleanupShardFileCache(ShardPath shardPath) {
private void cleanupShardFileCache(ShardPath shardPath, boolean isWarmIndex, boolean isRemoteSnapshot) {
try {
final FileCache fc = fileCacheProvider.get();
assert fc != null;
final Path localStorePath = shardPath.getDataPath().resolve(LOCAL_STORE_LOCATION);

final Path localStorePath;
if (isWarmIndex) {
localStorePath = shardPath.getDataPath().resolve(INDICES_FOLDER_IDENTIFIER);
} else if (isRemoteSnapshot) {
localStorePath = shardPath.getDataPath().resolve(LOCAL_STORE_LOCATION);
} else {
return;
}

try (DirectoryStream<Path> ds = Files.newDirectoryStream(localStorePath)) {
for (Path subPath : ds) {
fc.remove(subPath.toRealPath());
}
}
} catch (IOException ioe) {
String operationType = isWarmIndex ? "warm index" : "remote snapshot";
logger.error(
() -> new ParameterizedMessage("Error removing items from cache during shard deletion {}", shardPath.getShardId()),
() -> new ParameterizedMessage(
"Error removing items from cache during {} shard deletion {}",
operationType,
shardPath.getShardId()
),
ioe
);
}
Expand Down Expand Up @@ -112,6 +138,15 @@ public void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, Nod
logger.error(() -> new ParameterizedMessage("Failed to delete cache path for index {}", index), e);
}
}
} else if (indexSettings.isWarmIndex()) {
final Path indicesPathInCache = nodeEnvironment.fileCacheNodePath().indicesPath.resolve(index.getUUID());
if (Files.exists(indicesPathInCache)) {
try {
IOUtils.rm(indicesPathInCache);
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Failed to delete indices path in cache for index {}", index), e);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ private static SegmentedCache.Builder<Path, CachedIndexInput> createDefaultBuild
Path key = removalNotification.getKey();
if (removalReason != RemovalReason.REPLACED) {
catchAsRuntimeException(value::close);
catchAsRuntimeException(() -> Files.deleteIfExists(key));
// On RESTARTED removal, we close the IndexInput but preserve the files on disk as this scenario only occurs during
// tests
if (removalReason != RemovalReason.RESTARTED) {
catchAsRuntimeException(() -> Files.deleteIfExists(key));
}
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
public class FileTypeUtils {

public static String BLOCK_FILE_IDENTIFIER = "_block_";
public static String INDICES_FOLDER_IDENTIFIER = "index";

public static boolean isTempFile(String name) {
return name.endsWith(".tmp");
Expand Down
Loading
Loading