diff --git a/crates/core/src/runloops/mod.rs b/crates/core/src/runloops/mod.rs index 43b6b242..c162570f 100644 --- a/crates/core/src/runloops/mod.rs +++ b/crates/core/src/runloops/mod.rs @@ -10,7 +10,8 @@ use std::{ }; use agave_geyser_plugin_interface::geyser_plugin_interface::{ - GeyserPlugin, ReplicaTransactionInfoV3, ReplicaTransactionInfoVersions, + GeyserPlugin, ReplicaBlockInfoV4, ReplicaBlockInfoVersions, ReplicaEntryInfoV2, + ReplicaEntryInfoVersions, ReplicaTransactionInfoV3, ReplicaTransactionInfoVersions, SlotStatus, }; use chrono::{Local, Utc}; use crossbeam::select; @@ -32,6 +33,7 @@ use solana_geyser_plugin_manager::geyser_plugin_manager::{ }; use solana_message::SimpleAddressLoader; use solana_transaction::sanitized::{MessageHash, SanitizedTransaction}; +use solana_transaction_status::RewardsAndNumPartitions; #[cfg(feature = "subgraph")] use surfpool_subgraph::SurfpoolSubgraphPlugin; use surfpool_types::{ @@ -171,6 +173,9 @@ pub async fn start_local_surfnet_runloop( let initial_transactions = svm_locker.with_svm_reader(|svm| svm.transactions_processed); let _ = simnet_events_tx_cc.send(SimnetEvent::Ready(initial_transactions)); + // Notify geyser plugins that startup is complete + let _ = svm_locker.with_svm_reader(|svm| svm.geyser_events_tx.send(GeyserEvent::EndOfStartup)); + start_block_production_runloop( clock_event_rx, clock_command_tx, @@ -792,6 +797,94 @@ fn start_geyser_runloop( } } } + Ok(GeyserEvent::EndOfStartup) => { + for plugin in surfpool_plugin_manager.iter() { + if let Err(e) = plugin.notify_end_of_startup() { + let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify end of startup to Geyser plugin: {:?}", e))); + } + } + + #[cfg(feature = "geyser_plugin")] + for plugin in plugin_manager.plugins.iter() { + if let Err(e) = plugin.notify_end_of_startup() { + let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify end of startup to Geyser plugin: {:?}", e))); + } + } + } + Ok(GeyserEvent::UpdateSlotStatus { slot, parent, status }) => { + let slot_status = match status { + crate::surfnet::GeyserSlotStatus::Processed => SlotStatus::Processed, + crate::surfnet::GeyserSlotStatus::Confirmed => SlotStatus::Confirmed, + crate::surfnet::GeyserSlotStatus::Rooted => SlotStatus::Rooted, + }; + + for plugin in surfpool_plugin_manager.iter() { + if let Err(e) = plugin.update_slot_status(slot, parent, &slot_status) { + let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to update slot status in Geyser plugin: {:?}", e))); + } + } + + #[cfg(feature = "geyser_plugin")] + for plugin in plugin_manager.plugins.iter() { + if let Err(e) = plugin.update_slot_status(slot, parent, &slot_status) { + let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to update slot status in Geyser plugin: {:?}", e))); + } + } + } + Ok(GeyserEvent::NotifyBlockMetadata(block_metadata)) => { + let rewards = RewardsAndNumPartitions { + rewards: vec![], + num_partitions: None, + }; + + let block_info = ReplicaBlockInfoV4 { + slot: block_metadata.slot, + blockhash: &block_metadata.blockhash, + rewards: &rewards, + block_time: block_metadata.block_time, + block_height: block_metadata.block_height, + parent_slot: block_metadata.parent_slot, + parent_blockhash: &block_metadata.parent_blockhash, + executed_transaction_count: block_metadata.executed_transaction_count, + entry_count: block_metadata.entry_count, + }; + + for plugin in surfpool_plugin_manager.iter() { + if let Err(e) = plugin.notify_block_metadata(ReplicaBlockInfoVersions::V0_0_4(&block_info)) { + let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify block metadata to Geyser plugin: {:?}", e))); + } + } + + #[cfg(feature = "geyser_plugin")] + for plugin in plugin_manager.plugins.iter() { + if let Err(e) = plugin.notify_block_metadata(ReplicaBlockInfoVersions::V0_0_4(&block_info)) { + let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify block metadata to Geyser plugin: {:?}", e))); + } + } + } + Ok(GeyserEvent::NotifyEntry(entry_info)) => { + let entry_replica = ReplicaEntryInfoV2 { + slot: entry_info.slot, + index: entry_info.index, + num_hashes: entry_info.num_hashes, + hash: &entry_info.hash, + executed_transaction_count: entry_info.executed_transaction_count, + starting_transaction_index: entry_info.starting_transaction_index, + }; + + for plugin in surfpool_plugin_manager.iter() { + if let Err(e) = plugin.notify_entry(ReplicaEntryInfoVersions::V0_0_2(&entry_replica)) { + let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify entry to Geyser plugin: {:?}", e))); + } + } + + #[cfg(feature = "geyser_plugin")] + for plugin in plugin_manager.plugins.iter() { + if let Err(e) = plugin.notify_entry(ReplicaEntryInfoVersions::V0_0_2(&entry_replica)) { + let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify entry to Geyser plugin: {:?}", e))); + } + } + } } } }; diff --git a/crates/core/src/surfnet/mod.rs b/crates/core/src/surfnet/mod.rs index 00801bf9..2da40dd8 100644 --- a/crates/core/src/surfnet/mod.rs +++ b/crates/core/src/surfnet/mod.rs @@ -31,6 +31,43 @@ pub const SLOTS_PER_EPOCH: u64 = 432000; pub type AccountFactory = Box GetAccountResult + Send + Sync>; +/// Slot status for geyser plugin notifications. +/// Mirrors `agave_geyser_plugin_interface::geyser_plugin_interface::SlotStatus`. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum GeyserSlotStatus { + /// Slot is being processed + Processed, + /// Slot has been rooted (finalized) + Rooted, + /// Slot has been confirmed + Confirmed, +} + +/// Block metadata for geyser plugin notifications. +#[derive(Debug, Clone)] +pub struct GeyserBlockMetadata { + pub slot: Slot, + pub blockhash: String, + pub parent_slot: Slot, + pub parent_blockhash: String, + pub block_time: Option, + pub block_height: Option, + pub executed_transaction_count: u64, + pub entry_count: u64, +} + +/// Entry info for geyser plugin notifications. +/// Surfpool emits one entry per block (simplified model). +#[derive(Debug, Clone)] +pub struct GeyserEntryInfo { + pub slot: Slot, + pub index: usize, + pub num_hashes: u64, + pub hash: Vec, + pub executed_transaction_count: u64, + pub starting_transaction_index: usize, +} + #[allow(clippy::large_enum_variant)] pub enum GeyserEvent { NotifyTransaction(TransactionWithStatusMeta, Option), @@ -38,7 +75,18 @@ pub enum GeyserEvent { /// Account update sent at startup (before block production begins). /// These updates should be sent to geyser plugins with is_startup=true. StartupAccountUpdate(GeyserAccountUpdate), - // todo: add more events + /// Notify plugins that startup is complete. + EndOfStartup, + /// Update slot status (processed, confirmed, rooted/finalized). + UpdateSlotStatus { + slot: Slot, + parent: Option, + status: GeyserSlotStatus, + }, + /// Notify plugins of block metadata. + NotifyBlockMetadata(GeyserBlockMetadata), + /// Notify plugins of entry execution. + NotifyEntry(GeyserEntryInfo), } #[derive(Debug, Eq, PartialEq, Hash, Clone)] diff --git a/crates/core/src/surfnet/svm.rs b/crates/core/src/surfnet/svm.rs index e9e8c7ee..652697bf 100644 --- a/crates/core/src/surfnet/svm.rs +++ b/crates/core/src/surfnet/svm.rs @@ -90,8 +90,9 @@ use uuid::Uuid; use super::{ AccountSubscriptionData, BlockHeader, BlockIdentifier, FINALIZATION_SLOT_THRESHOLD, - GetAccountResult, GeyserEvent, SLOTS_PER_EPOCH, SignatureSubscriptionData, - SignatureSubscriptionType, remote::SurfnetRemoteClient, + GetAccountResult, GeyserBlockMetadata, GeyserEntryInfo, GeyserEvent, GeyserSlotStatus, + SLOTS_PER_EPOCH, SignatureSubscriptionData, SignatureSubscriptionType, + remote::SurfnetRemoteClient, }; use crate::{ error::{SurfpoolError, SurfpoolResult}, @@ -1961,7 +1962,7 @@ impl SurfnetSvm { slot, BlockHeader { hash: self.chain_tip.hash.clone(), - previous_blockhash: previous_chain_tip.hash, + previous_blockhash: previous_chain_tip.hash.clone(), block_time: self.updated_at as i64 / 1_000, block_height: self.chain_tip.index, parent_slot: slot, @@ -2004,6 +2005,46 @@ impl SurfnetSvm { let root = new_slot.saturating_sub(FINALIZATION_SLOT_THRESHOLD); self.notify_slot_subscribers(new_slot, parent_slot, root); + // Notify geyser plugins of slot status (Confirmed) + self.geyser_events_tx + .send(GeyserEvent::UpdateSlotStatus { + slot: new_slot, + parent: Some(parent_slot), + status: GeyserSlotStatus::Confirmed, + }) + .ok(); + + // Notify geyser plugins of block metadata + let block_metadata = GeyserBlockMetadata { + slot: new_slot, + blockhash: self.chain_tip.hash.clone(), + parent_slot, + parent_blockhash: previous_chain_tip.hash.clone(), + block_time: Some(self.updated_at as i64 / 1_000), + block_height: Some(self.chain_tip.index), + executed_transaction_count: num_transactions, + entry_count: 1, // Surfpool produces 1 entry per block + }; + self.geyser_events_tx + .send(GeyserEvent::NotifyBlockMetadata(block_metadata)) + .ok(); + + // Notify geyser plugins of entry (Surfpool emits 1 entry per block) + let entry_hash = solana_hash::Hash::from_str(&self.chain_tip.hash) + .map(|h| h.to_bytes().to_vec()) + .unwrap_or_else(|_| vec![0u8; 32]); + let entry_info = GeyserEntryInfo { + slot: new_slot, + index: 0, // Single entry per block + num_hashes: 1, + hash: entry_hash, + executed_transaction_count: num_transactions, + starting_transaction_index: 0, + }; + self.geyser_events_tx + .send(GeyserEvent::NotifyEntry(entry_info)) + .ok(); + let clock: Clock = Clock { slot: self.latest_epoch_info.absolute_slot, epoch: self.latest_epoch_info.epoch, @@ -2019,6 +2060,18 @@ impl SurfnetSvm { self.finalize_transactions()?; + // Notify geyser plugins of newly rooted (finalized) slot + // Only emit if root is a valid slot (greater than genesis) + if root >= self.genesis_slot { + self.geyser_events_tx + .send(GeyserEvent::UpdateSlotStatus { + slot: root, + parent: root.checked_sub(1), + status: GeyserSlotStatus::Rooted, + }) + .ok(); + } + // Evict the accounts marked as streamed from cache to enforce them to be fetched again let accounts_to_reset: Vec<_> = self.streamed_accounts.into_iter()?.collect(); for (pubkey_str, include_owned_accounts) in accounts_to_reset {