This commit is contained in:
Alex Orlenko 2020-02-21 20:21:22 +00:00
parent 35b0a6bad8
commit ffc6aa1d33
7 changed files with 72 additions and 46 deletions

View file

@ -26,11 +26,12 @@ members = [
[features]
default = ["lua53"]
lua53 = ["futures-task", "futures-util"]
lua52 = ["futures-task", "futures-util"]
lua53 = ["async"]
lua52 = ["async"]
lua51 = []
luajit = []
vendored = ["lua-src", "luajit-src"]
async = ["futures-task", "futures-util"]
[dependencies]
bstr = { version = "0.2", features = ["std"], default_features = false }
@ -57,3 +58,7 @@ futures-timer = "3.0"
[[bench]]
name = "benchmark"
harness = false
[[example]]
name = "async_tcp_server"
required-features = ["async"]

View file

@ -81,6 +81,10 @@ fn main() {
#[cfg(all(feature = "lua51", feature = "luajit"))]
panic!("You can enable only one of the features: lua53, lua52, lua51, luajit");
// Async
#[cfg(all(feature = "async", not(any(feature = "lua53", feature = "lua52"))))]
panic!("You can enable async only for: lua53, lua52");
let include_dir = find::probe_lua();
build_glue(&include_dir);
}

View file

@ -16,7 +16,6 @@ struct LuaTcpListener(Option<Rc<Mutex<TcpListener>>>);
struct LuaTcpStream(Rc<Mutex<TcpStream>>);
impl UserData for LuaTcpListener {
#[cfg(any(feature = "lua53", feature = "lua52"))]
fn add_async_methods<M: UserDataAsyncMethods<Self>>(methods: &mut M) {
methods.add_function("bind", |_, addr: String| async {
let listener = TcpListener::bind(addr).await?;
@ -31,7 +30,6 @@ impl UserData for LuaTcpListener {
}
impl UserData for LuaTcpStream {
#[cfg(any(feature = "lua53", feature = "lua52"))]
fn add_async_methods<M: UserDataAsyncMethods<Self>>(methods: &mut M) {
methods.add_method("peer_addr", |_, stream, ()| async move {
Ok(stream.0.lock().await.peer_addr()?.to_string())
@ -68,8 +66,7 @@ async fn main() -> Result<()> {
globals.set(
"spawn",
lua.create_function(move |lua: &Lua, func: Function| {
let thr = lua.create_thread(func)?;
let fut = thr.into_async::<_, ()>(());
let fut = lua.create_thread(func)?.into_async::<_, ()>(());
task::spawn_local(async move { fut.await.unwrap() });
Ok(())
})?,
@ -86,6 +83,7 @@ async fn main() -> Result<()> {
spawn(function()
while true do
local data = stream:read(100)
data = data:match("^%s*(.-)%s*$") -- trim
print(data)
stream:write("got: "..data)
if data == "exit" then
@ -100,7 +98,5 @@ async fn main() -> Result<()> {
)
.eval::<Thread>()?;
task::LocalSet::new()
.run_until(thread.into_async(()))
.await
task::LocalSet::new().run_until(thread.into_async(())).await
}

View file

@ -10,7 +10,7 @@ use std::{mem, ptr, str};
use futures_core::future::LocalBoxFuture;
#[cfg(any(feature = "lua53", feature = "lua52"))]
#[cfg(feature = "async")]
use {
crate::userdata::UserDataAsyncMethods,
futures_task::noop_waker,
@ -523,7 +523,7 @@ impl Lua {
///
/// [`Thread`]: struct.Thread.html
/// [`ThreadStream`]: struct.ThreadStream.html
#[cfg(any(feature = "lua53", feature = "lua52"))]
#[cfg(feature = "async")]
pub fn create_async_function<A, R, F, FR>(&self, func: F) -> Result<Function>
where
A: FromLuaMulti,
@ -1045,6 +1045,8 @@ impl Lua {
let mut methods = StaticUserDataMethods::default();
T::add_methods(&mut methods);
#[cfg(feature = "async")]
T::add_async_methods(&mut methods);
protect_lua_closure(self.state, 0, 1, |state| {
ffi::lua_newtable(state);
@ -1071,7 +1073,7 @@ impl Lua {
ffi::lua_rawset(state, -3);
})?;
}
#[cfg(any(feature = "lua53", feature = "lua52"))]
#[cfg(feature = "async")]
for (k, m) in methods.async_methods {
push_string(self.state, &k)?;
self.push_value(Value::Function(self.create_async_callback(m)?))?;
@ -1159,7 +1161,7 @@ impl Lua {
}
}
#[cfg(any(feature = "lua53", feature = "lua52"))]
#[cfg(feature = "async")]
pub(crate) fn create_async_callback<'callback>(
&self,
func: AsyncCallback<'static>,
@ -1608,7 +1610,7 @@ impl<T: 'static + UserData> UserDataMethods<T> for StaticUserDataMethods<T> {
}
}
#[cfg(any(feature = "lua53", feature = "lua52"))]
#[cfg(feature = "async")]
impl<T: 'static + UserData + Clone> UserDataAsyncMethods<T> for StaticUserDataMethods<T> {
fn add_method<S, A, R, M, MR>(&mut self, name: &S, method: M)
where
@ -1682,7 +1684,7 @@ impl<T: 'static + UserData> StaticUserDataMethods<T> {
})
}
#[cfg(any(feature = "lua53", feature = "lua52"))]
#[cfg(feature = "async")]
fn box_async_method<A, R, M, MR>(method: M) -> AsyncCallback<'static>
where
T: Clone,
@ -1744,7 +1746,7 @@ impl<T: 'static + UserData> StaticUserDataMethods<T> {
})
}
#[cfg(any(feature = "lua53", feature = "lua52"))]
#[cfg(feature = "async")]
fn box_async_function<A, R, F, FR>(function: F) -> AsyncCallback<'static>
where
A: FromLuaMulti,

View file

@ -10,3 +10,6 @@ pub use crate::{
Thread as LuaThread, ThreadStatus as LuaThreadStatus, ToLua, ToLuaMulti,
UserData as LuaUserData, UserDataMethods as LuaUserDataMethods, Value as LuaValue,
};
#[cfg(feature = "async")]
pub use crate::{AsyncThread as LuaAsyncThread, UserDataAsyncMethods as LuaUserDataAsyncMethods};

View file

@ -159,12 +159,16 @@ impl Thread {
}
}
/// Converts thread to an async Future or Stream.
/// Converts Thread to an AsyncThread which implements Future and Stream traits.
///
/// Passes `args` as arguments to the thread and return `ThreadStream` object.
/// `args` are passed as arguments to the thread function for first call.
/// The object call `resume()` while polling and also allows to run rust futures
/// to completion using an executor.
///
/// Using AsyncThread as a Stream allows to iterate through `coroutine.yield()`
/// values whereas Future version discards that values and poll until the final
/// one (returned from the thread function).
///
/// # Examples
///
/// ```
@ -231,13 +235,12 @@ where
_ => return Poll::Ready(None),
};
set_waker(&lua, cx.waker().clone())?;
let _wg = WakerGuard::new(lua.state, cx.waker().clone());
let ret: MultiValue = if let Some(args) = self.args0.borrow_mut().take() {
self.thread.resume(args?)?
} else {
self.thread.resume(())?
};
unset_waker(&lua);
if is_poll_pending(&lua, &ret) {
return Poll::Pending;
@ -262,13 +265,12 @@ where
_ => return Poll::Ready(Err("Thread already finished".to_lua_err())),
};
set_waker(&lua, cx.waker().clone())?;
let _wg = WakerGuard::new(lua.state, cx.waker().clone());
let ret: MultiValue = if let Some(args) = self.args0.borrow_mut().take() {
self.thread.resume(args?)?
} else {
self.thread.resume(())?
};
unset_waker(&lua);
if is_poll_pending(&lua, &ret) {
return Poll::Pending;
@ -284,29 +286,6 @@ where
}
}
fn set_waker(lua: &Lua, waker: Waker) -> Result<()> {
unsafe {
let _sg = StackGuard::new(lua.state);
assert_stack(lua.state, 6);
ffi::lua_pushlightuserdata(lua.state, &WAKER_REGISTRY_KEY as *const u8 as *mut c_void);
push_gc_userdata(lua.state, waker)?;
ffi::lua_rawset(lua.state, ffi::LUA_REGISTRYINDEX);
}
Ok(())
}
fn unset_waker(lua: &Lua) {
unsafe {
let _sg = StackGuard::new(lua.state);
assert_stack(lua.state, 2);
ffi::lua_pushlightuserdata(lua.state, &WAKER_REGISTRY_KEY as *const u8 as *mut c_void);
ffi::lua_pushnil(lua.state);
ffi::lua_rawset(lua.state, ffi::LUA_REGISTRYINDEX);
}
}
fn is_poll_pending(lua: &Lua, val: &MultiValue) -> bool {
if val.len() != 1 {
return false;
@ -329,3 +308,34 @@ fn is_poll_pending(lua: &Lua, val: &MultiValue) -> bool {
false
}
struct WakerGuard(*mut ffi::lua_State);
impl WakerGuard {
pub fn new(state: *mut ffi::lua_State, waker: Waker) -> Result<WakerGuard> {
unsafe {
let _sg = StackGuard::new(state);
assert_stack(state, 6);
ffi::lua_pushlightuserdata(state, &WAKER_REGISTRY_KEY as *const u8 as *mut c_void);
push_gc_userdata(state, waker)?;
ffi::lua_rawset(state, ffi::LUA_REGISTRYINDEX);
Ok(WakerGuard(state))
}
}
}
impl Drop for WakerGuard {
fn drop(&mut self) {
unsafe {
let state = self.0;
let _sg = StackGuard::new(state);
assert_stack(state, 2);
ffi::lua_pushlightuserdata(state, &WAKER_REGISTRY_KEY as *const u8 as *mut c_void);
ffi::lua_pushnil(state);
ffi::lua_rawset(state, ffi::LUA_REGISTRYINDEX);
}
}
}

View file

@ -232,6 +232,12 @@ pub trait UserDataMethods<T: UserData> {
///
/// [`UserData`]: trait.UserData.html
pub trait UserDataAsyncMethods<T: UserData + Clone> {
/// Add an async method which accepts a `T` as the first parameter and returns Future.
/// The passed `T` is cloned from the original value.
///
/// Refer to [`add_method`] for more information about the implementation.
///
/// [`add_method`]: #method.add_method
fn add_method<S, A, R, M, MR>(&mut self, name: &S, method: M)
where
S: ?Sized + AsRef<[u8]>,
@ -313,7 +319,7 @@ pub trait UserDataAsyncMethods<T: UserData + Clone> {
/// [`ToLua`]: trait.ToLua.html
/// [`FromLua`]: trait.FromLua.html
/// [`UserDataMethods`]: trait.UserDataMethods.html
pub trait UserData: Sized {
pub trait UserData: Sized + Clone {
/// Adds custom methods and operators specific to this userdata.
fn add_methods<M: UserDataMethods<Self>>(_methods: &mut M) {}