From c677d4c71d580f27a33865d28fc96f0fce9e6576 Mon Sep 17 00:00:00 2001 From: Andreas Bigger Date: Fri, 12 Dec 2025 14:50:17 -0500 Subject: [PATCH] chore(rpc): subscription kind --- crates/rpc/src/base/pubsub.rs | 14 +++++---- crates/rpc/src/base/types.rs | 55 +++++++++++++++++++++++------------ crates/rpc/src/lib.rs | 2 +- 3 files changed, 45 insertions(+), 26 deletions(-) diff --git a/crates/rpc/src/base/pubsub.rs b/crates/rpc/src/base/pubsub.rs index 3f1beb06..73eb41b0 100644 --- a/crates/rpc/src/base/pubsub.rs +++ b/crates/rpc/src/base/pubsub.rs @@ -24,7 +24,7 @@ use serde::Serialize; use tokio_stream::{Stream, StreamExt, wrappers::BroadcastStream}; use tracing::error; -use crate::ExtendedSubscriptionKind; +use crate::{BaseSubscriptionKind, ExtendedSubscriptionKind}; /// Eth pub-sub RPC extension for flashblocks and standard subscriptions. /// @@ -138,17 +138,21 @@ where } // Handle flashblocks-specific subscriptions + let ExtendedSubscriptionKind::Base(base_kind) = kind else { + unreachable!("Standard subscription types should be delegated to inner"); + }; + let sink = pending.accept().await?; - match kind { - ExtendedSubscriptionKind::NewFlashblocks => { + match base_kind { + BaseSubscriptionKind::NewFlashblocks => { let stream = Self::new_flashblocks_stream(Arc::clone(&self.flashblocks_state)); tokio::spawn(async move { pipe_from_stream(sink, stream).await; }); } - ExtendedSubscriptionKind::PendingLogs => { + BaseSubscriptionKind::PendingLogs => { // Extract filter from params, default to empty filter (match all) let filter = match params { Some(Params::Logs(filter)) => *filter, @@ -161,8 +165,6 @@ where pipe_from_stream(sink, stream).await; }); } - // Standard types are handled above, this branch is unreachable - _ => unreachable!("Standard subscription types should be delegated to inner"), } Ok(()) diff --git a/crates/rpc/src/base/types.rs b/crates/rpc/src/base/types.rs index 37a9e8e0..52e40ad0 100644 --- a/crates/rpc/src/base/types.rs +++ b/crates/rpc/src/base/types.rs @@ -22,28 +22,36 @@ pub struct TransactionStatusResponse { /// Extended subscription kind that includes both standard Ethereum subscription types /// and flashblocks-specific types. /// -/// This enum wraps the standard `SubscriptionKind` from alloy and adds flashblocks support, -/// allowing `eth_subscribe` to handle both standard subscriptions (newHeads, logs, etc.) +/// This enum encapsulates the standard [`SubscriptionKind`] from alloy and adds flashblocks +/// support, allowing `eth_subscribe` to handle both standard subscriptions (newHeads, logs, etc.) /// and custom flashblocks subscriptions. +/// +/// By encapsulating [`SubscriptionKind`] rather than redefining its variants, we automatically +/// inherit support for any new variants added upstream, or get a compile error if the signature +/// changes. #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] +#[serde(untagged)] pub enum ExtendedSubscriptionKind { - /// New block headers subscription (standard). - NewHeads, - /// Logs subscription (standard). - Logs, - /// New pending transactions subscription (standard). - NewPendingTransactions, - /// Node syncing status subscription (standard). - Syncing, - /// New flashblocks subscription (Base-specific). + /// Standard Ethereum subscription types (newHeads, logs, newPendingTransactions, syncing). + /// + /// These are proxied to reth's underlying `EthPubSub` implementation. + Standard(SubscriptionKind), + /// Base-specific subscription types for flashblocks. + Base(BaseSubscriptionKind), +} + +/// Base-specific subscription types for flashblocks. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum BaseSubscriptionKind { + /// New flashblocks subscription. /// /// Fires a notification each time a new flashblock is processed, providing the current /// pending block state. Each flashblock represents an incremental update to the pending /// block, so multiple notifications may be emitted for the same block height as new /// flashblocks arrive. NewFlashblocks, - /// Pending logs subscription (Base-specific). + /// Pending logs subscription. /// /// Returns logs from flashblocks pending state that match the given filter criteria. /// Unlike standard `logs` subscription which only includes logs from confirmed blocks, @@ -55,16 +63,25 @@ impl ExtendedSubscriptionKind { /// Returns the standard subscription kind if this is a standard subscription type. pub const fn as_standard(&self) -> Option { match self { - Self::NewHeads => Some(SubscriptionKind::NewHeads), - Self::Logs => Some(SubscriptionKind::Logs), - Self::NewPendingTransactions => Some(SubscriptionKind::NewPendingTransactions), - Self::Syncing => Some(SubscriptionKind::Syncing), - Self::NewFlashblocks | Self::PendingLogs => None, + Self::Standard(kind) => Some(*kind), + Self::Base(_) => None, } } /// Returns true if this is a flashblocks-specific subscription. pub const fn is_flashblocks(&self) -> bool { - matches!(self, Self::NewFlashblocks | Self::PendingLogs) + matches!(self, Self::Base(_)) + } +} + +impl From for ExtendedSubscriptionKind { + fn from(kind: SubscriptionKind) -> Self { + Self::Standard(kind) + } +} + +impl From for ExtendedSubscriptionKind { + fn from(kind: BaseSubscriptionKind) -> Self { + Self::Base(kind) } } diff --git a/crates/rpc/src/lib.rs b/crates/rpc/src/lib.rs index 8243b30f..5a5db0a0 100644 --- a/crates/rpc/src/lib.rs +++ b/crates/rpc/src/lib.rs @@ -13,7 +13,7 @@ pub use base::{ pubsub::{EthPubSub, EthPubSubApiServer}, traits::{MeteringApiServer, TransactionStatusApiServer}, transaction_rpc::TransactionStatusApiImpl, - types::{ExtendedSubscriptionKind, Status, TransactionStatusResponse}, + types::{BaseSubscriptionKind, ExtendedSubscriptionKind, Status, TransactionStatusResponse}, }; mod eth;