Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ractor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor"
version = "0.15.4"
version = "0.15.5"
authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
description = "A actor framework for Rust"
documentation = "https://docs.rs/ractor"
Expand Down
2 changes: 2 additions & 0 deletions ractor/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ pub mod actor_ref;
pub mod derived_actor;
mod supervision;

#[cfg(test)]
mod supervision_tests;
#[cfg(test)]
mod tests;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1112,9 +1112,9 @@ async fn test_supervisor_captures_dead_childs_state() {
}
}

let mut temp_state = super::super::messages::BoxedState::new(123u64);
let mut temp_state = super::messages::BoxedState::new(123u64);
assert_eq!(Err(BoxedDowncastErr), temp_state.take::<u32>());
let mut temp_state = super::super::messages::BoxedState::new(123u64);
let mut temp_state = super::messages::BoxedState::new(123u64);
assert_eq!(Ok(123u64), temp_state.take::<u64>());
assert_eq!(Err(BoxedDowncastErr), temp_state.take::<u64>());

Expand Down
2 changes: 0 additions & 2 deletions ractor/src/actor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ use crate::{
Actor, ActorCell, ActorProcessingErr, ActorRef, ActorStatus, SpawnErr, SupervisionEvent,
};

mod supervisor;

struct EmptyMessage;
#[cfg(feature = "cluster")]
impl crate::Message for EmptyMessage {}
Expand Down
99 changes: 69 additions & 30 deletions ractor/src/thread_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
use std::future::Future;

use crate::concurrency::JoinHandle;
use crate::Actor as TraditionalActor;
use crate::Actor as SendActor;
use crate::ActorCell;
use crate::ActorName;
use crate::ActorProcessingErr;
Expand All @@ -25,15 +25,10 @@

mod inner;
#[cfg(test)]
mod supervision_tests;
#[cfg(test)]
mod tests;

/// Represents the state of an thread-local actor. Separate
/// from the traditional [State] trait, this does NOT require
/// that the state be [Send] and therefore does NOT need to be
/// safe to send between threads.
pub trait ThreadLocalState: std::any::Any + 'static {}
impl<T: std::any::Any + 'static> ThreadLocalState for T {}

/// [ThreadLocalActor] defines the behavior of an Actor. It specifies the
/// Message type, State type, and all processing logic for the actor
///
Expand All @@ -56,13 +51,15 @@
/// The message type for this actor
type Msg: Message;

/// The type of state this actor manages internally
type State: ThreadLocalState;
/// The type of state this actor manages internally. This type
/// has no bound requirements, and needs to neither be [Send] nor
/// [Sync] when used in a [ThreadLocalActor] context.
type State;

/// Initialization arguments. These must be [Send] as they are
/// sent to the pinned thread in order to startup the actor.
/// However the actor's local `State` does NOT need to be
/// [Send] and neither does the actor instance.
/// However the actor's local [ThreadLocalActor::State] does
/// NOT need to be [Send] and neither does the actor instance.
type Arguments: State;

/// Invoked when an actor is being started by the system.
Expand All @@ -71,14 +68,14 @@
/// performed here hence why it returns the initial state.
///
/// Panics in `pre_start` do not invoke the
/// supervision strategy and the actor won't be started. [Actor]::`spawn`
/// supervision strategy and the actor won't be started. [ThreadLocalActor]::`spawn`
/// will return an error to the caller
///
/// * `myself` - A handle to the [ActorCell] representing this actor
/// * `args` - Arguments that are passed in the spawning of the actor which might
/// be necessary to construct the initial state
///
/// Returns an initial [Actor::State] to bootstrap the actor
/// Returns an initial [ThreadLocalActor::State] to bootstrap the actor
fn pre_start(
&self,
myself: ActorRef<Self::Msg>,
Expand All @@ -104,7 +101,7 @@
}

/// Invoked after an actor has been stopped to perform final cleanup. In the
/// event the actor is terminated with [Signal::Kill] or has self-panicked,
/// event the actor is terminated with `Signal::Kill` or has self-panicked,
/// `post_stop` won't be called.
///
/// Panics in `post_stop` follow the supervision strategy.
Expand Down Expand Up @@ -183,7 +180,7 @@
///
/// * `name`: A name to give the actor. Useful for global referencing or debug printing
/// * `handler` The implementation of Self
/// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and
/// * `startup_args`: Arguments passed to the `pre_start` call of the [ThreadLocalActor] to facilitate startup and
/// initial state creation
///
/// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference
Expand All @@ -200,8 +197,8 @@
///
/// * `name`: A name to give the actor. Useful for global referencing or debug printing
/// * `handler` The implementation of Self
/// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and
/// initial state creation
/// * `startup_args`: Arguments passed to the `pre_start` call of the [ThreadLocalActor] to
/// facilitate startup and initial state creation
/// * `supervisor`: The [ActorCell] which is to become the supervisor (parent) of this actor
///
/// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference
Expand All @@ -224,26 +221,26 @@

impl<T> ThreadLocalActor for T
where
T: TraditionalActor + Default,
T: SendActor + Default,
{
type Msg = <T as TraditionalActor>::Msg;
type State = <T as TraditionalActor>::State;
type Arguments = <T as TraditionalActor>::Arguments;
type Msg = <T as SendActor>::Msg;
type State = <T as SendActor>::State;
type Arguments = <T as SendActor>::Arguments;

async fn pre_start(
&self,
myself: ActorRef<Self::Msg>,
args: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
<Self as TraditionalActor>::pre_start(self, myself, args).await
<Self as SendActor>::pre_start(self, myself, args).await

Check warning on line 235 in ractor/src/thread_local.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/thread_local.rs#L235

Added line #L235 was not covered by tests
}

async fn post_start(
&self,
myself: ActorRef<Self::Msg>,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
<Self as TraditionalActor>::post_start(self, myself, state).await
<Self as SendActor>::post_start(self, myself, state).await

Check warning on line 243 in ractor/src/thread_local.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/thread_local.rs#L243

Added line #L243 was not covered by tests
}

async fn handle(
Expand All @@ -252,7 +249,7 @@
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
<Self as TraditionalActor>::handle(self, myself, message, state).await
<Self as SendActor>::handle(self, myself, message, state).await

Check warning on line 252 in ractor/src/thread_local.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/thread_local.rs#L252

Added line #L252 was not covered by tests
}

#[cfg(feature = "cluster")]
Expand All @@ -262,7 +259,7 @@
message: crate::message::SerializedMessage,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
<Self as TraditionalActor>::handle_serialized(self, myself, message, state).await
<Self as SendActor>::handle_serialized(self, myself, message, state).await

Check warning on line 262 in ractor/src/thread_local.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/thread_local.rs#L262

Added line #L262 was not covered by tests
}

async fn handle_supervisor_evt(
Expand All @@ -271,7 +268,7 @@
message: SupervisionEvent,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
<Self as TraditionalActor>::handle_supervisor_evt(self, myself, message, state).await
<Self as SendActor>::handle_supervisor_evt(self, myself, message, state).await

Check warning on line 271 in ractor/src/thread_local.rs

View check run for this annotation

Codecov / codecov/patch

ractor/src/thread_local.rs#L271

Added line #L271 was not covered by tests
}
}

Expand All @@ -282,6 +279,7 @@
+ Send,
>,
reply: RpcReplyPort<JoinHandle<Result<JoinHandle<()>, SpawnErr>>>,
name: Option<String>,
}

/// The [ThreadLocalActorSpawner] is responsible for spawning [ThreadLocalActor] instances
Expand Down Expand Up @@ -320,10 +318,27 @@

// TODO (seanlawlor): Supported named spawn
local.spawn_local(async move {
while let Some(SpawnArgs { builder, reply }) = recv.recv().await {
while let Some(SpawnArgs {
builder,
reply,
name,
}) = recv.recv().await
{
let fut = builder();
let handle = tokio::task::spawn_local(fut);
_ = reply.send(handle);
#[cfg(tokio_unstable)]
{
let mut builder = tokio::task::Builder::new();
if let Some(name) = name {
builder = builder.name(name);
}
builder.spawn_local(fut).expect("Tokio task spawn failed")
}
#[cfg(not(tokio_unstable))]
{
_ = name;
let handle = tokio::task::spawn_local(fut);
_ = reply.send(handle);
}
}
// If the while loop returns, then all the LocalSpawner
// objects have been dropped.
Expand All @@ -345,11 +360,13 @@
-> std::pin::Pin<Box<dyn Future<Output = Result<JoinHandle<()>, SpawnErr>>>>
+ Send,
>,
name: Option<String>,
) -> Result<JoinHandle<()>, SpawnErr> {
let (tx, rx) = crate::concurrency::oneshot();
let args = SpawnArgs {
builder,
reply: tx.into(),
name,
};

if self.send.send(args).is_err() {
Expand All @@ -362,3 +379,25 @@
.map_err(|joinerr| SpawnErr::StartupFailed(joinerr.into()))?
}
}

impl ActorCell {
/// Spawn an actor of the given type as a thread-local child of this actor, automatically starting the actor.
/// This [ActorCell] becomes the supervisor of the child actor.
///
/// * `name`: A name to give the actor. Useful for global referencing or debug printing
/// * `handler` The implementation of Self
/// * `startup_args`: Arguments passed to the `pre_start` call of the [Actor] to facilitate startup and

Check warning on line 389 in ractor/src/thread_local.rs

View workflow job for this annotation

GitHub Actions / docs

unresolved link to `Actor`
/// initial state creation
///
/// Returns a [Ok((ActorRef, JoinHandle<()>))] upon successful start, denoting the actor reference
/// along with the join handle which will complete when the actor terminates. Returns [Err(SpawnErr)] if
/// the actor failed to start
pub async fn spawn_local_linked<T: ThreadLocalActor>(
&self,
name: Option<String>,
startup_args: T::Arguments,
spawner: ThreadLocalActorSpawner,
) -> Result<(ActorRef<T::Msg>, JoinHandle<()>), SpawnErr> {
T::spawn_linked(name, startup_args, self.clone(), spawner).await
}
}
3 changes: 2 additions & 1 deletion ractor/src/thread_local/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ impl<TActor: ThreadLocalActor> ThreadLocalActorRuntime<TActor> {
actor_ref.set_status(ActorStatus::Starting);

// Generate the ActorRef which will be returned
let spawn_name = name.clone();
let myself_ret = actor_ref.clone();

// run the processing loop, backgrounding the work
Expand Down Expand Up @@ -311,7 +312,7 @@ impl<TActor: ThreadLocalActor> ThreadLocalActorRuntime<TActor> {
.boxed_local()
});
let handle = spawner
.spawn(builder)
.spawn(builder, spawn_name)
.await
.map_err(|e| SpawnErr::StartupFailed(e.into()))?;

Expand Down
Loading
Loading