Skip to content

Commit a97e35e

Browse files
committed
dd: buffer partial blocks in the output writer
Add buffering of partial blocks in the output block writer until they are completed.
1 parent 8f7addc commit a97e35e

File tree

3 files changed

+194
-30
lines changed

3 files changed

+194
-30
lines changed

src/uu/dd/src/bufferedoutput.rs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,8 @@ impl<'a> BufferedOutput<'a> {
4646
/// Flush the partial block stored in the internal buffer.
4747
pub(crate) fn flush(&mut self) -> std::io::Result<WriteStat> {
4848
let wstat = self.inner.write_blocks(&self.buf)?;
49-
let n = wstat.bytes_total;
50-
for _ in 0..n {
51-
self.buf.remove(0);
52-
}
49+
let n = wstat.bytes_total.try_into().unwrap();
50+
self.buf.drain(0..n);
5351
Ok(wstat)
5452
}
5553

@@ -70,28 +68,35 @@ impl<'a> BufferedOutput<'a> {
7068
/// block. The returned [`WriteStat`] object will include the
7169
/// number of blocks written during execution of this function.
7270
pub(crate) fn write_blocks(&mut self, buf: &[u8]) -> std::io::Result<WriteStat> {
73-
// Concatenate the old partial block with the new incoming bytes.
74-
let towrite = [&self.buf, buf].concat();
71+
// Split the incoming buffer into two parts: the bytes to write
72+
// and the bytes to buffer for next time.
73+
//
74+
// If `buf` does not include enough bytes to form a full block,
75+
// just buffer the whole thing and write zero blocks.
76+
let n = self.buf.len() + buf.len();
77+
let rem = n % self.inner.settings.obs;
78+
let i = buf.len().saturating_sub(rem);
79+
let (to_write, to_buffer) = buf.split_at(i);
80+
81+
// Concatenate the old partial block with the new bytes to form
82+
// some number of complete blocks.
83+
self.buf.extend_from_slice(to_write);
7584

7685
// Write all complete blocks to the inner block writer.
7786
//
7887
// For example, if the output block size were 3, the buffered
7988
// partial block were `b"ab"` and the new incoming bytes were
8089
// `b"cdefg"`, then we would write blocks `b"abc"` and
8190
// `b"def"` to the inner block writer.
82-
let n = towrite.len();
83-
let rem = n % self.inner.settings.obs;
84-
let wstat = self.inner.write_blocks(&towrite[..n - rem])?;
85-
self.buf.clear();
91+
let wstat = self.inner.write_blocks(&self.buf)?;
8692

8793
// Buffer any remaining bytes as a partial block.
8894
//
8995
// Continuing the example above, the last byte `b"g"` would be
9096
// buffered as a partial block until the next call to
9197
// `write_blocks()`.
92-
for byte in &towrite[n - rem..] {
93-
self.buf.push(*byte);
94-
}
98+
self.buf.clear();
99+
self.buf.extend_from_slice(to_buffer);
95100

96101
Ok(wstat)
97102
}

src/uu/dd/src/dd.rs

Lines changed: 99 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ mod numbers;
1313
mod parseargs;
1414
mod progress;
1515

16+
use crate::bufferedoutput::BufferedOutput;
1617
use blocks::conv_block_unblock_helper;
1718
use datastructures::*;
1819
use parseargs::Parser;
@@ -801,6 +802,68 @@ impl<'a> Output<'a> {
801802
Ok(())
802803
}
803804
}
805+
806+
/// Truncate the underlying file to the current stream position, if possible.
807+
fn truncate(&mut self) -> std::io::Result<()> {
808+
self.dst.truncate()
809+
}
810+
}
811+
812+
/// A block writer, either with or without partial block buffering.
813+
enum BlockWriter<'a> {
814+
/// Block writer with partial block buffering.
815+
///
816+
/// Partial blocks are buffered until completed.
817+
Buffered(BufferedOutput<'a>),
818+
819+
/// Block writer without partial block buffering.
820+
///
821+
/// Partial blocks are written immediately.
822+
Unbuffered(Output<'a>),
823+
}
824+
825+
impl<'a> BlockWriter<'a> {
826+
fn discard_cache(&self, offset: libc::off_t, len: libc::off_t) {
827+
match self {
828+
Self::Unbuffered(o) => o.discard_cache(offset, len),
829+
Self::Buffered(o) => o.discard_cache(offset, len),
830+
}
831+
}
832+
833+
fn flush(&mut self) -> io::Result<WriteStat> {
834+
match self {
835+
Self::Unbuffered(_) => Ok(WriteStat::default()),
836+
Self::Buffered(o) => o.flush(),
837+
}
838+
}
839+
840+
fn sync(&mut self) -> io::Result<()> {
841+
match self {
842+
Self::Unbuffered(o) => o.sync(),
843+
Self::Buffered(o) => o.sync(),
844+
}
845+
}
846+
847+
/// Truncate the file to the final cursor location.
848+
fn truncate(&mut self) {
849+
// Calling `set_len()` may result in an error (for example,
850+
// when calling it on `/dev/null`), but we don't want to
851+
// terminate the process when that happens. Instead, we
852+
// suppress the error by calling `Result::ok()`. This matches
853+
// the behavior of GNU `dd` when given the command-line
854+
// argument `of=/dev/null`.
855+
match self {
856+
Self::Unbuffered(o) => o.truncate().ok(),
857+
Self::Buffered(o) => o.truncate().ok(),
858+
};
859+
}
860+
861+
fn write_blocks(&mut self, buf: &[u8]) -> std::io::Result<WriteStat> {
862+
match self {
863+
Self::Unbuffered(o) => o.write_blocks(buf),
864+
Self::Buffered(o) => o.write_blocks(buf),
865+
}
866+
}
804867
}
805868

806869
/// Copy the given input data to this output, consuming both.
@@ -814,7 +877,7 @@ impl<'a> Output<'a> {
814877
///
815878
/// If there is a problem reading from the input or writing to
816879
/// this output.
817-
fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> {
880+
fn dd_copy(mut i: Input, o: Output) -> std::io::Result<()> {
818881
// The read and write statistics.
819882
//
820883
// These objects are counters, initialized to zero. After each
@@ -851,6 +914,9 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> {
851914
let (prog_tx, rx) = mpsc::channel();
852915
let output_thread = thread::spawn(gen_prog_updater(rx, i.settings.status));
853916

917+
// Whether to truncate the output file after all blocks have been written.
918+
let truncate = !o.settings.oconv.notrunc;
919+
854920
// Optimization: if no blocks are to be written, then don't
855921
// bother allocating any buffers.
856922
if let Some(Num::Blocks(0) | Num::Bytes(0)) = i.settings.count {
@@ -875,7 +941,15 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> {
875941
let len = o.dst.len()?.try_into().unwrap();
876942
o.discard_cache(offset, len);
877943
}
878-
return finalize(&mut o, rstat, wstat, start, &prog_tx, output_thread);
944+
return finalize(
945+
BlockWriter::Unbuffered(o),
946+
rstat,
947+
wstat,
948+
start,
949+
&prog_tx,
950+
output_thread,
951+
truncate,
952+
);
879953
};
880954

881955
// Create a common buffer with a capacity of the block size.
@@ -895,6 +969,16 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> {
895969
let mut read_offset = 0;
896970
let mut write_offset = 0;
897971

972+
let input_nocache = i.settings.iflags.nocache;
973+
let output_nocache = o.settings.oflags.nocache;
974+
975+
// Add partial block buffering, if needed.
976+
let mut o = if o.settings.buffered {
977+
BlockWriter::Buffered(BufferedOutput::new(o))
978+
} else {
979+
BlockWriter::Unbuffered(o)
980+
};
981+
898982
// The main read/write loop.
899983
//
900984
// Each iteration reads blocks from the input and writes
@@ -919,7 +1003,7 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> {
9191003
//
9201004
// TODO Better error handling for overflowing `offset` and `len`.
9211005
let read_len = rstat_update.bytes_total;
922-
if i.settings.iflags.nocache {
1006+
if input_nocache {
9231007
let offset = read_offset.try_into().unwrap();
9241008
let len = read_len.try_into().unwrap();
9251009
i.discard_cache(offset, len);
@@ -931,7 +1015,7 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> {
9311015
//
9321016
// TODO Better error handling for overflowing `offset` and `len`.
9331017
let write_len = wstat_update.bytes_total;
934-
if o.settings.oflags.nocache {
1018+
if output_nocache {
9351019
let offset = write_offset.try_into().unwrap();
9361020
let len = write_len.try_into().unwrap();
9371021
o.discard_cache(offset, len);
@@ -951,34 +1035,33 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> {
9511035
prog_tx.send(prog_update).unwrap_or(());
9521036
}
9531037
}
954-
finalize(&mut o, rstat, wstat, start, &prog_tx, output_thread)
1038+
finalize(o, rstat, wstat, start, &prog_tx, output_thread, truncate)
9551039
}
9561040

9571041
/// Flush output, print final stats, and join with the progress thread.
9581042
fn finalize<T>(
959-
output: &mut Output,
1043+
mut output: BlockWriter,
9601044
rstat: ReadStat,
9611045
wstat: WriteStat,
9621046
start: Instant,
9631047
prog_tx: &mpsc::Sender<ProgUpdate>,
9641048
output_thread: thread::JoinHandle<T>,
1049+
truncate: bool,
9651050
) -> std::io::Result<()> {
966-
// Flush the output, if configured to do so.
1051+
// Flush the output in case a partial write has been buffered but
1052+
// not yet written.
1053+
let wstat_update = output.flush()?;
1054+
1055+
// Sync the output, if configured to do so.
9671056
output.sync()?;
9681057

9691058
// Truncate the file to the final cursor location.
970-
//
971-
// Calling `set_len()` may result in an error (for example,
972-
// when calling it on `/dev/null`), but we don't want to
973-
// terminate the process when that happens. Instead, we
974-
// suppress the error by calling `Result::ok()`. This matches
975-
// the behavior of GNU `dd` when given the command-line
976-
// argument `of=/dev/null`.
977-
if !output.settings.oconv.notrunc {
978-
output.dst.truncate().ok();
1059+
if truncate {
1060+
output.truncate();
9791061
}
9801062

9811063
// Print the final read/write statistics.
1064+
let wstat = wstat + wstat_update;
9821065
let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), true);
9831066
prog_tx.send(prog_update).unwrap_or(());
9841067
// Wait for the output thread to finish

tests/by-util/test_dd.rs

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//
33
// For the full copyright and license information, please view the LICENSE
44
// file that was distributed with this source code.
5-
// spell-checker:ignore fname, tname, fpath, specfile, testfile, unspec, ifile, ofile, outfile, fullblock, urand, fileio, atoe, atoibm, availible, behaviour, bmax, bremain, btotal, cflags, creat, ctable, ctty, datastructures, doesnt, etoa, fileout, fname, gnudd, iconvflags, iseek, nocache, noctty, noerror, nofollow, nolinks, nonblock, oconvflags, oseek, outfile, parseargs, rlen, rmax, rposition, rremain, rsofar, rstat, sigusr, sigval, wlen, wstat abcdefghijklm abcdefghi nabcde nabcdefg abcdefg
5+
// spell-checker:ignore fname, tname, fpath, specfile, testfile, unspec, ifile, ofile, outfile, fullblock, urand, fileio, atoe, atoibm, availible, behaviour, bmax, bremain, btotal, cflags, creat, ctable, ctty, datastructures, doesnt, etoa, fileout, fname, gnudd, iconvflags, iseek, nocache, noctty, noerror, nofollow, nolinks, nonblock, oconvflags, oseek, outfile, parseargs, rlen, rmax, rposition, rremain, rsofar, rstat, sigusr, sigval, wlen, wstat abcdefghijklm abcdefghi nabcde nabcdefg abcdefg fifoname
66

77
#[cfg(unix)]
88
use crate::common::util::run_ucmd_as_root_with_stdin_stdout;
@@ -15,6 +15,8 @@ use regex::Regex;
1515
use std::fs::{File, OpenOptions};
1616
use std::io::{BufReader, Read, Write};
1717
use std::path::PathBuf;
18+
#[cfg(all(unix, not(target_os = "macos"), not(target_os = "freebsd")))]
19+
use std::process::{Command, Stdio};
1820
#[cfg(not(windows))]
1921
use std::thread::sleep;
2022
#[cfg(not(windows))]
@@ -1582,3 +1584,77 @@ fn test_seek_past_dev() {
15821584
print!("TEST SKIPPED");
15831585
}
15841586
}
1587+
1588+
#[test]
1589+
#[cfg(all(unix, not(target_os = "macos"), not(target_os = "freebsd")))]
1590+
fn test_reading_partial_blocks_from_fifo() {
1591+
// Create the FIFO.
1592+
let ts = TestScenario::new(util_name!());
1593+
let at = ts.fixtures.clone();
1594+
at.mkfifo("fifo");
1595+
let fifoname = at.plus_as_string("fifo");
1596+
1597+
// Start a `dd` process that reads from the fifo (so it will wait
1598+
// until the writer process starts).
1599+
let mut reader_command = Command::new(TESTS_BINARY);
1600+
let child = reader_command
1601+
.args(["dd", "ibs=3", "obs=3", &format!("if={}", fifoname)])
1602+
.stdout(Stdio::piped())
1603+
.stderr(Stdio::piped())
1604+
.spawn()
1605+
.unwrap();
1606+
1607+
// Start a shell process that writes to the FIFO in two separate
1608+
// chunks, with a small pause in between.
1609+
let mut writer_command = Command::new("sh");
1610+
writer_command
1611+
.args([
1612+
"-c",
1613+
&format!("(printf \"ab\"; sleep 0.1; printf \"cd\") > {}", fifoname),
1614+
])
1615+
.spawn()
1616+
.unwrap();
1617+
1618+
let output = child.wait_with_output().unwrap();
1619+
assert_eq!(output.stdout, b"abcd");
1620+
let expected = b"0+2 records in\n1+1 records out\n4 bytes copied";
1621+
assert!(output.stderr.starts_with(expected));
1622+
}
1623+
1624+
#[test]
1625+
#[cfg(all(unix, not(target_os = "macos"), not(target_os = "freebsd")))]
1626+
fn test_reading_partial_blocks_from_fifo_unbuffered() {
1627+
// Create the FIFO.
1628+
let ts = TestScenario::new(util_name!());
1629+
let at = ts.fixtures.clone();
1630+
at.mkfifo("fifo");
1631+
let fifoname = at.plus_as_string("fifo");
1632+
1633+
// Start a `dd` process that reads from the fifo (so it will wait
1634+
// until the writer process starts).
1635+
//
1636+
// `bs=N` takes precedence over `ibs=N` and `obs=N`.
1637+
let mut reader_command = Command::new(TESTS_BINARY);
1638+
let child = reader_command
1639+
.args(["dd", "bs=3", "ibs=1", "obs=1", &format!("if={}", fifoname)])
1640+
.stdout(Stdio::piped())
1641+
.stderr(Stdio::piped())
1642+
.spawn()
1643+
.unwrap();
1644+
1645+
// Start a shell process that writes to the FIFO in two separate
1646+
// chunks, with a small pause in between.
1647+
let mut writer_command = Command::new("sh");
1648+
writer_command
1649+
.args([
1650+
"-c",
1651+
&format!("(printf \"ab\"; sleep 0.1; printf \"cd\") > {}", fifoname),
1652+
])
1653+
.spawn()
1654+
.unwrap();
1655+
1656+
let output = child.wait_with_output().unwrap();
1657+
assert_eq!(output.stdout, b"abcd");
1658+
let expected = b"0+2 records in\n0+2 records out\n4 bytes copied";
1659+
assert!(output.stderr.starts_with(expected));
1660+
}

0 commit comments

Comments
 (0)