From c797b494fe7824a0a5d89f9cb4909bc3ccb9283a Mon Sep 17 00:00:00 2001
From: Matt Johnston <matt@ucc.asn.au>
Date: Thu, 23 Jun 2022 23:09:49 +0800
Subject: [PATCH] use moro and single thread

---
 Cargo.lock             | 179 +++++++++++++++++++++++++++++++++++------
 async/Cargo.toml       |   1 +
 async/examples/con1.rs |  97 +++++++++++-----------
 3 files changed, 203 insertions(+), 74 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 1a8175f..8d47f97 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -63,11 +63,22 @@ version = "1.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "bbf56136a5198c7b01a49e3afcbef6cf84597273d298f54432926024107b0109"
 
+[[package]]
+name = "async-channel"
+version = "1.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2114d64672151c0c5eaa5e131ec84a74f06e1e559830dabba01ca30605d66319"
+dependencies = [
+ "concurrent-queue",
+ "event-listener",
+ "futures-core 0.3.21",
+]
+
 [[package]]
 name = "async-trait"
-version = "0.1.53"
+version = "0.1.56"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ed6aa3524a2dfcf9fe180c51eae2b58738348d819517ceadf95789c51fff7600"
+checksum = "96cf8829f67d2eab0b2dfa42c5d0ef737e0724e4a82b01b3e292456202b19716"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -173,6 +184,12 @@ version = "1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
 
+[[package]]
+name = "cache-padded"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c1db59621ec70f09c5e9b597b220c7a2b43611f4710dc03ceb8748637775692c"
+
 [[package]]
 name = "cfg-if"
 version = "1.0.0"
@@ -200,6 +217,15 @@ dependencies = [
  "inout",
 ]
 
+[[package]]
+name = "concurrent-queue"
+version = "1.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3"
+dependencies = [
+ "cache-padded",
+]
+
 [[package]]
 name = "const-oid"
 version = "0.7.1"
@@ -318,9 +344,10 @@ dependencies = [
  "argh",
  "async-trait",
  "door-sshproto",
- "futures",
+ "futures 0.4.0-alpha.0",
  "libc",
  "log",
+ "moro",
  "nix",
  "pretty-hex",
  "rpassword",
@@ -390,6 +417,12 @@ dependencies = [
  "void",
 ]
 
+[[package]]
+name = "event-listener"
+version = "2.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71"
+
 [[package]]
 name = "fastrand"
 version = "1.7.0"
@@ -405,18 +438,43 @@ version = "1.0.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
 
+[[package]]
+name = "futures"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e"
+dependencies = [
+ "futures-channel 0.3.21",
+ "futures-core 0.3.21",
+ "futures-executor 0.3.21",
+ "futures-io 0.3.21 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-sink 0.3.21",
+ "futures-task 0.3.21",
+ "futures-util 0.3.21",
+]
+
 [[package]]
 name = "futures"
 version = "0.4.0-alpha.0"
 source = "git+https://github.com/rust-lang/futures-rs#7f2603402a1ffbf6ad3a31f15598b72216bec242"
 dependencies = [
- "futures-channel",
- "futures-core",
- "futures-executor",
- "futures-io",
- "futures-sink",
- "futures-task",
- "futures-util",
+ "futures-channel 0.4.0-alpha.0",
+ "futures-core 1.0.0-alpha.0",
+ "futures-executor 0.4.0-alpha.0",
+ "futures-io 0.3.21 (git+https://github.com/rust-lang/futures-rs)",
+ "futures-sink 0.4.0-alpha.0",
+ "futures-task 0.4.0-alpha.0",
+ "futures-util 0.4.0-alpha.0",
+]
+
+[[package]]
+name = "futures-channel"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010"
+dependencies = [
+ "futures-core 0.3.21",
+ "futures-sink 0.3.21",
 ]
 
 [[package]]
@@ -424,30 +482,64 @@ name = "futures-channel"
 version = "0.4.0-alpha.0"
 source = "git+https://github.com/rust-lang/futures-rs#7f2603402a1ffbf6ad3a31f15598b72216bec242"
 dependencies = [
- "futures-core",
- "futures-sink",
+ "futures-core 1.0.0-alpha.0",
+ "futures-sink 0.4.0-alpha.0",
 ]
 
+[[package]]
+name = "futures-core"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"
+
 [[package]]
 name = "futures-core"
 version = "1.0.0-alpha.0"
 source = "git+https://github.com/rust-lang/futures-rs#7f2603402a1ffbf6ad3a31f15598b72216bec242"
 
+[[package]]
+name = "futures-executor"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6"
+dependencies = [
+ "futures-core 0.3.21",
+ "futures-task 0.3.21",
+ "futures-util 0.3.21",
+]
+
 [[package]]
 name = "futures-executor"
 version = "0.4.0-alpha.0"
 source = "git+https://github.com/rust-lang/futures-rs#7f2603402a1ffbf6ad3a31f15598b72216bec242"
 dependencies = [
- "futures-core",
- "futures-task",
- "futures-util",
+ "futures-core 1.0.0-alpha.0",
+ "futures-task 0.4.0-alpha.0",
+ "futures-util 0.4.0-alpha.0",
 ]
 
+[[package]]
+name = "futures-io"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b"
+
 [[package]]
 name = "futures-io"
 version = "0.3.21"
 source = "git+https://github.com/rust-lang/futures-rs#7f2603402a1ffbf6ad3a31f15598b72216bec242"
 
+[[package]]
+name = "futures-macro"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "futures-macro"
 version = "0.4.0-alpha.0"
@@ -458,27 +550,57 @@ dependencies = [
  "syn",
 ]
 
+[[package]]
+name = "futures-sink"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868"
+
 [[package]]
 name = "futures-sink"
 version = "0.4.0-alpha.0"
 source = "git+https://github.com/rust-lang/futures-rs#7f2603402a1ffbf6ad3a31f15598b72216bec242"
 
+[[package]]
+name = "futures-task"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a"
+
 [[package]]
 name = "futures-task"
 version = "0.4.0-alpha.0"
 source = "git+https://github.com/rust-lang/futures-rs#7f2603402a1ffbf6ad3a31f15598b72216bec242"
 
+[[package]]
+name = "futures-util"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a"
+dependencies = [
+ "futures-channel 0.3.21",
+ "futures-core 0.3.21",
+ "futures-io 0.3.21 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-macro 0.3.21",
+ "futures-sink 0.3.21",
+ "futures-task 0.3.21",
+ "memchr",
+ "pin-project-lite",
+ "pin-utils",
+ "slab",
+]
+
 [[package]]
 name = "futures-util"
 version = "0.4.0-alpha.0"
 source = "git+https://github.com/rust-lang/futures-rs#7f2603402a1ffbf6ad3a31f15598b72216bec242"
 dependencies = [
- "futures-channel",
- "futures-core",
- "futures-io",
- "futures-macro",
- "futures-sink",
- "futures-task",
+ "futures-channel 0.4.0-alpha.0",
+ "futures-core 1.0.0-alpha.0",
+ "futures-io 0.3.21 (git+https://github.com/rust-lang/futures-rs)",
+ "futures-macro 0.4.0-alpha.0",
+ "futures-sink 0.4.0-alpha.0",
+ "futures-task 0.4.0-alpha.0",
  "memchr",
  "pin-project-lite",
  "pin-utils",
@@ -643,6 +765,17 @@ dependencies = [
  "windows-sys",
 ]
 
+[[package]]
+name = "moro"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8472c674b8319e7529bfdb3c51216810e36727be2056136d07130a0b1c132df6"
+dependencies = [
+ "async-channel",
+ "async-trait",
+ "futures 0.3.21",
+]
+
 [[package]]
 name = "nb"
 version = "0.1.3"
@@ -1205,9 +1338,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
 
 [[package]]
 name = "syn"
-version = "1.0.95"
+version = "1.0.98"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fbaf6116ab8924f39d52792136fb74fd60a80194cf1b1c6ffa6453eef1c3f942"
+checksum = "c50aef8a904de4c23c788f104b7dddc7d6f79c647c7c8ce4cc8f73eb0ca773dd"
 dependencies = [
  "proc-macro2",
  "quote",
diff --git a/async/Cargo.toml b/async/Cargo.toml
index 0f2227a..c0880fc 100644
--- a/async/Cargo.toml
+++ b/async/Cargo.toml
@@ -24,6 +24,7 @@ tokio = { version = "1.17", features = ["sync", "net"] }
 # require alpha for https://github.com/rust-lang/futures-rs/pull/2571
 futures = { version = "0.4.0-alpha.0", git = "https://github.com/rust-lang/futures-rs", revision = "8b0f812f53ada0d0aeb74abc32be22ab9dafae05" }
 async-trait = "0.1"
+moro = "0.4"
 
 libc = "0.2"
 nix = "0.24"
diff --git a/async/examples/con1.rs b/async/examples/con1.rs
index cacb500..67ead30 100644
--- a/async/examples/con1.rs
+++ b/async/examples/con1.rs
@@ -75,7 +75,7 @@ fn main() -> Result<()> {
     // logging uses the timezone, so we can't use async main.
     setup_log(&args);
 
-    tokio::runtime::Builder::new_multi_thread()
+    tokio::runtime::Builder::new_current_thread()
         .enable_all()
         .build()
         .unwrap()
@@ -143,56 +143,51 @@ async fn run(args: &Args) -> Result<()> {
     let mut s = door.socket();
     let netloop = tokio::io::copy_bidirectional(&mut stream, &mut s);
 
-    let prog = tokio::spawn(async move {
-
-        loop {
-            let ev = door.progress(|ev| {
-                trace!("progress event {ev:?}");
-                let e = match ev {
-                    Event::Authenticated => Some(Event::Authenticated),
-                    _ => None,
-                };
-                Ok(e)
-            }).await.context("progress loop")?;
-
-            match ev {
-                Some(Event::Authenticated) => {
-                    info!("Opening a new session channel");
-                    let r = door.open_client_session_nopty(Some("cowsay it works")).await
-                        .context("Opening session")?;
-                    let (mut io, mut err) = r;
-                    tokio::spawn(async move {
-                        let mut i = door_async::stdin()?;
-                        let mut o = door_async::stdout()?;
-                        let mut e = door_async::stderr()?;
-                        let mut io2 = io.clone();
-                        let co = tokio::io::copy(&mut io, &mut o);
-                        let ci = tokio::io::copy(&mut i, &mut io2);
-                        let ce = tokio::io::copy(&mut err, &mut e);
-                        let (r1, r2, r3) = futures::join!(co, ci, ce);
-                        r1?;
-                        r2?;
-                        r3?;
-                        Ok::<_, anyhow::Error>(())
-                    });
-                    // TODO: handle channel completion
+    moro::async_scope!(|scope| {
+        scope.spawn(netloop);
+
+        scope.spawn(async {
+            loop {
+                let ev = door.progress(|ev| {
+                    trace!("progress event {ev:?}");
+                    let e = match ev {
+                        Event::Authenticated => Some(Event::Authenticated),
+                        _ => None,
+                    };
+                    Ok(e)
+                }).await.context("progress loop")?;
+
+                match ev {
+                    Some(Event::Authenticated) => {
+                        info!("Opening a new session channel");
+                        let r = door.open_client_session_nopty(Some("cowsay it works")).await
+                            .context("Opening session")?;
+                        let (mut io, mut err) = r;
+                        scope.spawn(async move {
+                            let mut i = door_async::stdin()?;
+                            let mut o = door_async::stdout()?;
+                            let mut e = door_async::stderr()?;
+                            let mut io2 = io.clone();
+                            let co = tokio::io::copy(&mut io, &mut o);
+                            let ci = tokio::io::copy(&mut i, &mut io2);
+                            let ce = tokio::io::copy(&mut err, &mut e);
+                            let (r1, r2, r3) = futures::join!(co, ci, ce);
+                            r1?;
+                            r2?;
+                            r3?;
+                            Ok::<_, anyhow::Error>(())
+                        });
+                        // TODO: handle channel completion
+                    }
+                    Some(_) => unreachable!(),
+                    None => {},
                 }
-                Some(_) => unreachable!(),
-                None => {},
             }
-        }
-        #[allow(unreachable_code)]
-        Ok::<_, anyhow::Error>(())
-    });
-
-    loop {
-        tokio::select! {
-            e = prog => {
-                bail!("progress loop {e:?}");
-            }
-            e = netloop => {
-                bail!("net loop {e:?}");
-            }
-        }
-    }
+            #[allow(unreachable_code)]
+            Ok::<_, anyhow::Error>(())
+
+        });
+    }).await;
+
+    Ok(())
 }
-- 
GitLab