Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions crates/flashblocks-rpc/src/pending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use alloy_rpc_types_eth::Header as RPCHeader;
use eyre::eyre;
use op_alloy_network::Optimism;
use op_alloy_rpc_types::{OpTransactionReceipt, Transaction};
use reth::revm::db::{Cache, TransitionState};
use reth_rpc_eth_api::RpcBlock;

pub struct PendingBlockBuilder {
Expand All @@ -20,6 +21,8 @@ pub struct PendingBlockBuilder {
pub transactions_by_hash: HashMap<B256, Transaction>,
pub transactions: Vec<Transaction>,
pub state_overrides: Option<StateOverride>,
pub state_cache: Cache,
pub transition_state: Option<TransitionState>,
}

impl PendingBlockBuilder {
Expand All @@ -33,6 +36,8 @@ impl PendingBlockBuilder {
transaction_receipts: HashMap::default(),
transactions_by_hash: HashMap::default(),
state_overrides: None,
state_cache: Cache::default(),
transition_state: None,
}
}

Expand Down Expand Up @@ -85,6 +90,21 @@ impl PendingBlockBuilder {
self
}

#[inline]
pub(crate) fn with_state_cache(&mut self, cache: Cache) -> &Self {
self.state_cache = cache;
self
}

#[inline]
pub(crate) fn with_transition_state(
&mut self,
transition_state: Option<TransitionState>,
) -> &Self {
self.transition_state = transition_state;
self
}

pub(crate) fn build(self) -> eyre::Result<PendingBlock> {
let header = self.header.ok_or_else(|| eyre!("missing header"))?;

Expand All @@ -101,6 +121,8 @@ impl PendingBlockBuilder {
transactions: self.transactions,
flashblocks: self.flashblocks,
state_overrides: self.state_overrides,
state_cache: self.state_cache,
transition_state: self.transition_state,
})
}
}
Expand All @@ -115,6 +137,8 @@ pub struct PendingBlock {
transactions_by_hash: HashMap<B256, Transaction>,
transactions: Vec<Transaction>,
state_overrides: Option<StateOverride>,
state_cache: Cache,
transition_state: Option<TransitionState>,
}

impl PendingBlock {
Expand Down Expand Up @@ -168,4 +192,11 @@ impl PendingBlock {
pub fn get_state_overrides(&self) -> Option<StateOverride> {
self.state_overrides.clone()
}

pub fn get_state_cache(&self) -> Cache {
self.state_cache.clone()
}
pub fn get_state_transitions(&self) -> Option<TransitionState> {
self.transition_state.clone()
}
}
36 changes: 30 additions & 6 deletions crates/flashblocks-rpc/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use reth::chainspec::{ChainSpecProvider, EthChainSpec};
use reth::providers::{BlockReaderIdExt, StateProviderFactory};
use reth::revm::context::result::ResultAndState;
use reth::revm::database::StateProviderDatabase;
use reth::revm::db::CacheDB;
use reth::revm::{DatabaseCommit, State};
use reth_evm::{ConfigureEvm, Evm};
use reth_optimism_chainspec::OpHardforks;
Expand All @@ -30,6 +31,7 @@ use reth_rpc_convert::transaction::ConvertReceiptInput;
use reth_rpc_convert::RpcTransaction;
use reth_rpc_eth_api::{RpcBlock, RpcReceipt};
use std::borrow::Cow;
use std::ops::Deref;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::broadcast;
Expand Down Expand Up @@ -91,10 +93,17 @@ where

fn process_flashblock(&self, flashblocks: Vec<Flashblock>) -> eyre::Result<PendingBlock> {
let mut pending_block_builder = PendingBlockBuilder::new();
// Number of txs in the block until last flashblock txs start
let txs_offset = flashblocks
.iter()
.rev()
.skip(1)
.map(|flashblock| flashblock.diff.transactions.len())
.sum::<usize>();

let base = flashblocks
.first()
.ok_or(eyre!("cannot build a pendingblock from no flashblocks"))?
.ok_or(eyre!("cannot build a pending block from no flashblocks"))?
.base
.clone()
.ok_or(eyre!("first flashblock does not contain a base"))?;
Expand Down Expand Up @@ -171,7 +180,11 @@ where
.with_database(state)
.with_bundle_update()
.build();

let mut db = CacheDB::new(db);
if let Some(pending_block) = self.pending_block.load().deref() {
db.cache = pending_block.get_state_cache();
db.db.transition_state = pending_block.get_state_transitions();
}
let block_env_attributes = OpNextBlockEnvAttributes {
timestamp: base.timestamp,
suggested_fee_recipient: base.fee_recipient,
Expand All @@ -189,8 +202,8 @@ where
let evm_env = evm_config.next_evm_env(&previous_header, &block_env_attributes)?;

let mut evm = evm_config.evm_with_env(db, evm_env);
let mut state_cache_builder = StateOverridesBuilder::default();

let mut last_fb_recovered_txs = Vec::with_capacity(block.body.transactions.len());
for (idx, transaction) in block.body.transactions.iter().enumerate() {
let sender = match transaction.recover_signer() {
Ok(signer) => signer,
Expand All @@ -206,6 +219,11 @@ where

let recovered_transaction = Recovered::new_unchecked(transaction.clone(), sender);
let envelope = recovered_transaction.clone().convert::<OpTxEnvelope>();
// Preserve recovered transaction from the last flashblock
// +1 to account for idx being the index
if idx + 1 > txs_offset {
last_fb_recovered_txs.push(recovered_transaction);
}

// Build Transaction
let (deposit_receipt_version, deposit_nonce) = if transaction.is_deposit() {
Expand Down Expand Up @@ -280,9 +298,12 @@ where

gas_used = receipt.cumulative_gas_used();
next_log_index += receipt.logs().len();

}
let mut state_cache_builder = StateOverridesBuilder::default();
// Execute recovered transaction that belongs to the last flashblocks
for tx in last_fb_recovered_txs {
// EVM Transaction
let ResultAndState { state, .. } = evm.transact(recovered_transaction)?;
let ResultAndState { state, .. } = evm.transact(tx)?;
for (addr, acc) in &state {
let state_diff = B256HashMap::<B256>::from_iter(
acc.storage
Expand All @@ -302,8 +323,11 @@ where
evm.db_mut().commit(state);
// End EVM Transaction
}

pending_block_builder.with_state_overrides(state_cache_builder.build());
// Preserve current state transition and cache from cachedb
let CacheDB { cache, db } = evm.into_db();
pending_block_builder.with_transition_state(db.transition_state);
pending_block_builder.with_state_cache(cache);

for (address, balance) in updated_balances {
pending_block_builder.with_account_balance(address, balance);
Expand Down
3 changes: 3 additions & 0 deletions crates/flashblocks-rpc/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ where
);
}
},
Ok(Message::Text(_)) => {
error!("Received flashblock as plaintext, only compressed flashblocks supported. Set up websocket-proxy to use compressed flashblocks.");
}
Ok(Message::Close(_)) => {
info!(message = "WebSocket connection closed by upstream");
break;
Expand Down