Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ base-tracex.workspace = true
# reth
reth.workspace = true
reth-db.workspace = true
reth-exex.workspace = true
reth-optimism-node.workspace = true
reth-optimism-chainspec.workspace = true

# misc
eyre.workspace = true
futures-util.workspace = true
once_cell.workspace = true
tokio.workspace = true
tracing.workspace = true
url.workspace = true
derive_more = { workspace = true, features = ["debug"] }
69 changes: 43 additions & 26 deletions crates/runner/src/extensions/canon.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
//! Contains the [FlashblocksCanonExtension] which wires up the `flashblocks-canon`
//! execution extension on the Base node builder.
//! Contains the [FlashblocksCanonExtension] which wires up the canonical block
//! subscription for flashblocks on the Base node builder.

use std::sync::Arc;

use base_reth_flashblocks::FlashblocksState;
use futures_util::TryStreamExt;
use reth_exex::ExExEvent;
use reth::providers::CanonStateSubscriptions;
use tokio::sync::broadcast::error::RecvError;
use tracing::{info, warn};

use crate::{
BaseNodeConfig, FlashblocksConfig,
extensions::{BaseNodeExtension, ConfigurableBaseNodeExtension, FlashblocksCell, OpBuilder},
};

/// Helper struct that wires the Flashblocks canon ExEx into the node builder.
/// Helper struct that wires the Flashblocks canonical subscription into the node builder.
#[derive(Debug, Clone)]
pub struct FlashblocksCanonExtension {
/// Shared Flashblocks state cache.
Expand All @@ -35,34 +36,50 @@ impl BaseNodeExtension for FlashblocksCanonExtension {
let flashblocks_enabled = flashblocks.is_some();
let flashblocks_cell = self.cell;

builder.install_exex_if(flashblocks_enabled, "flashblocks-canon", move |mut ctx| {
let flashblocks_cell = flashblocks_cell.clone();
async move {
let fb_config =
flashblocks.as_ref().expect("flashblocks config checked above").clone();
let fb = flashblocks_cell
.get_or_init(|| {
Arc::new(FlashblocksState::new(
ctx.provider().clone(),
fb_config.max_pending_blocks_depth,
))
})
.clone();
if !flashblocks_enabled {
return builder;
}

Ok(async move {
while let Some(note) = ctx.notifications.try_next().await? {
if let Some(committed) = note.committed_chain() {
let tip = committed.tip().num_hash();
builder.extend_rpc_modules(move |ctx| {
let fb_config = flashblocks.as_ref().expect("checked above").clone();

let fb = flashblocks_cell
.get_or_init(|| {
Arc::new(FlashblocksState::new(
ctx.provider().clone(),
fb_config.max_pending_blocks_depth,
))
})
.clone();

// Subscribe to canonical state notifications
info!(message = "Starting flashblocks canonical subscription");
let mut receiver = ctx.provider().subscribe_to_canonical_state();
tokio::spawn(async move {
loop {
match receiver.recv().await {
Ok(notification) => {
let committed = notification.committed();
let chain = Arc::unwrap_or_clone(committed);
for (_, block) in chain.into_blocks() {
fb.on_canonical_block_received(block);
}
let _ = ctx.events.send(ExExEvent::FinishedHeight(tip));
}
Err(RecvError::Lagged(n)) => {
warn!(
message = "Canonical subscription lagged",
missed_notifications = n
);
}
Err(RecvError::Closed) => {
warn!(message = "Canonical state subscription ended");
break;
}
}
Ok(())
})
}
}
});

Ok(())
})
}
}
Expand Down
16 changes: 11 additions & 5 deletions crates/runner/src/extensions/tracing.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
//! Contains the [TransactionTracingExtension] which wires up the `tracex`
//! execution extension on the Base node builder.
//! subscription on the Base node builder.

use base_tracex::tracex_exex;
use base_tracex::start_tracex;

use crate::{
BaseNodeConfig, TracingConfig,
extensions::{BaseNodeExtension, ConfigurableBaseNodeExtension, OpBuilder},
};

/// Helper struct that wires the transaction tracing ExEx into the node builder.
/// Helper struct that wires the transaction tracing subscription into the node builder.
#[derive(Debug, Clone, Copy)]
pub struct TransactionTracingExtension {
/// Transaction tracing configuration flags.
Expand All @@ -26,8 +26,14 @@ impl BaseNodeExtension for TransactionTracingExtension {
/// Applies the extension to the supplied builder.
fn apply(self: Box<Self>, builder: OpBuilder) -> OpBuilder {
let tracing = self.config;
builder.install_exex_if(tracing.enabled, "tracex", move |ctx| async move {
Ok(tracex_exex(ctx, tracing.logs_enabled))

if !tracing.enabled {
return builder;
}

builder.extend_rpc_modules(move |ctx| {
start_tracex(ctx.pool().clone(), ctx.provider().clone(), tracing.logs_enabled);
Ok(())
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/test-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ reth-primitives-traits.workspace = true
reth-db.workspace = true
reth-e2e-test-utils.workspace = true
reth-node-core.workspace = true
reth-exex.workspace = true
reth-tracing.workspace = true
reth-rpc-layer.workspace = true
reth-ipc.workspace = true
Expand Down Expand Up @@ -65,6 +64,7 @@ jsonrpsee.workspace = true

# misc
derive_more = { workspace = true, features = ["deref"] }
tracing.workspace = true
tracing-subscriber.workspace = true
serde_json.workspace = true
eyre.workspace = true
Expand Down
137 changes: 70 additions & 67 deletions crates/test-utils/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,15 @@ use reth_db::{
test_utils::{ERROR_DB_CREATION, TempDatabase, tempdir_path},
};
use reth_e2e_test_utils::{Adapter, TmpDB};
use reth_exex::ExExEvent;
use reth_node_core::{
args::DatadirArgs,
dirs::{DataDirPath, MaybePlatformPath},
};
use reth_optimism_chainspec::OpChainSpec;
use reth_optimism_node::{OpNode, args::RollupArgs};
use reth_provider::{CanonStateSubscriptions, providers::BlockchainProvider};
use tokio::sync::{mpsc, oneshot};
use tokio_stream::StreamExt;
use tokio::sync::{broadcast::error::RecvError, mpsc, oneshot};
use tracing::{info, warn};

use crate::engine::EngineApi;

Expand Down Expand Up @@ -129,76 +128,80 @@ impl FlashblocksNodeExtensions {
let receiver = self.inner.receiver.clone();
let process_canonical = self.inner.process_canonical;

let fb_cell_for_exex = fb_cell.clone();

builder
.install_exex("flashblocks-canon", move |mut ctx| {
let fb_cell = fb_cell_for_exex.clone();
let process_canonical = process_canonical;
async move {
let provider = ctx.provider().clone();
let fb = init_flashblocks_state(&fb_cell, &provider);
Ok(async move {
while let Some(note) = ctx.notifications.try_next().await? {
if let Some(committed) = note.committed_chain() {
let hash = committed.tip().num_hash();
if process_canonical {
// Many suites drive canonical updates manually to reproduce race conditions, so
// allowing this to be disabled keeps canonical replay deterministic.
let chain = Arc::unwrap_or_clone(committed);
for (_, block) in chain.into_blocks() {
fb.on_canonical_block_received(block);
}
builder.extend_rpc_modules(move |ctx| {
let fb_cell = fb_cell.clone();
let provider = ctx.provider().clone();
let fb = init_flashblocks_state(&fb_cell, &provider);

// Subscribe to canonical state for flashblocks if enabled
if process_canonical {
info!(message = "Starting flashblocks canonical subscription");
let fb_for_canon = fb.clone();
let mut receiver = provider.subscribe_to_canonical_state();
tokio::spawn(async move {
loop {
match receiver.recv().await {
Ok(notification) => {
let committed = notification.committed();
let chain = Arc::unwrap_or_clone(committed);
for (_, block) in chain.into_blocks() {
fb_for_canon.on_canonical_block_received(block);
}
let _ = ctx.events.send(ExExEvent::FinishedHeight(hash));
}
Err(RecvError::Lagged(n)) => {
warn!(
message = "Canonical subscription lagged",
missed_notifications = n
);
}
Err(RecvError::Closed) => {
warn!(message = "Canonical state subscription ended");
break;
}
}
Ok(())
})
}
})
.extend_rpc_modules(move |ctx| {
let fb_cell = fb_cell.clone();
let provider = ctx.provider().clone();
let fb = init_flashblocks_state(&fb_cell, &provider);

let mut canon_stream = tokio_stream::wrappers::BroadcastStream::new(
ctx.provider().subscribe_to_canonical_state(),
);
tokio::spawn(async move {
use tokio_stream::StreamExt;
while let Some(Ok(notification)) = canon_stream.next().await {
provider.canonical_in_memory_state().notify_canon_state(notification);
}
});
let api_ext = EthApiExt::new(
ctx.registry.eth_api().clone(),
ctx.registry.eth_handlers().filter.clone(),
fb.clone(),
);
ctx.modules.replace_configured(api_ext.into_rpc())?;

// Register eth_subscribe subscription endpoint for flashblocks
// Uses replace_configured since eth_subscribe already exists from reth's standard module
// Pass eth_api to enable proxying standard subscription types to reth's implementation
let eth_pubsub = EthPubSub::new(ctx.registry.eth_api().clone(), fb.clone());
ctx.modules.replace_configured(eth_pubsub.into_rpc())?;

let fb_for_task = fb.clone();
let mut receiver = receiver
.lock()
.expect("flashblock receiver mutex poisoned")
.take()
.expect("flashblock receiver should only be initialized once");
tokio::spawn(async move {
while let Some((payload, tx)) = receiver.recv().await {
fb_for_task.on_flashblock_received(payload);
let _ = tx.send(());
}
});
}

// Keep existing subscription that forwards to provider's in-memory state
let mut canon_stream = tokio_stream::wrappers::BroadcastStream::new(
ctx.provider().subscribe_to_canonical_state(),
);
tokio::spawn(async move {
use tokio_stream::StreamExt;
while let Some(Ok(notification)) = canon_stream.next().await {
provider.canonical_in_memory_state().notify_canon_state(notification);
}
});

let api_ext = EthApiExt::new(
ctx.registry.eth_api().clone(),
ctx.registry.eth_handlers().filter.clone(),
fb.clone(),
);
ctx.modules.replace_configured(api_ext.into_rpc())?;

// Register eth_subscribe subscription endpoint for flashblocks
// Uses replace_configured since eth_subscribe already exists from reth's standard module
// Pass eth_api to enable proxying standard subscription types to reth's implementation
let eth_pubsub = EthPubSub::new(ctx.registry.eth_api().clone(), fb.clone());
ctx.modules.replace_configured(eth_pubsub.into_rpc())?;

let fb_for_task = fb.clone();
let mut receiver = receiver
.lock()
.expect("flashblock receiver mutex poisoned")
.take()
.expect("flashblock receiver should only be initialized once");
tokio::spawn(async move {
while let Some((payload, tx)) = receiver.recv().await {
fb_for_task.on_flashblock_received(payload);
let _ = tx.send(());
}
});

Ok(())
})
Ok(())
})
}

fn wrap_launcher<L, LRet>(&self, launcher: L) -> impl FnOnce(OpBuilder) -> LRet
Expand Down
2 changes: 0 additions & 2 deletions crates/tracex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ workspace = true
[dependencies]
# reth
reth.workspace = true
reth-exex.workspace = true
reth-tracing.workspace = true

# alloy
Expand All @@ -26,7 +25,6 @@ futures.workspace = true

# misc
lru.workspace = true
eyre.workspace = true
chrono.workspace = true
metrics.workspace = true
derive_more = { workspace = true, features = ["display"] }
Loading