[BUG] Segment Replication - SegRep bytes behind and lag metrics incorrect post primary relocation #11211

@mch2

Description

Describe the bug
The replication lag metric appears to grow indefinitely even though no documents have been indexed.

      "segment_replication" : {
        "max_bytes_behind" : 0,
        "total_bytes_behind" : 0,
        "max_replication_lag" : 7555216
      },

Some logs I captured:

[2023-11-09T09:19:33,698][INFO ][o.o.i.r.RecoverySourceHandler] [45d10d39f1d283b11e920bc74e55fdf8] [test-index][1][recover to 2c02e270b7490c83796b2e271a4c490e] finalizing recovery took [46.6ms]
[2023-11-09T09:34:57,336][INFO ][o.o.i.r.RecoverySourceHandler] [45d10d39f1d283b11e920bc74e55fdf8] [test-index][1][recover to 49ec3af290fd4b0794a6bdb24d5bcda3] finalizing recovery took [62.2ms]
[2023-11-09T09:34:58,466][INFO ][o.o.i.r.RecoverySourceHandler] [45d10d39f1d283b11e920bc74e55fdf8] [test-index][1][recover to 10d328ac6d4e995c52df28caf143325c] finalizing recovery took [777.1ms]

Where the node assignment is as follows:

Recovered shard to 2c02e270b7490c83796b2e271a4c490e - old primary
Recovered shard to 49ec3af290fd4b0794a6bdb24d5bcda3 - replica
Recovered shard to 10d328ac6d4e995c52df28caf143325c - new primary

From the replica's logs, it is trying to notify the old primary of its state instead of the new one, even after the primary has relocated.

[2023-11-09T09:34:59,172][ERROR][o.o.i.r.SegmentReplicationTargetService] [49ec3af290fd4b0794a6bdb24d5bcda3] Failed to update visible checkpoint for replica [test-index][1], ReplicationCheckpoint{shardId=[test-index][1], primaryTerm=1, segmentsGen=6, version=9, size=0, codec=Lucene95}:
RemoteTransportException[[45d10d39f1d283b11e920bc74e55fdf8][internal:index/shard/replication/update_visible_checkpoint]]; nested: IndexNotFoundException[no such index [test-index]];
Caused by: [test-index/01RD3t9jZMSnCHvntC8p9DlQ] IndexNotFoundException[no such index [test-index]]
        at org.opensearch.indices.IndicesService.indexServiceSafe(IndicesService.java:734)
        at org.opensearch.indices.replication.SegmentReplicationSourceService$UpdateVisibleCheckpointRequestHandler.messageReceived(SegmentReplicationSourceService.java:157)
        at org.opensearch.indices.replication.SegmentReplicationSourceService$UpdateVisibleCheckpointRequestHandler.messageReceived(SegmentReplicationSourceService.java:153)
        at ...

The logs show this happens after a primary relocation. It looks like the primary moves to a new node, refreshes, publishes a checkpoint to its replicas, and starts its lag timers; the replica then syncs/discards the checkpoint but calls back to the old primary to update its visible state.
SegmentReplicationTargetService identifies the primary using:

        ShardRouting primaryShard = clusterService.state().routingTable().shardRoutingTable(replicaShard.shardId()).primaryShard();
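The race can be illustrated with a minimal, self-contained sketch (this is not OpenSearch code; the node names and the primaryFor helper are hypothetical stand-ins for the routingTable lookup above). If the replica's cluster-state snapshot has not yet caught up with the relocation, resolving the primary from that snapshot returns the old node:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch: the replica resolves the primary from its own
// cluster-state snapshot, which can lag behind a primary relocation.
public class StalePrimaryLookup {
    // Hypothetical stand-in for
    // routingTable().shardRoutingTable(shardId).primaryShard().
    static String primaryFor(Map<String, String> clusterState, String shardId) {
        return clusterState.get(shardId);
    }

    public static void main(String[] args) {
        // Cluster manager's view after relocation: primary is on the new node.
        Map<String, String> latestState = new HashMap<>();
        latestState.put("[test-index][1]", "new-primary-node");

        // Replica's view is stale (its cluster-state updates are delayed).
        Map<String, String> replicaState = new HashMap<>();
        replicaState.put("[test-index][1]", "old-primary-node");

        // The replica sends update_visible_checkpoint to whichever node its
        // own snapshot names as primary -- here, the already-vacated node.
        String target = primaryFor(replicaState, "[test-index][1]");
        System.out.println("checkpoint update sent to: " + target);
        // On that node the shard has been removed, so the handler fails with
        // IndexNotFoundException (as in the stack trace above), and the new
        // primary never sees the replica's checkpoint, so its timer keeps running.
    }
}
```

The fix direction this suggests is that the checkpoint-update path needs to tolerate or retry around stale routing, rather than trusting a single snapshot.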

To Reproduce
Steps to reproduce the behavior:
This is not 100% reproducible...

  1. Create an index with n replicas
  2. Continuously relocate the primary to a new node
  3. Check /_cluster/stats; lag is reported even though no documents have been indexed.
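For reference, steps 2 and 3 can be driven from the REST API roughly as follows (the index name, shard number, and node names are placeholders for your cluster):

```shell
# Step 2: move the primary of shard 0 to another node.
curl -s -X POST "localhost:9200/_cluster/reroute" \
  -H 'Content-Type: application/json' \
  -d '{"commands":[{"move":{"index":"test-index","shard":0,"from_node":"old-primary-node","to_node":"new-primary-node"}}]}'

# Step 3: check cluster-wide segment replication stats; max_replication_lag
# keeps growing here even though nothing has been indexed.
curl -s "localhost:9200/_cluster/stats?filter_path=indices.segments.segment_replication"
```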

I have also been able to reproduce this with an integration test using NetworkDisruption. Adding the following to SegmentReplicationUsingRemoteStoreDisruptionIT fails 100% of the time:

    public void testUpdateVisibleCheckpointWithLaggingClusterStateUpdates() throws Exception {
        Path location = randomRepoPath().toAbsolutePath();
        Settings nodeSettings = Settings.builder()
            .put(buildRemoteStoreNodeAttributes(location, 0d, "metadata", Long.MAX_VALUE)).build();
        internalCluster().startClusterManagerOnlyNode(nodeSettings);
        internalCluster().startDataOnlyNodes(2, nodeSettings);
        final Settings indexSettings = Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build();
        createIndex(INDEX_NAME, indexSettings);
        ensureGreen(INDEX_NAME);
        final Set<String> dataNodeNames = internalCluster().getDataNodeNames();
        final String replicaNode = getNode(dataNodeNames, false);
        final String oldPrimary = getNode(dataNodeNames, true);

        // index a doc.
        client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", randomInt()).get();
        refresh(INDEX_NAME);

        logger.info("--> start another node");
        final String newPrimary = internalCluster().startDataOnlyNode(nodeSettings);
        ClusterHealthResponse clusterHealthResponse = client().admin()
            .cluster()
            .prepareHealth()
            .setWaitForEvents(Priority.LANGUID)
            .setWaitForNodes("4")
            .get();
        assertFalse(clusterHealthResponse.isTimedOut());

        SlowClusterStateProcessing disruption = new SlowClusterStateProcessing(replicaNode, random(), 0, 0, 1000, 2000);
        internalCluster().setDisruptionScheme(disruption);
        disruption.startDisrupting();

        // relocate the primary
        logger.info("--> relocate the shard");
        client().admin()
            .cluster()
            .prepareReroute()
            .add(new MoveAllocationCommand(INDEX_NAME, 0, oldPrimary, newPrimary))
            .execute()
            .actionGet();
        clusterHealthResponse = client().admin()
            .cluster()
            .prepareHealth()
            .setWaitForEvents(Priority.LANGUID)
            .setWaitForNoRelocatingShards(true)
            .setTimeout(ACCEPTABLE_RELOCATION_TIME)
            .execute()
            .actionGet();
        assertFalse(clusterHealthResponse.isTimedOut());

        IndexShard newPrimaryShard = getIndexShard(newPrimary, INDEX_NAME);
        IndexShard replica = getIndexShard(replicaNode, INDEX_NAME);
        assertBusy(() -> {
            assertEquals(
                newPrimaryShard.getLatestReplicationCheckpoint().getSegmentInfosVersion(),
                replica.getLatestReplicationCheckpoint().getSegmentInfosVersion()
            );
        });

        assertBusy(() -> {
            ClusterStatsResponse clusterStatsResponse = client().admin().cluster().prepareClusterStats().get();
            ReplicationStats replicationStats = clusterStatsResponse.getIndicesStats().getSegments().getReplicationStats();
            assertEquals(0L, replicationStats.maxBytesBehind);
            assertEquals(0L, replicationStats.maxReplicationLag);
            assertEquals(0L, replicationStats.totalBytesBehind);
        });
        disruption.stopDisrupting();
        disableRepoConsistencyCheck("Remote Store Creates System Repository");
        cleanupRepo();
    }

Expected behavior
Lag should not grow unless there is an active replication event in the group.

Plugins
N/A

    Labels

    Indexing:Replication (Issues and PRs related to core replication framework, e.g. segrep), bug (Something isn't working)
