refactor(luz): error types

This commit is contained in:
cel 🌸 2025-02-25 23:29:44 +00:00
parent d797061786
commit 4dac2dbe1d
7 changed files with 375 additions and 237 deletions

View File

@ -16,3 +16,4 @@ tokio-util = "0.7.13"
tracing = "0.1.41"
tracing-subscriber = "0.3.19"
uuid = { version = "1.13.1", features = ["v4"] }
thiserror = "2.0.11"

View File

@ -20,7 +20,7 @@ use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage};
use crate::{
db::Db,
error::{Error, Reason},
error::{Error, ReadError, WriteError},
UpdateMessage,
};
@ -36,7 +36,7 @@ pub struct Supervisor {
tokio::task::JoinSet<()>,
mpsc::Sender<SupervisorCommand>,
WriteHandle,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
)>,
sender: mpsc::Sender<UpdateMessage>,
writer_handle: WriteControlHandle,
@ -62,7 +62,7 @@ pub enum State {
tokio::task::JoinSet<()>,
mpsc::Sender<SupervisorCommand>,
WriteHandle,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
),
),
}
@ -77,7 +77,7 @@ impl Supervisor {
JoinSet<()>,
mpsc::Sender<SupervisorCommand>,
WriteHandle,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
)>,
sender: mpsc::Sender<UpdateMessage>,
writer_handle: WriteControlHandle,
@ -180,7 +180,7 @@ impl Supervisor {
// if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves.
write_state.close();
while let Some(msg) = write_state.recv().await {
let _ = msg.respond_to.send(Err(Reason::LostConnection));
let _ = msg.respond_to.send(Err(WriteError::LostConnection));
}
// TODO: is this the correct error?
let _ = self.sender.send(UpdateMessage::Error(Error::LostConnection)).await;
@ -227,9 +227,9 @@ impl Supervisor {
Err(e) => {
// if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves.
write_recv.close();
let _ = write_msg.respond_to.send(Err(Reason::LostConnection));
let _ = write_msg.respond_to.send(Err(WriteError::LostConnection));
while let Some(msg) = write_recv.recv().await {
let _ = msg.respond_to.send(Err(Reason::LostConnection));
let _ = msg.respond_to.send(Err(WriteError::LostConnection));
}
// TODO: is this the correct error to send?
let _ = self.sender.send(UpdateMessage::Error(Error::LostConnection)).await;
@ -278,10 +278,10 @@ impl Supervisor {
// if reconnection failure, respond to all current messages with lost connection error.
write_receiver.close();
if let Some(msg) = retry_msg {
msg.respond_to.send(Err(Reason::LostConnection));
msg.respond_to.send(Err(WriteError::LostConnection));
}
while let Some(msg) = write_receiver.recv().await {
msg.respond_to.send(Err(Reason::LostConnection));
msg.respond_to.send(Err(WriteError::LostConnection));
}
// TODO: is this the correct error?
let _ = self.sender.send(UpdateMessage::Error(Error::LostConnection)).await;
@ -342,7 +342,7 @@ impl SupervisorHandle {
on_shutdown: oneshot::Sender<()>,
jid: Arc<Mutex<JID>>,
password: Arc<String>,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
) -> (WriteHandle, Self) {
let (command_sender, command_receiver) = mpsc::channel(20);
let (writer_error_sender, writer_error_receiver) = oneshot::channel();

View File

@ -18,16 +18,13 @@ use uuid::Uuid;
use crate::{
chat::{Body, Message},
db::Db,
error::{Error, IqError, PresenceError, Reason, RecvMessageError},
error::{Error, IqError, MessageRecvError, PresenceError, ReadError, RosterError},
presence::{Offline, Online, Presence, Show},
roster::Contact,
UpdateMessage,
};
use super::{
write::{WriteHandle, WriteMessage},
SupervisorCommand,
};
use super::{write::WriteHandle, SupervisorCommand};
pub struct Read {
control_receiver: mpsc::Receiver<ReadControl>,
@ -38,7 +35,7 @@ pub struct Read {
JoinSet<()>,
mpsc::Sender<SupervisorCommand>,
WriteHandle,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
)>,
db: Db,
update_sender: mpsc::Sender<UpdateMessage>,
@ -48,7 +45,7 @@ pub struct Read {
disconnecting: bool,
disconnect_timedout: oneshot::Receiver<()>,
// TODO: use proper stanza ids
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
}
impl Read {
@ -61,7 +58,7 @@ impl Read {
JoinSet<()>,
mpsc::Sender<SupervisorCommand>,
WriteHandle,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
)>,
db: Db,
update_sender: mpsc::Sender<UpdateMessage>,
@ -69,9 +66,9 @@ impl Read {
supervisor_control: mpsc::Sender<SupervisorCommand>,
write_handle: WriteHandle,
tasks: JoinSet<()>,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
) -> Self {
let (send, recv) = oneshot::channel();
let (_send, recv) = oneshot::channel();
Self {
control_receiver,
stream,
@ -162,7 +159,7 @@ impl Read {
// when it aborts, must clear iq map no matter what
let mut iqs = self.pending_iqs.lock().await;
for (_id, sender) in iqs.drain() {
let _ = sender.send(Err(Reason::LostConnection));
let _ = sender.send(Err(ReadError::LostConnection));
}
}
}
@ -178,7 +175,7 @@ async fn handle_stanza(
db: Db,
supervisor_control: mpsc::Sender<SupervisorCommand>,
write_handle: WriteHandle,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
) {
match stanza {
Stanza::Message(stanza_message) => {
@ -207,7 +204,9 @@ async fn handle_stanza(
if let Err(e) = result {
tracing::error!("messagecreate");
let _ = update_sender
.send(UpdateMessage::Error(Error::CacheUpdate(e.into())))
.send(UpdateMessage::Error(Error::MessageRecv(
MessageRecvError::MessageHistory(e.into()),
)))
.await;
}
let _ = update_sender
@ -215,8 +214,8 @@ async fn handle_stanza(
.await;
} else {
let _ = update_sender
.send(UpdateMessage::Error(Error::RecvMessage(
RecvMessageError::MissingFrom,
.send(UpdateMessage::Error(Error::MessageRecv(
MessageRecvError::MissingFrom,
)))
.await;
}
@ -229,9 +228,16 @@ async fn handle_stanza(
stanza::client::presence::PresenceType::Error => {
// TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option.
let _ = update_sender
.send(UpdateMessage::Error(Error::Presence(PresenceError::Error(
Reason::Stanza(presence.errors.first().cloned()),
))))
.send(UpdateMessage::Error(Error::Presence(
// TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display
PresenceError::StanzaError(
presence
.errors
.first()
.cloned()
.expect("error MUST have error"),
),
)))
.await;
}
// should not happen (error to server)
@ -329,8 +335,8 @@ async fn handle_stanza(
let contact: Contact = item.into();
if let Err(e) = db.upsert_contact(contact.clone()).await {
let _ = update_sender
.send(UpdateMessage::Error(Error::CacheUpdate(
e.into(),
.send(UpdateMessage::Error(Error::Roster(
RosterError::Cache(e.into()),
)))
.await;
}
@ -381,7 +387,7 @@ pub enum ReadControl {
JoinSet<()>,
mpsc::Sender<SupervisorCommand>,
WriteHandle,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
)>,
),
}
@ -414,13 +420,13 @@ impl ReadControlHandle {
JoinSet<()>,
mpsc::Sender<SupervisorCommand>,
WriteHandle,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
)>,
db: Db,
sender: mpsc::Sender<UpdateMessage>,
supervisor_control: mpsc::Sender<SupervisorCommand>,
jabber_write: WriteHandle,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20);
@ -451,14 +457,14 @@ impl ReadControlHandle {
JoinSet<()>,
mpsc::Sender<SupervisorCommand>,
WriteHandle,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
)>,
db: Db,
sender: mpsc::Sender<UpdateMessage>,
supervisor_control: mpsc::Sender<SupervisorCommand>,
jabber_write: WriteHandle,
tasks: JoinSet<()>,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20);

View File

@ -7,7 +7,7 @@ use tokio::{
task::JoinHandle,
};
use crate::error::{Error, Reason};
use crate::error::WriteError;
// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream.
pub struct Write {
@ -17,9 +17,10 @@ pub struct Write {
on_crash: oneshot::Sender<(WriteMessage, mpsc::Receiver<WriteMessage>)>,
}
#[derive(Debug)]
pub struct WriteMessage {
pub stanza: Stanza,
pub respond_to: oneshot::Sender<Result<(), Reason>>,
pub respond_to: oneshot::Sender<Result<(), WriteError>>,
}
pub enum WriteControl {
@ -84,9 +85,9 @@ impl Write {
Err(e) => match &e {
peanuts::Error::ReadError(_error) => {
// if connection lost during disconnection, just send lost connection error to the write requests
let _ = msg.respond_to.send(Err(Reason::LostConnection));
let _ = msg.respond_to.send(Err(WriteError::LostConnection));
while let Some(msg) = self.stanza_receiver.recv().await {
let _ = msg.respond_to.send(Err(Reason::LostConnection));
let _ = msg.respond_to.send(Err(WriteError::LostConnection));
}
break;
}
@ -140,16 +141,16 @@ pub struct WriteHandle {
}
impl WriteHandle {
pub async fn write(&self, stanza: Stanza) -> Result<(), Reason> {
pub async fn write(&self, stanza: Stanza) -> Result<(), WriteError> {
let (send, recv) = oneshot::channel();
self.send(WriteMessage {
stanza,
respond_to: send,
})
.await
.map_err(|_| Reason::ChannelSend)?;
.map_err(|e| WriteError::Actor(e.into()))?;
// TODO: timeout
recv.await?
recv.await.map_err(|e| WriteError::Actor(e.into()))?
}
}

View File

@ -1,138 +1,158 @@
use stanza::client::Stanza;
use tokio::sync::oneshot::{self};
use std::sync::Arc;
#[derive(Debug)]
use stanza::client::Stanza;
use thiserror::Error;
use tokio::sync::{mpsc::error::SendError, oneshot::error::RecvError};
#[derive(Debug, Error, Clone)]
pub enum Error {
#[error("already connected")]
AlreadyConnected,
// TODO: change to Connecting(ConnectingError)
Connection(ConnectionError),
Presence(PresenceError),
SetStatus(Reason),
Roster(Reason),
Stream(stanza::stream::Error),
SendMessage(Reason),
RecvMessage(RecvMessageError),
#[error("connecting: {0}")]
Connecting(#[from] ConnectionError),
#[error("presence: {0}")]
Presence(#[from] PresenceError),
#[error("set status: {0}")]
SetStatus(#[from] StatusError),
// TODO: have different ones for get/update/set
#[error("roster: {0}")]
Roster(RosterError),
#[error("stream error: {0}")]
Stream(#[from] stanza::stream::Error),
#[error("message send error: {0}")]
MessageSend(MessageSendError),
#[error("message receive error: {0}")]
MessageRecv(MessageRecvError),
#[error("already disconnected")]
AlreadyDisconnected,
#[error("lost connection")]
LostConnection,
// TODO: should all cache update errors include the context?
CacheUpdate(Reason),
// TODO: Display for Content
#[error("received unrecognized/unsupported content: {0:?}")]
UnrecognizedContent(peanuts::element::Content),
#[error("iq receive error: {0}")]
Iq(IqError),
Cloned,
#[error("disconnected")]
Disconnected,
}
// TODO: this is horrifying, maybe just use tracing to forward error events???
impl Clone for Error {
fn clone(&self) -> Self {
Error::Cloned
}
#[derive(Debug, Error, Clone)]
pub enum MessageSendError {
#[error("could not add to message history: {0}")]
MessageHistory(#[from] DatabaseError),
}
#[derive(Debug)]
#[derive(Debug, Error, Clone)]
pub enum PresenceError {
Error(Reason),
#[error("unsupported")]
Unsupported,
#[error("missing from")]
MissingFrom,
#[error("stanza error: {0}")]
StanzaError(#[from] stanza::client::error::Error),
}
#[derive(Debug)]
#[derive(Debug, Error, Clone)]
// TODO: should probably have all iq query related errors here, including read, write, stanza error, etc.
pub enum IqError {
#[error("no iq with id matching `{0}`")]
NoMatchingId(String),
}
#[derive(Debug)]
pub enum RecvMessageError {
#[derive(Debug, Error, Clone)]
pub enum MessageRecvError {
#[error("could not add to message history: {0}")]
MessageHistory(#[from] DatabaseError),
#[error("missing from")]
MissingFrom,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Error)]
pub enum ConnectionError {
ConnectionFailed(Reason),
RosterRetreival(Reason),
SendPresence(Reason),
NoCachedStatus(Reason),
#[error("connection failed: {0}")]
ConnectionFailed(#[from] jabber::Error),
#[error("failed roster retreival: {0}")]
RosterRetreival(#[from] RosterError),
#[error("failed to send available presence: {0}")]
SendPresence(#[from] WriteError),
#[error("cached status: {0}")]
StatusCacheError(#[from] DatabaseError),
}
#[derive(Debug)]
pub struct RosterError(pub Reason);
impl From<RosterError> for Error {
fn from(e: RosterError) -> Self {
Self::Roster(e.0)
}
}
impl From<RosterError> for ConnectionError {
fn from(e: RosterError) -> Self {
Self::RosterRetreival(e.0)
}
}
pub struct StatusError(pub Reason);
impl From<StatusError> for Error {
fn from(e: StatusError) -> Self {
Error::SetStatus(e.0)
}
}
impl From<StatusError> for ConnectionError {
fn from(e: StatusError) -> Self {
Self::SendPresence(e.0)
}
}
#[derive(Debug)]
pub enum Reason {
// TODO: organisastion of error into internal error thing
Timeout,
Stream(stanza::stream_error::Error),
Stanza(Option<stanza::client::error::Error>),
Jabber(jabber::Error),
XML(peanuts::Error),
SQL(sqlx::Error),
// JID(jid::ParseError),
LostConnection,
OneshotRecv(oneshot::error::RecvError),
#[derive(Debug, Error, Clone)]
pub enum RosterError {
#[error("cache: {0}")]
Cache(#[from] DatabaseError),
#[error("stream write: {0}")]
Write(#[from] WriteError),
// TODO: display for stanza, to show as xml, same for read error types.
#[error("unexpected reply: {0:?}")]
UnexpectedStanza(Stanza),
Disconnected,
ChannelSend,
Cloned,
#[error("stream read: {0}")]
Read(#[from] ReadError),
#[error("stanza error: {0}")]
StanzaError(#[from] stanza::client::error::Error),
}
// TODO: same here
impl Clone for Reason {
fn clone(&self) -> Self {
Reason::Cloned
}
}
#[derive(Debug, Error, Clone)]
#[error("database error: {0}")]
pub struct DatabaseError(Arc<sqlx::Error>);
impl From<oneshot::error::RecvError> for Reason {
fn from(e: oneshot::error::RecvError) -> Reason {
Self::OneshotRecv(e)
}
}
impl From<peanuts::Error> for Reason {
fn from(e: peanuts::Error) -> Self {
Self::XML(e)
}
}
// impl From<jid::ParseError> for Reason {
// fn from(e: jid::ParseError) -> Self {
// Self::JID(e)
// }
// }
impl From<sqlx::Error> for Reason {
impl From<sqlx::Error> for DatabaseError {
fn from(e: sqlx::Error) -> Self {
Self::SQL(e)
Self(Arc::new(e))
}
}
impl From<jabber::Error> for Reason {
fn from(e: jabber::Error) -> Self {
Self::Jabber(e)
#[derive(Debug, Error, Clone)]
pub enum StatusError {
#[error("cache: {0}")]
Cache(#[from] DatabaseError),
#[error("stream write: {0}")]
Write(#[from] WriteError),
}
#[derive(Debug, Error, Clone)]
pub enum WriteError {
#[error("xml: {0}")]
XML(#[from] peanuts::Error),
#[error("lost connection")]
LostConnection,
// TODO: should this be in writeerror or separate?
#[error("actor: {0}")]
Actor(#[from] ActorError),
#[error("disconnected")]
Disconnected,
}
// TODO: separate peanuts read and write error?
#[derive(Debug, Error, Clone)]
pub enum ReadError {
#[error("xml: {0}")]
XML(#[from] peanuts::Error),
#[error("lost connection")]
LostConnection,
}
#[derive(Debug, Error, Clone)]
pub enum ActorError {
#[error("receive timed out")]
Timeout,
#[error("could not send message to actor, channel closed")]
Send,
#[error("could not receive message from actor, channel closed")]
Receive,
}
impl<T> From<SendError<T>> for ActorError {
fn from(_e: SendError<T>) -> Self {
Self::Send
}
}
impl From<RecvError> for ActorError {
fn from(_e: RecvError) -> Self {
Self::Receive
}
}

View File

@ -7,7 +7,7 @@ use std::{
use chat::{Body, Chat, Message};
use connection::{write::WriteMessage, SupervisorSender};
use db::Db;
use error::{ConnectionError, Reason, RosterError, StatusError};
use error::{ConnectionError, DatabaseError, ReadError, RosterError, StatusError, WriteError};
use futures::{future::Fuse, FutureExt};
use jabber::JID;
use presence::{Offline, Online, Presence};
@ -21,7 +21,7 @@ use tokio::{
sync::{mpsc, oneshot, Mutex},
task::JoinSet,
};
use tracing::{debug, info, Instrument};
use tracing::{debug, info};
use user::User;
use uuid::Uuid;
@ -44,7 +44,7 @@ pub struct Luz {
// TODO: use a dyn passwordprovider trait to avoid storing password in memory
password: Arc<String>,
connected: Arc<Mutex<Option<(WriteHandle, SupervisorHandle)>>>,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
db: Db,
sender: mpsc::Sender<UpdateMessage>,
/// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected
@ -159,8 +159,8 @@ impl Luz {
let _ = self
.sender
.send(UpdateMessage::Error(
Error::Connection(
ConnectionError::NoCachedStatus(
Error::Connecting(
ConnectionError::StatusCacheError(
e.into(),
),
),
@ -170,16 +170,20 @@ impl Luz {
}
};
let (send, recv) = oneshot::channel();
CommandMessage::SetStatus(online.clone(), send)
.handle_online(
writer.clone(),
supervisor.sender(),
self.jid.clone(),
self.db.clone(),
self.sender.clone(),
self.pending_iqs.clone(),
)
.await;
CommandMessage::SendPresence(
None,
Presence::Online(online.clone()),
send,
)
.handle_online(
writer.clone(),
supervisor.sender(),
self.jid.clone(),
self.db.clone(),
self.sender.clone(),
self.pending_iqs.clone(),
)
.await;
let set_status = recv.await;
match set_status {
Ok(s) => match s {
@ -198,13 +202,13 @@ impl Luz {
let _ = self
.sender
.send(UpdateMessage::Error(
Error::Connection(e.into()),
Error::Connecting(e.into()),
))
.await;
}
},
Err(e) => {
let _ = self.sender.send(UpdateMessage::Error(Error::Connection(ConnectionError::SendPresence(e.into())))).await;
let _ = self.sender.send(UpdateMessage::Error(Error::Connecting(ConnectionError::SendPresence(WriteError::Actor(e.into()))))).await;
}
}
}
@ -212,7 +216,7 @@ impl Luz {
let _ = self
.sender
.send(UpdateMessage::Error(
Error::Connection(e.into()),
Error::Connecting(e.into()),
))
.await;
}
@ -221,8 +225,12 @@ impl Luz {
Err(e) => {
let _ = self
.sender
.send(UpdateMessage::Error(Error::Connection(
ConnectionError::RosterRetreival(e.into()),
.send(UpdateMessage::Error(Error::Connecting(
ConnectionError::RosterRetreival(
RosterError::Write(WriteError::Actor(
e.into(),
)),
),
)))
.await;
}
@ -230,7 +238,7 @@ impl Luz {
}
Err(e) => {
let _ =
self.sender.send(UpdateMessage::Error(Error::Connection(
self.sender.send(UpdateMessage::Error(Error::Connecting(
ConnectionError::ConnectionFailed(e.into()),
)));
}
@ -286,7 +294,7 @@ impl Luz {
impl CommandMessage {
pub async fn handle_offline(
mut self,
self,
jid: Arc<Mutex<JID>>,
db: Db,
update_sender: mpsc::Sender<UpdateMessage>,
@ -301,7 +309,7 @@ impl CommandMessage {
let _ = sender.send(Ok(roster));
}
Err(e) => {
let _ = sender.send(Err(RosterError(e.into())));
let _ = sender.send(Err(RosterError::Cache(e.into())));
}
}
}
@ -331,45 +339,48 @@ impl CommandMessage {
}
// TODO: offline queue to modify roster
CommandMessage::AddContact(jid, sender) => {
sender.send(Err(Reason::Disconnected));
sender.send(Err(RosterError::Write(WriteError::Disconnected)));
}
CommandMessage::BuddyRequest(jid, sender) => {
sender.send(Err(Reason::Disconnected));
sender.send(Err(WriteError::Disconnected));
}
CommandMessage::SubscriptionRequest(jid, sender) => {
sender.send(Err(Reason::Disconnected));
sender.send(Err(WriteError::Disconnected));
}
CommandMessage::AcceptBuddyRequest(jid, sender) => {
sender.send(Err(Reason::Disconnected));
sender.send(Err(WriteError::Disconnected));
}
CommandMessage::AcceptSubscriptionRequest(jid, sender) => {
sender.send(Err(Reason::Disconnected));
sender.send(Err(WriteError::Disconnected));
}
CommandMessage::UnsubscribeFromContact(jid, sender) => {
sender.send(Err(Reason::Disconnected));
sender.send(Err(WriteError::Disconnected));
}
CommandMessage::UnsubscribeContact(jid, sender) => {
sender.send(Err(Reason::Disconnected));
sender.send(Err(WriteError::Disconnected));
}
CommandMessage::UnfriendContact(jid, sender) => {
sender.send(Err(Reason::Disconnected));
sender.send(Err(WriteError::Disconnected));
}
CommandMessage::DeleteContact(jid, sender) => {
sender.send(Err(Reason::Disconnected));
sender.send(Err(RosterError::Write(WriteError::Disconnected)));
}
CommandMessage::UpdateContact(jid, contact_update, sender) => {
sender.send(Err(Reason::Disconnected));
sender.send(Err(RosterError::Write(WriteError::Disconnected)));
}
CommandMessage::SetStatus(online, sender) => {
let result = db
.upsert_cached_status(online)
.await
.map_err(|e| StatusError(e.into()));
.map_err(|e| StatusError::Cache(e.into()));
sender.send(result);
}
// TODO: offline message queue
CommandMessage::SendMessage(jid, body, sender) => {
sender.send(Err(Reason::Disconnected));
sender.send(Err(WriteError::Disconnected));
}
CommandMessage::SendPresence(jid, presence, sender) => {
sender.send(Err(WriteError::Disconnected));
}
}
}
@ -382,7 +393,7 @@ impl CommandMessage {
client_jid: Arc<Mutex<JID>>,
db: Db,
update_sender: mpsc::Sender<UpdateMessage>,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
) {
match self {
CommandMessage::Connect => unreachable!(),
@ -424,11 +435,12 @@ impl CommandMessage {
Ok(Ok(())) => info!("roster request sent"),
Ok(Err(e)) => {
// TODO: log errors if fail to send
let _ = result_sender.send(Err(RosterError(e.into())));
let _ = result_sender.send(Err(RosterError::Write(e.into())));
return;
}
Err(e) => {
let _ = result_sender.send(Err(RosterError(e.into())));
let _ = result_sender
.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
return;
}
};
@ -448,23 +460,41 @@ impl CommandMessage {
items.into_iter().map(|item| item.into()).collect();
if let Err(e) = db.replace_cached_roster(contacts.clone()).await {
update_sender
.send(UpdateMessage::Error(Error::CacheUpdate(e.into())))
.send(UpdateMessage::Error(Error::Roster(RosterError::Cache(
e.into(),
))))
.await;
};
result_sender.send(Ok(contacts));
return;
}
ref s @ Stanza::Iq(Iq {
from: _,
ref id,
to: _,
r#type,
lang: _,
query: _,
ref errors,
}) if *id == iq_id && r#type == IqType::Error => {
if let Some(error) = errors.first() {
result_sender.send(Err(RosterError::StanzaError(error.clone())));
} else {
result_sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
}
return;
}
s => {
result_sender.send(Err(RosterError(Reason::UnexpectedStanza(s))));
result_sender.send(Err(RosterError::UnexpectedStanza(s)));
return;
}
},
Ok(Err(e)) => {
result_sender.send(Err(RosterError(e.into())));
result_sender.send(Err(RosterError::Read(e)));
return;
}
Err(e) => {
result_sender.send(Err(RosterError(e.into())));
result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
return;
}
}
@ -525,8 +555,8 @@ impl CommandMessage {
}
// TODO: write_handle send helper function
let result = write_handle.write(set_stanza).await;
if let Err(_) = result {
sender.send(result);
if let Err(e) = result {
sender.send(Err(RosterError::Write(e)));
return;
}
let iq_result = recv.await;
@ -545,24 +575,24 @@ impl CommandMessage {
sender.send(Ok(()));
return;
}
Stanza::Iq(Iq {
ref s @ Stanza::Iq(Iq {
from: _,
id,
ref id,
to: _,
r#type,
lang: _,
query: _,
errors,
}) if id == iq_id && r#type == IqType::Error => {
ref errors,
}) if *id == iq_id && r#type == IqType::Error => {
if let Some(error) = errors.first() {
sender.send(Err(Reason::Stanza(Some(error.clone()))));
sender.send(Err(RosterError::StanzaError(error.clone())));
} else {
sender.send(Err(Reason::Stanza(None)));
sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
}
return;
}
s => {
sender.send(Err(Reason::UnexpectedStanza(s)));
sender.send(Err(RosterError::UnexpectedStanza(s)));
return;
}
},
@ -572,7 +602,7 @@ impl CommandMessage {
}
},
Err(e) => {
sender.send(Err(e.into()));
sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
return;
}
}
@ -770,8 +800,8 @@ impl CommandMessage {
pending_iqs.lock().await.insert(iq_id.clone(), send);
}
let result = write_handle.write(set_stanza).await;
if let Err(_) = result {
sender.send(result);
if let Err(e) = result {
sender.send(Err(RosterError::Write(e)));
return;
}
let iq_result = recv.await;
@ -790,24 +820,24 @@ impl CommandMessage {
sender.send(Ok(()));
return;
}
Stanza::Iq(Iq {
ref s @ Stanza::Iq(Iq {
from: _,
id,
ref id,
to: _,
r#type,
lang: _,
query: _,
errors,
}) if id == iq_id && r#type == IqType::Error => {
ref errors,
}) if *id == iq_id && r#type == IqType::Error => {
if let Some(error) = errors.first() {
sender.send(Err(Reason::Stanza(Some(error.clone()))));
sender.send(Err(RosterError::StanzaError(error.clone())));
} else {
sender.send(Err(Reason::Stanza(None)));
sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
}
return;
}
s => {
sender.send(Err(Reason::UnexpectedStanza(s)));
sender.send(Err(RosterError::UnexpectedStanza(s)));
return;
}
},
@ -817,7 +847,7 @@ impl CommandMessage {
}
},
Err(e) => {
sender.send(Err(e.into()));
sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
return;
}
}
@ -858,8 +888,8 @@ impl CommandMessage {
pending_iqs.lock().await.insert(iq_id.clone(), send);
}
let result = write_handle.write(set_stanza).await;
if let Err(_) = result {
sender.send(result);
if let Err(e) = result {
sender.send(Err(RosterError::Write(e)));
return;
}
let iq_result = recv.await;
@ -878,24 +908,24 @@ impl CommandMessage {
sender.send(Ok(()));
return;
}
Stanza::Iq(Iq {
ref s @ Stanza::Iq(Iq {
from: _,
id,
ref id,
to: _,
r#type,
lang: _,
query: _,
errors,
}) if id == iq_id && r#type == IqType::Error => {
ref errors,
}) if *id == iq_id && r#type == IqType::Error => {
if let Some(error) = errors.first() {
sender.send(Err(Reason::Stanza(Some(error.clone()))));
sender.send(Err(RosterError::StanzaError(error.clone())));
} else {
sender.send(Err(Reason::Stanza(None)));
sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
}
return;
}
s => {
sender.send(Err(Reason::UnexpectedStanza(s)));
sender.send(Err(RosterError::UnexpectedStanza(s)));
return;
}
},
@ -905,7 +935,7 @@ impl CommandMessage {
}
},
Err(e) => {
sender.send(Err(e.into()));
sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
return;
}
}
@ -914,13 +944,16 @@ impl CommandMessage {
let result = db.upsert_cached_status(online.clone()).await;
if let Err(e) = result {
let _ = update_sender
.send(UpdateMessage::Error(Error::CacheUpdate(e.into())))
.send(UpdateMessage::Error(Error::SetStatus(StatusError::Cache(
e.into(),
))))
.await;
}
let result = write_handle
.write(Stanza::Presence(online.into()))
.await
.map_err(|e| StatusError(e));
.map_err(|e| StatusError::Write(e));
// .map_err(|e| StatusError::Write(e));
let _ = sender.send(result);
}
// TODO: offline message queue
@ -956,7 +989,9 @@ impl CommandMessage {
};
if let Err(e) = db.create_message(message, jid).await.map_err(|e| e.into())
{
let _ = update_sender.send(UpdateMessage::Error(Error::CacheUpdate(e)));
let _ = update_sender.send(UpdateMessage::Error(Error::MessageSend(
error::MessageSendError::MessageHistory(e),
)));
}
let _ = sender.send(Ok(()));
}
@ -965,6 +1000,15 @@ impl CommandMessage {
}
}
}
CommandMessage::SendPresence(jid, presence, sender) => {
let mut presence: stanza::client::presence::Presence = presence.into();
if let Some(jid) = jid {
presence.to = Some(jid);
};
let result = write_handle.write(Stanza::Presence(presence)).await;
// .map_err(|e| StatusError::Write(e));
let _ = sender.send(result);
}
}
}
}
@ -999,11 +1043,12 @@ impl DerefMut for LuzHandle {
}
impl LuzHandle {
// TODO: database creation separate
pub async fn new(
jid: JID,
password: String,
db: &str,
) -> Result<(Self, mpsc::Receiver<UpdateMessage>), Reason> {
) -> Result<(Self, mpsc::Receiver<UpdateMessage>), DatabaseError> {
let db = SqlitePool::connect(db).await?;
let (command_sender, command_receiver) = mpsc::channel(20);
let (update_sender, update_receiver) = mpsc::channel(20);
@ -1030,8 +1075,59 @@ impl LuzHandle {
update_receiver,
))
}
pub async fn connect(&self) {
self.send(CommandMessage::Connect).await;
}
pub async fn disconnect(&self, offline: Offline) {
self.send(CommandMessage::Disconnect(offline)).await;
}
// pub async fn get_roster(&self) -> Result<Vec<Contact>, RosterError> {
// let (send, recv) = oneshot::channel();
// self.send(CommandMessage::GetRoster(send)).await.map_err(|e| RosterError::)?;
// Ok(recv.await?)
// }
// pub async fn get_chats(&self) -> Result<Vec<Chat>, Error> {}
// pub async fn get_chat(&self, jid: JID) -> Result<Chat, Error> {}
// pub async fn get_messages(&self, jid: JID) -> Result<Vec<Message>, Error> {}
// pub async fn delete_chat(&self, jid: JID) -> Result<(), Error> {}
// pub async fn delete_message(&self, id: Uuid) -> Result<(), Error> {}
// pub async fn get_user(&self, jid: JID) -> Result<User, Error> {}
// pub async fn add_contact(&self, jid: JID) -> Result<(), Error> {}
// pub async fn buddy_request(&self, jid: JID) -> Result<(), Error> {}
// pub async fn subscription_request(&self, jid: JID) -> Result<(), Error> {}
// pub async fn accept_buddy_request(&self, jid: JID) -> Result<(), Error> {}
// pub async fn accept_subscription_request(&self, jid: JID) -> Result<(), Error> {}
// pub async fn unsubscribe_from_contact(&self, jid: JID) -> Result<(), Error> {}
// pub async fn unsubscribe_contact(&self, jid: JID) -> Result<(), Error> {}
// pub async fn unfriend_contact(&self, jid: JID) -> Result<(), Error> {}
// pub async fn delete_contact(&self, jid: JID) -> Result<(), Error> {}
// pub async fn update_contact(&self, jid: JID, update: ContactUpdate) -> Result<(), Error> {}
// pub async fn set_status(&self, online: Online) -> Result<(), Error> {}
// pub async fn send_message(&self, jid: JID, body: Body) -> Result<(), Error> {}
}
// TODO: generate methods for each with a macro
pub enum CommandMessage {
// TODO: login invisible xep-0186
/// connect to XMPP chat server. gets roster and publishes initial presence.
@ -1042,46 +1138,51 @@ pub enum CommandMessage {
GetRoster(oneshot::Sender<Result<Vec<Contact>, RosterError>>),
/// get all chats. chat will include 10 messages in their message Vec (enough for chat previews)
// TODO: paging and filtering
GetChats(oneshot::Sender<Result<Vec<Chat>, Reason>>),
GetChats(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>),
/// get a specific chat by jid
GetChat(JID, oneshot::Sender<Result<Chat, Reason>>),
GetChat(JID, oneshot::Sender<Result<Chat, DatabaseError>>),
/// get message history for chat (does appropriate mam things)
// TODO: paging and filtering
GetMessages(JID, oneshot::Sender<Result<Vec<Message>, Reason>>),
GetMessages(JID, oneshot::Sender<Result<Vec<Message>, DatabaseError>>),
/// delete a chat from your chat history, along with all the corresponding messages
DeleteChat(JID, oneshot::Sender<Result<(), Reason>>),
DeleteChat(JID, oneshot::Sender<Result<(), DatabaseError>>),
/// delete a message from your chat history
DeleteMessage(Uuid, oneshot::Sender<Result<(), Reason>>),
DeleteMessage(Uuid, oneshot::Sender<Result<(), DatabaseError>>),
/// get a user from your users database
GetUser(JID, oneshot::Sender<Result<User, Reason>>),
GetUser(JID, oneshot::Sender<Result<User, DatabaseError>>),
/// add a contact to your roster, with a status of none, no subscriptions.
// TODO: for all these, consider returning with oneshot::Sender<Result<(), Error>>
AddContact(JID, oneshot::Sender<Result<(), Reason>>),
AddContact(JID, oneshot::Sender<Result<(), RosterError>>),
/// send a friend request i.e. a subscription request with a subscription pre-approval. if not already added to roster server adds to roster.
BuddyRequest(JID, oneshot::Sender<Result<(), Reason>>),
BuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>),
/// send a subscription request, without pre-approval. if not already added to roster server adds to roster.
SubscriptionRequest(JID, oneshot::Sender<Result<(), Reason>>),
SubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>),
/// accept a friend request by accepting a pending subscription and sending a subscription request back. if not already added to roster adds to roster.
AcceptBuddyRequest(JID, oneshot::Sender<Result<(), Reason>>),
AcceptBuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>),
/// accept a pending subscription and doesn't send a subscription request back. if not already added to roster adds to roster.
AcceptSubscriptionRequest(JID, oneshot::Sender<Result<(), Reason>>),
AcceptSubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>),
/// unsubscribe to a contact, but don't remove their subscription.
UnsubscribeFromContact(JID, oneshot::Sender<Result<(), Reason>>),
UnsubscribeFromContact(JID, oneshot::Sender<Result<(), WriteError>>),
/// stop a contact from being subscribed, but stay subscribed to the contact.
UnsubscribeContact(JID, oneshot::Sender<Result<(), Reason>>),
UnsubscribeContact(JID, oneshot::Sender<Result<(), WriteError>>),
/// remove subscriptions to and from contact, but keep in roster.
UnfriendContact(JID, oneshot::Sender<Result<(), Reason>>),
UnfriendContact(JID, oneshot::Sender<Result<(), WriteError>>),
/// remove a contact from the contact list. will remove subscriptions if not already done then delete contact from roster.
DeleteContact(JID, oneshot::Sender<Result<(), Reason>>),
DeleteContact(JID, oneshot::Sender<Result<(), RosterError>>),
/// update contact. contact details will be overwritten with the contents of the contactupdate struct.
UpdateContact(JID, ContactUpdate, oneshot::Sender<Result<(), Reason>>),
UpdateContact(JID, ContactUpdate, oneshot::Sender<Result<(), RosterError>>),
/// set online status. if disconnected, will be cached so when client connects, will be sent as the initial presence.
SetStatus(Online, oneshot::Sender<Result<(), StatusError>>),
/// send presence stanza
SendPresence(
Option<JID>,
Presence,
oneshot::Sender<Result<(), WriteError>>,
),
/// send a directed presence (usually to a non-contact).
// TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting)
/// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a
/// chatroom). if disconnected, will be cached so when client connects, message will be sent.
SendMessage(JID, Body, oneshot::Sender<Result<(), Reason>>),
SendMessage(JID, Body, oneshot::Sender<Result<(), WriteError>>),
}
#[derive(Debug, Clone)]

View File

@ -110,3 +110,12 @@ impl From<Offline> for stanza::client::presence::Presence {
}
}
}
impl From<Presence> for stanza::client::presence::Presence {
fn from(value: Presence) -> Self {
match value {
Presence::Online(online) => online.into(),
Presence::Offline(offline) => offline.into(),
}
}
}