Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changelog/6338.breaking.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/consensus/cometbft/apps/scheduler: Run elections at epoch end
23 changes: 15 additions & 8 deletions go/consensus/cometbft/apps/roothash/finalization.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,15 @@ func (app *Application) tryFinalizeRoundInsideTx( //nolint: gocyclo
}

// Generate the final block.
return app.finalizeBlock(ctx, rtState, block.Normal, &sc.Commitment.Header.Header)
app.finalizeBlock(ctx, rtState, block.Normal, &sc.Commitment.Header.Header)
if err := resetCommitments(ctx, rtState, false); err != nil {
return fmt.Errorf("failed to reset commitments: %w", err)
}

return nil
}

func (app *Application) finalizeBlock(ctx *tmapi.Context, rtState *roothash.RuntimeState, hdrType block.HeaderType, hdr *commitment.ComputeResultsHeader) error {
func (app *Application) finalizeBlock(ctx *tmapi.Context, rtState *roothash.RuntimeState, hdrType block.HeaderType, hdr *commitment.ComputeResultsHeader) {
// Generate a new block.
blk := block.NewEmptyBlock(rtState.LastBlock, uint64(ctx.Now().Unix()), hdrType)

Expand Down Expand Up @@ -315,20 +320,21 @@ func (app *Application) finalizeBlock(ctx *tmapi.Context, rtState *roothash.Runt
TypedAttribute(&roothash.FinalizedEvent{Round: blk.Header.Round}).
TypedAttribute(&roothash.RuntimeIDAttribute{ID: rtState.Runtime.ID}),
)
}

func resetCommitments(ctx *tmapi.Context, rtState *roothash.RuntimeState, suspended bool) error {
// Reset scheduler commitments.
switch hdrType {
case block.Suspended:
if suspended {
rtState.CommitmentPool = nil
default:
} else {
rtState.CommitmentPool = commitment.NewPool()
}

// Re-arm round timeout. Give schedulers unlimited time to submit commitments.
prevTimeout := rtState.NextTimeout
rtState.NextTimeout = roothash.TimeoutNever

return rearmRoundTimeout(ctx, rtState.Runtime.ID, blk.Header.Round, prevTimeout, rtState.NextTimeout)
return rearmRoundTimeout(ctx, rtState.Runtime.ID, rtState.LastBlock.Header.Round, prevTimeout, rtState.NextTimeout)
}

func (app *Application) failRound(
Expand All @@ -353,8 +359,9 @@ func (app *Application) failRound(

rtState.LivenessStatistics.MissedProposals[firstSchedulerIdx]++

if err := app.finalizeBlock(ctx, rtState, block.RoundFailed, nil); err != nil {
return fmt.Errorf("failed to emit empty block: %w", err)
app.finalizeBlock(ctx, rtState, block.RoundFailed, nil)
if err := resetCommitments(ctx, rtState, false); err != nil {
return fmt.Errorf("failed to reset commitments: %w", err)
}

return nil
Expand Down
19 changes: 19 additions & 0 deletions go/consensus/cometbft/apps/roothash/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import (
registryState "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/apps/registry/state"
roothashApi "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/apps/roothash/api"
roothashState "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/apps/roothash/state"
"github.com/oasisprotocol/oasis-core/go/consensus/cometbft/features"
governance "github.com/oasisprotocol/oasis-core/go/governance/api"
roothash "github.com/oasisprotocol/oasis-core/go/roothash/api"
"github.com/oasisprotocol/oasis-core/go/roothash/api/message"
staking "github.com/oasisprotocol/oasis-core/go/staking/api"
"github.com/oasisprotocol/oasis-core/go/upgrade/migrations"
)

func fetchRuntimeMessages(
Expand Down Expand Up @@ -171,6 +173,23 @@ func (app *Application) processRuntimeMessages(
func (app *Application) doBeforeSchedule(ctx *tmapi.Context, msg any) (any, error) {
epoch := msg.(beacon.EpochTime)

ok, err := features.IsFeatureVersion(ctx, migrations.Version242)
if err != nil {
return nil, err
}
if ok {
ctx.Logger().Debug("finalizing rounds before scheduling",
"epoch", epoch,
)

if err := app.tryFinalizeRounds(ctx); err != nil {
return nil, err
}
if err := app.processRoundTimeouts(ctx); err != nil {
return nil, err
}
}

ctx.Logger().Debug("processing liveness statistics before scheduling",
"epoch", epoch,
)
Expand Down
138 changes: 106 additions & 32 deletions go/consensus/cometbft/apps/roothash/roothash.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (

"github.com/cometbft/cometbft/abci/types"

beacon "github.com/oasisprotocol/oasis-core/go/beacon/api"
"github.com/oasisprotocol/oasis-core/go/common/cbor"
"github.com/oasisprotocol/oasis-core/go/consensus/api/transaction"
"github.com/oasisprotocol/oasis-core/go/consensus/cometbft/api"
beaconState "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/apps/beacon/state"
governanceApi "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/apps/governance/api"
registryApi "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/apps/registry/api"
registryState "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/apps/registry/state"
Expand All @@ -20,11 +20,13 @@ import (
schedulerState "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/apps/scheduler/state"
stakingapp "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/apps/staking"
stakingState "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/apps/staking/state"
"github.com/oasisprotocol/oasis-core/go/consensus/cometbft/features"
registry "github.com/oasisprotocol/oasis-core/go/registry/api"
roothash "github.com/oasisprotocol/oasis-core/go/roothash/api"
"github.com/oasisprotocol/oasis-core/go/roothash/api/block"
schedulerAPI "github.com/oasisprotocol/oasis-core/go/scheduler/api"
stakingAPI "github.com/oasisprotocol/oasis-core/go/staking/api"
"github.com/oasisprotocol/oasis-core/go/upgrade/migrations"
)

// Application is a roothash application.
Expand Down Expand Up @@ -86,22 +88,45 @@ func (app *Application) OnCleanup() {

// BeginBlock implements api.Application.
//
// All of the work is delegated to maybeChangeCommitteeInBeginBlock; any error
// it reports is returned to the caller unchanged.
func (app *Application) BeginBlock(ctx *api.Context) error {
	if err := app.maybeChangeCommitteeInBeginBlock(ctx); err != nil {
		return err
	}
	return nil
}

// maybeChangeCommitteeInBeginBlock calls changeCommittee when
// shouldChangeCommitteeInBeginBlock reports that a change is due.
//
// It returns the error from the check or from changeCommittee, and nil when
// no change is needed.
func (app *Application) maybeChangeCommitteeInBeginBlock(ctx *api.Context) error {
	switch change, err := app.shouldChangeCommitteeInBeginBlock(ctx); {
	case err != nil:
		return err
	case change:
		return app.changeCommittee(ctx)
	default:
		return nil
	}
}

func (app *Application) shouldChangeCommitteeInBeginBlock(ctx *api.Context) (bool, error) {
isFeatureVersion242, err := features.IsFeatureVersion(ctx, migrations.Version242)
if err != nil {
return false, err
}
if isFeatureVersion242 {
return false, nil
}

// Check if there was an epoch transition.
epochChanged, epoch := app.state.EpochChanged(ctx)
epochChanged, _ := app.state.EpochChanged(ctx)
if epochChanged {
return app.onCommitteeChanged(ctx, epoch)
return true, nil
}

// Check if rescheduling has taken place.
rescheduled := ctx.HasEvent(schedulerapp.AppName, &schedulerAPI.ElectedEvent{})
if rescheduled {
return app.onCommitteeChanged(ctx, epoch)
return true, nil
}

return nil
return false, nil
}

func (app *Application) onCommitteeChanged(ctx *api.Context, epoch beacon.EpochTime) error {
func (app *Application) changeCommittee(ctx *api.Context) error {
roothash := roothashState.NewImmutableState(ctx.State())
registry := registryState.NewImmutableState(ctx.State())

Expand All @@ -118,18 +143,17 @@ func (app *Application) onCommitteeChanged(ctx *api.Context, epoch beacon.EpochT

runtimes, _ := registry.Runtimes(ctx)
for _, rt := range runtimes {
if err := app.onRuntimeCommitteeChanged(ctx, rt, epoch, params, stake); err != nil {
if err := app.changeRuntimeCommittee(ctx, rt, params, stake); err != nil {
return err
}
}

return nil
}

func (app *Application) onRuntimeCommitteeChanged(
func (app *Application) changeRuntimeCommittee(
ctx *api.Context,
rt *registry.Runtime,
epoch beacon.EpochTime,
params *roothash.ConsensusParameters,
stake *stakingState.StakeAccumulatorCache,
) error {
Expand Down Expand Up @@ -170,13 +194,13 @@ func (app *Application) onRuntimeCommitteeChanged(
}

// Suspend the runtime if needed.
var suspend bool
var suspended bool
switch {
case committee == nil:
logger.Warn("no executor committee")
logger.Debug("suspending runtime: no executor committee")
// If there are no committees for this runtime, suspend the runtime
// as this means that there is no one to pay the maintenance fees.
suspend = true
suspended = true
case params.DebugDoNotSuspendRuntimes, params.DebugBypassStake:
// If the debug flag is set, do not suspend the runtime.
default:
Expand All @@ -195,50 +219,57 @@ func (app *Application) onRuntimeCommitteeChanged(
case nil:
// Sufficient stake is available.
case stakingAPI.ErrInsufficientStake:
logger.Debug("insufficient stake for runtime operation",
logger.Debug("suspending runtime: insufficient stake",
"entity", rt.EntityID,
"account", *addr,
)
suspend = true
suspended = true
default:
return fmt.Errorf("failed to check stake claims: %w", err)
}
}

switch suspend {
case true:
logger.Debug("suspending runtime, maintenance fees not paid or owner debonded",
"epoch", epoch,
)
isFeatureVersion242, err := features.IsFeatureVersion(ctx, migrations.Version242)
if err != nil {
logger.Error("failed to get feature version", "err", err)
return err
}

switch suspended {
case true:
if err = registry.SuspendRuntime(ctx, rt.ID); err != nil {
return err
}

// Emit an empty block signalling that the runtime was suspended.
if err = app.finalizeBlock(ctx, rtState, block.Suspended, nil); err != nil {
return fmt.Errorf("failed to emit empty block: %w", err)
// When feature version 242 is not active, the deprecated suspension logic
// still emits an empty block signalling that the runtime was suspended.
if !isFeatureVersion242 {
app.finalizeBlock(ctx, rtState, block.Suspended, nil)
}

rtState.Suspended = true
rtState.Committee = nil
committee = nil
case false:
logger.Debug("updating committee for runtime",
"epoch", epoch,
"committee", committee,
)

// Emit an empty block signaling epoch transition. This is required so that
// the clients can be sure what state is final when an epoch transition occurs.
if err = app.finalizeBlock(ctx, rtState, block.EpochTransition, nil); err != nil {
return fmt.Errorf("failed to emit empty block: %w", err)
// Check if we need to emit epoch transition block on epoch changes
// for deprecated election logic.
if !isFeatureVersion242 {
// Emit an empty block signaling epoch transition. This is required so that
// the clients can be sure what state is final when an epoch transition occurs.
app.finalizeBlock(ctx, rtState, block.EpochTransition, nil)
}
}

// Warning: Non-suspended runtimes can still have a nil committee.
rtState.Suspended = false
rtState.Committee = committee
if err := resetCommitments(ctx, rtState, suspended); err != nil {
return fmt.Errorf("failed to reset commitments: %w", err)
}

// Warning: Non-suspended runtimes can still have a nil committee.
rtState.Suspended = suspended
rtState.Committee = committee

// Clear liveness statistics.
rtState.LivenessStatistics = nil
// Update the runtime descriptor to the latest per-epoch value.
Expand Down Expand Up @@ -424,5 +455,48 @@ func (app *Application) EndBlock(ctx *api.Context) (types.ResponseEndBlock, erro
return types.ResponseEndBlock{}, err
}

if err := app.maybeChangeCommitteeInEndBlock(ctx); err != nil {
return types.ResponseEndBlock{}, err
}

return types.ResponseEndBlock{}, nil
}

// maybeChangeCommitteeInEndBlock calls changeCommittee when
// shouldChangeCommitteeInEndBlock reports that a change is due.
//
// It returns the error from the check or from changeCommittee, and nil when
// no change is needed.
func (app *Application) maybeChangeCommitteeInEndBlock(ctx *api.Context) error {
	switch change, err := app.shouldChangeCommitteeInEndBlock(ctx); {
	case err != nil:
		return err
	case change:
		return app.changeCommittee(ctx)
	default:
		return nil
	}
}

// shouldChangeCommitteeInEndBlock reports whether the committee should be
// changed at the end of the current block.
//
// It returns false when feature version 242 is not active. Otherwise it
// returns true when a future epoch is scheduled for the next block height
// (i.e. this block is the last one of the epoch), or when the scheduler
// application emitted an ElectedEvent in this block.
func (app *Application) shouldChangeCommitteeInEndBlock(ctx *api.Context) (bool, error) {
	enabled, err := features.IsFeatureVersion(ctx, migrations.Version242)
	switch {
	case err != nil:
		return false, err
	case !enabled:
		return false, nil
	}

	// Check if we are at the end of an epoch. The local is deliberately not
	// named beaconState so that it does not shadow the imported package.
	state := beaconState.NewMutableState(ctx.State())
	future, err := state.GetFutureEpoch(ctx)
	if err != nil {
		return false, err
	}
	if atEpochEnd := future != nil && future.Height == ctx.CurrentHeight()+1; atEpochEnd {
		return true, nil
	}

	// Otherwise the committee changes only if rescheduling has taken place.
	return ctx.HasEvent(schedulerapp.AppName, &schedulerAPI.ElectedEvent{}), nil
}
1 change: 1 addition & 0 deletions go/consensus/cometbft/apps/roothash/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func (app *Application) executorCommit(
"round", commit.Header.Header.Round,
"node_id", commit.NodeID,
"scheduler_id", commit.Header.SchedulerID,
"hash", commit.Header.Header.EncodedHash(),
"failure", commit.IsIndicatingFailure(),
)
}
Expand Down
Loading
Loading