diff --git a/Cargo.lock b/Cargo.lock index eabcf14a84ca727623983621240ff11966700f7f..9fea71108121bde744bc5a0aadec57de20031ff8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -352,6 +352,7 @@ dependencies = [ "nix", "pretty-hex", "rpassword", + "signal-hook-tokio", "simplelog", "snafu", "tokio", @@ -1224,6 +1225,16 @@ dependencies = [ "digest 0.10.3", ] +[[package]] +name = "signal-hook" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a253b5e89e2698464fc26b545c9edceb338e18a89effeeecfea192c3025be29d" +dependencies = [ + "libc", + "signal-hook-registry", +] + [[package]] name = "signal-hook-registry" version = "1.4.0" @@ -1233,6 +1244,17 @@ dependencies = [ "libc", ] +[[package]] +name = "signal-hook-tokio" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "213241f76fb1e37e27de3b6aa1b068a2c333233b59cca6634f634b80a27ecf1e" +dependencies = [ + "libc", + "signal-hook", + "tokio", +] + [[package]] name = "signature" version = "1.5.0" diff --git a/async/Cargo.toml b/async/Cargo.toml index eb07034dfd65766a7a2797f7330c55394f936430..cbe0cdd7221c3ae692b9a758396af89f8a1a9608 100644 --- a/async/Cargo.toml +++ b/async/Cargo.toml @@ -29,6 +29,8 @@ moro = "0.4" libc = "0.2" nix = "0.24" +signal-hook-tokio = "0.3" + heapless = "0.7.10" # TODO diff --git a/async/examples/con1.rs b/async/examples/con1.rs index 3e97b7ab66333f9fe4b07ac70b921ee33064006e..b4dba8e13c9825fc887684cd9f69babec7bb9fe5 100644 --- a/async/examples/con1.rs +++ b/async/examples/con1.rs @@ -77,7 +77,7 @@ fn main() -> Result<()> { // tries to mutate shared state with setenv. // https://github.com/rust-lang/rust/issues/90308 etc // logging uses the timezone, so we can't use async main. - setup_log(&args); + setup_log(&args)?; tokio::runtime::Builder::new_current_thread() .enable_all() diff --git a/async/src/async_channel.rs b/async/src/async_channel.rs new file mode 100644 index 0000000000000000000000000000000000000000..952ec94f799f56be3bebb0fa2837b3b89373c3fc --- /dev/null +++ b/async/src/async_channel.rs @@ -0,0 +1,181 @@ +#[allow(unused_imports)] +use log::{debug, error, info, log, trace, warn}; + +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; +use core::ops::DerefMut; + +use std::io::Error as IoError; +use std::io::ErrorKind; +use std::collections::HashMap; + +use futures::lock::{Mutex, OwnedMutexLockFuture, OwnedMutexGuard}; + +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; + +use crate::*; +use async_door::{Inner, poll_lock}; + +pub struct ChanInOut<'a> { + chan: u32, + door: AsyncDoor<'a>, + + rlfut: Option<OwnedMutexLockFuture<Inner<'a>>>, + wlfut: Option<OwnedMutexLockFuture<Inner<'a>>>, +} + +pub struct ChanExtIn<'a> { + chan: u32, + ext: u32, + door: AsyncDoor<'a>, + + rlfut: Option<OwnedMutexLockFuture<Inner<'a>>>, +} + +pub struct ChanExtOut<'a> { + chan: u32, + ext: u32, + door: AsyncDoor<'a>, + + wlfut: Option<OwnedMutexLockFuture<Inner<'a>>>, +} + +impl<'a> ChanInOut<'a> { + pub(crate) fn new(chan: u32, door: &AsyncDoor<'a>) -> Self { + Self { + chan, door: door.private_clone(), + rlfut: None, wlfut: None, + } + } +} + +impl Clone for ChanInOut<'_> { + fn clone(&self) -> Self { + Self { + chan: self.chan, door: self.door.private_clone(), + rlfut: None, wlfut: None, + } + } +} + +impl<'a> ChanExtIn<'a> { + pub(crate) fn new(chan: u32, ext: u32, door: &AsyncDoor<'a>) -> Self { + Self { + chan, ext, door: door.private_clone(), + rlfut: None, + } + } +} + +impl<'a> AsyncRead for ChanInOut<'a> { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf, + ) -> Poll<Result<(), IoError>> { + let this = self.deref_mut(); + chan_poll_read(&mut this.door, this.chan, None, cx, buf, &mut this.rlfut) + } +} + +impl<'a> AsyncRead for ChanExtIn<'a> { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf, + ) -> Poll<Result<(), IoError>> { + let this = self.deref_mut(); + chan_poll_read(&mut this.door, this.chan, Some(this.ext), cx, buf, &mut this.rlfut) + } +} + +// Common for `ChanInOut` and `ChanExtIn` +fn chan_poll_read<'a>( + door: &mut AsyncDoor<'a>, + chan: u32, + ext: Option<u32>, + cx: &mut Context, + buf: &mut ReadBuf, + lock_fut: &mut Option<OwnedMutexLockFuture<Inner<'a>>>, +) -> Poll<Result<(), IoError>> { + trace!("chan read"); + + let mut p = poll_lock(door.inner.clone(), cx, lock_fut); + let inner = match p { + Poll::Ready(ref mut i) => i, + Poll::Pending => { + return Poll::Pending + } + }; + + let runner = &mut inner.runner; + + let b = buf.initialize_unfilled(); + let r = runner.channel_input(chan, ext, b) + .map_err(|e| IoError::new(std::io::ErrorKind::Other, e)); + + match r { + // poll_read() returns 0 on EOF, if the channel isn't eof yet + // we want to return pending + Ok(0) if !runner.channel_eof(chan) => { + let w = cx.waker().clone(); + inner.chan_read_wakers.insert((chan, ext), w); + Poll::Pending + } + Ok(sz) => { + buf.advance(sz); + Poll::Ready(Ok(())) + } + Err(e) => Poll::Ready(Err(e)), + } +} + +impl<'a> AsyncWrite for ChanInOut<'a> { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<Result<usize, IoError>> { + let this = self.deref_mut(); + chan_poll_write(&mut this.door, this.chan, None, cx, buf, &mut this.wlfut) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> { + // perhaps common between InOut and ExtOut? + todo!("channel poll_shutdown") + } +} + +fn chan_poll_write<'a>( + door: &mut AsyncDoor<'a>, + chan: u32, + ext: Option<u32>, + cx: &mut Context<'_>, + buf: &[u8], + lock_fut: &mut Option<OwnedMutexLockFuture<Inner<'a>>>, +) -> Poll<Result<usize, IoError>> { + trace!("chan write"); + + let mut p = poll_lock(door.inner.clone(), cx, lock_fut); + let inner = match p { + Poll::Ready(ref mut i) => i, + Poll::Pending => return Poll::Pending, + }; + let runner = &mut inner.runner; + + match runner.channel_send(chan, ext, buf) { + Ok(Some(l)) if l == 0 => { + inner.chan_write_wakers.insert((chan, ext), cx.waker().clone()); + Poll::Pending + } + Ok(Some(l)) => Poll::Ready(Ok(l)), + // return 0 for EOF + Ok(None) => Poll::Ready(Ok(0)), + Err(e) => Poll::Ready(Err(IoError::new(ErrorKind::Other, e))), + } +} diff --git a/async/src/async_door.rs b/async/src/async_door.rs index 4507053d2a73bddb91f3a3e0b6ae07ce7148bab0..fbb996ce528cf1ebcd4d66f3cf0f090c81d4cdf3 100644 --- a/async/src/async_door.rs +++ b/async/src/async_door.rs @@ -24,19 +24,19 @@ use door::{Behaviour, AsyncCliBehaviour, Runner, Result, Event, ChanEvent}; use pretty_hex::PrettyHex; -pub struct Inner<'a> { - runner: Runner<'a>, +pub(crate) struct Inner<'a> { + pub runner: Runner<'a>, // TODO: perhaps behaviour can move to runner? unsure of lifetimes. behaviour: Behaviour<'a>, - chan_read_wakers: HashMap<(u32, Option<u32>), Waker>, - chan_write_wakers: HashMap<(u32, Option<u32>), Waker>, + pub(crate) chan_read_wakers: HashMap<(u32, Option<u32>), Waker>, + pub(crate) chan_write_wakers: HashMap<(u32, Option<u32>), Waker>, } pub struct AsyncDoor<'a> { // Not contended much since the Runner is inherently single threaded anyway, // using a single buffer for input/output. - inner: Arc<Mutex<Inner<'a>>>, + pub(crate) inner: Arc<Mutex<Inner<'a>>>, progress_notify: Arc<TokioNotify>, @@ -52,7 +52,7 @@ impl<'a> AsyncDoor<'a> { Self { inner, progress_notify } } - fn private_clone(&self) -> Self { + pub(crate) fn private_clone(&self) -> Self { Self { inner: self.inner.clone(), progress_notify: self.progress_notify.clone(), } @@ -136,7 +136,7 @@ impl<'a> AsyncDoor<'a> { /// Tries to lock Inner for a poll_read()/poll_write(). /// lock_fut from the caller holds the future so that it can /// be woken later if the lock was contended -fn poll_lock<'a>(inner: Arc<Mutex<Inner<'a>>>, cx: &mut Context<'_>, +pub(crate) fn poll_lock<'a>(inner: Arc<Mutex<Inner<'a>>>, cx: &mut Context<'_>, lock_fut: &mut Option<OwnedMutexLockFuture<Inner<'a>>>) -> Poll<OwnedMutexGuard<Inner<'a>>> { let mut g = inner.lock_owned(); @@ -259,165 +259,3 @@ impl<'a> AsyncWrite for AsyncDoorSocket<'a> { } } -pub struct ChanInOut<'a> { - chan: u32, - door: AsyncDoor<'a>, - - rlfut: Option<OwnedMutexLockFuture<Inner<'a>>>, - wlfut: Option<OwnedMutexLockFuture<Inner<'a>>>, -} - -pub struct ChanExtIn<'a> { - chan: u32, - ext: u32, - door: AsyncDoor<'a>, - - rlfut: Option<OwnedMutexLockFuture<Inner<'a>>>, -} - -pub struct ChanExtOut<'a> { - chan: u32, - ext: u32, - door: AsyncDoor<'a>, - - wlfut: Option<OwnedMutexLockFuture<Inner<'a>>>, -} - -impl<'a> ChanInOut<'a> { - pub(crate) fn new(chan: u32, door: &AsyncDoor<'a>) -> Self { - Self { - chan, door: door.private_clone(), - rlfut: None, wlfut: None, - } - } -} - -impl Clone for ChanInOut<'_> { - fn clone(&self) -> Self { - Self { - chan: self.chan, door: self.door.private_clone(), - rlfut: None, wlfut: None, - } - } -} - -impl<'a> ChanExtIn<'a> { - pub(crate) fn new(chan: u32, ext: u32, door: &AsyncDoor<'a>) -> Self { - Self { - chan, ext, door: door.private_clone(), - rlfut: None, - } - } -} - -impl<'a> AsyncRead for ChanInOut<'a> { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf, - ) -> Poll<Result<(), IoError>> { - let this = self.deref_mut(); - chan_poll_read(&mut this.door, this.chan, None, cx, buf, &mut this.rlfut) - } -} - -impl<'a> AsyncRead for ChanExtIn<'a> { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf, - ) -> Poll<Result<(), IoError>> { - let this = self.deref_mut(); - chan_poll_read(&mut this.door, this.chan, Some(this.ext), cx, buf, &mut this.rlfut) - } -} - -// Common for `ChanInOut` and `ChanExtIn` -fn chan_poll_read<'a>( - door: &mut AsyncDoor<'a>, - chan: u32, - ext: Option<u32>, - cx: &mut Context, - buf: &mut ReadBuf, - lock_fut: &mut Option<OwnedMutexLockFuture<Inner<'a>>>, -) -> Poll<Result<(), IoError>> { - trace!("chan read"); - - let mut p = poll_lock(door.inner.clone(), cx, lock_fut); - let inner = match p { - Poll::Ready(ref mut i) => i, - Poll::Pending => { - return Poll::Pending - } - }; - - let runner = &mut inner.runner; - - let b = buf.initialize_unfilled(); - let r = runner.channel_input(chan, ext, b) - .map_err(|e| IoError::new(std::io::ErrorKind::Other, e)); - - match r { - // poll_read() returns 0 on EOF, if the channel isn't eof yet - // we want to return pending - Ok(0) if !runner.channel_eof(chan) => { - let w = cx.waker().clone(); - inner.chan_read_wakers.insert((chan, ext), w); - Poll::Pending - } - Ok(sz) => { - buf.advance(sz); - Poll::Ready(Ok(())) - } - Err(e) => Poll::Ready(Err(e)), - } -} - -impl<'a> AsyncWrite for ChanInOut<'a> { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll<Result<usize, IoError>> { - let this = self.deref_mut(); - chan_poll_write(&mut this.door, this.chan, None, cx, buf, &mut this.wlfut) - } - - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> { - Poll::Ready(Ok(())) - } - - fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> { - // perhaps common between InOut and ExtOut? - todo!("channel poll_shutdown") - } -} - -fn chan_poll_write<'a>( - door: &mut AsyncDoor<'a>, - chan: u32, - ext: Option<u32>, - cx: &mut Context<'_>, - buf: &[u8], - lock_fut: &mut Option<OwnedMutexLockFuture<Inner<'a>>>, -) -> Poll<Result<usize, IoError>> { - trace!("chan write"); - - let mut p = poll_lock(door.inner.clone(), cx, lock_fut); - let inner = match p { - Poll::Ready(ref mut i) => i, - Poll::Pending => return Poll::Pending, - }; - let runner = &mut inner.runner; - - match runner.channel_send(chan, ext, buf) { - Ok(Some(l)) if l == 0 => { - inner.chan_write_wakers.insert((chan, ext), cx.waker().clone()); - Poll::Pending - } - Ok(Some(l)) => Poll::Ready(Ok(l)), - // return 0 for EOF - Ok(None) => Poll::Ready(Ok(0)), - Err(e) => Poll::Ready(Err(IoError::new(ErrorKind::Other, e))), - } -} diff --git a/async/src/client.rs b/async/src/client.rs index acc2f8c94bbecde0500a120ede8986d4a2ee01cf..ef5a565907151cd0c35a29a0d3690c4f94d0cfa4 100644 --- a/async/src/client.rs +++ b/async/src/client.rs @@ -18,6 +18,7 @@ use nix::fcntl::{fcntl, FcntlArg, OFlag}; use crate::*; use crate::async_door::*; +use crate::async_channel::*; use door_sshproto as door; use door::{Behaviour, AsyncCliBehaviour, Runner, Result}; diff --git a/async/src/lib.rs b/async/src/lib.rs index 8c167c6258a62035fc39a803050f287a3765dfea..0bd2e20fb9794c352f4543262eb00b4f3b2def27 100644 --- a/async/src/lib.rs +++ b/async/src/lib.rs @@ -2,6 +2,7 @@ mod client; mod async_door; +mod async_channel; mod cmdline_client; mod pty;