11package core
22
33import (
4+ "bytes"
45 "context"
56 "errors"
67 "fmt"
@@ -10,14 +11,16 @@ import (
1011 "github.com/NilFoundation/nil/nil/common/concurrent"
1112 "github.com/NilFoundation/nil/nil/common/logging"
1213 coreTypes "github.com/NilFoundation/nil/nil/internal/types"
13- "github.com/NilFoundation/nil/nil/services/synccommittee/core/batches"
1414 "github.com/NilFoundation/nil/nil/services/synccommittee/core/batches/blob"
15+ "github.com/NilFoundation/nil/nil/services/synccommittee/core/batches/encode"
1516 v1 "github.com/NilFoundation/nil/nil/services/synccommittee/core/batches/encode/v1"
1617 "github.com/NilFoundation/nil/nil/services/synccommittee/core/reset"
1718 "github.com/NilFoundation/nil/nil/services/synccommittee/internal/metrics"
19+ "github.com/NilFoundation/nil/nil/services/synccommittee/internal/rollupcontract"
1820 "github.com/NilFoundation/nil/nil/services/synccommittee/internal/srv"
1921 "github.com/NilFoundation/nil/nil/services/synccommittee/internal/storage"
2022 "github.com/NilFoundation/nil/nil/services/synccommittee/internal/types"
23+ ethtypes "github.com/ethereum/go-ethereum/core/types"
2124 "github.com/jonboulle/clockwork"
2225)
2326
@@ -41,11 +44,13 @@ type AggregatorBlockStorage interface {
4144
4245type AggregatorConfig struct {
4346 RpcPollingInterval time.Duration
47+ MaxBlobsInTx uint
4448}
4549
4650func NewAggregatorConfig (rpcPollingInterval time.Duration ) AggregatorConfig {
4751 return AggregatorConfig {
4852 RpcPollingInterval : rpcPollingInterval ,
53+ MaxBlobsInTx : 6 ,
4954 }
5055}
5156
@@ -58,10 +63,13 @@ type aggregator struct {
5863 blockStorage AggregatorBlockStorage
5964 taskStorage AggregatorTaskStorage
6065 subgraphFetcher * subgraphFetcher
61- batchCommitter batches.BatchCommitter
66+ batchEncoder encode.BatchEncoder
67+ blobBuilder blob.Builder
68+ rollupContract rollupcontract.Wrapper
6269 resetter * reset.StateResetter
6370 clock clockwork.Clock
6471 metrics AggregatorMetrics
72+ config AggregatorConfig
6573 workerAction * concurrent.Suspendable
6674 logger logging.Logger
6775}
@@ -71,6 +79,7 @@ func NewAggregator(
7179 blockStorage AggregatorBlockStorage ,
7280 taskStorage AggregatorTaskStorage ,
7381 resetter * reset.StateResetter ,
82+ rollupContractWrapper rollupcontract.Wrapper ,
7483 clock clockwork.Clock ,
7584 logger logging.Logger ,
7685 metrics AggregatorMetrics ,
@@ -81,16 +90,13 @@ func NewAggregator(
8190 blockStorage : blockStorage ,
8291 taskStorage : taskStorage ,
8392 subgraphFetcher : newSubgraphFetcher (rpcClient , logger ),
84- batchCommitter : batches .NewBatchCommitter (
85- v1 .NewEncoder (logger ),
86- blob .NewBuilder (),
87- nil , // TODO
88- logger ,
89- batches .DefaultCommitOptions (),
90- ),
91- resetter : resetter ,
92- clock : clock ,
93- metrics : metrics ,
93+ batchEncoder : v1 .NewEncoder (logger ),
94+ blobBuilder : blob .NewBuilder (),
95+ rollupContract : rollupContractWrapper ,
96+ resetter : resetter ,
97+ clock : clock ,
98+ metrics : metrics ,
99+ config : config ,
94100 }
95101
96102 agg .workerAction = concurrent .NewSuspendable (agg .runIteration , config .RpcPollingInterval )
@@ -357,13 +363,18 @@ func (agg *aggregator) handleBlockBatch(ctx context.Context, batch *types.BlockB
357363 return err
358364 }
359365
366+ sidecar , dataProofs , err := agg .prepareForBatchCommit (ctx , batch )
367+ if err != nil {
368+ return err
369+ }
370+ batch .SetDataProofs (dataProofs )
371+
360372 if err := agg .blockStorage .SetBlockBatch (ctx , batch ); err != nil {
361373 return fmt .Errorf ("error storing block batch, latestMainHash=%s: %w" , batch .LatestMainBlock ().Hash , err )
362374 }
363375
364- prunedBatch := types .NewPrunedBatch (batch )
365- if err := agg .batchCommitter .Commit (ctx , prunedBatch ); err != nil {
366- return err
376+ if err := agg .rollupContract .CommitBatch (ctx , sidecar , batch .Id .String ()); err != nil {
377+ return fmt .Errorf ("error committing batch, latestMainHash=%s: %w" , batch .LatestMainBlock ().Hash , err )
367378 }
368379
369380 if err := agg .createProofTasks (ctx , batch ); err != nil {
@@ -392,3 +403,20 @@ func (agg *aggregator) createProofTasks(ctx context.Context, batch *types.BlockB
392403
393404 return nil
394405}
406+
407+ func (agg * aggregator ) prepareForBatchCommit (
408+ ctx context.Context , batch * types.BlockBatch ,
409+ ) (* ethtypes.BlobTxSidecar , types.DataProofs , error ) {
410+ var binTransactions bytes.Buffer
411+ if err := agg .batchEncoder .Encode (types .NewPrunedBatch (batch ), & binTransactions ); err != nil {
412+ return nil , nil , err
413+ }
414+ agg .logger .Debug ().Int ("compressed_batch_len" , binTransactions .Len ()).Msg ("encoded transaction" )
415+
416+ blobs , err := agg .blobBuilder .MakeBlobs (& binTransactions , agg .config .MaxBlobsInTx )
417+ if err != nil {
418+ return nil , nil , err
419+ }
420+
421+ return agg .rollupContract .PrepareBlobs (ctx , blobs )
422+ }
0 commit comments