Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 34 additions & 8 deletions crates/flashblocks-rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ use jsonrpsee::{
core::{async_trait, RpcResult},
proc_macros::rpc,
};
use jsonrpsee_types::error::INVALID_PARAMS_CODE;
use jsonrpsee_types::ErrorObjectOwned;
use op_alloy_network::Optimism;
use op_alloy_rpc_types::OpTransactionRequest;
use reth::providers::CanonStateSubscriptions;
use reth::rpc::server_types::eth::EthApiError::TransactionConfirmationTimeout;
use reth_rpc_eth_api::helpers::EthState;
use reth_rpc_eth_api::helpers::EthTransactions;
use reth_rpc_eth_api::helpers::{EthBlocks, EthCall};
Expand All @@ -28,6 +29,15 @@ use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt;
use tracing::{debug, trace, warn};

/// Max configured timeout for `eth_sendRawTransactionSync` in milliseconds.
pub const MAX_TIMEOUT_SEND_RAW_TX_SYNC_MS: u64 = 6_000;

/// Error code for timeout error.
///
/// Specs <https://github.com/ethereum/EIPs/blob/master/EIPS/eip-7966.md#method-name>.
// todo: replace with <https://github.com/paradigmxyz/reth/issues/18251>
pub const ETH_ERROR_CODE_TIMEOUT: i32 = 4;

/// Core API for accessing flashblock state and data.
pub trait FlashblocksAPI {
/// Retrieves the current block. If `full` is true, includes full transaction details.
Expand Down Expand Up @@ -88,6 +98,7 @@ pub trait EthApiOverride {
async fn send_raw_transaction_sync(
&self,
transaction: alloy_primitives::Bytes,
timeout_ms: Option<u64>,
) -> RpcResult<RpcReceipt<Optimism>>;

#[method(name = "call")]
Expand Down Expand Up @@ -236,9 +247,22 @@ where
async fn send_raw_transaction_sync(
&self,
transaction: alloy_primitives::Bytes,
timeout_ms: Option<u64>,
) -> RpcResult<RpcReceipt<Optimism>> {
debug!(message = "rpc::send_raw_transaction_sync");

let timeout_ms = match timeout_ms {
Some(ms) if ms > MAX_TIMEOUT_SEND_RAW_TX_SYNC_MS => {
return Err(ErrorObjectOwned::owned(
INVALID_PARAMS_CODE,
format!("time out too long, timeout: {ms} ms, max: {MAX_TIMEOUT_SEND_RAW_TX_SYNC_MS} ms"),
None::<()>,
))
},
Some(ms) => ms,
_ => MAX_TIMEOUT_SEND_RAW_TX_SYNC_MS,
};

let tx_hash = match EthTransactions::send_raw_transaction(&self.eth_api, transaction).await
{
Ok(hash) => hash,
Expand All @@ -247,10 +271,11 @@ where

debug!(
message = "rpc::send_raw_transaction_sync::sent_transaction",
tx_hash = %tx_hash
tx_hash = %tx_hash,
timeout_ms = timeout_ms,
);

const TIMEOUT_DURATION: Duration = Duration::from_secs(6);
let timeout = Duration::from_millis(timeout_ms);
loop {
tokio::select! {
receipt = self.wait_for_flashblocks_receipt(tx_hash) => {
Expand All @@ -267,11 +292,12 @@ where
continue
}
}
_ = time::sleep(TIMEOUT_DURATION) => {
return Err(TransactionConfirmationTimeout {
hash: tx_hash,
duration: TIMEOUT_DURATION,
}.into_rpc_err());
_ = time::sleep(timeout) => {
return Err(ErrorObjectOwned::owned(
ETH_ERROR_CODE_TIMEOUT,
format!("transaction confirmation timed out after {timeout_ms} ms"),
None::<()>,
));
}
}
}
Expand Down
29 changes: 25 additions & 4 deletions crates/flashblocks-rpc/src/tests/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#[cfg(test)]
mod tests {
use crate::rpc::ETH_ERROR_CODE_TIMEOUT;
use crate::rpc::{EthApiExt, EthApiOverrideServer};
use crate::state::FlashblocksState;
use crate::subscription::{Flashblock, FlashblocksReceiver, Metadata};
Expand Down Expand Up @@ -72,12 +73,13 @@ mod tests {
pub async fn send_raw_transaction_sync(
&self,
tx: Bytes,
timeout_ms: Option<u64>,
) -> eyre::Result<RpcReceipt<Optimism>> {
let url = format!("http://{}", self.http_api_addr);
let client = RpcClient::new_http(url.parse()?);

let receipt = client
.request::<_, RpcReceipt<Optimism>>("eth_sendRawTransactionSync", (tx,))
.request::<_, RpcReceipt<Optimism>>("eth_sendRawTransactionSync", (tx, timeout_ms))
.await?;

Ok(receipt)
Expand Down Expand Up @@ -526,16 +528,35 @@ mod tests {
node.send_payload(create_first_payload()).await?;

// run the Tx sync and, in parallel, deliver the payload that contains the Tx
let (receipt_result, payload_result) =
tokio::join!(node.send_raw_transaction_sync(TRANSFER_ETH_TX), async {
let (receipt_result, payload_result) = tokio::join!(
node.send_raw_transaction_sync(TRANSFER_ETH_TX, None),
async {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
node.send_payload(create_second_payload()).await
});
}
);

payload_result?;
let receipt = receipt_result?;

assert_eq!(receipt.transaction_hash(), TRANSFER_ETH_HASH);
Ok(())
}

#[tokio::test]
async fn test_send_raw_transaction_sync_timeout() {
reth_tracing::init_test_tracing();
let node = setup_node().await.unwrap();

// fail request immediately by passing timeout of 0 ms
let receipt_result = node
.send_raw_transaction_sync(TRANSFER_ETH_TX, Some(0))
.await;

assert!(receipt_result
.err()
.unwrap()
.to_string()
.contains(&ETH_ERROR_CODE_TIMEOUT.to_string()));
}
}