diff --git a/embassy/Cargo.toml b/embassy/Cargo.toml index 13c66565e8109fd0b28266191d3a414f6ec8564f..b78c034120710c8ed23c605eb6d250e4aa3ffb56 100644 --- a/embassy/Cargo.toml +++ b/embassy/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] embassy-sync = { version = "0.1.0" } +pin-utils = { version = "0.1" } sunset = { path = "../" } diff --git a/embassy/demos/std/Cargo.lock b/embassy/demos/std/Cargo.lock index b21166f260ad2383614933b36f1643ad44a52746..cb24dc96580f9327d6d086b51d3291f3652f8ac6 100644 --- a/embassy/demos/std/Cargo.lock +++ b/embassy/demos/std/Cargo.lock @@ -1290,6 +1290,7 @@ version = "0.2.0-alpha" dependencies = [ "embassy-sync 0.1.0", "log", + "pin-utils", "sunset", ] diff --git a/embassy/demos/std/src/main.rs b/embassy/demos/std/src/main.rs index f91efedec2aa90ed6e8ba346db7a174c799cf309..cab788b3e4c7726dfb1d756ce0202f1b775ab1ab 100644 --- a/embassy/demos/std/src/main.rs +++ b/embassy/demos/std/src/main.rs @@ -204,8 +204,8 @@ impl ServBehaviour for DemoServer { async fn session(socket: &mut TcpSocket<'_>) -> sunset::Result<()> { let mut app = DemoServer::new()?; - let mut ssh_rxbuf = [0; 1024]; - let mut ssh_txbuf = [0; 1024]; + let mut ssh_rxbuf = [0; 4000]; + let mut ssh_txbuf = [0; 4000]; let serv = SSHServer::new(&mut ssh_rxbuf, &mut ssh_txbuf, &mut app)?; let serv = &serv; @@ -225,6 +225,7 @@ async fn session(socket: &mut TcpSocket<'_>) -> sunset::Result<()> { let n = wsock.write(buf).await.expect("TODO handle write error"); buf = &buf[n..]; } + trace!("tx write done"); } #[allow(unreachable_code)] Ok::<_, sunset::Error>(()) @@ -236,12 +237,13 @@ async fn session(socket: &mut TcpSocket<'_>) -> sunset::Result<()> { let mut buf = [0; 1024]; trace!("rx read"); let l = rsock.read(&mut buf).await.expect("TODO handle read error"); - trace!("rx read done"); + trace!("rx read done {l}"); let mut buf = &buf[..l]; while buf.len() > 0 { let n = serv.write(&buf).await?; buf = &buf[n..]; } + trace!("rx write done"); } #[allow(unreachable_code)] Ok::<_, sunset::Error>(()) @@ -265,6 +267,7 @@ fn main() { env_logger::builder() .filter_level(log::LevelFilter::Trace) .filter_module("async_io", log::LevelFilter::Info) + .filter_module("polling", log::LevelFilter::Info) .format_timestamp_nanos() .init(); diff --git a/embassy/src/embassy_sunset.rs b/embassy/src/embassy_sunset.rs index fb2e342f7e04f4b0307dee9021a21ebf5e63d0b2..33eff100e90678fce76c3ae15986d290782e3c23 100644 --- a/embassy/src/embassy_sunset.rs +++ b/embassy/src/embassy_sunset.rs @@ -3,7 +3,7 @@ use { log::{debug, error, info, log, trace, warn}, }; -use core::future::poll_fn; +use core::future::{poll_fn, Future}; use core::task::Poll; use embassy_sync::waitqueue::WakerRegistration; @@ -11,6 +11,8 @@ use embassy_sync::mutex::Mutex; use embassy_sync::blocking_mutex::raw::{NoopRawMutex, RawMutex}; use embassy_sync::signal::Signal; +use pin_utils::pin_mut; + use sunset::{Runner, Result, Behaviour, ServBehaviour, CliBehaviour}; use sunset::config::MAX_CHANNELS; @@ -25,6 +27,7 @@ pub struct EmbassySunset<'a> { pub(crate) inner: Mutex<NoopRawMutex, Inner<'a>>, progress_notify: Signal<NoopRawMutex, ()>, + lock_waker: WakerRegistration, } impl<'a> EmbassySunset<'a> { @@ -41,6 +44,7 @@ impl<'a> EmbassySunset<'a> { Self { inner, progress_notify, + lock_waker: WakerRegistration::new(), } } @@ -63,18 +67,23 @@ impl<'a> EmbassySunset<'a> { -> Result<()> where M: RawMutex, B: ServBehaviour { - let mut inner = self.inner.lock().await; + { + let mut inner = self.inner.lock().await; { - let mut b = b.lock().await; - // XXX: unsure why we need this explicit type - let b: &mut B = &mut b; - let mut b = Behaviour::new_server(b); - inner.runner.progress(&mut b).await?; - // b is dropped, allowing other users - } + { + let mut b = b.lock().await; + warn!("progress locked"); + // XXX: unsure why we need this explicit type + let b: &mut B = &mut b; + let mut b = Behaviour::new_server(b); + inner.runner.progress(&mut b).await?; + // b is dropped, allowing other users + } - self.wake_channels(&mut inner) + self.wake_channels(&mut inner) + } + // inner dropped } // idle until input is received @@ -85,8 +94,13 @@ impl<'a> EmbassySunset<'a> { pub async fn read(&self, buf: &mut [u8]) -> Result<usize> { poll_fn(|cx| { - let r = match self.inner.try_lock() { - Ok(mut inner) => { + + trace!("read locking"); + let i = self.inner.lock(); + pin_mut!(i); + let r = match i.poll(cx) { + Poll::Ready(mut inner) => { + warn!("read lock ready"); match inner.runner.output(buf) { Ok(0) => { inner.runner.set_output_waker(cx.waker()); @@ -96,8 +110,12 @@ impl<'a> EmbassySunset<'a> { Err(e) => Poll::Ready(Err(e)), } } - Err(_) => Poll::Pending, + Poll::Pending => { + trace!("read lock pending"); + Poll::Pending + } }; + trace!("read result {r:?}"); r }) @@ -106,8 +124,12 @@ impl<'a> EmbassySunset<'a> { pub async fn write(&self, buf: &[u8]) -> Result<usize> { poll_fn(|cx| { - let r = match self.inner.try_lock() { - Ok(mut inner) => { + trace!("write locking"); + let i = self.inner.lock(); + pin_mut!(i); + let r = match i.poll(cx) { + Poll::Ready(mut inner) => { + warn!("write lock ready"); if inner.runner.ready_input() { match inner.runner.input(buf) { Ok(0) => { @@ -121,12 +143,15 @@ impl<'a> EmbassySunset<'a> { Poll::Pending } } - Err(_) => Poll::Pending, + Poll::Pending => { + trace!("write lock pending"); + Poll::Pending + } }; - if r.is_ready() { self.progress_notify.signal(()) } + trace!("write result {r:?}"); r }) .await