Compare commits

..

3 Commits

Author SHA1 Message Date
cel 🌸 ec41f1d4ff remove stupid stdout sasl response bit 2025-02-11 22:20:42 +00:00
cel 🌸 ad0054ea56 actors complete ? 2025-02-11 22:20:16 +00:00
cel 🌸 3634828531 add iq hashmap for iq requests 2025-02-11 10:54:16 +00:00
9 changed files with 163 additions and 30 deletions

View File

@ -195,9 +195,6 @@ where
// While we aren't finished, receive more data from the other party // While we aren't finished, receive more data from the other party
let response = Response::new(str::from_utf8(&sasl_data)?.to_string()); let response = Response::new(str::from_utf8(&sasl_data)?.to_string());
debug!("response: {:?}", response); debug!("response: {:?}", response);
let stdout = tokio::io::stdout();
let mut writer = Writer::new(stdout);
writer.write_full(&response).await?;
self.writer.write_full(&response).await?; self.writer.write_full(&response).await?;
debug!("response written"); debug!("response written");
@ -415,7 +412,7 @@ mod tests {
use std::time::Duration; use std::time::Duration;
use super::*; use super::*;
use crate::{connection::Connection}; use crate::connection::Connection;
use futures::sink; use futures::sink;
use test_log::test; use test_log::test;
use tokio::time::sleep; use tokio::time::sleep;

1
luz/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
luz.db

View File

@ -13,3 +13,5 @@ stanza = { version = "0.1.0", path = "../stanza" }
tokio = "1.42.0" tokio = "1.42.0"
tokio-stream = "0.1.17" tokio-stream = "0.1.17"
tokio-util = "0.7.13" tokio-util = "0.7.13"
tracing = "0.1.41"
tracing-subscriber = "0.3.19"

View File

@ -1,6 +1,7 @@
// TODO: consider if this needs to be handled by a supervisor or could be handled by luz directly // TODO: consider if this needs to be handled by a supervisor or could be handled by luz directly
use std::{ use std::{
collections::HashMap,
ops::{Deref, DerefMut}, ops::{Deref, DerefMut},
sync::Arc, sync::Arc,
time::Duration, time::Duration,
@ -10,6 +11,7 @@ use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream};
use jid::JID; use jid::JID;
use read::{ReadControl, ReadControlHandle}; use read::{ReadControl, ReadControlHandle};
use sqlx::SqlitePool; use sqlx::SqlitePool;
use stanza::client::Stanza;
use tokio::{ use tokio::{
sync::{mpsc, oneshot, Mutex}, sync::{mpsc, oneshot, Mutex},
task::{JoinHandle, JoinSet}, task::{JoinHandle, JoinSet},
@ -30,6 +32,7 @@ pub struct Supervisor {
tokio::task::JoinSet<()>, tokio::task::JoinSet<()>,
mpsc::Sender<SupervisorCommand>, mpsc::Sender<SupervisorCommand>,
WriteHandle, WriteHandle,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
)>, )>,
sender: mpsc::Sender<UpdateMessage>, sender: mpsc::Sender<UpdateMessage>,
writer_handle: WriteControlHandle, writer_handle: WriteControlHandle,
@ -56,6 +59,7 @@ impl Supervisor {
JoinSet<()>, JoinSet<()>,
mpsc::Sender<SupervisorCommand>, mpsc::Sender<SupervisorCommand>,
WriteHandle, WriteHandle,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
)>, )>,
sender: mpsc::Sender<UpdateMessage>, sender: mpsc::Sender<UpdateMessage>,
writer_handle: WriteControlHandle, writer_handle: WriteControlHandle,
@ -108,7 +112,7 @@ impl Supervisor {
// consider awaiting/aborting the read and write threads // consider awaiting/aborting the read and write threads
let (send, recv) = oneshot::channel(); let (send, recv) = oneshot::channel();
let _ = self.reader_handle.send(ReadControl::Abort(send)).await; let _ = self.reader_handle.send(ReadControl::Abort(send)).await;
let (db, update_sender, tasks, supervisor_command, write_sender) = tokio::select! { let (db, update_sender, tasks, supervisor_command, write_sender, pending_iqs) = tokio::select! {
Ok(s) = recv => s, Ok(s) = recv => s,
Ok(s) = &mut self.reader_crash => s, Ok(s) = &mut self.reader_crash => s,
// in case, just break as irrecoverable // in case, just break as irrecoverable
@ -134,7 +138,8 @@ impl Supervisor {
update_sender, update_sender,
supervisor_command, supervisor_command,
write_sender, write_sender,
tasks tasks,
pending_iqs,
); );
}, },
Err(e) => { Err(e) => {
@ -149,7 +154,7 @@ impl Supervisor {
}, },
} }
}, },
Ok((db, update_sender, tasks, supervisor_control, write_handle)) = &mut self.reader_crash => { Ok((db, update_sender, tasks, supervisor_control, write_handle, pending_iqs)) = &mut self.reader_crash => {
let (send, recv) = oneshot::channel(); let (send, recv) = oneshot::channel();
let _ = self.writer_handle.send(WriteControl::Abort(send)).await; let _ = self.writer_handle.send(WriteControl::Abort(send)).await;
let (retry_msg, mut write_receiver) = tokio::select! { let (retry_msg, mut write_receiver) = tokio::select! {
@ -182,7 +187,8 @@ impl Supervisor {
update_sender, update_sender,
supervisor_control, supervisor_control,
write_handle, write_handle,
tasks tasks,
pending_iqs,
); );
}, },
Err(e) => { Err(e) => {
@ -252,6 +258,7 @@ impl SupervisorHandle {
on_shutdown: oneshot::Sender<()>, on_shutdown: oneshot::Sender<()>,
jid: Arc<Mutex<JID>>, jid: Arc<Mutex<JID>>,
password: Arc<String>, password: Arc<String>,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
) -> (WriteHandle, Self) { ) -> (WriteHandle, Self) {
let (command_sender, command_receiver) = mpsc::channel(20); let (command_sender, command_receiver) = mpsc::channel(20);
let (writer_error_sender, writer_error_receiver) = oneshot::channel(); let (writer_error_sender, writer_error_receiver) = oneshot::channel();
@ -267,6 +274,7 @@ impl SupervisorHandle {
update_sender.clone(), update_sender.clone(),
command_sender.clone(), command_sender.clone(),
write_handle.clone(), write_handle.clone(),
pending_iqs,
); );
let actor = Supervisor::new( let actor = Supervisor::new(

View File

@ -1,5 +1,7 @@
use std::{ use std::{
collections::HashMap,
ops::{Deref, DerefMut}, ops::{Deref, DerefMut},
sync::Arc,
time::Duration, time::Duration,
}; };
@ -7,7 +9,7 @@ use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader};
use sqlx::SqlitePool; use sqlx::SqlitePool;
use stanza::client::Stanza; use stanza::client::Stanza;
use tokio::{ use tokio::{
sync::{mpsc, oneshot}, sync::{mpsc, oneshot, Mutex},
task::{JoinHandle, JoinSet}, task::{JoinHandle, JoinSet},
}; };
@ -28,6 +30,7 @@ pub struct Read {
JoinSet<()>, JoinSet<()>,
mpsc::Sender<SupervisorCommand>, mpsc::Sender<SupervisorCommand>,
WriteHandle, WriteHandle,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
)>, )>,
db: SqlitePool, db: SqlitePool,
update_sender: mpsc::Sender<UpdateMessage>, update_sender: mpsc::Sender<UpdateMessage>,
@ -36,6 +39,8 @@ pub struct Read {
tasks: JoinSet<()>, tasks: JoinSet<()>,
disconnecting: bool, disconnecting: bool,
disconnect_timedout: oneshot::Receiver<()>, disconnect_timedout: oneshot::Receiver<()>,
// TODO: use proper stanza ids
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
} }
impl Read { impl Read {
@ -48,6 +53,7 @@ impl Read {
JoinSet<()>, JoinSet<()>,
mpsc::Sender<SupervisorCommand>, mpsc::Sender<SupervisorCommand>,
WriteHandle, WriteHandle,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
)>, )>,
db: SqlitePool, db: SqlitePool,
update_sender: mpsc::Sender<UpdateMessage>, update_sender: mpsc::Sender<UpdateMessage>,
@ -55,6 +61,7 @@ impl Read {
supervisor_control: mpsc::Sender<SupervisorCommand>, supervisor_control: mpsc::Sender<SupervisorCommand>,
write_handle: WriteHandle, write_handle: WriteHandle,
tasks: JoinSet<()>, tasks: JoinSet<()>,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
) -> Self { ) -> Self {
let (send, recv) = oneshot::channel(); let (send, recv) = oneshot::channel();
Self { Self {
@ -68,14 +75,20 @@ impl Read {
tasks, tasks,
disconnecting: false, disconnecting: false,
disconnect_timedout: recv, disconnect_timedout: recv,
pending_iqs,
} }
} }
async fn run(mut self) { async fn run(mut self) {
println!("started read thread");
// let stanza = self.stream.read::<Stanza>().await;
// println!("{:?}", stanza);
loop { loop {
tokio::select! { tokio::select! {
// if still haven't received the end tag in time, just kill itself // if still haven't received the end tag in time, just kill itself
_ = &mut self.disconnect_timedout => { // TODO: is this okay??? what if notification thread dies?
Ok(()) = &mut self.disconnect_timedout => {
println!("disconnect_timedout");
break; break;
} }
Some(msg) = self.control_receiver.recv() => { Some(msg) = self.control_receiver.recv() => {
@ -91,17 +104,19 @@ impl Read {
}) })
}, },
ReadControl::Abort(sender) => { ReadControl::Abort(sender) => {
let _ = sender.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle)); let _ = sender.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs.clone()));
break; break;
}, },
}; };
}, },
stanza = self.stream.read::<Stanza>() => { s = self.stream.read::<Stanza>() => {
match stanza { println!("read stanza");
match s {
Ok(s) => { Ok(s) => {
self.tasks.spawn(handle_stanza(s, self.update_sender.clone(), self.db.clone(), self.supervisor_control.clone(), self.write_handle.clone())); self.tasks.spawn(handle_stanza(s, self.update_sender.clone(), self.db.clone(), self.supervisor_control.clone(), self.write_handle.clone()));
}, },
Err(e) => { Err(e) => {
println!("error: {:?}", e);
// TODO: NEXT write the correct error stanza depending on error, decide whether to reconnect or properly disconnect, depending on if disconnecting is true // TODO: NEXT write the correct error stanza depending on error, decide whether to reconnect or properly disconnect, depending on if disconnecting is true
// match e { // match e {
// peanuts::Error::ReadError(error) => todo!(), // peanuts::Error::ReadError(error) => todo!(),
@ -126,7 +141,7 @@ impl Read {
break; break;
} else { } else {
// AAAAAAAAAAAAAAAAAAAAA i should really just have this stored in the supervisor and not gaf bout passing these references around // AAAAAAAAAAAAAAAAAAAAA i should really just have this stored in the supervisor and not gaf bout passing these references around
let _ = self.on_crash.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle)); let _ = self.on_crash.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs.clone()));
} }
break; break;
}, },
@ -135,6 +150,12 @@ impl Read {
else => break else => break
} }
} }
println!("stopping read thread");
// when it aborts, must clear iq map no matter what
let mut iqs = self.pending_iqs.lock().await;
for (_id, sender) in iqs.drain() {
let _ = sender.send(Err(Error::LostConnection));
}
} }
} }
@ -150,7 +171,7 @@ async fn handle_stanza(
supervisor_control: mpsc::Sender<SupervisorCommand>, supervisor_control: mpsc::Sender<SupervisorCommand>,
write_handle: WriteHandle, write_handle: WriteHandle,
) { ) {
todo!() println!("{:?}", stanza)
} }
pub enum ReadControl { pub enum ReadControl {
@ -162,6 +183,7 @@ pub enum ReadControl {
JoinSet<()>, JoinSet<()>,
mpsc::Sender<SupervisorCommand>, mpsc::Sender<SupervisorCommand>,
WriteHandle, WriteHandle,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
)>, )>,
), ),
} }
@ -194,11 +216,13 @@ impl ReadControlHandle {
JoinSet<()>, JoinSet<()>,
mpsc::Sender<SupervisorCommand>, mpsc::Sender<SupervisorCommand>,
WriteHandle, WriteHandle,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
)>, )>,
db: SqlitePool, db: SqlitePool,
sender: mpsc::Sender<UpdateMessage>, sender: mpsc::Sender<UpdateMessage>,
supervisor_control: mpsc::Sender<SupervisorCommand>, supervisor_control: mpsc::Sender<SupervisorCommand>,
jabber_write: WriteHandle, jabber_write: WriteHandle,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
) -> Self { ) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20); let (control_sender, control_receiver) = mpsc::channel(20);
@ -211,6 +235,7 @@ impl ReadControlHandle {
supervisor_control, supervisor_control,
jabber_write, jabber_write,
JoinSet::new(), JoinSet::new(),
pending_iqs,
); );
let handle = tokio::spawn(async move { actor.run().await }); let handle = tokio::spawn(async move { actor.run().await });
@ -228,12 +253,14 @@ impl ReadControlHandle {
JoinSet<()>, JoinSet<()>,
mpsc::Sender<SupervisorCommand>, mpsc::Sender<SupervisorCommand>,
WriteHandle, WriteHandle,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
)>, )>,
db: SqlitePool, db: SqlitePool,
sender: mpsc::Sender<UpdateMessage>, sender: mpsc::Sender<UpdateMessage>,
supervisor_control: mpsc::Sender<SupervisorCommand>, supervisor_control: mpsc::Sender<SupervisorCommand>,
jabber_write: WriteHandle, jabber_write: WriteHandle,
tasks: JoinSet<()>, tasks: JoinSet<()>,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
) -> Self { ) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20); let (control_sender, control_receiver) = mpsc::channel(20);
@ -246,6 +273,7 @@ impl ReadControlHandle {
supervisor_control, supervisor_control,
jabber_write, jabber_write,
tasks, tasks,
pending_iqs,
); );
let handle = tokio::spawn(async move { actor.run().await }); let handle = tokio::spawn(async move { actor.run().await });

View File

@ -18,7 +18,7 @@ pub struct Write {
} }
pub struct WriteMessage { pub struct WriteMessage {
stanza: Stanza, pub stanza: Stanza,
pub respond_to: oneshot::Sender<Result<(), Error>>, pub respond_to: oneshot::Sender<Result<(), Error>>,
} }

View File

@ -1,3 +1,4 @@
#[derive(Debug)]
pub enum Error { pub enum Error {
AlreadyConnected, AlreadyConnected,
Jabber(jabber::Error), Jabber(jabber::Error),

View File

@ -1,9 +1,19 @@
use std::sync::Arc; use std::{
collections::HashMap,
ops::{Deref, DerefMut},
sync::Arc,
};
use connection::SupervisorSender; use connection::{write::WriteMessage, SupervisorSender};
use jabber::JID; use jabber::JID;
use sqlx::SqlitePool; use sqlx::SqlitePool;
use stanza::roster; use stanza::{
client::{
iq::{self, Iq, IqType},
Stanza,
},
roster::{self, Query},
};
use tokio::{ use tokio::{
sync::{mpsc, oneshot, Mutex}, sync::{mpsc, oneshot, Mutex},
task::JoinSet, task::JoinSet,
@ -22,6 +32,7 @@ pub struct Luz {
// TODO: use a dyn passwordprovider trait to avoid storing password in memory // TODO: use a dyn passwordprovider trait to avoid storing password in memory
password: Arc<String>, password: Arc<String>,
connected: Arc<Mutex<Option<(WriteHandle, SupervisorHandle)>>>, connected: Arc<Mutex<Option<(WriteHandle, SupervisorHandle)>>>,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
db: SqlitePool, db: SqlitePool,
sender: mpsc::Sender<UpdateMessage>, sender: mpsc::Sender<UpdateMessage>,
/// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected /// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected
@ -50,12 +61,14 @@ impl Luz {
sender, sender,
tasks: JoinSet::new(), tasks: JoinSet::new(),
connection_supervisor_shutdown, connection_supervisor_shutdown,
pending_iqs: Arc::new(Mutex::new(HashMap::new())),
} }
} }
async fn run(mut self) { async fn run(mut self) {
loop { loop {
tokio::select! { tokio::select! {
// this is okay, as when created the supervisor (and connection) doesn't exist, but a bit messy
_ = &mut self.connection_supervisor_shutdown => { _ = &mut self.connection_supervisor_shutdown => {
*self.connected.lock().await = None *self.connected.lock().await = None
} }
@ -64,7 +77,8 @@ impl Luz {
// TODO: dispatch commands separate tasks // TODO: dispatch commands separate tasks
match msg { match msg {
CommandMessage::Connect => { CommandMessage::Connect => {
match self.connected.lock().await.as_ref() { let mut connection_lock = self.connected.lock().await;
match connection_lock.as_ref() {
Some(_) => { Some(_) => {
self.sender self.sender
.send(UpdateMessage::Error(Error::AlreadyConnected)) .send(UpdateMessage::Error(Error::AlreadyConnected))
@ -87,9 +101,13 @@ impl Luz {
shutdown_send, shutdown_send,
self.jid.clone(), self.jid.clone(),
self.password.clone(), self.password.clone(),
self.pending_iqs.clone(),
); );
self.connection_supervisor_shutdown = shutdown_recv; self.connection_supervisor_shutdown = shutdown_recv;
*self.connected.lock().await = Some((writer, supervisor)); *connection_lock = Some((writer, supervisor));
self.sender
.send(UpdateMessage::Connected)
.await;
} }
Err(e) => { Err(e) => {
self.sender.send(UpdateMessage::Error(e.into())); self.sender.send(UpdateMessage::Error(e.into()));
@ -121,6 +139,7 @@ impl Luz {
self.db.clone(), self.db.clone(),
self.sender.clone(), self.sender.clone(),
// TODO: iq hashmap // TODO: iq hashmap
self.pending_iqs.clone()
)), )),
None => self.tasks.spawn(msg.handle_offline( None => self.tasks.spawn(msg.handle_offline(
self.jid.clone(), self.jid.clone(),
@ -155,19 +174,72 @@ impl CommandMessage {
jid: Arc<Mutex<JID>>, jid: Arc<Mutex<JID>>,
db: SqlitePool, db: SqlitePool,
sender: mpsc::Sender<UpdateMessage>, sender: mpsc::Sender<UpdateMessage>,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
) { ) {
todo!() match self {
CommandMessage::Connect => unreachable!(),
CommandMessage::Disconnect => unreachable!(),
CommandMessage::GetRoster => {
// TODO: jid resource should probably be stored within the connection
let owned_jid: JID;
{
owned_jid = jid.lock().await.clone();
}
let stanza = Stanza::Iq(Iq {
from: Some(owned_jid),
id: "getting-roster".to_string(),
to: None,
r#type: IqType::Get,
lang: None,
query: Some(iq::Query::Roster(roster::Query {
ver: None,
items: Vec::new(),
})),
errors: Vec::new(),
});
let (send, recv) = oneshot::channel();
let _ = write_handle
.send(WriteMessage {
stanza,
respond_to: send,
})
.await;
match recv.await {
Ok(Ok(())) => println!("roster request sent"),
e => println!("error: {:?}", e),
};
}
CommandMessage::SendMessage(jid, _) => todo!(),
}
} }
} }
// TODO: separate sender and receiver, store handle to Luz process to ensure dropping // TODO: separate sender and receiver, store handle to Luz process to ensure dropping
// #[derive(Clone)]
pub struct LuzHandle { pub struct LuzHandle {
sender: mpsc::Sender<CommandMessage>, sender: mpsc::Sender<CommandMessage>,
receiver: mpsc::Receiver<UpdateMessage>, }
impl Deref for LuzHandle {
type Target = mpsc::Sender<CommandMessage>;
fn deref(&self) -> &Self::Target {
&self.sender
}
}
impl DerefMut for LuzHandle {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.sender
}
} }
impl LuzHandle { impl LuzHandle {
pub fn new(jid: JID, password: String, db: SqlitePool) -> Self { pub fn new(
jid: JID,
password: String,
db: SqlitePool,
) -> (Self, mpsc::Receiver<UpdateMessage>) {
let (command_sender, command_receiver) = mpsc::channel(20); let (command_sender, command_receiver) = mpsc::channel(20);
let (update_sender, update_receiver) = mpsc::channel(20); let (update_sender, update_receiver) = mpsc::channel(20);
// might be bad, first supervisor shutdown notification oneshot is never used (disgusting) // might be bad, first supervisor shutdown notification oneshot is never used (disgusting)
@ -184,10 +256,12 @@ impl LuzHandle {
); );
tokio::spawn(async move { actor.run().await }); tokio::spawn(async move { actor.run().await });
Self { (
sender: command_sender, Self {
receiver: update_receiver, sender: command_sender,
} },
update_receiver,
)
} }
} }
@ -199,7 +273,9 @@ pub enum CommandMessage {
SendMessage(JID, String), SendMessage(JID, String),
} }
#[derive(Debug)]
pub enum UpdateMessage { pub enum UpdateMessage {
Error(Error), Error(Error),
Connected,
Roster(Vec<roster::Item>), Roster(Vec<roster::Item>),
} }

View File

@ -1,3 +1,23 @@
fn main() { use std::time::Duration;
println!("Hello, world!");
use luz::{CommandMessage, LuzHandle};
use sqlx::SqlitePool;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
let db = SqlitePool::connect("./luz.db").await.unwrap();
let (luz, mut recv) =
LuzHandle::new("test@blos.sm".try_into().unwrap(), "slayed".to_string(), db);
tokio::spawn(async move {
while let Some(msg) = recv.recv().await {
println!("{:#?}", msg)
}
});
luz.send(CommandMessage::Connect).await.unwrap();
luz.send(CommandMessage::GetRoster).await.unwrap();
tokio::time::sleep(Duration::from_secs(15)).await;
} }