diff --git a/embassy/demos/picow/src/takepipe.rs b/embassy/demos/picow/src/takepipe.rs index 64786f5dbb189e8148613078bc7f70f7f277d48b..9f455e9b6bff3d23dd42be9615d55ceac05db5ad 100644 --- a/embassy/demos/picow/src/takepipe.rs +++ b/embassy/demos/picow/src/takepipe.rs @@ -1,3 +1,5 @@ +use core::ops::DerefMut; + use embedded_io::{asynch, Io}; use embassy_sync::{pipe, mutex::{MutexGuard, Mutex}, signal::Signal}; @@ -9,6 +11,8 @@ use sunset_embassy::{SunsetMutex, SunsetRawMutex}; pub const READ_SIZE: usize = 4000; pub const WRITE_SIZE: usize = 64; +// TODO: this is fairly ugly, the mutex and counter could perhaps be abstracted + /// Allows a bidirectional pipe to be shared by many endpoints /// /// One end of the pipe is fixed (attached to eg a physical/virtual @@ -23,6 +27,7 @@ pub(crate) struct TakePipe { fanout: Pipe<SunsetRawMutex, READ_SIZE>, fanin: Pipe<SunsetRawMutex, WRITE_SIZE>, wake: Signal<SunsetRawMutex, ()>, + counter: u64, } impl TakePipe { @@ -32,8 +37,8 @@ impl TakePipe { pub fn base(&self) -> TakeBase { TakeBase { - shared_read: Mutex::new(self.fanout.reader()), - shared_write: Mutex::new(self.fanin.writer()), + shared_read: Mutex::new((0, self.fanout.reader())), + shared_write: Mutex::new((0, self.fanin.writer())), pipe: self, } } @@ -45,21 +50,28 @@ impl Default for TakePipe { fanout: Pipe::new(), fanin: Pipe::new(), wake: Signal::new(), + counter: 0, } } } pub(crate) struct TakeBase<'a> { - shared_read: Mutex<SunsetRawMutex, pipe::Reader<'a, SunsetRawMutex, READ_SIZE>>, - shared_write: Mutex<SunsetRawMutex, pipe::Writer<'a, SunsetRawMutex, WRITE_SIZE>>, + shared_read: Mutex<SunsetRawMutex, (u64, pipe::Reader<'a, SunsetRawMutex, READ_SIZE>)>, + shared_write: Mutex<SunsetRawMutex, (u64, pipe::Writer<'a, SunsetRawMutex, WRITE_SIZE>)>, pipe: &'a TakePipe, } impl<'a> TakeBase<'a> { pub async fn take(&'a self) -> (TakeRead<'a>, TakeWrite<'a>) { + self.pipe.wake.signal(()); - let r = self.shared_read.lock().await; - let w = self.shared_write.lock().await; + let mut lr = self.shared_read.lock().await; + let (cr, _r) = lr.deref_mut(); + let mut lw = self.shared_write.lock().await; + let (cw, _w) = lw.deref_mut(); + *cr += 1; + *cw += 1; + debug_assert!(*cr == *cw); // We could .clear() the pipes, but // that wouldn't deal with data that has already progressed // further along out the SSH channel etc. So we leave that @@ -68,11 +80,13 @@ impl<'a> TakeBase<'a> { let r = TakeRead { pipe: self.pipe, - shared: Some(r), + shared: Some(&self.shared_read), + counter: *cr, }; let w = TakeWrite { pipe: self.pipe, - shared: Some(w), + shared: Some(&self.shared_write), + counter: *cw, }; (r, w) } @@ -118,24 +132,34 @@ impl Io for TakeBaseWrite<'_> { pub(crate) struct TakeRead<'a> { pipe: &'a TakePipe, - shared: Option<MutexGuard<'a, SunsetRawMutex, pipe::Reader<'a, SunsetRawMutex, READ_SIZE>>>, + shared: Option<&'a SunsetMutex<(u64, pipe::Reader<'a, SunsetRawMutex, READ_SIZE>)>>, + counter: u64, } impl asynch::Read for TakeRead<'_> { async fn read(&mut self, buf: &mut [u8]) -> sunset::Result<usize> { - let p = self.shared.as_ref().ok_or(sunset::Error::ChannelEOF)?; + let p = self.shared.ok_or(sunset::Error::ChannelEOF)?; + + let op = async { + let mut p = p.lock().await; + let (c, o) = p.deref_mut(); + if *c != self.counter { + return Err(sunset::Error::ChannelEOF); + } + Ok(o.read(buf).await) + }; let r = select( - p.read(buf), + op, self.pipe.wake.wait(), ); match r.await { // read completed - Either::First(l) => Ok(l), + Either::First(l) => l, // lost the pipe - Either::Second(l) => { + Either::Second(()) => { self.shared = None; Err(sunset::Error::ChannelEOF) } @@ -149,21 +173,31 @@ impl Io for TakeRead<'_> { pub(crate) struct TakeWrite<'a> { pipe: &'a TakePipe, - shared: Option<MutexGuard<'a, SunsetRawMutex, pipe::Writer<'a, SunsetRawMutex, WRITE_SIZE>>>, + shared: Option<&'a SunsetMutex<(u64, pipe::Writer<'a, SunsetRawMutex, WRITE_SIZE>)>>, + counter: u64, } impl asynch::Write for TakeWrite<'_> { async fn write(&mut self, buf: &[u8]) -> sunset::Result<usize> { - let p = self.shared.as_ref().ok_or(sunset::Error::ChannelEOF)?; + let p = self.shared.ok_or(sunset::Error::ChannelEOF)?; + + let op = async { + let mut p = p.lock().await; + let (c, o) = p.deref_mut(); + if *c != self.counter { + return Err(sunset::Error::ChannelEOF); + } + Ok(o.write(buf).await) + }; let r = select( - p.write(buf), + op, self.pipe.wake.wait(), ); match r.await { // write completed - Either::First(l) => Ok(l), + Either::First(l) => l, // lost the pipe Either::Second(l) => { self.shared = None;