diff --git a/crates/flashblocks-rpc/src/rpc.rs b/crates/flashblocks-rpc/src/rpc.rs index 093a1a6e..1e33fd4f 100644 --- a/crates/flashblocks-rpc/src/rpc.rs +++ b/crates/flashblocks-rpc/src/rpc.rs @@ -12,10 +12,11 @@ use jsonrpsee::{ core::{async_trait, RpcResult}, proc_macros::rpc, }; +use jsonrpsee_types::error::INVALID_PARAMS_CODE; +use jsonrpsee_types::ErrorObjectOwned; use op_alloy_network::Optimism; use op_alloy_rpc_types::OpTransactionRequest; use reth::providers::CanonStateSubscriptions; -use reth::rpc::server_types::eth::EthApiError::TransactionConfirmationTimeout; use reth_rpc_eth_api::helpers::EthState; use reth_rpc_eth_api::helpers::EthTransactions; use reth_rpc_eth_api::helpers::{EthBlocks, EthCall}; @@ -28,6 +29,15 @@ use tokio_stream::wrappers::BroadcastStream; use tokio_stream::StreamExt; use tracing::{debug, trace, warn}; +/// Max configured timeout for `eth_sendRawTransactionSync` in milliseconds. +pub const MAX_TIMEOUT_SEND_RAW_TX_SYNC_MS: u64 = 6_000; + +/// Error code for timeout error. +/// +/// Specs . +// todo: replace with +pub const ETH_ERROR_CODE_TIMEOUT: i32 = 4; + /// Core API for accessing flashblock state and data. pub trait FlashblocksAPI { /// Retrieves the current block. If `full` is true, includes full transaction details. @@ -88,6 +98,7 @@ pub trait EthApiOverride { async fn send_raw_transaction_sync( &self, transaction: alloy_primitives::Bytes, + timeout_ms: Option, ) -> RpcResult>; #[method(name = "call")] @@ -236,9 +247,22 @@ where async fn send_raw_transaction_sync( &self, transaction: alloy_primitives::Bytes, + timeout_ms: Option, ) -> RpcResult> { debug!(message = "rpc::send_raw_transaction_sync"); + let timeout_ms = match timeout_ms { + Some(ms) if ms > MAX_TIMEOUT_SEND_RAW_TX_SYNC_MS => { + return Err(ErrorObjectOwned::owned( + INVALID_PARAMS_CODE, + format!("time out too long, timeout: {ms} ms, max: {MAX_TIMEOUT_SEND_RAW_TX_SYNC_MS} ms"), + None::<()>, + )) + }, + Some(ms) => ms, + _ => MAX_TIMEOUT_SEND_RAW_TX_SYNC_MS, + }; + let tx_hash = match EthTransactions::send_raw_transaction(&self.eth_api, transaction).await { Ok(hash) => hash, @@ -247,10 +271,11 @@ where debug!( message = "rpc::send_raw_transaction_sync::sent_transaction", - tx_hash = %tx_hash + tx_hash = %tx_hash, + timeout_ms = timeout_ms, ); - const TIMEOUT_DURATION: Duration = Duration::from_secs(6); + let timeout = Duration::from_millis(timeout_ms); loop { tokio::select! { receipt = self.wait_for_flashblocks_receipt(tx_hash) => { @@ -267,11 +292,12 @@ where continue } } - _ = time::sleep(TIMEOUT_DURATION) => { - return Err(TransactionConfirmationTimeout { - hash: tx_hash, - duration: TIMEOUT_DURATION, - }.into_rpc_err()); + _ = time::sleep(timeout) => { + return Err(ErrorObjectOwned::owned( + ETH_ERROR_CODE_TIMEOUT, + format!("transaction confirmation timed out after {timeout_ms} ms"), + None::<()>, + )); } } } diff --git a/crates/flashblocks-rpc/src/tests/rpc.rs b/crates/flashblocks-rpc/src/tests/rpc.rs index a53fc374..2ce0118b 100644 --- a/crates/flashblocks-rpc/src/tests/rpc.rs +++ b/crates/flashblocks-rpc/src/tests/rpc.rs @@ -1,5 +1,6 @@ #[cfg(test)] mod tests { + use crate::rpc::ETH_ERROR_CODE_TIMEOUT; use crate::rpc::{EthApiExt, EthApiOverrideServer}; use crate::state::FlashblocksState; use crate::subscription::{Flashblock, FlashblocksReceiver, Metadata}; @@ -72,12 +73,13 @@ mod tests { pub async fn send_raw_transaction_sync( &self, tx: Bytes, + timeout_ms: Option, ) -> eyre::Result> { let url = format!("http://{}", self.http_api_addr); let client = RpcClient::new_http(url.parse()?); let receipt = client - .request::<_, RpcReceipt>("eth_sendRawTransactionSync", (tx,)) + .request::<_, RpcReceipt>("eth_sendRawTransactionSync", (tx, timeout_ms)) .await?; Ok(receipt) @@ -526,11 +528,13 @@ mod tests { node.send_payload(create_first_payload()).await?; // run the Tx sync and, in parallel, deliver the payload that contains the Tx - let (receipt_result, payload_result) = - tokio::join!(node.send_raw_transaction_sync(TRANSFER_ETH_TX), async { + let (receipt_result, payload_result) = tokio::join!( + node.send_raw_transaction_sync(TRANSFER_ETH_TX, None), + async { tokio::time::sleep(std::time::Duration::from_millis(100)).await; node.send_payload(create_second_payload()).await - }); + } + ); payload_result?; let receipt = receipt_result?; @@ -538,4 +542,21 @@ mod tests { assert_eq!(receipt.transaction_hash(), TRANSFER_ETH_HASH); Ok(()) } + + #[tokio::test] + async fn test_send_raw_transaction_sync_timeout() { + reth_tracing::init_test_tracing(); + let node = setup_node().await.unwrap(); + + // fail request immediately by passing timeout of 0 ms + let receipt_result = node + .send_raw_transaction_sync(TRANSFER_ETH_TX, Some(0)) + .await; + + assert!(receipt_result + .err() + .unwrap() + .to_string() + .contains(Ð_ERROR_CODE_TIMEOUT.to_string())); + } }