From ad0054ea56747abc6454aa81f20b9c0653aa0da1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?cel=20=F0=9F=8C=B8?= Date: Tue, 11 Feb 2025 22:20:16 +0000 Subject: [PATCH] actors complete ? --- luz/.gitignore | 1 + luz/Cargo.toml | 2 + luz/src/connection/read.rs | 30 +++++++----- luz/src/connection/write.rs | 2 +- luz/src/error.rs | 1 + luz/src/lib.rs | 95 ++++++++++++++++++++++++++++++++----- luz/src/main.rs | 24 +++++++++- 7 files changed, 129 insertions(+), 26 deletions(-) create mode 100644 luz/.gitignore diff --git a/luz/.gitignore b/luz/.gitignore new file mode 100644 index 0000000..1a2cec2 --- /dev/null +++ b/luz/.gitignore @@ -0,0 +1 @@ +luz.db diff --git a/luz/Cargo.toml b/luz/Cargo.toml index 6b2066b..ebff7d9 100644 --- a/luz/Cargo.toml +++ b/luz/Cargo.toml @@ -13,3 +13,5 @@ stanza = { version = "0.1.0", path = "../stanza" } tokio = "1.42.0" tokio-stream = "0.1.17" tokio-util = "0.7.13" +tracing = "0.1.41" +tracing-subscriber = "0.3.19" diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs index c1e37b4..3c61780 100644 --- a/luz/src/connection/read.rs +++ b/luz/src/connection/read.rs @@ -80,10 +80,15 @@ impl Read { } async fn run(mut self) { + println!("started read thread"); + // let stanza = self.stream.read::().await; + // println!("{:?}", stanza); loop { tokio::select! { // if still haven't received the end tag in time, just kill itself - _ = &mut self.disconnect_timedout => { + // TODO: is this okay??? what if notification thread dies? + Ok(()) = &mut self.disconnect_timedout => { + println!("disconnect_timedout"); break; } Some(msg) = self.control_receiver.recv() => { @@ -99,17 +104,19 @@ impl Read { }) }, ReadControl::Abort(sender) => { - let _ = sender.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs)); + let _ = sender.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs.clone())); break; }, }; }, - stanza = self.stream.read::() => { - match stanza { + s = self.stream.read::() => { + println!("read stanza"); + match s { Ok(s) => { self.tasks.spawn(handle_stanza(s, self.update_sender.clone(), self.db.clone(), self.supervisor_control.clone(), self.write_handle.clone())); }, Err(e) => { + println!("error: {:?}", e); // TODO: NEXT write the correct error stanza depending on error, decide whether to reconnect or properly disconnect, depending on if disconnecting is true // match e { // peanuts::Error::ReadError(error) => todo!(), @@ -134,7 +141,7 @@ impl Read { break; } else { // AAAAAAAAAAAAAAAAAAAAA i should really just have this stored in the supervisor and not gaf bout passing these references around - let _ = self.on_crash.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs)); + let _ = self.on_crash.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs.clone())); } break; }, @@ -142,11 +149,12 @@ impl Read { }, else => break } - // when it aborts, must clear iq map no matter what - let mut iqs = self.pending_iqs.lock().await; - for (_id, sender) in iqs.drain() { - let _ = sender.send(Err(Error::LostConnection)); - } + } + println!("stopping read thread"); + // when it aborts, must clear iq map no matter what + let mut iqs = self.pending_iqs.lock().await; + for (_id, sender) in iqs.drain() { + let _ = sender.send(Err(Error::LostConnection)); } } } @@ -163,7 +171,7 @@ async fn handle_stanza( supervisor_control: mpsc::Sender, write_handle: WriteHandle, ) { - todo!() + println!("{:?}", stanza) } pub enum ReadControl { diff --git a/luz/src/connection/write.rs b/luz/src/connection/write.rs index 09638a8..18dba5c 100644 --- a/luz/src/connection/write.rs +++ b/luz/src/connection/write.rs @@ -18,7 +18,7 @@ pub struct Write { } pub struct WriteMessage { - stanza: Stanza, + pub stanza: Stanza, pub respond_to: oneshot::Sender>, } diff --git a/luz/src/error.rs b/luz/src/error.rs index 2809e8d..6c3fb5d 100644 --- a/luz/src/error.rs +++ b/luz/src/error.rs @@ -1,3 +1,4 @@ +#[derive(Debug)] pub enum Error { AlreadyConnected, Jabber(jabber::Error), diff --git a/luz/src/lib.rs b/luz/src/lib.rs index 9d8ea66..4d36a81 100644 --- a/luz/src/lib.rs +++ b/luz/src/lib.rs @@ -1,9 +1,19 @@ -use std::{collections::HashMap, sync::Arc}; +use std::{ + collections::HashMap, + ops::{Deref, DerefMut}, + sync::Arc, +}; -use connection::SupervisorSender; +use connection::{write::WriteMessage, SupervisorSender}; use jabber::JID; use sqlx::SqlitePool; -use stanza::{client::Stanza, roster}; +use stanza::{ + client::{ + iq::{self, Iq, IqType}, + Stanza, + }, + roster::{self, Query}, +}; use tokio::{ sync::{mpsc, oneshot, Mutex}, task::JoinSet, @@ -58,6 +68,7 @@ impl Luz { async fn run(mut self) { loop { tokio::select! { + // this is okay, as when created the supervisor (and connection) doesn't exist, but a bit messy _ = &mut self.connection_supervisor_shutdown => { *self.connected.lock().await = None } @@ -66,7 +77,8 @@ impl Luz { // TODO: dispatch commands separate tasks match msg { CommandMessage::Connect => { - match self.connected.lock().await.as_ref() { + let mut connection_lock = self.connected.lock().await; + match connection_lock.as_ref() { Some(_) => { self.sender .send(UpdateMessage::Error(Error::AlreadyConnected)) @@ -92,7 +104,10 @@ impl Luz { self.pending_iqs.clone(), ); self.connection_supervisor_shutdown = shutdown_recv; - *self.connected.lock().await = Some((writer, supervisor)); + *connection_lock = Some((writer, supervisor)); + self.sender + .send(UpdateMessage::Connected) + .await; } Err(e) => { self.sender.send(UpdateMessage::Error(e.into())); @@ -161,18 +176,70 @@ impl CommandMessage { sender: mpsc::Sender, pending_iqs: Arc>>>>, ) { - todo!() + match self { + CommandMessage::Connect => unreachable!(), + CommandMessage::Disconnect => unreachable!(), + CommandMessage::GetRoster => { + // TODO: jid resource should probably be stored within the connection + let owned_jid: JID; + { + owned_jid = jid.lock().await.clone(); + } + let stanza = Stanza::Iq(Iq { + from: Some(owned_jid), + id: "getting-roster".to_string(), + to: None, + r#type: IqType::Get, + lang: None, + query: Some(iq::Query::Roster(roster::Query { + ver: None, + items: Vec::new(), + })), + errors: Vec::new(), + }); + let (send, recv) = oneshot::channel(); + let _ = write_handle + .send(WriteMessage { + stanza, + respond_to: send, + }) + .await; + match recv.await { + Ok(Ok(())) => println!("roster request sent"), + e => println!("error: {:?}", e), + }; + } + CommandMessage::SendMessage(jid, _) => todo!(), + } } } // TODO: separate sender and receiver, store handle to Luz process to ensure dropping +// #[derive(Clone)] pub struct LuzHandle { sender: mpsc::Sender, - receiver: mpsc::Receiver, +} + +impl Deref for LuzHandle { + type Target = mpsc::Sender; + + fn deref(&self) -> &Self::Target { + &self.sender + } +} + +impl DerefMut for LuzHandle { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.sender + } } impl LuzHandle { - pub fn new(jid: JID, password: String, db: SqlitePool) -> Self { + pub fn new( + jid: JID, + password: String, + db: SqlitePool, + ) -> (Self, mpsc::Receiver) { let (command_sender, command_receiver) = mpsc::channel(20); let (update_sender, update_receiver) = mpsc::channel(20); // might be bad, first supervisor shutdown notification oneshot is never used (disgusting) @@ -189,10 +256,12 @@ impl LuzHandle { ); tokio::spawn(async move { actor.run().await }); - Self { - sender: command_sender, - receiver: update_receiver, - } + ( + Self { + sender: command_sender, + }, + update_receiver, + ) } } @@ -204,7 +273,9 @@ pub enum CommandMessage { SendMessage(JID, String), } +#[derive(Debug)] pub enum UpdateMessage { Error(Error), + Connected, Roster(Vec), } diff --git a/luz/src/main.rs b/luz/src/main.rs index e7a11a9..7b3815f 100644 --- a/luz/src/main.rs +++ b/luz/src/main.rs @@ -1,3 +1,23 @@ -fn main() { - println!("Hello, world!"); +use std::time::Duration; + +use luz::{CommandMessage, LuzHandle}; +use sqlx::SqlitePool; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt::init(); + let db = SqlitePool::connect("./luz.db").await.unwrap(); + let (luz, mut recv) = + LuzHandle::new("test@blos.sm".try_into().unwrap(), "slayed".to_string(), db); + + tokio::spawn(async move { + while let Some(msg) = recv.recv().await { + println!("{:#?}", msg) + } + }); + + luz.send(CommandMessage::Connect).await.unwrap(); + luz.send(CommandMessage::GetRoster).await.unwrap(); + tokio::time::sleep(Duration::from_secs(15)).await; }