Support async on Thread

This commit is contained in:
Alex Orlenko 2020-01-11 21:21:39 +00:00
parent 6978723030
commit 808306a43e
4 changed files with 95 additions and 0 deletions

View file

@ -35,6 +35,7 @@ vendored = ["lua-src", "luajit-src"]
[dependencies]
num-traits = { version = "0.2.6" }
bstr = { version = "0.2", features = ["std"], default_features = false }
futures = { version = "0.3.1" }
[build-dependencies]
cc = { version = "1.0" }

View file

@ -1,7 +1,12 @@
use std::os::raw::c_int;
use std::pin::Pin;
use futures::stream::Stream;
use futures::task::{Context, Poll};
use crate::error::{Error, Result};
use crate::ffi;
use crate::lua::Lua;
use crate::types::LuaRef;
use crate::util::{
assert_stack, check_stack, error_traceback, pop_error, protect_lua_closure, StackGuard,
@ -27,6 +32,12 @@ pub enum ThreadStatus {
#[derive(Clone, Debug)]
pub struct Thread(pub(crate) LuaRef);
#[derive(Debug)]
pub struct ThreadStream {
thread: Thread,
args0: Option<Result<MultiValue>>,
}
impl Thread {
/// Resumes execution of this thread.
///
@ -142,6 +153,17 @@ impl Thread {
}
}
}
pub fn into_stream<A>(self, args: A) -> ThreadStream
where
A: ToLuaMulti
{
let args = args.to_lua_multi(&self.0.lua);
ThreadStream {
thread: self,
args0: Some(args),
}
}
}
impl PartialEq for Thread {
@ -149,3 +171,28 @@ impl PartialEq for Thread {
self.0 == other.0
}
}
impl Stream for ThreadStream {
type Item = Result<(Lua, MultiValue)>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.thread.status() {
ThreadStatus::Resumable => {}
_ => return Poll::Ready(None),
}
let r: Result<MultiValue> = if let Some(args) = self.args0.take() {
match args {
Err(e) => return Poll::Ready(Some(Err(e))),
Ok(x) => self.thread.resume(x),
}
} else {
self.thread.resume(())
};
cx.waker().wake_by_ref();
let lua = self.thread.0.lua.clone();
return Poll::Ready(Some(r.map(move |x| (lua, x))));
}
}

View file

@ -2,6 +2,8 @@ use std::os::raw::{c_int, c_void};
use std::sync::{Arc, Mutex};
use std::{fmt, mem, ptr};
use futures::future::Future;
use crate::error::Result;
use crate::ffi;
use crate::lua::Lua;
@ -20,6 +22,11 @@ pub struct LightUserData(pub *mut c_void);
pub(crate) type Callback<'a> =
Box<dyn Fn(&Lua, MultiValue) -> Result<MultiValue> + 'a>;
pub(crate) type AsyncResult = Box<dyn Future<Output = Result<MultiValue>>>;
pub(crate) type AsyncCallback<'a> =
Box<dyn Fn(&Lua, MultiValue) -> AsyncResult + 'a>;
/// An auto generated key into the Lua registry.
///
/// This is a handle to a value stored inside the Lua registry. It is not directly usable like the

View file

@ -1,5 +1,9 @@
use std::panic::catch_unwind;
use futures::executor::block_on;
use futures::stream::TryStreamExt;
use futures::pin_mut;
use mlua::{Error, Function, Lua, Result, Thread, ThreadStatus};
#[test]
@ -93,6 +97,42 @@ fn test_thread() -> Result<()> {
Ok(())
}
#[test]
fn test_thread_stream() -> Result<()> {
let lua = Lua::new();
let thread = lua.create_thread(
lua.load(
r#"
function (s)
local sum = s
for i = 1,10 do
sum = sum + i
coroutine.yield(sum)
end
return sum
end
"#,
)
.eval()?,
)?;
let result = block_on(async {
let s = thread.into_stream(0);
pin_mut!(s);
let mut sum = 0;
while let Some((lua, item)) = s.try_next().await? {
let n: i64 = lua.unpack_multi(item)?;
sum += n;
}
Ok::<_, Error>(sum)
})?;
assert_eq!(result, 275);
Ok(())
}
#[test]
fn coroutine_from_closure() -> Result<()> {
let lua = Lua::new();