Skip to content

[Segment Replication] Design Proposal #2229

@mch2

Description

@mch2

This document outlines a proposal for implementing segment replication without additional external dependencies. This is a WIP. Please feel free to leave comments below with any thoughts or suggestions!

Objective:
Copy Lucene’s immutable segment files to replicas instead of document replication where documents are sent to and re-indexed on all replicas. This will improve indexing throughput and lower resource utilization on replicas at the expense of increased network usage. The RFC has a good introduction to both replication strategies.

Requirements:
With the feature enabled, primary shards will be the only shards indexing into Lucene. Replica shards exist as read only copies. After a primary shard creates new segment files, either from indexing or a segment merge, they are sent to replicas where they are made searchable.

Proposal:

segrep

Segment replication will be triggered when a primary shard refreshes. A refresh on a primary shard occurs for various reasons, for example on a schedule, after a merge, or flush. During a refresh, Lucene performs an os flush that writes the latest in memory segment files to the filesystem, making them available for replicas to fetch. A listener will trigger after the primary finishes refresh and send a notification to replicas. This notification will include a ReplicationCheckpoint that includes the sequence number associated with the latest document indexed, the latest commit generation, and primary term. If the replica determines that it is behind this checkpoint, it initiates a replication process.

seq2

  1. Lucene's SegmentInfos object references the set of active segments on the primary. Metadata includes names and checksums for all current segment files. The replica first fetches the latest SegmentInfos and file metadata associated with the received Checkpoint from the primary. When the primary receives this request it will incRef all files in the checkpoint to ensure they are not merged away until copy completes.
  2. The replica computes a diff against its local file metadata to determine which files are missing or changed.
  3. The replica requests missing and changed files from the primary shard.
  4. The primary shard sends all requested files to the replica. Replicas validate the files with Lucene checksum.
  5. Replicas validate their store ensuring all segment files referenced by the copied SegmentInfos are present. They then clean their local directory of any files not referenced. This will remove merged away segments.
  6. The replica refreshes its local OpenSearchDirectoryReader with the copied SegmentInfos object, making the new segments searchable. This is safe to do because we have already copied new/updated segments from the primary that will be referenced by the updated SegmentInfos.

Read only replicas:
To make replica shards read only, we need to disable their IndexWriter and noop all operations that attempt to interact with it inside of the engine. The POC does with a config param sent to the engine to conditionally start up a writer. To make this cleaner, creating separate engine and shard implementations for replicas is currently being explored.

Replication checkpoints:
ReplicationCheckpoints will be processed serially when received by replicas. If there is an active replication event when a checkpoint is received, the replica will store and attempt to process the checkpoint after it completes. Primary shards will compute the latest metadata and segmentInfos, even if this data is ahead of the requested checkpoint. The actual checkpoint processed will be computed and returned to the replica and compared against the latest received checkpoint.

Durability:
With the initial implementation, all documents will still be sent to each shard and persisted in the transaction log. Operations will not be purged from a replica's copy of the translog until a new commit point is received. While we have the translog to replay operations, we must still guarantee the stability of the index on disk so that it can be started and operations replayed. To maintain OpenSearch's durability guarantee, segments on both primary and replica shards will be by default fsynced to disk on commit. On replicas we recognize that a commit has occurred with an increase in the commit generation sent with the ReplicationCheckpoint.

In addition to fsyncs, we need to ensure that at all times replicas have the previous commit point. It is possible for a replica to be added after a Primary has been created, so sending the latest SegmentInfos info from a primary can leave the replica in a corrupt state. To ensure each replica has a both the latest SegmentInfos and all segments referenced by the latest commit point, the primary will include both in its metadata list returned to replicas.

For users who do not require this level of durability or it is acceptable to recover from a snapshot, I propose we add an additional setting that disables fsyncs on replicas and prevents sending the additional merged away files.

Merge:
There is no special logic for merges. After a primary completes a merge, it will refresh to open up a new reader on new segments and mark the old segments for deletion. This refresh will trigger our listener and publish a new ReplicationCheckpoint to replicas.

Shard startup:
Currently when replica shards are started they go through the peer recovery service and recover from the active primary. With segment replication shards will be created as empty and a notification sent to the primary to initiate tracking on the new replica. This ensures that all future checkpoints will be sent to the replica and will initiate replication on the next refresh.

Performance:
Early performance tests show improvements with segment replication enabled. This run using OpenSearch benchmark showed a ~40-45% drop in CPU and Memory usage, a 19% drop in p99 latency and a 57% increase in p100 throughput.

Instance type: m5.xlarge
Cluster Details: 3 Nodes with 6 shards and 1 replica each.
Test Dataset: Stackoverflow for 3 test iterations with 2 warmup iterations.

IOPS:
Document Replication: (Read 852k + Write 71k) / 1hr = 256 IOPS
Segment Replication: (Read 145k + Write 1M) / 1 hr = 318 IOPS

Total Bandwidth used:
Document Replication: 527 Gb
Segment Replication: 929 Gb
Main branch - Document Replication P0 P50 P99 P100
index - throughput req/s 17,401.60 18,661 N/A 22,203.40
index - latency (ms) 2,139.50 3,031.90 4,315.10 6,876.70
CPU 74 98 99 99
Memory 45 61 66 68
Feature branch - Segment Replication P0 P50 P99 P100
index - throughput req/s 28,795.90 31,525.70 N/A 34,845.70
index - latency (ms) 1,213.40 1,921 3,495.50 6,060.60
CPU 51.5 53 53 53
Memory 31.5 38 38 38

Data from more test runs will be added here as we continue building out the feature and run tests with different cluster configurations.

Failover: #2212

Shard Allocation: #6210

Configuration:
Segment replication will be disabled by default and enabled with a setting during index creation. To begin with this will be a simple boolean setting.

FAQ:

  1. Why bother sending checkpoints? Can the primary send over new segments when they are available?
    This will be a lot of load on primary shards. They will have to first fetch and compute the metadata diff from every replica and then orchestrate their copy and hold that state between requests.
  2. How do we include extension points for future improvements?
    While the first implementation will use the primary shard as our object store and transport layer for messaging, we will extract interfaces allowing for these to be swapped out for other options.
  3. Can we migrate existing indices to use Segment Replication?
    This should be supported in future versions but not part of the initial launch of the feature.

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions