Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
1deaf16
Introduce retention lease syncing
jasontedor Jan 13, 2019
a7eb62f
Fix toString
jasontedor Jan 14, 2019
f74bca6
Merge branch 'master' into sync-retention-leases
jasontedor Jan 15, 2019
184874e
Fix test bug
jasontedor Jan 15, 2019
e20143c
Merge branch 'master' into sync-retention-leases
jasontedor Jan 15, 2019
8947835
Merge branch 'master' into sync-retention-leases
jasontedor Jan 15, 2019
6a10869
Merge branch 'master' into sync-retention-leases
jasontedor Jan 15, 2019
d6620ec
Run under permit
jasontedor Jan 15, 2019
3d75772
Do not hold a lock on callback
jasontedor Jan 15, 2019
11a8406
Revert unneeded permit
jasontedor Jan 15, 2019
b2b4d5b
Fewer replicas
jasontedor Jan 15, 2019
ae5faff
Merge remote-tracking branch 'elastic/master' into sync-retention-leases
jasontedor Jan 15, 2019
eb90d54
Fix imports
jasontedor Jan 15, 2019
e5bb88c
Merge remote-tracking branch 'elastic/master' into sync-retention-leases
jasontedor Jan 21, 2019
690940b
Use write action
jasontedor Jan 21, 2019
da6e688
Separate add and update retention lease APIs
jasontedor Jan 21, 2019
df5f711
Wip listener
jasontedor Jan 23, 2019
37380dd
Merge remote-tracking branch 'elastic/master' into sync-retention-leases
jasontedor Jan 24, 2019
44f8afa
Tighten javadocs
jasontedor Jan 24, 2019
0496d83
Javadocs
jasontedor Jan 24, 2019
1cb71ec
Merge remote-tracking branch 'elastic/master' into sync-retention-leases
jasontedor Jan 24, 2019
4380c65
Use a dedicated interface
jasontedor Jan 24, 2019
50df0db
Fix imports
jasontedor Jan 25, 2019
35850f4
Remove unnecessary throws
jasontedor Jan 25, 2019
fff8763
Javadocs
jasontedor Jan 25, 2019
9413bf7
Add security privileges
jasontedor Jan 25, 2019
42ab377
Flush on new lease
jasontedor Jan 25, 2019
54f2511
Add comment
jasontedor Jan 25, 2019
09fcc7c
Add assertion
jasontedor Jan 25, 2019
922d69f
Merge remote-tracking branch 'elastic/master' into sync-retention-leases
jasontedor Jan 25, 2019
202922b
Update server/src/test/java/org/elasticsearch/index/seqno/RetentionLe…
jasontedor Jan 26, 2019
df30d48
Move comment
jasontedor Jan 26, 2019
3ec870d
Fix typo and method name
jasontedor Jan 26, 2019
13465a2
Rename
jasontedor Jan 26, 2019
4a28ebf
Remove newline
jasontedor Jan 26, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public synchronized void onSuccess(boolean forcedRefresh) {
/**
* Result of taking the action on the replica.
*/
protected static class WriteReplicaResult<ReplicaRequest extends ReplicatedWriteRequest<ReplicaRequest>>
public static class WriteReplicaResult<ReplicaRequest extends ReplicatedWriteRequest<ReplicaRequest>>
extends ReplicaResult implements RespondingWriteResult {
public final Location location;
boolean finishedAsyncActions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.IndexShard;
Expand Down Expand Up @@ -310,7 +311,11 @@ private long getAvgShardSizeInBytes() throws IOException {
}
}

public synchronized IndexShard createShard(ShardRouting routing, Consumer<ShardId> globalCheckpointSyncer) throws IOException {
public synchronized IndexShard createShard(
final ShardRouting routing,
final Consumer<ShardId> globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
/*
* TODO: we execute this in parallel but it's a synced method. Yet, we might
* be able to serialize the execution via the cluster state in the future. for now we just
Expand Down Expand Up @@ -398,6 +403,7 @@ public synchronized IndexShard createShard(ShardRouting routing, Consumer<ShardI
searchOperationListeners,
indexingOperationListeners,
() -> globalCheckpointSyncer.accept(shardId),
(retentionLeases, listener) -> retentionLeaseSyncer.syncRetentionLeasesForShard(shardId, retentionLeases, listener),
circuitBreakerService);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.carrotsearch.hppc.ObjectLongHashMap;
import com.carrotsearch.hppc.ObjectLongMap;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
Expand All @@ -35,6 +37,7 @@
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -43,6 +46,7 @@
import java.util.Objects;
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -142,6 +146,12 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
*/
private final LongSupplier currentTimeMillisSupplier;

/**
* A callback when a new retention lease is created. In practice, this callback invokes the retention lease sync action, to sync
* retention leases to replicas.
*/
private final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> onNewRetentionLease;

/**
* This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the
* current global checkpoint.
Expand All @@ -156,7 +166,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
private final Map<String, RetentionLease> retentionLeases = new HashMap<>();

/**
* Get all non-expired retention leases tracker on this shard. An unmodifiable copy of the retention leases is returned.
* Get all non-expired retention leases tracked on this shard. An unmodifiable copy of the retention leases is returned.
*
* @return the retention leases
*/
Expand All @@ -174,15 +184,60 @@ public synchronized Collection<RetentionLease> getRetentionLeases() {
}

/**
* Adds a new or updates an existing retention lease.
* Adds a new retention lease.
*
* @param id the identifier of the retention lease
* @param retainingSequenceNumber the retaining sequence number
* @param source the source of the retention lease
* @param listener the callback when the retention lease is successfully added and synced to replicas
* @return the new retention lease
* @throws IllegalArgumentException if the specified retention lease already exists
*/
public RetentionLease addRetentionLease(
final String id,
final long retainingSequenceNumber,
final String source,
final ActionListener<ReplicationResponse> listener) {
Objects.requireNonNull(listener);
final RetentionLease retentionLease;
final Collection<RetentionLease> currentRetentionLeases;
synchronized (this) {
assert primaryMode;
if (retentionLeases.containsKey(id)) {
throw new IllegalArgumentException("retention lease with ID [" + id + "] already exists");
}
retentionLease = new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source);
retentionLeases.put(id, retentionLease);
currentRetentionLeases = retentionLeases.values();
}
onNewRetentionLease.accept(Collections.unmodifiableCollection(new ArrayList<>(currentRetentionLeases)), listener);
Comment thread
jasontedor marked this conversation as resolved.
return retentionLease;
}

/**
* Renews an existing retention lease.
*
* @param id the identifier of the retention lease
* @param retainingSequenceNumber the retaining sequence number
* @param source the source of the retention lease
* @return the renewed retention lease
* @throws IllegalArgumentException if the specified retention lease does not exist
*/
public synchronized void addOrUpdateRetentionLease(final String id, final long retainingSequenceNumber, final String source) {
public synchronized RetentionLease renewRetentionLease(final String id, final long retainingSequenceNumber, final String source) {
assert primaryMode;
retentionLeases.put(id, new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source));
if (retentionLeases.containsKey(id) == false) {
throw new IllegalArgumentException("retention lease with ID [" + id + "] does not exist");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a concern about the strictness introduced here. I think clients of this API are generally going to be trying to ensure that there is a retention lease in place, and I worry about a situation where, for example, the system clock were to jump forward far enough for all existing leases to expire. I think it would be good in that case if clients were to re-establish these expired leases, but today they would be rejected by this method which means that clients will generally have to be prepared to call addRetentionLease if they discover that their lease has expired. I think it'd be better if we implemented that fall-back behaviour here where clients can't forget to call it (and where it can be correctly synchronised).

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this concern is a good one and worth discussing in a larger group. You can see what it presents for consumers in the current integration of CCR with shard history retention leases (WIP branch on my fork).

That said, I am not sure how much to worry about this in practice. In most cases, the life of a retention lease should be significantly longer than the renewal period (e.g., default retention of twelve hours versus renewal frequencies of at most sixty seconds in the case of a CCR follower). And also longer than what most clocks jump by (e.g., for daylight savings time) except when they are completely misconfigured and then we have no guarantees.

}
final RetentionLease retentionLease =
new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source);
final RetentionLease existingRetentionLease = retentionLeases.put(id, retentionLease);
assert existingRetentionLease != null;
assert existingRetentionLease.retainingSequenceNumber() <= retentionLease.retainingSequenceNumber() :
"retention lease renewal for [" + id + "]"
+ " from [" + source + "]"
+ " renewed a lower retaining sequence number [" + retentionLease.retainingSequenceNumber() + "]"
+ " than the current lease retaining sequence number [" + existingRetentionLease.retainingSequenceNumber() + "]";
return retentionLease;
}

/**
Expand Down Expand Up @@ -440,18 +495,20 @@ private static long inSyncCheckpointStates(
* Initialize the global checkpoint service. The specified global checkpoint should be set to the last known global checkpoint, or
* {@link SequenceNumbers#UNASSIGNED_SEQ_NO}.
*
* @param shardId the shard ID
* @param allocationId the allocation ID
* @param indexSettings the index settings
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
* @param shardId the shard ID
* @param allocationId the allocation ID
* @param indexSettings the index settings
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
* @param onNewRetentionLease a callback when a new retention lease is created
*/
public ReplicationTracker(
final ShardId shardId,
final String allocationId,
final IndexSettings indexSettings,
final long globalCheckpoint,
final LongConsumer onGlobalCheckpointUpdated,
final LongSupplier currentTimeMillisSupplier) {
final LongSupplier currentTimeMillisSupplier,
final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> onNewRetentionLease) {
super(shardId, indexSettings);
assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
this.shardAllocationId = allocationId;
Expand All @@ -462,6 +519,7 @@ public ReplicationTracker(
checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false));
this.onGlobalCheckpointUpdated = Objects.requireNonNull(onGlobalCheckpointUpdated);
this.currentTimeMillisSupplier = Objects.requireNonNull(currentTimeMillisSupplier);
this.onNewRetentionLease = Objects.requireNonNull(onNewRetentionLease);
this.pendingInSync = new HashSet<>();
this.routingTable = null;
this.replicationGroup = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.seqno;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Collection;
import java.util.Objects;

/**
* Write action responsible for syncing retention leases to replicas. This action is deliberately a write action so that if a replica misses
* a retention lease sync then that shard will be marked as stale.
*/
public class RetentionLeaseSyncAction extends
Comment thread
dnhatn marked this conversation as resolved.
TransportWriteAction<RetentionLeaseSyncAction.Request, RetentionLeaseSyncAction.Request, RetentionLeaseSyncAction.Response> {

public static String ACTION_NAME = "indices:admin/seq_no/retention_lease_sync";

private static final Logger LOGGER = LogManager.getLogger(RetentionLeaseSyncAction.class);

protected Logger getLogger() {
return LOGGER;
}

@Inject
public RetentionLeaseSyncAction(
final Settings settings,
final TransportService transportService,
final ClusterService clusterService,
final IndicesService indicesService,
final ThreadPool threadPool,
final ShardStateAction shardStateAction,
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver) {
super(
settings,
ACTION_NAME,
transportService,
clusterService,
indicesService,
threadPool,
shardStateAction,
actionFilters,
indexNameExpressionResolver,
RetentionLeaseSyncAction.Request::new,
RetentionLeaseSyncAction.Request::new,
ThreadPool.Names.MANAGEMENT);
}

/**
* Sync the specified retention leases for the specified shard. The callback is invoked when the sync succeeds or fails.
*
* @param shardId the shard to sync
* @param retentionLeases the retention leases to sync
* @param listener the callback to invoke when the sync completes normally or abnormally
*/
public void syncRetentionLeasesForShard(
final ShardId shardId,
final Collection<RetentionLease> retentionLeases,
final ActionListener<ReplicationResponse> listener) {
Objects.requireNonNull(shardId);
Objects.requireNonNull(retentionLeases);
Objects.requireNonNull(listener);
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
// we have to execute under the system context so that if security is enabled the sync is authorized
threadContext.markAsSystemContext();
execute(
new RetentionLeaseSyncAction.Request(shardId, retentionLeases),
ActionListener.wrap(
listener::onResponse,
e -> {
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
getLogger().warn(new ParameterizedMessage("{} retention lease sync failed", shardId), e);
}
listener.onFailure(e);
}));
}
}

@Override
protected WritePrimaryResult<Request, Response> shardOperationOnPrimary(final Request request, final IndexShard primary) {
Objects.requireNonNull(request);
Objects.requireNonNull(primary);
// we flush to ensure that retention leases are committed
flush(primary);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the primary recovers from the safe commit, we will lose the committed retention leases. Should we copy them in Store#trimUnsafeCommits ?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call. Let us get that in a follow-up.

return new WritePrimaryResult<>(request, new Response(), null, null, primary, logger);
}

@Override
protected WriteReplicaResult<Request> shardOperationOnReplica(final Request request, final IndexShard replica) {
Objects.requireNonNull(request);
Objects.requireNonNull(replica);
replica.updateRetentionLeasesOnReplica(request.getRetentionLeases());
// we flush to ensure that retention leases are committed
flush(replica);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may need to deal with the fact that a sequence-based or file-based recovery with a synced-flush does not propagate the retention leases to a recovering replica.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, and also hand-off on relocation. I intend to address these later.

return new WriteReplicaResult<>(request, null, null, replica, logger);
}

private void flush(final IndexShard indexShard) {
final FlushRequest flushRequest = new FlushRequest();
flushRequest.force(true);
flushRequest.waitIfOngoing(true);
indexShard.flush(flushRequest);
}

public static final class Request extends ReplicatedWriteRequest<Request> {

private Collection<RetentionLease> retentionLeases;

public Collection<RetentionLease> getRetentionLeases() {
return retentionLeases;
}

public Request() {

}

public Request(final ShardId shardId, final Collection<RetentionLease> retentionLeases) {
super(Objects.requireNonNull(shardId));
this.retentionLeases = Objects.requireNonNull(retentionLeases);
}

@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
retentionLeases = in.readList(RetentionLease::new);
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(Objects.requireNonNull(out));
out.writeCollection(retentionLeases);
}

@Override
public String toString() {
return "Request{" +
"retentionLeases=" + retentionLeases +
", shardId=" + shardId +
", timeout=" + timeout +
", index='" + index + '\'' +
", waitForActiveShards=" + waitForActiveShards +
'}';
}

}

public static final class Response extends ReplicationResponse implements WriteResponse {

@Override
public void setForcedRefresh(final boolean forcedRefresh) {
// ignore
}

}

@Override
protected Response newResponseInstance() {
return new Response();
}

}
Loading