1111import org .apache .logging .log4j .LogManager ;
1212import org .apache .logging .log4j .Logger ;
1313import org .apache .logging .log4j .message .ParameterizedMessage ;
14- import org .apache .lucene .store .IndexInput ;
1514import org .opensearch .action .LatchedActionListener ;
16- import org .opensearch .cluster .ClusterState ;
1715import org .opensearch .cluster .DiffableUtils ;
1816import org .opensearch .cluster .routing .IndexRoutingTable ;
1917import org .opensearch .cluster .routing .RoutingTable ;
20- import org .opensearch .common .blobstore .AsyncMultiStreamBlobContainer ;
21- import org .opensearch .common .blobstore .BlobContainer ;
2218import org .opensearch .common .blobstore .BlobPath ;
23- import org .opensearch .common .blobstore .stream .write .WritePriority ;
24- import org .opensearch .common .blobstore .transfer .RemoteTransferContainer ;
25- import org .opensearch .common .blobstore .transfer .stream .OffsetRangeIndexInputStream ;
26- import org .opensearch .common .io .stream .BytesStreamOutput ;
2719import org .opensearch .common .lifecycle .AbstractLifecycleComponent ;
28- import org .opensearch .common .lucene . store . ByteArrayIndexInput ;
20+ import org .opensearch .common .remote . RemoteWritableEntityStore ;
2921import org .opensearch .common .settings .ClusterSettings ;
30- import org .opensearch .common .settings .Setting ;
3122import org .opensearch .common .settings .Settings ;
3223import org .opensearch .common .util .io .IOUtils ;
3324import org .opensearch .core .action .ActionListener ;
34- import org .opensearch .core .common .bytes .BytesReference ;
35- import org .opensearch .core .index .Index ;
25+ import org .opensearch .core .compress .Compressor ;
3626import org .opensearch .gateway .remote .ClusterMetadataManifest ;
3727import org .opensearch .gateway .remote .RemoteStateTransferException ;
28+ import org .opensearch .gateway .remote .model .RemoteRoutingTableBlobStore ;
3829import org .opensearch .gateway .remote .routingtable .RemoteIndexRoutingTable ;
39- import org .opensearch .index .remote .RemoteStoreEnums ;
40- import org .opensearch .index .remote .RemoteStorePathStrategy ;
41- import org .opensearch .index .remote .RemoteStoreUtils ;
30+ import org .opensearch .index .translog .transfer .BlobStoreTransferService ;
4231import org .opensearch .node .Node ;
4332import org .opensearch .node .remotestore .RemoteStoreNodeAttribute ;
4433import org .opensearch .repositories .RepositoriesService ;
5140import java .util .List ;
5241import java .util .Map ;
5342import java .util .Optional ;
54- import java .util .concurrent .ExecutorService ;
5543import java .util .function .Function ;
5644import java .util .function .Supplier ;
5745import java .util .stream .Collectors ;
5846
59- import static org .opensearch .gateway .remote .RemoteClusterStateUtils .DELIMITER ;
6047import static org .opensearch .node .remotestore .RemoteStoreNodeAttribute .isRemoteRoutingTableEnabled ;
6148
6249/**
6653 */
6754public class InternalRemoteRoutingTableService extends AbstractLifecycleComponent implements RemoteRoutingTableService {
6855
69- /**
70- * This setting is used to set the remote routing table store blob store path type strategy.
71- */
72- public static final Setting <RemoteStoreEnums .PathType > REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING = new Setting <>(
73- "cluster.remote_store.routing_table.path_type" ,
74- RemoteStoreEnums .PathType .HASHED_PREFIX .toString (),
75- RemoteStoreEnums .PathType ::parseString ,
76- Setting .Property .NodeScope ,
77- Setting .Property .Dynamic
78- );
79-
80- /**
81- * This setting is used to set the remote routing table store blob store path hash algorithm strategy.
82- * This setting will come to effect if the {@link #REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING}
83- * is either {@code HASHED_PREFIX} or {@code HASHED_INFIX}.
84- */
85- public static final Setting <RemoteStoreEnums .PathHashAlgorithm > REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING = new Setting <>(
86- "cluster.remote_store.routing_table.path_hash_algo" ,
87- RemoteStoreEnums .PathHashAlgorithm .FNV_1A_BASE64 .toString (),
88- RemoteStoreEnums .PathHashAlgorithm ::parseString ,
89- Setting .Property .NodeScope ,
90- Setting .Property .Dynamic
91- );
92-
93- public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing" ;
94- public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing" ;
95- public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--" ;
96-
9756 private static final Logger logger = LogManager .getLogger (InternalRemoteRoutingTableService .class );
9857 private final Settings settings ;
9958 private final Supplier <RepositoriesService > repositoriesService ;
59+ private Compressor compressor ;
60+ private RemoteWritableEntityStore <IndexRoutingTable , RemoteIndexRoutingTable > remoteIndexRoutingTableStore ;
61+ private final ClusterSettings clusterSettings ;
10062 private BlobStoreRepository blobStoreRepository ;
101- private RemoteStoreEnums .PathType pathType ;
102- private RemoteStoreEnums .PathHashAlgorithm pathHashAlgo ;
103- private ThreadPool threadPool ;
63+ private final ThreadPool threadPool ;
64+ private final String clusterName ;
10465
10566 public InternalRemoteRoutingTableService (
10667 Supplier <RepositoriesService > repositoriesService ,
10768 Settings settings ,
10869 ClusterSettings clusterSettings ,
109- ThreadPool threadpool
70+ ThreadPool threadpool ,
71+ String clusterName
11072 ) {
11173 assert isRemoteRoutingTableEnabled (settings ) : "Remote routing table is not enabled" ;
11274 this .repositoriesService = repositoriesService ;
11375 this .settings = settings ;
114- this .pathType = clusterSettings .get (REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING );
115- this .pathHashAlgo = clusterSettings .get (REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING );
116- clusterSettings .addSettingsUpdateConsumer (REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING , this ::setPathTypeSetting );
117- clusterSettings .addSettingsUpdateConsumer (REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING , this ::setPathHashAlgoSetting );
11876 this .threadPool = threadpool ;
119- }
120-
121- private void setPathTypeSetting (RemoteStoreEnums .PathType pathType ) {
122- this .pathType = pathType ;
123- }
124-
125- private void setPathHashAlgoSetting (RemoteStoreEnums .PathHashAlgorithm pathHashAlgo ) {
126- this .pathHashAlgo = pathHashAlgo ;
77+ this .clusterName = clusterName ;
78+ this .clusterSettings = clusterSettings ;
12779 }
12880
12981 public List <IndexRoutingTable > getIndicesRouting (RoutingTable routingTable ) {
@@ -150,43 +102,31 @@ public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRouting
150102
151103 /**
152104 * Async action for writing one {@code IndexRoutingTable} to remote store
153- * @param clusterState current cluster state
105+ * @param term current term
106+ * @param version current version
107+ * @param clusterUUID current cluster UUID
154108 * @param indexRouting indexRoutingTable to write to remote store
155109 * @param latchedActionListener listener for handling async action response
156- * @param clusterBasePath base path for remote file
157110 */
158111 @ Override
159- public void getIndexRoutingAsyncAction (
160- ClusterState clusterState ,
112+ public void getAsyncIndexRoutingWriteAction (
113+ String clusterUUID ,
114+ long term ,
115+ long version ,
161116 IndexRoutingTable indexRouting ,
162- LatchedActionListener <ClusterMetadataManifest .UploadedMetadata > latchedActionListener ,
163- BlobPath clusterBasePath
117+ LatchedActionListener <ClusterMetadataManifest .UploadedMetadata > latchedActionListener
164118 ) {
165119
166- BlobPath indexRoutingPath = clusterBasePath .add (INDEX_ROUTING_PATH_TOKEN );
167- BlobPath path = pathType .path (
168- RemoteStorePathStrategy .BasePathInput .builder ().basePath (indexRoutingPath ).indexUUID (indexRouting .getIndex ().getUUID ()).build (),
169- pathHashAlgo
170- );
171- final BlobContainer blobContainer = blobStoreRepository .blobStore ().blobContainer (path );
172-
173- final String fileName = getIndexRoutingFileName (clusterState .term (), clusterState .version ());
120+ RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable (indexRouting , clusterUUID , compressor , term , version );
174121
175122 ActionListener <Void > completionListener = ActionListener .wrap (
176- resp -> latchedActionListener .onResponse (
177- new ClusterMetadataManifest .UploadedIndexMetadata (
178- indexRouting .getIndex ().getName (),
179- indexRouting .getIndex ().getUUID (),
180- path .buildAsString () + fileName ,
181- INDEX_ROUTING_METADATA_PREFIX
182- )
183- ),
123+ resp -> latchedActionListener .onResponse (remoteIndexRoutingTable .getUploadedMetadata ()),
184124 ex -> latchedActionListener .onFailure (
185125 new RemoteStateTransferException ("Exception in writing index to remote store: " + indexRouting .getIndex ().toString (), ex )
186126 )
187127 );
188128
189- uploadIndex ( indexRouting , fileName , blobContainer , completionListener );
129+ remoteIndexRoutingTableStore . writeAsync ( remoteIndexRoutingTable , completionListener );
190130 }
191131
192132 /**
@@ -213,111 +153,21 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices
213153 return new ArrayList <>(allUploadedIndicesRouting .values ());
214154 }
215155
216- private void uploadIndex (
217- IndexRoutingTable indexRouting ,
218- String fileName ,
219- BlobContainer blobContainer ,
220- ActionListener <Void > completionListener
221- ) {
222- RemoteIndexRoutingTable indexRoutingInput = new RemoteIndexRoutingTable (indexRouting );
223- BytesReference bytesInput = null ;
224- try (BytesStreamOutput streamOutput = new BytesStreamOutput ()) {
225- indexRoutingInput .writeTo (streamOutput );
226- bytesInput = streamOutput .bytes ();
227- } catch (IOException e ) {
228- logger .error ("Failed to serialize IndexRoutingTable for [{}]: [{}]" , indexRouting , e );
229- completionListener .onFailure (e );
230- return ;
231- }
232-
233- if (blobContainer instanceof AsyncMultiStreamBlobContainer == false ) {
234- try {
235- blobContainer .writeBlob (fileName , bytesInput .streamInput (), bytesInput .length (), true );
236- completionListener .onResponse (null );
237- } catch (IOException e ) {
238- logger .error ("Failed to write IndexRoutingTable to remote store for indexRouting [{}]: [{}]" , indexRouting , e );
239- completionListener .onFailure (e );
240- }
241- return ;
242- }
243-
244- try (IndexInput input = new ByteArrayIndexInput ("indexrouting" , BytesReference .toBytes (bytesInput ))) {
245- try (
246- RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer (
247- fileName ,
248- fileName ,
249- input .length (),
250- true ,
251- WritePriority .URGENT ,
252- (size , position ) -> new OffsetRangeIndexInputStream (input , size , position ),
253- null ,
254- false
255- )
256- ) {
257- ((AsyncMultiStreamBlobContainer ) blobContainer ).asyncBlobUpload (
258- remoteTransferContainer .createWriteContext (),
259- completionListener
260- );
261- } catch (IOException e ) {
262- logger .error ("Failed to write IndexRoutingTable to remote store for indexRouting [{}]: [{}]" , indexRouting , e );
263- completionListener .onFailure (e );
264- }
265- } catch (IOException e ) {
266- logger .error (
267- "Failed to create transfer object for IndexRoutingTable for remote store upload for indexRouting [{}]: [{}]" ,
268- indexRouting ,
269- e
270- );
271- completionListener .onFailure (e );
272- }
273- }
274-
275156 @ Override
276157 public void getAsyncIndexRoutingReadAction (
158+ String clusterUUID ,
277159 String uploadedFilename ,
278- Index index ,
279160 LatchedActionListener <IndexRoutingTable > latchedActionListener
280161 ) {
281- int idx = uploadedFilename .lastIndexOf ("/" );
282- String blobFileName = uploadedFilename .substring (idx + 1 );
283- BlobContainer blobContainer = blobStoreRepository .blobStore ()
284- .blobContainer (BlobPath .cleanPath ().add (uploadedFilename .substring (0 , idx )));
285162
286- readAsync (
287- blobContainer ,
288- blobFileName ,
289- index ,
290- threadPool .executor (ThreadPool .Names .REMOTE_STATE_READ ),
291- ActionListener .wrap (
292- response -> latchedActionListener .onResponse (response .getIndexRoutingTable ()),
293- latchedActionListener ::onFailure
294- )
163+ ActionListener <IndexRoutingTable > actionListener = ActionListener .wrap (
164+ latchedActionListener ::onResponse ,
165+ latchedActionListener ::onFailure
295166 );
296- }
297167
298- private void readAsync (
299- BlobContainer blobContainer ,
300- String name ,
301- Index index ,
302- ExecutorService executorService ,
303- ActionListener <RemoteIndexRoutingTable > listener
304- ) {
305- executorService .execute (() -> {
306- try {
307- listener .onResponse (read (blobContainer , name , index ));
308- } catch (Exception e ) {
309- listener .onFailure (e );
310- }
311- });
312- }
168+ RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable (uploadedFilename , clusterUUID , compressor );
313169
314- private RemoteIndexRoutingTable read (BlobContainer blobContainer , String path , Index index ) {
315- try {
316- return new RemoteIndexRoutingTable (blobContainer .readBlob (path ), index );
317- } catch (IOException | AssertionError e ) {
318- logger .error (() -> new ParameterizedMessage ("RoutingTable read failed for path {}" , path ), e );
319- throw new RemoteStateTransferException ("Failed to read RemoteRoutingTable from Manifest with error " , e );
320- }
170+ remoteIndexRoutingTableStore .readAsync (remoteIndexRoutingTable , actionListener );
321171 }
322172
323173 @ Override
@@ -334,16 +184,6 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getUpdatedIndexRoutin
334184 }).collect (Collectors .toList ());
335185 }
336186
337- private String getIndexRoutingFileName (long term , long version ) {
338- return String .join (
339- DELIMITER ,
340- INDEX_ROUTING_FILE_PREFIX ,
341- RemoteStoreUtils .invertLong (term ),
342- RemoteStoreUtils .invertLong (version ),
343- RemoteStoreUtils .invertLong (System .currentTimeMillis ())
344- );
345- }
346-
347187 @ Override
348188 protected void doClose () throws IOException {
349189 if (blobStoreRepository != null ) {
@@ -361,6 +201,16 @@ protected void doStart() {
361201 final Repository repository = repositoriesService .get ().repository (remoteStoreRepo );
362202 assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository" ;
363203 blobStoreRepository = (BlobStoreRepository ) repository ;
204+ compressor = blobStoreRepository .getCompressor ();
205+
206+ this .remoteIndexRoutingTableStore = new RemoteRoutingTableBlobStore <>(
207+ new BlobStoreTransferService (blobStoreRepository .blobStore (), threadPool ),
208+ blobStoreRepository ,
209+ clusterName ,
210+ threadPool ,
211+ ThreadPool .Names .REMOTE_STATE_READ ,
212+ clusterSettings
213+ );
364214 }
365215
366216 @ Override
@@ -376,5 +226,4 @@ public void deleteStaleIndexRoutingPaths(List<String> stalePaths) throws IOExcep
376226 throw e ;
377227 }
378228 }
379-
380229}
0 commit comments