Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 36 additions & 16 deletions packages/beacon-node/src/api/impl/validator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
BLSSignature,
isBlindedBeaconBlock,
isBlindedBlockContents,
phase0,
} from "@lodestar/types";
import {ExecutionStatus} from "@lodestar/fork-choice";
import {toHex, racePromisesWithCutoff, RaceEvent} from "@lodestar/utils";
Expand Down Expand Up @@ -172,12 +173,17 @@ export function getValidatorApi({

/**
* This function is called 1s before next epoch, usually at that time PrepareNextSlotScheduler finishes
* so we should have checkpoint state, otherwise wait for up to `timeoutMs`.
* so we should have checkpoint state, otherwise wait for up to the slot 1 of epoch.
* slot epoch 0 1
* |------------|------------|
* ^ ^
* | |
* | |
* | waitForCheckpointState (1s before slot 0 of epoch, wait until slot 1 of epoch)
* |
* prepareNextSlot (4s before next slot)
*/
async function waitForCheckpointState(
cpHex: CheckpointHex,
timeoutMs: number
): Promise<CachedBeaconStateAllForks | null> {
async function waitForCheckpointState(cpHex: CheckpointHex): Promise<CachedBeaconStateAllForks | null> {
const cpState = chain.regen.getCheckpointStateSync(cpHex);
if (cpState) {
return cpState;
Expand All @@ -186,16 +192,30 @@ export function getValidatorApi({
epoch: cpHex.epoch,
root: fromHexString(cpHex.rootHex),
};
// if not, wait for ChainEvent.checkpoint event until timeoutMs
return new Promise<CachedBeaconStateAllForks | null>((resolve) => {
const timer = setTimeout(() => resolve(null), timeoutMs);
chain.emitter.on(ChainEvent.checkpoint, (eventCp, cpState) => {
if (ssz.phase0.Checkpoint.equals(eventCp, cp)) {
clearTimeout(timer);
resolve(cpState);
}
});
});
const slot0 = computeStartSlotAtEpoch(cp.epoch);
// if not, wait for ChainEvent.checkpoint event until slot 1 of epoch
let listener: ((eventCp: phase0.Checkpoint) => void) | null = null;
const foundCPState = await Promise.race([
new Promise((resolve) => {
listener = (eventCp) => {
resolve(ssz.phase0.Checkpoint.equals(eventCp, cp));
};
chain.emitter.once(ChainEvent.checkpoint, listener);
}),
// in rare case, checkpoint state cache may happen up to 6s of slot 0 of epoch
// so we wait for it until the slot 1 of epoch
chain.clock.waitForSlot(slot0 + 1),
]);

if (listener != null) {
chain.emitter.off(ChainEvent.checkpoint, listener);
}

if (foundCPState === true) {
return chain.regen.getCheckpointStateSync(cpHex);
}

return null;
}

/**
Expand Down Expand Up @@ -721,7 +741,7 @@ export function getValidatorApi({
// this is to avoid missed block proposal due to 0 epoch look ahead
if (epoch === nextEpoch && toNextEpochMs < prepareNextSlotLookAheadMs) {
// wait for maximum 1 slot for cp state which is the timeout of validator api
const cpState = await waitForCheckpointState({rootHex: head.blockRoot, epoch}, slotMs);
const cpState = await waitForCheckpointState({rootHex: head.blockRoot, epoch});
if (cpState) {
state = cpState;
metrics?.duties.requestNextEpochProposalDutiesHit.inc();
Expand Down