diff --git a/async/Cargo.toml b/async/Cargo.toml index 6707cfd003005f15db20667a65250c6188a5ad09..424553642f19f3bf411af3900ae76f169fabfec8 100644 --- a/async/Cargo.toml +++ b/async/Cargo.toml @@ -39,7 +39,6 @@ pretty-hex = "0.3" [dev-dependencies] anyhow = { version = "1.0" } pretty-hex = "0.3" -simplelog = "0.12" whoami = "1.3" zeroize = "1.5" @@ -47,3 +46,7 @@ tokio = { version = "1.17", features = ["full"] } # adapters for tokio and async-std embedded-io = { version = "0.4", features = ["tokio"] } + +simplelog = "0.12" +# for simplelog +time = { version = "0.3", features = ["local-offset"] } diff --git a/async/examples/sunsetc.rs b/async/examples/sunsetc.rs index 65e61df247b1d09fc86ee3b44eb1e754c1150fe8..bc12b34ac6fc8899e3ea11e06edfadad50c10132 100644 --- a/async/examples/sunsetc.rs +++ b/async/examples/sunsetc.rs @@ -5,7 +5,7 @@ use { }; use anyhow::{Context, Result, anyhow, bail}; use argh::FromArgs; -use embassy_sync::{mutex::Mutex, blocking_mutex::raw::NoopRawMutex}; +use embassy_sync::mutex::Mutex; use tokio::net::TcpStream; use tokio::task::spawn_local; @@ -13,7 +13,7 @@ use tokio::task::spawn_local; use std::io::Read; use sunset::*; -use sunset_embassy::SSHClient; +use sunset_embassy::{SSHClient, SunsetRawMutex}; use sunset_async::{CmdlineClient, AgentClient}; @@ -22,6 +22,127 @@ use embedded_io::adapters::FromTokio; use zeroize::Zeroizing; use simplelog::*; +use time::UtcOffset; + +#[tokio::main] +async fn real_main(tz: UtcOffset) -> Result<()> { + let args = parse_args(tz)?; + + // TODO: currently we just run it all on a single thread. + // Running with tokio's normal multiple threads works fine + // if we change SunsetRawMutex to a CriticalSectionMutex + // (or something wrapping std::sync::Mutex) and make + // `CliBehaviour : Send`. But then embedded platforms won't work, + // need to figure how to make it configurable. + let local = tokio::task::LocalSet::new(); + local.run_until(run(args)).await +} + +fn main() { + // Crates won't let us read from environment variables once + // threading starts, so do it before tokio main. + let tz = UtcOffset::current_local_offset() + .unwrap_or(UtcOffset::UTC); + + if let Err(e) = real_main(tz) { + error!("Exit with error: {e}"); + } +} + +async fn run(args: Args) -> Result<()> { + + trace!("tracing main"); + debug!("verbose main"); + + if !args.cmd.is_empty() && args.subsystem.is_some() { + bail!("can't have '-s subsystem' with a command") + } + + let mut want_pty = true; + let cmd = if args.cmd.is_empty() { + None + } else { + want_pty = false; + Some(args.cmd.join(" ")) + }; + + if args.subsystem.is_some() { + want_pty = false; + } + + if args.force_no_pty { + want_pty = false + } + + let ssh_task = spawn_local(async move { + let mut rxbuf = Zeroizing::new(vec![0; 3000]); + let mut txbuf = Zeroizing::new(vec![0; 3000]); + let cli = SSHClient::new(&mut rxbuf, &mut txbuf)?; + + let mut app = CmdlineClient::new( + args.username.as_ref().unwrap(), + &args.host, + ); + + app.port(args.port); + + if want_pty { + app.pty(); + } + if let Some(c) = cmd { + app.exec(&c); + } + if let Some(c) = args.subsystem { + app.subsystem(&c); + } + for i in &args.identityfile { + app.add_authkey(read_key(&i).with_context(|| format!("loading key {i}"))?); + } + + let agent = load_agent_keys(&mut app).await; + if let Some(agent) = agent { + app.agent(agent); + } + + // Connect to a peer + let mut stream = TcpStream::connect((args.host.as_str(), args.port)).await?; + let (rsock, wsock) = stream.split(); + let mut rsock = FromTokio::new(rsock); + let mut wsock = FromTokio::new(wsock); + + let (hooks, mut cmd) = app.split(); + + let hooks = Mutex::<SunsetRawMutex, _>::new(hooks); + + let ssh = async { + let r = cli.run(&mut rsock, &mut wsock, &hooks).await; + trace!("ssh run finished"); + hooks.lock().await.exited().await; + r + }; + + // Circular reference here, cli -> cmd and cmd->cli + let session = cmd.run(&cli); + let session = async { + let r = session.await; + trace!("client session run finished"); + cli.exit().await; + r + }; + + let (res_ssh, res_session) = futures::future::join(ssh, session).await; + debug!("res_ssh {res_ssh:?}"); + debug!("res_session {res_session:?}"); + res_ssh?; + res_session?; + Ok::<_, anyhow::Error>(()) + }); + + match ssh_task.await { + Err(_) => Err(anyhow!("Sunset task panicked")), + Ok(r) => r, + } +} #[derive(argh::FromArgs)] /** Sunset SSH Client @@ -90,7 +211,7 @@ struct Args { option: Vec<String>, } -fn parse_args() -> Result<Args> { +fn parse_args(tz: UtcOffset) -> Result<Args> { let mut in_args = std::env::args(); // OpenSSH has some quirks such as -oCommand, so we pre-process the commandline. @@ -115,11 +236,7 @@ fn parse_args() -> Result<Args> { std::process::exit(1) }); - // time crate won't read TZ if we're threaded, in case someone - // tries to mutate shared state with setenv. - // https://github.com/rust-lang/rust/issues/90308 etc - // logging uses the timezone, so we can't use async main. - setup_log(&args)?; + setup_log(&args, tz)?; if args.username.is_none() { // user@host syntax. rsplit for usernames with @ in them @@ -139,31 +256,7 @@ fn parse_args() -> Result<Args> { Ok(args) } -fn try_main() -> Result<()> { - let args = parse_args()?; - - - tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap() - .block_on(async { - // TODO: currently we just run it all on a single thread. - // once SunsetRawWaker is configurable we could run threaded, - // but would also need to have `Send` methods in `Behaviour` - // which currently isn't supported by async functions in traits. - let local = tokio::task::LocalSet::new(); - local.run_until(run(args)).await - }) -} - -fn main() { - if let Err(e) = try_main() { - error!("Exit with error: {e}"); - } -} - -fn setup_log(args: &Args) -> Result<()> { +fn setup_log(args: &Args, tz: UtcOffset) -> Result<()> { let mut conf = simplelog::ConfigBuilder::new(); let conf = conf .add_filter_allow_str("sunset") @@ -172,7 +265,7 @@ fn setup_log(args: &Args) -> Result<()> { .add_filter_ignore_str("sunset::traffic") .add_filter_ignore_str("sunset::runner") .add_filter_ignore_str("sunset_embassy") - .set_time_offset_to_local().expect("Couldn't get local timezone") + .set_time_offset(tz) .build(); let level = if args.trace { @@ -229,98 +322,3 @@ async fn load_agent_keys(app: &mut CmdlineClient) -> Option<AgentClient> { Some(agent) } -async fn run(args: Args) -> Result<()> { - - trace!("tracing main"); - debug!("verbose main"); - - if !args.cmd.is_empty() && args.subsystem.is_some() { - bail!("can't have '-s subsystem' with a command") - } - - let mut want_pty = true; - let cmd = if args.cmd.is_empty() { - None - } else { - want_pty = false; - Some(args.cmd.join(" ")) - }; - - if args.subsystem.is_some() { - want_pty = false; - } - - if args.force_no_pty { - want_pty = false - } - - let ssh_task = spawn_local(async move { - let mut rxbuf = Zeroizing::new(vec![0; 3000]); - let mut txbuf = Zeroizing::new(vec![0; 3000]); - let cli = SSHClient::new(&mut rxbuf, &mut txbuf)?; - - let mut app = CmdlineClient::new( - args.username.as_ref().unwrap(), - &args.host, - ); - - app.port(args.port); - - if want_pty { - app.pty(); - } - if let Some(c) = cmd { - app.exec(&c); - } - if let Some(c) = args.subsystem { - app.subsystem(&c); - } - for i in &args.identityfile { - app.add_authkey(read_key(&i).with_context(|| format!("loading key {i}"))?); - } - - let agent = load_agent_keys(&mut app).await; - if let Some(agent) = agent { - app.agent(agent); - } - - // Connect to a peer - let mut stream = TcpStream::connect((args.host.as_str(), args.port)).await?; - let (rsock, wsock) = stream.split(); - let mut rsock = FromTokio::new(rsock); - let mut wsock = FromTokio::new(wsock); - - let (hooks, mut cmd) = app.split(); - - let hooks = Mutex::<NoopRawMutex, _>::new(hooks); - // let bhooks = &hooks as &Mutex::<NoopRawMutex, CliBehaviour>; - - let ssh = async { - let r = cli.run(&mut rsock, &mut wsock, &hooks).await; - trace!("ssh run finished"); - hooks.lock().await.exited().await; - r - }; - - // Circular reference here, cli -> cmd and cmd->cli - let session = cmd.run(&cli); - let session = async { - let r = session.await; - trace!("client session run finished"); - cli.exit().await; - r - }; - - let (res_ssh, res_session) = futures::future::join(ssh, session).await; - debug!("res_ssh {res_ssh:?}"); - debug!("res_session {res_session:?}"); - res_ssh?; - res_session?; - Ok::<_, anyhow::Error>(()) - }); - - match ssh_task.await { - Err(_) => Err(anyhow!("Sunset task panicked")), - Ok(r) => r, - } -} diff --git a/embassy/src/embassy_sunset.rs b/embassy/src/embassy_sunset.rs index 3449f0619bdef2c692d0d10b60966082b005649b..57aa9745854a72c4e07d34a3c9c6cde775af950e 100644 --- a/embassy/src/embassy_sunset.rs +++ b/embassy/src/embassy_sunset.rs @@ -28,6 +28,7 @@ use sunset::config::MAX_CHANNELS; // For now we only support single-threaded executors. // In future this could be behind a cfg to allow different // RawMutex for std executors or other situations. +// Also requires making CliBehaviour : Send, etc. pub type SunsetRawMutex = NoopRawMutex; pub type SunsetMutex<T> = Mutex<SunsetRawMutex, T>;