diff --git a/async/examples/con1.rs b/async/examples/con1.rs index 0b42db6416189e0bc408f61154de2ac491ed72ac..cacb500c0c0af6894b87a84ef13352c353fd4ecb 100644 --- a/async/examples/con1.rs +++ b/async/examples/con1.rs @@ -4,7 +4,6 @@ use { log::{debug, error, info, log, trace, warn}, }; use anyhow::{Context, Result, Error, bail}; -// use snafu::ResultExt; use pretty_hex::PrettyHex; use tokio::net::TcpStream; @@ -134,10 +133,9 @@ async fn run(args: &Args) -> Result<()> { // TODO: better lifetime rather than leaking let work = Box::leak(Box::new(work)); - let mut sess = door_async::SimpleClient::new(args.username.as_ref().unwrap()); + let mut sess = door_async::CmdlineClient::new(args.username.as_ref().unwrap()); for i in &args.identityfile { - sess.add_authkey(read_key(&i) - .with_context(|| format!("loading key {i}"))?); + sess.add_authkey(read_key(&i).with_context(|| format!("loading key {i}"))?); } let mut door = SSHClient::new(work.as_mut_slice(), Box::new(sess))?; @@ -159,16 +157,14 @@ async fn run(args: &Args) -> Result<()> { match ev { Some(Event::Authenticated) => { - info!("auth auth"); + info!("Opening a new session channel"); let r = door.open_client_session_nopty(Some("cowsay it works")).await .context("Opening session")?; let (mut io, mut err) = r; tokio::spawn(async move { - trace!("channel copy"); let mut i = door_async::stdin()?; - // let mut o = tokio::io::stdout(); let mut o = door_async::stdout()?; - let mut e = tokio::io::stderr(); + let mut e = door_async::stderr()?; let mut io2 = io.clone(); let co = tokio::io::copy(&mut io, &mut o); let ci = tokio::io::copy(&mut i, &mut io2); @@ -199,27 +195,4 @@ async fn run(args: &Args) -> Result<()> { } } } - - // trace!("before loop"); - // loop { - // trace!("top loop"); - // future::race( - // netwrite.race(netread).map(|_| ()), - // stream::repeat(()) - // .then(|_| door.next_request()) - // .then(|q| async { - // handle_request(&door, q).await; - // () - // }) - // ).await; - - // // let r = futures::select! { - // // q = door.next_request().fuse() => { - // // handle_request(&door, q).await - // // } - // // _ = netwrite => break, - // // _ = netread => break, - // // }; - // // trace!("result {r:?}"); - // } } diff --git a/async/src/async_door.rs b/async/src/async_door.rs index cee550b3c6a5714997a2002bf5f5923aee80f61c..67c8ce4111634d2b6d444dc335ffcda7596ce4c8 100644 --- a/async/src/async_door.rs +++ b/async/src/async_door.rs @@ -19,7 +19,7 @@ use core::ops::DerefMut; use std::sync::Arc; use door_sshproto as door; -use door::{Behaviour, AsyncCliBehaviour, Runner, Conn, Result}; +use door::{Behaviour, AsyncCliBehaviour, Runner, Result, Event, ChanEvent}; // use door_sshproto::client::*; use pretty_hex::PrettyHex; @@ -52,6 +52,12 @@ impl<'a> AsyncDoor<'a> { Self { inner, progress_notify } } + fn private_clone(&self) -> Self { + Self { inner: self.inner.clone(), + progress_notify: self.progress_notify.clone(), + } + } + pub fn socket(&self) -> AsyncDoorSocket<'a> { AsyncDoorSocket::new(self) } @@ -66,7 +72,12 @@ impl<'a> AsyncDoor<'a> { let inner = inner.deref_mut(); let ev = inner.runner.progress(&mut inner.behaviour).await?; let r = if let Some(ev) = ev { - let r = f(ev); + let r = match ev { + Event::Channel(ChanEvent::Eof { num }) => { + Ok(None) + }, + _ => f(ev), + }; inner.runner.done_payload()?; r } else { @@ -119,15 +130,6 @@ impl<'a> AsyncDoor<'a> { } } -impl Clone for AsyncDoor<'_> { - fn clone(&self) -> Self { - Self { inner: self.inner.clone(), - progress_notify: self.progress_notify.clone(), - } - } -} - - /// Tries to lock Inner for a poll_read()/poll_write(). /// lock_fut from the caller holds the future so that it can /// be woken later if the lock was contended @@ -152,7 +154,7 @@ pub struct AsyncDoorSocket<'a> { impl<'a> AsyncDoorSocket<'a> { fn new(door: &AsyncDoor<'a>) -> Self { - AsyncDoorSocket { door: door.clone(), + AsyncDoorSocket { door: door.private_clone(), read_lock_fut: None, write_lock_fut: None } } } @@ -175,14 +177,13 @@ impl<'a> AsyncRead for AsyncDoorSocket<'a> { } }; - runner.set_output_waker(cx.waker().clone()); let b = buf.initialize_unfilled(); let r = runner.output(b).map_err(|e| IoError::new(ErrorKind::Other, e)); match r { - // poll_read() returning 0 means EOF, we don't want that Ok(0) => { trace!("set output waker"); + runner.set_output_waker(cx.waker().clone()); Poll::Pending } Ok(sz) => { @@ -213,7 +214,6 @@ impl<'a> AsyncWrite for AsyncDoorSocket<'a> { } }; - runner.set_input_waker(cx.waker().clone()); // TODO: should runner just have poll_write/poll_read? // TODO: is ready_input necessary? .input() should return size=0 // if nothing is consumed. Or .input() could return a Poll<Result<usize>> @@ -223,6 +223,7 @@ impl<'a> AsyncWrite for AsyncDoorSocket<'a> { .map_err(|e| IoError::new(std::io::ErrorKind::Other, e)); Poll::Ready(r) } else { + runner.set_input_waker(cx.waker().clone()); Poll::Pending }; @@ -280,7 +281,7 @@ pub struct ChanExtOut<'a> { impl<'a> ChanInOut<'a> { pub(crate) fn new(chan: u32, door: &AsyncDoor<'a>) -> Self { Self { - chan, door: door.clone(), + chan, door: door.private_clone(), rlfut: None, wlfut: None, } } @@ -289,7 +290,7 @@ impl<'a> ChanInOut<'a> { impl Clone for ChanInOut<'_> { fn clone(&self) -> Self { Self { - chan: self.chan, door: self.door.clone(), + chan: self.chan, door: self.door.private_clone(), rlfut: None, wlfut: None, } } @@ -298,7 +299,7 @@ impl Clone for ChanInOut<'_> { impl<'a> ChanExtIn<'a> { pub(crate) fn new(chan: u32, ext: u32, door: &AsyncDoor<'a>) -> Self { Self { - chan, ext, door: door.clone(), + chan, ext, door: door.private_clone(), rlfut: None, } } @@ -351,8 +352,7 @@ fn chan_poll_read<'a>( .map_err(|e| IoError::new(std::io::ErrorKind::Other, e)); match r { - // sz=0 means EOF, we don't want that - Ok(0) => { + Ok(0) if !runner.channel_eof(chan) => { let w = cx.waker().clone(); inner.chan_read_wakers.insert((chan, ext), w); Poll::Pending diff --git a/async/src/client.rs b/async/src/client.rs index 45f57c596c41f544f61859cf1787fd5286413764..247dd355dc7dd984ded31def10374e88daacee88 100644 --- a/async/src/client.rs +++ b/async/src/client.rs @@ -20,7 +20,7 @@ use crate::*; use crate::async_door::*; use door_sshproto as door; -use door::{Behaviour, AsyncCliBehaviour, Runner, Conn, Result}; +use door::{Behaviour, AsyncCliBehaviour, Runner, Result}; use door::sshnames::SSH_EXTENDED_DATA_STDERR; pub struct SSHClient<'a> { @@ -29,8 +29,7 @@ pub struct SSHClient<'a> { impl<'a> SSHClient<'a> { pub fn new(buf: &'a mut [u8], behaviour: Box<dyn AsyncCliBehaviour+Send>) -> Result<Self> { - let conn = Conn::new_client()?; - let runner = Runner::new(conn, buf)?; + let runner = Runner::new_client(buf)?; let b = Behaviour::new_async_client(behaviour); let door = AsyncDoor::new(runner, b); Ok(Self { diff --git a/async/src/fdio.rs b/async/src/fdio.rs index 00f05515c99f5b691566565de0db53b4f6f378ce..0fb9aefbbc37a8522be85a0ca98c539900eb35c5 100644 --- a/async/src/fdio.rs +++ b/async/src/fdio.rs @@ -20,51 +20,29 @@ fn dup_async(orig_fd: libc::c_int) -> Result<AsyncFd<RawFd>, IoError> { AsyncFd::new(fd) } -pub struct Stdin { +pub struct InFd { f: AsyncFd<RawFd>, } -pub struct Stdout { +pub struct OutFd { f: AsyncFd<RawFd>, } -pub struct Stderr { - f: AsyncFd<RawFd>, -} - -pub fn stdin() -> Result<Stdin, IoError> { - Ok(Stdin { +pub fn stdin() -> Result<InFd, IoError> { + Ok(InFd { f: dup_async(libc::STDIN_FILENO)?, }) } -pub fn stdout() -> Result<Stdout, IoError> { - Ok(Stdout { +pub fn stdout() -> Result<OutFd, IoError> { + Ok(OutFd { f: dup_async(libc::STDOUT_FILENO)?, }) } -pub fn stderr() -> Result<Stderr, IoError> { - Ok(Stderr { +pub fn stderr() -> Result<OutFd, IoError> { + Ok(OutFd { f: dup_async(libc::STDERR_FILENO)?, }) } -impl AsRef<AsyncFd<RawFd>> for Stdin { - fn as_ref(&self) -> &AsyncFd<RawFd> { - &self.f - } -} - -impl AsRef<AsyncFd<RawFd>> for Stdout { - fn as_ref(&self) -> &AsyncFd<RawFd> { - &self.f - } -} - -impl AsRef<AsyncFd<RawFd>> for Stderr { - fn as_ref(&self) -> &AsyncFd<RawFd> { - &self.f - } -} - -impl AsyncRead for Stdin { +impl AsyncRead for InFd { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -97,7 +75,7 @@ impl AsyncRead for Stdin { } } -impl AsyncWrite for Stdout { +impl AsyncWrite for OutFd { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/async/src/lib.rs b/async/src/lib.rs index 8773434c2a445c705d0297d477dc07c8c91e5a0c..ee0f84b24eced466b753b69ee4806789f3b86791 100644 --- a/async/src/lib.rs +++ b/async/src/lib.rs @@ -3,11 +3,11 @@ mod client; mod async_door; -mod simple_client; +mod cmdline_client; pub use async_door::AsyncDoor; pub use client::SSHClient; -pub use simple_client::SimpleClient; +pub use cmdline_client::CmdlineClient; #[cfg(unix)] mod fdio; diff --git a/async/src/simple_client.rs b/async/src/simple_client.rs deleted file mode 100644 index 0c6b0a762f846ccffbfe883ec04004ef458e6ce6..0000000000000000000000000000000000000000 --- a/async/src/simple_client.rs +++ /dev/null @@ -1,150 +0,0 @@ -#[allow(unused_imports)] -use log::{debug, error, info, log, trace, warn}; - -use core::str::FromStr; - -use door::SignKey; -use door_sshproto as door; -use door_sshproto::{BhError, BhResult}; -use door_sshproto::{ChanMsg, ChanMsgDetails, Error, RespPackets, Result, Runner}; - -use std::collections::VecDeque; - -use async_trait::async_trait; - -pub struct SimpleClient { - auth_done: bool, - main_ch: Option<u32>, - authkeys: VecDeque<SignKey>, - username: String, -} - -impl SimpleClient { - pub fn new(username: &impl AsRef<str>) -> Self { - SimpleClient { - auth_done: false, - main_ch: None, - authkeys: VecDeque::new(), - username: username.as_ref().into(), - } - } - - pub fn add_authkey(&mut self, k: SignKey) { - self.authkeys.push_back(k) - } -} - -// #[async_trait(?Send)] -#[async_trait] -impl door::AsyncCliBehaviour for SimpleClient { - async fn chan_handler( - &mut self, - _resp: &mut RespPackets, - chan_msg: ChanMsg, - ) -> Result<()> { - if Some(chan_msg.num) != self.main_ch { - return Err(Error::SSHProtoError); - } - - match chan_msg.msg { - ChanMsgDetails::ExtData { .. } => {} - ChanMsgDetails::Req { .. } => {} - _ => {} - } - Ok(()) - } - - async fn username(&mut self) -> BhResult<door::ResponseString> { - door::ResponseString::from_str(&self.username).map_err(|_| BhError::Fail) - } - - async fn valid_hostkey(&mut self, key: &door::PubKey) -> BhResult<bool> { - trace!("valid_hostkey for {key:?}"); - Ok(true) - } - - async fn next_authkey(&mut self) -> BhResult<Option<door::SignKey>> { - Ok(self.authkeys.pop_front()) - } - - async fn auth_password( - &mut self, - pwbuf: &mut door::ResponseString, - ) -> BhResult<bool> { - let pw = - rpassword::prompt_password(format!("password for {}: ", self.username)) - .map_err(|e| { - warn!("read_password failed {e:}"); - BhError::Fail - })?; - if pwbuf.push_str(&pw).is_err() { - Err(BhError::Fail) - } else { - Ok(true) - } - } - - async fn authenticated(&mut self) { - info!("Authentication succeeded"); - self.auth_done = true; - } - // } - - // impl door::BlockCliBehaviour for SimpleClient { - // fn chan_handler<'f>(&mut self, resp: &mut RespPackets, chan_msg: ChanMsg<'f>) -> Result<()> { - // if Some(chan_msg.num) != self.main_ch { - // return Err(Error::SSHProtoError) - // } - - // match chan_msg.msg { - // ChanMsgDetails::Data(buf) => { - // let _ = std::io::stdout().write_all(buf); - // }, - // ChanMsgDetails::ExtData{..} => { - // } - // ChanMsgDetails::Req{..} => { - // } - // _ => {} - // } - // Ok(()) - // } - - // fn progress(&mut self, runner: &mut Runner) -> Result<()> { - // if self.auth_done { - // if self.main_ch.is_none() { - // let ch = runner.open_client_session(Some("cowsay it works"), false)?; - // self.main_ch = Some(ch); - // } - // } - // Ok(()) - // } - - // fn username(&mut self) -> BhResult<door::ResponseString> { - // // TODO unwrap - // let mut p = door::ResponseString::new(); - // p.push_str("matt").unwrap(); - // Ok(p) - // } - - // fn valid_hostkey(&mut self, key: &door::PubKey) -> BhResult<bool> { - // trace!("valid_hostkey for {key:?}"); - // Ok(true) - // } - - // fn auth_password(&mut self, pwbuf: &mut door::ResponseString) -> BhResult<bool> { - // let pw = rpassword::prompt_password("password: ").map_err(|e| { - // warn!("read_password failed {e:}"); - // BhError::Fail - // })?; - // if pwbuf.push_str(&pw).is_err() { - // Err(BhError::Fail) - // } else { - // Ok(true) - // } - // } - - // fn authenticated(&mut self) { - // info!("Authentication succeeded"); - // self.auth_done = true; - // } -} diff --git a/sshproto/src/async_behaviour.rs b/sshproto/src/async_behaviour.rs index ddfbee1a2de3d97bfe996f42dfc6d798d57c453f..e6ef03816f5c99fa15d2ebeebb7d0b9b28a3465f 100644 --- a/sshproto/src/async_behaviour.rs +++ b/sshproto/src/async_behaviour.rs @@ -42,13 +42,6 @@ impl AsyncCliServ { }; Ok(c) } - - pub(crate) async fn chan_handler(&mut self, resp: &mut RespPackets<'_>, chan_msg: ChanMsg) -> Result<()> { - match self { - Self::Client(i) => i.chan_handler(resp, chan_msg).await, - Self::Server(i) => i.chan_handler(resp, chan_msg), - } - } } // Send+Sync bound here is required for trait objects since there are @@ -57,8 +50,6 @@ impl AsyncCliServ { // #[async_trait(?Send)] #[async_trait] pub trait AsyncCliBehaviour: Sync+Send { - async fn chan_handler(&mut self, resp: &mut RespPackets, chan_msg: ChanMsg) -> Result<()>; - /// Provide the username to use for authentication. Will only be called once /// per session. /// If the username needs to change a new connection should be made @@ -109,5 +100,4 @@ pub trait AsyncCliBehaviour: Sync+Send { // #[async_trait(?Send)] #[async_trait] pub trait AsyncServBehaviour: Sync+Send { - fn chan_handler(&mut self, resp: &mut RespPackets, chan_msg: ChanMsg) -> Result<()>; } diff --git a/sshproto/src/behaviour.rs b/sshproto/src/behaviour.rs index b100c41dabb5110cdd654d202991033e2159b149..b96417700a888ef98419ebd114a9934dd03cfb3b 100644 --- a/sshproto/src/behaviour.rs +++ b/sshproto/src/behaviour.rs @@ -70,10 +70,6 @@ impl Behaviour<'_> { } } - pub(crate) async fn chan_handler(&mut self, resp: &mut RespPackets<'_>, chan_msg: ChanMsg) -> Result<()> { - self.inner.chan_handler(resp, chan_msg).await - } - // TODO: or should we just pass CliBehaviour and ServBehaviour through runner, // don't switch here at all pub(crate) fn client(&mut self) -> Result<CliBehaviour> { @@ -101,10 +97,6 @@ impl<'a> Behaviour<'a> } } - pub(crate) async fn chan_handler(&mut self, resp: &mut RespPackets<'_>, chan_msg: ChanMsg) -> Result<()> { - self.inner.chan_handler(resp, chan_msg) - } - // TODO: or should we just pass CliBehaviour and ServBehaviour through runner, // don't switch here at all pub(crate) fn client(&mut self) -> Result<CliBehaviour> { diff --git a/sshproto/src/block_behaviour.rs b/sshproto/src/block_behaviour.rs index 03998eb62d0f2bbb5b45fb09a85ca357b3d3add5..16e2b0cb8b857e3c54d7a1ef7e73751702e7b76f 100644 --- a/sshproto/src/block_behaviour.rs +++ b/sshproto/src/block_behaviour.rs @@ -39,26 +39,9 @@ impl BlockCliServ<'_> }; Ok(c) } - - pub(crate) fn chan_handler<'f>( - &mut self, - resp: &mut RespPackets<'_>, - chan_msg: ChanMsg, - ) -> Result<()> { - match self { - Self::Client(i) => i.chan_handler(resp, chan_msg), - Self::Server(i) => i.chan_handler(resp, chan_msg), - } - } } pub trait BlockCliBehaviour { - fn chan_handler<'f>( - &mut self, - resp: &mut RespPackets, - chan_msg: ChanMsg, - ) -> Result<()>; - /// Provide the username to use for authentication. Will only be called once /// per session. /// If the username needs to change a new connection should be made @@ -103,9 +86,4 @@ pub trait BlockCliBehaviour { } pub trait BlockServBehaviour { - fn chan_handler( - &mut self, - resp: &mut RespPackets, - chan_msg: ChanMsg, - ) -> Result<()>; } diff --git a/sshproto/src/channel.rs b/sshproto/src/channel.rs index 6ab37a54abb29f832a0e6e1ed051a7f3460a813d..417cc06f380c7198d0ff6a55a1f424cc8ddfe8d0 100644 --- a/sshproto/src/channel.rs +++ b/sshproto/src/channel.rs @@ -58,7 +58,17 @@ impl Channels { Ok((ch.as_ref().unwrap(), p)) } - fn get_chan(&mut self, num: u32) -> Result<&mut Channel> { + pub(crate) fn get(&self, num: u32) -> Result<&Channel> { + self.ch + .get(num as usize) + // out of range + .ok_or(Error::BadChannel)? + .as_ref() + // unused channel + .ok_or(Error::BadChannel) + } + + pub(crate) fn get_mut(&mut self, num: u32) -> Result<&mut Channel> { self.ch .get_mut(num as usize) // out of range @@ -78,7 +88,7 @@ impl Channels { /// Returns the channel data packet to send, and the length of data consumed pub(crate) fn send_data<'b>(&mut self, num: u32, ext: Option<u32>, data: &'b [u8]) -> Result<(Packet<'b>, usize)> { - let send_ch = self.get_chan(num)?.send.as_ref().trap()?.num; + let send_ch = self.get(num)?.send.as_ref().trap()?.num; // TODO: check: channel state, channel window, maxpacket let len = data.len(); let data = BinString(data); @@ -98,7 +108,7 @@ impl Channels { Some(ref p) if p.chan == num => { // TODO: send window adjustment let len = p.len; - let ch = self.get_chan(num)?; + let ch = self.get_mut(num)?; ch.finished_input(len); self.pending_input = None; Ok(()) @@ -107,6 +117,10 @@ impl Channels { } } + pub(crate) fn recv_eof(&self, num: u32) -> bool { + self.get(num).map_or(false, |c| c.recv_eof()) + } + // incoming packet handling pub async fn dispatch( &mut self, @@ -120,7 +134,7 @@ impl Channels { todo!(); } Packet::ChannelOpenConfirmation(p) => { - let ch = self.get_chan(p.num)?; + let ch = self.get_mut(p.num)?; match ch.state { ChanState::Opening { .. } => { let init_state = @@ -143,7 +157,7 @@ impl Channels { } } Packet::ChannelOpenFailure(p) => { - let ch = self.get_chan(p.num)?; + let ch = self.get(p.num)?; if ch.send.is_some() { Err(Error::SSHProtoError) } else { @@ -156,7 +170,7 @@ impl Channels { todo!(); } Packet::ChannelData(p) => { - let ch = self.get_chan(p.num)?; + let ch = self.get(p.num)?; // TODO check we are expecting input if self.pending_input.is_some() { return Err(Error::bug()) @@ -166,7 +180,7 @@ impl Channels { Ok(Some(ChanEventMaker::DataIn(di))) } Packet::ChannelDataExt(p) => { - let ch = self.get_chan(p.num)?; + let ch = self.get(p.num)?; // TODO check we are expecting input and ext is valid. if self.pending_input.is_some() { return Err(Error::bug()) @@ -175,14 +189,15 @@ impl Channels { let di = DataIn { num: p.num, ext: Some(p.code), offset: p.data_offset(), len: p.data.0.len() }; Ok(Some(ChanEventMaker::DataIn(di))) } - Packet::ChannelEof(_p) => { - todo!(); + Packet::ChannelEof(p) => { + let _ch = self.get(p.num)?; + Ok(Some(ChanEventMaker::Eof { num: p.num })) } Packet::ChannelClose(_p) => { todo!(); } Packet::ChannelRequest(p) => { - match self.get_chan(p.num) { + match self.get(p.num) { Ok(ch) => Ok(Some(ChanEventMaker::Req)), Err(ch) => { if p.want_reply { @@ -329,13 +344,16 @@ pub enum ChanState { // the Behaviour own it? Or we don't store them here, just callback to the Behaviour. Opening { init_req: InitReqs }, Normal, - DrainRead, - DrainWrite, + + RecvEof, + RecvClose, } pub struct Channel { ty: ChanType, state: ChanState, + sent_eof: bool, + sent_close: bool, // queue of requests sent with want_reply last_req: heapless::Deque<ReqKind, MAX_OUTSTANDING_REQS>, @@ -350,8 +368,10 @@ pub struct Channel { impl Channel { fn new(num: u32, ty: ChanType, init_req: InitReqs) -> Self { Channel { - state: ChanState::Opening { init_req }, ty, + state: ChanState::Opening { init_req }, + sent_close: false, + sent_eof: false, last_req: Deque::new(), recv: ChanDir { num, @@ -376,6 +396,16 @@ impl Channel { fn finished_input(&mut self, len: usize ) { self.pending_adjust = self.pending_adjust.saturating_add(len) } + + pub fn recv_eof(&self) -> bool { + match self.state { + |ChanState::RecvEof + |ChanState::RecvClose + => true, + _ => false, + } + + } } pub struct ChanMsg { @@ -400,6 +430,26 @@ pub(crate) struct DataIn { pub len: usize, } +/// Application API +#[derive(Debug)] +pub enum ChanEvent<'a> { + // TODO: perhaps this one should go a level above since it isn't for existing channels? + OpenSuccess { num: u32 }, + + // TODO details + // OpenRequest { }, + + ReqPty { num: u32, want_reply: bool, pty: packets::Pty<'a> }, + + Req { num: u32, req: ChannelReqType<'a> }, + // TODO closein/closeout/eof, etc. Should also return the exit status etc + + Eof { num: u32 }, + + Close { num: u32 }, + // TODO: responses to a previous ChanMsg? +} + /// An event returned from `Channel::dispatch()`. /// Most are propagated to the application, `DataIn is caught by `runner` #[derive(Debug)] @@ -415,6 +465,8 @@ pub(crate) enum ChanEventMaker { Req, // TODO closein/closeout/eof, etc. Should also return the exit status etc + Eof { num: u32 }, + Close { num: u32 }, // TODO: responses to a previous ChanMsg? } @@ -442,30 +494,13 @@ impl ChanEventMaker { None } } + Self::Eof { num } => Some(ChanEvent::Eof { num: *num }), Self::Close { num } => Some(ChanEvent::Close { num: *num }), } } } -/// Application API -#[derive(Debug)] -pub enum ChanEvent<'a> { - // TODO: perhaps this one should go a level above since it isn't for existing channels? - OpenSuccess { num: u32 }, - - // TODO details - // OpenRequest { }, - - ReqPty { num: u32, want_reply: bool, pty: packets::Pty<'a> }, - - Req { num: u32, req: ChannelReqType<'a> }, - // TODO closein/closeout/eof, etc. Should also return the exit status etc - - Close { num: u32 }, - // TODO: responses to a previous ChanMsg? -} - struct PendInput { chan: u32, len: usize, diff --git a/sshproto/src/lib.rs b/sshproto/src/lib.rs index 70a5a2b86dfcdd59eecff171cc3f0ac429a743f7..06ce9d46228c933c02641b6bf5b12c23e812763f 100644 --- a/sshproto/src/lib.rs +++ b/sshproto/src/lib.rs @@ -52,9 +52,9 @@ pub use async_behaviour::{AsyncCliBehaviour,AsyncServBehaviour}; pub use block_behaviour::{BlockCliBehaviour,BlockServBehaviour}; pub use runner::Runner; -pub use conn::{Conn,RespPackets}; +pub use conn::RespPackets; pub use sign::SignKey; pub use packets::PubKey; pub use error::{Error,Result}; -pub use channel::{ChanMsg,ChanMsgDetails}; +pub use channel::{ChanMsg, ChanMsgDetails, ChanEvent}; pub use conn::Event; diff --git a/sshproto/src/runner.rs b/sshproto/src/runner.rs index bc566fc714225d0a7690dcaf483b11093fc2f002..871e163f5c4a2ad95991b9faf2d346a5ad3442fc 100644 --- a/sshproto/src/runner.rs +++ b/sshproto/src/runner.rs @@ -12,7 +12,7 @@ use crate::{*, channel::ChanEvent}; use encrypt::KeyState; use traffic::Traffic; -use conn::{Dispatched, EventMaker, Event}; +use conn::{Conn, Dispatched, EventMaker, Event}; use channel::ChanEventMaker; pub struct Runner<'a> { @@ -30,10 +30,10 @@ pub struct Runner<'a> { impl<'a> Runner<'a> { /// `iobuf` must be sized to fit the largest SSH packet allowed. - pub fn new( - conn: Conn<'a>, + pub fn new_client( iobuf: &'a mut [u8], ) -> Result<Runner<'a>, Error> { + let conn = Conn::new_client()?; let runner = Runner { conn, traffic: traffic::Traffic::new(iobuf), @@ -218,6 +218,10 @@ impl<'a> Runner<'a> { self.traffic.ready_channel_input() } + pub fn channel_eof(&self, chan: u32) -> bool { + self.conn.channels.recv_eof(chan) + } + // TODO check the chan/ext are valid, SSH window pub fn ready_channel_send(&self, _chan: u32, _ext: Option<u32>) -> bool { self.traffic.can_output()