Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,31 +153,31 @@ private boolean isFollowerCommitBehindLastCommitIndex() {
return getRaftLog().getLastCommittedIndex() > getFollower().getCommitIndex();
}

/**
 * Start a snapshot installation on the follower when one is needed.
 *
 * When {@code installSnapshotEnabled} is set, the snapshot returned by
 * {@code shouldInstallSnapshot()} (if any) is installed.
 * Otherwise, the term-index returned by {@code shouldNotifyToInstallSnapshot()} (if any)
 * is passed to {@code notifyInstallSnapshot(..)}.
 *
 * @return true if an installation or a notification was started; false if neither was needed.
 */
private boolean installSnapshot() {
  if (!installSnapshotEnabled) {
    // check installSnapshotNotification
    final TermIndex firstAvailable = shouldNotifyToInstallSnapshot();
    if (firstAvailable == null) {
      return false;
    }
    notifyInstallSnapshot(firstAvailable);
    return true;
  }

  final SnapshotInfo snapshot = shouldInstallSnapshot();
  if (snapshot == null) {
    return false;
  }
  installSnapshot(snapshot);
  return true;
}

@Override
public void run() throws IOException {
boolean installSnapshotRequired;
for(; isRunning(); mayWait()) {
installSnapshotRequired = false;

//HB period is expired OR we have messages OR follower is behind with commit index
if (shouldSendAppendEntries() || isFollowerCommitBehindLastCommitIndex()) {

if (installSnapshotEnabled) {
SnapshotInfo snapshot = shouldInstallSnapshot();
if (snapshot != null) {
installSnapshot(snapshot);
installSnapshotRequired = true;
}
} else {
TermIndex installSnapshotNotificationTermIndex = shouldNotifyToInstallSnapshot();
if (installSnapshotNotificationTermIndex != null) {
installSnapshot(installSnapshotNotificationTermIndex);
installSnapshotRequired = true;
}
}

appendLog(installSnapshotRequired || haveTooManyPendingRequests());

final boolean installingSnapshot = installSnapshot();
appendLog(installingSnapshot || haveTooManyPendingRequests());
}
getLeaderState().checkHealth(getFollower());
}
Expand Down Expand Up @@ -403,14 +403,14 @@ public void onNext(AppendEntriesReplyProto reply) {
}

try {
onNextImpl(reply);
onNextImpl(request, reply);
} catch(Exception t) {
LOG.error("Failed onNext request=" + request
+ ", reply=" + ServerStringUtils.toAppendEntriesReplyString(reply), t);
}
}

private void onNextImpl(AppendEntriesReplyProto reply) {
private void onNextImpl(AppendEntriesRequest request, AppendEntriesReplyProto reply) {
errCount.set(0);

if (!firstResponseReceived) {
Expand All @@ -435,8 +435,9 @@ private void onNextImpl(AppendEntriesReplyProto reply) {
break;
case INCONSISTENCY:
grpcServerMetrics.onRequestInconsistency(getFollowerId().toString());
LOG.warn("{}: received {} reply with nextIndex {}", this, reply.getResult(), reply.getNextIndex());
updateNextIndex(Math.max(getFollower().getMatchIndex() + 1, reply.getNextIndex()));
LOG.warn("{}: received {} reply with nextIndex {}, request={}",
this, reply.getResult(), reply.getNextIndex(), request);
updateNextIndex(getNextIndexForInconsistency(request.getFirstIndex(), reply.getNextIndex()));
break;
default:
throw new IllegalStateException("Unexpected reply result: " + reply.getResult());
Expand Down Expand Up @@ -662,7 +663,7 @@ private void installSnapshot(SnapshotInfo snapshot) {
snapshotRequestObserver.onCompleted();
grpcServerMetrics.onInstallSnapshot();
} catch (Exception e) {
LOG.warn("{}: failed to install snapshot {}: {}", this, snapshot.getFiles(), e);
LOG.warn(this + ": failed to installSnapshot " + snapshot, e);
if (snapshotRequestObserver != null) {
snapshotRequestObserver.onError(e);
}
Expand All @@ -684,17 +685,17 @@ private void installSnapshot(SnapshotInfo snapshot) {
}

/**
* Send installSnapshot request to Follower with only a notification that a snapshot needs to be installed.
* @param firstAvailableLogTermIndex the first available log's index on the Leader
* Send an installSnapshot notification request to the Follower.
* @param firstAvailable the first available log's index on the Leader
*/
private void installSnapshot(TermIndex firstAvailableLogTermIndex) {
LOG.info("{}: followerNextIndex = {} but logStartIndex = {}, notify follower to install snapshot-{}",
this, getFollower().getNextIndex(), getRaftLog().getStartIndex(), firstAvailableLogTermIndex);
private void notifyInstallSnapshot(TermIndex firstAvailable) {
LOG.info("{}: notifyInstallSnapshot with firstAvailable={}, followerNextIndex={}",
this, firstAvailable, getFollower().getNextIndex());

final InstallSnapshotResponseHandler responseHandler = new InstallSnapshotResponseHandler(true);
StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver = null;
// prepare and enqueue the notify install snapshot request.
final InstallSnapshotRequestProto request = newInstallSnapshotNotificationRequest(firstAvailableLogTermIndex);
final InstallSnapshotRequestProto request = newInstallSnapshotNotificationRequest(firstAvailable);
if (LOG.isInfoEnabled()) {
LOG.info("{}: send {}", this, ServerStringUtils.toInstallSnapshotRequestString(request));
}
Expand Down Expand Up @@ -770,6 +771,7 @@ static class AppendEntriesRequest {
private final TermIndex previousLog;
private final int entriesCount;

private final TermIndex firstEntry;
private final TermIndex lastEntry;

private volatile Timestamp sendTime;
Expand All @@ -778,6 +780,7 @@ static class AppendEntriesRequest {
this.callId = proto.getServerRequest().getCallId();
this.previousLog = proto.hasPreviousLog()? TermIndex.valueOf(proto.getPreviousLog()): null;
this.entriesCount = proto.getEntriesCount();
this.firstEntry = entriesCount > 0? TermIndex.valueOf(proto.getEntries(0)): null;
this.lastEntry = entriesCount > 0? TermIndex.valueOf(proto.getEntries(entriesCount - 1)): null;

this.timer = grpcServerMetrics.getGrpcLogAppenderLatencyTimer(followerId.toString(), isHeartbeat());
Expand All @@ -792,7 +795,11 @@ TermIndex getPreviousLog() {
return previousLog;
}

public Timestamp getSendTime() {
/**
 * @return the index of the first entry of this request,
 *         or {@code RaftLog.INVALID_LOG_INDEX} when {@code firstEntry} is null
 *         (i.e. the request was built with no entries).
 */
long getFirstIndex() {
  return firstEntry == null ? RaftLog.INVALID_LOG_INDEX : firstEntry.getIndex();
}

/** @return the value currently held by the volatile {@code sendTime} field. */
Timestamp getSendTime() {
  return this.sendTime;
}

Expand All @@ -811,10 +818,13 @@ boolean isHeartbeat() {

@Override
public String toString() {
final String entries = entriesCount == 0? ""
: entriesCount == 1? ",entry=" + firstEntry
: ",entries=" + firstEntry + "..." + lastEntry;
return JavaUtils.getClassSimpleName(getClass())
+ ":cid=" + callId
+ ",entriesCount=" + entriesCount
+ ",lastEntry=" + lastEntry;
+ entries;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.util.Preconditions;

import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -90,24 +92,22 @@ static InstallSnapshotReplyProto toInstallSnapshotReplyProto(
/**
 * Build an {@link InstallSnapshotReplyProto} carrying the given term, result and snapshot index.
 *
 * @param requestorId the id of the peer which sent the installSnapshot request
 * @param replyId the id of the group member sending this reply
 * @param currentTerm the term to set in the reply
 * @param result the installSnapshot result to set in the reply
 * @param installedSnapshotIndex the installed snapshot index; values that are not positive are sent as 0.
 *                               It must be {@code RaftLog.INVALID_LOG_INDEX} unless {@code isSuccess(result)}.
 * @return the reply proto
 */
static InstallSnapshotReplyProto toInstallSnapshotReplyProto(
    RaftPeerId requestorId, RaftGroupMemberId replyId,
    long currentTerm, InstallSnapshotResult result, long installedSnapshotIndex) {
  final boolean success = isSuccess(result);
  // A non-success reply must not claim an installed snapshot index.
  Preconditions.assertTrue(success || installedSnapshotIndex == RaftLog.INVALID_LOG_INDEX,
      () -> "result=" + result + " but installedSnapshotIndex=" + installedSnapshotIndex);
  final RaftRpcReplyProto.Builder rb = toRaftRpcReplyProtoBuilder(requestorId, replyId, success);
  return InstallSnapshotReplyProto.newBuilder()
      .setServerReply(rb)
      .setTerm(currentTerm)
      .setResult(result)
      .setSnapshotIndex(Math.max(installedSnapshotIndex, 0L))
      .build();
}

static InstallSnapshotReplyProto toInstallSnapshotReplyProto(
RaftPeerId requestorId, RaftGroupMemberId replyId,
InstallSnapshotResult result) {
final RaftRpcReplyProto.Builder rb = toRaftRpcReplyProtoBuilder(requestorId,
replyId, isSuccess(result));
final InstallSnapshotReplyProto.Builder builder = InstallSnapshotReplyProto
.newBuilder().setServerReply(rb).setResult(result);
return builder.build();
long currentTerm, InstallSnapshotResult result) {
return toInstallSnapshotReplyProto(requestorId, replyId, currentTerm, result, RaftLog.INVALID_LOG_INDEX);
}

static ReadIndexRequestProto toReadIndexRequestProto(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ private InstallSnapshotReplyProto installSnapshotImpl(InstallSnapshotRequestProt

// There is a mismatch between configurations on leader and follower.
final InstallSnapshotReplyProto failedReply = ServerProtoUtils.toInstallSnapshotReplyProto(
leaderId, getMemberId(), InstallSnapshotResult.CONF_MISMATCH);
leaderId, getMemberId(), state.getCurrentTerm(), InstallSnapshotResult.CONF_MISMATCH);
LOG.error("{}: Configuration Mismatch ({}): Leader {} has it set to {} but follower {} has it set to {}",
getMemberId(), RaftServerConfigKeys.Log.Appender.INSTALL_SNAPSHOT_ENABLED_KEY,
leaderId, request.hasSnapshotChunk(), server.getId(), installSnapshotEnabled);
Expand Down Expand Up @@ -209,7 +209,7 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
currentTerm = state.getCurrentTerm();
if (!recognized) {
final InstallSnapshotReplyProto reply = ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.NOT_LEADER, -1);
currentTerm, InstallSnapshotResult.NOT_LEADER);
LOG.warn("{}: Failed to recognize leader for installSnapshot notification.", getMemberId());
return reply;
}
Expand Down Expand Up @@ -308,7 +308,7 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
server.getStateMachine().event().notifySnapshotInstalled(
InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX, server.getPeer());
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, -1);
currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
}

// If a snapshot has been installed, return SNAPSHOT_INSTALLED with the installed snapshot index and reset
Expand All @@ -335,7 +335,7 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
InstallSnapshotResult.IN_PROGRESS);
}
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.IN_PROGRESS, -1);
currentTerm, InstallSnapshotResult.IN_PROGRESS);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,22 @@ private TermIndex getPrevious(long nextIndex) {
return null;
}

/**
 * Choose the next index to use after a reply was received for a request starting at {@code requestFirstIndex}.
 *
 * The computation starts from {@code replyNextIndex}.
 * It is raised to (follower matchIndex + 1) when that value is larger and differs from {@code requestFirstIndex}.
 * Finally, if the chosen value equals {@code requestFirstIndex} and is greater than
 * {@code RaftLog.LEAST_VALID_LOG_INDEX}, it is decremented by one.
 *
 * @param requestFirstIndex the index of the first entry in the request
 * @param replyNextIndex the nextIndex carried by the reply
 * @return the next index to use
 */
protected long getNextIndexForInconsistency(long requestFirstIndex, long replyNextIndex) {
  final long afterMatch = getFollower().getMatchIndex() + 1;

  // Ideally, we should set nextIndex to a value greater than matchIndex.
  // However, we must not resend the same first entry due to some special cases (e.g. the log is empty).
  // Otherwise, the follower will reply INCONSISTENCY again.
  final boolean preferAfterMatch = afterMatch > replyNextIndex && afterMatch != requestFirstIndex;
  final long candidate = preferAfterMatch ? afterMatch : replyNextIndex;

  // Avoid resending the same first entry.
  final boolean sameFirstEntry = candidate == requestFirstIndex && candidate > RaftLog.LEAST_VALID_LOG_INDEX;
  return sameFirstEntry ? candidate - 1 : candidate;
}

@Override
public AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean heartbeat)
throws RaftLogIOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -170,10 +171,18 @@ public void notifySnapshotInstalled(RaftProtos.InstallSnapshotResult result, lon
*/
@Test
public void testAddNewFollowers() throws Exception {
  // Send enough requests to reach SNAPSHOT_TRIGGER_THRESHOLD,
  // so that the two-argument helper takes its shouldInstallSnapshot path.
  final int numRequests = SNAPSHOT_TRIGGER_THRESHOLD*2 - 1; // trigger a snapshot
  runWithNewCluster(1, c -> testAddNewFollowers(c, numRequests));
}

private void testAddNewFollowers(CLUSTER cluster) throws Exception {
/**
 * Run the add-new-followers scenario with a request count
 * below {@code SNAPSHOT_TRIGGER_THRESHOLD}.
 */
@Test
public void testAddNewFollowersNoSnapshot() throws Exception {
  // do not trigger a snapshot
  final int requestCount = SNAPSHOT_TRIGGER_THRESHOLD / 8;
  runWithNewCluster(1, cluster -> testAddNewFollowers(cluster, requestCount));
}

private void testAddNewFollowers(CLUSTER cluster, int numRequests) throws Exception {
final boolean shouldInstallSnapshot = numRequests >= SNAPSHOT_TRIGGER_THRESHOLD;
leaderSnapshotInfoRef.set(null);
final List<LogSegmentPath> logs;
int i = 0;
Expand All @@ -182,24 +191,27 @@ private void testAddNewFollowers(CLUSTER cluster) throws Exception {
final RaftPeerId leaderId = cluster.getLeader().getId();

try(final RaftClient client = cluster.createClient(leaderId)) {
for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
RaftClientReply
reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i));
for (; i < numRequests; i++) {
final RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i));
Assert.assertTrue(reply.isSuccess());
}
}

// wait for the snapshot to be done
final RaftServer.Division leader = cluster.getLeader();
final long nextIndex = leader.getRaftLog().getNextIndex();
LOG.info("nextIndex = {}", nextIndex);
final List<File> snapshotFiles = RaftSnapshotBaseTest.getSnapshotFiles(cluster,
nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
JavaUtils.attemptRepeatedly(() -> {
Assert.assertTrue(snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists));
return null;
}, 10, ONE_SECOND, "snapshotFile.exist", LOG);
logs = LogSegmentPath.getLogSegmentPaths(leader.getRaftStorage());
if (shouldInstallSnapshot) {
// wait for the snapshot to be done
final RaftServer.Division leader = cluster.getLeader();
final long nextIndex = leader.getRaftLog().getNextIndex();
LOG.info("nextIndex = {}", nextIndex);
final List<File> snapshotFiles = RaftSnapshotBaseTest.getSnapshotFiles(cluster,
nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
JavaUtils.attemptRepeatedly(() -> {
Assert.assertTrue(snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists));
return null;
}, 10, ONE_SECOND, "snapshotFile.exist", LOG);
logs = LogSegmentPath.getLogSegmentPaths(leader.getRaftStorage());
} else {
logs = Collections.emptyList();
}
} finally {
cluster.shutdown();
}
Expand Down Expand Up @@ -238,8 +250,9 @@ private void testAddNewFollowers(CLUSTER cluster) throws Exception {
// Check the installed snapshot index on each Follower matches with the
// leader snapshot.
for (RaftServer.Division follower : cluster.getFollowers()) {
Assert.assertEquals(leaderSnapshotInfo.getIndex(),
RaftServerTestUtil.getLatestInstalledSnapshotIndex(follower));
final long expected = shouldInstallSnapshot ? leaderSnapshotInfo.getIndex() : RaftLog.INVALID_LOG_INDEX;
Assert.assertEquals(expected, RaftServerTestUtil.getLatestInstalledSnapshotIndex(follower));
RaftSnapshotBaseTest.assertLogContent(follower, false);
}

// restart the peer and check if it can correctly handle conf change
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,22 +87,28 @@ public static List<File> getSnapshotFiles(MiniRaftCluster cluster, long startInd

/**
 * Wait for a leader of the given cluster and check its log content as a leader.
 *
 * @param cluster the cluster whose leader is checked
 * @throws Exception if waiting for the leader or the content check fails
 */
public static void assertLeaderContent(MiniRaftCluster cluster) throws Exception {
  final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
  // All log and state machine checks live in assertLogContent; isLeader=true adds the leader-notification check.
  assertLogContent(leader, true);
}

public static void assertLogContent(RaftServer.Division server, boolean isLeader) throws Exception {
final RaftLog log = server.getRaftLog();
final long lastIndex = log.getLastEntryTermIndex().getIndex();
final LogEntryProto e = log.get(lastIndex);
Assert.assertTrue(e.hasMetadataEntry());

JavaUtils.attemptRepeatedly(() -> {
Assert.assertEquals(leaderLog.getLastCommittedIndex() - 1, e.getMetadataEntry().getCommitIndex());
Assert.assertEquals(log.getLastCommittedIndex() - 1, e.getMetadataEntry().getCommitIndex());
return null;
}, 50, BaseTest.HUNDRED_MILLIS, "CheckMetadataEntry", LOG);

SimpleStateMachine4Testing simpleStateMachine = SimpleStateMachine4Testing.get(leader);
Assert.assertTrue("Is not notified as a leader", simpleStateMachine.isNotifiedAsLeader());
SimpleStateMachine4Testing simpleStateMachine = SimpleStateMachine4Testing.get(server);
if (isLeader) {
Assert.assertTrue("Not notified as a leader", simpleStateMachine.isNotifiedAsLeader());
}
final LogEntryProto[] entries = simpleStateMachine.getContent();
long message = 0;
for (int i = 0; i < entries.length; i++) {
LOG.info("{}) {} {}", i, message, entries[i]);
LOG.info("{}) {} {}", i, message, entries[i].toString().replace("\n", ", "));
if (entries[i].hasStateMachineLogEntry()) {
final SimpleMessage m = new SimpleMessage("m" + message++);
Assert.assertArrayEquals(m.getContent().toByteArray(),
Expand Down