diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index d65c203ab17de..741090c7adcdb 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -9,14 +9,12 @@ import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; -import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.document.StringField; import org.apache.lucene.document.TextField; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.Term; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.tests.index.RandomIndexWriter; @@ -35,7 +33,6 @@ import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.lucene.store.IndexOutputOutputStream; @@ -50,14 +47,14 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.RecoveryEngineException; import org.elasticsearch.index.engine.SegmentsStats; -import org.elasticsearch.index.mapper.IdFieldMapper; -import org.elasticsearch.index.mapper.LuceneDocument; -import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.mapper.MapperServiceTestCase; import org.elasticsearch.index.mapper.SeqNoFieldMapper; -import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.RetentionLeases; @@ -92,13 +89,13 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Queue; import java.util.Set; @@ -117,6 +114,7 @@ import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -130,7 +128,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class RecoverySourceHandlerTests extends ESTestCase { +public class RecoverySourceHandlerTests extends MapperServiceTestCase { private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings( "index", Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT).build() @@ -248,6 +246,7 @@ public StartRecoveryRequest getStartRecoveryRequest() throws IOException { } public void testSendSnapshotSendsOps() throws IOException { + IndexOpFactory iof = randomBoolean() ? new StandardModeIndexOpFactory() : new TimeSeriesModeIndexOpFactory(); final int fileChunkSizeInBytes = between(1, 4096); final StartRecoveryRequest request = getStartRecoveryRequest(); final IndexShard shard = mock(IndexShard.class); @@ -255,12 +254,12 @@ public void testSendSnapshotSendsOps() throws IOException { final List operations = new ArrayList<>(); final int initialNumberOfDocs = randomIntBetween(10, 1000); for (int i = 0; i < initialNumberOfDocs; i++) { - final Engine.Index index = getIndex(Integer.toString(i)); + final Engine.Index index = iof.createIndexOp(i); operations.add(new Translog.Index(index, new Engine.IndexResult(1, 1, SequenceNumbers.UNASSIGNED_SEQ_NO, true, index.id()))); } final int numberOfDocsWithValidSequenceNumbers = randomIntBetween(10, 1000); for (int i = initialNumberOfDocs; i < initialNumberOfDocs + numberOfDocsWithValidSequenceNumbers; i++) { - final Engine.Index index = getIndex(Integer.toString(i)); + final Engine.Index index = iof.createIndexOp(i); operations.add(new Translog.Index(index, new Engine.IndexResult(1, 1, i - initialNumberOfDocs, true, index.id()))); } final long startingSeqNo = randomIntBetween(0, numberOfDocsWithValidSequenceNumbers - 1); @@ -321,13 +320,14 @@ public void indexTranslogOperations( } public void testSendSnapshotStopOnError() throws Exception { + IndexOpFactory iof = randomBoolean() ? new StandardModeIndexOpFactory() : new TimeSeriesModeIndexOpFactory(); final int fileChunkSizeInBytes = between(1, 10 * 1024); final StartRecoveryRequest request = getStartRecoveryRequest(); final IndexShard shard = mock(IndexShard.class); when(shard.state()).thenReturn(IndexShardState.STARTED); final List ops = new ArrayList<>(); for (int numOps = between(1, 256), i = 0; i < numOps; i++) { - final Engine.Index index = getIndex(Integer.toString(i)); + final Engine.Index index = iof.createIndexOp(i); ops.add(new Translog.Index(index, new Engine.IndexResult(1, 1, i, true, index.id()))); } final AtomicBoolean wasFailed = new AtomicBoolean(); @@ -462,29 +462,72 @@ public void indexTranslogOperations( assertThat(receivedSeqNos, equalTo(sentSeqNos)); } - private Engine.Index getIndex(String id) { - final LuceneDocument document = new LuceneDocument(); - document.add(new TextField("test", "test", Field.Store.YES)); - final Field idField = IdFieldMapper.standardIdField(id); // TODO tsdbid field could be different. - final Field versionField = new NumericDocValuesField("_version", Versions.MATCH_ANY); - final SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); - document.add(idField); - document.add(versionField); - document.add(seqID.seqNo); - document.add(seqID.seqNoDocValue); - document.add(seqID.primaryTerm); - final BytesReference source = new BytesArray(new byte[] { 1 }); - final ParsedDocument doc = new ParsedDocument( - versionField, - seqID, - id, - null, - Arrays.asList(document), - source, - XContentType.JSON, - null - ); - return new Engine.Index(new Term("_id", Uid.encodeId(doc.id())), randomNonNegativeLong(), doc); + private interface IndexOpFactory { + Engine.Index createIndexOp(int docIdent); + } + + private class StandardModeIndexOpFactory implements IndexOpFactory { + private final MapperService mapper; + + private StandardModeIndexOpFactory() throws IOException { + mapper = createMapperService(mapping(b -> {})); + } + + @Override + public Engine.Index createIndexOp(int docIdent) { + SourceToParse source = new SourceToParse(Integer.toString(docIdent), new BytesArray("{}"), XContentType.JSON); + SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); + return IndexShard.prepareIndex( + mapper, + source, + seqID.seqNo.numericValue().longValue(), + randomNonNegativeLong(), + Versions.MATCH_ANY, + VersionType.INTERNAL, + Engine.Operation.Origin.PRIMARY, + -1, + false, + UNASSIGNED_SEQ_NO, + 0 + ); + } + } + + private class TimeSeriesModeIndexOpFactory implements IndexOpFactory { + private final MapperService mapper; + + private TimeSeriesModeIndexOpFactory() throws IOException { + mapper = createMapperService( + Settings.builder() + .put(IndexSettings.MODE.getKey(), "time_series") + .put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), "dim") + .build(), + mapping(b -> b.startObject("dim").field("type", "keyword").field("time_series_dimension", true).endObject()) + ); + } + + @Override + public Engine.Index createIndexOp(int docIdent) { + SourceToParse source = new SourceToParse(null, new BytesArray(String.format(Locale.ROOT, """ + { + "@timestamp": %s, + "dim": "dim" + }""", docIdent)), XContentType.JSON); + SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); + return IndexShard.prepareIndex( + mapper, + source, + seqID.seqNo.numericValue().longValue(), + randomNonNegativeLong(), + Versions.MATCH_ANY, + VersionType.INTERNAL, + Engine.Operation.Origin.PRIMARY, + -1, + false, + UNASSIGNED_SEQ_NO, + 0 + ); + } } public void testHandleCorruptedIndexOnSendSendFiles() throws Throwable {