diff --git a/Cargo.lock b/Cargo.lock index 50830f5204046b574fc8914788ffa6b71974bd88..d297564ae5c8bf1210148bb96185700264ef8629 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -311,7 +311,7 @@ name = "door" version = "0.1.0" [[package]] -name = "door-smol" +name = "door-async" version = "0.1.0" dependencies = [ "anyhow", @@ -320,7 +320,7 @@ dependencies = [ "door-sshproto", "futures", "log", - "pretty-hex 0.3.0", + "pretty-hex", "rpassword", "simplelog", "snafu", @@ -343,7 +343,7 @@ dependencies = [ "log", "no-panic", "poly1305", - "pretty-hex 0.3.0", + "pretty-hex", "proptest", "rand", "rand_core 0.6.3", @@ -357,20 +357,6 @@ dependencies = [ "sshwire_derive", ] -[[package]] -name = "door-tokio" -version = "0.1.0" -dependencies = [ - "anyhow", - "door-sshproto", - "log", - "pin-utils", - "pretty-hex 0.2.1", - "simplelog", - "snafu", - "tokio", -] - [[package]] name = "ed25519" version = "1.5.2" @@ -773,12 +759,6 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" -[[package]] -name = "pretty-hex" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc5c99d529f0d30937f6f4b8a86d988047327bb88d04d2c4afc356de74722131" - [[package]] name = "pretty-hex" version = "0.3.0" diff --git a/Cargo.toml b/Cargo.toml index ff88aa1d5e4dbb47f037173d1545754bf5cd524b..c930a8a2888671b4bceff14e832b166a4838fa4e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,9 +7,8 @@ license = "MPL-2.0" [workspace] members = [ "sshproto", - "smol", - "door-tokio", "sshwire_derive", + "async", ] [profile.release] diff --git a/smol/Cargo.toml b/async/Cargo.toml similarity index 97% rename from smol/Cargo.toml rename to async/Cargo.toml index 8a5e337a503a5bcb628fb3a628aa8f811f733e03..50f94e7fd40d28a95c40343dbe3ab1c2da214344 100644 --- a/smol/Cargo.toml +++ b/async/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "door-smol" +name = "door-async" version = "0.1.0" edition = "2021" diff --git a/smol/examples/con1.rs b/async/examples/con1.rs similarity index 84% rename from smol/examples/con1.rs rename to async/examples/con1.rs index 7d0bc2618b3135e8229ddc771e9ff34f278a1011..05c63c6ea653b51704dc4cfae169d3afd621da61 100644 --- a/smol/examples/con1.rs +++ b/async/examples/con1.rs @@ -11,6 +11,7 @@ use tokio::net::TcpStream; use std::{net::Ipv6Addr, io::Read}; use door_sshproto::*; +use door_async::SSHClient; use simplelog::*; @@ -87,16 +88,6 @@ fn main() -> Result<()> { }) } -// async fn handle_request(door: &door_smol::AsyncDoor<'_>, query: HookQuery) { -// match query { -// HookQuery::Username(_) => { -// let mut s = ResponseString::new(); -// s.push_str("matt").unwrap(); -// door.reply_request(Ok(HookQuery::Username(s))).unwrap(); -// } -// } -// } - fn setup_log(args: &Args) { let mut conf = simplelog::ConfigBuilder::new(); let conf = conf @@ -105,7 +96,7 @@ fn setup_log(args: &Args) { // not debugging these bits of the stack at present // .add_filter_ignore_str("door_sshproto::traffic") // .add_filter_ignore_str("door_sshproto::runner") - // .add_filter_ignore_str("door_smol::async_door") + // .add_filter_ignore_str("door_async::async_door") .set_time_offset_to_local().expect("Couldn't get local timezone") .build(); @@ -137,30 +128,21 @@ async fn run(args: &Args) -> Result<()> { // Connect to a peer let mut stream = TcpStream::connect((args.host.as_str(), args.port)).await?; - // let mut stream = net::TcpStream::connect("::1:2244").await?; - // let mut stream = TcpStream::connect("130.95.13.18:22").await?; - let mut work = vec![0; 3000]; + let work = vec![0; 3000]; + // TODO: better lifetime rather than leaking let work = Box::leak(Box::new(work)); - let mut sess = door_smol::SimpleClient::new(args.username.as_ref().unwrap()); + + let mut sess = door_async::SimpleClient::new(args.username.as_ref().unwrap()); for i in &args.identityfile { sess.add_authkey(read_key(&i) .with_context(|| format!("loading key {i}"))?); } - let conn = Conn::new_client()?; - let runner = Runner::new(conn, work.as_mut_slice())?; - - let b = Behaviour::new_async_client(Box::new(sess)); - // let b = Behaviour::new_blocking_client(&mut sess); - let mut door = door_smol::AsyncDoor::new(runner, b); - - // let door = async_dup::Mutex::new(door_smol::AsyncDoor { runner }); - // let mut f = future::try_zip(netwrite, netread).fuse(); - // f.await; + let mut door = SSHClient::new(work.as_mut_slice(), Box::new(sess))?; - let mut d = door.clone(); - let netloop = tokio::io::copy_bidirectional(&mut stream, &mut d); + let mut s = door.socket(); + let netloop = tokio::io::copy_bidirectional(&mut stream, &mut s); let prog = tokio::spawn(async move { diff --git a/smol/src/async_client.rs b/async/src/async_client.rs similarity index 93% rename from smol/src/async_client.rs rename to async/src/async_client.rs index 8de183714c89bd4ff078a294196e24824ead28b1..d286fe8464195693b107b0ef51b5316f3110e458 100644 --- a/smol/src/async_client.rs +++ b/async/src/async_client.rs @@ -54,17 +54,6 @@ impl door::AsyncCliBehaviour for SimpleClient { Ok(()) } - fn progress(&mut self, runner: &mut Runner) -> Result<()> { - if self.auth_done { - if self.main_ch.is_none() { - let ch = - runner.open_client_session(Some("cowsay it works"), false)?; - self.main_ch = Some(ch); - } - } - Ok(()) - } - async fn username(&mut self) -> BhResult<door::ResponseString> { door::ResponseString::from_str(&self.username).map_err(|_| BhError::Fail) } diff --git a/smol/src/async_door.rs b/async/src/async_door.rs similarity index 87% rename from smol/src/async_door.rs rename to async/src/async_door.rs index d522a9ae954470d512da4b0135788c4161244fa6..b06dcbdd258146e5b23b37980a4f58c5cea25369 100644 --- a/smol/src/async_door.rs +++ b/async/src/async_door.rs @@ -22,9 +22,8 @@ use std::sync::Arc; // TODO use anyhow::{anyhow, Context as _, Error, Result}; -use door::{Behaviour, Runner}; use door_sshproto as door; -use door_sshproto::error::Error as DoorError; +use door::{Behaviour, AsyncCliBehaviour, Runner, Conn}; // use door_sshproto::client::*; use pretty_hex::PrettyHex; @@ -45,8 +44,6 @@ pub struct AsyncDoor<'a> { progress_notify: Arc<TokioNotify>, - read_lock_fut: Option<OwnedMutexLockFuture<Inner<'a>>>, - write_lock_fut: Option<OwnedMutexLockFuture<Inner<'a>>>, } impl<'a> AsyncDoor<'a> { @@ -56,7 +53,11 @@ impl<'a> AsyncDoor<'a> { let inner = Arc::new(Mutex::new(Inner { runner, behaviour, chan_read_wakers, chan_write_wakers })); let progress_notify = Arc::new(TokioNotify::new()); - Self { inner, progress_notify, read_lock_fut: None, write_lock_fut: None } + Self { inner, progress_notify } + } + + pub fn socket(&self) -> AsyncDoorSocket<'a> { + AsyncDoorSocket::new(self) } pub async fn progress<F, R>(&mut self, f: F) @@ -120,34 +121,58 @@ impl<'a> AsyncDoor<'a> { let mut inner = self.inner.lock().await; f(&mut inner.runner) } +} + +impl Clone for AsyncDoor<'_> { + fn clone(&self) -> Self { + Self { inner: self.inner.clone(), + progress_notify: self.progress_notify.clone(), + } + } +} + + +pub struct SSHClient<'a> { + door: AsyncDoor<'a>, +} + +impl<'a> SSHClient<'a> { + pub fn new(buf: &'a mut [u8], behaviour: Box<dyn AsyncCliBehaviour+Send>) -> Result<Self> { + let conn = Conn::new_client()?; + let runner = Runner::new(conn, buf)?; + let b = Behaviour::new_async_client(behaviour); + let door = AsyncDoor::new(runner, b); + Ok(Self { + door + }) + } + + pub fn socket(&self) -> AsyncDoorSocket<'a> { + self.door.socket() + } + + pub async fn progress<F, R>(&mut self, f: F) + -> Result<Option<R>> + where F: FnOnce(door::Event) -> Result<Option<R>> { + self.door.progress(f).await + } // TODO: return a Channel object that gives events like WinChange or exit status // TODO: move to SimpleClient or something? pub async fn open_client_session(&mut self, exec: Option<&str>, pty: bool) -> Result<(ChanInOut<'a>, ChanExtIn<'a>)> { - let chan = self.with_runner(|runner| { + let chan = self.door.with_runner(|runner| { runner.open_client_session(exec, pty) }).await?; - let cstd = ChanInOut::new(chan, &self); - let cerr = ChanExtIn::new(chan, SSH_EXTENDED_DATA_STDERR, &self); + let cstd = ChanInOut::new(chan, &self.door); + let cerr = ChanExtIn::new(chan, SSH_EXTENDED_DATA_STDERR, &self.door); Ok((cstd, cerr)) } } -impl Clone for AsyncDoor<'_> { - fn clone(&self) -> Self { - Self { inner: self.inner.clone(), - progress_notify: self.progress_notify.clone(), - read_lock_fut: None, - write_lock_fut: None, - } - } -} - - -/// Tries to locks Inner for a poll_read()/poll_write(). +/// Tries to lock Inner for a poll_read()/poll_write(). /// lock_fut from the caller holds the future so that it can /// be woken later if the lock was contended fn poll_lock<'a>(inner: Arc<Mutex<Inner<'a>>>, cx: &mut Context<'_>, @@ -162,7 +187,21 @@ fn poll_lock<'a>(inner: Arc<Mutex<Inner<'a>>>, cx: &mut Context<'_>, p } -impl<'a> AsyncRead for AsyncDoor<'a> { +pub struct AsyncDoorSocket<'a> { + door: AsyncDoor<'a>, + + read_lock_fut: Option<OwnedMutexLockFuture<Inner<'a>>>, + write_lock_fut: Option<OwnedMutexLockFuture<Inner<'a>>>, +} + +impl<'a> AsyncDoorSocket<'a> { + fn new(door: &AsyncDoor<'a>) -> Self { + AsyncDoorSocket { door: door.clone(), + read_lock_fut: None, write_lock_fut: None } + } +} + +impl<'a> AsyncRead for AsyncDoorSocket<'a> { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -170,7 +209,7 @@ impl<'a> AsyncRead for AsyncDoor<'a> { ) -> Poll<Result<(), IoError>> { trace!("poll_read"); - let mut p = poll_lock(self.inner.clone(), cx, &mut self.read_lock_fut); + let mut p = poll_lock(self.door.inner.clone(), cx, &mut self.read_lock_fut); let runner = match p { Poll::Ready(ref mut i) => &mut i.runner, @@ -200,7 +239,7 @@ impl<'a> AsyncRead for AsyncDoor<'a> { } } -impl<'a> AsyncWrite for AsyncDoor<'a> { +impl<'a> AsyncWrite for AsyncDoorSocket<'a> { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -208,7 +247,7 @@ impl<'a> AsyncWrite for AsyncDoor<'a> { ) -> Poll<Result<usize, IoError>> { trace!("poll_write"); - let mut p = poll_lock(self.inner.clone(), cx, &mut self.write_lock_fut); + let mut p = poll_lock(self.door.inner.clone(), cx, &mut self.write_lock_fut); let runner = match p { Poll::Ready(ref mut i) => &mut i.runner, @@ -237,7 +276,7 @@ impl<'a> AsyncWrite for AsyncDoor<'a> { if let Poll::Ready(_) = r { // TODO: only notify if packet traffic.payload().is_some() ? // Though we also are using progress() for other events. - self.progress_notify.notify_one(); + self.door.progress_notify.notify_one(); trace!("notify progress"); } r diff --git a/smol/src/lib.rs b/async/src/lib.rs similarity index 77% rename from smol/src/lib.rs rename to async/src/lib.rs index 2f4c164168f22462aa3a7ed9f4485b80e1de3a06..bb44ee3a82f5f3c8ca2f781b3f5ab25a01b29dc1 100644 --- a/smol/src/lib.rs +++ b/async/src/lib.rs @@ -4,4 +4,5 @@ mod async_door; pub use async_client::SimpleClient; pub use async_door::AsyncDoor; +pub use async_door::SSHClient; diff --git a/door-tokio/Cargo.toml b/door-tokio/Cargo.toml deleted file mode 100644 index 541f1d7e617c36a43e95bc2a6c2233e83cec59da..0000000000000000000000000000000000000000 --- a/door-tokio/Cargo.toml +++ /dev/null @@ -1,18 +0,0 @@ -[package] -name = "door-tokio" -version = "0.1.0" -edition = "2021" - -[dependencies] -tokio = { version = "1.17", features = ["full"] } -door-sshproto = { path = "../sshproto", features = ["std"] } -log = { version = "0.4" } -pin-utils = "0.1" - -# anyhow = { version = "1.0" } - -[dev-dependencies] -snafu = { version = "0.7", default-features = true } -anyhow = { version = "1.0" } -pretty-hex = "0.2" -simplelog = "0.12" diff --git a/door-tokio/examples/con1.rs b/door-tokio/examples/con1.rs deleted file mode 100644 index 07336d59a3556d6afc0d1cc70d0104f9ba3e81e7..0000000000000000000000000000000000000000 --- a/door-tokio/examples/con1.rs +++ /dev/null @@ -1,120 +0,0 @@ -#[allow(unused_imports)] -use { - // crate::error::Error, - log::{debug, error, info, log, trace, warn}, -}; -use anyhow::Context; -use std::error::Error; -use pretty_hex::PrettyHex; -use tokio::{io::AsyncWriteExt,io}; -use tokio::net::TcpStream; -use pin_utils::*; - -use door_sshproto::*; - -use simplelog::*; - -#[tokio::main] -async fn main() { - - let r = run().await; - if let Err(e) = r { - error!("Finished with error: {:?}", e); - } -} - -async fn run() -> Result<(), Box<dyn Error>> { - - let conf = simplelog::ConfigBuilder::new() - // .add_filter_ignore_str("serde") - .build(); - - CombinedLogger::init( - vec![ - TermLogger::new(LevelFilter::Trace, conf, TerminalMode::Mixed, ColorChoice::Auto), - ] - ).unwrap(); - - info!("running main"); - trace!("tracing main"); - - // Connect to a peer - // let mut stream = TcpStream::connect("dropbear.nl:22").await?; - let mut stream = TcpStream::connect("::1:2244").await?; - // let mut stream = TcpStream::connect("130.95.13.18:22").await?; - - let mut work = vec![0; 3000]; - let cli= Client::new(); - let c = Conn::new_client(cli)?; - let runner = Runner::new(c, work.as_mut_slice())?; - - let door = door_tokio::AsyncDoor { runner }; - pin_mut!(door); - - io::copy_bidirectional(&mut stream, &mut door).await?; - - // let mut inbuf = vec![0; 3000]; - // let mut inpos = 0; - // let mut inlen = 0; - // let mut outbuf = vec![0; 3000]; - - - // loop { - // while r.output_pending() { - // let b = outbuf.as_mut_slice(); - // let l = r.output(b)?; - // let b = &b[..l]; - // stream.write_all(b).await.context("write_all")?; - // } - - // if r.ready_input() { - // trace!("ready in"); - // stream.readable().await.context("readable")?; - // if inlen == inpos { - // inlen = match stream.try_read(&mut inbuf) { - // Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - // trace!("wouldblock"); - // Ok(0) - // }, - // Ok(n) => { - // if n == 0 { - // Err(io::Error::from(io::ErrorKind::UnexpectedEof)) - // } else { - // Ok(n) - // } - // } - - // other_error => other_error, - // }.context("read")?; - // trace!("read new {inlen}"); - // inpos = 0; - // } - - // trace!("nputting {inlen}..{inpos}"); - // let l = r.input(&inbuf[inpos..inlen])?; - // inpos += l; - // } - // } - - - // let mut d = ident::RemoteVersion::new(); - // let (taken, done) = d.consume(&buf)?; - // println!("taken {taken} done {done}"); - // let v = d.version(); - // match v { - // Some(x) => { - // println!("v {:?}", x.hex_dump()); - // } - // None => { - // println!("None"); - // } - // } - // let (_, rest) = buf.split_at(taken + 5); - // println!("reset {:?}", rest.hex_dump()); - - // let ctx = packets::ParseContext::new(); - // let p = wireformat::packet_from_bytes(rest, &ctx)?; - // println!("{p:#?}"); - - Ok(()) -} diff --git a/door-tokio/src/lib.rs b/door-tokio/src/lib.rs deleted file mode 100644 index 365288798f55fc0b54fa2231a43be549d764b1d3..0000000000000000000000000000000000000000 --- a/door-tokio/src/lib.rs +++ /dev/null @@ -1,63 +0,0 @@ -use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -use core::pin::Pin; -use core::task::{Context,Poll}; - -// TODO -// use anyhow::{Context as _, Result, Error}; - -pub struct AsyncDoor<'a> { - pub runner: door_sshproto::Runner<'a>, -} - -impl<'a> AsyncRead for AsyncDoor<'a> { - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, - buf: &mut ReadBuf) -> Poll<Result<(), std::io::Error>> - { - if self.runner.output_pending() { - let r = self.runner.output(buf.initialize_unfilled()) - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)); - if let Ok(l) = r { - buf.advance(l) - } - Poll::Ready(r.map(|_| ())) - } else { - self.runner.set_output_waker(cx.waker().clone()); - Poll::Pending - } - } -} - -impl<'a> AsyncWrite for AsyncDoor<'a> { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, - buf: &[u8]) -> Poll<Result<usize, std::io::Error>> - { - if self.runner.ready_input() { - let r = self.runner.input(buf) - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)); - Poll::Ready(r) - } else { - self.runner.set_input_waker(cx.waker().clone()); - Poll::Pending - } - - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) - -> Poll<Result<(), std::io::Error>> - { - Poll::Ready(Ok(())) - } - - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) - -> Poll<Result<(), std::io::Error>> - { - todo!("poll_close") - } - -} - - - -#[cfg(test)] -mod tests { -} diff --git a/sshproto/src/async_behaviour.rs b/sshproto/src/async_behaviour.rs index df4c59f2e2daca79cc4634b01f5374e1d4891b4a..ddfbee1a2de3d97bfe996f42dfc6d798d57c453f 100644 --- a/sshproto/src/async_behaviour.rs +++ b/sshproto/src/async_behaviour.rs @@ -43,13 +43,6 @@ impl AsyncCliServ { Ok(c) } - pub(crate) fn progress(&mut self, runner: &mut Runner) -> Result<()> { - match self { - Self::Client(i) => i.progress(runner), - Self::Server(i) => i.progress(runner), - } - } - pub(crate) async fn chan_handler(&mut self, resp: &mut RespPackets<'_>, chan_msg: ChanMsg) -> Result<()> { match self { Self::Client(i) => i.chan_handler(resp, chan_msg).await, @@ -66,9 +59,6 @@ impl AsyncCliServ { pub trait AsyncCliBehaviour: Sync+Send { async fn chan_handler(&mut self, resp: &mut RespPackets, chan_msg: ChanMsg) -> Result<()>; - /// Should not block - fn progress(&mut self, runner: &mut Runner) -> Result<()> { Ok(()) } - /// Provide the username to use for authentication. Will only be called once /// per session. /// If the username needs to change a new connection should be made @@ -119,7 +109,5 @@ pub trait AsyncCliBehaviour: Sync+Send { // #[async_trait(?Send)] #[async_trait] pub trait AsyncServBehaviour: Sync+Send { - fn progress(&mut self, runner: &mut Runner) -> Result<()> { Ok(()) } - fn chan_handler(&mut self, resp: &mut RespPackets, chan_msg: ChanMsg) -> Result<()>; } diff --git a/sshproto/src/behaviour.rs b/sshproto/src/behaviour.rs index a0f21624b2e60b9977919646e982ada2cb436168..b100c41dabb5110cdd654d202991033e2159b149 100644 --- a/sshproto/src/behaviour.rs +++ b/sshproto/src/behaviour.rs @@ -56,24 +56,20 @@ pub struct Behaviour<'a> { #[cfg(feature = "std")] impl Behaviour<'_> { - pub fn new_async_client(b: std::boxed::Box<dyn async_behaviour::AsyncCliBehaviour + Send>) -> Self { + pub fn new_async_client(b: Box<dyn async_behaviour::AsyncCliBehaviour + Send>) -> Self { Self { inner: async_behaviour::AsyncCliServ::Client(b), phantom: PhantomData::default(), } } - pub fn new_async_server(b: std::boxed::Box<dyn async_behaviour::AsyncServBehaviour + Send>) -> Self { + pub fn new_async_server(b: Box<dyn async_behaviour::AsyncServBehaviour + Send>) -> Self { Self { inner: async_behaviour::AsyncCliServ::Server(b), phantom: PhantomData::default(), } } - pub(crate) fn progress(&mut self, runner: &mut Runner) -> Result<()> { - self.inner.progress(runner) - } - pub(crate) async fn chan_handler(&mut self, resp: &mut RespPackets<'_>, chan_msg: ChanMsg) -> Result<()> { self.inner.chan_handler(resp, chan_msg).await } @@ -104,9 +100,6 @@ impl<'a> Behaviour<'a> phantom: PhantomData::default(), } } - pub(crate) fn progress(&mut self, runner: &mut Runner) -> Result<()> { - self.inner.progress(runner) - } pub(crate) async fn chan_handler(&mut self, resp: &mut RespPackets<'_>, chan_msg: ChanMsg) -> Result<()> { self.inner.chan_handler(resp, chan_msg) diff --git a/sshproto/src/bhnostd.rs b/sshproto/src/bhnostd.rs deleted file mode 100644 index 7e23325446970873b8226f4cdf56309b45876fab..0000000000000000000000000000000000000000 --- a/sshproto/src/bhnostd.rs +++ /dev/null @@ -1,74 +0,0 @@ -#![cfg(not(feature = "tokio-queue"))] - -#[allow(unused_imports)] -use { - crate::error::{Error, Result, TrapBug}, - log::{debug, error, info, log, trace, warn}, -}; - -use crate::*; -use crate::behaviour::*; - -// TODO -const CAPACITY: usize = 1; - -pub(crate) fn pair() -> (Queryer, Responder) { - let (s, r) = mpsc::channel(CAPACITY); - (Queryer { pipe: s }, Responder { pipe: r } ) -} - -pub(crate) struct Queryer { - pipe: Sender, -} - -impl Queryer { - pub async fn query(&self, q: BhQuery) -> BhResult<BhQuery> { - let (tx, rx) = oneshot::channel(); - self.pipe.send((q, tx)).await.trap()?; - rx.await.trap()? - } -} - - -pub(crate) struct Responder { - pipe: Receiver, -} - -impl Responder { - pub async fn next_query(&self) -> BhResult<ReplyChannel> { - let (query, tx) = self.pipe.recv().await; - Ok(ReplyChannel::new(query, tx)) - } -} - -pub struct ReplyChannel { - query: BhQuery, - query_disc: core::mem::Discriminant<BhQuery>, - tx: ReplyTx, -} - -impl ReplyChannel { - fn new(query: BhQuery, tx: ReplyTx) -> Self { - let query_disc = core::mem::discriminant(&query); - Self { - query, - query_disc, - tx, - } - } - pub fn query(&self) -> &BhQuery { - &self.query - } - - pub fn reply(self, r: BhResult<BhQuery>) -> BhResult<()> { - if let Ok(r) = r { - let reply_disc = core::mem::discriminant(&r); - if reply_disc != self.query_disc { - warn!("Mismatch reply"); - let _ = self.tx.send(Err(BhError::Fail)); - return Err(BhError::Fail); - } - } - self.tx.send(r).map_err(|_| BhError::Fail) - } -} diff --git a/sshproto/src/bhtokio.rs b/sshproto/src/bhtokio.rs deleted file mode 100644 index 15d90a59e2897ae47545ece075aebb21e7d3ef10..0000000000000000000000000000000000000000 --- a/sshproto/src/bhtokio.rs +++ /dev/null @@ -1,80 +0,0 @@ -#![cfg(feature = "tokio-queue")] - -#[allow(unused_imports)] -use { - crate::error::{Error, Result, TrapBug}, - log::{debug, error, info, log, trace, warn}, -}; - -use crate::*; -use crate::behaviour::*; -use tokio::sync::*; - -// TODO -const CAPACITY: usize = 1; - -type Msg = (BhQuery, oneshot::Sender<BhResult<BhQuery>>); -type Sender = mpsc::Sender::<Msg>; -type Receiver = mpsc::Receiver::<Msg>; -type ReplyTx = oneshot::Sender<BhResult<BhQuery>>; - -pub(crate) fn pair() -> (Queryer, Responder) { - let (s, r) = mpsc::channel(CAPACITY); - (Queryer { pipe: s }, Responder { pipe: r } ) -} - -pub(crate) struct Queryer { - pipe: Sender, -} - -impl Queryer { - pub async fn query(&self, q: BhQuery) -> BhResult<BhQuery> { - let (tx, rx) = oneshot::channel(); - self.pipe.send((q, tx)).await.trap()?; - rx.await.trap()? - } -} - - -pub(crate) struct Responder { - pipe: Receiver, -} - -impl Responder { - pub async fn next_query(&self) -> BhResult<ReplyChannel> { - let (query, tx) = self.pipe.recv().await; - Ok(ReplyChannel::new(query, tx)) - } -} - -pub struct ReplyChannel { - query: BhQuery, - query_disc: core::mem::Discriminant<BhQuery>, - tx: ReplyTx, -} - -impl ReplyChannel { - fn new(query: BhQuery, tx: ReplyTx) -> Self { - let query_disc = core::mem::discriminant(&query); - Self { - query, - query_disc, - tx, - } - } - pub fn query(&self) -> &BhQuery { - &self.query - } - - pub fn reply(self, r: BhResult<BhQuery>) -> BhResult<()> { - if let Ok(r) = r { - let reply_disc = core::mem::discriminant(&r); - if reply_disc != self.query_disc { - warn!("Mismatch reply"); - let _ = self.tx.send(Err(BhError::Fail)); - return Err(BhError::Fail); - } - } - self.tx.send(r).map_err(|_| BhError::Fail) - } -} diff --git a/sshproto/src/block_behaviour.rs b/sshproto/src/block_behaviour.rs index 1612b9f4130fe087ff7154d6b3f912ffd1026f31..03998eb62d0f2bbb5b45fb09a85ca357b3d3add5 100644 --- a/sshproto/src/block_behaviour.rs +++ b/sshproto/src/block_behaviour.rs @@ -40,13 +40,6 @@ impl BlockCliServ<'_> Ok(c) } - pub(crate) fn progress(&mut self, runner: &mut Runner) -> Result<()> { - match self { - Self::Client(i) => i.progress(runner), - Self::Server(i) => i.progress(runner), - } - } - pub(crate) fn chan_handler<'f>( &mut self, resp: &mut RespPackets<'_>, @@ -66,11 +59,6 @@ pub trait BlockCliBehaviour { chan_msg: ChanMsg, ) -> Result<()>; - /// Should not block - fn progress(&mut self, runner: &mut Runner) -> Result<()> { - Ok(()) - } - /// Provide the username to use for authentication. Will only be called once /// per session. /// If the username needs to change a new connection should be made @@ -115,10 +103,6 @@ pub trait BlockCliBehaviour { } pub trait BlockServBehaviour { - fn progress(&mut self, runner: &mut Runner) -> Result<()> { - Ok(()) - } - fn chan_handler( &mut self, resp: &mut RespPackets,