Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions crates/rpc/src/base/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use serde::Serialize;
use tokio_stream::{Stream, StreamExt, wrappers::BroadcastStream};
use tracing::error;

use crate::ExtendedSubscriptionKind;
use crate::{BaseSubscriptionKind, ExtendedSubscriptionKind};

/// Eth pub-sub RPC extension for flashblocks and standard subscriptions.
///
Expand Down Expand Up @@ -138,17 +138,21 @@ where
}

// Handle flashblocks-specific subscriptions
let ExtendedSubscriptionKind::Base(base_kind) = kind else {
unreachable!("Standard subscription types should be delegated to inner");
};

let sink = pending.accept().await?;

match kind {
ExtendedSubscriptionKind::NewFlashblocks => {
match base_kind {
BaseSubscriptionKind::NewFlashblocks => {
let stream = Self::new_flashblocks_stream(Arc::clone(&self.flashblocks_state));

tokio::spawn(async move {
pipe_from_stream(sink, stream).await;
});
}
ExtendedSubscriptionKind::PendingLogs => {
BaseSubscriptionKind::PendingLogs => {
// Extract filter from params, default to empty filter (match all)
let filter = match params {
Some(Params::Logs(filter)) => *filter,
Expand All @@ -161,8 +165,6 @@ where
pipe_from_stream(sink, stream).await;
});
}
// Standard types are handled above, this branch is unreachable
_ => unreachable!("Standard subscription types should be delegated to inner"),
}

Ok(())
Expand Down
55 changes: 36 additions & 19 deletions crates/rpc/src/base/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,36 @@ pub struct TransactionStatusResponse {
/// Extended subscription kind that includes both standard Ethereum subscription types
/// and flashblocks-specific types.
///
/// This enum wraps the standard `SubscriptionKind` from alloy and adds flashblocks support,
/// allowing `eth_subscribe` to handle both standard subscriptions (newHeads, logs, etc.)
/// This enum encapsulates the standard [`SubscriptionKind`] from alloy and adds flashblocks
/// support, allowing `eth_subscribe` to handle both standard subscriptions (newHeads, logs, etc.)
/// and custom flashblocks subscriptions.
///
/// By encapsulating [`SubscriptionKind`] rather than redefining its variants, we automatically
/// inherit support for any new variants added upstream, or get a compile error if the signature
/// changes.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
#[serde(untagged)]
pub enum ExtendedSubscriptionKind {
/// New block headers subscription (standard).
NewHeads,
/// Logs subscription (standard).
Logs,
/// New pending transactions subscription (standard).
NewPendingTransactions,
/// Node syncing status subscription (standard).
Syncing,
/// New flashblocks subscription (Base-specific).
/// Standard Ethereum subscription types (newHeads, logs, newPendingTransactions, syncing).
///
/// These are proxied to reth's underlying `EthPubSub` implementation.
Standard(SubscriptionKind),
/// Base-specific subscription types for flashblocks.
Base(BaseSubscriptionKind),
}

/// Base-specific subscription types for flashblocks.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum BaseSubscriptionKind {
/// New flashblocks subscription.
///
/// Fires a notification each time a new flashblock is processed, providing the current
/// pending block state. Each flashblock represents an incremental update to the pending
/// block, so multiple notifications may be emitted for the same block height as new
/// flashblocks arrive.
NewFlashblocks,
/// Pending logs subscription (Base-specific).
/// Pending logs subscription.
///
/// Returns logs from flashblocks pending state that match the given filter criteria.
/// Unlike standard `logs` subscription which only includes logs from confirmed blocks,
Expand All @@ -55,16 +63,25 @@ impl ExtendedSubscriptionKind {
/// Returns the standard subscription kind if this is a standard subscription type.
pub const fn as_standard(&self) -> Option<SubscriptionKind> {
match self {
Self::NewHeads => Some(SubscriptionKind::NewHeads),
Self::Logs => Some(SubscriptionKind::Logs),
Self::NewPendingTransactions => Some(SubscriptionKind::NewPendingTransactions),
Self::Syncing => Some(SubscriptionKind::Syncing),
Self::NewFlashblocks | Self::PendingLogs => None,
Self::Standard(kind) => Some(*kind),
Self::Base(_) => None,
}
}

/// Returns true if this is a flashblocks-specific subscription.
pub const fn is_flashblocks(&self) -> bool {
matches!(self, Self::NewFlashblocks | Self::PendingLogs)
matches!(self, Self::Base(_))
}
}

impl From<SubscriptionKind> for ExtendedSubscriptionKind {
fn from(kind: SubscriptionKind) -> Self {
Self::Standard(kind)
}
}

impl From<BaseSubscriptionKind> for ExtendedSubscriptionKind {
fn from(kind: BaseSubscriptionKind) -> Self {
Self::Base(kind)
}
}
2 changes: 1 addition & 1 deletion crates/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub use base::{
pubsub::{EthPubSub, EthPubSubApiServer},
traits::{MeteringApiServer, TransactionStatusApiServer},
transaction_rpc::TransactionStatusApiImpl,
types::{ExtendedSubscriptionKind, Status, TransactionStatusResponse},
types::{BaseSubscriptionKind, ExtendedSubscriptionKind, Status, TransactionStatusResponse},
};

mod eth;
Expand Down