2022-01-30 11:26:09 -06:00
use rusqlite ::{ params , OptionalExtension , Params } ;
2022-01-27 17:44:50 -06:00
use serde ::Serialize ;
2022-07-20 03:58:33 -05:00
use std ::collections ::HashMap ;
2022-01-02 14:58:54 -06:00
use tokio ::sync ::{
mpsc ::{ unbounded_channel , UnboundedReceiver , UnboundedSender } ,
oneshot ,
} ;
2022-07-19 08:07:55 -05:00
use tokio ::time ::Instant ;
2022-01-01 15:35:27 -06:00
#[ derive(Debug) ]
enum Task {
2022-07-17 13:10:52 -05:00
AddQuote ( oneshot ::Sender < rusqlite ::Result < ( ) > > , Quote ) ,
GetQuote (
oneshot ::Sender < rusqlite ::Result < Option < Quote > > > ,
Option < String > ,
) ,
2022-07-20 03:58:33 -05:00
StartSearch (
oneshot ::Sender < rusqlite ::Result < Vec < Quote > > > ,
String ,
String ,
usize ,
) ,
NextSearch (
oneshot ::Sender < rusqlite ::Result < Option < Vec < Quote > > > > ,
String ,
usize ,
) ,
2022-01-01 15:35:27 -06:00
}
pub struct DbExecutor {
rx : UnboundedReceiver < Task > ,
db : rusqlite ::Connection ,
}
2022-01-26 17:58:00 -06:00
#[ derive(Serialize, Debug) ]
pub struct Quote {
pub author : String ,
2022-01-27 17:44:50 -06:00
pub quote : String ,
2022-01-26 17:58:00 -06:00
}
2022-01-01 15:35:27 -06:00
impl DbExecutor {
pub fn create ( dbpath : & str ) -> rusqlite ::Result < ( Self , ExecutorConnection ) > {
let ( tx , rx ) = unbounded_channel ( ) ;
let db = rusqlite ::Connection ::open ( dbpath ) ? ;
2022-01-02 14:58:54 -06:00
db . execute (
2022-07-28 05:50:17 -05:00
" create virtual table if not exists quotes using fts5(username, quote) " ,
2022-01-02 14:58:54 -06:00
[ ] ,
) ? ;
2022-01-01 15:35:27 -06:00
tracing ::debug! ( " Database connected ({}) " , dbpath ) ;
Ok ( ( Self { rx , db } , ExecutorConnection { tx } ) )
}
pub fn run ( mut self ) {
2022-07-19 08:07:55 -05:00
let mut searches : HashMap < String , ( String , i64 ) > = HashMap ::new ( ) ;
2022-01-01 15:35:27 -06:00
while let Some ( task ) = self . rx . blocking_recv ( ) {
2022-07-19 08:07:55 -05:00
let before = Instant ::now ( ) ;
tracing ::debug! ( " got task {:?} " , task ) ;
2022-01-01 15:35:27 -06:00
match task {
2022-07-24 05:27:49 -05:00
Task ::AddQuote ( tx , mut quote ) = > {
quote . author . make_ascii_lowercase ( ) ;
2022-07-17 13:10:52 -05:00
let result = self
. db
. execute (
" insert into quotes(quote,username) values(?,?) " ,
params! [ quote . quote , quote . author ] ,
)
. map ( | _ | ( ) ) ;
tx . send ( result ) . unwrap ( ) ;
2022-01-01 15:35:27 -06:00
}
Task ::GetQuote ( tx , author ) = > {
2022-07-24 05:27:49 -05:00
let result = if let Some ( mut author ) = author {
author . make_ascii_lowercase ( ) ;
2022-07-28 05:50:17 -05:00
self . db . query_row ( " select quote,username from quotes where username match ? order by random() limit 1 " , params! [ author ] , | v | Ok ( Quote { quote :v . get ( 0 ) ? , author :v . get ( 1 ) ? } ) )
2022-01-01 15:35:27 -06:00
} else {
2022-01-26 17:58:00 -06:00
self . db . query_row ( " select quote,username from quotes order by random() limit 1 " , params! [ ] , | v | Ok ( Quote { quote :v . get ( 0 ) ? , author :v . get ( 1 ) ? } ) )
2022-07-17 13:10:52 -05:00
} . optional ( ) ;
tx . send ( result ) . unwrap ( ) ;
2022-01-01 15:35:27 -06:00
}
2022-07-19 08:07:55 -05:00
Task ::StartSearch ( tx , user , query , limit ) = > {
2022-07-20 03:58:33 -05:00
tx . send ( self . start_search ( & mut searches , user , query , limit ) )
. unwrap ( ) ;
2022-07-19 08:07:55 -05:00
}
Task ::NextSearch ( tx , user , limit ) = > {
2022-07-20 03:58:33 -05:00
tx . send ( self . next_search ( & mut searches , & user , limit ) )
. unwrap ( ) ;
2022-01-27 17:44:50 -06:00
}
2022-01-01 15:35:27 -06:00
}
2022-07-20 03:58:33 -05:00
tracing ::debug! (
" task took {}ms " ,
Instant ::now ( ) . duration_since ( before ) . as_secs_f64 ( ) / 1000.0
2022-07-28 11:39:05 -05:00
) ;
2022-01-01 15:35:27 -06:00
}
}
2022-01-30 11:26:09 -06:00
2022-07-20 03:58:33 -05:00
fn start_search (
& self ,
searches : & mut HashMap < String , ( String , i64 ) > ,
user : String ,
query : String ,
limit : usize ,
) -> rusqlite ::Result < Vec < Quote > > {
2022-07-28 06:21:17 -05:00
let ( quotes , oid ) = self . yield_quotes_oid (
" select oid,quote,username from quotes where quote match ? order by oid asc limit ? " ,
params! [ query , limit ] ,
) ? ;
2022-07-19 08:07:55 -05:00
searches . insert ( user , ( query , oid ) ) ;
Ok ( quotes )
}
2022-07-20 03:58:33 -05:00
fn next_search (
& self ,
searches : & mut HashMap < String , ( String , i64 ) > ,
user : & str ,
limit : usize ,
) -> rusqlite ::Result < Option < Vec < Quote > > > {
2022-07-19 08:07:55 -05:00
let ( query , old_oid ) = if let Some ( o ) = searches . get_mut ( user ) {
o
} else {
2022-07-20 03:58:33 -05:00
return Ok ( None ) ;
2022-07-19 08:07:55 -05:00
} ;
2022-07-28 05:50:17 -05:00
let ( quotes , new_oid ) = self . yield_quotes_oid ( " select oid,quote,username from quotes where oid > ? and quote match ? order by oid asc limit ? " , params! [ * old_oid , & * query , limit ] ) ? ;
2022-07-19 08:07:55 -05:00
if new_oid ! = - 1 {
* old_oid = new_oid ;
}
Ok ( Some ( quotes ) )
}
2022-07-20 03:58:33 -05:00
fn yield_quotes_oid < P : Params > (
& self ,
sql : & str ,
params : P ,
) -> rusqlite ::Result < ( Vec < Quote > , i64 ) > {
2022-07-19 08:07:55 -05:00
let mut lastoid = - 1 i64 ;
let quotes = self . db . prepare ( sql ) . and_then ( | mut v | {
2022-01-30 11:26:09 -06:00
v . query ( params ) . and_then ( | mut v | {
2022-07-17 13:10:52 -05:00
let mut quotes : Vec < Quote > = Vec ::new ( ) ;
2022-01-30 11:26:09 -06:00
while let Some ( row ) = v . next ( ) ? {
2022-07-19 08:07:55 -05:00
lastoid = row . get ( 0 ) ? ;
2022-01-30 11:26:09 -06:00
quotes . push ( Quote {
2022-07-19 08:07:55 -05:00
quote : row . get ( 1 ) ? ,
author : row . get ( 2 ) ? ,
2022-01-30 11:26:09 -06:00
} ) ;
}
Ok ( quotes )
} )
2022-07-19 08:07:55 -05:00
} ) ? ;
Ok ( ( quotes , lastoid ) )
2022-01-30 11:26:09 -06:00
}
2022-01-01 15:35:27 -06:00
}
pub struct ExecutorConnection {
tx : UnboundedSender < Task > ,
}
impl Clone for ExecutorConnection {
fn clone ( & self ) -> Self {
2022-01-02 14:58:54 -06:00
Self {
tx : self . tx . clone ( ) ,
}
2022-01-01 15:35:27 -06:00
}
}
2022-01-27 17:44:50 -06:00
macro_rules ! executor_wrapper {
2022-01-30 11:26:09 -06:00
( $name :ident , $task :expr , $ret :ty , $( $arg :ident : $ty :ty ) , * ) = > {
pub async fn $name ( & self , $( $arg : $ty ) , * ) -> $ret {
let ( otx , orx ) = oneshot ::channel ( ) ;
self . tx . send ( $task ( otx , $( $arg ) , * ) ) . unwrap ( ) ;
orx . await . unwrap ( )
}
} ;
( $name :ident , $task :expr , $ret :ty ) = > {
pub async fn $name ( & self ) -> $ret {
let ( otx , orx ) = oneshot ::channel ( ) ;
self . tx . send ( $task ( otx ) ) . unwrap ( ) ;
orx . await . unwrap ( )
}
} ;
2022-01-27 17:44:50 -06:00
}
impl ExecutorConnection {
// WARNING: these methods are NOT cancel-safe
2022-07-17 13:10:52 -05:00
executor_wrapper! (
add_quote ,
Task ::AddQuote ,
rusqlite ::Result < ( ) > ,
quote : Quote
) ;
2022-01-27 17:44:50 -06:00
executor_wrapper! (
get_quote ,
Task ::GetQuote ,
2022-07-17 13:10:52 -05:00
rusqlite ::Result < Option < Quote > > ,
2022-01-27 17:44:50 -06:00
author : Option < String >
) ;
2022-07-16 11:33:43 -05:00
executor_wrapper! (
search_quotes ,
2022-07-19 08:07:55 -05:00
Task ::StartSearch ,
2022-07-17 13:10:52 -05:00
rusqlite ::Result < Vec < Quote > > ,
2022-07-19 08:07:55 -05:00
user : String ,
2022-07-17 13:54:32 -05:00
query : String ,
limit : usize
2022-07-16 11:33:43 -05:00
) ;
2022-07-19 08:07:55 -05:00
executor_wrapper! (
advance_search ,
Task ::NextSearch ,
rusqlite ::Result < Option < Vec < Quote > > > ,
user : String ,
limit : usize
) ;
2022-01-01 15:35:27 -06:00
}