From 43476aadac73325ca3a3186d2b077bcd44ad9ed2 Mon Sep 17 00:00:00 2001 From: Matt Johnston <matt@ucc.asn.au> Date: Sat, 27 Aug 2022 13:31:35 +0800 Subject: [PATCH] working again, no shell atm --- async/examples/con1.rs | 83 +++++++++++++++++++---------------------- async/src/async_door.rs | 62 ++++++++++++------------------ async/src/client.rs | 9 ++--- sshproto/src/conn.rs | 1 + sshproto/src/encrypt.rs | 38 ++++++++++++++++++- sshproto/src/runner.rs | 30 ++++++++++----- sshproto/src/traffic.rs | 1 + 7 files changed, 124 insertions(+), 100 deletions(-) diff --git a/async/examples/con1.rs b/async/examples/con1.rs index 020635f..673e673 100644 --- a/async/examples/con1.rs +++ b/async/examples/con1.rs @@ -162,51 +162,44 @@ async fn run(args: &Args) -> Result<()> { scope.spawn(async { loop { - let ev = cli.progress(&mut app, |ev| { - trace!("progress event {ev:?}"); - let e = match ev { - Event::CliAuthed => Some(Event::CliAuthed), - _ => None, - }; - Ok(e) - }).await.context("progress loop")?; - - match ev { - Some(Event::CliAuthed) => { - let mut raw_pty_guard = None; - info!("Opening a new session channel"); - let (mut io, mut errpair) = if wantpty { - raw_pty_guard = Some(raw_pty()?); - let io = cli.open_session_pty(cmd.as_deref()).await - .context("Opening session")?; - (io, None) - } else { - let (io, err) = cli.open_session_nopty(cmd.as_deref()).await - .context("Opening session")?; - let errpair = (err, door_async::stderr()?); - (io, Some(errpair)) - }; - - let mut i = door_async::stdin()?; - let mut o = door_async::stdout()?; - let mut io2 = io.clone(); - scope.spawn(async move { - moro::async_scope!(|scope| { - scope.spawn(tokio::io::copy(&mut io, &mut o)); - scope.spawn(tokio::io::copy(&mut i, &mut io2)); - if let Some(ref mut ep) = errpair { - let (err, e) = ep; - scope.spawn(tokio::io::copy(err, e)); - } - }).await; - drop(raw_pty_guard); - Ok::<_, anyhow::Error>(()) - }); - // TODO: handle channel completion or open failure - } - Some(_) => unreachable!(), - None => {}, - } + cli.progress(&mut app).await.context("progress loop")?; + + // match ev { + // Some(Event::CliAuthed) => { + // let mut raw_pty_guard = None; + // info!("Opening a new session channel"); + // let (mut io, mut errpair) = if wantpty { + // raw_pty_guard = Some(raw_pty()?); + // let io = cli.open_session_pty(cmd.as_deref()).await + // .context("Opening session")?; + // (io, None) + // } else { + // let (io, err) = cli.open_session_nopty(cmd.as_deref()).await + // .context("Opening session")?; + // let errpair = (err, door_async::stderr()?); + // (io, Some(errpair)) + // }; + + // let mut i = door_async::stdin()?; + // let mut o = door_async::stdout()?; + // let mut io2 = io.clone(); + // scope.spawn(async move { + // moro::async_scope!(|scope| { + // scope.spawn(tokio::io::copy(&mut io, &mut o)); + // scope.spawn(tokio::io::copy(&mut i, &mut io2)); + // if let Some(ref mut ep) = errpair { + // let (err, e) = ep; + // scope.spawn(tokio::io::copy(err, e)); + // } + // }).await; + // drop(raw_pty_guard); + // Ok::<_, anyhow::Error>(()) + // }); + // // TODO: handle channel completion or open failure + // } + // Some(_) => unreachable!(), + // None => {}, + // } } #[allow(unreachable_code)] Ok::<_, anyhow::Error>(()) diff --git a/async/src/async_door.rs b/async/src/async_door.rs index c3bc33f..4306c51 100644 --- a/async/src/async_door.rs +++ b/async/src/async_door.rs @@ -23,6 +23,7 @@ use door::{Runner, Result, Event, ChanEvent, Behaviour}; use pretty_hex::PrettyHex; +#[derive(Debug)] pub(crate) struct Inner<'a> { pub runner: Runner<'a>, @@ -62,43 +63,30 @@ impl<'a> AsyncDoor<'a> { /// The `f` closure should return `Some` if the result should be returned /// from `progress()`, or `None` to not do that. /// XXX better docs, perhaps it won't take a closure anyway - pub async fn progress<F, R>(&mut self, - b: &mut Behaviour<'_>, - f: F) - -> Result<Option<R>> - where F: FnOnce(door::Event) -> Result<Option<R>> { + pub async fn progress(&mut self, + b: &mut Behaviour<'_>) + -> Result<()> { trace!("progress"); let mut wakers = Vec::new(); - let res = { + + // scoped lock + { let mut inner = self.inner.lock().await; + trace!("locked progress"); let inner = inner.deref_mut(); - let ev = inner.runner.progress(b).await?; - let r = if let Some(ev) = ev { - let r = match ev { - Event::Channel(ChanEvent::Eof { num }) => { - // TODO - Ok(None) - }, - _ => f(ev), - }; - trace!("async prog done payload"); - r - } else { - Ok(None) - }; - inner.runner.done_payload()?; + inner.runner.progress(b).await?; if let Some(ce) = inner.runner.ready_channel_input() { inner.chan_read_wakers.remove(&ce) .map(|w| wakers.push(w)); } - // Pending https://github.com/rust-lang/rust/issues/59618 - // HashMap::drain_filter + // Pending HashMap::drain_filter + // https://github.com/rust-lang/rust/issues/59618 // TODO: untested. // TODO: fairness? Also it's not clear whether progress notify // will always get woken by runner.wake() to update this... - inner.chan_write_wakers.retain(|(ch, ext), w| { + inner.chan_write_wakers.retain(|(ch, _ext), w| { match inner.runner.ready_channel_send(*ch) { Some(n) if n > 0 => { wakers.push(w.clone()); @@ -107,25 +95,21 @@ impl<'a> AsyncDoor<'a> { _ => true } }); - - r - }; - // lock is dropped before waker or notify + } for w in wakers { trace!("woken {w:?}"); w.wake() } - // TODO: currently this is only woken by incoming data, should it - // also wake internally from runner or conn? It runs once at start - // to kick off the outgoing handshake at least. - if let Ok(None) = res { - trace!("progress wait"); - self.progress_notify.notified().await; - trace!("progress awaited"); - } - res + // // TODO: currently this is only woken by incoming data, should it + // // also wake internally from runner or conn? It runs once at start + // // to kick off the outgoing handshake at least. + trace!("progress wait"); + self.progress_notify.notified().await; + trace!("progress awaited"); + + Ok(()) } pub async fn with_runner<F, R>(&mut self, f: F) -> R @@ -147,6 +131,7 @@ pub(crate) fn poll_lock<'a>(inner: Arc<Mutex<Inner<'a>>>, cx: &mut Context<'_>, Poll::Ready(_) => None, Poll::Pending => Some(g), }; + trace!("poll_lock returned {:?}", p); p } @@ -207,7 +192,7 @@ impl<'a> AsyncWrite for AsyncDoorSocket<'a> { cx: &mut Context<'_>, buf: &[u8], ) -> Poll<Result<usize, IoError>> { - trace!("poll_write"); + trace!("poll_write {}", buf.len()); let mut p = poll_lock(self.door.inner.clone(), cx, &mut self.write_lock_fut); @@ -228,6 +213,7 @@ impl<'a> AsyncWrite for AsyncDoorSocket<'a> { .map_err(|e| IoError::new(std::io::ErrorKind::Other, e)); Poll::Ready(r) } else { + trace!("not ready"); runner.set_input_waker(cx.waker().clone()); Poll::Pending }; diff --git a/async/src/client.rs b/async/src/client.rs index ad308f0..29ab881 100644 --- a/async/src/client.rs +++ b/async/src/client.rs @@ -46,14 +46,11 @@ impl<'a> SSHClient<'a> { /// Takes a closure to run on the "output" of the progress call. /// (This output can't be returned directly since it refers /// to contents of `Self` and would hit lifetime issues). - pub async fn progress<F, R>(&mut self, - b: &mut (dyn CliBehaviour+Send), - f: F) - -> Result<Option<R>> - where F: FnOnce(door::Event) -> Result<Option<R>> { + pub async fn progress(&mut self, + b: &mut (dyn CliBehaviour+Send)) -> Result<()> { let mut b = Behaviour::new_client(b); - self.door.progress(&mut b, f).await + self.door.progress(&mut b).await } // TODO: return a Channel object that gives events like WinChange or exit status diff --git a/sshproto/src/conn.rs b/sshproto/src/conn.rs index 13f89e2..b11396f 100644 --- a/sshproto/src/conn.rs +++ b/sshproto/src/conn.rs @@ -161,6 +161,7 @@ impl<'a> Conn<'a> { } pub(crate) fn initial_sent(&self) -> bool { + trace!("initial_sent state {:?}", self.state); match self.state { | ConnState::SendIdent | ConnState::SendFirstKexInit diff --git a/sshproto/src/encrypt.rs b/sshproto/src/encrypt.rs index 0ef9c06..f6ea2af 100644 --- a/sshproto/src/encrypt.rs +++ b/sshproto/src/encrypt.rs @@ -6,10 +6,11 @@ use { log::{debug, error, info, log, trace, warn}, }; -use aes::cipher::{BlockSizeUser, KeyIvInit, KeySizeUser, StreamCipher}; +use aes::{cipher::{BlockSizeUser, KeyIvInit, KeySizeUser, StreamCipher}, Aes256}; use core::num::Wrapping; use pretty_hex::PrettyHex; use core::fmt; +use core::fmt::Debug; use hmac::{Hmac, Mac}; use sha2::Digest as Sha2DigestForTrait; @@ -39,6 +40,7 @@ const MAX_KEY_LEN: usize = 64; /// Stateful [`Keys`], stores a sequence number as well, a single instance /// is kept for the entire session. +#[derive(Debug)] pub(crate) struct KeyState { keys: Keys, // Packet sequence numbers. These must be transferred to subsequent KeyState @@ -126,6 +128,7 @@ impl KeyState { } } +#[derive(Debug)] pub(crate) struct Keys { pub(crate) enc: EncKey, pub(crate) dec: DecKey, @@ -527,6 +530,17 @@ pub(crate) enum EncKey { NoCipher, } +impl Debug for EncKey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let n = match self { + Self::ChaPoly(_) => "ChaPoly", + Self::Aes256Ctr(_) => "Aes256Ctr", + Self::NoCipher => "NoCipher", + }; + f.write_fmt(format_args!("EncKey::{n}")) + } +} + // TODO: could probably unify EncKey and DecKey as "CipherKey". // Ring had sealing/opening keys which are separate, but RustCrypto // uses the same structs in both directions. @@ -569,6 +583,17 @@ pub(crate) enum DecKey { NoCipher, } +impl Debug for DecKey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let n = match self { + Self::ChaPoly(_) => "ChaPoly", + Self::Aes256Ctr(_) => "Aes256Ctr", + Self::NoCipher => "NoCipher", + }; + f.write_fmt(format_args!("DecKey::{n}")) + } +} + impl DecKey { /// Construct a key pub fn from_cipher<'a>( @@ -642,6 +667,17 @@ pub(crate) enum IntegKey { NoInteg, } +impl Debug for IntegKey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let n = match self { + Self::ChaPoly => "ChaPoly", + Self::HmacSha256(_) => "HmacSha256", + Self::NoInteg => "NoInteg", + }; + f.write_fmt(format_args!("IntegKey::{n}")) + } +} + impl IntegKey { pub fn from_integ<'a>(integ: &Integ, key: &'a [u8]) -> Result<Self, Error> { match integ { diff --git a/sshproto/src/runner.rs b/sshproto/src/runner.rs index bbfc41d..a1954d2 100644 --- a/sshproto/src/runner.rs +++ b/sshproto/src/runner.rs @@ -30,6 +30,18 @@ pub struct Runner<'a> { input_waker: Option<Waker>, } +impl core::fmt::Debug for Runner<'_> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("Runner") + .field("keys", &self.keys) + .field("output_waker", &self.output_waker) + .field("input_waker", &self.input_waker) + .finish_non_exhaustive() + } +} + + + impl<'a> Runner<'a> { /// `inbuf` must be sized to fit the largest SSH packet allowed. pub fn new_client( @@ -90,26 +102,24 @@ impl<'a> Runner<'a> { /// event to the application. /// [`done_payload()`] must be called after any `Ok` result. pub async fn progress(&mut self, behaviour: &mut Behaviour<'_>) -> Result<()> { + let mut s = self.traf_out.sender(&mut self.keys); + // Handle incoming packets if let Some((payload, seq)) = self.traf_in.payload() { - // Lifetimes here are a bit subtle. - // `payload` has self.traffic lifetime, used until `handle_payload` - // completes. - // The `resp` from handle_payload() references self.conn, consumed - // by the send_packet(). - // After that progress() can perform more send_packet() itself. - - let mut s = self.traf_out.sender(&mut self.keys); let d = self.conn.handle_payload(payload, seq, &mut s, behaviour).await?; if let Some(d) = d.0 { + // incoming channel data, we haven't finished with payload + trace!("handle_payload chan input"); self.traf_in.handled_payload()?; self.traf_in.set_channel_input(d)?; } else { + // other packets have been completed + trace!("handle_payload done"); self.traf_in.done_payload()?; - self.conn.progress(&mut s, behaviour).await?; - self.wake(); } } + self.conn.progress(&mut s, behaviour).await?; + self.wake(); Ok(()) } diff --git a/sshproto/src/traffic.rs b/sshproto/src/traffic.rs index a404235..0fae0f5 100644 --- a/sshproto/src/traffic.rs +++ b/sshproto/src/traffic.rs @@ -97,6 +97,7 @@ impl<'a> TrafIn<'a> { } pub fn ready_input(&self) -> bool { + trace!("ready_input state {:?}", self.state); match self.state { RxState::Idle | RxState::ReadInitial { .. } -- GitLab