mirror of
https://github.com/quantum5/ntfy-run.git
synced 2025-04-24 13:41:58 -04:00
Byte-perfect stdout/stderr forwarding
Encoding errors are replaced with � when sent to the ntfy server.
This commit is contained in:
parent
b1e4ac2674
commit
765f3e6bd3
|
@ -4,6 +4,7 @@ use runner::CapturedOutput;
|
||||||
|
|
||||||
mod quote;
|
mod quote;
|
||||||
mod runner;
|
mod runner;
|
||||||
|
mod tap_stream;
|
||||||
|
|
||||||
#[derive(Parser)]
|
#[derive(Parser)]
|
||||||
/// Tool to run a command, capture its output, and send it to ntfy.
|
/// Tool to run a command, capture its output, and send it to ntfy.
|
||||||
|
@ -113,13 +114,13 @@ fn format_post_body(output: CapturedOutput) -> String {
|
||||||
|
|
||||||
if !output.stdout.is_empty() {
|
if !output.stdout.is_empty() {
|
||||||
fragments.push("==================== STDOUT ====================".to_string());
|
fragments.push("==================== STDOUT ====================".to_string());
|
||||||
fragments.push(output.stdout);
|
fragments.push(String::from_utf8_lossy(&output.stdout).into_owned());
|
||||||
fragments.push("\n".to_string());
|
fragments.push("\n".to_string());
|
||||||
}
|
}
|
||||||
|
|
||||||
if !output.stderr.is_empty() {
|
if !output.stderr.is_empty() {
|
||||||
fragments.push("==================== STDERR ====================".to_string());
|
fragments.push("==================== STDERR ====================".to_string());
|
||||||
fragments.push(output.stderr);
|
fragments.push(String::from_utf8_lossy(&output.stderr).into_owned());
|
||||||
fragments.push("\n".to_string());
|
fragments.push("\n".to_string());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,19 +1,19 @@
|
||||||
|
use crate::tap_stream::{ReadOrWrite, TapStream};
|
||||||
use std::process::{ExitStatus, Stdio};
|
use std::process::{ExitStatus, Stdio};
|
||||||
use tokio::io::{AsyncBufReadExt, BufReader};
|
|
||||||
use tokio::process::Command;
|
use tokio::process::Command;
|
||||||
use tokio::select;
|
use tokio::{io, select};
|
||||||
|
|
||||||
pub enum CaptureError {
|
pub enum CaptureError {
|
||||||
Spawn(std::io::Error),
|
Spawn(io::Error),
|
||||||
Stdout(std::io::Error),
|
Stdout(io::Error),
|
||||||
Stderr(std::io::Error),
|
Stderr(io::Error),
|
||||||
Wait(std::io::Error),
|
Wait(io::Error),
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct CapturedOutput {
|
pub struct CapturedOutput {
|
||||||
pub status: Option<ExitStatus>,
|
pub status: Option<ExitStatus>,
|
||||||
pub stdout: String,
|
pub stdout: Vec<u8>,
|
||||||
pub stderr: String,
|
pub stderr: Vec<u8>,
|
||||||
pub errors: Vec<CaptureError>,
|
pub errors: Vec<CaptureError>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -38,40 +38,50 @@ pub async fn run_forward_and_capture(cmdline: &Vec<String>) -> CapturedOutput {
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
return CapturedOutput {
|
return CapturedOutput {
|
||||||
status: None,
|
status: None,
|
||||||
stdout: "".to_string(),
|
stdout: vec![],
|
||||||
stderr: "".to_string(),
|
stderr: vec![],
|
||||||
errors: vec![CaptureError::Spawn(error)],
|
errors: vec![CaptureError::Spawn(error)],
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut stdout_stream = BufReader::new(child.stdout.take().unwrap()).lines();
|
let mut stdout_tap = TapStream::new(child.stdout.take().unwrap(), io::stdout());
|
||||||
let mut stderr_stream = BufReader::new(child.stderr.take().unwrap()).lines();
|
let mut stderr_tap = TapStream::new(child.stderr.take().unwrap(), io::stderr());
|
||||||
|
|
||||||
let mut stdout_buffer = Vec::new();
|
let mut stdout = vec![];
|
||||||
let mut stderr_buffer = Vec::new();
|
let mut stderr = vec![];
|
||||||
let mut errors = Vec::new();
|
let mut errors = Vec::new();
|
||||||
|
|
||||||
|
let mut stdout_eof = false;
|
||||||
|
let mut stderr_eof = false;
|
||||||
|
let mut maybe_status: Option<ExitStatus> = None;
|
||||||
|
|
||||||
let status = loop {
|
let status = loop {
|
||||||
select! {
|
select! {
|
||||||
line = stdout_stream.next_line() => match line {
|
result = stdout_tap.step(), if !stdout_eof => match result {
|
||||||
Ok(Some(line)) => {
|
Ok(ReadOrWrite::Read(bytes)) => stdout.extend_from_slice(bytes),
|
||||||
println!("{}", line);
|
Ok(ReadOrWrite::Written) => (),
|
||||||
stdout_buffer.push(line);
|
Ok(ReadOrWrite::EOF) => match (stderr_eof, maybe_status) {
|
||||||
|
(true, Some(status)) => break status,
|
||||||
|
_ => stdout_eof = true,
|
||||||
},
|
},
|
||||||
Ok(None) => (),
|
|
||||||
Err(error) => errors.push(CaptureError::Stdout(error)),
|
Err(error) => errors.push(CaptureError::Stdout(error)),
|
||||||
},
|
},
|
||||||
line = stderr_stream.next_line() => match line {
|
result = stderr_tap.step(), if !stderr_eof => match result {
|
||||||
Ok(Some(line)) => {
|
Ok(ReadOrWrite::Read(bytes)) => stderr.extend_from_slice(bytes),
|
||||||
eprintln!("{}", line);
|
Ok(ReadOrWrite::Written) => (),
|
||||||
stderr_buffer.push(line);
|
Ok(ReadOrWrite::EOF) => match (stdout_eof, maybe_status) {
|
||||||
|
(true, Some(status)) => break status,
|
||||||
|
_ => stderr_eof = true,
|
||||||
},
|
},
|
||||||
Ok(None) => (),
|
|
||||||
Err(error) => errors.push(CaptureError::Stderr(error)),
|
Err(error) => errors.push(CaptureError::Stderr(error)),
|
||||||
},
|
},
|
||||||
status = child.wait() => match status {
|
status = child.wait(), if maybe_status.is_none() => match status {
|
||||||
Ok(status) => break status,
|
Ok(status) => if stdout_eof && stderr_eof {
|
||||||
|
break status;
|
||||||
|
} else {
|
||||||
|
maybe_status = Some(status);
|
||||||
|
},
|
||||||
Err(error) => errors.push(CaptureError::Wait(error)),
|
Err(error) => errors.push(CaptureError::Wait(error)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -79,8 +89,8 @@ pub async fn run_forward_and_capture(cmdline: &Vec<String>) -> CapturedOutput {
|
||||||
|
|
||||||
CapturedOutput {
|
CapturedOutput {
|
||||||
status: Some(status),
|
status: Some(status),
|
||||||
stdout: stdout_buffer.join("\n").to_string(),
|
stdout,
|
||||||
stderr: stderr_buffer.join("\n").to_string(),
|
stderr,
|
||||||
errors,
|
errors,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
52
src/tap_stream.rs
Normal file
52
src/tap_stream.rs
Normal file
|
@ -0,0 +1,52 @@
|
||||||
|
use std::io;
|
||||||
|
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
||||||
|
|
||||||
|
const BUF_SIZE: usize = 16384;
|
||||||
|
|
||||||
|
pub struct TapStream<R: AsyncRead + Unpin, W: AsyncWrite + Unpin> {
|
||||||
|
source: R,
|
||||||
|
target: W,
|
||||||
|
buffer: [u8; BUF_SIZE],
|
||||||
|
buf_start: usize,
|
||||||
|
buf_end: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum ReadOrWrite<'a> {
|
||||||
|
Read(&'a [u8]),
|
||||||
|
Written,
|
||||||
|
EOF,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<R: AsyncRead + Unpin, W: AsyncWrite + Unpin> TapStream<R, W> {
|
||||||
|
pub fn new(source: R, target: W) -> TapStream<R, W> {
|
||||||
|
TapStream {
|
||||||
|
source,
|
||||||
|
target,
|
||||||
|
buffer: [0; BUF_SIZE],
|
||||||
|
buf_start: 0,
|
||||||
|
buf_end: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn step(&mut self) -> io::Result<ReadOrWrite> {
|
||||||
|
if self.buf_start == self.buf_end {
|
||||||
|
let bytes = self.source.read(&mut self.buffer[..]).await?;
|
||||||
|
self.buf_start = 0;
|
||||||
|
self.buf_end = bytes;
|
||||||
|
|
||||||
|
Ok(if bytes == 0 {
|
||||||
|
ReadOrWrite::EOF
|
||||||
|
} else {
|
||||||
|
ReadOrWrite::Read(&self.buffer[0..bytes])
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
let bytes = self
|
||||||
|
.target
|
||||||
|
.write(&self.buffer[self.buf_start..self.buf_end])
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
self.buf_start += bytes;
|
||||||
|
Ok(ReadOrWrite::Written)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue