Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import tech.pegasys.teku.beacon.sync.forward.ForwardSync;
import tech.pegasys.teku.beacon.sync.forward.ForwardSyncService;
import tech.pegasys.teku.beacon.sync.forward.multipeer.MultipeerSyncService;
import tech.pegasys.teku.beacon.sync.forward.multipeer.SyncReorgManager;
import tech.pegasys.teku.beacon.sync.forward.singlepeer.SinglePeerSyncServiceFactory;
import tech.pegasys.teku.beacon.sync.gossip.blobs.RecentBlobSidecarsFetcher;
import tech.pegasys.teku.beacon.sync.gossip.blocks.RecentBlocksFetchService;
Expand Down Expand Up @@ -75,6 +76,7 @@ public class DefaultSyncServiceFactory implements SyncServiceFactory {
private final PendingPool<SignedBeaconBlock> pendingBlocks;
private final PendingPool<ValidatableAttestation> pendingAttestations;
private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool;
private final SyncReorgManager syncReorgManager;
private final int getStartupTargetPeerCount;
private final AsyncBLSSignatureVerifier signatureVerifier;
private final Duration startupTimeout;
Expand All @@ -98,6 +100,7 @@ public DefaultSyncServiceFactory(
final PendingPool<SignedBeaconBlock> pendingBlocks,
final PendingPool<ValidatableAttestation> pendingAttestations,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final SyncReorgManager syncReorgManager,
final int getStartupTargetPeerCount,
final SignatureVerificationService signatureVerifier,
final Duration startupTimeout,
Expand All @@ -119,6 +122,7 @@ public DefaultSyncServiceFactory(
this.pendingBlocks = pendingBlocks;
this.pendingAttestations = pendingAttestations;
this.blockBlobSidecarsTrackersPool = blockBlobSidecarsTrackersPool;
this.syncReorgManager = syncReorgManager;
this.getStartupTargetPeerCount = getStartupTargetPeerCount;
this.signatureVerifier = signatureVerifier;
this.startupTimeout = startupTimeout;
Expand Down Expand Up @@ -208,6 +212,7 @@ protected ForwardSyncService createForwardSyncService() {
blockBlobSidecarsTrackersPool,
executionPayloadManager,
syncPreImportBlockChannel,
syncReorgManager,
syncConfig.getForwardSyncBatchSize(),
syncConfig.getForwardSyncMaxPendingBatches(),
syncConfig.getForwardSyncMaxBlocksPerMinute(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.eventthread.EventThread;
import tech.pegasys.teku.infrastructure.subscribers.Subscribers;
import tech.pegasys.teku.infrastructure.time.TimeProvider;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.blocks.MinimalBeaconBlockSummary;
Expand All @@ -46,6 +47,8 @@ public class BatchSync implements Sync {
private static final Logger LOG = LogManager.getLogger();
private static final Duration PAUSE_ON_SERVICE_OFFLINE_OR_DAS_CHECK = Duration.ofSeconds(5);

private final Subscribers<BlocksImportedSubscriber> subscribers = Subscribers.create(true);

private final EventThread eventThread;
private final AsyncRunner asyncRunner;
private final RecentChainData recentChainData;
Expand Down Expand Up @@ -123,6 +126,11 @@ public static BatchSync create(
syncPreImportBlockChannel);
}

@Override
public long subscribeToBlocksImportedEvent(final BlocksImportedSubscriber subscriber) {
return subscribers.subscribe(subscriber);
}

/**
* Begin a sync to the specified target chain. If a sync was previously in progress to a different
* chain, the sync will switch to this new chain.
Expand Down Expand Up @@ -455,6 +463,11 @@ private void onImportComplete(
// Everything prior to this batch must already exist on our chain so we can drop them all
activeBatches.removeUpToIncluding(importedBatch);
commonAncestorSlot = SafeFuture.completedFuture(importedBatch.getLastSlot());
importedBatch
.getLastBlock()
.ifPresent(
lastBlock ->
subscribers.deliver(subscriber -> subscriber.onBlocksImported(lastBlock)));
}
progressSync();
if (activeBatches.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public static MultipeerSyncService create(
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final ExecutionPayloadManager executionPayloadManager,
final SyncPreImportBlockChannel syncPreImportBlockChannel,
final SyncReorgManager syncReorgManager,
final int batchSize,
final int maxPendingBatches,
final int maxBlocksPerMinute,
Expand Down Expand Up @@ -124,6 +125,7 @@ eventThread, spec, blobSidecarManager, new PeerScoringConflictResolutionStrategy
finalizedTargetChains,
nonfinalizedTargetChains,
spec.getSlotsPerEpoch(recentChainData.getCurrentSlot().orElse(UInt64.ZERO))),
syncReorgManager,
batchSync);
final PeerChainTracker peerChainTracker =
new PeerChainTracker(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChain;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot;

public interface Sync {
Expand All @@ -32,6 +33,12 @@ public interface Sync {

SafeFuture<Optional<SyncProgress>> getSyncProgress();

long subscribeToBlocksImportedEvent(BlocksImportedSubscriber subscriber);

interface BlocksImportedSubscriber {
void onBlocksImported(SignedBeaconBlock lastImportedBlock);
}

record SyncProgress(
UInt64 fromSlot,
UInt64 toSlot,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.beacon.sync.events.SyncingStatus;
import tech.pegasys.teku.beacon.sync.forward.ForwardSync.SyncSubscriber;
import tech.pegasys.teku.beacon.sync.forward.multipeer.Sync.BlocksImportedSubscriber;
import tech.pegasys.teku.beacon.sync.forward.multipeer.Sync.SyncProgress;
import tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChain;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
Expand All @@ -30,11 +31,13 @@
import tech.pegasys.teku.infrastructure.exceptions.ExceptionUtil;
import tech.pegasys.teku.infrastructure.subscribers.Subscribers;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.storage.client.RecentChainData;

public class SyncController {
public class SyncController implements BlocksImportedSubscriber {
// Rate limiting is usually in 1 minute intervals, so it should be reset after such delay
protected static final Duration SYNC_AWAKE_INTERVAL = Duration.ofSeconds(60);

private static final Logger LOG = LogManager.getLogger();

private final Subscribers<SyncSubscriber> subscribers = Subscribers.create(true);
Expand All @@ -44,6 +47,7 @@ public class SyncController {
private final RecentChainData recentChainData;
private final SyncTargetSelector syncTargetSelector;
private final Sync sync;
private final SyncReorgManager syncReorgManager;

/**
* The current sync. When empty, no sync has started, otherwise contains the details of the last
Expand All @@ -60,19 +64,33 @@ public SyncController(
final Executor subscriberExecutor,
final RecentChainData recentChainData,
final SyncTargetSelector syncTargetSelector,
final SyncReorgManager syncReorgManager,
final Sync sync) {
this.eventThread = eventThread;
this.subscriberExecutor = subscriberExecutor;
this.recentChainData = recentChainData;
this.syncTargetSelector = syncTargetSelector;
this.syncReorgManager = syncReorgManager;
this.sync = sync;
sync.subscribeToBlocksImportedEvent(this);
// Try to restart sync, could recover stalled sync
asyncRunner.runWithFixedDelay(
() -> eventThread.execute(this::onTargetChainsUpdated),
SYNC_AWAKE_INTERVAL,
ex -> LOG.error("Unexpected error when trying to awake peer sync", ex));
}

@Override
public void onBlocksImported(final SignedBeaconBlock lastImportedBlock) {
eventThread.execute(
() -> {
if (isSyncSpeculative()) {
return;
}
syncReorgManager.onBlocksImported(lastImportedBlock);
});
}

/**
* Notify that chains have been updated.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright Consensys Software Inc., 2025
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.beacon.sync.forward.multipeer;

import java.util.Optional;
import tech.pegasys.teku.beacon.sync.forward.multipeer.Sync.BlocksImportedSubscriber;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceTrigger;
import tech.pegasys.teku.storage.client.ChainHead;
import tech.pegasys.teku.storage.client.RecentChainData;

public class SyncReorgManager implements BlocksImportedSubscriber {
static final int REORG_SLOT_THRESHOLD = 10;

private final RecentChainData recentChainData;
private final ForkChoiceTrigger forkChoiceTrigger;

public SyncReorgManager(
final RecentChainData recentChainData, final ForkChoiceTrigger forkChoiceTrigger) {
this.recentChainData = recentChainData;
this.forkChoiceTrigger = forkChoiceTrigger;
}

@Override
public void onBlocksImported(final SignedBeaconBlock lastImportedBlock) {

final Optional<ChainHead> currentHead = recentChainData.getChainHead();

if (currentHead.isEmpty()) {
return;
}

if (lastImportedBlock.getRoot().equals(currentHead.get().getRoot())) {
return;
}

if (currentHead
.get()
.getSlot()
.plus(REORG_SLOT_THRESHOLD)
.isGreaterThan(lastImportedBlock.getSlot())) {
return;
}

forkChoiceTrigger.reorgWhileSyncing(currentHead.get().getRoot(), lastImportedBlock.getRoot());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import tech.pegasys.teku.beacon.sync.events.SyncPreImportBlockChannel;
import tech.pegasys.teku.beacon.sync.forward.multipeer.Sync.BlocksImportedSubscriber;
import tech.pegasys.teku.beacon.sync.forward.multipeer.Sync.SyncProgress;
import tech.pegasys.teku.beacon.sync.forward.multipeer.batches.Batch;
import tech.pegasys.teku.beacon.sync.forward.multipeer.batches.StubBatchFactory;
Expand Down Expand Up @@ -662,6 +663,95 @@ void shouldRemoveBatchFromActiveSetWhenImportCompletesSuccessfully() {
assertBatchNotActive(batch0);
}

@Test
void shouldNotifyOnBlocksImported() {
assertThat(sync.syncToChain(targetChain)).isNotDone();
final BlocksImportedSubscriber subscriber = mock(BlocksImportedSubscriber.class);

sync.subscribeToBlocksImportedEvent(subscriber);

final Batch batch0 = batches.get(0);
final Batch batch1 = batches.get(1);
batches.receiveBlocks(batch0, chainBuilder.generateBlockAtSlot(1).getBlock());
batches.receiveBlocks(
batch1, chainBuilder.generateBlockAtSlot(batch1.getFirstSlot()).getBlock());

assertBatchImported(batch0);
verifyNoInteractions(subscriber);
batches.getImportResult(batch0).complete(IMPORTED_ALL_BLOCKS);

verify(subscriber).onBlocksImported(batch0.getLastBlock().orElseThrow());
verifyNoMoreInteractions(subscriber);
}

@Test
void shouldNotNotifyOnBlocksImportedWhenImportFails() {
assertThat(sync.syncToChain(targetChain)).isNotDone();
final BlocksImportedSubscriber subscriber = mock(BlocksImportedSubscriber.class);

sync.subscribeToBlocksImportedEvent(subscriber);

final Batch batch0 = batches.get(0);
final Batch batch1 = batches.get(1);
batches.receiveBlocks(batch0, chainBuilder.generateBlockAtSlot(1).getBlock());
batches.receiveBlocks(
batch1, chainBuilder.generateBlockAtSlot(batch1.getFirstSlot()).getBlock());

assertBatchImported(batch0);
batches.getImportResult(batch0).complete(IMPORT_FAILED);

verifyNoInteractions(subscriber);
}

@Test
void shouldNotNotifyOnBlocksImportedWhenExecutionClientOffline() {
assertThat(sync.syncToChain(targetChain)).isNotDone();
final BlocksImportedSubscriber subscriber = mock(BlocksImportedSubscriber.class);

sync.subscribeToBlocksImportedEvent(subscriber);

final Batch batch0 = batches.get(0);
final Batch batch1 = batches.get(1);
batches.receiveBlocks(
batch0, chainBuilder.generateBlockAtSlot(batch0.getFirstSlot()).getBlock());
batches.receiveBlocks(
batch1, chainBuilder.generateBlockAtSlot(batch1.getFirstSlot()).getBlock());

assertBatchImported(batch0);
batches.getImportResult(batch0).complete(EXECUTION_CLIENT_OFFLINE);

verifyNoInteractions(subscriber);
}

@Test
void shouldNotNotifyOnBlocksImportedWhenSwitchingBranches() {
assertThat(sync.syncToChain(targetChain)).isNotDone();
final BlocksImportedSubscriber subscriber = mock(BlocksImportedSubscriber.class);

sync.subscribeToBlocksImportedEvent(subscriber);

final Batch batch0 = batches.get(0);
final Batch batch1 = batches.get(1);
batches.receiveBlocks(batch0, chainBuilder.generateBlockAtSlot(1).getBlock());
batches.receiveBlocks(
batch1, chainBuilder.generateBlockAtSlot(batch1.getFirstSlot()).getBlock());

assertBatchImported(batch0);

// Switch to a shorter chain while batch0 is importing, forcing a restart
final Batch lastBatch = batches.get(batches.size() - 1);
targetChain =
chainWith(
new SlotAndBlockRoot(
lastBatch.getLastSlot().minus(1), dataStructureUtil.randomBytes32()),
syncSource);
assertThat(sync.syncToChain(targetChain)).isNotDone();

batches.getImportResult(batch0).complete(IMPORTED_ALL_BLOCKS);

verifyNoInteractions(subscriber);
}

@Test
void shouldSwitchChains() {
// Start sync to first chain
Expand Down
Loading
Loading