diff --git a/nil/services/synccommittee/core/batches/constraints/checker.go b/nil/services/synccommittee/core/batches/constraints/checker.go index 0883f35db..f276a602c 100644 --- a/nil/services/synccommittee/core/batches/constraints/checker.go +++ b/nil/services/synccommittee/core/batches/constraints/checker.go @@ -57,7 +57,7 @@ func (c *checker) CheckConstraints(ctx context.Context, batch *types.BlockBatch) } else { c.logger.Info(). Stringer(logging.FieldBatchId, batch.Id). - Msgf("Batch constraint(s) are violated, result: %s", batchResult) + Msgf("Batch constraints are violated, result: %s", batchResult) } return batchResult, nil diff --git a/nil/services/synccommittee/core/batches/constraints/result.go b/nil/services/synccommittee/core/batches/constraints/result.go index d51063457..42a771ce9 100644 --- a/nil/services/synccommittee/core/batches/constraints/result.go +++ b/nil/services/synccommittee/core/batches/constraints/result.go @@ -54,6 +54,10 @@ func (r *CheckResult) JoinWith(other *CheckResult) { r.Details = joinedDetails } +func (r *CheckResult) CanBeExtended() bool { + return r.Type == CheckResultTypeCanBeExtended +} + func newCheckResult(resultType CheckResultType, format string, args ...any) CheckResult { return CheckResult{ Type: resultType, diff --git a/nil/services/synccommittee/core/fetching/aggregator.go b/nil/services/synccommittee/core/fetching/aggregator.go index 447fbf1c0..e64f66893 100644 --- a/nil/services/synccommittee/core/fetching/aggregator.go +++ b/nil/services/synccommittee/core/fetching/aggregator.go @@ -42,6 +42,7 @@ type AggregatorBlockStorage interface { GetLatestFetched(ctx context.Context) (types.BlockRefs, error) TryGetProvedStateRoot(ctx context.Context) (*common.Hash, error) TryGetLatestBatch(ctx context.Context) (*types.BlockBatch, error) + HasFreeSpace(ctx context.Context) (bool, error) PutBlockBatch(ctx context.Context, batch *types.BlockBatch) error } @@ -194,6 +195,8 @@ func (agg *aggregator) processBlockRange(ctx context.Context) error { return nil } +// tryPrepareBatch attempts to prepare a batch for processing by either creating a new one (if possible) +// or returning the latest created batch if it can be extended with additional blocks func (agg *aggregator) tryPrepareBatch(ctx context.Context) (*types.BlockBatch, error) { latestBatch, err := agg.blockStorage.TryGetLatestBatch(ctx) if err != nil { @@ -210,13 +213,25 @@ func (agg *aggregator) tryPrepareBatch(ctx context.Context) (*types.BlockBatch, return agg.createAndPutNewBatch(ctx, &latestBatch.Id) } - agg.logger.Debug().Msgf("Latest batch with id=%s is not sealed", latestBatch.Id) + agg.logger.Debug().Msgf("Latest batch with id=%s is not sealed, checking constraints", latestBatch.Id) checkResult, err := agg.batchChecker.CheckConstraints(ctx, latestBatch) if err != nil { return nil, fmt.Errorf("error checking batch constraints, batchId=%s: %w", latestBatch.Id, err) } + if checkResult.CanBeExtended() { + return latestBatch, nil + } + + return nil, agg.handleUnextendableBatch(ctx, latestBatch, checkResult) +} + +func (agg *aggregator) handleUnextendableBatch( + ctx context.Context, + latestBatch *types.BlockBatch, + checkResult *constraints.CheckResult, +) error { switch checkResult.Type { case constraints.CheckResultTypeShouldBeDiscarded: agg.logger.Warn(). @@ -224,35 +239,45 @@ func (agg *aggregator) tryPrepareBatch(ctx context.Context) (*types.BlockBatch, Msgf("Discarding latest batch due to constraint(s) violation: %s", checkResult.Details) if err := agg.resetter.LaunchPartialResetWithSuspension(ctx, agg, latestBatch.Id); err != nil { - return nil, fmt.Errorf("error resetting progress for batch %s: %w", latestBatch.Id, err) + return fmt.Errorf("error resetting progress for batch %s: %w", latestBatch.Id, err) } - return nil, nil + return nil case constraints.CheckResultTypeShouldBeSealed: agg.logger.Info(). Stringer(logging.FieldBatchId, latestBatch.Id). Msgf("Sealing batch: %s", checkResult.Details) - if err := agg.sealBatch(ctx, latestBatch); err != nil { - return nil, fmt.Errorf("error sealing batch: %w", err) + return fmt.Errorf("error sealing batch: %w", err) } - - return nil, nil + return nil case constraints.CheckResultTypeCanBeExtended: - return latestBatch, nil - + return agg.errUnexpectedResult(checkResult, latestBatch.Id) default: - return nil, fmt.Errorf("unexpected batch check result type: %s, batchId=%s", checkResult.Type, latestBatch.Id) + return agg.errUnexpectedResult(checkResult, latestBatch.Id) } } +func (*aggregator) errUnexpectedResult(checkResult *constraints.CheckResult, batchId types.BatchId) error { + return fmt.Errorf("unexpected batch check result type: %s, batchId=%s", checkResult.Type, batchId) +} + func (agg *aggregator) createAndPutNewBatch(ctx context.Context, parentId *types.BatchId) (*types.BlockBatch, error) { + hasFreeSpace, err := agg.blockStorage.HasFreeSpace(ctx) + if err != nil { + return nil, fmt.Errorf("error checking storage capacity: %w", err) + } + if !hasFreeSpace { + agg.logger.Info().Msg("Storage capacity limit reached, new batch cannot be created") + return nil, nil + } + now := agg.clock.Now() nextBatch := types.NewBlockBatch(parentId, now) - err := agg.blockStorage.PutBlockBatch(ctx, nextBatch) + err = agg.blockStorage.PutBlockBatch(ctx, nextBatch) switch { case errors.Is(err, storage.ErrCapacityLimitReached): return nil, fmt.Errorf("%w, cannot create new batch", err) diff --git a/nil/services/synccommittee/core/fetching/aggregator_test.go b/nil/services/synccommittee/core/fetching/aggregator_test.go index e4a4a4a41..26b695f82 100644 --- a/nil/services/synccommittee/core/fetching/aggregator_test.go +++ b/nil/services/synccommittee/core/fetching/aggregator_test.go @@ -235,7 +235,7 @@ func (s *AggregatorTestSuite) Test_Block_Storage_Capacity_Exceeded() { s.Require().NotNil(latestFetchedBeforeNext) err = agg.processBlockRange(s.ctx) - s.Require().ErrorIs(err, storage.ErrCapacityLimitReached) + s.Require().NoError(err) latestFetchedAfterNext, err := blockStorage.GetLatestFetched(s.ctx) s.Require().NoError(err) diff --git a/nil/services/synccommittee/core/task_state_change_handler.go b/nil/services/synccommittee/core/task_state_change_handler.go index 5723e405d..ecdd0eb18 100644 --- a/nil/services/synccommittee/core/task_state_change_handler.go +++ b/nil/services/synccommittee/core/task_state_change_handler.go @@ -61,12 +61,12 @@ func (h *taskStateChangeHandler) OnTaskTerminated( switch { case result.IsSuccess(): log.NewTaskResultEvent(h.logger, zerolog.InfoLevel, result). - Msg("received successful task result") + Msg("Received successful task result") return h.onTaskSuccess(ctx, task, result) case result.HasRetryableError(): log.NewTaskResultEvent(h.logger, zerolog.WarnLevel, result). - Msg("task execution failed with retryable error") + Msg("Task execution failed with retryable error") return nil default: @@ -76,7 +76,7 @@ func (h *taskStateChangeHandler) OnTaskTerminated( func (h *taskStateChangeHandler) resetState(ctx context.Context, task *types.Task, result *types.TaskResult) error { log.NewTaskResultEvent(h.logger, zerolog.WarnLevel, result). - Msg("task execution failed with critical error, state will be reset") + Msg("Task execution failed with critical error, state will be reset") err := h.stateResetLauncher.LaunchPartialResetWithSuspension(ctx, nil, task.BatchId) @@ -96,13 +96,13 @@ func (h *taskStateChangeHandler) resetState(ctx context.Context, task *types.Tas func (h *taskStateChangeHandler) onTaskSuccess(ctx context.Context, task *types.Task, result *types.TaskResult) error { if task.TaskType != types.ProofBatch { log.NewTaskEvent(h.logger, zerolog.DebugLevel, task). - Msgf("task has type %s, just update pending dependency", task.TaskType) + Msgf("Task has type %s, just update pending dependency", task.TaskType) return nil } log.NewTaskResultEvent(h.logger, zerolog.InfoLevel, result). Stringer(logging.FieldBatchId, task.BatchId). - Msg("Proof batch completed") + Msg("Batch proof generation is completed successfully") err := h.batchSetter.SetBatchAsProved(ctx, task.BatchId) diff --git a/nil/services/synccommittee/internal/metrics/task_storage_metrics.go b/nil/services/synccommittee/internal/metrics/task_storage_metrics.go index 2da31dd85..0227503c8 100644 --- a/nil/services/synccommittee/internal/metrics/task_storage_metrics.go +++ b/nil/services/synccommittee/internal/metrics/task_storage_metrics.go @@ -6,7 +6,6 @@ import ( "sync/atomic" "time" - "github.com/NilFoundation/nil/nil/common/check" "github.com/NilFoundation/nil/nil/internal/telemetry" "github.com/NilFoundation/nil/nil/internal/telemetry/telattr" "github.com/NilFoundation/nil/nil/services/synccommittee/internal/types" @@ -87,8 +86,7 @@ func (h *taskStorageMetricsHandler) registerStatsCallback(meter telemetry.Meter) _, err := meter.RegisterCallback( func(ctx context.Context, observer metric.Observer) error { provider, ok := h.provider.Load().(types.TaskStatsProvider) - check.PanicIfNot(ok) - if provider == nil { + if !ok || provider == nil { return nil } diff --git a/nil/services/synccommittee/internal/storage/block_storage.go b/nil/services/synccommittee/internal/storage/block_storage.go index e708e38e6..a55fd8214 100644 --- a/nil/services/synccommittee/internal/storage/block_storage.go +++ b/nil/services/synccommittee/internal/storage/block_storage.go @@ -189,6 +189,16 @@ func (bs *BlockStorage) TryGetBlock(ctx context.Context, id scTypes.BlockId) (*j return &entry.Block, nil } +func (bs *BlockStorage) HasFreeSpace(ctx context.Context) (bool, error) { + tx, err := bs.database.CreateRoTx(ctx) + if err != nil { + return false, err + } + defer tx.Rollback() + + return bs.ops.hasFreeSpace(tx, bs.config) +} + // PutBlockBatch creates a new batch in the storage or updates an existing one. func (bs *BlockStorage) PutBlockBatch(ctx context.Context, batch *scTypes.BlockBatch) error { if batch == nil { @@ -545,6 +555,10 @@ func (bs *BlockStorage) resetAllBatchesImpl(ctx context.Context) error { return fmt.Errorf("failed to reset latest fetched block: %w", err) } + if err := bs.ops.putLatestBatchId(tx, nil); err != nil { + return fmt.Errorf("failed to reset latest batch id: %w", err) + } + if err := bs.deleteBatches(tx, func(batch *batchEntry) bool { return false }); err != nil { return fmt.Errorf("failed to delete all batches: %w", err) } diff --git a/nil/services/synccommittee/internal/storage/block_storage_op_batch_count.go b/nil/services/synccommittee/internal/storage/block_storage_op_batch_count.go index 8450e2069..32ded90d4 100644 --- a/nil/services/synccommittee/internal/storage/block_storage_op_batch_count.go +++ b/nil/services/synccommittee/internal/storage/block_storage_op_batch_count.go @@ -41,6 +41,15 @@ func (t batchCountOp) addStoredCount(tx db.RwTx, delta int32, config BlockStorag return t.putBatchesCount(tx, newBatchesCount) } +func (t batchCountOp) hasFreeSpace(tx db.RoTx, config BlockStorageConfig) (bool, error) { + currentBatchesCount, err := t.getBatchesCount(tx) + if err != nil { + return false, err + } + hasSpace := currentBatchesCount < config.StoredBatchesLimit + return hasSpace, nil +} + func (batchCountOp) getBatchesCount(tx db.RoTx) (uint32, error) { bytes, err := tx.Get(storedBatchesCountTable, mainShardKey) switch { diff --git a/nil/services/synccommittee/internal/storage/block_storage_test.go b/nil/services/synccommittee/internal/storage/block_storage_test.go index eee1dc84d..bef2fdc91 100644 --- a/nil/services/synccommittee/internal/storage/block_storage_test.go +++ b/nil/services/synccommittee/internal/storage/block_storage_test.go @@ -108,6 +108,47 @@ func (s *BlockStorageTestSuite) Test_PutBlockBatch_Capacity_Exceeded() { s.Require().ErrorIs(err, ErrCapacityLimitReached) } +func (s *BlockStorageTestSuite) Test_HasFreeSpace_Limit_Reached() { + const batchesCount = 5 + storage := s.newTestBlockStorage(NewBlockStorageConfig(batchesCount)) + batches := testaide.NewBatchesSequence(batchesCount) + + for _, batch := range batches { + hasFreeSpace, err := storage.HasFreeSpace(s.ctx) + s.Require().NoError(err) + s.Require().True(hasFreeSpace) + + err = storage.PutBlockBatch(s.ctx, batch) + s.Require().NoError(err) + } + + hasFreeSpace, err := storage.HasFreeSpace(s.ctx) + s.Require().NoError(err) + s.Require().False(hasFreeSpace) +} + +func (s *BlockStorageTestSuite) Test_HasFreeSpace_AfterReset() { + const batchesCount = 5 + storage := s.newTestBlockStorage(NewBlockStorageConfig(batchesCount)) + batches := testaide.NewBatchesSequence(batchesCount) + + hasFreeSpaceInitially, err := storage.HasFreeSpace(s.ctx) + s.Require().NoError(err) + s.Require().True(hasFreeSpaceInitially) + + for _, batch := range batches { + err := storage.PutBlockBatch(s.ctx, batch) + s.Require().NoError(err) + } + + err = storage.ResetAllBatches(s.ctx) + s.Require().NoError(err) + + hasFreeSpaceAfterReset, err := storage.HasFreeSpace(s.ctx) + s.Require().NoError(err) + s.Require().True(hasFreeSpaceAfterReset) +} + func (s *BlockStorageTestSuite) Test_PutBlockBatch_Free_Capacity_On_SetBatchAsProposed() { batches := testaide.NewBatchesSequence(2) @@ -616,6 +657,26 @@ func (s *BlockStorageTestSuite) Test_ResetAllBatches() { } } +func (s *BlockStorageTestSuite) Test_ResetAllBatches_Latest_Batch_Is_Nil() { + batches := testaide.NewBatchesSequence(resetTestBatchesCount) + + for _, batch := range batches { + err := s.bs.PutBlockBatch(s.ctx, batch) + s.Require().NoError(err) + } + + latestBeforeReset, err := s.bs.TryGetLatestBatch(s.ctx) + s.Require().NoError(err) + s.Require().Equal(batches[len(batches)-1], latestBeforeReset) + + err = s.bs.ResetAllBatches(s.ctx) + s.Require().NoError(err) + + latestAfterReset, err := s.bs.TryGetLatestBatch(s.ctx) + s.Require().NoError(err) + s.Require().Nil(latestAfterReset) +} + func (s *BlockStorageTestSuite) Test_ResetAllBatches_1K_Batches_To_Purge() { capacityLimit := uint32(1_000) config := NewBlockStorageConfig(capacityLimit)