messing_with_async/src/futures.rs
2022-02-23 09:10:28 -06:00

417 lines
11 KiB
Rust

//! Various futures, such as [`TimerFuture`], [`AllFuture`], and [`AnyFuture`].
//!
//! Also includes macros such as [`any`] and [`all`] to better make use of these futures.
use std::{
future::Future,
sync::{Arc, Mutex},
thread,
time::Duration,
task::{Waker, Context, Poll},
pin::Pin
};
// TODO: Check consistent wording of doc comments
/// A future that waits for a specified amount of time.
///
/// ```
/// # use messing_with_async::{async_main, futures::TimerFuture};
/// # use std::time::Duration;
/// # async_main!(async_main);
/// # async fn async_main() {
/// println!("Hello, world!");
/// TimerFuture::new(Duration::from_secs(5)).await;
/// println!("Goodbye, world!");
/// # }
/// # main();
/// ```
pub struct TimerFuture {
done: Arc<Mutex<bool>>,
waker: Arc<Mutex<Option<Waker>>>
}
impl TimerFuture {
pub fn new(dur: Duration) -> TimerFuture {
let waker = Arc::new(Mutex::new(None));
let waker_clone = waker.clone();
let done = Arc::new(Mutex::new(false));
let done_clone = done.clone();
thread::spawn(move || {
thread::sleep(dur);
*done_clone.lock().unwrap() = true;
if let Some(waker) = waker_clone.lock().unwrap().take() {
let waker: Waker = waker; // needed for some reason
waker.wake();
}
});
TimerFuture {
done,
waker
}
}
}
impl Future for TimerFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
self.waker.lock().unwrap().replace(cx.waker().clone());
if *self.done.lock().unwrap() { Poll::Ready(()) }
else { Poll::Pending }
}
}
/// Creates a future that waits for the given amount of milliseconds.
///
/// ```
/// # use messing_with_async::{async_main, futures::TimerFuture};
/// # use std::time::Duration;
/// # async_main!(async_main);
/// # async fn async_main() {
/// println!("Hello, world!");
/// sleep!(5000).await;
/// println!("Goodbye, world!");
/// # }
/// # main();
/// ```
#[macro_export]
macro_rules! sleep {
($millis:expr) => {
$crate::futures::TimerFuture::new(::core::time::Duration::from_millis($millis))
};
}
/// A future that waits for two futures to all complete concurrently.
///
/// ```
/// # use messing_with_async::{async_main, AllFuture};
/// # async_main!(async_main);
/// # async fn async_main() {
/// let fut = AllFuture::new(async { 5 }, async { "foobar" });
/// assert_eq!(fut.await, (5, "foobar"));
/// # }
/// # main();
/// ```
pub struct AllFuture<T, U> {
a: Option<Pin<Box<dyn Future<Output = T>>>>,
b: Option<Pin<Box<dyn Future<Output = U>>>>,
v: Option<T>,
w: Option<U>,
}
impl<T, U> AllFuture<T, U> {
pub fn new(a: impl Future<Output = T> + 'static, b: impl Future<Output = U> + 'static) -> AllFuture<T, U> { // TODO: "a and b dont live long enough, but i wont tell you how long it should live"
Self {
a: Some(Box::pin(a)),
b: Some(Box::pin(b)),
v: None,
w: None,
}
}
}
impl<T, U> Unpin for AllFuture<T, U> {} // needed cause T can be !Unpin, but either way Self is Unpin
impl<T, U> Future for AllFuture<T, U> {
type Output = (T, U);
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
if let Some(mut a) = this.a.take() {
if let Poll::Ready(v) = a.as_mut().poll(cx) {
this.v = Some(v);
} else {
this.a.replace(a);
}
}
if let Some(mut b) = this.b.take() {
if let Poll::Ready(w) = b.as_mut().poll(cx) {
this.w = Some(w);
} else {
this.b.replace(b);
}
}
if this.a.is_none() && this.b.is_none() {
Poll::Ready((this.v.take().unwrap(), this.w.take().unwrap()))
} else { Poll::Pending }
}
}
/// Creates a future that waits for multiple futures to all complete concurrently.
///
/// ```
/// # use messing_with_async::{async_main, all};
/// # async_main!(async_main);
/// # async fn async_main() {
/// let fut = all!(async { 5 }, async { "foobar" }, async { true });
/// assert_eq!(fut.await, (5, "foobar", true));
/// # }
/// # main();
/// ```
#[macro_export]
macro_rules! all {
(@FUTURE $fut:expr) => {
$fut
};
(@FUTURE $fut:expr, $($tail:tt)+) => {
$crate::futures::AllFuture::new($fut, all!(@FUTURE $($tail)+))
};
(@TUPLE $base:expr; $fut:expr; $(, $out:expr)*) => {
($($out),*, $base)
};
(@TUPLE $base:expr; $fut:expr, $($fut_tail:expr),+; $($out:tt)*) => {
all!(@TUPLE $base.1; $($fut_tail),+; $($out)*, $base.0)
};
($($fut:expr),+) => {
async {
let fut = all!(@FUTURE $($fut),+);
let res = fut.await;
all!(@TUPLE res; $($fut),+;)
}
};
}
/// A future that waits for multiple futures (with the same `Output` type) in a `Vec` to all complete concurrently.
///
/// ```
/// # use messing_with_async::{async_main, AllVecFuture};
/// # use std::future::Future;
/// # async_main!(async_main);
/// # async fn async_main() {
/// let futures: Vec<Box<dyn Future<Output = _>>> = vec![Box::new(async { 1 }), Box::new(async { 2 }), Box::new(async { 3 })];
/// let fut = AllVecFuture::new(futures);
/// assert_eq!(fut.await, vec![1, 2, 3])
/// # }
/// # main();
/// ```
pub struct AllVecFuture<T> {
futs: Vec<Option<Pin<Box<dyn Future<Output = T>>>>>,
vals: Vec<Option<T>>
}
impl<T> AllVecFuture<T> {
pub fn new(futs: Vec<Box<dyn Future<Output = T>>>) -> Self {
Self {
vals: futs.iter().map(|_| None).collect(),
futs: futs.into_iter()
.map(|p| unsafe { Some(Pin::new_unchecked(p)) } )
.collect()
}
}
}
impl<T> Unpin for AllVecFuture<T> {} // needed cause T can be !Unpin, but either way Self is Unpin
impl<T> Future for AllVecFuture<T> {
type Output = Vec<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
for (i, fut_opt) in this.futs.iter_mut().enumerate() {
if let Some(fut) = fut_opt {
match fut.as_mut().poll(cx) {
Poll::Ready(v) => {
fut_opt.take();
this.vals[i].replace(v);
},
Poll::Pending => {}
}
}
}
if this.vals.iter().all(Option::is_some) {
return Poll::Ready(this.vals.iter_mut().map(Option::take).map(Option::unwrap).collect())
}
Poll::Pending
}
}
/// Creates a future that waits for multiple futures (with the same `Output` type) to all complete concurrently using a `Vec`.
///
/// ```
/// # use messing_with_async::{async_main, all_vec};
/// # use std::future::Future;
/// # async_main!(async_main);
/// # async fn async_main() {
/// let fut = all_vec!(async { 1 }, async { 2 }, async { 3 });
/// assert_eq!(fut.await, vec![1, 2, 3]);
/// # }
/// # main();
/// ```
#[macro_export]
macro_rules! all_vec {
($($fut:expr),+) => {
async {
let fut = $crate::futures::AllVecFuture::new(::std::vec![$(::std::boxed::Box::new($fut)),+]);
let res = fut.await;
res
}
};
}
/// Represents a type which is either a `T` or a `U`.
#[derive(Debug, PartialEq, Eq)]
pub enum Sum<T, U> {
A(T),
B(U)
}
impl<T, U> Sum<T, U> {
pub fn is_a(&self) -> bool {
match self {
Self::A(_) => true,
_ => false
}
}
pub fn is_b(&self) -> bool {
match self {
Self::B(_) => true,
_ => false
}
}
pub fn take_a(self) -> Option<T> {
match self {
Self::A(v) => Some(v),
_ => None
}
}
pub fn take_b(self) -> Option<U> {
match self {
Self::B(v) => Some(v),
_ => None
}
}
pub fn unwrap_a(self) -> T {
match self {
Self::A(v) => v,
_ => panic!("Attempted to unwrap_a a Sum::B value!")
}
}
pub fn unwrap_b(self) -> U {
match self {
Self::B(v) => v,
_ => panic!("Attempted to unwrap_b a Sum::A value!")
}
}
}
/// A future that waits for one of two futures to complete concurrently.
///
/// ```
/// # use messing_with_async::{async_main, AnyFuture, Sum, TimerFuture};
/// # async_main!(async_main);
/// # async fn async_main() {
/// let fut = AnyFuture::new(async { TimerFuture::new(1000).await; 5 }, async { "foobar" });
/// assert_eq!(fut.await, Sum::B("foobar"));
/// # }
/// # main();
/// ```
// TODO: this future is still broken, in the above doc comment after 1 second main will be polled again and panic
pub struct AnyFuture<T, U> {
a: Pin<Box<dyn Future<Output = T>>>,
b: Pin<Box<dyn Future<Output = U>>>
}
impl<T, U> AnyFuture<T, U> {
pub fn new(a: impl Future<Output = T> + 'static, b: impl Future<Output = U> + 'static) -> AnyFuture<T, U> {
Self {
a: Box::pin(a),
b: Box::pin(b)
}
}
}
impl<T, U> Future for AnyFuture<T, U> {
type Output = Sum<T, U>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Poll::Ready(v) = self.a.as_mut().poll(cx) {
return Poll::Ready(Sum::A(v))
}
if let Poll::Ready(v) = self.b.as_mut().poll(cx) {
return Poll::Ready(Sum::B(v))
}
Poll::Pending
}
}
/// Creates a future that waits for one of multiple futures (with the same `Output` type) to complete concurrently.
// TODO: this is also broken, dunno how to do it
// #[macro_export]
#[allow(unused_macros)]
macro_rules! any {
(@FUTURE $fut:expr) => {
$fut
};
(@FUTURE $fut:expr, $($tail:tt)+) => {
$crate::futures::AnyFuture::new($fut, all!(@FUTURE $($tail)+))
};
($($fut:expr),+) => {
async {
let fut = any!(@FUTURE $($fut),+);
let res = fut.await;
todo!();
}
};
}
/// A future that waits for one of multiple futures (with the same `Output` type) in a `Vec` to complete concurrently.
pub struct AnyVecFuture<T> {
futs: Vec<Pin<Box<dyn Future<Output = T>>>>
}
impl<T> AnyVecFuture<T> {
pub fn new(futs: Vec<Box<dyn Future<Output = T>>>) -> AnyVecFuture<T> {
Self {
futs: futs.into_iter()
.map(|p| unsafe { Pin::new_unchecked(p) })
.collect()
}
}
}
impl<T> Future for AnyVecFuture<T> {
type Output = (usize, T);
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
for (i, fut) in self.futs.iter_mut().enumerate() {
match fut.as_mut().poll(cx) {
Poll::Ready(v) => return Poll::Ready((i, v)),
Poll::Pending => {}
}
}
Poll::Pending
}
}
/// Creates a future that waits for one of multiple futures (with the same `Output` type) to complete concurrently using a `Vec`.
#[macro_export]
macro_rules! any_vec {
($($fut:expr),+) => {
async {
let fut = $crate::futures::AnyVecFuture::new(::std::alloc::vec![$($fut),+])
let res = fut.await;
res
}
};
}