Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 76 additions & 28 deletions crates/core/src/runloops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,20 @@ fn start_geyser_runloop(
// Map between each plugin's UUID to its entry (index, plugin_name)
let mut plugin_map: HashMap<crate::Uuid, (usize, String)> = HashMap::new();

// helper to log errors that can't be propagated
let log_error = |msg:String|{
let _ = simnet_events_tx.send(SimnetEvent::error(msg));
};

let log_warn = |msg:String|{
let _ = simnet_events_tx.send(SimnetEvent::warn(msg));
};

let log_info = |msg:String|{
let _ = simnet_events_tx.send(SimnetEvent::info(msg));
};


#[cfg(feature = "geyser_plugin")]
for plugin_config_path in plugin_config_paths.into_iter() {
let plugin_manifest_location = FileLocation::from_path(plugin_config_path);
Expand Down Expand Up @@ -508,7 +522,7 @@ fn start_geyser_runloop(
let lib = match Library::new(&plugin_dylib_location.to_string()) {
Ok(lib) => lib,
Err(e) => {
let _ = simnet_events_tx.send(SimnetEvent::ErrorLog(Local::now(), format!("Unable to load plugin {}: {}", plugin_dylib_location.to_string(), e.to_string())));
log_error(format!("Unable to load plugin {}: {}", plugin_dylib_location.to_string(), e.to_string()));
continue;
}
};
Expand Down Expand Up @@ -541,11 +555,14 @@ fn start_geyser_runloop(
plugin_map: &mut HashMap<uuid::Uuid, (usize, String)>,
indexing_enabled: &mut bool|
-> Result<(), String> {
let _ = subgraph_commands_tx.send(SubgraphCommand::CreateCollection(
if let Err(e) = subgraph_commands_tx.send(SubgraphCommand::CreateCollection(
uuid,
config.data.clone(),
notifier,
));
)){
return Err(format!("Failed to send CreateCollection command: {:?}", e));
};

let mut plugin = SurfpoolSubgraphPlugin::default();

let (server, ipc_token) =
Expand All @@ -564,11 +581,18 @@ fn start_geyser_runloop(
.on_load(&config_file, false)
.map_err(|e| format!("Failed to load Geyser plugin: {:?}", e))?;

if let Ok((_, rx)) = server.accept() {
let subgraph_rx = ipc_router
.route_ipc_receiver_to_new_crossbeam_receiver::<DataIndexingCommand>(rx);
let _ = subgraph_commands_tx.send(SubgraphCommand::ObserveCollection(subgraph_rx));
};
match server.accept() {
Ok((_, rx)) => {
let subgraph_rx = ipc_router
.route_ipc_receiver_to_new_crossbeam_receiver::<DataIndexingCommand>(rx);
if let Err(e) = subgraph_commands_tx.send(SubgraphCommand::ObserveCollection(subgraph_rx)) {
return Err(format!("Failed to send ObserveCollection command: {:?}", e));
}
}
Err(e) => {
return Err(format!("Failed to accept IPC connection for subgraph {}: {:?}", uuid, e));
}
};

*indexing_enabled = true;

Expand Down Expand Up @@ -597,7 +621,9 @@ fn start_geyser_runloop(
}

// Destroy database/schema for this collection
let _ = subgraph_commands_tx.send(SubgraphCommand::DestroyCollection(uuid));
if let Err(e) = subgraph_commands_tx.send(SubgraphCommand::DestroyCollection(uuid)){
return Err(format!("Failed to send DestroyCollection command for {}: {:?}", uuid, e));
}

// Unload the plugin
surfpool_plugin_manager[plugin_index].on_unload();
Expand All @@ -616,6 +642,8 @@ fn start_geyser_runloop(
// Disable indexing if no plugins remain
if surfpool_plugin_manager.is_empty() {
*indexing_enabled = false;
// Add Logging When Indexing Disabled
log_info("All plugins unloaded,indexing disabled".to_string())
}

Ok(())
Expand Down Expand Up @@ -647,8 +675,16 @@ fn start_geyser_runloop(
}
#[cfg(feature = "subgraph")]
PluginManagerCommand::UnloadPlugin(uuid, notifier) => {
let result = unload_plugin_by_uuid(uuid, &mut surfpool_plugin_manager, &mut plugin_map, &mut indexing_enabled);
let _ = notifier.send(result);
match unload_plugin_by_uuid(uuid, &mut surfpool_plugin_manager, &mut plugin_map, &mut indexing_enabled) {
Ok(_)=>{
log_info(format!("Successfully unloaded plugin with UUID {}", uuid));
let _ = notifier.send(Ok(()));
}
Err(e)=>{
log_error(format!("Failed to unload plugin {}: {}", uuid, e));
let _ = notifier.send(Err(e));
}
}
}
#[cfg(not(feature = "subgraph"))]
PluginManagerCommand::ReloadPlugin(_, _, _) => {
Expand All @@ -657,16 +693,28 @@ fn start_geyser_runloop(
#[cfg(feature = "subgraph")]
PluginManagerCommand::ReloadPlugin(uuid, config, notifier) => {
// Unload the old plugin
if let Err(e) = unload_plugin_by_uuid(uuid, &mut surfpool_plugin_manager, &mut plugin_map, &mut indexing_enabled) {
let _ = simnet_events_tx.try_send(SimnetEvent::error(format!("Failed to unload plugin during reload: {}", e)));
continue;
}

let _ = simnet_events_tx.try_send(SimnetEvent::info(format!("Unloaded plugin with UUID - {}", uuid)));

// Load the new plugin with the same UUID
if let Err(e) = load_subgraph_plugin(uuid, config, notifier, &mut surfpool_plugin_manager, &mut plugin_map, &mut indexing_enabled) {
let _ = simnet_events_tx.try_send(SimnetEvent::error(format!("Failed to reload plugin: {}", e)));
match unload_plugin_by_uuid(uuid, &mut surfpool_plugin_manager, &mut plugin_map, &mut indexing_enabled) {
Ok(_)=>{
log_info(format!("Unloaded plugin with UUID {} for reload", uuid));

// Load the new plugin with the same UUID
match load_subgraph_plugin(uuid, config, notifier.clone(), &mut surfpool_plugin_manager, &mut plugin_map, &mut indexing_enabled) {
Ok(_)=>{
log_info(format!("Successfully reloaded plugin with UUID {}", uuid));
let _ = notifier.send(format!("Plugin {} reloaded successfully", uuid));
}
Err(e)=>{
let error_msg = format!("Failed to reload plugin {}: {}", uuid, e);
log_error(error_msg.clone());
let _ = notifier.send(error_msg);
}
}
}
Err(e)=>{
let error_msg = format!("Failed to unload plugin {} during reload: {}", uuid, e);
log_error(error_msg.clone());
let _ = notifier.send(error_msg);
}
}
}
PluginManagerCommand::ListPlugins(notifier) => {
Expand Down Expand Up @@ -698,7 +746,7 @@ fn start_geyser_runloop(
let transaction = match versioned_transaction {
Some(tx) => tx,
None => {
let _ = simnet_events_tx.send(SimnetEvent::warn("Unable to index sanitized transaction".to_string()));
log_warn("Unable to index sanitized transaction".to_string());
continue;
}
};
Expand All @@ -714,14 +762,14 @@ fn start_geyser_runloop(

for plugin in surfpool_plugin_manager.iter() {
if let Err(e) = plugin.notify_transaction(ReplicaTransactionInfoVersions::V0_0_3(&transaction_replica), transaction_with_status_meta.slot) {
let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify Geyser plugin of new transaction: {:?}", e)));
log_error(format!("Failed to notify Geyser plugin of new transaction: {:?}", e))
};
}

#[cfg(feature = "geyser_plugin")]
for plugin in plugin_manager.plugins.iter() {
if let Err(e) = plugin.notify_transaction(ReplicaTransactionInfoVersions::V0_0_3(&transaction_replica), transaction_with_status_meta.slot) {
let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to notify Geyser plugin of new transaction: {:?}", e)));
log_error(format!("Failed to notify Geyser plugin of new transaction: {:?}", e))
};
}
}
Expand All @@ -747,14 +795,14 @@ fn start_geyser_runloop(

for plugin in surfpool_plugin_manager.iter() {
if let Err(e) = plugin.update_account(ReplicaAccountInfoVersions::V0_0_3(&account_replica), slot, false) {
let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to update account in Geyser plugin: {:?}", e)));
log_error(format!("Failed to update account in Geyser plugin: {:?}", e));
}
}

#[cfg(feature = "geyser_plugin")]
for plugin in plugin_manager.plugins.iter() {
if let Err(e) = plugin.update_account(ReplicaAccountInfoVersions::V0_0_3(&account_replica), slot, false) {
let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to update account in Geyser plugin: {:?}", e)));
log_error(format!("Failed to update account in Geyser plugin: {:?}", e))
}
}
}
Expand All @@ -781,14 +829,14 @@ fn start_geyser_runloop(
// Send startup account updates with is_startup=true
for plugin in surfpool_plugin_manager.iter() {
if let Err(e) = plugin.update_account(ReplicaAccountInfoVersions::V0_0_3(&account_replica), slot, true) {
let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to send startup account update to Geyser plugin: {:?}", e)));
log_error(format!("Failed to send startup account update to Geyser plugin: {:?}", e));
}
}

#[cfg(feature = "geyser_plugin")]
for plugin in plugin_manager.plugins.iter() {
if let Err(e) = plugin.update_account(ReplicaAccountInfoVersions::V0_0_3(&account_replica), slot, true) {
let _ = simnet_events_tx.send(SimnetEvent::error(format!("Failed to send startup account update to Geyser plugin: {:?}", e)));
log_error(format!("Failed to send startup account update to Geyser plugin: {:?}", e))
}
}
}
Expand Down