diff --git a/luz/src/error.rs b/luz/src/error.rs index efd3937..74239e8 100644 --- a/luz/src/error.rs +++ b/luz/src/error.rs @@ -2,7 +2,10 @@ use std::sync::Arc; use stanza::client::Stanza; use thiserror::Error; -use tokio::sync::{mpsc::error::SendError, oneshot::error::RecvError}; +use tokio::{ + sync::{mpsc::error::SendError, oneshot::error::RecvError}, + time::error::Elapsed, +}; #[derive(Debug, Error, Clone)] pub enum Error { @@ -37,6 +40,14 @@ pub enum Error { Disconnected, } +#[derive(Debug, Error, Clone)] +pub enum CommandError { + #[error("actor: {0}")] + Actor(ActorError), + #[error("{0}")] + Error(#[from] T), +} + #[derive(Debug, Error, Clone)] pub enum MessageSendError { #[error("could not add to message history: {0}")] @@ -145,6 +156,12 @@ pub enum ActorError { Receive, } +impl From for ActorError { + fn from(_e: Elapsed) -> Self { + Self::Timeout + } +} + impl From> for ActorError { fn from(_e: SendError) -> Self { Self::Send diff --git a/luz/src/lib.rs b/luz/src/lib.rs index 91bcfdf..399fec2 100644 --- a/luz/src/lib.rs +++ b/luz/src/lib.rs @@ -2,12 +2,16 @@ use std::{ collections::HashMap, ops::{Deref, DerefMut}, sync::Arc, + time::Duration, }; use chat::{Body, Chat, Message}; use connection::{write::WriteMessage, SupervisorSender}; use db::Db; -use error::{ConnectionError, DatabaseError, ReadError, RosterError, StatusError, WriteError}; +use error::{ + ActorError, CommandError, ConnectionError, DatabaseError, ReadError, RosterError, StatusError, + WriteError, +}; use futures::{future::Fuse, FutureExt}; use jabber::JID; use presence::{Offline, Online, Presence}; @@ -20,6 +24,7 @@ use stanza::client::{ use tokio::{ sync::{mpsc, oneshot, Mutex}, task::JoinSet, + time::timeout, }; use tracing::{debug, info}; use user::User; @@ -1018,12 +1023,14 @@ impl CommandMessage { #[derive(Debug)] pub struct LuzHandle { sender: mpsc::Sender, + timeout: Duration, } impl Clone for LuzHandle { fn clone(&self) -> Self { Self { sender: self.sender.clone(), + timeout: self.timeout, } } } @@ -1071,60 +1078,260 @@ impl LuzHandle { Ok(( Self { sender: command_sender, + // TODO: configure timeout + timeout: Duration::from_secs(10), }, update_receiver, )) } - pub async fn connect(&self) { - self.send(CommandMessage::Connect).await; + pub async fn connect(&self) -> Result<(), ActorError> { + self.send(CommandMessage::Connect).await?; + Ok(()) } - pub async fn disconnect(&self, offline: Offline) { - self.send(CommandMessage::Disconnect(offline)).await; + pub async fn disconnect(&self, offline: Offline) -> Result<(), ActorError> { + self.send(CommandMessage::Disconnect(offline)).await?; + Ok(()) } - // pub async fn get_roster(&self) -> Result, RosterError> { - // let (send, recv) = oneshot::channel(); - // self.send(CommandMessage::GetRoster(send)).await.map_err(|e| RosterError::)?; - // Ok(recv.await?) - // } + pub async fn get_roster(&self) -> Result, CommandError> { + let (send, recv) = oneshot::channel(); + self.send(CommandMessage::GetRoster(send)) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let roster = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(roster) + } - // pub async fn get_chats(&self) -> Result, Error> {} + pub async fn get_chats(&self) -> Result, CommandError> { + let (send, recv) = oneshot::channel(); + self.send(CommandMessage::GetChats(send)) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let chats = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(chats) + } - // pub async fn get_chat(&self, jid: JID) -> Result {} + pub async fn get_chat(&self, jid: JID) -> Result> { + let (send, recv) = oneshot::channel(); + self.send(CommandMessage::GetChat(jid, send)) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let chat = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(chat) + } - // pub async fn get_messages(&self, jid: JID) -> Result, Error> {} + pub async fn get_messages( + &self, + jid: JID, + ) -> Result, CommandError> { + let (send, recv) = oneshot::channel(); + self.send(CommandMessage::GetMessages(jid, send)) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let messages = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(messages) + } - // pub async fn delete_chat(&self, jid: JID) -> Result<(), Error> {} + pub async fn delete_chat(&self, jid: JID) -> Result<(), CommandError> { + let (send, recv) = oneshot::channel(); + self.send(CommandMessage::DeleteChat(jid, send)) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(result) + } - // pub async fn delete_message(&self, id: Uuid) -> Result<(), Error> {} + pub async fn delete_message(&self, id: Uuid) -> Result<(), CommandError> { + let (send, recv) = oneshot::channel(); + self.send(CommandMessage::DeleteMessage(id, send)) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(result) + } - // pub async fn get_user(&self, jid: JID) -> Result {} + pub async fn get_user(&self, jid: JID) -> Result> { + let (send, recv) = oneshot::channel(); + self.send(CommandMessage::GetUser(jid, send)) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(result) + } - // pub async fn add_contact(&self, jid: JID) -> Result<(), Error> {} + pub async fn add_contact(&self, jid: JID) -> Result<(), CommandError> { + let (send, recv) = oneshot::channel(); + self.send(CommandMessage::AddContact(jid, send)) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(result) + } - // pub async fn buddy_request(&self, jid: JID) -> Result<(), Error> {} + pub async fn buddy_request(&self, jid: JID) -> Result<(), CommandError> { + let (send, recv) = oneshot::channel(); + self.send(CommandMessage::BuddyRequest(jid, send)) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(result) + } - // pub async fn subscription_request(&self, jid: JID) -> Result<(), Error> {} + pub async fn subscription_request(&self, jid: JID) -> Result<(), CommandError> { + let (send, recv) = oneshot::channel(); + self.send(CommandMessage::SubscriptionRequest(jid, send)) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(result) + } - // pub async fn accept_buddy_request(&self, jid: JID) -> Result<(), Error> {} + pub async fn accept_buddy_request(&self, jid: JID) -> Result<(), CommandError> { + let (send, recv) = oneshot::channel(); + self.send(CommandMessage::AcceptBuddyRequest(jid, send)) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(result) + } - // pub async fn accept_subscription_request(&self, jid: JID) -> Result<(), Error> {} + pub async fn accept_subscription_request( + &self, + jid: JID, + ) -> Result<(), CommandError> { + let (send, recv) = oneshot::channel(); + self.send(CommandMessage::AcceptSubscriptionRequest(jid, send)) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(result) + } - // pub async fn unsubscribe_from_contact(&self, jid: JID) -> Result<(), Error> {} + pub async fn unsubscribe_from_contact(&self, jid: JID) -> Result<(), CommandError> { + let (send, recv) = oneshot::channel(); + self.send(CommandMessage::UnsubscribeFromContact(jid, send)) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(result) + } - // pub async fn unsubscribe_contact(&self, jid: JID) -> Result<(), Error> {} + pub async fn unsubscribe_contact(&self, jid: JID) -> Result<(), CommandError> { + let (send, recv) = oneshot::channel(); + self.send(CommandMessage::UnsubscribeContact(jid, send)) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(result) + } - // pub async fn unfriend_contact(&self, jid: JID) -> Result<(), Error> {} + pub async fn unfriend_contact(&self, jid: JID) -> Result<(), CommandError> { + let (send, recv) = oneshot::channel(); + self.send(CommandMessage::UnfriendContact(jid, send)) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(result) + } - // pub async fn delete_contact(&self, jid: JID) -> Result<(), Error> {} + pub async fn delete_contact(&self, jid: JID) -> Result<(), CommandError> { + let (send, recv) = oneshot::channel(); + self.send(CommandMessage::DeleteContact(jid, send)) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(result) + } - // pub async fn update_contact(&self, jid: JID, update: ContactUpdate) -> Result<(), Error> {} + pub async fn update_contact( + &self, + jid: JID, + update: ContactUpdate, + ) -> Result<(), CommandError> { + let (send, recv) = oneshot::channel(); + self.send(CommandMessage::UpdateContact(jid, update, send)) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(result) + } - // pub async fn set_status(&self, online: Online) -> Result<(), Error> {} + pub async fn set_status(&self, online: Online) -> Result<(), CommandError> { + let (send, recv) = oneshot::channel(); + self.send(CommandMessage::SetStatus(online, send)) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(result) + } - // pub async fn send_message(&self, jid: JID, body: Body) -> Result<(), Error> {} + pub async fn send_message(&self, jid: JID, body: Body) -> Result<(), CommandError> { + let (send, recv) = oneshot::channel(); + self.send(CommandMessage::SendMessage(jid, body, send)) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::::into(e)))? + .map_err(|e| CommandError::Actor(Into::::into(e)))??; + Ok(result) + } } // TODO: generate methods for each with a macro