diff --git a/src/runner.rs b/src/runner.rs index 526c2bcb2b4085fe45f74ae49e03579a9a2c281b..10d3db728f0a117a019fbbe1f0b854b5dcf0f147 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -81,24 +81,6 @@ impl<'a> Runner<'a> { Ok(runner) } - pub fn input(&mut self, buf: &[u8]) -> Result<usize, Error> { - self.traf_in.input( - &mut self.keys, - &mut self.conn.remote_version, - buf, - ) - } - - /// Write any pending output to the wire, returning the size written - pub fn output(&mut self, buf: &mut [u8]) -> Result<usize, Error> { - let r = self.traf_out.output(buf); - if r > 0 { - trace!("output() wake"); - self.wake(); - } - Ok(r) - } - /// Drives connection progress, handling received payload and sending /// other packets as required. This must be polled/awaited regularly. pub async fn progress(&mut self, behaviour: &mut Behaviour<'_>) -> Result<()> { @@ -124,29 +106,53 @@ impl<'a> Runner<'a> { Ok(()) } - pub fn done_payload(&mut self) -> Result<()> { - self.traf_in.done_payload()?; - self.wake(); - Ok(()) + pub fn input(&mut self, buf: &[u8]) -> Result<usize, Error> { + self.traf_in.input( + &mut self.keys, + &mut self.conn.remote_version, + buf, + ) } - pub fn wake(&mut self) { - if self.ready_input() { - trace!("wake ready_input, waker {:?}", self.input_waker); - if let Some(w) = self.input_waker.take() { - trace!("wake input waker"); - w.wake() + /// Write any pending output to the wire, returning the size written + pub fn output(&mut self, buf: &mut [u8]) -> Result<usize, Error> { + let r = self.traf_out.output(buf); + if r > 0 { + trace!("output() wake"); + self.wake(); + } + Ok(r) + } + + pub fn ready_input(&self) -> bool { + self.conn.initial_sent() && self.traf_in.ready_input() + } + + pub fn output_pending(&self) -> bool { + self.traf_out.output_pending() + } + + /// Set a waker to be notified when the `Runner` is ready + /// to accept input from the main SSH socket. + pub fn set_input_waker(&mut self, waker: &Waker) { + if let Some(ref w) = self.input_waker { + if w.will_wake(waker) { + return } } + self.input_waker.replace(waker.clone()) + .map(|w| w.wake()); + } - if self.output_pending() { - if let Some(w) = self.output_waker.take() { - trace!("wake output waker"); - w.wake() - } else { - trace!("no waker"); + /// Set a waker to be notified when SSH socket output is ready + pub fn set_output_waker(&mut self, waker: &Waker) { + if let Some(ref w) = self.output_waker { + if w.will_wake(waker) { + return } } + self.output_waker.replace(waker.clone()) + .map(|w| w.wake()); } // TODO: move somewhere client specific? @@ -216,37 +222,6 @@ impl<'a> Runner<'a> { Ok(len) } - pub fn ready_input(&self) -> bool { - self.conn.initial_sent() && self.traf_in.ready_input() - } - - pub fn output_pending(&self) -> bool { - self.traf_out.output_pending() - } - - /// Set a waker to be notified when the `Runner` is ready - /// to accept input from the main SSH socket. - pub fn set_input_waker(&mut self, waker: &Waker) { - if let Some(ref w) = self.input_waker { - if w.will_wake(waker) { - return - } - } - self.input_waker.replace(waker.clone()) - .map(|w| w.wake()); - } - - /// Set a waker to be notified when SSH socket output is ready - pub fn set_output_waker(&mut self, waker: &Waker) { - if let Some(ref w) = self.output_waker { - if w.will_wake(waker) { - return - } - } - self.output_waker.replace(waker.clone()) - .map(|w| w.wake()); - } - /// When channel data is ready, returns a tuple /// `Some((channel, ext))` where `ext` is `None` for stdout, `Some(exttype)` /// for extended types (like stderr). @@ -278,4 +253,24 @@ impl<'a> Runner<'a> { // pub fn set_chan_waker(&mut self, waker: Waker) { // self.chan_waker = Some(waker); // } + + fn wake(&mut self) { + if self.ready_input() { + trace!("wake ready_input, waker {:?}", self.input_waker); + if let Some(w) = self.input_waker.take() { + trace!("wake input waker"); + w.wake() + } + } + + if self.output_pending() { + if let Some(w) = self.output_waker.take() { + trace!("wake output waker"); + w.wake() + } else { + trace!("no waker"); + } + } + } + }