diff --git a/src/main.rs b/src/main.rs index 157f24f..8c51429 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,6 +4,7 @@ use runner::CapturedOutput; mod quote; mod runner; +mod tap_stream; #[derive(Parser)] /// Tool to run a command, capture its output, and send it to ntfy. @@ -113,13 +114,13 @@ fn format_post_body(output: CapturedOutput) -> String { if !output.stdout.is_empty() { fragments.push("==================== STDOUT ====================".to_string()); - fragments.push(output.stdout); + fragments.push(String::from_utf8_lossy(&output.stdout).into_owned()); fragments.push("\n".to_string()); } if !output.stderr.is_empty() { fragments.push("==================== STDERR ====================".to_string()); - fragments.push(output.stderr); + fragments.push(String::from_utf8_lossy(&output.stderr).into_owned()); fragments.push("\n".to_string()); } diff --git a/src/runner.rs b/src/runner.rs index 2f87ff6..33b4dd3 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -1,19 +1,19 @@ +use crate::tap_stream::{ReadOrWrite, TapStream}; use std::process::{ExitStatus, Stdio}; -use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::Command; -use tokio::select; +use tokio::{io, select}; pub enum CaptureError { - Spawn(std::io::Error), - Stdout(std::io::Error), - Stderr(std::io::Error), - Wait(std::io::Error), + Spawn(io::Error), + Stdout(io::Error), + Stderr(io::Error), + Wait(io::Error), } pub struct CapturedOutput { pub status: Option, - pub stdout: String, - pub stderr: String, + pub stdout: Vec, + pub stderr: Vec, pub errors: Vec, } @@ -38,40 +38,50 @@ pub async fn run_forward_and_capture(cmdline: &Vec) -> CapturedOutput { Err(error) => { return CapturedOutput { status: None, - stdout: "".to_string(), - stderr: "".to_string(), + stdout: vec![], + stderr: vec![], errors: vec![CaptureError::Spawn(error)], } } }; - let mut stdout_stream = BufReader::new(child.stdout.take().unwrap()).lines(); - let mut stderr_stream = BufReader::new(child.stderr.take().unwrap()).lines(); + let mut stdout_tap = TapStream::new(child.stdout.take().unwrap(), io::stdout()); + let mut stderr_tap = TapStream::new(child.stderr.take().unwrap(), io::stderr()); - let mut stdout_buffer = Vec::new(); - let mut stderr_buffer = Vec::new(); + let mut stdout = vec![]; + let mut stderr = vec![]; let mut errors = Vec::new(); + let mut stdout_eof = false; + let mut stderr_eof = false; + let mut maybe_status: Option = None; + let status = loop { select! { - line = stdout_stream.next_line() => match line { - Ok(Some(line)) => { - println!("{}", line); - stdout_buffer.push(line); + result = stdout_tap.step(), if !stdout_eof => match result { + Ok(ReadOrWrite::Read(bytes)) => stdout.extend_from_slice(bytes), + Ok(ReadOrWrite::Written) => (), + Ok(ReadOrWrite::EOF) => match (stderr_eof, maybe_status) { + (true, Some(status)) => break status, + _ => stdout_eof = true, }, - Ok(None) => (), Err(error) => errors.push(CaptureError::Stdout(error)), }, - line = stderr_stream.next_line() => match line { - Ok(Some(line)) => { - eprintln!("{}", line); - stderr_buffer.push(line); + result = stderr_tap.step(), if !stderr_eof => match result { + Ok(ReadOrWrite::Read(bytes)) => stderr.extend_from_slice(bytes), + Ok(ReadOrWrite::Written) => (), + Ok(ReadOrWrite::EOF) => match (stdout_eof, maybe_status) { + (true, Some(status)) => break status, + _ => stderr_eof = true, }, - Ok(None) => (), Err(error) => errors.push(CaptureError::Stderr(error)), }, - status = child.wait() => match status { - Ok(status) => break status, + status = child.wait(), if maybe_status.is_none() => match status { + Ok(status) => if stdout_eof && stderr_eof { + break status; + } else { + maybe_status = Some(status); + }, Err(error) => errors.push(CaptureError::Wait(error)), } } @@ -79,8 +89,8 @@ pub async fn run_forward_and_capture(cmdline: &Vec) -> CapturedOutput { CapturedOutput { status: Some(status), - stdout: stdout_buffer.join("\n").to_string(), - stderr: stderr_buffer.join("\n").to_string(), + stdout, + stderr, errors, } } diff --git a/src/tap_stream.rs b/src/tap_stream.rs new file mode 100644 index 0000000..20bbbfb --- /dev/null +++ b/src/tap_stream.rs @@ -0,0 +1,52 @@ +use std::io; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; + +const BUF_SIZE: usize = 16384; + +pub struct TapStream { + source: R, + target: W, + buffer: [u8; BUF_SIZE], + buf_start: usize, + buf_end: usize, +} + +pub enum ReadOrWrite<'a> { + Read(&'a [u8]), + Written, + EOF, +} + +impl TapStream { + pub fn new(source: R, target: W) -> TapStream { + TapStream { + source, + target, + buffer: [0; BUF_SIZE], + buf_start: 0, + buf_end: 0, + } + } + + pub async fn step(&mut self) -> io::Result { + if self.buf_start == self.buf_end { + let bytes = self.source.read(&mut self.buffer[..]).await?; + self.buf_start = 0; + self.buf_end = bytes; + + Ok(if bytes == 0 { + ReadOrWrite::EOF + } else { + ReadOrWrite::Read(&self.buffer[0..bytes]) + }) + } else { + let bytes = self + .target + .write(&self.buffer[self.buf_start..self.buf_end]) + .await?; + + self.buf_start += bytes; + Ok(ReadOrWrite::Written) + } + } +}