Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions pkg/relayer/proxy/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,28 +187,35 @@ func (server *relayMinerHTTPServer) Ping(ctx context.Context) error {
func (server *relayMinerHTTPServer) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
ctx := request.Context()

// Create a request-specific logger to avoid concurrent access issues
logger := server.logger.With(
"request_id", request.Header.Get("X-Request-ID"),
"user_agent", request.Header.Get("User-Agent"),
"remote_addr", request.RemoteAddr,
)

// Determine whether the request is upgrading to websocket.
if isWebSocketRequest(request) {
server.logger.ProbabilisticDebugInfo(relayProbabilisticDebugProb).Msg("🔍 detected asynchronous relay request")
logger.ProbabilisticDebugInfo(relayProbabilisticDebugProb).Msg("🔍 detected asynchronous relay request")

if err := server.handleAsyncConnection(ctx, writer, request); err != nil {
// Reply with an error if the relay could not be served.
server.replyWithError(err, nil, writer)
server.logger.Warn().Err(err).Msg("❌ failed serving asynchronous relay request")
logger.Warn().Err(err).Msg("❌ failed serving asynchronous relay request")
return
}
} else {
server.logger.ProbabilisticDebugInfo(relayProbabilisticDebugProb).Msg("🔍 detected synchronous relay request")
logger.ProbabilisticDebugInfo(relayProbabilisticDebugProb).Msg("🔍 detected synchronous relay request")

if relayRequest, err := server.serveSyncRequest(ctx, writer, request); err != nil {
// Reply with an error if the relay could not be served.
server.replyWithError(err, relayRequest, writer)

// Do not alarm the RelayMiner operator if the error is a client error
if ErrRelayerProxyInternalError.Is(err) {
server.logger.Error().Err(err).Msgf("❌ Failed serving synchronous relay request. This COULD be a configuration issue on the RelayMiner! Please check your setup. ⚙️🛠️")
logger.Error().Err(err).Msgf("❌ Failed serving synchronous relay request. This COULD be a configuration issue on the RelayMiner! Please check your setup. ⚙️🛠️")
} else {
server.logger.Error().Err(err).Msgf("⚠️ Failed serving synchronous relay request. This MIGHT be a client error.")
logger.Error().Err(err).Msgf("⚠️ Failed serving synchronous relay request. This MIGHT be a client error.")
}
return
}
Expand Down
32 changes: 22 additions & 10 deletions pkg/relayer/proxy/relay_meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,17 @@ func (rmtr *ProxyRelayMeter) IsOverServicing(
rmtr.relayMeterMu.Lock()
defer rmtr.relayMeterMu.Unlock()

// Create a context-specific logger to avoid concurrent access issues
logger := rmtr.logger.With(
"method", "IsOverServicing",
"session_id", reqMeta.GetSessionHeader().GetSessionId(),
)

// Ensure that the served application has a relay meter and update the consumed
// stake amount.
appRelayMeter, err := rmtr.ensureRequestSessionRelayMeter(ctx, reqMeta)
if err != nil {
rmtr.logger.Warn().Msgf(
logger.Warn().Msgf(
"[Non critical] Unable to set up relay meter in session %s. Relay will continue without rate limiting: %v",
reqMeta.GetSessionHeader().GetSessionId(),
err,
Expand All @@ -127,7 +133,7 @@ func (rmtr *ProxyRelayMeter) IsOverServicing(

sharedParams, err := rmtr.sharedQuerier.GetParams(ctx)
if err != nil {
rmtr.logger.Warn().Msgf(
logger.Warn().Msgf(
"[Non critical] Unable to set up relay meter in session %s. Relay will continue without rate limiting: %v",
reqMeta.GetSessionHeader().GetSessionId(),
err,
Expand All @@ -137,7 +143,7 @@ func (rmtr *ProxyRelayMeter) IsOverServicing(

service, err := rmtr.serviceQuerier.GetService(ctx, reqMeta.SessionHeader.ServiceId)
if err != nil {
rmtr.logger.Warn().Msgf(
logger.Warn().Msgf(
"[Non critical] Unable to set up relay meter in session %s. Relay will continue without rate limiting: %v",
reqMeta.GetSessionHeader().GetSessionId(),
err,
Expand All @@ -148,7 +154,7 @@ func (rmtr *ProxyRelayMeter) IsOverServicing(
// Get the cost of the relay based on the service and shared parameters.
relayCostCoin, err := getSingleRelayCostCoin(sharedParams, &service)
if err != nil {
rmtr.logger.Warn().Msgf(
logger.Warn().Msgf(
"[Non critical] Unable to calculate relay cost in session %s. Relay will continue without rate limiting: %v",
reqMeta.GetSessionHeader().GetSessionId(),
err,
Expand All @@ -168,7 +174,7 @@ func (rmtr *ProxyRelayMeter) IsOverServicing(

// Exponential backoff, only log over-servicing when numOverServicedRelays is a power of 2
if shouldLogOverServicing(appRelayMeter.numOverServicedRelays) {
rmtr.logger.Warn().Msgf(
logger.Warn().Msgf(
"overservicing enabled, application %q over-serviced %s",
appRelayMeter.app.GetAddress(),
appRelayMeter.numOverServicedRelays,
Expand All @@ -186,9 +192,15 @@ func (rmtr *ProxyRelayMeter) SetNonApplicableRelayReward(ctx context.Context, re
rmtr.relayMeterMu.Lock()
defer rmtr.relayMeterMu.Unlock()

// Create a context-specific logger to avoid concurrent access issues
logger := rmtr.logger.With(
"method", "SetNonApplicableRelayReward",
"session_id", reqMeta.GetSessionHeader().GetSessionId(),
)

sessionRelayMeter, ok := rmtr.sessionToRelayMeterMap[reqMeta.GetSessionHeader().GetSessionId()]
if !ok {
rmtr.logger.Warn().Msgf(
logger.Warn().Msgf(
"[Non critical] Unable to find session relay meter for session %s. Application may be rate limited more than intended: %v",
reqMeta.GetSessionHeader().GetSessionId(),
ErrRelayerProxyUnknownSession.Wrap("session relay meter not found"),
Expand All @@ -199,7 +211,7 @@ func (rmtr *ProxyRelayMeter) SetNonApplicableRelayReward(ctx context.Context, re

sharedParams, err := rmtr.sharedQuerier.GetParams(ctx)
if err != nil {
rmtr.logger.Warn().Msgf(
logger.Warn().Msgf(
"[Non critical] Unable to set up relay meter in session %s. Relay will continue without rate limiting: %v",
reqMeta.GetSessionHeader().GetSessionId(),
err,
Expand All @@ -209,7 +221,7 @@ func (rmtr *ProxyRelayMeter) SetNonApplicableRelayReward(ctx context.Context, re

service, err := rmtr.serviceQuerier.GetService(ctx, reqMeta.SessionHeader.ServiceId)
if err != nil {
rmtr.logger.Warn().Msgf(
logger.Warn().Msgf(
"[Non critical] Unable to set up relay meter in session %s. Relay will continue without rate limiting: %v",
reqMeta.GetSessionHeader().GetSessionId(),
err,
Expand All @@ -220,7 +232,7 @@ func (rmtr *ProxyRelayMeter) SetNonApplicableRelayReward(ctx context.Context, re
// Get the cost of the relay based on the service and shared parameters.
relayCost, err := getSingleRelayCostCoin(sharedParams, &service)
if err != nil {
rmtr.logger.Warn().Msgf(
logger.Warn().Msgf(
"[Non critical] Unable to calculate relay cost in session %s. Application may be rate limited more than intended: %v",
reqMeta.GetSessionHeader().GetSessionId(),
err,
Expand All @@ -230,7 +242,7 @@ func (rmtr *ProxyRelayMeter) SetNonApplicableRelayReward(ctx context.Context, re
// TODO_FOLLOWUP(@red-0ne): Consider fixing the relay meter logic to never have
// a less than relay cost consumed amount.
if sessionRelayMeter.consumedCoin.IsLT(relayCost) {
rmtr.logger.Warn().Msgf(
logger.Warn().Msgf(
"(SHOULD NEVER HAPPEN) Your session earned less than the cost of a single relay. Not submitting a claim for application (%s), service id: (%s), session id: (%s), with consumed amount: (%s), relay cost: (%s)",
sessionRelayMeter.app.GetAddress(),
sessionRelayMeter.sessionHeader.GetServiceId(),
Expand Down
31 changes: 23 additions & 8 deletions pkg/relayer/relayminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,11 @@ func (rel *relayMiner) ServeMetrics(addr string) error {

// If no error, start the server in a new goroutine
go func() {
rel.logger.Info().Str("endpoint", addr).Msg("serving metrics")
// Create a context-specific logger to avoid concurrent access issues
logger := rel.logger.With("service", "metrics", "endpoint", addr)
logger.Info().Msg("serving metrics")
if err := http.Serve(ln, promhttp.Handler()); err != nil {
rel.logger.Error().Err(err).Msg("metrics server failed")
logger.Error().Err(err).Msg("metrics server failed")
return
}
}()
Expand All @@ -124,15 +126,19 @@ func (rel *relayMiner) ServePprof(ctx context.Context, addr string) error {
}
// If no error, start the server in a new goroutine
go func() {
rel.logger.Info().Str("endpoint", addr).Msg("starting a pprof endpoint")
// Create a context-specific logger to avoid concurrent access issues
logger := rel.logger.With("service", "pprof", "endpoint", addr)
logger.Info().Msg("starting a pprof endpoint")
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
rel.logger.Error().Str("endpoint", addr).Msg("unable to start a pprof endpoint")
logger.Error().Msg("unable to start a pprof endpoint")
}
}()

go func() {
<-ctx.Done()
rel.logger.Info().Str("endpoint", addr).Msg("stopping a pprof endpoint")
// Create a context-specific logger to avoid concurrent access issues
logger := rel.logger.With("service", "pprof", "endpoint", addr)
logger.Info().Msg("stopping a pprof endpoint")
_ = server.Shutdown(ctx)
}()

Expand All @@ -153,14 +159,18 @@ func (rel *relayMiner) ServePing(ctx context.Context, network, addr string) erro
// - Handles ping requests by broadcasting health checks to all backing services
// - Tests connectivity to all configured data nodes
go func() {
// Create a context-specific logger to avoid concurrent access issues
logger := rel.logger.With("service", "ping", "endpoint", addr)
if err := http.Serve(ln, rel.newPingHandlerFn(ctx)); err != nil && !errors.Is(err, http.ErrServerClosed) {
rel.logger.Error().Err(err).Msg("ping server unexpectedly closed")
logger.Error().Err(err).Msg("ping server unexpectedly closed")
}
}()

go func() {
<-ctx.Done() // A message a receive when we stop the relayminer.
rel.logger.Info().Str("endpoint", addr).Msg("stopping ping server")
// Create a context-specific logger to avoid concurrent access issues
logger := rel.logger.With("service", "ping", "endpoint", addr)
logger.Info().Msg("stopping ping server")
_ = ln.Close()
}()

Expand All @@ -169,7 +179,12 @@ func (rel *relayMiner) ServePing(ctx context.Context, network, addr string) erro

func (rel *relayMiner) newPingHandlerFn(ctx context.Context) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
rel.logger.Debug().Msg("pinging relay servers...")
// Create a request-specific logger to avoid concurrent access issues
logger := rel.logger.With(
"handler", "ping",
"remote_addr", req.RemoteAddr,
)
logger.Debug().Msg("pinging relay servers...")

if err := rel.relayerProxy.PingAll(ctx); err != nil {
var urlError *url.Error
Expand Down
4 changes: 3 additions & 1 deletion pkg/relayer/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,9 @@ func (rs *relayerSessionsManager) deleteSessionTrees(
numSessionTreesDeleted := 0
for _, sessionTree := range sessionTrees {
sessionId := sessionTree.GetSessionHeader().GetSessionId()
logger.Info().Str("session_id", sessionId).Msg("🗑️ Deleting session tree - cleaning up outdated or unclaimable session")
// Create a new logger instance for each iteration to avoid concurrent access issues
sessionLogger := logger.With("session_id", sessionId)
sessionLogger.Info().Msg("🗑️ Deleting session tree - cleaning up outdated or unclaimable session")

// Remove the session tree from the relayerSessions.
rs.deleteSessionTree(sessionTree)
Expand Down
9 changes: 7 additions & 2 deletions testutil/testrelayer/relays.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,15 +212,20 @@ func NewSignedRandRelay(
_, err := rand.Read(randBz) //nolint:staticcheck // Using rand.Read in tests as a deterministic pseudo-random source is okay.
require.NoError(t, err)

// Prepare an empty relay with the given req & res headers.
relay := NewEmptyRelay(reqHeader, resHeader, supplierOperatorAddr)
// Populate the relay request and response payloads with random data.
relay.Req.Payload = randBz
relay.Res.Payload = randBz
SignRelayRequest(ctx, t, relay, reqHeader.GetApplicationAddress(), keyRing, ringClient)
SignRelayResponse(ctx, t, relay, supplierOperatorKeyUid, supplierOperatorAddr, keyRing)

// Update the relay response with the payload hash
err = relay.Res.UpdatePayloadHash()
require.NoError(t, err)

// Sign both the request and response.
SignRelayRequest(ctx, t, relay, reqHeader.GetApplicationAddress(), keyRing, ringClient)
SignRelayResponse(ctx, t, relay, supplierOperatorKeyUid, supplierOperatorAddr, keyRing)

return relay
}

Expand Down
8 changes: 8 additions & 0 deletions testutil/testtree/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@ func FillSessionTree(
keyRing,
ringClient,
)

// TODO(v0.1.26): Remove this if structure once all actors (miners, validators, etc.)
// update to the version of the protocol that enforce the payload hash
// and a nil payload.
if len(relay.Res.PayloadHash) > 0 {
relay.Res.Payload = nil
}

relayBz, err := relay.Marshal()
require.NoError(t, err)

Expand Down
Loading