diff --git a/nil/services/relayer/internal/l1/event_listener.go b/nil/services/relayer/internal/l1/event_listener.go index b3b51c4cf..ee3a50bc6 100644 --- a/nil/services/relayer/internal/l1/event_listener.go +++ b/nil/services/relayer/internal/l1/event_listener.go @@ -51,6 +51,7 @@ type EventListener struct { clock clockwork.Clock config *EventListenerConfig + metrics EventListenerMetrics eventStorage *EventStorage state struct { @@ -70,6 +71,7 @@ func NewEventListener( ethClient EthClient, contractClient L1Contract, storage *EventStorage, + metrics EventListenerMetrics, logger logging.Logger, ) (*EventListener, error) { el := &EventListener{ @@ -78,6 +80,7 @@ func NewEventListener( clock: clock, config: config, eventStorage: storage, + metrics: metrics, } el.state.emitter = make(chan struct{}, config.EmitEventCapacity) @@ -161,8 +164,6 @@ func (el *EventListener) subscriber(ctx context.Context, eventCh chan<- *L1Messa Err(err). Msg("failed to subscribe to updates from L1 contract") return err - - // TODO(oclaw) metrics } defer sub.Unsubscribe() @@ -181,7 +182,7 @@ func (el *EventListener) subscriber(ctx context.Context, eventCh chan<- *L1Messa Err(err). Msg("L1 subscription is broken") - // TODO(oclaw) metrics + el.metrics.AddSubscriptionError(ctx) return fmt.Errorf("%w: %w", ErrSubscriptionIsBroken, err) } el.logger.Debug().Msg("subscription channel is closed") @@ -197,6 +198,9 @@ func (el *EventListener) fetcher(ctx context.Context, eventCh chan<- *L1MessageS el.logger.Info().Msg("started fetcher") + el.metrics.SetFetcherActive(ctx) + defer el.metrics.SetFetcherIdle(ctx) + // try fetching as long as possible, force exit after large enough attempt number retrier := common.NewRetryRunner( common.RetryConfig{ @@ -210,12 +214,9 @@ func (el *EventListener) fetcher(ctx context.Context, eventCh chan<- *L1MessageS err := el.fetchPastEvents(ctx, eventCh) if err != nil { el.logger.Error().Err(err).Msg("historical event fetching failed") - // TODO (oclaw) metrics } return err }) - - // TODO(oclaw) metrics (this routine is not expected to run for too long, we should now if something is stuck here) } func (el *EventListener) fetchPastEvents(ctx context.Context, eventCh chan<- *L1MessageSent) error { @@ -286,6 +287,7 @@ func (el *EventListener) eventProcessor( if err := el.processEvent(ctx, event); err != nil { return err } + el.metrics.AddEventFromFetcher(ctx) processedOldEvents++ } @@ -304,6 +306,7 @@ func (el *EventListener) eventProcessor( if err := el.processEvent(ctx, event); err != nil { return err } + el.metrics.AddEventFromSubscriber(ctx) case <-ctx.Done(): return ctx.Err() } diff --git a/nil/services/relayer/internal/l1/event_listener_test.go b/nil/services/relayer/internal/l1/event_listener_test.go index 62265fa4d..e8d2e1d5b 100644 --- a/nil/services/relayer/internal/l1/event_listener_test.go +++ b/nil/services/relayer/internal/l1/event_listener_test.go @@ -8,6 +8,7 @@ import ( "github.com/NilFoundation/nil/nil/common/logging" "github.com/NilFoundation/nil/nil/internal/db" + "github.com/NilFoundation/nil/nil/services/relayer/internal/storage" ethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" ethtypes "github.com/ethereum/go-ethereum/core/types" @@ -21,13 +22,15 @@ type EventListenerTestSuite struct { suite.Suite // high level dependencies - database db.DB - storage *EventStorage - logger logging.Logger - clock clockwork.Clock + database db.DB + storage *EventStorage + storageMetrics storage.TableMetrics + logger logging.Logger + clock clockwork.Clock // testing entity - listener *EventListener + listener *EventListener + listenerMetrics EventListenerMetrics // mocks ethClientMock *EthClientMock @@ -57,11 +60,16 @@ func (s *EventListenerTestSuite) SetupTest() { s.database, err = db.NewBadgerDbInMemory() s.Require().NoError(err, "failed to initialize test db") + s.listenerMetrics, err = NewEventListenerMetrics() + s.Require().NoError(err) + + s.storageMetrics, err = storage.NewTableMetrics() + s.Require().NoError(err) + s.clock = clockwork.NewRealClock() s.ethClientMock = &EthClientMock{} s.l1ContractMock = &L1ContractMock{} - - s.storage, err = NewEventStorage(s.ctx, s.database, s.clock, nil, s.logger) + s.storage, err = NewEventStorage(s.ctx, s.database, s.clock, s.storageMetrics, s.logger) s.Require().NoError(err, "failed to initialize event storage") cfg := DefaultEventListenerConfig() @@ -69,7 +77,15 @@ func (s *EventListenerTestSuite) SetupTest() { cfg.BridgeMessengerContractAddress = "0xDEADBEEF" cfg.EmitEventCapacity = 100 // do avoid event dropping - s.listener, err = NewEventListener(cfg, s.clock, s.ethClientMock, s.l1ContractMock, s.storage, s.logger) + s.listener, err = NewEventListener( + cfg, + s.clock, + s.ethClientMock, + s.l1ContractMock, + s.storage, + s.listenerMetrics, + s.logger, + ) s.Require().NoError(err, "failed to create listener") } diff --git a/nil/services/relayer/internal/l1/finality_ensurer.go b/nil/services/relayer/internal/l1/finality_ensurer.go index f4f32723f..b0556206f 100644 --- a/nil/services/relayer/internal/l1/finality_ensurer.go +++ b/nil/services/relayer/internal/l1/finality_ensurer.go @@ -76,6 +76,7 @@ type FinalityEnsurer struct { clock clockwork.Clock l1Storage *EventStorage l2Storage *l2.EventStorage + metrics FinalityEnsurerMetrics eventProvider eventProvider emitter chan struct{} @@ -88,6 +89,7 @@ func NewFinalityEnsurer( logger logging.Logger, l1Storage *EventStorage, l2Storage *l2.EventStorage, + metrics FinalityEnsurerMetrics, eventProvider eventProvider, ) (*FinalityEnsurer, error) { err := config.Validate() @@ -103,6 +105,7 @@ func NewFinalityEnsurer( l1Storage: l1Storage, l2Storage: l2Storage, eventProvider: eventProvider, + metrics: metrics, emitter: make(chan struct{}, config.EventEmitterCapacity), } @@ -165,6 +168,12 @@ func (fe *FinalityEnsurer) blockFetcher(ctx context.Context) error { if fe.finalizedBlock != nil { log = log.Uint64("local_finalized_block_number", fe.finalizedBlock.BlockNumber) } + + fe.metrics.SetTimeSinceFinalizedBlockNumberUpdate( + ctx, + uint64(now.Sub(lastSuccessfulUpdate).Seconds()), + ) + log.Msg("failed to fetch last finalized block number from Etherium") continue } @@ -184,8 +193,7 @@ func (fe *FinalityEnsurer) blockFetcher(ctx context.Context) error { Msg("refreshed actual finalized block number") lastSuccessfulUpdate = now - // TODO(oclaw) metrics - // we should alert somehow if we are unable to obtain actual finalized block number long enough + fe.metrics.SetTimeSinceFinalizedBlockNumberUpdate(ctx, 0) } } } @@ -205,6 +213,7 @@ func (fe *FinalityEnsurer) pendingEventPoller(ctx context.Context) error { } if err := fe.forwardFinalizedEvents(ctx); err != nil { fe.logger.Error().Err(err).Msg("failed to process l1 pending events") + fe.metrics.AddRelayError(ctx) } } } @@ -265,6 +274,11 @@ func (fe *FinalityEnsurer) forwardFinalizedEvents(ctx context.Context) error { Int("orphaned_blocks_count", len(orphaned)). Msg("checked blocks finality") + var ( + finalizedEventCount int + orphanedEventCount = len(events) + ) + if len(finalized) > 0 { var l2Events []*l2.Event for _, finblk := range finalized { @@ -277,6 +291,9 @@ func (fe *FinalityEnsurer) forwardFinalizedEvents(ctx context.Context) error { Int("finalized_event_count", len(l2Events)). Msg("saving messages to L2 event storage") + finalizedEventCount = len(l2Events) + orphanedEventCount = len(events) - finalizedEventCount + err := fe.l2Storage.StoreEvents(ctx, l2Events) if ignoreErrors(err, storage.ErrKeyExists) != nil { return fmt.Errorf("failed to forward events to L2 storage: %w", err) @@ -289,6 +306,9 @@ func (fe *FinalityEnsurer) forwardFinalizedEvents(ctx context.Context) error { } } + fe.metrics.AddFinalizedEvents(ctx, uint64(finalizedEventCount)) + fe.metrics.AddOrphanedEvents(ctx, uint64(orphanedEventCount)) + fe.logger.Info(). Int("dropping_events_count", len(events)). Msg("dropping pending events from L1 storage") @@ -302,7 +322,6 @@ func (fe *FinalityEnsurer) forwardFinalizedEvents(ctx context.Context) error { return fmt.Errorf("failed to cleanup events from l1 storage: %w", err) } - // TODO(oclaw) metrics return nil } diff --git a/nil/services/relayer/internal/l1/finality_ensurer_test.go b/nil/services/relayer/internal/l1/finality_ensurer_test.go index 221c560ff..c3f3deaf6 100644 --- a/nil/services/relayer/internal/l1/finality_ensurer_test.go +++ b/nil/services/relayer/internal/l1/finality_ensurer_test.go @@ -10,6 +10,7 @@ import ( "github.com/NilFoundation/nil/nil/common/logging" "github.com/NilFoundation/nil/nil/internal/db" "github.com/NilFoundation/nil/nil/services/relayer/internal/l2" + "github.com/NilFoundation/nil/nil/services/relayer/internal/storage" ethcommon "github.com/ethereum/go-ethereum/common" ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rpc" @@ -50,13 +51,15 @@ type FinalityEnsurerTestSuite struct { suite.Suite // high level dependencies - database db.DB - l1Storage *EventStorage - l2Storage *l2.EventStorage - logger logging.Logger + database db.DB + l1Storage *EventStorage + l2Storage *l2.EventStorage + storageMetrics storage.TableMetrics + logger logging.Logger // testing entity - ensurer *FinalityEnsurer + ensurer *FinalityEnsurer + ensurerMetrics FinalityEnsurerMetrics // mocks ethClientMock *EthClientMock @@ -86,6 +89,9 @@ func (s *FinalityEnsurerTestSuite) SetupTest() { s.database, err = db.NewBadgerDbInMemory() s.Require().NoError(err, "failed to initialize database") + s.storageMetrics, err = storage.NewTableMetrics() + s.Require().NoError(err) + s.clockMock = clockwork.NewFakeClock() s.ethClientMock = &EthClientMock{} @@ -104,16 +110,19 @@ func (s *FinalityEnsurerTestSuite) SetupTest() { return nil, nil } - s.l1Storage, err = NewEventStorage(s.ctx, s.database, s.clockMock, nil, s.logger) + s.l1Storage, err = NewEventStorage(s.ctx, s.database, s.clockMock, s.storageMetrics, s.logger) s.Require().NoError(err, "failed to initialize L1 storage") - s.l2Storage = l2.NewEventStorage(s.ctx, s.database, s.clockMock, nil, s.logger) + s.l2Storage = l2.NewEventStorage(s.ctx, s.database, s.clockMock, s.storageMetrics, s.logger) cfg := DefaultFinalityEnsurerConfig() cfg.EventEmitterCapacity = 100 s.eventListenerStub = newEventListenerStub() + s.ensurerMetrics, err = NewFinalityEnsurerMetrics() + s.Require().NoError(err) + s.ensurer, err = NewFinalityEnsurer( cfg, s.ethClientMock, @@ -121,6 +130,7 @@ func (s *FinalityEnsurerTestSuite) SetupTest() { s.logger, s.l1Storage, s.l2Storage, + s.ensurerMetrics, s.eventListenerStub, ) s.Require().NoError(err) diff --git a/nil/services/relayer/internal/l1/metrics.go b/nil/services/relayer/internal/l1/metrics.go new file mode 100644 index 000000000..459c13670 --- /dev/null +++ b/nil/services/relayer/internal/l1/metrics.go @@ -0,0 +1,156 @@ +package l1 + +import ( + "context" + + "github.com/NilFoundation/nil/nil/internal/telemetry" + "github.com/NilFoundation/nil/nil/internal/telemetry/telattr" + "github.com/NilFoundation/nil/nil/services/relayer/internal/metrics" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +type EventListenerMetrics interface { + SetFetcherActive(ctx context.Context) + SetFetcherIdle(ctx context.Context) + AddEventFromFetcher(ctx context.Context) + AddEventFromSubscriber(ctx context.Context) + AddSubscriptionError(ctx context.Context) +} + +const ( + eventSourceLabel = "event_source" + eventSourceFetcher = "fetcher" + eventSourceSubscriber = "subscriber" + + eventStatusLabel = "event_status" + eventStatusFinalized = "finalized" + eventStatusOrphaned = "orphaned" +) + +type eventListenerMetrics struct { + attrs metric.MeasurementOption + + fetcherRunStatus telemetry.Gauge // 0 if fetcher is inactive + subsciptionError telemetry.Counter + eventsProcessed telemetry.Counter +} + +func NewEventListenerMetrics() (EventListenerMetrics, error) { + elm := &eventListenerMetrics{} + if err := metrics.InitMetrics(elm, "relayer", "event_listener"); err != nil { + return nil, err + } + return elm, nil +} + +func (elm *eventListenerMetrics) Init(name string, meter telemetry.Meter, attrs metric.MeasurementOption) error { + var err error + + elm.fetcherRunStatus, err = meter.Int64Gauge(name + ".fetcher_active") + if err != nil { + return err + } + + elm.subsciptionError, err = meter.Int64Counter(name + ".subscription_failure") + if err != nil { + return err + } + + elm.eventsProcessed, err = meter.Int64Counter(name + ".events_processed") + if err != nil { + return err + } + + elm.attrs = attrs + return nil +} + +func (elm *eventListenerMetrics) SetFetcherActive(ctx context.Context) { + elm.fetcherRunStatus.Record(ctx, 1, elm.attrs) +} + +func (elm *eventListenerMetrics) SetFetcherIdle(ctx context.Context) { + elm.fetcherRunStatus.Record(ctx, 0, elm.attrs) +} + +func (elm *eventListenerMetrics) AddEventFromFetcher(ctx context.Context) { + sourceAttr := telattr.With(attribute.String(eventSourceLabel, eventSourceFetcher)) + elm.eventsProcessed.Add(ctx, 1, elm.attrs, sourceAttr) +} + +func (elm *eventListenerMetrics) AddEventFromSubscriber(ctx context.Context) { + sourceAttr := telattr.With(attribute.String(eventSourceLabel, eventSourceSubscriber)) + elm.eventsProcessed.Add(ctx, 1, elm.attrs, sourceAttr) +} + +func (elm *eventListenerMetrics) AddSubscriptionError(ctx context.Context) { + elm.subsciptionError.Add(ctx, 1, elm.attrs) +} + +type FinalityEnsurerMetrics interface { + SetTimeSinceFinalizedBlockNumberUpdate(ctx context.Context, sec uint64) + AddRelayError(ctx context.Context) + AddFinalizedEvents(ctx context.Context, count uint64) + AddOrphanedEvents(ctx context.Context, count uint64) +} + +type finalityEnsurerMetrics struct { + attrs metric.MeasurementOption + + finalizedBlockUpdateLag telemetry.Gauge + relayErrors telemetry.Counter + processedEvents telemetry.Counter +} + +func NewFinalityEnsurerMetrics() (FinalityEnsurerMetrics, error) { + fem := &finalityEnsurerMetrics{} + if err := metrics.InitMetrics(fem, "relayer", "finality_ensurer"); err != nil { + return nil, err + } + return fem, nil +} + +func (fem *finalityEnsurerMetrics) Init(name string, meter telemetry.Meter, attrs metric.MeasurementOption) error { + var err error + + fem.finalizedBlockUpdateLag, err = meter.Int64Gauge(name + ".finalized_block_update_lag_sec") + if err != nil { + return err + } + + fem.relayErrors, err = meter.Int64Counter(name + ".relay_error") + if err != nil { + return err + } + + fem.processedEvents, err = meter.Int64Counter(name + ".processed_events") + if err != nil { + return err + } + + fem.attrs = attrs + return nil +} + +func (fem *finalityEnsurerMetrics) SetTimeSinceFinalizedBlockNumberUpdate(ctx context.Context, sec uint64) { + fem.finalizedBlockUpdateLag.Record(ctx, int64(sec), fem.attrs) +} + +func (fem *finalityEnsurerMetrics) AddRelayError(ctx context.Context) { + fem.relayErrors.Add(ctx, 1, fem.attrs) +} + +func (fem *finalityEnsurerMetrics) AddFinalizedEvents(ctx context.Context, count uint64) { + fem.processedEvents.Add(ctx, int64(count), + telattr.With(attribute.String(eventStatusLabel, eventStatusFinalized)), + fem.attrs, + ) +} + +func (fem *finalityEnsurerMetrics) AddOrphanedEvents(ctx context.Context, count uint64) { + fem.processedEvents.Add(ctx, int64(count), + telattr.With(attribute.String(eventStatusLabel, eventStatusOrphaned)), + fem.attrs, + ) +} diff --git a/nil/services/relayer/internal/l1/storage.go b/nil/services/relayer/internal/l1/storage.go index eabb9dd56..121f3e432 100644 --- a/nil/services/relayer/internal/l1/storage.go +++ b/nil/services/relayer/internal/l1/storage.go @@ -28,13 +28,8 @@ const ( lastProcessedBlockKey = "last_processed_block_key" ) -type EventStorageMetrics interface { - // TODO(oclaw) -} - type EventStorage struct { *storage.BaseStorage - metrics EventStorageMetrics eventsSequencer db.Sequence } @@ -42,12 +37,11 @@ func NewEventStorage( ctx context.Context, database db.DB, clock clockwork.Clock, - metrics EventStorageMetrics, + metrics storage.TableMetrics, logger logging.Logger, ) (*EventStorage, error) { es := &EventStorage{ - BaseStorage: storage.NewBaseStorage(ctx, database, clock, logger), - metrics: metrics, + BaseStorage: storage.NewBaseStorage(ctx, database, clock, logger, metrics), } var err error es.eventsSequencer, err = database.GetSequence(ctx, []byte(pendingEventsSequencer), 100) @@ -73,8 +67,6 @@ func (es *EventStorage) StoreEvent(ctx context.Context, evt *Event) error { writer := storage.NewJSONWriter[*Event](pendingEventsTable, es.BaseStorage, false) return writer.PutTx(ctx, evt.Hash.Bytes(), evt) - - // TODO (oclaw) metrics }) } @@ -88,12 +80,15 @@ func (es *EventStorage) IterateEventsByBatch( if err != nil { return err } + defer tx.Rollback() iter, err := tx.Range(pendingEventsTable, nil, nil) if err != nil { return err } + defer iter.Close() + count := 0 batch := make([]*Event, batchSize) idx := 0 for iter.HasNext() { @@ -101,6 +96,7 @@ func (es *EventStorage) IterateEventsByBatch( if err != nil { return err } + count++ if err := json.Unmarshal(val, &batch[idx]); err != nil { return fmt.Errorf("%w: %w", storage.ErrSerializationFailed, err) } @@ -117,6 +113,8 @@ func (es *EventStorage) IterateEventsByBatch( return callback(batch[:idx]) } + es.Metrics.SetTableSize(ctx, pendingEventsTable, count) + return nil }) } @@ -135,7 +133,9 @@ func (es *EventStorage) DeleteEvents(ctx context.Context, hashes []ethcommon.Has } } - return es.Commit(tx) + return es.Commit(tx, func() { + es.Metrics.RecordDeletes(ctx, pendingEventsTable, len(hashes)) + }) }) } @@ -146,6 +146,7 @@ func (es *EventStorage) GetLastProcessedBlock(ctx context.Context) (*ProcessedBl if err != nil { return err } + defer tx.Rollback() data, err := tx.Get(lastProcessedBlockTable, []byte(lastProcessedBlockKey)) if errors.Is(err, db.ErrKeyNotFound) { @@ -182,7 +183,5 @@ func (es *EventStorage) SetLastProcessedBlock(ctx context.Context, blk *Processe return es.RetryRunner.Do(ctx, func(ctx context.Context) error { writer := storage.NewJSONWriter[*ProcessedBlock](lastProcessedBlockTable, es.BaseStorage, true) return writer.PutTx(ctx, []byte(lastProcessedBlockKey), blk) - - // TODO(oclaw) metrics }) } diff --git a/nil/services/relayer/internal/l2/metrics.go b/nil/services/relayer/internal/l2/metrics.go new file mode 100644 index 000000000..c8e9497c0 --- /dev/null +++ b/nil/services/relayer/internal/l2/metrics.go @@ -0,0 +1,54 @@ +package l2 + +import ( + "context" + + "github.com/NilFoundation/nil/nil/internal/telemetry" + "github.com/NilFoundation/nil/nil/services/relayer/internal/metrics" + "go.opentelemetry.io/otel/metric" +) + +type TransactionSenderMetrics interface { + AddRelayedEvents(ctx context.Context, count uint64) + AddRelayError(ctx context.Context) +} + +type transactionSenderMetrics struct { + attrs metric.MeasurementOption + + relayErrors telemetry.Counter + relayedEvents telemetry.Counter +} + +func NewTransactionSenderMetrics() (TransactionSenderMetrics, error) { + tsm := &transactionSenderMetrics{} + if err := metrics.InitMetrics(tsm, "relayer", "transaction_sender"); err != nil { + return nil, err + } + return tsm, nil +} + +func (tsm *transactionSenderMetrics) Init(name string, meter telemetry.Meter, attrs metric.MeasurementOption) error { + var err error + + tsm.relayedEvents, err = meter.Int64Counter(name + ".relayed_events") + if err != nil { + return err + } + + tsm.relayErrors, err = meter.Int64Counter(name + ".relay_error") + if err != nil { + return err + } + + tsm.attrs = attrs + return nil +} + +func (tsm *transactionSenderMetrics) AddRelayError(ctx context.Context) { + tsm.relayErrors.Add(ctx, 1, tsm.attrs) +} + +func (tsm *transactionSenderMetrics) AddRelayedEvents(ctx context.Context, count uint64) { + tsm.relayedEvents.Add(ctx, int64(count), tsm.attrs) +} diff --git a/nil/services/relayer/internal/l2/storage.go b/nil/services/relayer/internal/l2/storage.go index 9a343c10a..6bb6beec7 100644 --- a/nil/services/relayer/internal/l2/storage.go +++ b/nil/services/relayer/internal/l2/storage.go @@ -19,25 +19,19 @@ const ( pendingEventsTable = "pending_l2_events" ) -type EventStorageMetrics interface { - // TODO(oclaw) -} - type EventStorage struct { *storage.BaseStorage - metrics EventStorageMetrics } func NewEventStorage( ctx context.Context, database db.DB, clock clockwork.Clock, - metrics EventStorageMetrics, + metrics storage.TableMetrics, logger logging.Logger, ) *EventStorage { es := &EventStorage{ - BaseStorage: storage.NewBaseStorage(ctx, database, clock, logger), - metrics: metrics, + BaseStorage: storage.NewBaseStorage(ctx, database, clock, logger, metrics), } return es } @@ -51,7 +45,11 @@ func (es *EventStorage) StoreEvents(ctx context.Context, evts []*Event) error { } return es.RetryRunner.Do(ctx, func(ctx context.Context) error { - writer := storage.NewJSONWriter[*Event](pendingEventsTable, es.BaseStorage, false) + writer := storage.NewJSONWriter[*Event]( + pendingEventsTable, + es.BaseStorage, + false, + ) reqs := storage.MakeInsertRequests( evts, func(e *Event) []byte { @@ -59,8 +57,6 @@ func (es *EventStorage) StoreEvents(ctx context.Context, evts []*Event) error { }, ) return writer.PutManyTx(ctx, reqs) - - // TODO (oclaw) metrics }) } @@ -74,14 +70,17 @@ func (es *EventStorage) IterateEventsByBatch( if err != nil { return err } + defer tx.Rollback() iter, err := tx.Range(pendingEventsTable, nil, nil) if err != nil { return err } + defer iter.Close() batch := make([]*Event, batchSize) idx := 0 + count := 0 for iter.HasNext() { _, val, err := iter.Next() if err != nil { @@ -92,6 +91,7 @@ func (es *EventStorage) IterateEventsByBatch( } idx++ + count++ if idx >= batchSize { if err := callback(batch); err != nil { return err @@ -103,6 +103,8 @@ func (es *EventStorage) IterateEventsByBatch( return callback(batch[:idx]) } + es.Metrics.SetTableSize(ctx, pendingEventsTable, count) + return nil }) } @@ -121,6 +123,8 @@ func (es *EventStorage) DeleteEvents(ctx context.Context, hashes []ethcommon.Has } } - return es.Commit(tx) + return es.Commit(tx, func() { + es.Metrics.RecordDeletes(ctx, pendingEventsTable, len(hashes)) + }) }) } diff --git a/nil/services/relayer/internal/l2/transaction_sender.go b/nil/services/relayer/internal/l2/transaction_sender.go index 1216b86ac..d0be644c8 100644 --- a/nil/services/relayer/internal/l2/transaction_sender.go +++ b/nil/services/relayer/internal/l2/transaction_sender.go @@ -44,6 +44,7 @@ type TransactionSender struct { logger logging.Logger storage *EventStorage eventFinProvider eventFinalizedProvider + metrics TransactionSenderMetrics contractBinding L2Contract } @@ -53,6 +54,7 @@ func NewTransactionSender( logger logging.Logger, clock clockwork.Clock, eventFinProvider eventFinalizedProvider, + metrics TransactionSenderMetrics, contractBinding L2Contract, ) (*TransactionSender, error) { if err := config.Validate(); err != nil { @@ -64,6 +66,7 @@ func NewTransactionSender( clock: clock, storage: storage, eventFinProvider: eventFinProvider, + metrics: metrics, contractBinding: contractBinding, } ts.logger = logger.With().Str(logging.FieldComponent, ts.Name()).Logger() @@ -91,6 +94,7 @@ func (ts *TransactionSender) Run(ctx context.Context, started chan<- struct{}) e } if err := ts.relayEvents(ctx); err != nil { ts.logger.Error().Err(err).Msg("error occurred during relaying events to L2") + ts.metrics.AddRelayError(ctx) } } } @@ -153,6 +157,8 @@ func (ts *TransactionSender) relayEvents(ctx context.Context) error { } ts.logger.Debug().Stringer("event_hash", evt.Hash).Msg("event relayed to L2") droppingEvents = append(droppingEvents, evt.Hash) + + ts.metrics.AddRelayedEvents(ctx, 1) } return nil diff --git a/nil/services/relayer/internal/l2/transaction_sender_test.go b/nil/services/relayer/internal/l2/transaction_sender_test.go index 39eeef94c..37580662d 100644 --- a/nil/services/relayer/internal/l2/transaction_sender_test.go +++ b/nil/services/relayer/internal/l2/transaction_sender_test.go @@ -8,6 +8,7 @@ import ( "github.com/NilFoundation/nil/nil/common" "github.com/NilFoundation/nil/nil/common/logging" "github.com/NilFoundation/nil/nil/internal/db" + "github.com/NilFoundation/nil/nil/services/relayer/internal/storage" "github.com/jonboulle/clockwork" "github.com/rs/zerolog" "github.com/stretchr/testify/suite" @@ -45,12 +46,14 @@ type TransactionSenderTestSuite struct { suite.Suite // high-level dependencies - database db.DB - logger logging.Logger - l2Storage *EventStorage + database db.DB + logger logging.Logger + l2Storage *EventStorage + storageMetrics storage.TableMetrics // testing entity - transactionSender *TransactionSender + transactionSender *TransactionSender + transactionSenderMetrics TransactionSenderMetrics // mocks contractMock *L2ContractMock @@ -81,18 +84,25 @@ func (s *TransactionSenderTestSuite) SetupTest() { s.contractMock = &L2ContractMock{} - s.l2Storage = NewEventStorage(s.ctx, s.database, s.clockMock, nil, s.logger) + s.storageMetrics, err = storage.NewTableMetrics() + s.Require().NoError(err) + + s.l2Storage = NewEventStorage(s.ctx, s.database, s.clockMock, s.storageMetrics, s.logger) cfg := DefaultTransactionSenderConfig() s.eventFinalizer = newEventFinalizerStub() + s.transactionSenderMetrics, err = NewTransactionSenderMetrics() + s.Require().NoError(err) + s.transactionSender, err = NewTransactionSender( cfg, s.l2Storage, s.logger, s.clockMock, s.eventFinalizer, + s.transactionSenderMetrics, s.contractMock, ) s.Require().NoError(err, "failed to initialize transaction sender") diff --git a/nil/services/relayer/internal/metrics/init.go b/nil/services/relayer/internal/metrics/init.go new file mode 100644 index 000000000..c1d2c1f50 --- /dev/null +++ b/nil/services/relayer/internal/metrics/init.go @@ -0,0 +1,33 @@ +package metrics + +import ( + "os" + + "github.com/NilFoundation/nil/nil/internal/telemetry" + "github.com/NilFoundation/nil/nil/internal/telemetry/telattr" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +type Metrics interface { + Init(name string, meter telemetry.Meter, attrs metric.MeasurementOption) error +} + +func InitMetrics(mt Metrics, namespace, component string) error { + meter := telemetry.NewMeter(namespace) + + hostName, err := os.Hostname() + if err != nil { + return err + } + + attr := telattr.With( + attribute.String("host.name", hostName), + ) + + return mt.Init( + namespace+"."+component, + meter, + attr, + ) +} diff --git a/nil/services/relayer/internal/storage/json_writer.go b/nil/services/relayer/internal/storage/json_writer.go index 13efce789..05da40472 100644 --- a/nil/services/relayer/internal/storage/json_writer.go +++ b/nil/services/relayer/internal/storage/json_writer.go @@ -14,7 +14,11 @@ type jsonDbWriter[T any] struct { upsert bool } -func NewJSONWriter[T any](tableName db.TableName, storage *BaseStorage, upsert bool) *jsonDbWriter[T] { +func NewJSONWriter[T any]( + tableName db.TableName, + storage *BaseStorage, + upsert bool, +) *jsonDbWriter[T] { return &jsonDbWriter[T]{ table: tableName, storage: storage, @@ -48,7 +52,9 @@ func (jdwr *jsonDbWriter[T]) PutTx(ctx context.Context, key []byte, value T) err return err } - return jdwr.storage.Commit(tx) + return jdwr.storage.Commit(tx, func() { + jdwr.storage.Metrics.RecordInserts(ctx, jdwr.table, 1) + }) } func (jdwr *jsonDbWriter[T]) PutManyTx(ctx context.Context, reqs []InsertRequest[T]) error { @@ -78,7 +84,9 @@ func (jdwr *jsonDbWriter[T]) PutManyTx(ctx context.Context, reqs []InsertRequest } } - return jdwr.storage.Commit(tx) + return jdwr.storage.Commit(tx, func() { + jdwr.storage.Metrics.RecordInserts(ctx, jdwr.table, len(reqs)) + }) } type InsertRequest[T any] struct { diff --git a/nil/services/relayer/internal/storage/metrics.go b/nil/services/relayer/internal/storage/metrics.go new file mode 100644 index 000000000..999196875 --- /dev/null +++ b/nil/services/relayer/internal/storage/metrics.go @@ -0,0 +1,92 @@ +package storage + +import ( + "context" + + "github.com/NilFoundation/nil/nil/internal/db" + "github.com/NilFoundation/nil/nil/internal/telemetry" + "github.com/NilFoundation/nil/nil/internal/telemetry/telattr" + "github.com/NilFoundation/nil/nil/services/relayer/internal/metrics" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +type TableMetrics interface { + RecordInserts(context.Context, db.TableName, int) + RecordDeletes(context.Context, db.TableName, int) + SetTableSize(context.Context, db.TableName, int) +} + +const ( + opLabel = "operation" + + rwOpDel = "delete" + rwOpInsert = "insert" +) + +type tableMetrics struct { + attrs metric.MeasurementOption + sizeGauge telemetry.Gauge + opCounter telemetry.Counter +} + +func NewTableMetrics() (TableMetrics, error) { + tm := &tableMetrics{} + if err := metrics.InitMetrics(tm, "relayer", "storage"); err != nil { + return nil, err + } + return tm, nil +} + +func (tm *tableMetrics) Init(name string, meter telemetry.Meter, attrs metric.MeasurementOption) error { + var err error + + tm.sizeGauge, err = meter.Int64Gauge(name + ".table_size") + if err != nil { + return err + } + + tm.opCounter, err = meter.Int64Counter(name + ".table_operations") + if err != nil { + return err + } + + tm.attrs = attrs + + return nil +} + +func (tm *tableMetrics) RecordInserts(ctx context.Context, table db.TableName, val int) { + tm.opCounter.Add( + ctx, + int64(val), + tm.attrs, + telattr.With( + attribute.String(opLabel, rwOpInsert), + attribute.String("table_name", string(table)), + ), + ) +} + +func (tm *tableMetrics) RecordDeletes(ctx context.Context, table db.TableName, val int) { + tm.opCounter.Add( + ctx, + int64(val), + tm.attrs, + telattr.With( + attribute.String(opLabel, rwOpDel), + attribute.String("table_name", string(table)), + ), + ) +} + +func (tm *tableMetrics) SetTableSize(ctx context.Context, table db.TableName, size int) { + tm.sizeGauge.Record( + ctx, + int64(size), + tm.attrs, + telattr.With( + attribute.String("table_name", string(table)), + ), + ) +} diff --git a/nil/services/relayer/internal/storage/storage.go b/nil/services/relayer/internal/storage/storage.go index bb08e0da8..a74701bcb 100644 --- a/nil/services/relayer/internal/storage/storage.go +++ b/nil/services/relayer/internal/storage/storage.go @@ -16,6 +16,7 @@ type BaseStorage struct { RetryRunner common.RetryRunner Clock clockwork.Clock Logger logging.Logger + Metrics TableMetrics } func NewBaseStorage( @@ -23,6 +24,7 @@ func NewBaseStorage( database db.DB, clock clockwork.Clock, logger logging.Logger, + metrics TableMetrics, ) *BaseStorage { return &BaseStorage{ Database: database, @@ -36,14 +38,18 @@ func NewBaseStorage( }, logger, ), - Clock: clock, - Logger: logger, + Clock: clock, + Logger: logger, + Metrics: metrics, } } -func (*BaseStorage) Commit(tx db.RwTx) error { +func (*BaseStorage) Commit(tx db.RwTx, onCommitted func()) error { if err := tx.Commit(); err != nil { return fmt.Errorf("failed to commit transaction: %w", err) } + if onCommitted != nil { + onCommitted() + } return nil } diff --git a/nil/services/relayer/service.go b/nil/services/relayer/service.go index c66177a98..aedec6d0d 100644 --- a/nil/services/relayer/service.go +++ b/nil/services/relayer/service.go @@ -9,8 +9,10 @@ import ( "github.com/NilFoundation/nil/nil/client/rpc" "github.com/NilFoundation/nil/nil/common/logging" "github.com/NilFoundation/nil/nil/internal/db" + "github.com/NilFoundation/nil/nil/internal/telemetry" "github.com/NilFoundation/nil/nil/services/relayer/internal/l1" "github.com/NilFoundation/nil/nil/services/relayer/internal/l2" + "github.com/NilFoundation/nil/nil/services/relayer/internal/storage" "github.com/ethereum/go-ethereum/crypto" "github.com/jonboulle/clockwork" "golang.org/x/sync/errgroup" @@ -21,6 +23,7 @@ type RelayerConfig struct { FinalityEnsurerConfig *l1.FinalityEnsurerConfig TransactionSenderConfig *l2.TransactionSenderConfig L2ContractConfig *l2.ContractConfig + TelemetryConfig *telemetry.Config } func DefaultRelayerConfig() *RelayerConfig { @@ -29,6 +32,9 @@ func DefaultRelayerConfig() *RelayerConfig { FinalityEnsurerConfig: l1.DefaultFinalityEnsurerConfig(), TransactionSenderConfig: l2.DefaultTransactionSenderConfig(), L2ContractConfig: l2.DefaultContractConfig(), + TelemetryConfig: &telemetry.Config{ + ServiceName: "relayer", + }, } } @@ -50,11 +56,20 @@ func New( Logger: logging.NewLogger("relayer"), } + if err := telemetry.Init(ctx, config.TelemetryConfig); err != nil { + return nil, fmt.Errorf("failed to init telemetry: %w", err) + } + + storageMetrics, err := storage.NewTableMetrics() + if err != nil { + return nil, err + } + l1Storage, err := l1.NewEventStorage( ctx, database, clock, - nil, // TODO(oclaw) metrics + storageMetrics, rs.Logger, ) if err != nil { @@ -70,12 +85,18 @@ func New( return nil, err } + eventListenerMetrics, err := l1.NewEventListenerMetrics() + if err != nil { + return nil, err + } + rs.L1EventListener, err = l1.NewEventListener( config.EventListenerConfig, clock, l1Client, l1Contract, l1Storage, + eventListenerMetrics, rs.Logger, ) if err != nil { @@ -86,10 +107,15 @@ func New( ctx, database, clock, - nil, // TODO(oclaw) metrics + storageMetrics, rs.Logger, ) + finalityEnsurerMetrics, err := l1.NewFinalityEnsurerMetrics() + if err != nil { + return nil, err + } + rs.L1FinalityEnsurer, err = l1.NewFinalityEnsurer( config.FinalityEnsurerConfig, l1Client, @@ -97,6 +123,7 @@ func New( rs.Logger, l1Storage, l2Storage, + finalityEnsurerMetrics, rs.L1EventListener, ) if err != nil { @@ -117,12 +144,18 @@ func New( return nil, err } + transactionSenderMetrics, err := l2.NewTransactionSenderMetrics() + if err != nil { + return nil, err + } + rs.L2TransactionSender, err = l2.NewTransactionSender( config.TransactionSenderConfig, l2Storage, rs.Logger, clock, rs.L1FinalityEnsurer, + transactionSenderMetrics, l2Contract, ) if err != nil {