Skip to content

Commit 045c97b

Browse files
author
Daniel Kogtev
committed
relayer metrics
1 parent 99d0dce commit 045c97b

16 files changed

Lines changed: 512 additions & 62 deletions

File tree

nil/cmd/exporter/internal/clickhouse/clickhouse.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"database/sql"
77
"errors"
88
"fmt"
9+
910
"github.com/ClickHouse/clickhouse-go/v2"
1011
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
1112
"github.com/NilFoundation/nil/nil/cmd/exporter/internal"

nil/services/relayer/internal/l1/event_listener.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ type EventListener struct {
5151
clock clockwork.Clock
5252

5353
config *EventListenerConfig
54+
metrics EventListenerMetrics
5455
eventStorage *EventStorage
5556

5657
state struct {
@@ -70,6 +71,7 @@ func NewEventListener(
7071
ethClient EthClient,
7172
contractClient L1Contract,
7273
storage *EventStorage,
74+
metrics EventListenerMetrics,
7375
logger logging.Logger,
7476
) (*EventListener, error) {
7577
el := &EventListener{
@@ -78,6 +80,7 @@ func NewEventListener(
7880
clock: clock,
7981
config: config,
8082
eventStorage: storage,
83+
metrics: metrics,
8184
}
8285

8386
el.state.emitter = make(chan struct{}, config.EmitEventCapacity)
@@ -161,8 +164,6 @@ func (el *EventListener) subscriber(ctx context.Context, eventCh chan<- *L1Messa
161164
Err(err).
162165
Msg("failed to subscribe to updates from L1 contract")
163166
return err
164-
165-
// TODO(oclaw) metrics
166167
}
167168
defer sub.Unsubscribe()
168169

@@ -181,7 +182,7 @@ func (el *EventListener) subscriber(ctx context.Context, eventCh chan<- *L1Messa
181182
Err(err).
182183
Msg("L1 subscription is broken")
183184

184-
// TODO(oclaw) metrics
185+
el.metrics.AddSubscriptionError(ctx)
185186
return fmt.Errorf("%w: %w", ErrSubscriptionIsBroken, err)
186187
}
187188
el.logger.Debug().Msg("subscription channel is closed")
@@ -197,6 +198,9 @@ func (el *EventListener) fetcher(ctx context.Context, eventCh chan<- *L1MessageS
197198

198199
el.logger.Info().Msg("started fetcher")
199200

201+
el.metrics.SetFetcherActive(ctx)
202+
defer el.metrics.SetFetcherIdle(ctx)
203+
200204
// try fetching as long as possible, force exit after large enough attempt number
201205
retrier := common.NewRetryRunner(
202206
common.RetryConfig{
@@ -210,12 +214,9 @@ func (el *EventListener) fetcher(ctx context.Context, eventCh chan<- *L1MessageS
210214
err := el.fetchPastEvents(ctx, eventCh)
211215
if err != nil {
212216
el.logger.Error().Err(err).Msg("historical event fetching failed")
213-
// TODO (oclaw) metrics
214217
}
215218
return err
216219
})
217-
218-
// TODO(oclaw) metrics (this routine is not expected to run for too long, we should now if something is stuck here)
219220
}
220221

221222
func (el *EventListener) fetchPastEvents(ctx context.Context, eventCh chan<- *L1MessageSent) error {
@@ -286,6 +287,7 @@ func (el *EventListener) eventProcessor(
286287
if err := el.processEvent(ctx, event); err != nil {
287288
return err
288289
}
290+
el.metrics.AddEventFromFetcher(ctx)
289291
processedOldEvents++
290292
}
291293

@@ -304,6 +306,7 @@ func (el *EventListener) eventProcessor(
304306
if err := el.processEvent(ctx, event); err != nil {
305307
return err
306308
}
309+
el.metrics.AddEventFromSubscriber(ctx)
307310
case <-ctx.Done():
308311
return ctx.Err()
309312
}

nil/services/relayer/internal/l1/event_listener_test.go

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/NilFoundation/nil/nil/common/logging"
1010
"github.com/NilFoundation/nil/nil/internal/db"
11+
"github.com/NilFoundation/nil/nil/services/relayer/internal/storage"
1112
ethcommon "github.com/ethereum/go-ethereum/common"
1213
"github.com/ethereum/go-ethereum/core/types"
1314
ethtypes "github.com/ethereum/go-ethereum/core/types"
@@ -21,13 +22,15 @@ type EventListenerTestSuite struct {
2122
suite.Suite
2223

2324
// high level dependencies
24-
database db.DB
25-
storage *EventStorage
26-
logger logging.Logger
27-
clock clockwork.Clock
25+
database db.DB
26+
storage *EventStorage
27+
storageMetrics storage.TableMetrics
28+
logger logging.Logger
29+
clock clockwork.Clock
2830

2931
// testing entity
30-
listener *EventListener
32+
listener *EventListener
33+
listenerMetrics EventListenerMetrics
3134

3235
// mocks
3336
ethClientMock *EthClientMock
@@ -57,19 +60,32 @@ func (s *EventListenerTestSuite) SetupTest() {
5760
s.database, err = db.NewBadgerDbInMemory()
5861
s.Require().NoError(err, "failed to initialize test db")
5962

63+
s.listenerMetrics, err = NewEventListenerMetrics()
64+
s.Require().NoError(err)
65+
66+
s.storageMetrics, err = storage.NewTableMetrics()
67+
s.Require().NoError(err)
68+
6069
s.clock = clockwork.NewRealClock()
6170
s.ethClientMock = &EthClientMock{}
6271
s.l1ContractMock = &L1ContractMock{}
63-
64-
s.storage, err = NewEventStorage(s.ctx, s.database, s.clock, nil, s.logger)
72+
s.storage, err = NewEventStorage(s.ctx, s.database, s.clock, s.storageMetrics, s.logger)
6573
s.Require().NoError(err, "failed to initialize event storage")
6674

6775
cfg := DefaultEventListenerConfig()
6876
cfg.PollInterval = time.Millisecond
6977
cfg.BridgeMessengerContractAddress = "0xDEADBEEF"
7078
cfg.EmitEventCapacity = 100 // do avoid event dropping
7179

72-
s.listener, err = NewEventListener(cfg, s.clock, s.ethClientMock, s.l1ContractMock, s.storage, s.logger)
80+
s.listener, err = NewEventListener(
81+
cfg,
82+
s.clock,
83+
s.ethClientMock,
84+
s.l1ContractMock,
85+
s.storage,
86+
s.listenerMetrics,
87+
s.logger,
88+
)
7389
s.Require().NoError(err, "failed to create listener")
7490
}
7591

nil/services/relayer/internal/l1/finality_ensurer.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ type FinalityEnsurer struct {
7676
clock clockwork.Clock
7777
l1Storage *EventStorage
7878
l2Storage *l2.EventStorage
79+
metrics FinalityEnsurerMetrics
7980
eventProvider eventProvider
8081

8182
emitter chan struct{}
@@ -88,6 +89,7 @@ func NewFinalityEnsurer(
8889
logger logging.Logger,
8990
l1Storage *EventStorage,
9091
l2Storage *l2.EventStorage,
92+
metrics FinalityEnsurerMetrics,
9193
eventProvider eventProvider,
9294
) (*FinalityEnsurer, error) {
9395
err := config.Validate()
@@ -103,6 +105,7 @@ func NewFinalityEnsurer(
103105
l1Storage: l1Storage,
104106
l2Storage: l2Storage,
105107
eventProvider: eventProvider,
108+
metrics: metrics,
106109
emitter: make(chan struct{}, config.EventEmitterCapacity),
107110
}
108111

@@ -165,6 +168,12 @@ func (fe *FinalityEnsurer) blockFetcher(ctx context.Context) error {
165168
if fe.finalizedBlock != nil {
166169
log = log.Uint64("local_finalized_block_number", fe.finalizedBlock.BlockNumber)
167170
}
171+
172+
fe.metrics.SetTimeSinceFinalizedBlockNumberUpdate(
173+
ctx,
174+
uint64(now.Sub(lastSuccessfulUpdate).Seconds()),
175+
)
176+
168177
log.Msg("failed to fetch last finalized block number from Etherium")
169178
continue
170179
}
@@ -184,8 +193,7 @@ func (fe *FinalityEnsurer) blockFetcher(ctx context.Context) error {
184193
Msg("refreshed actual finalized block number")
185194
lastSuccessfulUpdate = now
186195

187-
// TODO(oclaw) metrics
188-
// we should alert somehow if we are unable to obtain actual finalized block number long enough
196+
fe.metrics.SetTimeSinceFinalizedBlockNumberUpdate(ctx, 0)
189197
}
190198
}
191199
}
@@ -205,6 +213,7 @@ func (fe *FinalityEnsurer) pendingEventPoller(ctx context.Context) error {
205213
}
206214
if err := fe.forwardFinalizedEvents(ctx); err != nil {
207215
fe.logger.Error().Err(err).Msg("failed to process l1 pending events")
216+
fe.metrics.AddRelayError(ctx)
208217
}
209218
}
210219
}
@@ -265,6 +274,11 @@ func (fe *FinalityEnsurer) forwardFinalizedEvents(ctx context.Context) error {
265274
Int("orphaned_blocks_count", len(orphaned)).
266275
Msg("checked blocks finality")
267276

277+
var (
278+
finalizedEventCount int
279+
orphanedEventCount = len(events)
280+
)
281+
268282
if len(finalized) > 0 {
269283
var l2Events []*l2.Event
270284
for _, finblk := range finalized {
@@ -277,6 +291,9 @@ func (fe *FinalityEnsurer) forwardFinalizedEvents(ctx context.Context) error {
277291
Int("finalized_event_count", len(l2Events)).
278292
Msg("saving messages to L2 event storage")
279293

294+
finalizedEventCount = len(l2Events)
295+
orphanedEventCount = len(events) - finalizedEventCount
296+
280297
err := fe.l2Storage.StoreEvents(ctx, l2Events)
281298
if ignoreErrors(err, storage.ErrKeyExists) != nil {
282299
return fmt.Errorf("failed to forward events to L2 storage: %w", err)
@@ -289,6 +306,9 @@ func (fe *FinalityEnsurer) forwardFinalizedEvents(ctx context.Context) error {
289306
}
290307
}
291308

309+
fe.metrics.AddFinalizedEvents(ctx, uint64(finalizedEventCount))
310+
fe.metrics.AddOrphanedEvents(ctx, uint64(orphanedEventCount))
311+
292312
fe.logger.Info().
293313
Int("dropping_events_count", len(events)).
294314
Msg("dropping pending events from L1 storage")
@@ -302,7 +322,6 @@ func (fe *FinalityEnsurer) forwardFinalizedEvents(ctx context.Context) error {
302322
return fmt.Errorf("failed to cleanup events from l1 storage: %w", err)
303323
}
304324

305-
// TODO(oclaw) metrics
306325
return nil
307326
}
308327

nil/services/relayer/internal/l1/finality_ensurer_test.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/NilFoundation/nil/nil/common/logging"
1111
"github.com/NilFoundation/nil/nil/internal/db"
1212
"github.com/NilFoundation/nil/nil/services/relayer/internal/l2"
13+
"github.com/NilFoundation/nil/nil/services/relayer/internal/storage"
1314
ethcommon "github.com/ethereum/go-ethereum/common"
1415
ethtypes "github.com/ethereum/go-ethereum/core/types"
1516
"github.com/ethereum/go-ethereum/rpc"
@@ -50,13 +51,15 @@ type FinalityEnsurerTestSuite struct {
5051
suite.Suite
5152

5253
// high level dependencies
53-
database db.DB
54-
l1Storage *EventStorage
55-
l2Storage *l2.EventStorage
56-
logger logging.Logger
54+
database db.DB
55+
l1Storage *EventStorage
56+
l2Storage *l2.EventStorage
57+
storageMetrics storage.TableMetrics
58+
logger logging.Logger
5759

5860
// testing entity
59-
ensurer *FinalityEnsurer
61+
ensurer *FinalityEnsurer
62+
ensurerMetrics FinalityEnsurerMetrics
6063

6164
// mocks
6265
ethClientMock *EthClientMock
@@ -86,6 +89,9 @@ func (s *FinalityEnsurerTestSuite) SetupTest() {
8689
s.database, err = db.NewBadgerDbInMemory()
8790
s.Require().NoError(err, "failed to initialize database")
8891

92+
s.storageMetrics, err = storage.NewTableMetrics()
93+
s.Require().NoError(err)
94+
8995
s.clockMock = clockwork.NewFakeClock()
9096

9197
s.ethClientMock = &EthClientMock{}
@@ -104,23 +110,27 @@ func (s *FinalityEnsurerTestSuite) SetupTest() {
104110
return nil, nil
105111
}
106112

107-
s.l1Storage, err = NewEventStorage(s.ctx, s.database, s.clockMock, nil, s.logger)
113+
s.l1Storage, err = NewEventStorage(s.ctx, s.database, s.clockMock, s.storageMetrics, s.logger)
108114
s.Require().NoError(err, "failed to initialize L1 storage")
109115

110-
s.l2Storage = l2.NewEventStorage(s.ctx, s.database, s.clockMock, nil, s.logger)
116+
s.l2Storage = l2.NewEventStorage(s.ctx, s.database, s.clockMock, s.storageMetrics, s.logger)
111117

112118
cfg := DefaultFinalityEnsurerConfig()
113119
cfg.EventEmitterCapacity = 100
114120

115121
s.eventListenerStub = newEventListenerStub()
116122

123+
s.ensurerMetrics, err = NewFinalityEnsurerMetrics()
124+
s.Require().NoError(err)
125+
117126
s.ensurer, err = NewFinalityEnsurer(
118127
cfg,
119128
s.ethClientMock,
120129
s.clockMock,
121130
s.logger,
122131
s.l1Storage,
123132
s.l2Storage,
133+
s.ensurerMetrics,
124134
s.eventListenerStub,
125135
)
126136
s.Require().NoError(err)

0 commit comments

Comments
 (0)