diff --git a/Cargo.lock b/Cargo.lock index 91dab633c262b7a6572cd207764683c5b98c66bc..c19f3a0e401a39365c7254024a6101b80a00b687 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -344,7 +344,6 @@ dependencies = [ "hmac", "log", "no-panic", - "pin-utils", "poly1305", "pretty-hex 0.3.0", "proptest", diff --git a/smol/examples/con1.rs b/smol/examples/con1.rs index 59d10ddea192ff2028831f74a22dad54569317da..54dc556ad6fac37621203e91baf0feaefd21c5fb 100644 --- a/smol/examples/con1.rs +++ b/smol/examples/con1.rs @@ -105,7 +105,7 @@ fn setup_log(args: &Args) { // not debugging these bits of the stack at present // .add_filter_ignore_str("door_sshproto::traffic") // .add_filter_ignore_str("door_sshproto::runner") - .add_filter_ignore_str("door_smol::async_door") + // .add_filter_ignore_str("door_smol::async_door") .set_time_offset_to_local().expect("Couldn't get local timezone") .build(); @@ -166,8 +166,22 @@ async fn run(args: &Args) -> Result<()> { e = &mut netio => break e.map(|_| ()).context("net loop"), ev = door.progress(|ev| { trace!("progress event {ev:?}"); - Ok(()) - }) => {} + let e = match ev { + Event::Authenticated => Some(Event::Authenticated), + _ => None, + }; + Ok(e) + }) => { + let ev = ev?; + match ev { + Some(Event::Authenticated) => { + info!("auth auth") + + } + Some(_) => unreachable!(), + None => {}, + } + } // q = door.next_request() => { // handle_request(&door, q).await // } diff --git a/smol/src/async_door.rs b/smol/src/async_door.rs index b57b19a1db59e9e780a09261a5f4a862958fcb8d..7db796ed6fab052bc77eb745ea046133263433d5 100644 --- a/smol/src/async_door.rs +++ b/smol/src/async_door.rs @@ -54,24 +54,40 @@ impl<'a> AsyncDoor<'a> { } pub async fn progress<F, R>(&mut self, f: F) - -> Result<Option<R>> where F: FnOnce(door::Event) -> Result<R> { + -> Result<Option<R>> where F: FnOnce(door::Event) -> Result<Option<R>> { { - self.progress_notify.notified().await; + info!("progress top"); let res = { let mut inner = self.inner.lock().await; + info!("progress locked"); let inner = inner.deref_mut(); let ev = inner.runner.progress(&mut inner.behaviour).await.context("progess")?; + info!("progress ev {ev:?}"); if let Some(ev) = ev { - f(ev).map(|r| Some(r)) + let r = f(ev); + inner.runner.done_payload()?; + r } else { Ok(None) } }; - // self.read_waker.take().map(|w| w.wake()); - // self.write_waker.take().map(|w| w.wake()); + self.read_waker.take().map(|w| w.wake()); + self.write_waker.take().map(|w| w.wake()); + // TODO: currently this is only woken by incoming data, should it + // also wake internally from runner or conn? It runs once at start + // to kick off the outgoing handshake at least. + if let Ok(None) = res { + self.progress_notify.notified().await; + } res } } + + pub async fn with_runner<F, R>(&mut self, f: F) -> R + where F: FnOnce(&mut Runner) -> R { + let mut inner = self.inner.lock().await; + f(&mut inner.runner) + } } impl<'a> AsyncRead for AsyncDoor<'a> { @@ -140,7 +156,7 @@ impl<'a> AsyncWrite for AsyncDoor<'a> { }; drop(inner); self.progress_notify.notify_one(); - // self.read_waker.take().map(|w| w.wake()); + self.read_waker.take().map(|w| w.wake()); r } diff --git a/sshproto/Cargo.toml b/sshproto/Cargo.toml index a81a39b18cf2eeb3cd70825384bfb86dbf407469..af94f5d6c38e7ff0dc2c113678844f45c97bbf60 100644 --- a/sshproto/Cargo.toml +++ b/sshproto/Cargo.toml @@ -36,12 +36,8 @@ ssh-key = { version = "0.4", default-features = false, features = ["ed25519", "e # for debug printing pretty-hex = { version = "0.3", default-features = false } -pin-utils = "0.1" - -# tokio = { version = "1.18", features = ["sync"], optional = true } async-trait = { version = "0.1", optional = true } - [features] default = [ "getrandom" ] std = ["async-trait", "snafu/std"] diff --git a/sshproto/src/async_behaviour.rs b/sshproto/src/async_behaviour.rs index 729f07748581db7f27fe7b282cadf9847623c651..7fe1a31c09653fc3c7fd369bc35a99528d062e07 100644 --- a/sshproto/src/async_behaviour.rs +++ b/sshproto/src/async_behaviour.rs @@ -100,6 +100,11 @@ pub trait AsyncCliBehaviour { /// by the server so could be hazardous, they should be escaped with /// [`banner.escape_default()`](core::str::escape_default) or similar. /// Language may be empty, is provided by the server. + + /// This is a `Behaviour` method rather than an [`Event`] because + /// it must be displayed prior to other authentication + /// functions. `Events` may be handled asynchronously so wouldn't + /// guarantee that. #[allow(unused)] async fn show_banner(&self, banner: &str, language: &str) { info!("Got banner:\n{:?}", banner.escape_default()); diff --git a/sshproto/src/conn.rs b/sshproto/src/conn.rs index 0ff407e7a1a27212997fb21a2d46993f1dd3b11d..f981ce4c70b288217574afd1a5254f0c2691f4f6 100644 --- a/sshproto/src/conn.rs +++ b/sshproto/src/conn.rs @@ -18,7 +18,7 @@ use encrypt::KeyState; use packets::{Packet,ParseContext}; use server::Server; use traffic::{Traffic,PacketMaker}; -use channel::{Channel, Channels}; +use channel::{Channel, Channels, ChanEvent, ChanEventMaker}; use config::MAX_CHANNELS; use kex::SessId; @@ -94,11 +94,13 @@ enum ConnState { // Application API #[derive(Debug)] pub enum Event<'a> { - Channel(channel::ChanEvent<'a>), + Channel(ChanEvent<'a>), + Authenticated, } pub(crate) enum EventMaker { - Channel(channel::ChanEventMaker), + Channel(ChanEventMaker), + Authenticated, } impl<'a> Conn<'a> { @@ -190,7 +192,7 @@ impl<'a> Conn<'a> { // to run first? trace!("Incoming {packet:#?}"); let mut resp = RespPackets::new(); - let mut ev = None; + let mut event = None; match packet { Packet::KexInit(_) => { if matches!(self.state, ConnState::InKex { .. }) { @@ -306,14 +308,7 @@ impl<'a> Conn<'a> { if matches!(self.state, ConnState::PreAuth) { self.state = ConnState::Authed; cli.auth_success(&mut resp, &mut self.parse_ctx, &mut b.client()?).await?; - // if h.open_session { - // let (chan, p) = self.channels.open( - // packets::ChannelOpenType::Session)?; - // resp.push(p).trap()?; - // if h.pty { - // todo!(); - // } - // } + event = Some(EventMaker::Authenticated); } else { debug!("Received UserauthSuccess unrequested") } @@ -354,29 +349,30 @@ impl<'a> Conn<'a> { // TODO: maybe needs a conn or cliserv argument. => { let chev = self.channels.dispatch(packet, &mut resp, b).await?; - ev = chev.map(|c| EventMaker::Channel(c)) + event = chev.map(|c| EventMaker::Channel(c)) } }; - if let Some(ev) = ev { - if resp.is_empty() { - Ok(Dispatched::Event(ev)) - } else { - Err(Error::bug()) - } - } else { - Ok(Dispatched::Resp(resp)) - } + Ok(Dispatched { resp, event }) } - pub(crate) fn make_event<'p>(&mut self, payload: &'p [u8], ev: EventMaker) + /// creates an `Event` that borrows data from the payload. Some `Event` variants don't + /// require payload data, the payload is not required in that case. + /// Those variants are allowed to return `resp` packets from `dispatch()` + pub(crate) fn make_event<'p>(&mut self, payload: Option<&'p [u8]>, ev: EventMaker) -> Result<Option<Event<'p>>> { - let p = sshwire::packet_from_bytes(payload, &self.parse_ctx)?; - match ev { + let p = payload.map(|pl| sshwire::packet_from_bytes(pl, &self.parse_ctx)).transpose()?; + let r = match ev { + EventMaker::Channel(ChanEventMaker::DataIn(_)) => { + // no event returned, handled specially by caller + None + } EventMaker::Channel(cev) => { - let c = cev.make(p); - Ok(c.map(|c| Event::Channel(c))) + let c = cev.make(p.trap()?); + c.map(|c| Event::Channel(c)) } - } + EventMaker::Authenticated => Some(Event::Authenticated), + }; + Ok(r) } } @@ -386,7 +382,20 @@ impl<'a> Conn<'a> { // pub event: Option<Event<'e>>, // } -pub(crate) enum Dispatched<'r> { - Resp(RespPackets<'r>), - Event(EventMaker), +pub(crate) struct Dispatched<'r> { + pub resp: RespPackets<'r>, + pub event: Option<EventMaker>, } + +#[cfg(test)] +mod tests { + use crate::doorlog::*; + use crate::conn::*; + use crate::error::Error; + + // #[test] + // fn event_variants() { + // // TODO sanity check event variants. + // } +} + diff --git a/sshproto/src/runner.rs b/sshproto/src/runner.rs index 692fe12a25eeeaa8f50337fb2f1ad45010b7626b..c7825d366c47dbb4a18a686c29ecef263fda43d0 100644 --- a/sshproto/src/runner.rs +++ b/sshproto/src/runner.rs @@ -53,7 +53,7 @@ impl<'a> Runner<'a> { buf, )?; // payload will be handled when progress() is called - if self.traffic.payload().is_some() { + if self.traffic.payload(false).is_some() { trace!("payload some, waker {:?}", self.output_waker); if let Some(w) = self.output_waker.take() { trace!("woke"); @@ -68,29 +68,27 @@ impl<'a> Runner<'a> { /// Optionally returns `Event` which provides channel or session // event to the application. pub async fn progress<'f>(&'f mut self, b: &mut Behaviour<'_>) -> Result<Option<Event<'f>>, Error> { - let em = if let Some(payload) = self.traffic.payload() { + trace!("prog"); + let em = if let Some(payload) = self.traffic.payload(false) { // Lifetimes here are a bit subtle. // `payload` has self.traffic lifetime, used until `handle_payload` // completes. - // The `resp` from handle_payload() references self.conn, consume + // The `resp` from handle_payload() references self.conn, consumed // by the send_packet(). // After that progress() can perform more send_packet() itself. - let r = self.conn.handle_payload(payload, &mut self.keys, b).await?; - match r { - Dispatched::Resp(resp) => { - debug!("done_payload"); - self.traffic.done_payload()?; - for r in resp { - r.send_packet(&mut self.traffic, &mut self.keys)?; - } + let d = self.conn.handle_payload(payload, &mut self.keys, b).await?; + self.traffic.handled_payload()?; - None - } - Dispatched::Event(em) => { - Some(em) - } + if !d.resp.is_empty() || d.event.is_none() { + // switch to using the buffer for output. + self.traffic.done_payload()?; + } + for r in d.resp { + r.send_packet(&mut self.traffic, &mut self.keys)?; } + + d.event } else { None }; @@ -105,22 +103,29 @@ impl<'a> Runner<'a> { match em { EventMaker::Channel(ChanEventMaker::DataIn(di)) => { self.traffic.set_channel_input(di)?; + self.traffic.done_payload()?; None } _ => { - let payload = self.traffic.payload().trap()?; + // Some(payload) is only required for some variants in make_event() + trace!("event "); + let payload = self.traffic.payload(true); self.conn.make_event(payload, em)? } } - } else { self.conn.progress(&mut self.traffic, &mut self.keys, b).await?; None }; + trace!("prog event {ev:?}"); Ok(ev) } + pub fn done_payload(&mut self) -> Result<()> { + self.traffic.done_payload() + } + /// Write any pending output to the wire, returning the size written pub fn output(&mut self, buf: &mut [u8]) -> Result<usize, Error> { let r = self.traffic.output(buf); diff --git a/sshproto/src/traffic.rs b/sshproto/src/traffic.rs index 3a245235c268745ea38931fa9aff50c670c8f8bf..2db447b3742c62c762e960d4399a4c636b8950b7 100644 --- a/sshproto/src/traffic.rs +++ b/sshproto/src/traffic.rs @@ -42,6 +42,9 @@ enum TrafState { ReadComplete { len: usize }, /// Decrypted complete input payload InPayload { len: usize }, + /// Decrypted complete input payload. It has been dispatched by handle_payload(), + /// remains "borrowed" for use by a progress() Event. + BorrowPayload { len: usize }, /// Decrypted incoming channel data InChannelData { /// channel number @@ -104,6 +107,7 @@ impl<'a> Traffic<'a> { | TrafState::Read { .. } => true, TrafState::ReadComplete { .. } | TrafState::InPayload { .. } + | TrafState::BorrowPayload { .. } | TrafState::InChannelData { .. } | TrafState::Write { .. } => false, } @@ -144,20 +148,47 @@ impl<'a> Traffic<'a> { Ok(inlen) } - /// Returns a reference to the decrypted payload buffer if ready - pub(crate) fn payload(&mut self) -> Option<&[u8]> { - trace!("traf payload {:?}", self.state); - if let TrafState::InPayload { len } = self.state { - let payload = &self.buf[SSH_PAYLOAD_START..SSH_PAYLOAD_START + len]; - Some(payload) - } else { - None + /// Returns a reference to the decrypted payload buffer if ready. + /// For a given payload should be called once initially to pass to handle_payload(), + /// with borrow=false. Subsequent calls will only return the payload if borrow=false, + /// used for borrowing the payload for Event. + pub(crate) fn payload(&mut self, borrow: bool) -> Option<&[u8]> { + trace!("traf payload {:?} borrow {borrow}", self.state); + let p = match self.state { + | TrafState::InPayload { len, .. } + => { + let payload = &self.buf[SSH_PAYLOAD_START..SSH_PAYLOAD_START + len]; + Some(payload) + } + | TrafState::BorrowPayload { len, .. } if borrow + => { + let payload = &self.buf[SSH_PAYLOAD_START..SSH_PAYLOAD_START + len]; + Some(payload) + } + _ => None, + }; + trace!("traf 2 {:?}", self.state); + p + } + + pub(crate) fn handled_payload(&mut self) -> Result<(), Error> { + match self.state { + | TrafState::InPayload { len } + | TrafState::BorrowPayload { len } + => { + self.state = TrafState::BorrowPayload { len }; + Ok(()) + } + _ => Err(Error::bug()) } } pub(crate) fn done_payload(&mut self) -> Result<(), Error> { match self.state { - TrafState::InPayload { .. } => { + | TrafState::InPayload { .. } + | TrafState::BorrowPayload { .. } + | TrafState::Idle // TODO, is this wise? + => { self.state = TrafState::Idle; Ok(()) }