Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 22 additions & 16 deletions nil/common/concurrent/suspendable.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package concurrent
import (
"context"
"errors"
"fmt"
"log"
"time"
)
Expand All @@ -15,7 +16,7 @@ const (
workerStatePaused
)

var ErrWorkerStopped = errors.New("worker was stopped")
var ErrSuspendableTerminated = errors.New("suspendable action was terminated")

type stateChangeRequest struct {
newState workerState
Expand All @@ -24,25 +25,25 @@ type stateChangeRequest struct {

// Suspendable provides a mechanism for suspending and resuming periodic execution of an action.
type Suspendable struct {
action func(context.Context)
interval time.Duration
stateCh chan stateChangeRequest
stopped chan struct{}
action func(context.Context) error
interval time.Duration
stateCh chan stateChangeRequest
stoppedCh chan error
}

func NewSuspendable(action func(context.Context), interval time.Duration) *Suspendable {
return &Suspendable{
action: action,
interval: interval,
stateCh: make(chan stateChangeRequest),
stopped: make(chan struct{}),
func NewSuspendable(action func(context.Context) error, interval time.Duration) Suspendable {
return Suspendable{
action: action,
interval: interval,
stateCh: make(chan stateChangeRequest),
stoppedCh: make(chan error, 1),
}
}

// Run executes a suspendable action periodically based on the provided interval until the context is canceled.
// It listens for pause and resume signals, halting and resuming execution accordingly.
func (s *Suspendable) Run(ctx context.Context, started chan<- struct{}) error {
defer close(s.stopped)
defer close(s.stoppedCh)

ticker := time.NewTicker(s.interval)
defer ticker.Stop()
Expand All @@ -55,10 +56,15 @@ func (s *Suspendable) Run(ctx context.Context, started chan<- struct{}) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
err := ctx.Err()
s.stoppedCh <- err
return err

case <-ticker.C:
s.action(ctx)
if err := s.action(ctx); err != nil {
s.stoppedCh <- err
return err
}

case req := <-s.stateCh:
s.onStateChange(ticker, &state, req)
Expand Down Expand Up @@ -117,7 +123,7 @@ func (s *Suspendable) pushAndWait(ctx context.Context, newState workerState) (bo
return stateWasChanged, nil
}

case <-s.stopped:
return false, ErrWorkerStopped
case err := <-s.stoppedCh:
return false, fmt.Errorf("%w: %w", ErrSuspendableTerminated, err)
}
}
45 changes: 38 additions & 7 deletions nil/common/concurrent/suspendable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package concurrent

import (
"context"
"errors"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -86,9 +87,10 @@ func (s *SuspendableTestSuite) Test_Pause_Not_Running_Timeout() {

func (s *SuspendableTestSuite) Test_Pause_Long_Running_Action() {
var called atomic.Bool
action := func(ctx context.Context) {
action := func(ctx context.Context) error {
called.Store(true)
time.Sleep(500 * time.Millisecond)
return nil
}
suspendable := NewSuspendable(action, testActionInterval)

Expand Down Expand Up @@ -140,7 +142,7 @@ func (s *SuspendableTestSuite) Test_Pause_Worker_Cancelled() {
cancel()

paused, err := suspendable.Pause(s.ctx)
s.Require().ErrorIs(err, ErrWorkerStopped)
s.Require().ErrorIs(err, ErrSuspendableTerminated)
s.Require().False(paused)
}

Expand Down Expand Up @@ -243,26 +245,55 @@ func (s *SuspendableTestSuite) Test_Resume_Worker_Cancelled() {
cancel()

resumed, err := suspendable.Resume(s.ctx)
s.Require().ErrorIs(err, ErrWorkerStopped)
s.Require().ErrorIs(err, ErrSuspendableTerminated)
s.Require().False(resumed)
}

func (s *SuspendableTestSuite) noopAction() func(ctx context.Context) {
// Test_Terminated_After_Error verifies that once the suspendable action
// returns an error, Run exits with that error and a later Pause call fails
// with ErrSuspendableTerminated wrapping the original cause.
func (s *SuspendableTestSuite) Test_Terminated_After_Error() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	testErr := errors.New("something went wrong")

	// The action fails on its first invocation after a short delay.
	action := func(ctx context.Context) error {
		time.Sleep(100 * time.Millisecond)
		return testErr
	}
	suspendable := NewSuspendable(action, testActionInterval)

	var errGroup errgroup.Group
	errGroup.Go(func() error {
		return suspendable.Run(ctx, nil)
	})

	// Run must surface the action's error as its own return value.
	err := errGroup.Wait()
	s.Require().ErrorIs(err, testErr)

	// After termination, Pause reports both the sentinel and the cause
	// (Run pushes the error into stoppedCh, and pushAndWait wraps it).
	paused, err := suspendable.Pause(s.ctx)
	s.Require().ErrorIs(err, ErrSuspendableTerminated)
	s.Require().ErrorIs(err, testErr)
	s.Require().False(paused)
}

func (s *SuspendableTestSuite) noopAction() func(ctx context.Context) error {
s.T().Helper()
return func(ctx context.Context) {}
return func(ctx context.Context) error {
return nil
}
}

type testSuspendable struct {
*Suspendable
Suspendable
numOfCalls *atomic.Int32
}

func (s *SuspendableTestSuite) newRunningSuspendable(ctx context.Context) testSuspendable {
s.T().Helper()

var numOfCalls atomic.Int32
action := func(ctx context.Context) {
action := func(ctx context.Context) error {
numOfCalls.Add(1)
return nil
}
suspendable := NewSuspendable(action, testActionInterval)

Expand Down
1 change: 1 addition & 0 deletions nil/common/logging/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
FieldTaskParentId = "taskParentId"
FieldTaskType = "taskType"
FieldTaskExecutorId = "taskExecutorId"
FieldWorkerName = "workerName"

FieldTokenId = "TokenId"

Expand Down
61 changes: 5 additions & 56 deletions nil/services/synccommittee/core/fetching/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
v1 "github.com/NilFoundation/nil/nil/services/synccommittee/core/batches/encode/v1"
"github.com/NilFoundation/nil/nil/services/synccommittee/core/reset"
"github.com/NilFoundation/nil/nil/services/synccommittee/core/rollupcontract"
"github.com/NilFoundation/nil/nil/services/synccommittee/internal/metrics"
"github.com/NilFoundation/nil/nil/services/synccommittee/internal/srv"
"github.com/NilFoundation/nil/nil/services/synccommittee/internal/storage"
"github.com/NilFoundation/nil/nil/services/synccommittee/internal/types"
Expand All @@ -26,7 +25,7 @@ import (
)

type AggregatorMetrics interface {
metrics.BasicMetrics
srv.WorkerMetrics
RecordBatchCreated(ctx context.Context, batch *types.BlockBatch)
}

Expand Down Expand Up @@ -63,6 +62,8 @@ func NewDefaultAggregatorConfig() AggregatorConfig {
}

type aggregator struct {
concurrent.Suspendable

fetcher *Fetcher
batchChecker BatchConstraintChecker
blockStorage AggregatorBlockStorage
Expand All @@ -75,7 +76,6 @@ type aggregator struct {
clock clockwork.Clock
metrics AggregatorMetrics
config AggregatorConfig
workerAction *concurrent.Suspendable
logger logging.Logger
}

Expand Down Expand Up @@ -106,7 +106,8 @@ func NewAggregator(
config: config,
}

agg.workerAction = concurrent.NewSuspendable(agg.runIteration, config.RpcPollingInterval)
iteration := srv.NewWorkerIteration(logger, metrics, agg.Name(), agg.processBlocksAndHandleErr)
agg.Suspendable = concurrent.NewSuspendable(iteration.Run, config.RpcPollingInterval)
agg.logger = srv.WorkerLogger(logger, agg)
return agg
}
Expand All @@ -115,53 +116,6 @@ func (agg *aggregator) Name() string {
return "aggregator"
}

// Run starts the periodic block-fetching loop and blocks until the
// underlying worker action finishes. A nil result or context.Canceled is
// treated as a normal shutdown; any other error is logged as a failure.
// The error from the worker action is returned to the caller unchanged.
func (agg *aggregator) Run(ctx context.Context, started chan<- struct{}) error {
	agg.logger.Info().Msg("Starting block fetching")

	runErr := agg.workerAction.Run(ctx, started)

	switch {
	case runErr == nil, errors.Is(runErr, context.Canceled):
		agg.logger.Info().Msg("Block fetching stopped")
	default:
		agg.logger.Error().Err(runErr).Msg("Error running aggregator, stopped")
	}

	return runErr
}

// Pause suspends the periodic worker action. It returns any error from the
// underlying worker; when the worker was already paused it only logs a
// warning and still returns nil.
func (agg *aggregator) Pause(ctx context.Context) error {
	paused, err := agg.workerAction.Pause(ctx)
	if err != nil {
		return err
	}
	// Guard clause: the worker was already in the paused state.
	if !paused {
		agg.logger.Warn().Msg("Block fetching already paused")
		return nil
	}
	agg.logger.Info().Msg("Block fetching paused")
	return nil
}

// Resume restarts the periodic worker action after a pause. It returns any
// error from the underlying worker; when the worker was already running it
// only logs a warning and still returns nil.
func (agg *aggregator) Resume(ctx context.Context) error {
	resumed, err := agg.workerAction.Resume(ctx)
	if err != nil {
		return err
	}
	// Guard clause: the worker was not paused, so there is nothing to resume.
	if !resumed {
		agg.logger.Warn().Msg("Block fetching already running")
		return nil
	}
	agg.logger.Info().Msg("Block fetching resumed")
	return nil
}

// runIteration executes a single fetch-and-process cycle, recording an
// error metric (keyed by the worker name) when the cycle fails.
func (agg *aggregator) runIteration(ctx context.Context) {
	if err := agg.processBlocksAndHandleErr(ctx); err != nil {
		agg.metrics.RecordError(ctx, agg.Name())
	}
}

// processBlocksAndHandleErr fetches and processes new blocks for all shards.
// It handles the overall flow of block synchronization and proof creation.
func (agg *aggregator) processBlocksAndHandleErr(ctx context.Context) error {
Expand Down Expand Up @@ -189,12 +143,7 @@ func (agg *aggregator) handleProcessingErr(ctx context.Context, err error) error
agg.logger.Info().Err(err).Msg("Storage capacity limit reached, skipping")
return nil

case errors.Is(err, context.Canceled):
agg.logger.Info().Err(err).Msg("Block processing cancelled")
return err

default:
agg.logger.Error().Err(err).Msg("Unexpected error during block aggregation")
return err
}
}
Expand Down
27 changes: 14 additions & 13 deletions nil/services/synccommittee/core/fetching/lag_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@ package fetching

import (
"context"
"errors"
"fmt"
"time"

"github.com/NilFoundation/nil/nil/common/logging"
coreTypes "github.com/NilFoundation/nil/nil/internal/types"
"github.com/NilFoundation/nil/nil/services/synccommittee/internal/metrics"
"github.com/NilFoundation/nil/nil/services/synccommittee/internal/srv"
"github.com/NilFoundation/nil/nil/services/synccommittee/internal/types"
)

type LagTrackerMetrics interface {
metrics.BasicMetrics
srv.WorkerMetrics
RecordFetchingLag(ctx context.Context, shardId coreTypes.ShardId, blocksCount int64)
}

Expand All @@ -37,7 +37,6 @@ type lagTracker struct {
fetcher *Fetcher
storage LagTrackerStorage
metrics LagTrackerMetrics
logger logging.Logger
config LagTrackerConfig
}

Expand All @@ -55,27 +54,29 @@ func NewLagTracker(
config: config,
}

tracker.WorkerLoop = srv.NewWorkerLoop("lag_tracker", tracker.config.CheckInterval, tracker.runIteration)
tracker.logger = srv.WorkerLogger(logger, tracker)
loopConfig := srv.NewWorkerLoopConfig("lag_tracker", tracker.config.CheckInterval, tracker.runIteration)
tracker.WorkerLoop = srv.NewWorkerLoop(loopConfig, metrics, logger)
return tracker
}

func (t *lagTracker) runIteration(ctx context.Context) {
t.logger.Debug().Msg("running lag tracker iteration")
func (t *lagTracker) runIteration(ctx context.Context) error {
t.Logger.Debug().Msg("running lag tracker iteration")

lagPerShard, err := t.getLagForAllShards(ctx)
if err != nil {
t.logger.Error().Err(err).Msg("failed to fetch lag per shard")
t.metrics.RecordError(ctx, t.Name())
return
switch {
case errors.Is(err, context.Canceled):
return err
case err != nil:
return fmt.Errorf("failed to fetch lag per shard: %w", err)
}

for shardId, blocksCount := range lagPerShard {
t.metrics.RecordFetchingLag(ctx, shardId, blocksCount)
t.logger.Trace().Stringer(logging.FieldShardId, shardId).Msgf("lag in shard %s: %d", shardId, blocksCount)
t.Logger.Trace().Stringer(logging.FieldShardId, shardId).Msgf("lag in shard %s: %d", shardId, blocksCount)
}

t.logger.Debug().Msg("lag tracker iteration completed")
t.Logger.Debug().Msg("lag tracker iteration completed")
return nil
}

func (t *lagTracker) getLagForAllShards(ctx context.Context) (map[coreTypes.ShardId]int64, error) {
Expand Down
Loading