
[BUG] [Segment Replication] Possible data loss (or inconsistencies) during primary relocation and noticed issues with performSegRep within Peer Recovery code #5848

@ashking94

Description


Describe the bug
During relocation, before the relocation handoff starts, IndexShardOperationPermits#blockOperations acquires all operation permits and only then performs the handoff. The code is shown below:

<E extends Exception> void blockOperations(final long timeout, final TimeUnit timeUnit, final CheckedRunnable<E> onBlocked)
    throws InterruptedException, TimeoutException, E {
    delayOperations();
    try (Releasable ignored = acquireAll(timeout, timeUnit)) {
        onBlocked.run();
    } finally {
        releaseDelayedOperations();
    }
}
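The guarantee that matters here is that all permits stay held for exactly as long as onBlocked.run() executes. The sketch below is a simplified, hypothetical model of that guarantee, not the OpenSearch implementation. It assumes a plain java.util.concurrent.Semaphore in which each write holds one permit. The names PermitsModel, tryWrite and demo are illustrative. Unlike the real class, the model rejects a write that cannot get a permit rather than queueing it as a delayed operation.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical, simplified model of IndexShardOperationPermits (not the OpenSearch code):
// every write holds one permit, and blockOperations takes all of them.
class PermitsModel {
    static final int TOTAL_PERMITS = Integer.MAX_VALUE;
    private final Semaphore semaphore = new Semaphore(TOTAL_PERMITS);

    // A write "succeeds" if it can take a permit; the model releases it right away.
    // (The real class queues delayed operations instead of rejecting them.)
    boolean tryWrite() {
        if (semaphore.tryAcquire()) {
            semaphore.release();
            return true;
        }
        return false;
    }

    // Synchronous semantics: all permits stay held until onBlocked.run() returns.
    void blockOperations(long timeout, TimeUnit unit, Runnable onBlocked)
            throws InterruptedException, TimeoutException {
        if (!semaphore.tryAcquire(TOTAL_PERMITS, timeout, unit)) {
            throw new TimeoutException("timed out acquiring all operation permits");
        }
        try {
            onBlocked.run(); // e.g. the primary context handoff
        } finally {
            semaphore.release(TOTAL_PERMITS);
        }
    }

    // Returns {writeAllowedDuringHandoff, writeAllowedAfterHandoff}.
    static boolean[] demo() throws InterruptedException, TimeoutException {
        PermitsModel permits = new PermitsModel();
        boolean[] during = new boolean[1];
        permits.blockOperations(1, TimeUnit.SECONDS, () -> during[0] = permits.tryWrite());
        return new boolean[] {during[0], permits.tryWrite()};
    }

    public static void main(String[] args) throws Exception {
        boolean[] r = demo();
        System.out.println("write allowed during handoff: " + r[0]); // false
        System.out.println("write allowed after handoff: " + r[1]);  // true
    }
}
```

Because the handoff runs inside onBlocked, no write can slip in between "all permits acquired" and "handoff finished".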

With #5344, we changed this code to allow force segment replication to happen. However, the change also turned this step from synchronous and blocking into asynchronous and non-blocking. To explain, look at the code before the PR. The onBlocked runnable passed to blockOperations runs to completion before control returns to the caller of blockOperations. That runnable makes a transport (network) call for handOffPrimaryContext and waits for the response before it continues with the rest of the handoff. After the PR, the segment replication path uses an asynchronous StepListener. Once the asynchronous execution starts, onBlocked.run() returns. The try-with-resources block then releases all permits acquired by acquireAll, and the finally block calls releaseDelayedOperations(). As a result, indexing operations run on the old primary, which still has primaryMode set to true. They run in parallel with performSegRep, which will eventually switch primaryMode on the new primary from false to true. The source and target can therefore observe several different combinations of primary mode during the lifetime of a single write request, which is concerning.
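The ordering problem described above can be reproduced in a few lines. This is a hypothetical model, not OpenSearch code. A CompletableFuture stands in for segRepSyncListener, and thenRun stands in for whenComplete. onBlocked only registers the handoff callback and returns, so the finally block frees every permit while the handoff is still pending. The names AsyncHandoffHazard, tryWrite and demo are illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

// Hypothetical model of the post-#5344 flow: onBlocked only *registers* a callback
// (like segRepSyncListener.whenComplete) and returns, so the finally block releases
// every permit while the handoff is still pending.
class AsyncHandoffHazard {
    static final int TOTAL_PERMITS = Integer.MAX_VALUE;
    private final Semaphore semaphore = new Semaphore(TOTAL_PERMITS);

    boolean tryWrite() {
        if (semaphore.tryAcquire()) {
            semaphore.release();
            return true;
        }
        return false;
    }

    void blockOperations(Runnable onBlocked) throws InterruptedException {
        semaphore.acquire(TOTAL_PERMITS);
        try {
            onBlocked.run();
        } finally {
            semaphore.release(TOTAL_PERMITS); // runs as soon as run() returns
        }
    }

    // Returns {writeAcceptedBeforeHandoff, handoffPendingAtThatTime, handoffEventuallyRan}.
    static boolean[] demo() throws InterruptedException {
        AsyncHandoffHazard shard = new AsyncHandoffHazard();
        CompletableFuture<Void> segRepSync = new CompletableFuture<>(); // stands in for segRepSyncListener
        boolean[] handoffDone = {false};
        shard.blockOperations(() -> segRepSync.thenRun(() -> handoffDone[0] = true));
        // blockOperations has returned: all permits are free, yet no handoff has happened.
        boolean writeAccepted = shard.tryWrite();
        boolean handoffPending = !handoffDone[0];
        segRepSync.complete(null); // performSegRep finishes later; the callback runs now
        return new boolean[] {writeAccepted, handoffPending, handoffDone[0]};
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = demo();
        System.out.println("write accepted while handoff pending: " + (r[0] && r[1])); // true
        System.out.println("handoff eventually ran: " + r[2]);                         // true
    }
}
```

In the real code, the "write" that slips through lands on the old primary while it still reports primaryMode as true.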

Before the PR -

public void relocated(final String targetAllocationId, final Consumer<ReplicationTracker.PrimaryContext> consumer)
    throws IllegalIndexShardStateException, IllegalStateException, InterruptedException {
    assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
    try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) {
        indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
            forceRefreshes.close();
            // no shard operation permits are being held here, move state from started to relocated
            assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED
                : "in-flight operations in progress while moving shard state to relocated";
            /*
             * We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context via a
             * network operation. Doing this under the mutex can implicitly block the cluster state update thread on network operations.
             */
            verifyRelocatingState();
            final ReplicationTracker.PrimaryContext primaryContext = replicationTracker.startRelocationHandoff(targetAllocationId);
            try {
                consumer.accept(primaryContext);
                synchronized (mutex) {
                    verifyRelocatingState();
                    replicationTracker.completeRelocationHandoff(); // make changes to primaryMode and relocated flag only under mutex
                }
            } catch (final Exception e) {
                try {
                    replicationTracker.abortRelocationHandoff();
                } catch (final Exception inner) {
                    e.addSuppressed(inner);
                }
                throw e;
            }
        });
    } catch (TimeoutException e) {
        logger.warn("timed out waiting for relocation hand-off to complete");
        // This is really bad as ongoing replication operations are preventing this shard from completing relocation hand-off.
        // Fail primary relocation source and target shards.
        failShard("timed out waiting for relocation hand-off to complete", null);
        throw new IndexShardClosedException(shardId(), "timed out waiting for relocation hand-off to complete");
    }
}

After the PR -

final StepListener<Void> segRepSyncListener = new StepListener<>();
performSegRep.accept(segRepSyncListener);
segRepSyncListener.whenComplete(r -> {
    /*
     * We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context via a
     * network operation. Doing this under the mutex can implicitly block the cluster state update thread on network operations.
     */
    verifyRelocatingState();
    final ReplicationTracker.PrimaryContext primaryContext = replicationTracker.startRelocationHandoff(targetAllocationId);
    try {
        consumer.accept(primaryContext);
        synchronized (mutex) {
            verifyRelocatingState();
            replicationTracker.completeRelocationHandoff(); // make changes to primaryMode and relocated flag only under mutex
        }
    } catch (final Exception e) {
        try {
            replicationTracker.abortRelocationHandoff();
        } catch (final Exception inner) {
            e.addSuppressed(inner);
        }
        throw e;
    }
    listener.onResponse(null);
}, listener::onFailure);
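One way to restore the pre-#5344 guarantee without parking a thread is to tie the release of the permits to completion of the asynchronous step, not to the return of onBlocked. The sketch below is a hypothetical illustration of that idea only. blockOperationsAsync, AsyncAwarePermits and demo are invented names, and CompletableFuture stands in for the listener. It ignores the delayed-operation queue, timeouts and failure handling that a real fix in IndexShardOperationPermits would need.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical sketch: keep all permits until the future returned by onBlocked
// completes (successfully or exceptionally), instead of until onBlocked returns.
class AsyncAwarePermits {
    static final int TOTAL_PERMITS = Integer.MAX_VALUE;
    private final Semaphore semaphore = new Semaphore(TOTAL_PERMITS);

    boolean tryWrite() {
        if (semaphore.tryAcquire()) {
            semaphore.release();
            return true;
        }
        return false;
    }

    CompletableFuture<Void> blockOperationsAsync(Supplier<CompletableFuture<Void>> onBlocked)
            throws InterruptedException {
        semaphore.acquire(TOTAL_PERMITS);
        CompletableFuture<Void> work;
        try {
            work = onBlocked.get(); // starts segrep + handoff, returns immediately
        } catch (RuntimeException e) {
            semaphore.release(TOTAL_PERMITS); // nothing async was started
            throw e;
        }
        // Release only after the async handoff has finished, whatever the outcome.
        return work.whenComplete((ignored, failure) -> semaphore.release(TOTAL_PERMITS));
    }

    // Returns {writeAcceptedWhilePending, handoffRan, writeAcceptedAfterHandoff}.
    static boolean[] demo() throws InterruptedException {
        AsyncAwarePermits shard = new AsyncAwarePermits();
        CompletableFuture<Void> segRepSync = new CompletableFuture<>();
        boolean[] handoffDone = {false};
        CompletableFuture<Void> relocation =
            shard.blockOperationsAsync(() -> segRepSync.thenRun(() -> handoffDone[0] = true));
        boolean writeWhilePending = shard.tryWrite(); // permits are still held
        segRepSync.complete(null);                    // segrep done -> handoff -> release
        relocation.join();
        return new boolean[] {writeWhilePending, handoffDone[0], shard.tryWrite()};
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = demo();
        System.out.println("write accepted while pending: " + r[0]); // false
        System.out.println("handoff ran: " + r[1]);                  // true
        System.out.println("write accepted after: " + r[2]);         // true
    }
}
```

The design choice is to move the release into the completion callback, so the "no writes during handoff" window covers the whole asynchronous chain.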

To Reproduce
Steps to reproduce the behavior:
Currently, the performSegRep call itself times out. Once that works, the following steps should reproduce the issue:

  1. Create the index:
curl -X PUT "localhost:9200/test-index?pretty" -H 'Content-Type: application/json' -d'
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0,
    "replication.type" : "SEGMENT"
  }
}
'
  2. Start indexing a large number of documents (on the order of thousands) and keep it running while the relocation in the next step starts:
for i in {1..1000}
do
   curl --location --request POST "localhost:9202/test-index/_doc" \
    --header 'Content-Type: application/json' \
    --data-raw "{
      \"name\":\"abc${i}\"
    }"
    echo
done
  3. Start primary relocation from opensearch-node1 to opensearch-node2:
curl -XPUT localhost:9201/test-index/_settings -H 'Content-Type: application/json' -d '
{
  "index.routing.allocation.include._name": "opensearch-node2"
}'


Status: Done
