Compare commits

...

10 commits

Author SHA1 Message Date
Quantum 2109cd45aa Document cargo install in README.md 2024-10-15 02:32:20 -04:00
Quantum a5e9f7e6fe Fix license tag 2024-10-15 02:27:11 -04:00
Quantum 4ef2eab3db Ignore .github in published crate 2024-10-15 02:22:35 -04:00
Quantum 6faafdd7ae Bump version to 0.1.2 and add cargo metadata 2024-10-15 02:18:34 -04:00
Quantum 58f0ba9952 Clean up use statements 2024-10-15 02:05:39 -04:00
Quantum 7e74fc8888 Forward child process return code if possible 2024-10-15 02:02:39 -04:00
Quantum ffb1c1c8b5 Bump version to 0.1.1 2024-10-14 21:25:22 -04:00
Quantum d59da1aeb9 Shorten section tags and improve formatting 2024-10-14 19:20:09 -04:00
Quantum 5a464aa3b1 Use simpler else clause with tokio::select! 2024-10-14 19:02:29 -04:00
Quantum 765f3e6bd3 Byte-perfect stdout/stderr forwarding
Encoding errors are replaced with � when sent to the ntfy server.
2024-10-14 16:00:14 -04:00
6 changed files with 125 additions and 48 deletions

2
Cargo.lock generated
View file

@ -592,7 +592,7 @@ dependencies = [
[[package]] [[package]]
name = "ntfy-run" name = "ntfy-run"
version = "0.1.0" version = "0.1.2"
dependencies = [ dependencies = [
"clap", "clap",
"itertools", "itertools",

View file

@ -1,7 +1,19 @@
[package] [package]
name = "ntfy-run" name = "ntfy-run"
version = "0.1.0" version = "0.1.2"
edition = "2021" edition = "2021"
authors = ["Quantum <me@quantum5.ca>"]
description = "ntfy-run is a tool to run a command, capture its output, and send it to a ntfy server."
readme = "README.md"
homepage = "https://github.com/quantum5/ntfy-run"
repository = "https://github.com/quantum5/ntfy-run"
license = "GPL-3.0-or-later"
keywords = ["ntfy", "cron", "notifications", "utility"]
categories = ["command-line-interface"]
exclude = [
".github/*"
]
[dependencies] [dependencies]
clap = { version = "4.5.20", features = ["derive", "env"] } clap = { version = "4.5.20", features = ["derive", "env"] }

View file

@ -16,7 +16,13 @@ sudo wget https://github.com/quantum5/ntfy-run/releases/latest/download/ntfy-run
sudo chmod a+x /usr/local/bin/ntfy-run sudo chmod a+x /usr/local/bin/ntfy-run
``` ```
Alternatively, build it yourself: You can also install the latest release with `cargo`:
```bash
cargo install ntfy-run
```
Finally, you can build the latest Git version yourself:
```bash ```bash
git clone https://github.com/quantum5/ntfy-run.git git clone https://github.com/quantum5/ntfy-run.git

View file

@ -1,12 +1,13 @@
use crate::runner::CaptureError; use crate::runner::{CaptureError, CapturedOutput};
use clap::Parser; use clap::Parser;
use runner::CapturedOutput; use std::process::exit;
mod quote; mod quote;
mod runner; mod runner;
mod tap_stream;
#[derive(Parser)] #[derive(Parser)]
/// Tool to run a command, capture its output, and send it to ntfy. #[command(version, about)]
struct Cli { struct Cli {
/// URL of the ntfy server and topic, e.g. https://ntfy.sh/topic /// URL of the ntfy server and topic, e.g. https://ntfy.sh/topic
#[arg(short = 'n', long = "ntfy-url", env = "NTFY_URL", alias = "url")] #[arg(short = 'n', long = "ntfy-url", env = "NTFY_URL", alias = "url")]
@ -99,7 +100,8 @@ fn format_post_body(output: CapturedOutput) -> String {
}]; }];
if !output.errors.is_empty() { if !output.errors.is_empty() {
fragments.push("==================== Errors ====================".to_string()); fragments.push("".to_string());
fragments.push("========== Errors ==========".to_string());
for error in &output.errors { for error in &output.errors {
fragments.push(match error { fragments.push(match error {
CaptureError::Spawn(error) => format!("Spawn error: {}", error), CaptureError::Spawn(error) => format!("Spawn error: {}", error),
@ -108,19 +110,18 @@ fn format_post_body(output: CapturedOutput) -> String {
CaptureError::Wait(error) => format!("Error while waiting for process: {}", error), CaptureError::Wait(error) => format!("Error while waiting for process: {}", error),
}); });
} }
fragments.push("\n".to_string());
} }
if !output.stdout.is_empty() { if !output.stdout.is_empty() {
fragments.push("==================== STDOUT ====================".to_string()); fragments.push("".to_string());
fragments.push(output.stdout); fragments.push("========== STDOUT ==========".to_string());
fragments.push("\n".to_string()); fragments.push(String::from_utf8_lossy(output.stdout.trim_ascii_end()).into_owned());
} }
if !output.stderr.is_empty() { if !output.stderr.is_empty() {
fragments.push("==================== STDERR ====================".to_string()); fragments.push("".to_string());
fragments.push(output.stderr); fragments.push("========== STDERR ==========".to_string());
fragments.push("\n".to_string()); fragments.push(String::from_utf8_lossy(output.stderr.trim_ascii_end()).into_owned());
} }
fragments.join("\n") fragments.join("\n")
@ -209,7 +210,13 @@ async fn main() {
}; };
match request.send().await.and_then(|r| r.error_for_status()) { match request.send().await.and_then(|r| r.error_for_status()) {
Ok(_) => (), Ok(_) => exit(match status {
Err(error) => eprintln!("Failed to send request to ntfy: {}", error), Some(code) => code.code().unwrap_or(255),
None => 255,
}),
Err(error) => {
eprintln!("Failed to send request to ntfy: {}", error);
exit(37)
}
} }
} }

View file

@ -1,19 +1,18 @@
use crate::tap_stream::{ReadOrWrite, TapStream};
use std::process::{ExitStatus, Stdio}; use std::process::{ExitStatus, Stdio};
use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::{io, process::Command, select};
use tokio::process::Command;
use tokio::select;
pub enum CaptureError { pub enum CaptureError {
Spawn(std::io::Error), Spawn(io::Error),
Stdout(std::io::Error), Stdout(io::Error),
Stderr(std::io::Error), Stderr(io::Error),
Wait(std::io::Error), Wait(io::Error),
} }
pub struct CapturedOutput { pub struct CapturedOutput {
pub status: Option<ExitStatus>, pub status: Option<ExitStatus>,
pub stdout: String, pub stdout: Vec<u8>,
pub stderr: String, pub stderr: Vec<u8>,
pub errors: Vec<CaptureError>, pub errors: Vec<CaptureError>,
} }
@ -38,49 +37,50 @@ pub async fn run_forward_and_capture(cmdline: &Vec<String>) -> CapturedOutput {
Err(error) => { Err(error) => {
return CapturedOutput { return CapturedOutput {
status: None, status: None,
stdout: "".to_string(), stdout: vec![],
stderr: "".to_string(), stderr: vec![],
errors: vec![CaptureError::Spawn(error)], errors: vec![CaptureError::Spawn(error)],
} }
} }
}; };
let mut stdout_stream = BufReader::new(child.stdout.take().unwrap()).lines(); let mut stdout_tap = TapStream::new(child.stdout.take().unwrap(), io::stdout());
let mut stderr_stream = BufReader::new(child.stderr.take().unwrap()).lines(); let mut stderr_tap = TapStream::new(child.stderr.take().unwrap(), io::stderr());
let mut stdout_buffer = Vec::new(); let mut stdout = vec![];
let mut stderr_buffer = Vec::new(); let mut stderr = vec![];
let mut errors = Vec::new(); let mut errors = Vec::new();
let mut stdout_eof = false;
let mut stderr_eof = false;
let mut maybe_status: Option<ExitStatus> = None;
let status = loop { let status = loop {
select! { select! {
line = stdout_stream.next_line() => match line { result = stdout_tap.step(), if !stdout_eof => match result {
Ok(Some(line)) => { Ok(ReadOrWrite::Read(bytes)) => stdout.extend_from_slice(bytes),
println!("{}", line); Ok(ReadOrWrite::Written) => (),
stdout_buffer.push(line); Ok(ReadOrWrite::EOF) => stdout_eof = true,
},
Ok(None) => (),
Err(error) => errors.push(CaptureError::Stdout(error)), Err(error) => errors.push(CaptureError::Stdout(error)),
}, },
line = stderr_stream.next_line() => match line { result = stderr_tap.step(), if !stderr_eof => match result {
Ok(Some(line)) => { Ok(ReadOrWrite::Read(bytes)) => stderr.extend_from_slice(bytes),
eprintln!("{}", line); Ok(ReadOrWrite::Written) => (),
stderr_buffer.push(line); Ok(ReadOrWrite::EOF) => stderr_eof = true,
},
Ok(None) => (),
Err(error) => errors.push(CaptureError::Stderr(error)), Err(error) => errors.push(CaptureError::Stderr(error)),
}, },
status = child.wait() => match status { status = child.wait(), if maybe_status.is_none() => match status {
Ok(status) => break status, Ok(status) => maybe_status = Some(status),
Err(error) => errors.push(CaptureError::Wait(error)), Err(error) => errors.push(CaptureError::Wait(error)),
} },
else => break maybe_status.unwrap(),
} }
}; };
CapturedOutput { CapturedOutput {
status: Some(status), status: Some(status),
stdout: stdout_buffer.join("\n").to_string(), stdout,
stderr: stderr_buffer.join("\n").to_string(), stderr,
errors, errors,
} }
} }

52
src/tap_stream.rs Normal file
View file

@ -0,0 +1,52 @@
use std::io;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
const BUF_SIZE: usize = 16384;
pub struct TapStream<R: AsyncRead + Unpin, W: AsyncWrite + Unpin> {
source: R,
target: W,
buffer: [u8; BUF_SIZE],
buf_start: usize,
buf_end: usize,
}
pub enum ReadOrWrite<'a> {
Read(&'a [u8]),
Written,
EOF,
}
impl<R: AsyncRead + Unpin, W: AsyncWrite + Unpin> TapStream<R, W> {
pub fn new(source: R, target: W) -> TapStream<R, W> {
TapStream {
source,
target,
buffer: [0; BUF_SIZE],
buf_start: 0,
buf_end: 0,
}
}
pub async fn step(&mut self) -> io::Result<ReadOrWrite> {
if self.buf_start == self.buf_end {
let bytes = self.source.read(&mut self.buffer[..]).await?;
self.buf_start = 0;
self.buf_end = bytes;
if bytes == 0 {
Ok(ReadOrWrite::EOF)
} else {
Ok(ReadOrWrite::Read(&self.buffer[0..bytes]))
}
} else {
let bytes = self
.target
.write(&self.buffer[self.buf_start..self.buf_end])
.await?;
self.buf_start += bytes;
Ok(ReadOrWrite::Written)
}
}
}