Skip to content

Commit 28db81c

Browse files
committed
Add composite directory factory
1 parent 80cb033 commit 28db81c

7 files changed

Lines changed: 130 additions & 2 deletions

File tree

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
197197
MapperService.INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING,
198198
BitsetFilterCache.INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING,
199199
IndexModule.INDEX_STORE_TYPE_SETTING,
200+
IndexModule.INDEX_COMPOSITE_STORE_TYPE_SETTING,
200201
IndexModule.INDEX_STORE_PRE_LOAD_SETTING,
201202
IndexModule.INDEX_STORE_HYBRID_NIO_EXTENSIONS,
202203
IndexModule.INDEX_RECOVERY_TYPE_SETTING,

server/src/main/java/org/opensearch/index/IndexModule.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import org.opensearch.index.shard.IndexingOperationListener;
7777
import org.opensearch.index.shard.SearchOperationListener;
7878
import org.opensearch.index.similarity.SimilarityService;
79+
import org.opensearch.index.store.DefaultCompositeDirectoryFactory;
7980
import org.opensearch.index.store.FsDirectoryFactory;
8081
import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory;
8182
import org.opensearch.index.store.remote.filecache.FileCache;
@@ -133,6 +134,7 @@ public final class IndexModule {
133134
public static final Setting<Boolean> NODE_STORE_ALLOW_MMAP = Setting.boolSetting("node.store.allow_mmap", true, Property.NodeScope);
134135

135136
private static final FsDirectoryFactory DEFAULT_DIRECTORY_FACTORY = new FsDirectoryFactory();
137+
private static final IndexStorePlugin.CompositeDirectoryFactory DEFAULT_COMPOSITE_DIRECTORY_FACTORY = new DefaultCompositeDirectoryFactory();
136138

137139
private static final IndexStorePlugin.RecoveryStateFactory DEFAULT_RECOVERY_STATE_FACTORY = RecoveryState::new;
138140

@@ -144,6 +146,14 @@ public final class IndexModule {
144146
Property.NodeScope
145147
);
146148

149+
public static final Setting<String> INDEX_COMPOSITE_STORE_TYPE_SETTING = new Setting<>(
150+
"index.composite_store.type",
151+
"default",
152+
Function.identity(),
153+
Property.IndexScope,
154+
Property.NodeScope
155+
);
156+
147157
/**
148158
* Index setting which used to determine how the data is cached locally fully or partially.
149159
*/
@@ -240,6 +250,7 @@ public final class IndexModule {
240250
private final Set<IndexEventListener> indexEventListeners = new HashSet<>();
241251
private final Map<String, TriFunction<Settings, Version, ScriptService, Similarity>> similarities = new HashMap<>();
242252
private final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories;
253+
private final Map<String, IndexStorePlugin.CompositeDirectoryFactory> compositeDirectoryFactories;
243254
private final SetOnce<BiFunction<IndexSettings, IndicesQueryCache, QueryCache>> forceQueryCacheProvider = new SetOnce<>();
244255
private final List<SearchOperationListener> searchOperationListeners = new ArrayList<>();
245256
private final List<IndexingOperationListener> indexOperationListeners = new ArrayList<>();
@@ -265,6 +276,7 @@ public IndexModule(
265276
final EngineFactory engineFactory,
266277
final EngineConfigFactory engineConfigFactory,
267278
final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories,
279+
final Map<String, IndexStorePlugin.CompositeDirectoryFactory> compositeDirectoryFactories,
268280
final BooleanSupplier allowExpensiveQueries,
269281
final IndexNameExpressionResolver expressionResolver,
270282
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
@@ -278,6 +290,7 @@ public IndexModule(
278290
this.searchOperationListeners.add(new SearchSlowLog(indexSettings));
279291
this.indexOperationListeners.add(new IndexingSlowLog(indexSettings));
280292
this.directoryFactories = Collections.unmodifiableMap(directoryFactories);
293+
this.compositeDirectoryFactories = Collections.unmodifiableMap(compositeDirectoryFactories);
281294
this.allowExpensiveQueries = allowExpensiveQueries;
282295
this.expressionResolver = expressionResolver;
283296
this.recoveryStateFactories = recoveryStateFactories;
@@ -301,6 +314,7 @@ public IndexModule(
301314
engineFactory,
302315
engineConfigFactory,
303316
directoryFactories,
317+
null,
304318
allowExpensiveQueries,
305319
expressionResolver,
306320
recoveryStateFactories,
@@ -693,6 +707,7 @@ public IndexService newIndexService(
693707
.get() == null ? (shard) -> null : indexReaderWrapper.get();
694708
eventListener.beforeIndexCreated(indexSettings.getIndex(), indexSettings.getSettings());
695709
final IndexStorePlugin.DirectoryFactory directoryFactory = getDirectoryFactory(indexSettings, directoryFactories);
710+
final IndexStorePlugin.CompositeDirectoryFactory compositeDirectoryFactory = getCompositeDirectoryFactory(indexSettings, compositeDirectoryFactories);
696711
final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory = getRecoveryStateFactory(indexSettings, recoveryStateFactories);
697712
QueryCache queryCache = null;
698713
IndexAnalyzers indexAnalyzers = null;
@@ -729,6 +744,7 @@ public IndexService newIndexService(
729744
client,
730745
queryCache,
731746
directoryFactory,
747+
compositeDirectoryFactory,
732748
remoteDirectoryFactory,
733749
eventListener,
734750
readerWrapperFactory,
@@ -792,6 +808,23 @@ private static IndexStorePlugin.DirectoryFactory getDirectoryFactory(
792808
return factory;
793809
}
794810

811+
private static IndexStorePlugin.CompositeDirectoryFactory getCompositeDirectoryFactory(
812+
final IndexSettings indexSettings,
813+
final Map<String, IndexStorePlugin.CompositeDirectoryFactory> indexStoreFactories
814+
) {
815+
final String compositeStoreType = indexSettings.getValue(INDEX_COMPOSITE_STORE_TYPE_SETTING);
816+
final IndexStorePlugin.CompositeDirectoryFactory factory;
817+
if (compositeStoreType.isEmpty()) {
818+
factory = DEFAULT_COMPOSITE_DIRECTORY_FACTORY;
819+
} else {
820+
factory = indexStoreFactories.get(compositeStoreType);
821+
if (factory == null) {
822+
throw new IllegalArgumentException("Unknown composite store type [" + compositeStoreType + "]");
823+
}
824+
}
825+
return factory;
826+
}
827+
795828
private static IndexStorePlugin.RecoveryStateFactory getRecoveryStateFactory(
796829
final IndexSettings indexSettings,
797830
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories

server/src/main/java/org/opensearch/index/IndexService.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
153153
private final NodeEnvironment nodeEnv;
154154
private final ShardStoreDeleter shardStoreDeleter;
155155
private final IndexStorePlugin.DirectoryFactory directoryFactory;
156+
private final IndexStorePlugin.CompositeDirectoryFactory compositeDirectoryFactory;
156157
private final IndexStorePlugin.DirectoryFactory remoteDirectoryFactory;
157158
private final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory;
158159
private final CheckedFunction<DirectoryReader, DirectoryReader, IOException> readerWrapper;
@@ -218,6 +219,7 @@ public IndexService(
218219
Client client,
219220
QueryCache queryCache,
220221
IndexStorePlugin.DirectoryFactory directoryFactory,
222+
IndexStorePlugin.CompositeDirectoryFactory compositeDirectoryFactory,
221223
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
222224
IndexEventListener eventListener,
223225
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> wrapperFactory,
@@ -300,6 +302,7 @@ public IndexService(
300302
this.eventListener = eventListener;
301303
this.nodeEnv = nodeEnv;
302304
this.directoryFactory = directoryFactory;
305+
this.compositeDirectoryFactory = compositeDirectoryFactory;
303306
this.remoteDirectoryFactory = remoteDirectoryFactory;
304307
this.recoveryStateFactory = recoveryStateFactory;
305308
this.engineFactory = Objects.requireNonNull(engineFactory);
@@ -384,6 +387,7 @@ public IndexService(
384387
client,
385388
queryCache,
386389
directoryFactory,
390+
null,
387391
remoteDirectoryFactory,
388392
eventListener,
389393
wrapperFactory,
@@ -659,8 +663,7 @@ protected void closeInternal() {
659663
if (FeatureFlags.isEnabled(FeatureFlags.WRITABLE_WARM_INDEX_SETTING) &&
660664
// TODO : Need to remove this check after support for hot indices is added in Composite Directory
661665
this.indexSettings.isWarmIndex()) {
662-
Directory localDirectory = directoryFactory.newDirectory(this.indexSettings, path);
663-
directory = new CompositeDirectory(localDirectory, remoteDirectory, fileCache);
666+
directory = compositeDirectoryFactory.newDirectory(this.indexSettings, path, directoryFactory, remoteDirectory, fileCache);
664667
} else {
665668
directory = directoryFactory.newDirectory(this.indexSettings, path);
666669
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.store;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.apache.lucene.store.Directory;
14+
import org.opensearch.index.IndexSettings;
15+
import org.opensearch.index.shard.ShardPath;
16+
import org.opensearch.index.store.remote.filecache.FileCache;
17+
import org.opensearch.plugins.IndexStorePlugin;
18+
19+
import java.io.IOException;
20+
21+
/**
22+
* Default composite directory factory
23+
*/
24+
public class DefaultCompositeDirectoryFactory implements IndexStorePlugin.CompositeDirectoryFactory {
25+
26+
private static final Logger logger = LogManager.getLogger(DefaultCompositeDirectoryFactory.class);
27+
28+
@Override
29+
public Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath, IndexStorePlugin.DirectoryFactory localDirectoryFactory, Directory remoteDirectory, FileCache fileCache) throws IOException {
30+
logger.trace("Creating composite directory from core - Default CompositeDirectoryFactory");
31+
Directory localDirectory = localDirectoryFactory.newDirectory(indexSettings, shardPath);
32+
return new CompositeDirectory(
33+
localDirectory,
34+
remoteDirectory,
35+
fileCache
36+
);
37+
}
38+
}

server/src/main/java/org/opensearch/indices/IndicesService.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,7 @@ public class IndicesService extends AbstractLifecycleComponent
359359
private final MetaStateService metaStateService;
360360
private final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders;
361361
private final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories;
362+
private final Map<String, IndexStorePlugin.CompositeDirectoryFactory> compositeDirectoryFactories;
362363
private final Map<String, IngestionConsumerFactory> ingestionConsumerFactories;
363364
private final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories;
364365
final AbstractRefCounted indicesRefCount; // pkg-private for testing
@@ -408,6 +409,7 @@ public IndicesService(
408409
MetaStateService metaStateService,
409410
Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders,
410411
Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories,
412+
Map<String, IndexStorePlugin.CompositeDirectoryFactory> compositeDirectoryFactories,
411413
ValuesSourceRegistry valuesSourceRegistry,
412414
Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
413415
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
@@ -472,6 +474,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon
472474
this.engineFactoryProviders = engineFactoryProviders;
473475

474476
this.directoryFactories = directoryFactories;
477+
this.compositeDirectoryFactories = compositeDirectoryFactories;
475478
this.recoveryStateFactories = recoveryStateFactories;
476479
this.ingestionConsumerFactories = ingestionConsumerFactories;
477480
// doClose() is called when shutting down a node, yet there might still be ongoing requests
@@ -596,6 +599,7 @@ public IndicesService(
596599
metaStateService,
597600
engineFactoryProviders,
598601
directoryFactories,
602+
null,
599603
valuesSourceRegistry,
600604
recoveryStateFactories,
601605
remoteDirectoryFactory,
@@ -995,6 +999,7 @@ private synchronized IndexService createIndexService(
995999
getEngineFactory(idxSettings),
9961000
getEngineConfigFactory(idxSettings),
9971001
directoryFactories,
1002+
compositeDirectoryFactories,
9981003
() -> allowExpensiveQueries,
9991004
indexNameExpressionResolver,
10001005
recoveryStateFactories,
@@ -1111,6 +1116,7 @@ public synchronized MapperService createIndexMapperService(IndexMetadata indexMe
11111116
getEngineFactory(idxSettings),
11121117
getEngineConfigFactory(idxSettings),
11131118
directoryFactories,
1119+
compositeDirectoryFactories,
11141120
() -> allowExpensiveQueries,
11151121
indexNameExpressionResolver,
11161122
recoveryStateFactories,

server/src/main/java/org/opensearch/node/Node.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@
158158
import org.opensearch.index.recovery.RemoteStoreRestoreService;
159159
import org.opensearch.index.remote.RemoteIndexPathUploader;
160160
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
161+
import org.opensearch.index.store.DefaultCompositeDirectoryFactory;
161162
import org.opensearch.index.store.IndexStoreListener;
162163
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
163164
import org.opensearch.index.store.remote.filecache.FileCache;
@@ -892,6 +893,20 @@ protected Node(
892893
});
893894
directoryFactories.putAll(builtInDirectoryFactories);
894895

896+
final Map<String, IndexStorePlugin.CompositeDirectoryFactory> compositeDirectoryFactories = new HashMap<>();
897+
pluginsService.filterPlugins(IndexStorePlugin.class)
898+
.stream()
899+
.map(IndexStorePlugin::getCompositeDirectoryFactories)
900+
.flatMap(m -> m.entrySet().stream())
901+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
902+
.forEach((k, v) -> {
903+
if (k.equals("default")) {
904+
throw new IllegalStateException("registered composite index store type [" + k + "] conflicts with a built-in default type");
905+
}
906+
compositeDirectoryFactories.put(k, v);
907+
});
908+
compositeDirectoryFactories.put("default", new DefaultCompositeDirectoryFactory());
909+
895910
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories = pluginsService.filterPlugins(
896911
IndexStorePlugin.class
897912
)
@@ -950,6 +965,7 @@ protected Node(
950965
metaStateService,
951966
engineFactoryProviders,
952967
Map.copyOf(directoryFactories),
968+
Map.copyOf(compositeDirectoryFactories),
953969
searchModule.getValuesSourceRegistry(),
954970
recoveryStateFactories,
955971
remoteDirectoryFactory,

server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,12 @@
3636
import org.opensearch.cluster.node.DiscoveryNode;
3737
import org.opensearch.cluster.routing.ShardRouting;
3838
import org.opensearch.common.Nullable;
39+
import org.opensearch.common.annotation.ExperimentalApi;
3940
import org.opensearch.common.annotation.PublicApi;
4041
import org.opensearch.index.IndexSettings;
4142
import org.opensearch.index.shard.ShardPath;
4243
import org.opensearch.index.store.IndexStoreListener;
44+
import org.opensearch.index.store.remote.filecache.FileCache;
4345
import org.opensearch.indices.recovery.RecoveryState;
4446

4547
import java.io.IOException;
@@ -82,6 +84,35 @@ interface DirectoryFactory {
8284
*/
8385
Map<String, DirectoryFactory> getDirectoryFactories();
8486

87+
/**
88+
* An interface that describes how to create a new composite directory instance per shard.
89+
*
90+
* @opensearch.api
91+
*/
92+
@FunctionalInterface
93+
@ExperimentalApi
94+
interface CompositeDirectoryFactory {
95+
/**
96+
* Creates a new composite directory per shard
97+
* @param indexSettings the shards index settings
98+
* @param shardPath the path the shard is using
99+
* @return a new composite directory instance
100+
* @throws IOException if an IOException occurs while opening the directory
101+
*/
102+
Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath, DirectoryFactory localDirectoryFactory, Directory remoteDirectory, FileCache fileCache) throws IOException;
103+
}
104+
105+
/**
106+
* The {@link CompositeDirectoryFactory} mappings for this plugin. When an index is created the composite store type setting
107+
* {@link org.opensearch.index.IndexModule#INDEX_COMPOSITE_STORE_TYPE_SETTING} on the index will be examined and either use the default or a
108+
* built-in type, or looked up among all the composite directory factories from {@link IndexStorePlugin} plugins.
109+
*
110+
* @return a map from composite store type to a composite directory factory
111+
*/
112+
default Map<String, CompositeDirectoryFactory> getCompositeDirectoryFactories() {
113+
return Collections.emptyMap();
114+
}
115+
85116
/**
86117
* An interface that allows to create a new {@link RecoveryState} per shard.
87118
*

0 commit comments

Comments
 (0)