diff --git a/luz/Cargo.toml b/luz/Cargo.toml index 45a847f..90d321c 100644 --- a/luz/Cargo.toml +++ b/luz/Cargo.toml @@ -16,3 +16,4 @@ tokio-util = "0.7.13" tracing = "0.1.41" tracing-subscriber = "0.3.19" uuid = { version = "1.13.1", features = ["v4"] } +thiserror = "2.0.11" diff --git a/luz/src/connection/mod.rs b/luz/src/connection/mod.rs index 95aae1a..84109bc 100644 --- a/luz/src/connection/mod.rs +++ b/luz/src/connection/mod.rs @@ -20,7 +20,7 @@ use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage}; use crate::{ db::Db, - error::{Error, Reason}, + error::{Error, ReadError, WriteError}, UpdateMessage, }; @@ -36,7 +36,7 @@ pub struct Supervisor { tokio::task::JoinSet<()>, mpsc::Sender, WriteHandle, - Arc>>>>, + Arc>>>>, )>, sender: mpsc::Sender, writer_handle: WriteControlHandle, @@ -62,7 +62,7 @@ pub enum State { tokio::task::JoinSet<()>, mpsc::Sender, WriteHandle, - Arc>>>>, + Arc>>>>, ), ), } @@ -77,7 +77,7 @@ impl Supervisor { JoinSet<()>, mpsc::Sender, WriteHandle, - Arc>>>>, + Arc>>>>, )>, sender: mpsc::Sender, writer_handle: WriteControlHandle, @@ -180,7 +180,7 @@ impl Supervisor { // if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves. write_state.close(); while let Some(msg) = write_state.recv().await { - let _ = msg.respond_to.send(Err(Reason::LostConnection)); + let _ = msg.respond_to.send(Err(WriteError::LostConnection)); } // TODO: is this the correct error? let _ = self.sender.send(UpdateMessage::Error(Error::LostConnection)).await; @@ -227,9 +227,9 @@ impl Supervisor { Err(e) => { // if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves. write_recv.close(); - let _ = write_msg.respond_to.send(Err(Reason::LostConnection)); + let _ = write_msg.respond_to.send(Err(WriteError::LostConnection)); while let Some(msg) = write_recv.recv().await { - let _ = msg.respond_to.send(Err(Reason::LostConnection)); + let _ = msg.respond_to.send(Err(WriteError::LostConnection)); } // TODO: is this the correct error to send? let _ = self.sender.send(UpdateMessage::Error(Error::LostConnection)).await; @@ -278,10 +278,10 @@ impl Supervisor { // if reconnection failure, respond to all current messages with lost connection error. write_receiver.close(); if let Some(msg) = retry_msg { - msg.respond_to.send(Err(Reason::LostConnection)); + msg.respond_to.send(Err(WriteError::LostConnection)); } while let Some(msg) = write_receiver.recv().await { - msg.respond_to.send(Err(Reason::LostConnection)); + msg.respond_to.send(Err(WriteError::LostConnection)); } // TODO: is this the correct error? let _ = self.sender.send(UpdateMessage::Error(Error::LostConnection)).await; @@ -342,7 +342,7 @@ impl SupervisorHandle { on_shutdown: oneshot::Sender<()>, jid: Arc>, password: Arc, - pending_iqs: Arc>>>>, + pending_iqs: Arc>>>>, ) -> (WriteHandle, Self) { let (command_sender, command_receiver) = mpsc::channel(20); let (writer_error_sender, writer_error_receiver) = oneshot::channel(); diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs index 4390e00..0590ce3 100644 --- a/luz/src/connection/read.rs +++ b/luz/src/connection/read.rs @@ -18,16 +18,13 @@ use uuid::Uuid; use crate::{ chat::{Body, Message}, db::Db, - error::{Error, IqError, PresenceError, Reason, RecvMessageError}, + error::{Error, IqError, MessageRecvError, PresenceError, ReadError, RosterError}, presence::{Offline, Online, Presence, Show}, roster::Contact, UpdateMessage, }; -use super::{ - write::{WriteHandle, WriteMessage}, - SupervisorCommand, -}; +use super::{write::WriteHandle, SupervisorCommand}; pub struct Read { control_receiver: mpsc::Receiver, @@ -38,7 +35,7 @@ pub struct Read { JoinSet<()>, mpsc::Sender, WriteHandle, - Arc>>>>, + Arc>>>>, )>, db: Db, update_sender: mpsc::Sender, @@ -48,7 +45,7 @@ pub struct Read { disconnecting: bool, disconnect_timedout: oneshot::Receiver<()>, // TODO: use proper stanza ids - pending_iqs: Arc>>>>, + pending_iqs: Arc>>>>, } impl Read { @@ -61,7 +58,7 @@ impl Read { JoinSet<()>, mpsc::Sender, WriteHandle, - Arc>>>>, + Arc>>>>, )>, db: Db, update_sender: mpsc::Sender, @@ -69,9 +66,9 @@ impl Read { supervisor_control: mpsc::Sender, write_handle: WriteHandle, tasks: JoinSet<()>, - pending_iqs: Arc>>>>, + pending_iqs: Arc>>>>, ) -> Self { - let (send, recv) = oneshot::channel(); + let (_send, recv) = oneshot::channel(); Self { control_receiver, stream, @@ -162,7 +159,7 @@ impl Read { // when it aborts, must clear iq map no matter what let mut iqs = self.pending_iqs.lock().await; for (_id, sender) in iqs.drain() { - let _ = sender.send(Err(Reason::LostConnection)); + let _ = sender.send(Err(ReadError::LostConnection)); } } } @@ -178,7 +175,7 @@ async fn handle_stanza( db: Db, supervisor_control: mpsc::Sender, write_handle: WriteHandle, - pending_iqs: Arc>>>>, + pending_iqs: Arc>>>>, ) { match stanza { Stanza::Message(stanza_message) => { @@ -207,7 +204,9 @@ async fn handle_stanza( if let Err(e) = result { tracing::error!("messagecreate"); let _ = update_sender - .send(UpdateMessage::Error(Error::CacheUpdate(e.into()))) + .send(UpdateMessage::Error(Error::MessageRecv( + MessageRecvError::MessageHistory(e.into()), + ))) .await; } let _ = update_sender @@ -215,8 +214,8 @@ async fn handle_stanza( .await; } else { let _ = update_sender - .send(UpdateMessage::Error(Error::RecvMessage( - RecvMessageError::MissingFrom, + .send(UpdateMessage::Error(Error::MessageRecv( + MessageRecvError::MissingFrom, ))) .await; } @@ -229,9 +228,16 @@ async fn handle_stanza( stanza::client::presence::PresenceType::Error => { // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option. let _ = update_sender - .send(UpdateMessage::Error(Error::Presence(PresenceError::Error( - Reason::Stanza(presence.errors.first().cloned()), - )))) + .send(UpdateMessage::Error(Error::Presence( + // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display + PresenceError::StanzaError( + presence + .errors + .first() + .cloned() + .expect("error MUST have error"), + ), + ))) .await; } // should not happen (error to server) @@ -329,8 +335,8 @@ async fn handle_stanza( let contact: Contact = item.into(); if let Err(e) = db.upsert_contact(contact.clone()).await { let _ = update_sender - .send(UpdateMessage::Error(Error::CacheUpdate( - e.into(), + .send(UpdateMessage::Error(Error::Roster( + RosterError::Cache(e.into()), ))) .await; } @@ -381,7 +387,7 @@ pub enum ReadControl { JoinSet<()>, mpsc::Sender, WriteHandle, - Arc>>>>, + Arc>>>>, )>, ), } @@ -414,13 +420,13 @@ impl ReadControlHandle { JoinSet<()>, mpsc::Sender, WriteHandle, - Arc>>>>, + Arc>>>>, )>, db: Db, sender: mpsc::Sender, supervisor_control: mpsc::Sender, jabber_write: WriteHandle, - pending_iqs: Arc>>>>, + pending_iqs: Arc>>>>, ) -> Self { let (control_sender, control_receiver) = mpsc::channel(20); @@ -451,14 +457,14 @@ impl ReadControlHandle { JoinSet<()>, mpsc::Sender, WriteHandle, - Arc>>>>, + Arc>>>>, )>, db: Db, sender: mpsc::Sender, supervisor_control: mpsc::Sender, jabber_write: WriteHandle, tasks: JoinSet<()>, - pending_iqs: Arc>>>>, + pending_iqs: Arc>>>>, ) -> Self { let (control_sender, control_receiver) = mpsc::channel(20); diff --git a/luz/src/connection/write.rs b/luz/src/connection/write.rs index 2273fac..3333d38 100644 --- a/luz/src/connection/write.rs +++ b/luz/src/connection/write.rs @@ -7,7 +7,7 @@ use tokio::{ task::JoinHandle, }; -use crate::error::{Error, Reason}; +use crate::error::WriteError; // actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream. pub struct Write { @@ -17,9 +17,10 @@ pub struct Write { on_crash: oneshot::Sender<(WriteMessage, mpsc::Receiver)>, } +#[derive(Debug)] pub struct WriteMessage { pub stanza: Stanza, - pub respond_to: oneshot::Sender>, + pub respond_to: oneshot::Sender>, } pub enum WriteControl { @@ -84,9 +85,9 @@ impl Write { Err(e) => match &e { peanuts::Error::ReadError(_error) => { // if connection lost during disconnection, just send lost connection error to the write requests - let _ = msg.respond_to.send(Err(Reason::LostConnection)); + let _ = msg.respond_to.send(Err(WriteError::LostConnection)); while let Some(msg) = self.stanza_receiver.recv().await { - let _ = msg.respond_to.send(Err(Reason::LostConnection)); + let _ = msg.respond_to.send(Err(WriteError::LostConnection)); } break; } @@ -140,16 +141,16 @@ pub struct WriteHandle { } impl WriteHandle { - pub async fn write(&self, stanza: Stanza) -> Result<(), Reason> { + pub async fn write(&self, stanza: Stanza) -> Result<(), WriteError> { let (send, recv) = oneshot::channel(); self.send(WriteMessage { stanza, respond_to: send, }) .await - .map_err(|_| Reason::ChannelSend)?; + .map_err(|e| WriteError::Actor(e.into()))?; // TODO: timeout - recv.await? + recv.await.map_err(|e| WriteError::Actor(e.into()))? } } diff --git a/luz/src/error.rs b/luz/src/error.rs index f0b956e..efd3937 100644 --- a/luz/src/error.rs +++ b/luz/src/error.rs @@ -1,138 +1,158 @@ -use stanza::client::Stanza; -use tokio::sync::oneshot::{self}; +use std::sync::Arc; -#[derive(Debug)] +use stanza::client::Stanza; +use thiserror::Error; +use tokio::sync::{mpsc::error::SendError, oneshot::error::RecvError}; + +#[derive(Debug, Error, Clone)] pub enum Error { + #[error("already connected")] AlreadyConnected, // TODO: change to Connecting(ConnectingError) - Connection(ConnectionError), - Presence(PresenceError), - SetStatus(Reason), - Roster(Reason), - Stream(stanza::stream::Error), - SendMessage(Reason), - RecvMessage(RecvMessageError), + #[error("connecting: {0}")] + Connecting(#[from] ConnectionError), + #[error("presence: {0}")] + Presence(#[from] PresenceError), + #[error("set status: {0}")] + SetStatus(#[from] StatusError), + // TODO: have different ones for get/update/set + #[error("roster: {0}")] + Roster(RosterError), + #[error("stream error: {0}")] + Stream(#[from] stanza::stream::Error), + #[error("message send error: {0}")] + MessageSend(MessageSendError), + #[error("message receive error: {0}")] + MessageRecv(MessageRecvError), + #[error("already disconnected")] AlreadyDisconnected, + #[error("lost connection")] LostConnection, - // TODO: should all cache update errors include the context? - CacheUpdate(Reason), + // TODO: Display for Content + #[error("received unrecognized/unsupported content: {0:?}")] UnrecognizedContent(peanuts::element::Content), + #[error("iq receive error: {0}")] Iq(IqError), - Cloned, + #[error("disconnected")] + Disconnected, } -// TODO: this is horrifying, maybe just use tracing to forward error events??? -impl Clone for Error { - fn clone(&self) -> Self { - Error::Cloned - } +#[derive(Debug, Error, Clone)] +pub enum MessageSendError { + #[error("could not add to message history: {0}")] + MessageHistory(#[from] DatabaseError), } -#[derive(Debug)] +#[derive(Debug, Error, Clone)] pub enum PresenceError { - Error(Reason), + #[error("unsupported")] Unsupported, + #[error("missing from")] MissingFrom, + #[error("stanza error: {0}")] + StanzaError(#[from] stanza::client::error::Error), } -#[derive(Debug)] +#[derive(Debug, Error, Clone)] +// TODO: should probably have all iq query related errors here, including read, write, stanza error, etc. pub enum IqError { + #[error("no iq with id matching `{0}`")] NoMatchingId(String), } -#[derive(Debug)] -pub enum RecvMessageError { +#[derive(Debug, Error, Clone)] +pub enum MessageRecvError { + #[error("could not add to message history: {0}")] + MessageHistory(#[from] DatabaseError), + #[error("missing from")] MissingFrom, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Error)] pub enum ConnectionError { - ConnectionFailed(Reason), - RosterRetreival(Reason), - SendPresence(Reason), - NoCachedStatus(Reason), + #[error("connection failed: {0}")] + ConnectionFailed(#[from] jabber::Error), + #[error("failed roster retreival: {0}")] + RosterRetreival(#[from] RosterError), + #[error("failed to send available presence: {0}")] + SendPresence(#[from] WriteError), + #[error("cached status: {0}")] + StatusCacheError(#[from] DatabaseError), } -#[derive(Debug)] -pub struct RosterError(pub Reason); - -impl From for Error { - fn from(e: RosterError) -> Self { - Self::Roster(e.0) - } -} - -impl From for ConnectionError { - fn from(e: RosterError) -> Self { - Self::RosterRetreival(e.0) - } -} - -pub struct StatusError(pub Reason); - -impl From for Error { - fn from(e: StatusError) -> Self { - Error::SetStatus(e.0) - } -} - -impl From for ConnectionError { - fn from(e: StatusError) -> Self { - Self::SendPresence(e.0) - } -} - -#[derive(Debug)] -pub enum Reason { - // TODO: organisastion of error into internal error thing - Timeout, - Stream(stanza::stream_error::Error), - Stanza(Option), - Jabber(jabber::Error), - XML(peanuts::Error), - SQL(sqlx::Error), - // JID(jid::ParseError), - LostConnection, - OneshotRecv(oneshot::error::RecvError), +#[derive(Debug, Error, Clone)] +pub enum RosterError { + #[error("cache: {0}")] + Cache(#[from] DatabaseError), + #[error("stream write: {0}")] + Write(#[from] WriteError), + // TODO: display for stanza, to show as xml, same for read error types. + #[error("unexpected reply: {0:?}")] UnexpectedStanza(Stanza), - Disconnected, - ChannelSend, - Cloned, + #[error("stream read: {0}")] + Read(#[from] ReadError), + #[error("stanza error: {0}")] + StanzaError(#[from] stanza::client::error::Error), } -// TODO: same here -impl Clone for Reason { - fn clone(&self) -> Self { - Reason::Cloned - } -} +#[derive(Debug, Error, Clone)] +#[error("database error: {0}")] +pub struct DatabaseError(Arc); -impl From for Reason { - fn from(e: oneshot::error::RecvError) -> Reason { - Self::OneshotRecv(e) - } -} - -impl From for Reason { - fn from(e: peanuts::Error) -> Self { - Self::XML(e) - } -} - -// impl From for Reason { -// fn from(e: jid::ParseError) -> Self { -// Self::JID(e) -// } -// } - -impl From for Reason { +impl From for DatabaseError { fn from(e: sqlx::Error) -> Self { - Self::SQL(e) + Self(Arc::new(e)) } } -impl From for Reason { - fn from(e: jabber::Error) -> Self { - Self::Jabber(e) +#[derive(Debug, Error, Clone)] +pub enum StatusError { + #[error("cache: {0}")] + Cache(#[from] DatabaseError), + #[error("stream write: {0}")] + Write(#[from] WriteError), +} + +#[derive(Debug, Error, Clone)] +pub enum WriteError { + #[error("xml: {0}")] + XML(#[from] peanuts::Error), + #[error("lost connection")] + LostConnection, + // TODO: should this be in writeerror or separate? + #[error("actor: {0}")] + Actor(#[from] ActorError), + #[error("disconnected")] + Disconnected, +} + +// TODO: separate peanuts read and write error? +#[derive(Debug, Error, Clone)] +pub enum ReadError { + #[error("xml: {0}")] + XML(#[from] peanuts::Error), + #[error("lost connection")] + LostConnection, +} + +#[derive(Debug, Error, Clone)] +pub enum ActorError { + #[error("receive timed out")] + Timeout, + #[error("could not send message to actor, channel closed")] + Send, + #[error("could not receive message from actor, channel closed")] + Receive, +} + +impl From> for ActorError { + fn from(_e: SendError) -> Self { + Self::Send + } +} + +impl From for ActorError { + fn from(_e: RecvError) -> Self { + Self::Receive } } diff --git a/luz/src/lib.rs b/luz/src/lib.rs index 293bc08..91bcfdf 100644 --- a/luz/src/lib.rs +++ b/luz/src/lib.rs @@ -7,7 +7,7 @@ use std::{ use chat::{Body, Chat, Message}; use connection::{write::WriteMessage, SupervisorSender}; use db::Db; -use error::{ConnectionError, Reason, RosterError, StatusError}; +use error::{ConnectionError, DatabaseError, ReadError, RosterError, StatusError, WriteError}; use futures::{future::Fuse, FutureExt}; use jabber::JID; use presence::{Offline, Online, Presence}; @@ -21,7 +21,7 @@ use tokio::{ sync::{mpsc, oneshot, Mutex}, task::JoinSet, }; -use tracing::{debug, info, Instrument}; +use tracing::{debug, info}; use user::User; use uuid::Uuid; @@ -44,7 +44,7 @@ pub struct Luz { // TODO: use a dyn passwordprovider trait to avoid storing password in memory password: Arc, connected: Arc>>, - pending_iqs: Arc>>>>, + pending_iqs: Arc>>>>, db: Db, sender: mpsc::Sender, /// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected @@ -159,8 +159,8 @@ impl Luz { let _ = self .sender .send(UpdateMessage::Error( - Error::Connection( - ConnectionError::NoCachedStatus( + Error::Connecting( + ConnectionError::StatusCacheError( e.into(), ), ), @@ -170,16 +170,20 @@ impl Luz { } }; let (send, recv) = oneshot::channel(); - CommandMessage::SetStatus(online.clone(), send) - .handle_online( - writer.clone(), - supervisor.sender(), - self.jid.clone(), - self.db.clone(), - self.sender.clone(), - self.pending_iqs.clone(), - ) - .await; + CommandMessage::SendPresence( + None, + Presence::Online(online.clone()), + send, + ) + .handle_online( + writer.clone(), + supervisor.sender(), + self.jid.clone(), + self.db.clone(), + self.sender.clone(), + self.pending_iqs.clone(), + ) + .await; let set_status = recv.await; match set_status { Ok(s) => match s { @@ -198,13 +202,13 @@ impl Luz { let _ = self .sender .send(UpdateMessage::Error( - Error::Connection(e.into()), + Error::Connecting(e.into()), )) .await; } }, Err(e) => { - let _ = self.sender.send(UpdateMessage::Error(Error::Connection(ConnectionError::SendPresence(e.into())))).await; + let _ = self.sender.send(UpdateMessage::Error(Error::Connecting(ConnectionError::SendPresence(WriteError::Actor(e.into()))))).await; } } } @@ -212,7 +216,7 @@ impl Luz { let _ = self .sender .send(UpdateMessage::Error( - Error::Connection(e.into()), + Error::Connecting(e.into()), )) .await; } @@ -221,8 +225,12 @@ impl Luz { Err(e) => { let _ = self .sender - .send(UpdateMessage::Error(Error::Connection( - ConnectionError::RosterRetreival(e.into()), + .send(UpdateMessage::Error(Error::Connecting( + ConnectionError::RosterRetreival( + RosterError::Write(WriteError::Actor( + e.into(), + )), + ), ))) .await; } @@ -230,7 +238,7 @@ impl Luz { } Err(e) => { let _ = - self.sender.send(UpdateMessage::Error(Error::Connection( + self.sender.send(UpdateMessage::Error(Error::Connecting( ConnectionError::ConnectionFailed(e.into()), ))); } @@ -286,7 +294,7 @@ impl Luz { impl CommandMessage { pub async fn handle_offline( - mut self, + self, jid: Arc>, db: Db, update_sender: mpsc::Sender, @@ -301,7 +309,7 @@ impl CommandMessage { let _ = sender.send(Ok(roster)); } Err(e) => { - let _ = sender.send(Err(RosterError(e.into()))); + let _ = sender.send(Err(RosterError::Cache(e.into()))); } } } @@ -331,45 +339,48 @@ impl CommandMessage { } // TODO: offline queue to modify roster CommandMessage::AddContact(jid, sender) => { - sender.send(Err(Reason::Disconnected)); + sender.send(Err(RosterError::Write(WriteError::Disconnected))); } CommandMessage::BuddyRequest(jid, sender) => { - sender.send(Err(Reason::Disconnected)); + sender.send(Err(WriteError::Disconnected)); } CommandMessage::SubscriptionRequest(jid, sender) => { - sender.send(Err(Reason::Disconnected)); + sender.send(Err(WriteError::Disconnected)); } CommandMessage::AcceptBuddyRequest(jid, sender) => { - sender.send(Err(Reason::Disconnected)); + sender.send(Err(WriteError::Disconnected)); } CommandMessage::AcceptSubscriptionRequest(jid, sender) => { - sender.send(Err(Reason::Disconnected)); + sender.send(Err(WriteError::Disconnected)); } CommandMessage::UnsubscribeFromContact(jid, sender) => { - sender.send(Err(Reason::Disconnected)); + sender.send(Err(WriteError::Disconnected)); } CommandMessage::UnsubscribeContact(jid, sender) => { - sender.send(Err(Reason::Disconnected)); + sender.send(Err(WriteError::Disconnected)); } CommandMessage::UnfriendContact(jid, sender) => { - sender.send(Err(Reason::Disconnected)); + sender.send(Err(WriteError::Disconnected)); } CommandMessage::DeleteContact(jid, sender) => { - sender.send(Err(Reason::Disconnected)); + sender.send(Err(RosterError::Write(WriteError::Disconnected))); } CommandMessage::UpdateContact(jid, contact_update, sender) => { - sender.send(Err(Reason::Disconnected)); + sender.send(Err(RosterError::Write(WriteError::Disconnected))); } CommandMessage::SetStatus(online, sender) => { let result = db .upsert_cached_status(online) .await - .map_err(|e| StatusError(e.into())); + .map_err(|e| StatusError::Cache(e.into())); sender.send(result); } // TODO: offline message queue CommandMessage::SendMessage(jid, body, sender) => { - sender.send(Err(Reason::Disconnected)); + sender.send(Err(WriteError::Disconnected)); + } + CommandMessage::SendPresence(jid, presence, sender) => { + sender.send(Err(WriteError::Disconnected)); } } } @@ -382,7 +393,7 @@ impl CommandMessage { client_jid: Arc>, db: Db, update_sender: mpsc::Sender, - pending_iqs: Arc>>>>, + pending_iqs: Arc>>>>, ) { match self { CommandMessage::Connect => unreachable!(), @@ -424,11 +435,12 @@ impl CommandMessage { Ok(Ok(())) => info!("roster request sent"), Ok(Err(e)) => { // TODO: log errors if fail to send - let _ = result_sender.send(Err(RosterError(e.into()))); + let _ = result_sender.send(Err(RosterError::Write(e.into()))); return; } Err(e) => { - let _ = result_sender.send(Err(RosterError(e.into()))); + let _ = result_sender + .send(Err(RosterError::Write(WriteError::Actor(e.into())))); return; } }; @@ -448,23 +460,41 @@ impl CommandMessage { items.into_iter().map(|item| item.into()).collect(); if let Err(e) = db.replace_cached_roster(contacts.clone()).await { update_sender - .send(UpdateMessage::Error(Error::CacheUpdate(e.into()))) + .send(UpdateMessage::Error(Error::Roster(RosterError::Cache( + e.into(), + )))) .await; }; result_sender.send(Ok(contacts)); return; } + ref s @ Stanza::Iq(Iq { + from: _, + ref id, + to: _, + r#type, + lang: _, + query: _, + ref errors, + }) if *id == iq_id && r#type == IqType::Error => { + if let Some(error) = errors.first() { + result_sender.send(Err(RosterError::StanzaError(error.clone()))); + } else { + result_sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); + } + return; + } s => { - result_sender.send(Err(RosterError(Reason::UnexpectedStanza(s)))); + result_sender.send(Err(RosterError::UnexpectedStanza(s))); return; } }, Ok(Err(e)) => { - result_sender.send(Err(RosterError(e.into()))); + result_sender.send(Err(RosterError::Read(e))); return; } Err(e) => { - result_sender.send(Err(RosterError(e.into()))); + result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); return; } } @@ -525,8 +555,8 @@ impl CommandMessage { } // TODO: write_handle send helper function let result = write_handle.write(set_stanza).await; - if let Err(_) = result { - sender.send(result); + if let Err(e) = result { + sender.send(Err(RosterError::Write(e))); return; } let iq_result = recv.await; @@ -545,24 +575,24 @@ impl CommandMessage { sender.send(Ok(())); return; } - Stanza::Iq(Iq { + ref s @ Stanza::Iq(Iq { from: _, - id, + ref id, to: _, r#type, lang: _, query: _, - errors, - }) if id == iq_id && r#type == IqType::Error => { + ref errors, + }) if *id == iq_id && r#type == IqType::Error => { if let Some(error) = errors.first() { - sender.send(Err(Reason::Stanza(Some(error.clone())))); + sender.send(Err(RosterError::StanzaError(error.clone()))); } else { - sender.send(Err(Reason::Stanza(None))); + sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); } return; } s => { - sender.send(Err(Reason::UnexpectedStanza(s))); + sender.send(Err(RosterError::UnexpectedStanza(s))); return; } }, @@ -572,7 +602,7 @@ impl CommandMessage { } }, Err(e) => { - sender.send(Err(e.into())); + sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); return; } } @@ -770,8 +800,8 @@ impl CommandMessage { pending_iqs.lock().await.insert(iq_id.clone(), send); } let result = write_handle.write(set_stanza).await; - if let Err(_) = result { - sender.send(result); + if let Err(e) = result { + sender.send(Err(RosterError::Write(e))); return; } let iq_result = recv.await; @@ -790,24 +820,24 @@ impl CommandMessage { sender.send(Ok(())); return; } - Stanza::Iq(Iq { + ref s @ Stanza::Iq(Iq { from: _, - id, + ref id, to: _, r#type, lang: _, query: _, - errors, - }) if id == iq_id && r#type == IqType::Error => { + ref errors, + }) if *id == iq_id && r#type == IqType::Error => { if let Some(error) = errors.first() { - sender.send(Err(Reason::Stanza(Some(error.clone())))); + sender.send(Err(RosterError::StanzaError(error.clone()))); } else { - sender.send(Err(Reason::Stanza(None))); + sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); } return; } s => { - sender.send(Err(Reason::UnexpectedStanza(s))); + sender.send(Err(RosterError::UnexpectedStanza(s))); return; } }, @@ -817,7 +847,7 @@ impl CommandMessage { } }, Err(e) => { - sender.send(Err(e.into())); + sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); return; } } @@ -858,8 +888,8 @@ impl CommandMessage { pending_iqs.lock().await.insert(iq_id.clone(), send); } let result = write_handle.write(set_stanza).await; - if let Err(_) = result { - sender.send(result); + if let Err(e) = result { + sender.send(Err(RosterError::Write(e))); return; } let iq_result = recv.await; @@ -878,24 +908,24 @@ impl CommandMessage { sender.send(Ok(())); return; } - Stanza::Iq(Iq { + ref s @ Stanza::Iq(Iq { from: _, - id, + ref id, to: _, r#type, lang: _, query: _, - errors, - }) if id == iq_id && r#type == IqType::Error => { + ref errors, + }) if *id == iq_id && r#type == IqType::Error => { if let Some(error) = errors.first() { - sender.send(Err(Reason::Stanza(Some(error.clone())))); + sender.send(Err(RosterError::StanzaError(error.clone()))); } else { - sender.send(Err(Reason::Stanza(None))); + sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); } return; } s => { - sender.send(Err(Reason::UnexpectedStanza(s))); + sender.send(Err(RosterError::UnexpectedStanza(s))); return; } }, @@ -905,7 +935,7 @@ impl CommandMessage { } }, Err(e) => { - sender.send(Err(e.into())); + sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); return; } } @@ -914,13 +944,16 @@ impl CommandMessage { let result = db.upsert_cached_status(online.clone()).await; if let Err(e) = result { let _ = update_sender - .send(UpdateMessage::Error(Error::CacheUpdate(e.into()))) + .send(UpdateMessage::Error(Error::SetStatus(StatusError::Cache( + e.into(), + )))) .await; } let result = write_handle .write(Stanza::Presence(online.into())) .await - .map_err(|e| StatusError(e)); + .map_err(|e| StatusError::Write(e)); + // .map_err(|e| StatusError::Write(e)); let _ = sender.send(result); } // TODO: offline message queue @@ -956,7 +989,9 @@ impl CommandMessage { }; if let Err(e) = db.create_message(message, jid).await.map_err(|e| e.into()) { - let _ = update_sender.send(UpdateMessage::Error(Error::CacheUpdate(e))); + let _ = update_sender.send(UpdateMessage::Error(Error::MessageSend( + error::MessageSendError::MessageHistory(e), + ))); } let _ = sender.send(Ok(())); } @@ -965,6 +1000,15 @@ impl CommandMessage { } } } + CommandMessage::SendPresence(jid, presence, sender) => { + let mut presence: stanza::client::presence::Presence = presence.into(); + if let Some(jid) = jid { + presence.to = Some(jid); + }; + let result = write_handle.write(Stanza::Presence(presence)).await; + // .map_err(|e| StatusError::Write(e)); + let _ = sender.send(result); + } } } } @@ -999,11 +1043,12 @@ impl DerefMut for LuzHandle { } impl LuzHandle { + // TODO: database creation separate pub async fn new( jid: JID, password: String, db: &str, - ) -> Result<(Self, mpsc::Receiver), Reason> { + ) -> Result<(Self, mpsc::Receiver), DatabaseError> { let db = SqlitePool::connect(db).await?; let (command_sender, command_receiver) = mpsc::channel(20); let (update_sender, update_receiver) = mpsc::channel(20); @@ -1030,8 +1075,59 @@ impl LuzHandle { update_receiver, )) } + + pub async fn connect(&self) { + self.send(CommandMessage::Connect).await; + } + + pub async fn disconnect(&self, offline: Offline) { + self.send(CommandMessage::Disconnect(offline)).await; + } + + // pub async fn get_roster(&self) -> Result, RosterError> { + // let (send, recv) = oneshot::channel(); + // self.send(CommandMessage::GetRoster(send)).await.map_err(|e| RosterError::)?; + // Ok(recv.await?) + // } + + // pub async fn get_chats(&self) -> Result, Error> {} + + // pub async fn get_chat(&self, jid: JID) -> Result {} + + // pub async fn get_messages(&self, jid: JID) -> Result, Error> {} + + // pub async fn delete_chat(&self, jid: JID) -> Result<(), Error> {} + + // pub async fn delete_message(&self, id: Uuid) -> Result<(), Error> {} + + // pub async fn get_user(&self, jid: JID) -> Result {} + + // pub async fn add_contact(&self, jid: JID) -> Result<(), Error> {} + + // pub async fn buddy_request(&self, jid: JID) -> Result<(), Error> {} + + // pub async fn subscription_request(&self, jid: JID) -> Result<(), Error> {} + + // pub async fn accept_buddy_request(&self, jid: JID) -> Result<(), Error> {} + + // pub async fn accept_subscription_request(&self, jid: JID) -> Result<(), Error> {} + + // pub async fn unsubscribe_from_contact(&self, jid: JID) -> Result<(), Error> {} + + // pub async fn unsubscribe_contact(&self, jid: JID) -> Result<(), Error> {} + + // pub async fn unfriend_contact(&self, jid: JID) -> Result<(), Error> {} + + // pub async fn delete_contact(&self, jid: JID) -> Result<(), Error> {} + + // pub async fn update_contact(&self, jid: JID, update: ContactUpdate) -> Result<(), Error> {} + + // pub async fn set_status(&self, online: Online) -> Result<(), Error> {} + + // pub async fn send_message(&self, jid: JID, body: Body) -> Result<(), Error> {} } +// TODO: generate methods for each with a macro pub enum CommandMessage { // TODO: login invisible xep-0186 /// connect to XMPP chat server. gets roster and publishes initial presence. @@ -1042,46 +1138,51 @@ pub enum CommandMessage { GetRoster(oneshot::Sender, RosterError>>), /// get all chats. chat will include 10 messages in their message Vec (enough for chat previews) // TODO: paging and filtering - GetChats(oneshot::Sender, Reason>>), + GetChats(oneshot::Sender, DatabaseError>>), /// get a specific chat by jid - GetChat(JID, oneshot::Sender>), + GetChat(JID, oneshot::Sender>), /// get message history for chat (does appropriate mam things) // TODO: paging and filtering - GetMessages(JID, oneshot::Sender, Reason>>), + GetMessages(JID, oneshot::Sender, DatabaseError>>), /// delete a chat from your chat history, along with all the corresponding messages - DeleteChat(JID, oneshot::Sender>), + DeleteChat(JID, oneshot::Sender>), /// delete a message from your chat history - DeleteMessage(Uuid, oneshot::Sender>), + DeleteMessage(Uuid, oneshot::Sender>), /// get a user from your users database - GetUser(JID, oneshot::Sender>), + GetUser(JID, oneshot::Sender>), /// add a contact to your roster, with a status of none, no subscriptions. - // TODO: for all these, consider returning with oneshot::Sender> - AddContact(JID, oneshot::Sender>), + AddContact(JID, oneshot::Sender>), /// send a friend request i.e. a subscription request with a subscription pre-approval. if not already added to roster server adds to roster. - BuddyRequest(JID, oneshot::Sender>), + BuddyRequest(JID, oneshot::Sender>), /// send a subscription request, without pre-approval. if not already added to roster server adds to roster. - SubscriptionRequest(JID, oneshot::Sender>), + SubscriptionRequest(JID, oneshot::Sender>), /// accept a friend request by accepting a pending subscription and sending a subscription request back. if not already added to roster adds to roster. - AcceptBuddyRequest(JID, oneshot::Sender>), + AcceptBuddyRequest(JID, oneshot::Sender>), /// accept a pending subscription and doesn't send a subscription request back. if not already added to roster adds to roster. - AcceptSubscriptionRequest(JID, oneshot::Sender>), + AcceptSubscriptionRequest(JID, oneshot::Sender>), /// unsubscribe to a contact, but don't remove their subscription. - UnsubscribeFromContact(JID, oneshot::Sender>), + UnsubscribeFromContact(JID, oneshot::Sender>), /// stop a contact from being subscribed, but stay subscribed to the contact. - UnsubscribeContact(JID, oneshot::Sender>), + UnsubscribeContact(JID, oneshot::Sender>), /// remove subscriptions to and from contact, but keep in roster. - UnfriendContact(JID, oneshot::Sender>), + UnfriendContact(JID, oneshot::Sender>), /// remove a contact from the contact list. will remove subscriptions if not already done then delete contact from roster. - DeleteContact(JID, oneshot::Sender>), + DeleteContact(JID, oneshot::Sender>), /// update contact. contact details will be overwritten with the contents of the contactupdate struct. - UpdateContact(JID, ContactUpdate, oneshot::Sender>), + UpdateContact(JID, ContactUpdate, oneshot::Sender>), /// set online status. if disconnected, will be cached so when client connects, will be sent as the initial presence. SetStatus(Online, oneshot::Sender>), + /// send presence stanza + SendPresence( + Option, + Presence, + oneshot::Sender>, + ), /// send a directed presence (usually to a non-contact). // TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting) /// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a /// chatroom). if disconnected, will be cached so when client connects, message will be sent. - SendMessage(JID, Body, oneshot::Sender>), + SendMessage(JID, Body, oneshot::Sender>), } #[derive(Debug, Clone)] diff --git a/luz/src/presence.rs b/luz/src/presence.rs index 31a0d30..4bc1993 100644 --- a/luz/src/presence.rs +++ b/luz/src/presence.rs @@ -110,3 +110,12 @@ impl From for stanza::client::presence::Presence { } } } + +impl From for stanza::client::presence::Presence { + fn from(value: Presence) -> Self { + match value { + Presence::Online(online) => online.into(), + Presence::Offline(offline) => offline.into(), + } + } +}