Compare commits
No commits in common. "ec41f1d4ff07a00223b6ed34fc5b65c38d3cd535" and "1ed6317272fe819e7e12b1be6fcff62d409c8f03" have entirely different histories.
ec41f1d4ff
...
1ed6317272
|
@ -195,6 +195,9 @@ where
|
||||||
// While we aren't finished, receive more data from the other party
|
// While we aren't finished, receive more data from the other party
|
||||||
let response = Response::new(str::from_utf8(&sasl_data)?.to_string());
|
let response = Response::new(str::from_utf8(&sasl_data)?.to_string());
|
||||||
debug!("response: {:?}", response);
|
debug!("response: {:?}", response);
|
||||||
|
let stdout = tokio::io::stdout();
|
||||||
|
let mut writer = Writer::new(stdout);
|
||||||
|
writer.write_full(&response).await?;
|
||||||
self.writer.write_full(&response).await?;
|
self.writer.write_full(&response).await?;
|
||||||
debug!("response written");
|
debug!("response written");
|
||||||
|
|
||||||
|
@ -412,7 +415,7 @@ mod tests {
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::connection::Connection;
|
use crate::{connection::Connection};
|
||||||
use futures::sink;
|
use futures::sink;
|
||||||
use test_log::test;
|
use test_log::test;
|
||||||
use tokio::time::sleep;
|
use tokio::time::sleep;
|
||||||
|
|
|
@ -1 +0,0 @@
|
||||||
luz.db
|
|
|
@ -13,5 +13,3 @@ stanza = { version = "0.1.0", path = "../stanza" }
|
||||||
tokio = "1.42.0"
|
tokio = "1.42.0"
|
||||||
tokio-stream = "0.1.17"
|
tokio-stream = "0.1.17"
|
||||||
tokio-util = "0.7.13"
|
tokio-util = "0.7.13"
|
||||||
tracing = "0.1.41"
|
|
||||||
tracing-subscriber = "0.3.19"
|
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
// TODO: consider if this needs to be handled by a supervisor or could be handled by luz directly
|
// TODO: consider if this needs to be handled by a supervisor or could be handled by luz directly
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashMap,
|
|
||||||
ops::{Deref, DerefMut},
|
ops::{Deref, DerefMut},
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
time::Duration,
|
time::Duration,
|
||||||
|
@ -11,7 +10,6 @@ use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream};
|
||||||
use jid::JID;
|
use jid::JID;
|
||||||
use read::{ReadControl, ReadControlHandle};
|
use read::{ReadControl, ReadControlHandle};
|
||||||
use sqlx::SqlitePool;
|
use sqlx::SqlitePool;
|
||||||
use stanza::client::Stanza;
|
|
||||||
use tokio::{
|
use tokio::{
|
||||||
sync::{mpsc, oneshot, Mutex},
|
sync::{mpsc, oneshot, Mutex},
|
||||||
task::{JoinHandle, JoinSet},
|
task::{JoinHandle, JoinSet},
|
||||||
|
@ -32,7 +30,6 @@ pub struct Supervisor {
|
||||||
tokio::task::JoinSet<()>,
|
tokio::task::JoinSet<()>,
|
||||||
mpsc::Sender<SupervisorCommand>,
|
mpsc::Sender<SupervisorCommand>,
|
||||||
WriteHandle,
|
WriteHandle,
|
||||||
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
|
|
||||||
)>,
|
)>,
|
||||||
sender: mpsc::Sender<UpdateMessage>,
|
sender: mpsc::Sender<UpdateMessage>,
|
||||||
writer_handle: WriteControlHandle,
|
writer_handle: WriteControlHandle,
|
||||||
|
@ -59,7 +56,6 @@ impl Supervisor {
|
||||||
JoinSet<()>,
|
JoinSet<()>,
|
||||||
mpsc::Sender<SupervisorCommand>,
|
mpsc::Sender<SupervisorCommand>,
|
||||||
WriteHandle,
|
WriteHandle,
|
||||||
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
|
|
||||||
)>,
|
)>,
|
||||||
sender: mpsc::Sender<UpdateMessage>,
|
sender: mpsc::Sender<UpdateMessage>,
|
||||||
writer_handle: WriteControlHandle,
|
writer_handle: WriteControlHandle,
|
||||||
|
@ -112,7 +108,7 @@ impl Supervisor {
|
||||||
// consider awaiting/aborting the read and write threads
|
// consider awaiting/aborting the read and write threads
|
||||||
let (send, recv) = oneshot::channel();
|
let (send, recv) = oneshot::channel();
|
||||||
let _ = self.reader_handle.send(ReadControl::Abort(send)).await;
|
let _ = self.reader_handle.send(ReadControl::Abort(send)).await;
|
||||||
let (db, update_sender, tasks, supervisor_command, write_sender, pending_iqs) = tokio::select! {
|
let (db, update_sender, tasks, supervisor_command, write_sender) = tokio::select! {
|
||||||
Ok(s) = recv => s,
|
Ok(s) = recv => s,
|
||||||
Ok(s) = &mut self.reader_crash => s,
|
Ok(s) = &mut self.reader_crash => s,
|
||||||
// in case, just break as irrecoverable
|
// in case, just break as irrecoverable
|
||||||
|
@ -138,8 +134,7 @@ impl Supervisor {
|
||||||
update_sender,
|
update_sender,
|
||||||
supervisor_command,
|
supervisor_command,
|
||||||
write_sender,
|
write_sender,
|
||||||
tasks,
|
tasks
|
||||||
pending_iqs,
|
|
||||||
);
|
);
|
||||||
},
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
@ -154,7 +149,7 @@ impl Supervisor {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Ok((db, update_sender, tasks, supervisor_control, write_handle, pending_iqs)) = &mut self.reader_crash => {
|
Ok((db, update_sender, tasks, supervisor_control, write_handle)) = &mut self.reader_crash => {
|
||||||
let (send, recv) = oneshot::channel();
|
let (send, recv) = oneshot::channel();
|
||||||
let _ = self.writer_handle.send(WriteControl::Abort(send)).await;
|
let _ = self.writer_handle.send(WriteControl::Abort(send)).await;
|
||||||
let (retry_msg, mut write_receiver) = tokio::select! {
|
let (retry_msg, mut write_receiver) = tokio::select! {
|
||||||
|
@ -187,8 +182,7 @@ impl Supervisor {
|
||||||
update_sender,
|
update_sender,
|
||||||
supervisor_control,
|
supervisor_control,
|
||||||
write_handle,
|
write_handle,
|
||||||
tasks,
|
tasks
|
||||||
pending_iqs,
|
|
||||||
);
|
);
|
||||||
},
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
@ -258,7 +252,6 @@ impl SupervisorHandle {
|
||||||
on_shutdown: oneshot::Sender<()>,
|
on_shutdown: oneshot::Sender<()>,
|
||||||
jid: Arc<Mutex<JID>>,
|
jid: Arc<Mutex<JID>>,
|
||||||
password: Arc<String>,
|
password: Arc<String>,
|
||||||
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
|
|
||||||
) -> (WriteHandle, Self) {
|
) -> (WriteHandle, Self) {
|
||||||
let (command_sender, command_receiver) = mpsc::channel(20);
|
let (command_sender, command_receiver) = mpsc::channel(20);
|
||||||
let (writer_error_sender, writer_error_receiver) = oneshot::channel();
|
let (writer_error_sender, writer_error_receiver) = oneshot::channel();
|
||||||
|
@ -274,7 +267,6 @@ impl SupervisorHandle {
|
||||||
update_sender.clone(),
|
update_sender.clone(),
|
||||||
command_sender.clone(),
|
command_sender.clone(),
|
||||||
write_handle.clone(),
|
write_handle.clone(),
|
||||||
pending_iqs,
|
|
||||||
);
|
);
|
||||||
|
|
||||||
let actor = Supervisor::new(
|
let actor = Supervisor::new(
|
||||||
|
|
|
@ -1,7 +1,5 @@
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashMap,
|
|
||||||
ops::{Deref, DerefMut},
|
ops::{Deref, DerefMut},
|
||||||
sync::Arc,
|
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -9,7 +7,7 @@ use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader};
|
||||||
use sqlx::SqlitePool;
|
use sqlx::SqlitePool;
|
||||||
use stanza::client::Stanza;
|
use stanza::client::Stanza;
|
||||||
use tokio::{
|
use tokio::{
|
||||||
sync::{mpsc, oneshot, Mutex},
|
sync::{mpsc, oneshot},
|
||||||
task::{JoinHandle, JoinSet},
|
task::{JoinHandle, JoinSet},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -30,7 +28,6 @@ pub struct Read {
|
||||||
JoinSet<()>,
|
JoinSet<()>,
|
||||||
mpsc::Sender<SupervisorCommand>,
|
mpsc::Sender<SupervisorCommand>,
|
||||||
WriteHandle,
|
WriteHandle,
|
||||||
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
|
|
||||||
)>,
|
)>,
|
||||||
db: SqlitePool,
|
db: SqlitePool,
|
||||||
update_sender: mpsc::Sender<UpdateMessage>,
|
update_sender: mpsc::Sender<UpdateMessage>,
|
||||||
|
@ -39,8 +36,6 @@ pub struct Read {
|
||||||
tasks: JoinSet<()>,
|
tasks: JoinSet<()>,
|
||||||
disconnecting: bool,
|
disconnecting: bool,
|
||||||
disconnect_timedout: oneshot::Receiver<()>,
|
disconnect_timedout: oneshot::Receiver<()>,
|
||||||
// TODO: use proper stanza ids
|
|
||||||
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Read {
|
impl Read {
|
||||||
|
@ -53,7 +48,6 @@ impl Read {
|
||||||
JoinSet<()>,
|
JoinSet<()>,
|
||||||
mpsc::Sender<SupervisorCommand>,
|
mpsc::Sender<SupervisorCommand>,
|
||||||
WriteHandle,
|
WriteHandle,
|
||||||
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
|
|
||||||
)>,
|
)>,
|
||||||
db: SqlitePool,
|
db: SqlitePool,
|
||||||
update_sender: mpsc::Sender<UpdateMessage>,
|
update_sender: mpsc::Sender<UpdateMessage>,
|
||||||
|
@ -61,7 +55,6 @@ impl Read {
|
||||||
supervisor_control: mpsc::Sender<SupervisorCommand>,
|
supervisor_control: mpsc::Sender<SupervisorCommand>,
|
||||||
write_handle: WriteHandle,
|
write_handle: WriteHandle,
|
||||||
tasks: JoinSet<()>,
|
tasks: JoinSet<()>,
|
||||||
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
|
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let (send, recv) = oneshot::channel();
|
let (send, recv) = oneshot::channel();
|
||||||
Self {
|
Self {
|
||||||
|
@ -75,20 +68,14 @@ impl Read {
|
||||||
tasks,
|
tasks,
|
||||||
disconnecting: false,
|
disconnecting: false,
|
||||||
disconnect_timedout: recv,
|
disconnect_timedout: recv,
|
||||||
pending_iqs,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run(mut self) {
|
async fn run(mut self) {
|
||||||
println!("started read thread");
|
|
||||||
// let stanza = self.stream.read::<Stanza>().await;
|
|
||||||
// println!("{:?}", stanza);
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
// if still haven't received the end tag in time, just kill itself
|
// if still haven't received the end tag in time, just kill itself
|
||||||
// TODO: is this okay??? what if notification thread dies?
|
_ = &mut self.disconnect_timedout => {
|
||||||
Ok(()) = &mut self.disconnect_timedout => {
|
|
||||||
println!("disconnect_timedout");
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
Some(msg) = self.control_receiver.recv() => {
|
Some(msg) = self.control_receiver.recv() => {
|
||||||
|
@ -104,19 +91,17 @@ impl Read {
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
ReadControl::Abort(sender) => {
|
ReadControl::Abort(sender) => {
|
||||||
let _ = sender.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs.clone()));
|
let _ = sender.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle));
|
||||||
break;
|
break;
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
},
|
},
|
||||||
s = self.stream.read::<Stanza>() => {
|
stanza = self.stream.read::<Stanza>() => {
|
||||||
println!("read stanza");
|
match stanza {
|
||||||
match s {
|
|
||||||
Ok(s) => {
|
Ok(s) => {
|
||||||
self.tasks.spawn(handle_stanza(s, self.update_sender.clone(), self.db.clone(), self.supervisor_control.clone(), self.write_handle.clone()));
|
self.tasks.spawn(handle_stanza(s, self.update_sender.clone(), self.db.clone(), self.supervisor_control.clone(), self.write_handle.clone()));
|
||||||
},
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
println!("error: {:?}", e);
|
|
||||||
// TODO: NEXT write the correct error stanza depending on error, decide whether to reconnect or properly disconnect, depending on if disconnecting is true
|
// TODO: NEXT write the correct error stanza depending on error, decide whether to reconnect or properly disconnect, depending on if disconnecting is true
|
||||||
// match e {
|
// match e {
|
||||||
// peanuts::Error::ReadError(error) => todo!(),
|
// peanuts::Error::ReadError(error) => todo!(),
|
||||||
|
@ -141,7 +126,7 @@ impl Read {
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
// AAAAAAAAAAAAAAAAAAAAA i should really just have this stored in the supervisor and not gaf bout passing these references around
|
// AAAAAAAAAAAAAAAAAAAAA i should really just have this stored in the supervisor and not gaf bout passing these references around
|
||||||
let _ = self.on_crash.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs.clone()));
|
let _ = self.on_crash.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle));
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
},
|
},
|
||||||
|
@ -150,12 +135,6 @@ impl Read {
|
||||||
else => break
|
else => break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
println!("stopping read thread");
|
|
||||||
// when it aborts, must clear iq map no matter what
|
|
||||||
let mut iqs = self.pending_iqs.lock().await;
|
|
||||||
for (_id, sender) in iqs.drain() {
|
|
||||||
let _ = sender.send(Err(Error::LostConnection));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -171,7 +150,7 @@ async fn handle_stanza(
|
||||||
supervisor_control: mpsc::Sender<SupervisorCommand>,
|
supervisor_control: mpsc::Sender<SupervisorCommand>,
|
||||||
write_handle: WriteHandle,
|
write_handle: WriteHandle,
|
||||||
) {
|
) {
|
||||||
println!("{:?}", stanza)
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub enum ReadControl {
|
pub enum ReadControl {
|
||||||
|
@ -183,7 +162,6 @@ pub enum ReadControl {
|
||||||
JoinSet<()>,
|
JoinSet<()>,
|
||||||
mpsc::Sender<SupervisorCommand>,
|
mpsc::Sender<SupervisorCommand>,
|
||||||
WriteHandle,
|
WriteHandle,
|
||||||
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
|
|
||||||
)>,
|
)>,
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
@ -216,13 +194,11 @@ impl ReadControlHandle {
|
||||||
JoinSet<()>,
|
JoinSet<()>,
|
||||||
mpsc::Sender<SupervisorCommand>,
|
mpsc::Sender<SupervisorCommand>,
|
||||||
WriteHandle,
|
WriteHandle,
|
||||||
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
|
|
||||||
)>,
|
)>,
|
||||||
db: SqlitePool,
|
db: SqlitePool,
|
||||||
sender: mpsc::Sender<UpdateMessage>,
|
sender: mpsc::Sender<UpdateMessage>,
|
||||||
supervisor_control: mpsc::Sender<SupervisorCommand>,
|
supervisor_control: mpsc::Sender<SupervisorCommand>,
|
||||||
jabber_write: WriteHandle,
|
jabber_write: WriteHandle,
|
||||||
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
|
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let (control_sender, control_receiver) = mpsc::channel(20);
|
let (control_sender, control_receiver) = mpsc::channel(20);
|
||||||
|
|
||||||
|
@ -235,7 +211,6 @@ impl ReadControlHandle {
|
||||||
supervisor_control,
|
supervisor_control,
|
||||||
jabber_write,
|
jabber_write,
|
||||||
JoinSet::new(),
|
JoinSet::new(),
|
||||||
pending_iqs,
|
|
||||||
);
|
);
|
||||||
let handle = tokio::spawn(async move { actor.run().await });
|
let handle = tokio::spawn(async move { actor.run().await });
|
||||||
|
|
||||||
|
@ -253,14 +228,12 @@ impl ReadControlHandle {
|
||||||
JoinSet<()>,
|
JoinSet<()>,
|
||||||
mpsc::Sender<SupervisorCommand>,
|
mpsc::Sender<SupervisorCommand>,
|
||||||
WriteHandle,
|
WriteHandle,
|
||||||
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
|
|
||||||
)>,
|
)>,
|
||||||
db: SqlitePool,
|
db: SqlitePool,
|
||||||
sender: mpsc::Sender<UpdateMessage>,
|
sender: mpsc::Sender<UpdateMessage>,
|
||||||
supervisor_control: mpsc::Sender<SupervisorCommand>,
|
supervisor_control: mpsc::Sender<SupervisorCommand>,
|
||||||
jabber_write: WriteHandle,
|
jabber_write: WriteHandle,
|
||||||
tasks: JoinSet<()>,
|
tasks: JoinSet<()>,
|
||||||
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
|
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let (control_sender, control_receiver) = mpsc::channel(20);
|
let (control_sender, control_receiver) = mpsc::channel(20);
|
||||||
|
|
||||||
|
@ -273,7 +246,6 @@ impl ReadControlHandle {
|
||||||
supervisor_control,
|
supervisor_control,
|
||||||
jabber_write,
|
jabber_write,
|
||||||
tasks,
|
tasks,
|
||||||
pending_iqs,
|
|
||||||
);
|
);
|
||||||
let handle = tokio::spawn(async move { actor.run().await });
|
let handle = tokio::spawn(async move { actor.run().await });
|
||||||
|
|
||||||
|
|
|
@ -18,7 +18,7 @@ pub struct Write {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct WriteMessage {
|
pub struct WriteMessage {
|
||||||
pub stanza: Stanza,
|
stanza: Stanza,
|
||||||
pub respond_to: oneshot::Sender<Result<(), Error>>,
|
pub respond_to: oneshot::Sender<Result<(), Error>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
AlreadyConnected,
|
AlreadyConnected,
|
||||||
Jabber(jabber::Error),
|
Jabber(jabber::Error),
|
||||||
|
|
100
luz/src/lib.rs
100
luz/src/lib.rs
|
@ -1,19 +1,9 @@
|
||||||
use std::{
|
use std::sync::Arc;
|
||||||
collections::HashMap,
|
|
||||||
ops::{Deref, DerefMut},
|
|
||||||
sync::Arc,
|
|
||||||
};
|
|
||||||
|
|
||||||
use connection::{write::WriteMessage, SupervisorSender};
|
use connection::SupervisorSender;
|
||||||
use jabber::JID;
|
use jabber::JID;
|
||||||
use sqlx::SqlitePool;
|
use sqlx::SqlitePool;
|
||||||
use stanza::{
|
use stanza::roster;
|
||||||
client::{
|
|
||||||
iq::{self, Iq, IqType},
|
|
||||||
Stanza,
|
|
||||||
},
|
|
||||||
roster::{self, Query},
|
|
||||||
};
|
|
||||||
use tokio::{
|
use tokio::{
|
||||||
sync::{mpsc, oneshot, Mutex},
|
sync::{mpsc, oneshot, Mutex},
|
||||||
task::JoinSet,
|
task::JoinSet,
|
||||||
|
@ -32,7 +22,6 @@ pub struct Luz {
|
||||||
// TODO: use a dyn passwordprovider trait to avoid storing password in memory
|
// TODO: use a dyn passwordprovider trait to avoid storing password in memory
|
||||||
password: Arc<String>,
|
password: Arc<String>,
|
||||||
connected: Arc<Mutex<Option<(WriteHandle, SupervisorHandle)>>>,
|
connected: Arc<Mutex<Option<(WriteHandle, SupervisorHandle)>>>,
|
||||||
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
|
|
||||||
db: SqlitePool,
|
db: SqlitePool,
|
||||||
sender: mpsc::Sender<UpdateMessage>,
|
sender: mpsc::Sender<UpdateMessage>,
|
||||||
/// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected
|
/// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected
|
||||||
|
@ -61,14 +50,12 @@ impl Luz {
|
||||||
sender,
|
sender,
|
||||||
tasks: JoinSet::new(),
|
tasks: JoinSet::new(),
|
||||||
connection_supervisor_shutdown,
|
connection_supervisor_shutdown,
|
||||||
pending_iqs: Arc::new(Mutex::new(HashMap::new())),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run(mut self) {
|
async fn run(mut self) {
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
// this is okay, as when created the supervisor (and connection) doesn't exist, but a bit messy
|
|
||||||
_ = &mut self.connection_supervisor_shutdown => {
|
_ = &mut self.connection_supervisor_shutdown => {
|
||||||
*self.connected.lock().await = None
|
*self.connected.lock().await = None
|
||||||
}
|
}
|
||||||
|
@ -77,8 +64,7 @@ impl Luz {
|
||||||
// TODO: dispatch commands separate tasks
|
// TODO: dispatch commands separate tasks
|
||||||
match msg {
|
match msg {
|
||||||
CommandMessage::Connect => {
|
CommandMessage::Connect => {
|
||||||
let mut connection_lock = self.connected.lock().await;
|
match self.connected.lock().await.as_ref() {
|
||||||
match connection_lock.as_ref() {
|
|
||||||
Some(_) => {
|
Some(_) => {
|
||||||
self.sender
|
self.sender
|
||||||
.send(UpdateMessage::Error(Error::AlreadyConnected))
|
.send(UpdateMessage::Error(Error::AlreadyConnected))
|
||||||
|
@ -101,13 +87,9 @@ impl Luz {
|
||||||
shutdown_send,
|
shutdown_send,
|
||||||
self.jid.clone(),
|
self.jid.clone(),
|
||||||
self.password.clone(),
|
self.password.clone(),
|
||||||
self.pending_iqs.clone(),
|
|
||||||
);
|
);
|
||||||
self.connection_supervisor_shutdown = shutdown_recv;
|
self.connection_supervisor_shutdown = shutdown_recv;
|
||||||
*connection_lock = Some((writer, supervisor));
|
*self.connected.lock().await = Some((writer, supervisor));
|
||||||
self.sender
|
|
||||||
.send(UpdateMessage::Connected)
|
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
self.sender.send(UpdateMessage::Error(e.into()));
|
self.sender.send(UpdateMessage::Error(e.into()));
|
||||||
|
@ -139,7 +121,6 @@ impl Luz {
|
||||||
self.db.clone(),
|
self.db.clone(),
|
||||||
self.sender.clone(),
|
self.sender.clone(),
|
||||||
// TODO: iq hashmap
|
// TODO: iq hashmap
|
||||||
self.pending_iqs.clone()
|
|
||||||
)),
|
)),
|
||||||
None => self.tasks.spawn(msg.handle_offline(
|
None => self.tasks.spawn(msg.handle_offline(
|
||||||
self.jid.clone(),
|
self.jid.clone(),
|
||||||
|
@ -174,72 +155,19 @@ impl CommandMessage {
|
||||||
jid: Arc<Mutex<JID>>,
|
jid: Arc<Mutex<JID>>,
|
||||||
db: SqlitePool,
|
db: SqlitePool,
|
||||||
sender: mpsc::Sender<UpdateMessage>,
|
sender: mpsc::Sender<UpdateMessage>,
|
||||||
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
|
|
||||||
) {
|
) {
|
||||||
match self {
|
todo!()
|
||||||
CommandMessage::Connect => unreachable!(),
|
|
||||||
CommandMessage::Disconnect => unreachable!(),
|
|
||||||
CommandMessage::GetRoster => {
|
|
||||||
// TODO: jid resource should probably be stored within the connection
|
|
||||||
let owned_jid: JID;
|
|
||||||
{
|
|
||||||
owned_jid = jid.lock().await.clone();
|
|
||||||
}
|
|
||||||
let stanza = Stanza::Iq(Iq {
|
|
||||||
from: Some(owned_jid),
|
|
||||||
id: "getting-roster".to_string(),
|
|
||||||
to: None,
|
|
||||||
r#type: IqType::Get,
|
|
||||||
lang: None,
|
|
||||||
query: Some(iq::Query::Roster(roster::Query {
|
|
||||||
ver: None,
|
|
||||||
items: Vec::new(),
|
|
||||||
})),
|
|
||||||
errors: Vec::new(),
|
|
||||||
});
|
|
||||||
let (send, recv) = oneshot::channel();
|
|
||||||
let _ = write_handle
|
|
||||||
.send(WriteMessage {
|
|
||||||
stanza,
|
|
||||||
respond_to: send,
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
match recv.await {
|
|
||||||
Ok(Ok(())) => println!("roster request sent"),
|
|
||||||
e => println!("error: {:?}", e),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
CommandMessage::SendMessage(jid, _) => todo!(),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: separate sender and receiver, store handle to Luz process to ensure dropping
|
// TODO: separate sender and receiver, store handle to Luz process to ensure dropping
|
||||||
// #[derive(Clone)]
|
|
||||||
pub struct LuzHandle {
|
pub struct LuzHandle {
|
||||||
sender: mpsc::Sender<CommandMessage>,
|
sender: mpsc::Sender<CommandMessage>,
|
||||||
}
|
receiver: mpsc::Receiver<UpdateMessage>,
|
||||||
|
|
||||||
impl Deref for LuzHandle {
|
|
||||||
type Target = mpsc::Sender<CommandMessage>;
|
|
||||||
|
|
||||||
fn deref(&self) -> &Self::Target {
|
|
||||||
&self.sender
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl DerefMut for LuzHandle {
|
|
||||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
|
||||||
&mut self.sender
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl LuzHandle {
|
impl LuzHandle {
|
||||||
pub fn new(
|
pub fn new(jid: JID, password: String, db: SqlitePool) -> Self {
|
||||||
jid: JID,
|
|
||||||
password: String,
|
|
||||||
db: SqlitePool,
|
|
||||||
) -> (Self, mpsc::Receiver<UpdateMessage>) {
|
|
||||||
let (command_sender, command_receiver) = mpsc::channel(20);
|
let (command_sender, command_receiver) = mpsc::channel(20);
|
||||||
let (update_sender, update_receiver) = mpsc::channel(20);
|
let (update_sender, update_receiver) = mpsc::channel(20);
|
||||||
// might be bad, first supervisor shutdown notification oneshot is never used (disgusting)
|
// might be bad, first supervisor shutdown notification oneshot is never used (disgusting)
|
||||||
|
@ -256,12 +184,10 @@ impl LuzHandle {
|
||||||
);
|
);
|
||||||
tokio::spawn(async move { actor.run().await });
|
tokio::spawn(async move { actor.run().await });
|
||||||
|
|
||||||
(
|
Self {
|
||||||
Self {
|
sender: command_sender,
|
||||||
sender: command_sender,
|
receiver: update_receiver,
|
||||||
},
|
}
|
||||||
update_receiver,
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -273,9 +199,7 @@ pub enum CommandMessage {
|
||||||
SendMessage(JID, String),
|
SendMessage(JID, String),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum UpdateMessage {
|
pub enum UpdateMessage {
|
||||||
Error(Error),
|
Error(Error),
|
||||||
Connected,
|
|
||||||
Roster(Vec<roster::Item>),
|
Roster(Vec<roster::Item>),
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,23 +1,3 @@
|
||||||
use std::time::Duration;
|
fn main() {
|
||||||
|
println!("Hello, world!");
|
||||||
use luz::{CommandMessage, LuzHandle};
|
|
||||||
use sqlx::SqlitePool;
|
|
||||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
|
||||||
|
|
||||||
#[tokio::main]
|
|
||||||
async fn main() {
|
|
||||||
tracing_subscriber::fmt::init();
|
|
||||||
let db = SqlitePool::connect("./luz.db").await.unwrap();
|
|
||||||
let (luz, mut recv) =
|
|
||||||
LuzHandle::new("test@blos.sm".try_into().unwrap(), "slayed".to_string(), db);
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
|
||||||
while let Some(msg) = recv.recv().await {
|
|
||||||
println!("{:#?}", msg)
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
luz.send(CommandMessage::Connect).await.unwrap();
|
|
||||||
luz.send(CommandMessage::GetRoster).await.unwrap();
|
|
||||||
tokio::time::sleep(Duration::from_secs(15)).await;
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue