Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 78 additions & 33 deletions codex-rs/app-server-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,12 @@ pub struct InProcessAppServerClient {
thread_manager: Arc<ThreadManager>,
}

/// Cloneable request handle for detached app-server RPCs.
#[derive(Clone)]
pub struct InProcessAppServerRequester {
command_tx: mpsc::Sender<ClientCommand>,
}

impl InProcessAppServerClient {
/// Starts the in-process runtime and facade worker task.
///
Expand Down Expand Up @@ -457,30 +463,19 @@ impl InProcessAppServerClient {
self.thread_manager.clone()
}

/// Returns a cloneable request handle for detached background RPCs.
pub fn requester(&self) -> InProcessAppServerRequester {
InProcessAppServerRequester {
command_tx: self.command_tx.clone(),
}
}

/// Sends a typed client request and returns raw JSON-RPC result.
///
/// Callers that expect a concrete response type should usually prefer
/// [`request_typed`](Self::request_typed).
pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> {
let (response_tx, response_rx) = oneshot::channel();
self.command_tx
.send(ClientCommand::Request {
request: Box::new(request),
response_tx,
})
.await
.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"in-process app-server worker channel is closed",
)
})?;
response_rx.await.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"in-process app-server request channel is closed",
)
})?
send_request(&self.command_tx, request).await
}

/// Sends a typed client request and decodes the successful response body.
Expand All @@ -493,20 +488,7 @@ impl InProcessAppServerClient {
where
T: DeserializeOwned,
{
let method = request_method_name(&request);
let response =
self.request(request)
.await
.map_err(|source| TypedRequestError::Transport {
method: method.clone(),
source,
})?;
let result = response.map_err(|source| TypedRequestError::Server {
method: method.clone(),
source,
})?;
serde_json::from_value(result)
.map_err(|source| TypedRequestError::Deserialize { method, source })
request_typed(&self.command_tx, request).await
}

/// Sends a typed client notification.
Expand Down Expand Up @@ -641,6 +623,69 @@ impl InProcessAppServerClient {
}
}

impl InProcessAppServerRequester {
/// Sends a typed client request and returns raw JSON-RPC result.
pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> {
send_request(&self.command_tx, request).await
}

/// Sends a typed client request and decodes the successful response body.
pub async fn request_typed<T>(&self, request: ClientRequest) -> Result<T, TypedRequestError>
where
T: DeserializeOwned,
{
request_typed(&self.command_tx, request).await
}
}

async fn send_request(
command_tx: &mpsc::Sender<ClientCommand>,
request: ClientRequest,
) -> IoResult<RequestResult> {
let (response_tx, response_rx) = oneshot::channel();
command_tx
.send(ClientCommand::Request {
request: Box::new(request),
response_tx,
})
.await
.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"in-process app-server worker channel is closed",
)
})?;
response_rx.await.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"in-process app-server request channel is closed",
)
})?
}

async fn request_typed<T>(
command_tx: &mpsc::Sender<ClientCommand>,
request: ClientRequest,
) -> Result<T, TypedRequestError>
where
T: DeserializeOwned,
{
let method = request_method_name(&request);
let response =
send_request(command_tx, request)
.await
.map_err(|source| TypedRequestError::Transport {
method: method.clone(),
source,
})?;
let result = response.map_err(|source| TypedRequestError::Server {
method: method.clone(),
source,
})?;
serde_json::from_value(result)
.map_err(|source| TypedRequestError::Deserialize { method, source })
}

/// Extracts the JSON-RPC method name for diagnostics without extending the
/// protocol crate with in-process-only helpers.
fn request_method_name(request: &ClientRequest) -> String {
Expand Down
Loading
Loading