Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,12 @@

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.tests.index.RandomIndexWriter;
Expand All @@ -35,7 +33,6 @@
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
Expand All @@ -50,14 +47,14 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.LuceneDocument;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.MapperServiceTestCase;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.RetentionLeases;
Expand Down Expand Up @@ -92,13 +89,13 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
Expand All @@ -117,6 +114,7 @@
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
Expand All @@ -130,7 +128,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class RecoverySourceHandlerTests extends ESTestCase {
public class RecoverySourceHandlerTests extends MapperServiceTestCase {
private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings(
"index",
Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT).build()
Expand Down Expand Up @@ -248,19 +246,20 @@ public StartRecoveryRequest getStartRecoveryRequest() throws IOException {
}

public void testSendSnapshotSendsOps() throws IOException {
IndexOpFactory iof = randomBoolean() ? new StandardModeIndexOpFactory() : new TimeSeriesModeIndexOpFactory();
final int fileChunkSizeInBytes = between(1, 4096);
final StartRecoveryRequest request = getStartRecoveryRequest();
final IndexShard shard = mock(IndexShard.class);
when(shard.state()).thenReturn(IndexShardState.STARTED);
final List<Translog.Operation> operations = new ArrayList<>();
final int initialNumberOfDocs = randomIntBetween(10, 1000);
for (int i = 0; i < initialNumberOfDocs; i++) {
final Engine.Index index = getIndex(Integer.toString(i));
final Engine.Index index = iof.createIndexOp(i);
operations.add(new Translog.Index(index, new Engine.IndexResult(1, 1, SequenceNumbers.UNASSIGNED_SEQ_NO, true, index.id())));
}
final int numberOfDocsWithValidSequenceNumbers = randomIntBetween(10, 1000);
for (int i = initialNumberOfDocs; i < initialNumberOfDocs + numberOfDocsWithValidSequenceNumbers; i++) {
final Engine.Index index = getIndex(Integer.toString(i));
final Engine.Index index = iof.createIndexOp(i);
operations.add(new Translog.Index(index, new Engine.IndexResult(1, 1, i - initialNumberOfDocs, true, index.id())));
}
final long startingSeqNo = randomIntBetween(0, numberOfDocsWithValidSequenceNumbers - 1);
Expand Down Expand Up @@ -321,13 +320,14 @@ public void indexTranslogOperations(
}

public void testSendSnapshotStopOnError() throws Exception {
IndexOpFactory iof = randomBoolean() ? new StandardModeIndexOpFactory() : new TimeSeriesModeIndexOpFactory();
final int fileChunkSizeInBytes = between(1, 10 * 1024);
final StartRecoveryRequest request = getStartRecoveryRequest();
final IndexShard shard = mock(IndexShard.class);
when(shard.state()).thenReturn(IndexShardState.STARTED);
final List<Translog.Operation> ops = new ArrayList<>();
for (int numOps = between(1, 256), i = 0; i < numOps; i++) {
final Engine.Index index = getIndex(Integer.toString(i));
final Engine.Index index = iof.createIndexOp(i);
ops.add(new Translog.Index(index, new Engine.IndexResult(1, 1, i, true, index.id())));
}
final AtomicBoolean wasFailed = new AtomicBoolean();
Expand Down Expand Up @@ -462,29 +462,72 @@ public void indexTranslogOperations(
assertThat(receivedSeqNos, equalTo(sentSeqNos));
}

private Engine.Index getIndex(String id) {
final LuceneDocument document = new LuceneDocument();
document.add(new TextField("test", "test", Field.Store.YES));
final Field idField = IdFieldMapper.standardIdField(id); // TODO tsdbid field could be different.
final Field versionField = new NumericDocValuesField("_version", Versions.MATCH_ANY);
final SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
document.add(idField);
document.add(versionField);
document.add(seqID.seqNo);
document.add(seqID.seqNoDocValue);
document.add(seqID.primaryTerm);
final BytesReference source = new BytesArray(new byte[] { 1 });
final ParsedDocument doc = new ParsedDocument(
versionField,
seqID,
id,
null,
Arrays.asList(document),
source,
XContentType.JSON,
null
);
return new Engine.Index(new Term("_id", Uid.encodeId(doc.id())), randomNonNegativeLong(), doc);
/**
 * Creates {@link Engine.Index} operations for these recovery tests, abstracting
 * over whether the backing index is a standard index (explicit {@code _id}) or a
 * time-series index (id synthesized by the mapper from dimensions + timestamp).
 */
private interface IndexOpFactory {
    /** Builds an index operation for the document identified by {@code docIdent}. */
    Engine.Index createIndexOp(int docIdent);
}

/**
 * Produces index operations against a standard-mode index. Each document gets an
 * explicit {@code _id} derived from its ordinal and an empty {@code {}} source.
 */
private class StandardModeIndexOpFactory implements IndexOpFactory {
    private final MapperService mapperService;

    private StandardModeIndexOpFactory() throws IOException {
        // An empty mapping suffices: the parsed document only needs the standard metadata fields.
        mapperService = createMapperService(mapping(b -> {}));
    }

    @Override
    public Engine.Index createIndexOp(int docIdent) {
        final SeqNoFieldMapper.SequenceIDFields sequenceIds = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
        final SourceToParse sourceToParse = new SourceToParse(Integer.toString(docIdent), new BytesArray("{}"), XContentType.JSON);
        return IndexShard.prepareIndex(
            mapperService,
            sourceToParse,
            sequenceIds.seqNo.numericValue().longValue(),
            randomNonNegativeLong(),
            Versions.MATCH_ANY,
            VersionType.INTERNAL,
            Engine.Operation.Origin.PRIMARY,
            -1,
            false,
            UNASSIGNED_SEQ_NO,
            0
        );
    }
}

/**
 * Produces index operations against a time-series-mode (TSDB) index. The document
 * id is synthesized by the mapper from the routing dimension and {@code @timestamp},
 * so the source is parsed with a {@code null} id. The doc ordinal doubles as the
 * timestamp, giving every operation a distinct {@code @timestamp} value.
 */
private class TimeSeriesModeIndexOpFactory implements IndexOpFactory {
    private final MapperService mapperService;

    private TimeSeriesModeIndexOpFactory() throws IOException {
        // time_series mode requires a routing path; "dim" is the single keyword dimension.
        final Settings tsdbSettings = Settings.builder()
            .put(IndexSettings.MODE.getKey(), "time_series")
            .put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), "dim")
            .build();
        mapperService = createMapperService(
            tsdbSettings,
            mapping(b -> b.startObject("dim").field("type", "keyword").field("time_series_dimension", true).endObject())
        );
    }

    @Override
    public Engine.Index createIndexOp(int docIdent) {
        final String json = String.format(Locale.ROOT, """
            {
                "@timestamp": %s,
                "dim": "dim"
            }""", docIdent);
        final SeqNoFieldMapper.SequenceIDFields sequenceIds = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
        // id is null: in time_series mode the mapper derives it from dimensions + timestamp.
        final SourceToParse sourceToParse = new SourceToParse(null, new BytesArray(json), XContentType.JSON);
        return IndexShard.prepareIndex(
            mapperService,
            sourceToParse,
            sequenceIds.seqNo.numericValue().longValue(),
            randomNonNegativeLong(),
            Versions.MATCH_ANY,
            VersionType.INTERNAL,
            Engine.Operation.Origin.PRIMARY,
            -1,
            false,
            UNASSIGNED_SEQ_NO,
            0
        );
    }
}

public void testHandleCorruptedIndexOnSendSendFiles() throws Throwable {
Expand Down