From 1548efbe37abc77c6d2c4d4e48a2d6b4e558db85 Mon Sep 17 00:00:00 2001 From: Matt Johnston <matt@ucc.asn.au> Date: Sun, 6 Nov 2022 23:35:26 +0800 Subject: [PATCH] Move run() into common code std and picow embassy server demos work, no channels yet --- embassy/Cargo.toml | 2 + embassy/README.md | 5 +- embassy/demos/picow/Cargo.lock | 3 + embassy/demos/picow/src/main.rs | 155 ++++++++------------------------ embassy/demos/std/Cargo.lock | 2 + embassy/demos/std/src/main.rs | 108 +++++----------------- embassy/src/embassy_sunset.rs | 1 + embassy/src/server.rs | 52 +++++++++++ 8 files changed, 118 insertions(+), 210 deletions(-) diff --git a/embassy/Cargo.toml b/embassy/Cargo.toml index b78c034..864b040 100644 --- a/embassy/Cargo.toml +++ b/embassy/Cargo.toml @@ -5,6 +5,8 @@ edition = "2021" [dependencies] embassy-sync = { version = "0.1.0" } +embassy-net = { version = "0.1.0" } +embassy-futures = { version = "0.1.0" } pin-utils = { version = "0.1" } sunset = { path = "../" } diff --git a/embassy/README.md b/embassy/README.md index 20bb872..8675d38 100644 --- a/embassy/README.md +++ b/embassy/README.md @@ -1,10 +1,11 @@ # Toolchain -Embassy requires Rust nightly, often with a specific version. [`rust-toolchain.toml`] records a known-good version. +Embassy requires Rust nightly, often with a specific version. +[`rust-toolchain.toml`](rust-toolchain.toml) records a known-good version. # Demos -[`demos/`] has some examples. These are separate crates since they have fairly distinct dependencies. +[`demos/`](demos/) has some examples. These are separate crates since they have fairly distinct dependencies. ## `picow` diff --git a/embassy/demos/picow/Cargo.lock b/embassy/demos/picow/Cargo.lock index 6ff60eb..6ead602 100644 --- a/embassy/demos/picow/Cargo.lock +++ b/embassy/demos/picow/Cargo.lock @@ -1354,8 +1354,11 @@ dependencies = [ name = "sunset-embassy" version = "0.2.0-alpha" dependencies = [ + "embassy-futures", + "embassy-net", "embassy-sync 0.1.0", "log", + "pin-utils", "sunset", ] diff --git a/embassy/demos/picow/src/main.rs b/embassy/demos/picow/src/main.rs index baea62a..baa0bf1 100644 --- a/embassy/demos/picow/src/main.rs +++ b/embassy/demos/picow/src/main.rs @@ -12,7 +12,7 @@ use embassy_sync::blocking_mutex::raw::NoopRawMutex; use embassy_net::tcp::TcpSocket; use embassy_net::{Stack, StackResources}; use embassy_rp::gpio::{Flex, Level, Output}; -use embassy_futures::join::join3; +use embassy_futures::join::join; use embassy_rp::peripherals::{PIN_23, PIN_24, PIN_25, PIN_29}; use embedded_hal_async::spi::{ExclusiveDevice, SpiBusFlush, SpiBusRead, SpiBusWrite}; use embedded_io::asynch::{Read, Write}; @@ -45,30 +45,6 @@ macro_rules! singleton { }}; } -// fn run() -> sunset::Result<()> { -// let mut x = [0u8; 500]; - -// let mut inbuf = [0u8; 1000]; -// let mut outbuf = [0u8; 1000]; -// let mut runner = sunset::Runner::new_server(&mut inbuf, &mut outbuf)?; -// let mut cli = SSHClient {}; -// let mut cli = sunset::Behaviour::new_client(&mut cli); - -// let mut pollctx = Context::from_waker(noop_waker_ref()); - -// runner.input(&x)?; -// let l = runner.progress(&mut cli); -// pin_mut!(l); -// let _ = l.poll(&mut pollctx); -// // runner.output(&mut x).unwrap(); - -// // tx.write(b'x').unwrap(); -// // write!(tx, "{}", x[0]); - -// Ok(()) -// } - - #[embassy_executor::task] async fn net_task(stack: &'static Stack<cyw43::NetDevice<'static>>) -> ! { stack.run().await @@ -114,8 +90,8 @@ async fn main(spawner: Spawner) { let net_device = control.init(clm).await; //control.join_open(env!("WIFI_NETWORK")).await; - // control.join_wpa2(env!("WIFI_NETWORK"), env!("WIFI_PASSWORD")).await; - control.join_wpa2("WIFI_NETWORK", "WIFI_PASSWORD").await; + control.join_wpa2(env!("WIFI_NETWORK"), env!("WIFI_PASSWORD")).await; + //control.join_wpa2("WIFI_NETWORK", "WIFI_PASSWORD").await; let config = embassy_net::ConfigStrategy::Dhcp; //let config = embassy_net::ConfigStrategy::Static(embassy_net::Config { @@ -166,32 +142,32 @@ async fn listener(stack: &'static Stack<cyw43::NetDevice<'static>>) -> ! { } struct DemoServer { - // keys: Vec<SignKey>, + keys: [SignKey; 1], sess: Option<u32>, want_shell: bool, shell_started: bool, + + notify: Signal<NoopRawMutex, ()>, } impl DemoServer { fn new() -> Result<Self> { - // let keys = keyfiles.iter().map(|f| { - // read_key(f).with_context(|| format!("loading key {f}")) - // }).collect::<Result<Vec<SignKey>>>()?; + let keys = [SignKey::generate(KeyType::Ed25519)?]; Ok(Self { sess: None, - // keys, + keys, want_shell: false, shell_started: false, + notify: Signal::new(), }) } } impl ServBehaviour for DemoServer { fn hostkeys(&mut self) -> BhResult<&[SignKey]> { - todo!() - // Ok(&self.keys) + Ok(&self.keys) } @@ -203,6 +179,12 @@ impl ServBehaviour for DemoServer { true } + fn auth_unchallenged(&mut self, username: TextString) -> bool { + let u = username.as_str().unwrap_or("mystery user"); + info!("Allowing auth for user {}", u); + true + } + fn auth_password(&mut self, user: TextString, password: TextString) -> bool { user.as_str().unwrap_or("") == "matt" && password.as_str().unwrap_or("") == "pw" } @@ -242,91 +224,24 @@ impl ServBehaviour for DemoServer { } async fn session(socket: &mut TcpSocket<'_>) -> sunset::Result<()> { - let mut app = DemoServer::new()?; - - let mut ssh_rxbuf = [0; 1024]; - let mut ssh_txbuf = [0; 1024]; - let serv = SSHServer::new(&mut ssh_rxbuf, &mut ssh_txbuf, &mut app)?; - let serv = &serv; - - let app = Mutex::<NoopRawMutex, _>::new(app); - - let (mut rsock, mut wsock) = socket.split(); - - let tx = async { - loop { - // TODO: make sunset read directly from socket, no intermediate buffer. - let mut buf = [0; 1024]; - let l = serv.read(&mut buf).await?; - let mut buf = &buf[..l]; - while buf.len() > 0 { - let n = wsock.write(buf).await.expect("TODO handle write error"); - buf = &buf[n..]; - } - } - #[allow(unreachable_code)] - Ok::<_, sunset::Error>(()) - }; - - let rx = async { - loop { - // TODO: make sunset read directly from socket, no intermediate buffer. - let mut buf = [0; 1024]; - let l = rsock.read(&mut buf).await.expect("TODO handle read error"); - let mut buf = &buf[..l]; - while buf.len() > 0 { - let n = serv.write(&buf).await?; - buf = &buf[n..]; - } - } - #[allow(unreachable_code)] - Ok::<_, sunset::Error>(()) - }; - - let prog = async { - loop { - serv.progress(&app).await?; - } - #[allow(unreachable_code)] - Ok::<_, sunset::Error>(()) - }; - join3(rx, tx, prog).await; - - Ok(()) -} + let mut app = DemoServer::new()?; + + let mut ssh_rxbuf = [0; 2000]; + let mut ssh_txbuf = [0; 2000]; + let serv = SSHServer::new(&mut ssh_rxbuf, &mut ssh_txbuf, &mut app)?; + let serv = &serv; + + let app = Mutex::<NoopRawMutex, _>::new(app); - // info!("Received connection from {:?}", socket.remote_endpoint()); - - // #[embassy_executor::task] - // async fn from_wir(stack: &'static Stack<cyw43::NetDevice<'static>>) -> ! { - // stack.run().await - // } - - // spawner.spawn(sunset_task(runner)).unwrap(); - - // run().unwrap(); - - // loop { - // let n = match socket.read(&mut buf).await { - // Ok(0) => { - // warn!("read EOF"); - // break; - // } - // Ok(n) => n, - // Err(e) => { - // warn!("read error: {:?}", e); - // break; - // } - // }; - - // info!("rxd {:02x}", &buf[..n]); - - // match socket.write(&buf[..n]).await { - // Ok(_sz) => {} - // Err(e) => { - // warn!("write error: {:?}", e); - // break; - // } - // }; - // } + let run = serv.run(socket, &app); + let session = async { + loop { + } + }; + + // TODO: handle results + join(run, session).await; + + Ok(()) +} diff --git a/embassy/demos/std/Cargo.lock b/embassy/demos/std/Cargo.lock index cb24dc9..8f17a03 100644 --- a/embassy/demos/std/Cargo.lock +++ b/embassy/demos/std/Cargo.lock @@ -1288,6 +1288,8 @@ dependencies = [ name = "sunset-embassy" version = "0.2.0-alpha" dependencies = [ + "embassy-futures", + "embassy-net", "embassy-sync 0.1.0", "log", "pin-utils", diff --git a/embassy/demos/std/src/main.rs b/embassy/demos/std/src/main.rs index 96841eb..abd740b 100644 --- a/embassy/demos/std/src/main.rs +++ b/embassy/demos/std/src/main.rs @@ -43,29 +43,6 @@ macro_rules! singleton { }}; } -// fn run() -> sunset::Result<()> { -// let mut x = [0u8; 500]; - -// let mut inbuf = [0u8; 1000]; -// let mut outbuf = [0u8; 1000]; -// let mut runner = sunset::Runner::new_server(&mut inbuf, &mut outbuf)?; -// let mut cli = SSHClient {}; -// let mut cli = sunset::Behaviour::new_client(&mut cli); - -// let mut pollctx = Context::from_waker(noop_waker_ref()); - -// runner.input(&x)?; -// let l = runner.progress(&mut cli); -// pin_mut!(l); -// let _ = l.poll(&mut pollctx); -// // runner.output(&mut x).unwrap(); - -// // tx.write(b'x').unwrap(); -// // write!(tx, "{}", x[0]); - -// Ok(()) -// } - #[embassy_executor::task] async fn net_task(stack: &'static Stack<TunTapDevice>) -> ! { stack.run().await @@ -82,7 +59,6 @@ async fn main_task(spawner: Spawner) { // Init network device let device = TunTapDevice::new(opt_tap0).unwrap(); - let seed = OsRng.next_u64(); // Init network stack @@ -126,7 +102,7 @@ async fn listener(stack: &'static Stack<TunTapDevice>) -> ! { } struct DemoServer { - // keys: Vec<SignKey>, + keys: [SignKey; 1], sess: Option<u32>, want_shell: bool, @@ -135,13 +111,12 @@ struct DemoServer { impl DemoServer { fn new() -> Result<Self> { - // let keys = keyfiles.iter().map(|f| { - // read_key(f).with_context(|| format!("loading key {f}")) - // }).collect::<Result<Vec<SignKey>>>()?; + + let keys = [SignKey::generate(KeyType::Ed25519)?]; Ok(Self { sess: None, - // keys, + keys, want_shell: false, shell_started: false, }) @@ -150,8 +125,7 @@ impl DemoServer { impl ServBehaviour for DemoServer { fn hostkeys(&mut self) -> BhResult<&[SignKey]> { - todo!("hostkeys()") - // Ok(&self.keys) + Ok(&self.keys) } @@ -160,6 +134,11 @@ impl ServBehaviour for DemoServer { } fn have_auth_pubkey(&self, user: TextString) -> bool { + false + } + + fn auth_unchallenged(&mut self, username: TextString) -> bool { + info!("Allowing auth for user {:?}", username.as_str()); true } @@ -202,63 +181,16 @@ impl ServBehaviour for DemoServer { } async fn session(socket: &mut TcpSocket<'_>) -> sunset::Result<()> { - let mut app = DemoServer::new()?; - - let mut ssh_rxbuf = [0; 4000]; - let mut ssh_txbuf = [0; 4000]; - let serv = SSHServer::new(&mut ssh_rxbuf, &mut ssh_txbuf, &mut app)?; - let serv = &serv; - - let app = Mutex::<NoopRawMutex, _>::new(app); - - let (mut rsock, mut wsock) = socket.split(); - - let tx = async { - loop { - // TODO: make sunset read directly from socket, no intermediate buffer. - let mut buf = [0; 1024]; - trace!("tx read"); - let l = serv.read(&mut buf).await?; - trace!("tx read done"); - let mut buf = &buf[..l]; - while buf.len() > 0 { - let n = wsock.write(buf).await.expect("TODO handle write error"); - buf = &buf[n..]; - } - trace!("tx write done"); - } - #[allow(unreachable_code)] - Ok::<_, sunset::Error>(()) - }; - - let rx = async { - loop { - // TODO: make sunset read directly from socket, no intermediate buffer. - let mut buf = [0; 1024]; - trace!("rx read"); - let l = rsock.read(&mut buf).await.expect("TODO handle read error"); - trace!("rx read done {l}"); - let mut buf = &buf[..l]; - while buf.len() > 0 { - let n = serv.write(&buf).await?; - buf = &buf[n..]; - } - trace!("rx write done"); - } - #[allow(unreachable_code)] - Ok::<_, sunset::Error>(()) - }; - - let prog = async { - loop { - serv.progress(&app).await?; - } - #[allow(unreachable_code)] - Ok::<_, sunset::Error>(()) - }; - join3(rx, tx, prog).await; - - Ok(()) + let mut app = DemoServer::new()?; + + let mut ssh_rxbuf = [0; 2000]; + let mut ssh_txbuf = [0; 2000]; + let serv = SSHServer::new(&mut ssh_rxbuf, &mut ssh_txbuf, &mut app)?; + let serv = &serv; + + let app = Mutex::<NoopRawMutex, _>::new(app); + + serv.run(socket, &app).await } static EXECUTOR: StaticCell<Executor> = StaticCell::new(); diff --git a/embassy/src/embassy_sunset.rs b/embassy/src/embassy_sunset.rs index 2646ec2..31f4b12 100644 --- a/embassy/src/embassy_sunset.rs +++ b/embassy/src/embassy_sunset.rs @@ -83,6 +83,7 @@ impl<'a> EmbassySunset<'a> { } // inner dropped } + warn!("progress unlocked"); // idle until input is received // TODO do we also want to wake in other situations? diff --git a/embassy/src/server.rs b/embassy/src/server.rs index 742cb49..b5958cb 100644 --- a/embassy/src/server.rs +++ b/embassy/src/server.rs @@ -1,5 +1,7 @@ use embassy_sync::mutex::Mutex; use embassy_sync::blocking_mutex::raw::{NoopRawMutex, RawMutex}; +use embassy_futures::join::join3; +use embassy_net::tcp::TcpSocket; use sunset::*; @@ -28,6 +30,56 @@ impl<'a> SSHServer<'a> { self.sunset.progress_server(b).await } + pub async fn run<M>(&self, socket: &mut TcpSocket<'_>, b: &Mutex<M, impl ServBehaviour>) -> Result<()> + where M: RawMutex + { + let (mut rsock, mut wsock) = socket.split(); + + let tx = async { + loop { + // TODO: make sunset read directly from socket, no intermediate buffer. + let mut buf = [0; 1024]; + let l = self.read(&mut buf).await?; + let mut buf = &buf[..l]; + while buf.len() > 0 { + let n = wsock.write(buf).await.expect("TODO handle write error"); + buf = &buf[n..]; + } + } + #[allow(unreachable_code)] + Ok::<_, sunset::Error>(()) + }; + + let rx = async { + loop { + // TODO: make sunset read directly from socket, no intermediate buffer. + let mut buf = [0; 1024]; + let l = rsock.read(&mut buf).await.expect("TODO handle read error"); + let mut buf = &buf[..l]; + while buf.len() > 0 { + let n = self.write(&buf).await?; + buf = &buf[n..]; + } + } + #[allow(unreachable_code)] + Ok::<_, sunset::Error>(()) + }; + + let prog = async { + loop { + self.progress(b).await?; + } + #[allow(unreachable_code)] + Ok::<_, sunset::Error>(()) + }; + + + // TODO: handle results + join3(rx, tx, prog).await; + + Ok(()) + } + // pub async fn channel(&mut self, ch: u32) -> Result<(ChanInOut<'a>, Option<ChanExtOut<'a>>)> { // let ty = self.sunset.with_runner(|r| r.channel_type(ch)).await?; // let inout = ChanInOut::new(ch, &self.sunset); -- GitLab