Update examples

This commit is contained in:
Alex Orlenko 2020-04-19 01:23:42 +01:00
parent 222f4df668
commit d8897d867b
2 changed files with 100 additions and 68 deletions

View file

@ -1,8 +1,34 @@
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
use hyper::Client as HyperClient;
use bstr::BString;
use hyper::{body::Body as HyperBody, Client as HyperClient};
use tokio::stream::StreamExt;
use mlua::{Error, Lua, Result, Thread};
use mlua::{Error, Lua, Result, UserData, UserDataMethods};
#[derive(Clone)]
struct BodyReader(Rc<RefCell<HyperBody>>);
impl BodyReader {
fn new(body: HyperBody) -> Self {
BodyReader(Rc::new(RefCell::new(body)))
}
}
impl UserData for BodyReader {
fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_async_method("read", |_, reader, ()| async move {
let mut reader = reader.0.borrow_mut();
let bytes = reader.try_next().await.map_err(Error::external)?;
if let Some(bytes) = bytes {
return Ok(Some(BString::from(bytes.as_ref())));
}
Ok(None)
});
}
}
#[tokio::main]
async fn main() -> Result<()> {
@ -23,10 +49,9 @@ async fn main() -> Result<()> {
.or_insert(Vec::new())
.push(value.to_str().unwrap());
}
lua_resp.set("headers", headers)?;
let buf = hyper::body::to_bytes(resp).await.map_err(Error::external)?;
lua_resp.set("body", String::from_utf8_lossy(&buf).into_owned())?;
lua_resp.set("headers", headers)?;
lua_resp.set("body", BodyReader::new(resp.into_body()))?;
Ok(lua_resp)
})?;
@ -34,22 +59,25 @@ async fn main() -> Result<()> {
let globals = lua.globals();
globals.set("fetch_url", fetch_url)?;
let thread = lua
let f = lua
.load(
r#"
coroutine.create(function ()
local res = fetch_url("http://httpbin.org/ip");
print(res.status)
for key, vals in pairs(res.headers) do
for _, val in ipairs(vals) do
print(key..": "..val)
end
local res = fetch_url(...);
print(res.status)
for key, vals in pairs(res.headers) do
for _, val in ipairs(vals) do
print(key..": "..val)
end
print(res.body)
end)
end
repeat
local body = res.body:read()
if body then
print(body)
end
until not body
"#,
)
.eval::<Thread>()?;
.into_function()?;
thread.into_async(()).await
f.call_async("http://httpbin.org/ip").await
}

View file

@ -1,30 +1,37 @@
use std::cell::RefCell;
use std::net::Shutdown;
use std::rc::Rc;
use bstr::BString;
use tokio::net::{TcpListener, TcpStream};
use tokio::prelude::*;
use tokio::sync::Mutex;
use tokio::task;
use mlua::{Function, Lua, Result, Thread, UserData, UserDataMethods};
use mlua::{Function, Lua, Result, UserData, UserDataMethods};
#[derive(Clone)]
struct LuaTcpListener(Option<Rc<Mutex<TcpListener>>>);
struct LuaTcp;
#[derive(Clone)]
struct LuaTcpStream(Rc<Mutex<TcpStream>>);
struct LuaTcpListener(Rc<RefCell<TcpListener>>);
#[derive(Clone)]
struct LuaTcpStream(Rc<RefCell<TcpStream>>);
impl UserData for LuaTcp {
fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_async_function("bind", |_, addr: String| async move {
let listener = TcpListener::bind(addr).await?;
Ok(LuaTcpListener(Rc::new(RefCell::new(listener))))
});
}
}
impl UserData for LuaTcpListener {
fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_async_function("bind", |_, addr: String| async {
let listener = TcpListener::bind(addr).await?;
Ok(LuaTcpListener(Some(Rc::new(Mutex::new(listener)))))
});
methods.add_async_method("accept", |_, listener, ()| async {
let (stream, _) = listener.0.unwrap().lock().await.accept().await?;
Ok(LuaTcpStream(Rc::new(Mutex::new(stream))))
methods.add_async_method("accept", |_, listener, ()| async move {
let (stream, _) = listener.0.borrow_mut().accept().await?;
Ok(LuaTcpStream(Rc::new(RefCell::new(stream))))
});
}
}
@ -32,25 +39,23 @@ impl UserData for LuaTcpListener {
impl UserData for LuaTcpStream {
fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) {
methods.add_async_method("peer_addr", |_, stream, ()| async move {
Ok(stream.0.lock().await.peer_addr()?.to_string())
Ok(stream.0.borrow().peer_addr()?.to_string())
});
methods.add_async_method("read", |_, stream, size: usize| async move {
let mut buf = vec![0; size];
let mut stream = stream.0.lock().await;
let n = stream.read(&mut buf).await?;
let n = stream.0.borrow_mut().read(&mut buf).await?;
buf.truncate(n);
Ok(BString::from(buf))
});
methods.add_async_method("write", |_, stream, data: BString| async move {
let mut stream = stream.0.lock().await;
let n = stream.write(&data).await?;
let n = stream.0.borrow_mut().write(&data).await?;
Ok(n)
});
methods.add_async_method("close", |_, stream, ()| async move {
stream.0.lock().await.shutdown(Shutdown::Both)?;
methods.add_method("close", |_, stream, ()| {
stream.0.borrow().shutdown(Shutdown::Both)?;
Ok(())
});
}
@ -60,44 +65,43 @@ impl UserData for LuaTcpStream {
async fn main() -> Result<()> {
let lua = Lua::new();
let spawn = lua.create_function(move |_, func: Function| {
task::spawn_local(async move { func.call_async::<_, ()>(()).await.unwrap() });
Ok(())
})?;
let globals = lua.globals();
globals.set("tcp", LuaTcpListener(None))?;
globals.set("tcp", LuaTcp)?;
globals.set("spawn", spawn)?;
globals.set(
"spawn",
lua.create_function(move |lua: &Lua, func: Function| {
let fut = lua.create_thread(func)?.into_async::<_, ()>(());
task::spawn_local(async move { fut.await.unwrap() });
Ok(())
})?,
)?;
let thread = lua
let server = lua
.load(
r#"
coroutine.create(function ()
local listener = tcp.bind("0.0.0.0:1234")
print("listening on 0.0.0.0:1234")
while true do
local stream = listener:accept()
print("connected from " .. stream:peer_addr())
spawn(function()
while true do
local data = stream:read(100)
data = data:match("^%s*(.-)%s*$") -- trim
print(data)
stream:write("got: "..data.."\n")
if data == "exit" then
stream:close()
break
end
local addr = ...
local listener = tcp.bind(addr)
print("listening on "..addr)
while true do
local stream = listener:accept()
local peer_addr = stream:peer_addr()
print("connected from "..peer_addr)
spawn(function()
while true do
local data = stream:read(100)
data = data:match("^%s*(.-)%s*$") -- trim
print("["..peer_addr.."] "..data)
stream:write("got: "..data.."\n")
if data == "exit" then
stream:close()
break
end
end)
end
end)
end
end)
end
"#,
)
.eval::<Thread>()?;
.into_function()?;
thread.into_async(()).await
task::LocalSet::new()
.run_until(server.call_async::<_, ()>("0.0.0.0:1234"))
.await
}