Skip to content

Commit 02f69ea

Browse files
authored
Handle ERROR_MORE_DATA when scheduling read for Window named pipes
1 parent 625655d commit 02f69ea

File tree

2 files changed

+131
-6
lines changed

2 files changed

+131
-6
lines changed

src/sys/windows/named_pipe.rs

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ use std::sync::{Arc, Mutex};
77
use std::{fmt, mem, slice};
88

99
use windows_sys::Win32::Foundation::{
10-
ERROR_BROKEN_PIPE, ERROR_IO_INCOMPLETE, ERROR_IO_PENDING, ERROR_NO_DATA, ERROR_PIPE_CONNECTED,
11-
ERROR_PIPE_LISTENING, HANDLE, INVALID_HANDLE_VALUE,
10+
ERROR_BROKEN_PIPE, ERROR_IO_INCOMPLETE, ERROR_IO_PENDING, ERROR_MORE_DATA, ERROR_NO_DATA,
11+
ERROR_PIPE_CONNECTED, ERROR_PIPE_LISTENING, HANDLE, INVALID_HANDLE_VALUE,
1212
};
1313
use windows_sys::Win32::Storage::FileSystem::{
1414
ReadFile, WriteFile, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX,
@@ -705,10 +705,13 @@ impl Inner {
705705
/// Schedules a read to happen in the background, executing an overlapped
706706
/// operation.
707707
///
708-
/// This function returns `true` if a normal error happens or if the read
709-
/// is scheduled in the background. If the pipe is no longer connected
710-
/// (ERROR_PIPE_LISTENING) then `false` is returned and no read is
711-
/// scheduled.
708+
/// This function returns `true` if either of the following conditions are met:
709+
/// * A normal error happens
710+
/// * The read is scheduled in the background
711+
/// * Data is already available to be read (ERROR_MORE_DATA)
712+
///
713+
/// If the pipe is no longer connected (ERROR_PIPE_LISTENING) then `false` is
714+
/// returned and no read is scheduled.
712715
fn schedule_read(me: &Arc<Inner>, io: &mut Io, events: Option<&mut Vec<Event>>) -> bool {
713716
// Check to see if a read is already scheduled/completed
714717
match io.read {
@@ -736,6 +739,20 @@ impl Inner {
736739
// we just need to wait for a connect.
737740
Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_LISTENING as i32) => false,
738741

742+
// If ERROR_MORE_DATA is returned, it means the slice of unused capacity of the
743+
// buffer provided is less than the amount of data available to be read. So
744+
// prioritize draining the buffer before scheduling a new read.
745+
//
746+
// Return `true` to indicate that an overlapped read was scheduled "successfully",
747+
// without actually scheduling it. Instead, update `io.read` to `State::Ok(buf, 0)`
748+
// to ensure that the next `std::io::Read::read` call is presented still with the
749+
// unread data to read from.
750+
Err(ref e) if e.raw_os_error() == Some(ERROR_MORE_DATA as i32) => {
751+
io.read = State::Ok(buf, 0);
752+
mem::forget(me.clone());
753+
true
754+
}
755+
739756
// If some other error happened, though, we're now readable to give
740757
// out the error.
741758
Err(e) => {
@@ -898,6 +915,13 @@ fn read_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
898915
buf.set_len(status.bytes_transferred() as usize);
899916
io.read = State::Ok(buf, 0);
900917
}
918+
// This is non-fatal. The buffer was simply too small for the entire message.
919+
// Deliver the bytes we got, and if the caller wants to read the rest of the
920+
// message, they can initiate another read.
921+
Err(e) if e.raw_os_error() == Some(ERROR_MORE_DATA as i32) => {
922+
buf.set_len(status.bytes_transferred() as usize);
923+
io.read = State::Ok(buf, 0);
924+
}
901925
Err(e) => {
902926
debug_assert_eq!(status.bytes_transferred(), 0);
903927
io.read = State::Err(e);

tests/win_named_pipe.rs

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ use mio::windows::NamedPipe;
1010
use mio::{Events, Interest, Poll, Token};
1111
use windows_sys::Win32::{Foundation::ERROR_NO_DATA, Storage::FileSystem::FILE_FLAG_OVERLAPPED};
1212

13+
mod util;
14+
use util::{expect_events, ExpectEvent};
15+
1316
fn _assert_kinds() {
1417
fn _assert_send<T: Send>() {}
1518
fn _assert_sync<T: Sync>() {}
@@ -347,3 +350,101 @@ fn reregister_deregister_different_poll() {
347350
io::ErrorKind::AlreadyExists,
348351
);
349352
}
353+
354+
#[test]
355+
fn read_message_larger_than_internal_buffer() {
356+
let (mut server, mut client) = pipe();
357+
let mut poll = t!(Poll::new());
358+
t!(poll.registry().register(
359+
&mut server,
360+
Token(0),
361+
Interest::READABLE | Interest::WRITABLE,
362+
));
363+
t!(poll.registry().register(
364+
&mut client,
365+
Token(1),
366+
Interest::READABLE | Interest::WRITABLE,
367+
));
368+
let mut events = Events::with_capacity(128);
369+
t!(poll.poll(&mut events, None));
370+
371+
// Send message larger than the IPC kernel buffer (4096 bytes)
372+
let expected_msg = vec![0x5u8; 8192];
373+
assert_eq!(t!(client.write(&expected_msg)), 8192);
374+
375+
expect_events(
376+
&mut poll,
377+
&mut events,
378+
vec![ExpectEvent::new(Token(0), Interest::READABLE)],
379+
);
380+
381+
let mut buf = [0u8; 4000];
382+
let mut actual_msg = Vec::new();
383+
384+
loop {
385+
match server.read(&mut buf) {
386+
Ok(0) => break,
387+
Ok(n) => {
388+
actual_msg.extend_from_slice(&buf[..n]);
389+
if actual_msg.len() >= expected_msg.len() {
390+
break;
391+
}
392+
}
393+
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
394+
t!(poll.poll(&mut events, Some(Duration::from_secs(1))));
395+
}
396+
Err(e) => panic!("error reading message: {e}"),
397+
}
398+
}
399+
400+
assert_eq!(expected_msg, actual_msg);
401+
}
402+
403+
#[test]
404+
fn read_with_small_buffer_provided() {
405+
let (mut server, mut client) = pipe();
406+
let mut poll = t!(Poll::new());
407+
t!(poll.registry().register(
408+
&mut server,
409+
Token(0),
410+
Interest::READABLE | Interest::WRITABLE,
411+
));
412+
t!(poll.registry().register(
413+
&mut client,
414+
Token(1),
415+
Interest::READABLE | Interest::WRITABLE,
416+
));
417+
418+
let mut events = Events::with_capacity(128);
419+
t!(poll.poll(&mut events, None));
420+
421+
let expected_msg = vec![1u8; 10000];
422+
assert_eq!(t!(client.write(&expected_msg)), 10000);
423+
424+
expect_events(
425+
&mut poll,
426+
&mut events,
427+
vec![ExpectEvent::new(Token(0), Interest::READABLE)],
428+
);
429+
430+
let mut buf = [0u8; 128];
431+
let mut actual_msg = Vec::new();
432+
433+
loop {
434+
match server.read(&mut buf) {
435+
Ok(0) => break,
436+
Ok(n) => {
437+
actual_msg.extend_from_slice(&buf[..n]);
438+
if actual_msg.len() >= expected_msg.len() {
439+
break;
440+
}
441+
}
442+
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
443+
t!(poll.poll(&mut events, Some(Duration::from_millis(100))));
444+
}
445+
Err(e) => panic!("error reading message: {e}"),
446+
}
447+
}
448+
449+
assert_eq!(actual_msg, expected_msg);
450+
}

0 commit comments

Comments
 (0)