diff --git a/async/src/async_sunset.rs b/async/src/async_sunset.rs index d0f414806c0e568e1e949cd3fa05b6db6e636d0c..eb9b41870175be6866eac38cc3c8a35c92f7a3e5 100644 --- a/async/src/async_sunset.rs +++ b/async/src/async_sunset.rs @@ -170,7 +170,7 @@ impl<'a> AsyncRead for AsyncSunsetSocket<'a> { match r { Ok(0) => { trace!("set output waker"); - runner.set_output_waker(cx.waker().clone()); + runner.set_output_waker(cx.waker()); Poll::Pending } Ok(sz) => { @@ -211,7 +211,7 @@ impl<'a> AsyncWrite for AsyncSunsetSocket<'a> { Poll::Ready(r) } else { trace!("not ready"); - runner.set_input_waker(cx.waker().clone()); + runner.set_input_waker(cx.waker()); Poll::Pending }; diff --git a/src/runner.rs b/src/runner.rs index e95920d3050383ff1a5672310ecd22a4b1102894..5543ba46d89043987ab5c289166d7f81e48570fd 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -25,8 +25,10 @@ pub struct Runner<'a> { /// Current encryption/integrity keys keys: KeyState, - output_waker: Option<Waker>, - input_waker: Option<Waker>, + /// Waker when output is ready + pub output_waker: Option<Waker>, + /// Waker when ready to consume input. + pub input_waker: Option<Waker>, } impl core::fmt::Debug for Runner<'_> { @@ -99,9 +101,6 @@ impl<'a> Runner<'a> { /// Drives connection progress, handling received payload and sending /// other packets as required. This must be polled/awaited regularly. - /// Optionally returns `Event` which provides channel or session - /// event to the application. - /// [`done_payload()`] must be called after any `Ok` result. pub async fn progress(&mut self, behaviour: &mut Behaviour<'_>) -> Result<()> { let mut s = self.traf_out.sender(&mut self.keys); // Handle incoming packets @@ -225,14 +224,33 @@ impl<'a> Runner<'a> { self.traf_out.output_pending() } - pub fn set_input_waker(&mut self, waker: Waker) { - self.input_waker = Some(waker); + /// Set a waker to be notified when the `Runner` is ready + /// to accept input from the main SSH socket. + pub fn set_input_waker(&mut self, waker: &Waker) { + if let Some(ref w) = self.input_waker { + if w.will_wake(waker) { + return + } + } + self.input_waker.replace(waker.clone()) + .map(|w| w.wake()); } - pub fn set_output_waker(&mut self, waker: Waker) { - self.output_waker = Some(waker); + /// Set a waker to be notified when SSH socket output is ready + pub fn set_output_waker(&mut self, waker: &Waker) { + if let Some(ref w) = self.output_waker { + if w.will_wake(waker) { + return + } + } + self.output_waker.replace(waker.clone()) + .map(|w| w.wake()); } + /// When channel data is ready, returns a tuple + /// `Some((channel, ext))` where `ext` is `None` for stdout, `Some(exttype)` + /// for extended types (like stderr). + /// Returns `None` if none ready. pub fn ready_channel_input(&self) -> Option<(u32, Option<u32>)> { self.traf_in.ready_channel_input() }