-
Notifications
You must be signed in to change notification settings - Fork 25.9k
Introduce retention lease syncing #37398
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
1deaf16
a7eb62f
f74bca6
184874e
e20143c
8947835
6a10869
d6620ec
3d75772
11a8406
b2b4d5b
ae5faff
eb90d54
e5bb88c
690940b
da6e688
df5f711
37380dd
44f8afa
0496d83
1cb71ec
4380c65
50df0db
35850f4
fff8763
9413bf7
42ab377
54f2511
09fcc7c
922d69f
202922b
df30d48
3ec870d
13465a2
4a28ebf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,8 @@ | |
| import com.carrotsearch.hppc.ObjectLongHashMap; | ||
| import com.carrotsearch.hppc.ObjectLongMap; | ||
| import org.elasticsearch.Version; | ||
| import org.elasticsearch.action.ActionListener; | ||
| import org.elasticsearch.action.support.replication.ReplicationResponse; | ||
| import org.elasticsearch.cluster.routing.AllocationId; | ||
| import org.elasticsearch.cluster.routing.IndexShardRoutingTable; | ||
| import org.elasticsearch.cluster.routing.ShardRouting; | ||
|
|
@@ -35,6 +37,7 @@ | |
| import org.elasticsearch.index.shard.ShardId; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.ArrayList; | ||
| import java.util.Collection; | ||
| import java.util.Collections; | ||
| import java.util.HashMap; | ||
|
|
@@ -43,6 +46,7 @@ | |
| import java.util.Objects; | ||
| import java.util.OptionalLong; | ||
| import java.util.Set; | ||
| import java.util.function.BiConsumer; | ||
| import java.util.function.Function; | ||
| import java.util.function.LongConsumer; | ||
| import java.util.function.LongSupplier; | ||
|
|
@@ -142,6 +146,12 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L | |
| */ | ||
| private final LongSupplier currentTimeMillisSupplier; | ||
|
|
||
| /** | ||
| * A callback when a new retention lease is created. In practice, this callback invokes the retention lease sync action, to sync | ||
| * retention leases to replicas. | ||
| */ | ||
| private final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> onNewRetentionLease; | ||
|
|
||
| /** | ||
| * This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the | ||
| * current global checkpoint. | ||
|
|
@@ -156,7 +166,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L | |
| private final Map<String, RetentionLease> retentionLeases = new HashMap<>(); | ||
|
|
||
| /** | ||
| * Get all non-expired retention leases tracker on this shard. An unmodifiable copy of the retention leases is returned. | ||
| * Get all non-expired retention leases tracked on this shard. An unmodifiable copy of the retention leases is returned. | ||
| * | ||
| * @return the retention leases | ||
| */ | ||
|
|
@@ -174,15 +184,60 @@ public synchronized Collection<RetentionLease> getRetentionLeases() { | |
| } | ||
|
|
||
| /** | ||
| * Adds a new or updates an existing retention lease. | ||
| * Adds a new retention lease. | ||
| * | ||
| * @param id the identifier of the retention lease | ||
| * @param retainingSequenceNumber the retaining sequence number | ||
| * @param source the source of the retention lease | ||
| * @param listener the callback when the retention lease is successfully added and synced to replicas | ||
| * @return the new retention lease | ||
| * @throws IllegalArgumentException if the specified retention lease already exists | ||
| */ | ||
| public RetentionLease addRetentionLease( | ||
| final String id, | ||
| final long retainingSequenceNumber, | ||
| final String source, | ||
| final ActionListener<ReplicationResponse> listener) { | ||
| Objects.requireNonNull(listener); | ||
| final RetentionLease retentionLease; | ||
| final Collection<RetentionLease> currentRetentionLeases; | ||
| synchronized (this) { | ||
| assert primaryMode; | ||
| if (retentionLeases.containsKey(id)) { | ||
| throw new IllegalArgumentException("retention lease with ID [" + id + "] already exists"); | ||
| } | ||
| retentionLease = new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source); | ||
| retentionLeases.put(id, retentionLease); | ||
| currentRetentionLeases = retentionLeases.values(); | ||
| } | ||
| onNewRetentionLease.accept(Collections.unmodifiableCollection(new ArrayList<>(currentRetentionLeases)), listener); | ||
| return retentionLease; | ||
| } | ||
|
|
||
| /** | ||
| * Renews an existing retention lease. | ||
| * | ||
| * @param id the identifier of the retention lease | ||
| * @param retainingSequenceNumber the retaining sequence number | ||
| * @param source the source of the retention lease | ||
| * @return the renewed retention lease | ||
| * @throws IllegalArgumentException if the specified retention lease does not exist | ||
| */ | ||
| public synchronized void addOrUpdateRetentionLease(final String id, final long retainingSequenceNumber, final String source) { | ||
| public synchronized RetentionLease renewRetentionLease(final String id, final long retainingSequenceNumber, final String source) { | ||
| assert primaryMode; | ||
| retentionLeases.put(id, new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source)); | ||
| if (retentionLeases.containsKey(id) == false) { | ||
| throw new IllegalArgumentException("retention lease with ID [" + id + "] does not exist"); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have a concern about the strictness introduced here. I think clients of this API are generally going to be trying to ensure that there is a retention lease in place, and I worry about a situation where, for example, the system clock were to jump forward far enough for all existing leases to expire. I think it would be good in that case if clients were to re-establish these expired leases, but today they would be rejected by this method which means that clients will generally have to be prepared to call the add path again whenever a renewal is rejected because the lease no longer exists.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this concern is a good one and worth discussing in a larger group. You can see what it presents for consumers in the current integration of CCR with shard history retention leases (WIP branch on my fork). That said, I am not sure how much to worry about this in practice. In most cases, the life of a retention lease should be significantly longer than the renewal period (e.g., default retention of twelve hours versus renewal frequencies of at most sixty seconds in the case of a CCR follower). And also longer than what most clocks jump by (e.g., for daylight savings time) except when they are completely misconfigured and then we have no guarantees. |
||
| } | ||
| final RetentionLease retentionLease = | ||
| new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source); | ||
| final RetentionLease existingRetentionLease = retentionLeases.put(id, retentionLease); | ||
| assert existingRetentionLease != null; | ||
| assert existingRetentionLease.retainingSequenceNumber() <= retentionLease.retainingSequenceNumber() : | ||
| "retention lease renewal for [" + id + "]" | ||
| + " from [" + source + "]" | ||
| + " renewed a lower retaining sequence number [" + retentionLease.retainingSequenceNumber() + "]" | ||
| + " than the current lease retaining sequence number [" + existingRetentionLease.retainingSequenceNumber() + "]"; | ||
| return retentionLease; | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -440,18 +495,20 @@ private static long inSyncCheckpointStates( | |
| * Initialize the global checkpoint service. The specified global checkpoint should be set to the last known global checkpoint, or | ||
| * {@link SequenceNumbers#UNASSIGNED_SEQ_NO}. | ||
| * | ||
| * @param shardId the shard ID | ||
| * @param allocationId the allocation ID | ||
| * @param indexSettings the index settings | ||
| * @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} | ||
| * @param shardId the shard ID | ||
| * @param allocationId the allocation ID | ||
| * @param indexSettings the index settings | ||
| * @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} | ||
| * @param onNewRetentionLease a callback when a new retention lease is created | ||
| */ | ||
| public ReplicationTracker( | ||
| final ShardId shardId, | ||
| final String allocationId, | ||
| final IndexSettings indexSettings, | ||
| final long globalCheckpoint, | ||
| final LongConsumer onGlobalCheckpointUpdated, | ||
| final LongSupplier currentTimeMillisSupplier) { | ||
| final LongSupplier currentTimeMillisSupplier, | ||
| final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> onNewRetentionLease) { | ||
| super(shardId, indexSettings); | ||
| assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint; | ||
| this.shardAllocationId = allocationId; | ||
|
|
@@ -462,6 +519,7 @@ public ReplicationTracker( | |
| checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false)); | ||
| this.onGlobalCheckpointUpdated = Objects.requireNonNull(onGlobalCheckpointUpdated); | ||
| this.currentTimeMillisSupplier = Objects.requireNonNull(currentTimeMillisSupplier); | ||
| this.onNewRetentionLease = Objects.requireNonNull(onNewRetentionLease); | ||
| this.pendingInSync = new HashSet<>(); | ||
| this.routingTable = null; | ||
| this.replicationGroup = null; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,206 @@ | ||
| /* | ||
| * Licensed to Elasticsearch under one or more contributor | ||
| * license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright | ||
| * ownership. Elasticsearch licenses this file to you under | ||
| * the Apache License, Version 2.0 (the "License"); you may | ||
| * not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.elasticsearch.index.seqno; | ||
|
|
||
| import org.apache.logging.log4j.LogManager; | ||
| import org.apache.logging.log4j.Logger; | ||
| import org.apache.logging.log4j.message.ParameterizedMessage; | ||
| import org.apache.lucene.store.AlreadyClosedException; | ||
| import org.elasticsearch.ExceptionsHelper; | ||
| import org.elasticsearch.action.ActionListener; | ||
| import org.elasticsearch.action.admin.indices.flush.FlushRequest; | ||
| import org.elasticsearch.action.support.ActionFilters; | ||
| import org.elasticsearch.action.support.WriteResponse; | ||
| import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; | ||
| import org.elasticsearch.action.support.replication.ReplicationResponse; | ||
| import org.elasticsearch.action.support.replication.TransportWriteAction; | ||
| import org.elasticsearch.cluster.action.shard.ShardStateAction; | ||
| import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; | ||
| import org.elasticsearch.cluster.service.ClusterService; | ||
| import org.elasticsearch.common.inject.Inject; | ||
| import org.elasticsearch.common.io.stream.StreamInput; | ||
| import org.elasticsearch.common.io.stream.StreamOutput; | ||
| import org.elasticsearch.common.settings.Settings; | ||
| import org.elasticsearch.common.util.concurrent.ThreadContext; | ||
| import org.elasticsearch.index.shard.IndexShard; | ||
| import org.elasticsearch.index.shard.IndexShardClosedException; | ||
| import org.elasticsearch.index.shard.ShardId; | ||
| import org.elasticsearch.indices.IndicesService; | ||
| import org.elasticsearch.threadpool.ThreadPool; | ||
| import org.elasticsearch.transport.TransportService; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.Collection; | ||
| import java.util.Objects; | ||
|
|
||
| /** | ||
| * Write action responsible for syncing retention leases to replicas. This action is deliberately a write action so that if a replica misses | ||
| * a retention lease sync then that shard will be marked as stale. | ||
| */ | ||
| public class RetentionLeaseSyncAction extends | ||
|
dnhatn marked this conversation as resolved.
|
||
| TransportWriteAction<RetentionLeaseSyncAction.Request, RetentionLeaseSyncAction.Request, RetentionLeaseSyncAction.Response> { | ||
|
|
||
| public static String ACTION_NAME = "indices:admin/seq_no/retention_lease_sync"; | ||
|
|
||
| private static final Logger LOGGER = LogManager.getLogger(RetentionLeaseSyncAction.class); | ||
|
|
||
| protected Logger getLogger() { | ||
| return LOGGER; | ||
| } | ||
|
|
||
| @Inject | ||
| public RetentionLeaseSyncAction( | ||
| final Settings settings, | ||
| final TransportService transportService, | ||
| final ClusterService clusterService, | ||
| final IndicesService indicesService, | ||
| final ThreadPool threadPool, | ||
| final ShardStateAction shardStateAction, | ||
| final ActionFilters actionFilters, | ||
| final IndexNameExpressionResolver indexNameExpressionResolver) { | ||
| super( | ||
| settings, | ||
| ACTION_NAME, | ||
| transportService, | ||
| clusterService, | ||
| indicesService, | ||
| threadPool, | ||
| shardStateAction, | ||
| actionFilters, | ||
| indexNameExpressionResolver, | ||
| RetentionLeaseSyncAction.Request::new, | ||
| RetentionLeaseSyncAction.Request::new, | ||
| ThreadPool.Names.MANAGEMENT); | ||
| } | ||
|
|
||
| /** | ||
| * Sync the specified retention leases for the specified shard. The callback is invoked when the sync succeeds or fails. | ||
| * | ||
| * @param shardId the shard to sync | ||
| * @param retentionLeases the retention leases to sync | ||
| * @param listener the callback to invoke when the sync completes normally or abnormally | ||
| */ | ||
| public void syncRetentionLeasesForShard( | ||
| final ShardId shardId, | ||
| final Collection<RetentionLease> retentionLeases, | ||
| final ActionListener<ReplicationResponse> listener) { | ||
| Objects.requireNonNull(shardId); | ||
| Objects.requireNonNull(retentionLeases); | ||
| Objects.requireNonNull(listener); | ||
| final ThreadContext threadContext = threadPool.getThreadContext(); | ||
| try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { | ||
| // we have to execute under the system context so that if security is enabled the sync is authorized | ||
| threadContext.markAsSystemContext(); | ||
| execute( | ||
| new RetentionLeaseSyncAction.Request(shardId, retentionLeases), | ||
| ActionListener.wrap( | ||
| listener::onResponse, | ||
| e -> { | ||
| if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) { | ||
| getLogger().warn(new ParameterizedMessage("{} retention lease sync failed", shardId), e); | ||
| } | ||
| listener.onFailure(e); | ||
| })); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| protected WritePrimaryResult<Request, Response> shardOperationOnPrimary(final Request request, final IndexShard primary) { | ||
| Objects.requireNonNull(request); | ||
| Objects.requireNonNull(primary); | ||
| // we flush to ensure that retention leases are committed | ||
| flush(primary); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the primary recovers from the safe commit, we will lose the committed retention leases. Should we copy them in
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good call. Let us get that in a follow-up. |
||
| return new WritePrimaryResult<>(request, new Response(), null, null, primary, logger); | ||
| } | ||
|
|
||
| @Override | ||
| protected WriteReplicaResult<Request> shardOperationOnReplica(final Request request, final IndexShard replica) { | ||
| Objects.requireNonNull(request); | ||
| Objects.requireNonNull(replica); | ||
| replica.updateRetentionLeasesOnReplica(request.getRetentionLeases()); | ||
| // we flush to ensure that retention leases are committed | ||
| flush(replica); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We may need to deal with the fact that a sequence-based or file-based recovery with a synced-flush does not propagate the retention leases to a recovering replica.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, and also hand-off on relocation. I intend to address these later. |
||
| return new WriteReplicaResult<>(request, null, null, replica, logger); | ||
| } | ||
|
|
||
| private void flush(final IndexShard indexShard) { | ||
| final FlushRequest flushRequest = new FlushRequest(); | ||
| flushRequest.force(true); | ||
| flushRequest.waitIfOngoing(true); | ||
| indexShard.flush(flushRequest); | ||
| } | ||
|
|
||
| public static final class Request extends ReplicatedWriteRequest<Request> { | ||
|
|
||
| private Collection<RetentionLease> retentionLeases; | ||
|
|
||
| public Collection<RetentionLease> getRetentionLeases() { | ||
| return retentionLeases; | ||
| } | ||
|
|
||
| public Request() { | ||
|
|
||
| } | ||
|
|
||
| public Request(final ShardId shardId, final Collection<RetentionLease> retentionLeases) { | ||
| super(Objects.requireNonNull(shardId)); | ||
| this.retentionLeases = Objects.requireNonNull(retentionLeases); | ||
| } | ||
|
|
||
| @Override | ||
| public void readFrom(final StreamInput in) throws IOException { | ||
| super.readFrom(in); | ||
| retentionLeases = in.readList(RetentionLease::new); | ||
| } | ||
|
|
||
| @Override | ||
| public void writeTo(final StreamOutput out) throws IOException { | ||
| super.writeTo(Objects.requireNonNull(out)); | ||
| out.writeCollection(retentionLeases); | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return "Request{" + | ||
| "retentionLeases=" + retentionLeases + | ||
| ", shardId=" + shardId + | ||
| ", timeout=" + timeout + | ||
| ", index='" + index + '\'' + | ||
| ", waitForActiveShards=" + waitForActiveShards + | ||
| '}'; | ||
| } | ||
|
|
||
| } | ||
|
|
||
| public static final class Response extends ReplicationResponse implements WriteResponse { | ||
|
|
||
| @Override | ||
| public void setForcedRefresh(final boolean forcedRefresh) { | ||
| // ignore | ||
| } | ||
|
|
||
| } | ||
|
|
||
| @Override | ||
| protected Response newResponseInstance() { | ||
| return new Response(); | ||
| } | ||
|
|
||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.