diff --git a/Cargo.lock b/Cargo.lock index a2225c6934f5bb15bd99137a714b7464695ed167..1a19ead302ab6c3f1d74dc5b3d35ffec1d8c3235 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -297,7 +297,7 @@ dependencies = [ "defmt", "embassy-futures", "embassy-net", - "embassy-sync", + "embassy-sync 0.1.0", "embassy-time", "embedded-hal 1.0.0-alpha.9", "embedded-hal-async", @@ -447,7 +447,7 @@ dependencies = [ "embassy-executor", "embassy-hal-common", "embassy-macros", - "embassy-sync", + "embassy-sync 0.1.0 (git+https://github.com/embassy-rs/embassy?rev=e7fdd500d8354a03fcd105c8298cf7b4798a4107)", ] [[package]] @@ -455,7 +455,7 @@ name = "embassy-embedded-hal" version = "0.1.0" source = "git+https://github.com/embassy-rs/embassy?rev=e7fdd500d8354a03fcd105c8298cf7b4798a4107#e7fdd500d8354a03fcd105c8298cf7b4798a4107" dependencies = [ - "embassy-sync", + "embassy-sync 0.1.0 (git+https://github.com/embassy-rs/embassy?rev=e7fdd500d8354a03fcd105c8298cf7b4798a4107)", "embedded-hal 0.2.7", "embedded-hal 1.0.0-alpha.9", "embedded-hal-async", @@ -512,7 +512,7 @@ dependencies = [ "atomic-polyfill 1.0.1", "atomic-pool", "defmt", - "embassy-sync", + "embassy-sync 0.1.0 (git+https://github.com/embassy-rs/embassy?rev=e7fdd500d8354a03fcd105c8298cf7b4798a4107)", "embassy-time", "futures", "generic-array 0.14.6", @@ -538,7 +538,7 @@ dependencies = [ "embassy-executor", "embassy-futures", "embassy-hal-common", - "embassy-sync", + "embassy-sync 0.1.0 (git+https://github.com/embassy-rs/embassy?rev=e7fdd500d8354a03fcd105c8298cf7b4798a4107)", "embassy-time", "embassy-usb-driver", "embedded-hal 0.2.7", @@ -552,6 +552,18 @@ dependencies = [ "rp2040-pac2", ] +[[package]] +name = "embassy-sync" +version = "0.1.0" +dependencies = [ + "atomic-polyfill 1.0.1", + "cfg-if", + "critical-section 1.1.1", + "embedded-io", + "futures-util", + "heapless", +] + [[package]] name = "embassy-sync" version = "0.1.0" @@ -574,7 +586,7 @@ dependencies = [ "cfg-if", "critical-section 1.1.1", "defmt", - "embassy-sync", + "embassy-sync 0.1.0 (git+https://github.com/embassy-rs/embassy?rev=e7fdd500d8354a03fcd105c8298cf7b4798a4107)", "embedded-hal 0.2.7", "futures-util", "heapless", @@ -1318,9 +1330,10 @@ dependencies = [ "defmt", "defmt-rtt", "embassy-executor", + "embassy-futures", "embassy-net", "embassy-rp", - "embassy-sync", + "embassy-sync 0.1.0", "embassy-time", "embedded-hal 1.0.0-alpha.9", "embedded-hal-async", diff --git a/Cargo.toml b/Cargo.toml index 280178105303d0e1385052b1f3a7177340d3f7d2..c9de189e5182f24fc0e63c3b53c2871c8128de50 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ embassy-time = { version = "0.1.0", features = ["defmt", "defmt-timestamp-uptim embassy-rp = { version = "0.1.0", features = ["defmt", "unstable-traits", "nightly", "unstable-pac"] } embassy-net = { version = "0.1.0", features = ["defmt", "tcp", "dhcpv4", "medium-ethernet", "pool-16"] } embassy-sync = { version = "0.1.0" } +embassy-futures = { version = "0.1.0" } atomic-polyfill = "0.1.5" static_cell = "1.0" @@ -42,10 +43,11 @@ sha2 = { version = "0.10", default-features = false } embassy-executor = { git = "https://github.com/embassy-rs/embassy", rev = "e7fdd500d8354a03fcd105c8298cf7b4798a4107" } embassy-time = { git = "https://github.com/embassy-rs/embassy", rev = "e7fdd500d8354a03fcd105c8298cf7b4798a4107" } embassy-futures = { git = "https://github.com/embassy-rs/embassy", rev = "e7fdd500d8354a03fcd105c8298cf7b4798a4107" } -embassy-sync = { git = "https://github.com/embassy-rs/embassy", rev = "e7fdd500d8354a03fcd105c8298cf7b4798a4107" } +# embassy-sync = { git = "https://github.com/embassy-rs/embassy", rev = "e7fdd500d8354a03fcd105c8298cf7b4798a4107" } embassy-rp = { git = "https://github.com/embassy-rs/embassy", rev = "e7fdd500d8354a03fcd105c8298cf7b4798a4107" } embassy-net = { git = "https://github.com/embassy-rs/embassy", rev = "e7fdd500d8354a03fcd105c8298cf7b4798a4107" } -# embassy-net = { path = "/home/matt/3rd/rs/embassy/embassy-net" } + +embassy-sync = { path = "/home/matt/3rd/rs/embassy/embassy-sync" } [profile.dev] debug = 2 diff --git a/src/embassy_sunset.rs b/src/embassy_sunset.rs index 5eb32651d02a868733e6aec8e0d62e7d0b72fa54..b36c770e1ceec39ae9bde2c249655143c2be4b36 100644 --- a/src/embassy_sunset.rs +++ b/src/embassy_sunset.rs @@ -1,30 +1,33 @@ use core::future::poll_fn; use core::task::Poll; -use embassy_sync::waitqueue::AtomicWaker; +use embassy_sync::waitqueue::WakerRegistration; use embassy_sync::mutex::Mutex; use embassy_sync::blocking_mutex::raw::NoopRawMutex; use embassy_sync::signal::Signal; use sunset::{Runner, Result, Behaviour}; +use sunset::config::MAX_CHANNELS; pub(crate) struct Inner<'a> { pub runner: Runner<'a>, + + pub chan_read_wakers: [WakerRegistration; MAX_CHANNELS], + pub chan_write_wakers: [WakerRegistration; MAX_CHANNELS], } pub struct EmbassySunset<'a> { pub(crate) inner: Mutex<NoopRawMutex, Inner<'a>>, progress_notify: Signal<NoopRawMutex, ()>, - - read_waker: AtomicWaker, - write_waker: AtomicWaker, } impl<'a> EmbassySunset<'a> { pub fn new(runner: Runner<'a>) -> Self { let inner = Inner { runner, + chan_read_wakers: Default::default(), + chan_write_wakers: Default::default(), }; let inner = Mutex::new(inner); @@ -33,16 +36,29 @@ impl<'a> EmbassySunset<'a> { Self { inner, progress_notify, - read_waker: AtomicWaker::new(), - write_waker: AtomicWaker::new(), } } - pub async fn progress(&mut self, + pub async fn progress(&self, b: &mut Behaviour<'_>) -> Result<()> { let mut inner = self.inner.lock().await; - inner.runner.progress(b).await + inner.runner.progress(b).await?; + + if let Some((chan, _ext)) = inner.runner.ready_channel_input() { + inner.chan_read_wakers[chan as usize].wake() + } + + for chan in 0..MAX_CHANNELS { + if inner.runner.ready_channel_send(chan as u32).unwrap_or(0) > 0 { + inner.chan_write_wakers[chan].wake() + } + } + + // idle until input is received + // TODO do we also want to wake in other situations? + self.progress_notify.wait().await; + Ok(()) } pub async fn read(&self, buf: &mut [u8]) -> Result<usize> { @@ -50,7 +66,10 @@ impl<'a> EmbassySunset<'a> { let r = match self.inner.try_lock() { Ok(mut inner) => { match inner.runner.output(buf) { - Ok(0) => Poll::Pending, + Ok(0) => { + inner.runner.set_output_waker(cx.waker()); + Poll::Pending + } Ok(n) => Poll::Ready(Ok(n)), Err(e) => Poll::Ready(Err(e)), } @@ -58,9 +77,6 @@ impl<'a> EmbassySunset<'a> { Err(_) => Poll::Pending, }; - if r.is_pending() { - self.read_waker.register(cx.waker()) - } r }) .await @@ -72,7 +88,10 @@ impl<'a> EmbassySunset<'a> { Ok(mut inner) => { if inner.runner.ready_input() { match inner.runner.input(buf) { - Ok(0) => Poll::Pending, + Ok(0) => { + inner.runner.set_input_waker(cx.waker()); + Poll::Pending + }, Ok(n) => Poll::Ready(Ok(n)), Err(e) => Poll::Ready(Err(e)), } @@ -83,11 +102,13 @@ impl<'a> EmbassySunset<'a> { Err(_) => Poll::Pending, }; - if r.is_pending() { - self.write_waker.register(cx.waker()) + if r.is_ready() { + self.progress_notify.signal(()) } r }) .await } + + // pub async fn read_channel(&self, buf: &mut [u8]) -> Result<usize> { } diff --git a/src/main.rs b/src/main.rs index 12f9bd09516afabbc2a24979356df3b780efbd0e..dd25d5dae7b17acd847f14c0958b6fad1eef8aa7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,6 +10,7 @@ use embassy_executor::Spawner; use embassy_net::tcp::TcpSocket; use embassy_net::{Stack, StackResources}; use embassy_rp::gpio::{Flex, Level, Output}; +use embassy_futures::join::join3; use embassy_rp::peripherals::{PIN_23, PIN_24, PIN_25, PIN_29}; use embedded_hal_async::spi::{ExclusiveDevice, SpiBusFlush, SpiBusRead, SpiBusWrite}; use embedded_io::asynch::{Read, Write}; @@ -139,7 +140,8 @@ async fn main(spawner: Spawner) { } } -#[embassy_executor::task] +// TODO: pool_size should be NUM_LISTENERS but needs a literal +#[embassy_executor::task(pool_size = 4)] async fn listener(stack: &'static Stack<cyw43::NetDevice<'static>>) -> ! { let mut rx_buffer = [0; 4096]; let mut tx_buffer = [0; 4096]; @@ -155,7 +157,7 @@ async fn listener(stack: &'static Stack<cyw43::NetDevice<'static>>) -> ! { } let r = session(&mut socket).await; - if let Err(e) = r { + if let Err(_e) = r { // warn!("Ended with error: {:?}", e); warn!("Ended with error"); } @@ -248,28 +250,47 @@ async fn session(socket: &mut TcpSocket<'_>) -> sunset::Result<()> { let serv = server::SSHServer::new(&mut ssh_rxbuf, &mut ssh_txbuf, &mut app)?; let serv = &serv; + let (mut rsock, mut wsock) = socket.split(); + let tx = async { loop { // TODO: make sunset read directly from socket, no intermediate buffer. let mut buf = [0; 1024]; let l = serv.read(&mut buf).await?; - let buf = &buf[..l]; - socket.write(buf).await.expect("write"); + let mut buf = &buf[..l]; + while buf.len() > 0 { + let n = wsock.write(buf).await.expect("TODO handle write error"); + buf = &buf[n..]; + } } #[allow(unreachable_code)] Ok::<_, sunset::Error>(()) }; - // let rx = async { - // loop { - // // TODO: make sunset read directly from socket, no intermediate buffer. - // let mut buf = [0; 1024]; - // let l = socket.read(&mut buf).await.expect("read"); - // let buf = &buf[..l]; - // serv.write(&buf).await?; - // } - // #[allow(unreachable_code)] - // Ok::<_, sunset::Error>(()) - // }; + + let rx = async { + loop { + // TODO: make sunset read directly from socket, no intermediate buffer. + let mut buf = [0; 1024]; + let l = rsock.read(&mut buf).await.expect("TODO handle read error"); + let mut buf = &buf[..l]; + while buf.len() > 0 { + let n = serv.write(&buf).await?; + buf = &buf[n..]; + } + } + #[allow(unreachable_code)] + Ok::<_, sunset::Error>(()) + }; + + let prog = async { + loop { + serv.progress(&mut app).await?; + } + #[allow(unreachable_code)] + Ok::<_, sunset::Error>(()) + }; + join3(rx, tx, prog).await; + Ok(()) } diff --git a/src/server.rs b/src/server.rs index 557935106602cde36ad1cbf9268bc3099817f78b..9b1765af2c08607fcd49e75212fdc42036331b08 100644 --- a/src/server.rs +++ b/src/server.rs @@ -18,7 +18,7 @@ impl<'a> SSHServer<'a> { } pub async fn progress( - &mut self, + &self, b: &mut (dyn ServBehaviour + Send), ) -> Result<()> {