Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions nil/services/relayer/internal/l1/event_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type EventListener struct {
clock clockwork.Clock

config *EventListenerConfig
metrics EventListenerMetrics
eventStorage *EventStorage

state struct {
Expand All @@ -70,6 +71,7 @@ func NewEventListener(
ethClient EthClient,
contractClient L1Contract,
storage *EventStorage,
metrics EventListenerMetrics,
logger logging.Logger,
) (*EventListener, error) {
el := &EventListener{
Expand All @@ -78,6 +80,7 @@ func NewEventListener(
clock: clock,
config: config,
eventStorage: storage,
metrics: metrics,
}

el.state.emitter = make(chan struct{}, config.EmitEventCapacity)
Expand Down Expand Up @@ -161,8 +164,6 @@ func (el *EventListener) subscriber(ctx context.Context, eventCh chan<- *L1Messa
Err(err).
Msg("failed to subscribe to updates from L1 contract")
return err

// TODO(oclaw) metrics
}
defer sub.Unsubscribe()

Expand All @@ -181,7 +182,7 @@ func (el *EventListener) subscriber(ctx context.Context, eventCh chan<- *L1Messa
Err(err).
Msg("L1 subscription is broken")

// TODO(oclaw) metrics
el.metrics.AddSubscriptionError(ctx)
return fmt.Errorf("%w: %w", ErrSubscriptionIsBroken, err)
}
el.logger.Debug().Msg("subscription channel is closed")
Expand All @@ -197,6 +198,9 @@ func (el *EventListener) fetcher(ctx context.Context, eventCh chan<- *L1MessageS

el.logger.Info().Msg("started fetcher")

el.metrics.SetFetcherActive(ctx)
defer el.metrics.SetFetcherIdle(ctx)

// try fetching as long as possible, force exit after large enough attempt number
retrier := common.NewRetryRunner(
common.RetryConfig{
Expand All @@ -210,12 +214,9 @@ func (el *EventListener) fetcher(ctx context.Context, eventCh chan<- *L1MessageS
err := el.fetchPastEvents(ctx, eventCh)
if err != nil {
el.logger.Error().Err(err).Msg("historical event fetching failed")
// TODO (oclaw) metrics
}
return err
})

// TODO(oclaw) metrics (this routine is not expected to run for too long, we should now if something is stuck here)
}

func (el *EventListener) fetchPastEvents(ctx context.Context, eventCh chan<- *L1MessageSent) error {
Expand Down Expand Up @@ -286,6 +287,7 @@ func (el *EventListener) eventProcessor(
if err := el.processEvent(ctx, event); err != nil {
return err
}
el.metrics.AddEventFromFetcher(ctx)
processedOldEvents++
}

Expand All @@ -304,6 +306,7 @@ func (el *EventListener) eventProcessor(
if err := el.processEvent(ctx, event); err != nil {
return err
}
el.metrics.AddEventFromSubscriber(ctx)
case <-ctx.Done():
return ctx.Err()
}
Expand Down
32 changes: 24 additions & 8 deletions nil/services/relayer/internal/l1/event_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/NilFoundation/nil/nil/common/logging"
"github.com/NilFoundation/nil/nil/internal/db"
"github.com/NilFoundation/nil/nil/services/relayer/internal/storage"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
ethtypes "github.com/ethereum/go-ethereum/core/types"
Expand All @@ -21,13 +22,15 @@ type EventListenerTestSuite struct {
suite.Suite

// high level dependencies
database db.DB
storage *EventStorage
logger logging.Logger
clock clockwork.Clock
database db.DB
storage *EventStorage
storageMetrics storage.TableMetrics
logger logging.Logger
clock clockwork.Clock

// testing entity
listener *EventListener
listener *EventListener
listenerMetrics EventListenerMetrics

// mocks
ethClientMock *EthClientMock
Expand Down Expand Up @@ -57,19 +60,32 @@ func (s *EventListenerTestSuite) SetupTest() {
s.database, err = db.NewBadgerDbInMemory()
s.Require().NoError(err, "failed to initialize test db")

s.listenerMetrics, err = NewEventListenerMetrics()
s.Require().NoError(err)

s.storageMetrics, err = storage.NewTableMetrics()
s.Require().NoError(err)

s.clock = clockwork.NewRealClock()
s.ethClientMock = &EthClientMock{}
s.l1ContractMock = &L1ContractMock{}

s.storage, err = NewEventStorage(s.ctx, s.database, s.clock, nil, s.logger)
s.storage, err = NewEventStorage(s.ctx, s.database, s.clock, s.storageMetrics, s.logger)
s.Require().NoError(err, "failed to initialize event storage")

cfg := DefaultEventListenerConfig()
cfg.PollInterval = time.Millisecond
cfg.BridgeMessengerContractAddress = "0xDEADBEEF"
cfg.EmitEventCapacity = 100 // do avoid event dropping

s.listener, err = NewEventListener(cfg, s.clock, s.ethClientMock, s.l1ContractMock, s.storage, s.logger)
s.listener, err = NewEventListener(
cfg,
s.clock,
s.ethClientMock,
s.l1ContractMock,
s.storage,
s.listenerMetrics,
s.logger,
)
s.Require().NoError(err, "failed to create listener")
}

Expand Down
25 changes: 22 additions & 3 deletions nil/services/relayer/internal/l1/finality_ensurer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type FinalityEnsurer struct {
clock clockwork.Clock
l1Storage *EventStorage
l2Storage *l2.EventStorage
metrics FinalityEnsurerMetrics
eventProvider eventProvider

emitter chan struct{}
Expand All @@ -88,6 +89,7 @@ func NewFinalityEnsurer(
logger logging.Logger,
l1Storage *EventStorage,
l2Storage *l2.EventStorage,
metrics FinalityEnsurerMetrics,
eventProvider eventProvider,
) (*FinalityEnsurer, error) {
err := config.Validate()
Expand All @@ -103,6 +105,7 @@ func NewFinalityEnsurer(
l1Storage: l1Storage,
l2Storage: l2Storage,
eventProvider: eventProvider,
metrics: metrics,
emitter: make(chan struct{}, config.EventEmitterCapacity),
}

Expand Down Expand Up @@ -165,6 +168,12 @@ func (fe *FinalityEnsurer) blockFetcher(ctx context.Context) error {
if fe.finalizedBlock != nil {
log = log.Uint64("local_finalized_block_number", fe.finalizedBlock.BlockNumber)
}

fe.metrics.SetTimeSinceFinalizedBlockNumberUpdate(
ctx,
uint64(now.Sub(lastSuccessfulUpdate).Seconds()),
)

log.Msg("failed to fetch last finalized block number from Etherium")
continue
}
Expand All @@ -184,8 +193,7 @@ func (fe *FinalityEnsurer) blockFetcher(ctx context.Context) error {
Msg("refreshed actual finalized block number")
lastSuccessfulUpdate = now

// TODO(oclaw) metrics
// we should alert somehow if we are unable to obtain actual finalized block number long enough
fe.metrics.SetTimeSinceFinalizedBlockNumberUpdate(ctx, 0)
}
}
}
Expand All @@ -205,6 +213,7 @@ func (fe *FinalityEnsurer) pendingEventPoller(ctx context.Context) error {
}
if err := fe.forwardFinalizedEvents(ctx); err != nil {
fe.logger.Error().Err(err).Msg("failed to process l1 pending events")
fe.metrics.AddRelayError(ctx)
}
}
}
Expand Down Expand Up @@ -265,6 +274,11 @@ func (fe *FinalityEnsurer) forwardFinalizedEvents(ctx context.Context) error {
Int("orphaned_blocks_count", len(orphaned)).
Msg("checked blocks finality")

var (
finalizedEventCount int
orphanedEventCount = len(events)
)

if len(finalized) > 0 {
var l2Events []*l2.Event
for _, finblk := range finalized {
Expand All @@ -277,6 +291,9 @@ func (fe *FinalityEnsurer) forwardFinalizedEvents(ctx context.Context) error {
Int("finalized_event_count", len(l2Events)).
Msg("saving messages to L2 event storage")

finalizedEventCount = len(l2Events)
orphanedEventCount = len(events) - finalizedEventCount

err := fe.l2Storage.StoreEvents(ctx, l2Events)
if ignoreErrors(err, storage.ErrKeyExists) != nil {
return fmt.Errorf("failed to forward events to L2 storage: %w", err)
Expand All @@ -289,6 +306,9 @@ func (fe *FinalityEnsurer) forwardFinalizedEvents(ctx context.Context) error {
}
}

fe.metrics.AddFinalizedEvents(ctx, uint64(finalizedEventCount))
fe.metrics.AddOrphanedEvents(ctx, uint64(orphanedEventCount))

fe.logger.Info().
Int("dropping_events_count", len(events)).
Msg("dropping pending events from L1 storage")
Expand All @@ -302,7 +322,6 @@ func (fe *FinalityEnsurer) forwardFinalizedEvents(ctx context.Context) error {
return fmt.Errorf("failed to cleanup events from l1 storage: %w", err)
}

// TODO(oclaw) metrics
return nil
}

Expand Down
24 changes: 17 additions & 7 deletions nil/services/relayer/internal/l1/finality_ensurer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/NilFoundation/nil/nil/common/logging"
"github.com/NilFoundation/nil/nil/internal/db"
"github.com/NilFoundation/nil/nil/services/relayer/internal/l2"
"github.com/NilFoundation/nil/nil/services/relayer/internal/storage"
ethcommon "github.com/ethereum/go-ethereum/common"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
Expand Down Expand Up @@ -50,13 +51,15 @@ type FinalityEnsurerTestSuite struct {
suite.Suite

// high level dependencies
database db.DB
l1Storage *EventStorage
l2Storage *l2.EventStorage
logger logging.Logger
database db.DB
l1Storage *EventStorage
l2Storage *l2.EventStorage
storageMetrics storage.TableMetrics
logger logging.Logger

// testing entity
ensurer *FinalityEnsurer
ensurer *FinalityEnsurer
ensurerMetrics FinalityEnsurerMetrics

// mocks
ethClientMock *EthClientMock
Expand Down Expand Up @@ -86,6 +89,9 @@ func (s *FinalityEnsurerTestSuite) SetupTest() {
s.database, err = db.NewBadgerDbInMemory()
s.Require().NoError(err, "failed to initialize database")

s.storageMetrics, err = storage.NewTableMetrics()
s.Require().NoError(err)

s.clockMock = clockwork.NewFakeClock()

s.ethClientMock = &EthClientMock{}
Expand All @@ -104,23 +110,27 @@ func (s *FinalityEnsurerTestSuite) SetupTest() {
return nil, nil
}

s.l1Storage, err = NewEventStorage(s.ctx, s.database, s.clockMock, nil, s.logger)
s.l1Storage, err = NewEventStorage(s.ctx, s.database, s.clockMock, s.storageMetrics, s.logger)
s.Require().NoError(err, "failed to initialize L1 storage")

s.l2Storage = l2.NewEventStorage(s.ctx, s.database, s.clockMock, nil, s.logger)
s.l2Storage = l2.NewEventStorage(s.ctx, s.database, s.clockMock, s.storageMetrics, s.logger)

cfg := DefaultFinalityEnsurerConfig()
cfg.EventEmitterCapacity = 100

s.eventListenerStub = newEventListenerStub()

s.ensurerMetrics, err = NewFinalityEnsurerMetrics()
s.Require().NoError(err)

s.ensurer, err = NewFinalityEnsurer(
cfg,
s.ethClientMock,
s.clockMock,
s.logger,
s.l1Storage,
s.l2Storage,
s.ensurerMetrics,
s.eventListenerStub,
)
s.Require().NoError(err)
Expand Down
Loading