diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..a091542 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "messing_with_async" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +futures = "0.3" \ No newline at end of file diff --git a/README.md b/README.md index 2f974c2..180fff6 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,5 @@ # messing_with_async -don't use this. \ No newline at end of file +don't use this. + +seriously please don't use this. \ No newline at end of file diff --git a/src/futures.rs b/src/futures.rs new file mode 100644 index 0000000..57e50a7 --- /dev/null +++ b/src/futures.rs @@ -0,0 +1,417 @@ +//! Various futures, such as [`TimerFuture`], [`AllFuture`], and [`AnyFuture`]. +//! +//! Also includes macros such as [`any`] and [`all`] to better make use of these futures. + +use std::{ + future::Future, + sync::{Arc, Mutex}, + thread, + time::Duration, + task::{Waker, Context, Poll}, + pin::Pin +}; + + + +// TODO: Check consistent wording of doc comments + +/// A future that waits for a specified amount of time. +/// +/// ``` +/// # use messing_with_async::{async_main, futures::TimerFuture}; +/// # use std::time::Duration; +/// # async_main!(async_main); +/// # async fn async_main() { +/// println!("Hello, world!"); +/// TimerFuture::new(Duration::from_secs(5)).await; +/// println!("Goodbye, world!"); +/// # } +/// # main(); +/// ``` +pub struct TimerFuture { + done: Arc>, + waker: Arc>> +} + +impl TimerFuture { + pub fn new(dur: Duration) -> TimerFuture { + let waker = Arc::new(Mutex::new(None)); + let waker_clone = waker.clone(); + let done = Arc::new(Mutex::new(false)); + let done_clone = done.clone(); + + thread::spawn(move || { + thread::sleep(dur); + *done_clone.lock().unwrap() = true; + if let Some(waker) = waker_clone.lock().unwrap().take() { + let waker: Waker = waker; // needed for some reason + waker.wake(); + } + }); + + TimerFuture { + done, + waker + } + } +} + +impl Future for TimerFuture { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + self.waker.lock().unwrap().replace(cx.waker().clone()); + if *self.done.lock().unwrap() { Poll::Ready(()) } + else { Poll::Pending } + } +} + +/// Creates a future that waits for the given amount of milliseconds. +/// +/// ``` +/// # use messing_with_async::{async_main, futures::TimerFuture}; +/// # use std::time::Duration; +/// # async_main!(async_main); +/// # async fn async_main() { +/// println!("Hello, world!"); +/// sleep!(5000).await; +/// println!("Goodbye, world!"); +/// # } +/// # main(); +/// ``` +#[macro_export] +macro_rules! sleep { + ($millis:expr) => { + $crate::futures::TimerFuture::new(::core::time::Duration::from_millis($millis)) + }; +} + + + +/// A future that waits for two futures to all complete concurrently. +/// +/// ``` +/// # use messing_with_async::{async_main, AllFuture}; +/// # async_main!(async_main); +/// # async fn async_main() { +/// let fut = AllFuture::new(async { 5 }, async { "foobar" }); +/// assert_eq!(fut.await, (5, "foobar")); +/// # } +/// # main(); +/// ``` +pub struct AllFuture { + a: Option>>>, + b: Option>>>, + v: Option, + w: Option, +} + +impl AllFuture { + pub fn new(a: impl Future + 'static, b: impl Future + 'static) -> AllFuture { // TODO: "a and b dont live long enough, but i wont tell you how long it should live" + Self { + a: Some(Box::pin(a)), + b: Some(Box::pin(b)), + v: None, + w: None, + } + } +} + +impl Unpin for AllFuture {} // needed cause T can be !Unpin, but either way Self is Unpin + +impl Future for AllFuture { + type Output = (T, U); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + if let Some(mut a) = this.a.take() { + if let Poll::Ready(v) = a.as_mut().poll(cx) { + this.v = Some(v); + } else { + this.a.replace(a); + } + } + + if let Some(mut b) = this.b.take() { + if let Poll::Ready(w) = b.as_mut().poll(cx) { + this.w = Some(w); + } else { + this.b.replace(b); + } + } + + if this.a.is_none() && this.b.is_none() { + Poll::Ready((this.v.take().unwrap(), this.w.take().unwrap())) + } else { Poll::Pending } + } +} + +/// Creates a future that waits for multiple futures to all complete concurrently. +/// +/// ``` +/// # use messing_with_async::{async_main, all}; +/// # async_main!(async_main); +/// # async fn async_main() { +/// let fut = all!(async { 5 }, async { "foobar" }, async { true }); +/// assert_eq!(fut.await, (5, "foobar", true)); +/// # } +/// # main(); +/// ``` +#[macro_export] +macro_rules! all { + (@FUTURE $fut:expr) => { + $fut + }; + (@FUTURE $fut:expr, $($tail:tt)+) => { + $crate::futures::AllFuture::new($fut, all!(@FUTURE $($tail)+)) + }; + (@TUPLE $base:expr; $fut:expr; $(, $out:expr)*) => { + ($($out),*, $base) + }; + (@TUPLE $base:expr; $fut:expr, $($fut_tail:expr),+; $($out:tt)*) => { + all!(@TUPLE $base.1; $($fut_tail),+; $($out)*, $base.0) + }; + ($($fut:expr),+) => { + async { + let fut = all!(@FUTURE $($fut),+); + let res = fut.await; + all!(@TUPLE res; $($fut),+;) + } + }; +} + +/// A future that waits for multiple futures (with the same `Output` type) in a `Vec` to all complete concurrently. +/// +/// ``` +/// # use messing_with_async::{async_main, AllVecFuture}; +/// # use std::future::Future; +/// # async_main!(async_main); +/// # async fn async_main() { +/// let futures: Vec>> = vec![Box::new(async { 1 }), Box::new(async { 2 }), Box::new(async { 3 })]; +/// let fut = AllVecFuture::new(futures); +/// assert_eq!(fut.await, vec![1, 2, 3]) +/// # } +/// # main(); +/// ``` +pub struct AllVecFuture { + futs: Vec>>>>, + vals: Vec> +} + +impl AllVecFuture { + pub fn new(futs: Vec>>) -> Self { + Self { + vals: futs.iter().map(|_| None).collect(), + futs: futs.into_iter() + .map(|p| unsafe { Some(Pin::new_unchecked(p)) } ) + .collect() + } + } +} + +impl Unpin for AllVecFuture {} // needed cause T can be !Unpin, but either way Self is Unpin + +impl Future for AllVecFuture { + type Output = Vec; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + for (i, fut_opt) in this.futs.iter_mut().enumerate() { + if let Some(fut) = fut_opt { + match fut.as_mut().poll(cx) { + Poll::Ready(v) => { + fut_opt.take(); + this.vals[i].replace(v); + }, + Poll::Pending => {} + } + } + } + + if this.vals.iter().all(Option::is_some) { + return Poll::Ready(this.vals.iter_mut().map(Option::take).map(Option::unwrap).collect()) + } + + Poll::Pending + } +} + +/// Creates a future that waits for multiple futures (with the same `Output` type) to all complete concurrently using a `Vec`. +/// +/// ``` +/// # use messing_with_async::{async_main, all_vec}; +/// # use std::future::Future; +/// # async_main!(async_main); +/// # async fn async_main() { +/// let fut = all_vec!(async { 1 }, async { 2 }, async { 3 }); +/// assert_eq!(fut.await, vec![1, 2, 3]); +/// # } +/// # main(); +/// ``` +#[macro_export] +macro_rules! all_vec { + ($($fut:expr),+) => { + async { + let fut = $crate::futures::AllVecFuture::new(::std::vec![$(::std::boxed::Box::new($fut)),+]); + let res = fut.await; + res + } + }; +} + +/// Represents a type which is either a `T` or a `U`. +#[derive(Debug, PartialEq, Eq)] +pub enum Sum { + A(T), + B(U) +} + +impl Sum { + pub fn is_a(&self) -> bool { + match self { + Self::A(_) => true, + _ => false + } + } + + pub fn is_b(&self) -> bool { + match self { + Self::B(_) => true, + _ => false + } + } + + pub fn take_a(self) -> Option { + match self { + Self::A(v) => Some(v), + _ => None + } + } + + pub fn take_b(self) -> Option { + match self { + Self::B(v) => Some(v), + _ => None + } + } + + pub fn unwrap_a(self) -> T { + match self { + Self::A(v) => v, + _ => panic!("Attempted to unwrap_a a Sum::B value!") + } + } + + pub fn unwrap_b(self) -> U { + match self { + Self::B(v) => v, + _ => panic!("Attempted to unwrap_b a Sum::A value!") + } + } +} + +/// A future that waits for one of two futures to complete concurrently. +/// +/// ``` +/// # use messing_with_async::{async_main, AnyFuture, Sum, TimerFuture}; +/// # async_main!(async_main); +/// # async fn async_main() { +/// let fut = AnyFuture::new(async { TimerFuture::new(1000).await; 5 }, async { "foobar" }); +/// assert_eq!(fut.await, Sum::B("foobar")); +/// # } +/// # main(); +/// ``` +// TODO: this future is still broken, in the above doc comment after 1 second main will be polled again and panic +pub struct AnyFuture { + a: Pin>>, + b: Pin>> +} + +impl AnyFuture { + pub fn new(a: impl Future + 'static, b: impl Future + 'static) -> AnyFuture { + Self { + a: Box::pin(a), + b: Box::pin(b) + } + } +} + +impl Future for AnyFuture { + type Output = Sum; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if let Poll::Ready(v) = self.a.as_mut().poll(cx) { + return Poll::Ready(Sum::A(v)) + } + + if let Poll::Ready(v) = self.b.as_mut().poll(cx) { + return Poll::Ready(Sum::B(v)) + } + + Poll::Pending + } +} + +/// Creates a future that waits for one of multiple futures (with the same `Output` type) to complete concurrently. +// TODO: this is also broken, dunno how to do it +// #[macro_export] +#[allow(unused_macros)] +macro_rules! any { + (@FUTURE $fut:expr) => { + $fut + }; + (@FUTURE $fut:expr, $($tail:tt)+) => { + $crate::futures::AnyFuture::new($fut, all!(@FUTURE $($tail)+)) + }; + ($($fut:expr),+) => { + async { + let fut = any!(@FUTURE $($fut),+); + let res = fut.await; + todo!(); + } + }; +} + +/// A future that waits for one of multiple futures (with the same `Output` type) in a `Vec` to complete concurrently. +pub struct AnyVecFuture { + futs: Vec>>> +} + +impl AnyVecFuture { + pub fn new(futs: Vec>>) -> AnyVecFuture { + Self { + futs: futs.into_iter() + .map(|p| unsafe { Pin::new_unchecked(p) }) + .collect() + } + } +} + +impl Future for AnyVecFuture { + type Output = (usize, T); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + for (i, fut) in self.futs.iter_mut().enumerate() { + match fut.as_mut().poll(cx) { + Poll::Ready(v) => return Poll::Ready((i, v)), + Poll::Pending => {} + } + } + + Poll::Pending + } +} + +/// Creates a future that waits for one of multiple futures (with the same `Output` type) to complete concurrently using a `Vec`. +#[macro_export] +macro_rules! any_vec { + ($($fut:expr),+) => { + async { + let fut = $crate::futures::AnyVecFuture::new(::std::alloc::vec![$($fut),+]) + let res = fut.await; + res + } + }; +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..9f87fdf --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,98 @@ +/// Sets up the given `async fn` to be the entry point. +/// +/// ``` +/// # use messing_with_async::async_main; +/// async_main!(async_main); +/// async fn async_main() { +/// println!("Hello, world!"); +/// } +/// # main(); // needed cause the main generated by the macro isnt recognized by doctests +/// ``` +#[macro_export] +macro_rules! async_main { + ($ident:ident) => { + fn main() { + use ::core::future::Future; + + struct Task { + tx: ::std::sync::mpsc::SyncSender<()> + } + + impl ::futures::task::ArcWake for Task { + fn wake_by_ref(arc_self: &::std::sync::Arc) { + arc_self.tx.send(()).unwrap(); + } + } + + let (tx, rx) = ::std::sync::mpsc::sync_channel(10000); + tx.send(()).unwrap(); + let waker = ::futures::task::waker(::std::sync::Arc::new(Task { tx })); + let mut context = ::core::task::Context::from_waker(&waker); + let mut future = ::std::boxed::Box::pin($ident()); + + while let Ok(_) = rx.recv() { + match future.as_mut().poll(&mut context) { + ::core::task::Poll::Pending => {}, + ::core::task::Poll::Ready(v) => v + } + } + } + }; +} + +use std::{sync::{mpsc::{SyncSender, self}, Arc, Mutex}, pin::Pin, future::Future, task::Context}; + +use ::futures::task::{ArcWake, waker}; + +pub struct Executor {} + +impl Executor { + pub fn new() -> Self { + Executor {} + } + + pub fn run(&self, fut: impl Future + Send + Sync + 'static) { + let (tx, rx) = mpsc::sync_channel(10000); + + let main = Arc::new(Task { tx: tx.clone(), fut: Mutex::new(Box::pin(fut)) }); + + tx.send(main).unwrap(); + drop(tx); + + while let Ok(task) = rx.recv() { + let waker = waker(task.clone()); + let mut context = Context::from_waker(&waker); + #[allow(unused_must_use)] + { task.fut.lock().unwrap().as_mut().poll(&mut context); } + } + } +} +pub struct Task { + tx: SyncSender>, + fut: Mutex + Send + Sync>>> // dunno how to fix this, just hope everyhing is send/sync for now +} + +impl ArcWake for Task { + fn wake_by_ref(arc_self: &Arc) { + arc_self.tx.send(arc_self.clone()).unwrap(); + } +} + +pub mod futures; +use crate::futures::*; + + +pub trait FutureExt: Future + 'static { + fn and(self, other: impl Future + 'static) -> AllFuture; + fn or(self, other: impl Future + 'static) -> AnyFuture; +} + +impl FutureExt for F { + fn and(self, other: impl Future + 'static) -> AllFuture { + AllFuture::new(self, other) + } + + fn or(self, other: impl Future + 'static) -> AnyFuture { + AnyFuture::new(self, other) + } +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..e48cb10 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,12 @@ +use messing_with_async::{async_main, all, sleep, FutureExt}; + +async_main!(async_main); +async fn async_main() { + println!("hi!"); + sleep!(1000).await; + println!("getting closer!"); + println!("{:#?}", all!(async { 5 }, async { "foobar" }, async { true }, sleep!(500)).await); + println!("almost there!"); + println!("{:#?}", async { 5 }.and(async { "baz" }).and(async { false }).and(sleep!(2000)).await); + println!("done!"); +} \ No newline at end of file