luz/luz/src/lib.rs

1436 lines
63 KiB
Rust

use std::{
collections::HashMap,
ops::{Deref, DerefMut},
sync::Arc,
time::Duration,
};
use chat::{Body, Chat, Message};
use connection::{write::WriteMessage, SupervisorSender};
use db::Db;
use error::{
ActorError, CommandError, ConnectionError, DatabaseError, ReadError, RosterError, StatusError,
WriteError,
};
use futures::{future::Fuse, FutureExt};
use jabber::JID;
use presence::{Offline, Online, Presence};
use roster::{Contact, ContactUpdate};
use sqlx::SqlitePool;
use stanza::client::{
iq::{self, Iq, IqType},
Stanza,
};
use tokio::{
sync::{mpsc, oneshot, Mutex},
task::JoinSet,
time::timeout,
};
use tracing::{debug, info};
use user::User;
use uuid::Uuid;
use crate::connection::write::WriteHandle;
use crate::connection::{SupervisorCommand, SupervisorHandle};
use crate::error::Error;
pub mod chat;
mod connection;
mod db;
mod error;
pub mod presence;
pub mod roster;
pub mod user;
/// State owned by the client actor task.
///
/// The actor receives `CommandMessage`s on `receiver`, emits `UpdateMessage`s on
/// `sender`, and tracks the live connection (if any) in `connected`.
pub struct Luz {
/// Sending half of the command channel; stored at construction.
command_sender: mpsc::Sender<CommandMessage>,
/// Incoming commands; the run loop exits once this channel is closed.
receiver: mpsc::Receiver<CommandMessage>,
/// The account JID. Passed mutably to `jabber::connect_and_login` when connecting
/// and shared with the connection supervisor.
jid: Arc<Mutex<JID>>,
// TODO: use a dyn passwordprovider trait to avoid storing password in memory
/// Account password, shared with the connection supervisor.
password: Arc<String>,
/// `Some((writer, supervisor))` while a connection is established, `None` otherwise.
connected: Arc<Mutex<Option<(WriteHandle, SupervisorHandle)>>>,
/// Reply channels for IQ requests that are awaiting a response, keyed by IQ id.
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
/// Local database wrapper (roster / status cache, chats, messages, users).
db: Db,
/// Outgoing updates and errors for the library user.
sender: mpsc::Sender<UpdateMessage>,
/// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected
connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>,
// TODO: will need to have an auto reconnect state as well (e.g. in case server shut down, to try and reconnect later)
// TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway?
/// Tasks spawned to handle individual commands.
tasks: JoinSet<()>,
}
impl Luz {
/// Assembles the actor state from its channels and shared handles.
///
/// The raw `SqlitePool` is wrapped in a `Db`, the password is moved behind an
/// `Arc`, and the pending-IQ map and task set start out empty.
fn new(
    command_sender: mpsc::Sender<CommandMessage>,
    receiver: mpsc::Receiver<CommandMessage>,
    jid: Arc<Mutex<JID>>,
    password: String,
    connected: Arc<Mutex<Option<(WriteHandle, SupervisorHandle)>>>,
    connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>,
    db: SqlitePool,
    sender: mpsc::Sender<UpdateMessage>,
) -> Self {
    let db = Db::new(db);
    let password = Arc::new(password);
    let pending_iqs = Arc::new(Mutex::new(HashMap::new()));
    let tasks = JoinSet::new();
    Self {
        command_sender,
        receiver,
        jid,
        password,
        connected,
        pending_iqs,
        db,
        sender,
        connection_supervisor_shutdown,
        tasks,
    }
}
async fn run(mut self) {
loop {
let msg = tokio::select! {
// this is okay, as when created the supervisor (and connection) doesn't exist, but a bit messy
// THIS IS NOT OKAY LOLLLL - apparently fusing is the best option???
_ = &mut self.connection_supervisor_shutdown => {
*self.connected.lock().await = None;
continue;
}
Some(msg) = self.receiver.recv() => {
msg
},
else => break,
};
// TODO: consider separating disconnect/connect and commands apart from commandmessage
// TODO: dispatch commands separate tasks
match msg {
CommandMessage::Connect => {
let mut connection_lock = self.connected.lock().await;
match connection_lock.as_ref() {
Some(_) => {
self.sender
.send(UpdateMessage::Error(Error::AlreadyConnected))
.await;
}
None => {
let streams_result;
{
let mut jid = self.jid.lock().await;
let mut domain = jid.domainpart.clone();
// TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource)
streams_result = jabber::connect_and_login(
&mut jid,
&*self.password,
&mut domain,
)
.await;
debug!("connected and logged in as {}", jid);
debug!("test");
}
match streams_result {
Ok(s) => {
debug!("ok stream result");
let (shutdown_send, shutdown_recv) = oneshot::channel::<()>();
let (writer, supervisor) = SupervisorHandle::new(
s,
self.sender.clone(),
self.db.clone(),
shutdown_send,
self.jid.clone(),
self.password.clone(),
self.pending_iqs.clone(),
);
let shutdown_recv = shutdown_recv.fuse();
self.connection_supervisor_shutdown = shutdown_recv;
// TODO: get roster and send initial presence
let (send, recv) = oneshot::channel();
debug!("getting roster");
CommandMessage::GetRoster(send)
.handle_online(
writer.clone(),
supervisor.sender(),
self.jid.clone(),
self.db.clone(),
self.sender.clone(),
self.pending_iqs.clone(),
)
.await;
debug!("sent roster req");
let roster = recv.await;
debug!("got roster");
match roster {
Ok(r) => {
match r {
Ok(roster) => {
let online = self.db.read_cached_status().await;
let online = match online {
Ok(online) => online,
Err(e) => {
let _ = self
.sender
.send(UpdateMessage::Error(
Error::Connecting(
ConnectionError::StatusCacheError(
e.into(),
),
),
))
.await;
Online::default()
}
};
let (send, recv) = oneshot::channel();
CommandMessage::SendPresence(
None,
Presence::Online(online.clone()),
send,
)
.handle_online(
writer.clone(),
supervisor.sender(),
self.jid.clone(),
self.db.clone(),
self.sender.clone(),
self.pending_iqs.clone(),
)
.await;
let set_status = recv.await;
match set_status {
Ok(s) => match s {
Ok(()) => {
*connection_lock =
Some((writer, supervisor));
let _ = self
.sender
.send(UpdateMessage::Online(
online, roster,
))
.await;
continue;
}
Err(e) => {
let _ = self
.sender
.send(UpdateMessage::Error(
Error::Connecting(e.into()),
))
.await;
}
},
Err(e) => {
let _ = self.sender.send(UpdateMessage::Error(Error::Connecting(ConnectionError::SendPresence(WriteError::Actor(e.into()))))).await;
}
}
}
Err(e) => {
let _ = self
.sender
.send(UpdateMessage::Error(
Error::Connecting(e.into()),
))
.await;
}
}
}
Err(e) => {
let _ = self
.sender
.send(UpdateMessage::Error(Error::Connecting(
ConnectionError::RosterRetreival(
RosterError::Write(WriteError::Actor(
e.into(),
)),
),
)))
.await;
}
}
}
Err(e) => {
tracing::error!("error: {}", e);
let _ =
self.sender.send(UpdateMessage::Error(Error::Connecting(
ConnectionError::ConnectionFailed(e.into()),
)));
}
}
}
};
}
CommandMessage::Disconnect(offline) => {
match self.connected.lock().await.as_mut() {
None => {
let _ = self
.sender
.send(UpdateMessage::Error(Error::AlreadyDisconnected))
.await;
}
mut c => {
// TODO: send unavailable presence
if let Some((write_handle, supervisor_handle)) = c.take() {
let offline_presence: stanza::client::presence::Presence =
offline.clone().into();
let stanza = Stanza::Presence(offline_presence);
// TODO: timeout and error check
write_handle.write(stanza).await;
let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await;
let _ = self.sender.send(UpdateMessage::Offline(offline)).await;
} else {
unreachable!()
};
}
}
}
_ => {
match self.connected.lock().await.as_ref() {
Some((w, s)) => self.tasks.spawn(msg.handle_online(
w.clone(),
s.sender(),
self.jid.clone(),
self.db.clone(),
self.sender.clone(),
self.pending_iqs.clone(),
)),
None => self.tasks.spawn(msg.handle_offline(
self.jid.clone(),
self.db.clone(),
self.sender.clone(),
)),
};
}
}
}
}
}
impl CommandMessage {
/// Executes this command while no connection is established.
///
/// Commands that only need the local database are answered from it; commands
/// that would have to write to the server are answered with a `Disconnected`
/// write error. `SetStatus` only updates the cached status.
///
/// `Connect` and `Disconnect` are handled by the actor loop itself and must never
/// be passed here.
pub async fn handle_offline(
    self,
    jid: Arc<Mutex<JID>>,
    db: Db,
    update_sender: mpsc::Sender<UpdateMessage>,
) {
    match self {
        CommandMessage::Connect | CommandMessage::Disconnect(_) => unreachable!(),

        // Answered from the local database.
        CommandMessage::GetRoster(sender) => {
            let cached = db
                .read_cached_roster()
                .await
                .map_err(|e| RosterError::Cache(e.into()));
            let _ = sender.send(cached);
        }
        CommandMessage::GetChats(sender) => {
            let _ = sender.send(db.read_chats().await.map_err(Into::into));
        }
        CommandMessage::GetChat(jid, sender) => {
            let _ = sender.send(db.read_chat(jid).await.map_err(Into::into));
        }
        CommandMessage::GetMessages(jid, sender) => {
            let _ = sender.send(db.read_message_history(jid).await.map_err(Into::into));
        }
        CommandMessage::DeleteChat(jid, sender) => {
            let _ = sender.send(db.delete_chat(jid).await.map_err(Into::into));
        }
        CommandMessage::DeleteMessage(uuid, sender) => {
            let _ = sender.send(db.delete_message(uuid).await.map_err(Into::into));
        }
        CommandMessage::GetUser(jid, sender) => {
            let _ = sender.send(db.read_user(jid).await.map_err(Into::into));
        }
        CommandMessage::SetStatus(online, sender) => {
            let cached = db
                .upsert_cached_status(online)
                .await
                .map_err(|e| StatusError::Cache(e.into()));
            let _ = sender.send(cached);
        }

        // Roster modifications need the server.
        // TODO: offline queue to modify roster
        CommandMessage::AddContact(_, sender)
        | CommandMessage::DeleteContact(_, sender)
        | CommandMessage::UpdateContact(_, _, sender) => {
            let _ = sender.send(Err(RosterError::Write(WriteError::Disconnected)));
        }

        // Stanzas cannot be written without a connection.
        // TODO: offline message queue
        CommandMessage::BuddyRequest(_, sender)
        | CommandMessage::SubscriptionRequest(_, sender)
        | CommandMessage::AcceptBuddyRequest(_, sender)
        | CommandMessage::AcceptSubscriptionRequest(_, sender)
        | CommandMessage::UnsubscribeFromContact(_, sender)
        | CommandMessage::UnsubscribeContact(_, sender)
        | CommandMessage::UnfriendContact(_, sender)
        | CommandMessage::SendMessage(_, _, sender)
        | CommandMessage::SendPresence(_, _, sender) => {
            let _ = sender.send(Err(WriteError::Disconnected));
        }
    }
}
pub async fn handle_online(
mut self,
write_handle: WriteHandle,
supervisor_control: SupervisorSender,
// TODO: jid could lose resource by the end
client_jid: Arc<Mutex<JID>>,
db: Db,
update_sender: mpsc::Sender<UpdateMessage>,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
) {
match self {
CommandMessage::Connect => unreachable!(),
CommandMessage::Disconnect(_) => unreachable!(),
CommandMessage::GetRoster(result_sender) => {
// TODO: jid resource should probably be stored within the connection
let owned_jid: JID;
debug!("before client_jid lock");
{
owned_jid = client_jid.lock().await.clone();
}
debug!("after client_jid lock");
let iq_id = Uuid::new_v4().to_string();
let (send, iq_recv) = oneshot::channel();
{
pending_iqs.lock().await.insert(iq_id.clone(), send);
}
let stanza = Stanza::Iq(Iq {
from: Some(owned_jid),
id: iq_id.to_string(),
to: None,
r#type: IqType::Get,
lang: None,
query: Some(iq::Query::Roster(stanza::roster::Query {
ver: None,
items: Vec::new(),
})),
errors: Vec::new(),
});
let (send, recv) = oneshot::channel();
let _ = write_handle
.send(WriteMessage {
stanza,
respond_to: send,
})
.await;
// TODO: timeout
match recv.await {
Ok(Ok(())) => info!("roster request sent"),
Ok(Err(e)) => {
// TODO: log errors if fail to send
let _ = result_sender.send(Err(RosterError::Write(e.into())));
return;
}
Err(e) => {
let _ = result_sender
.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
return;
}
};
// TODO: timeout
match iq_recv.await {
Ok(Ok(stanza)) => match stanza {
Stanza::Iq(Iq {
from: _,
id,
to: _,
r#type,
lang: _,
query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })),
errors: _,
}) if id == iq_id && r#type == IqType::Result => {
let contacts: Vec<Contact> =
items.into_iter().map(|item| item.into()).collect();
if let Err(e) = db.replace_cached_roster(contacts.clone()).await {
update_sender
.send(UpdateMessage::Error(Error::Roster(RosterError::Cache(
e.into(),
))))
.await;
};
result_sender.send(Ok(contacts));
return;
}
ref s @ Stanza::Iq(Iq {
from: _,
ref id,
to: _,
r#type,
lang: _,
query: _,
ref errors,
}) if *id == iq_id && r#type == IqType::Error => {
if let Some(error) = errors.first() {
result_sender.send(Err(RosterError::StanzaError(error.clone())));
} else {
result_sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
}
return;
}
s => {
result_sender.send(Err(RosterError::UnexpectedStanza(s)));
return;
}
},
Ok(Err(e)) => {
result_sender.send(Err(RosterError::Read(e)));
return;
}
Err(e) => {
result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
return;
}
}
}
CommandMessage::GetChats(sender) => {
let chats = db.read_chats().await.map_err(|e| e.into());
sender.send(chats);
}
CommandMessage::GetChat(jid, sender) => {
let chats = db.read_chat(jid).await.map_err(|e| e.into());
sender.send(chats);
}
CommandMessage::GetMessages(jid, sender) => {
let messages = db.read_message_history(jid).await.map_err(|e| e.into());
sender.send(messages);
}
CommandMessage::DeleteChat(jid, sender) => {
let result = db.delete_chat(jid).await.map_err(|e| e.into());
sender.send(result);
}
CommandMessage::DeleteMessage(uuid, sender) => {
let result = db.delete_message(uuid).await.map_err(|e| e.into());
sender.send(result);
}
CommandMessage::GetUser(jid, sender) => {
let user = db.read_user(jid).await.map_err(|e| e.into());
sender.send(user);
}
// TODO: offline queue to modify roster
CommandMessage::AddContact(jid, sender) => {
let owned_jid;
{
owned_jid = client_jid.lock().await.clone();
}
let iq_id = Uuid::new_v4().to_string();
let set_stanza = Stanza::Iq(Iq {
from: Some(owned_jid),
id: iq_id.clone(),
to: None,
r#type: IqType::Set,
lang: None,
query: Some(iq::Query::Roster(stanza::roster::Query {
ver: None,
items: vec![stanza::roster::Item {
approved: None,
ask: false,
jid,
name: None,
subscription: None,
groups: Vec::new(),
}],
})),
errors: Vec::new(),
});
let (send, recv) = oneshot::channel();
{
pending_iqs.lock().await.insert(iq_id.clone(), send);
}
// TODO: write_handle send helper function
let result = write_handle.write(set_stanza).await;
if let Err(e) = result {
sender.send(Err(RosterError::Write(e)));
return;
}
let iq_result = recv.await;
match iq_result {
Ok(i) => match i {
Ok(iq_result) => match iq_result {
Stanza::Iq(Iq {
from: _,
id,
to: _,
r#type,
lang: _,
query: _,
errors: _,
}) if id == iq_id && r#type == IqType::Result => {
sender.send(Ok(()));
return;
}
ref s @ Stanza::Iq(Iq {
from: _,
ref id,
to: _,
r#type,
lang: _,
query: _,
ref errors,
}) if *id == iq_id && r#type == IqType::Error => {
if let Some(error) = errors.first() {
sender.send(Err(RosterError::StanzaError(error.clone())));
} else {
sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
}
return;
}
s => {
sender.send(Err(RosterError::UnexpectedStanza(s)));
return;
}
},
Err(e) => {
sender.send(Err(e.into()));
return;
}
},
Err(e) => {
sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
return;
}
}
}
CommandMessage::BuddyRequest(jid, sender) => {
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
to: Some(jid.clone()),
r#type: Some(stanza::client::presence::PresenceType::Subscribe),
lang: None,
show: None,
status: None,
priority: None,
errors: Vec::new(),
});
let result = write_handle.write(presence).await;
match result {
Err(_) => {
let _ = sender.send(result);
}
Ok(()) => {
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Subscribed),
lang: None,
show: None,
status: None,
priority: None,
errors: Vec::new(),
});
let result = write_handle.write(presence).await;
let _ = sender.send(result);
}
}
}
CommandMessage::SubscriptionRequest(jid, sender) => {
// TODO: i should probably have builders
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Subscribe),
lang: None,
show: None,
status: None,
priority: None,
errors: Vec::new(),
});
let result = write_handle.write(presence).await;
let _ = sender.send(result);
}
CommandMessage::AcceptBuddyRequest(jid, sender) => {
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
to: Some(jid.clone()),
r#type: Some(stanza::client::presence::PresenceType::Subscribed),
lang: None,
show: None,
status: None,
priority: None,
errors: Vec::new(),
});
let result = write_handle.write(presence).await;
match result {
Err(_) => {
let _ = sender.send(result);
}
Ok(()) => {
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Subscribe),
lang: None,
show: None,
status: None,
priority: None,
errors: Vec::new(),
});
let result = write_handle.write(presence).await;
let _ = sender.send(result);
}
}
}
CommandMessage::AcceptSubscriptionRequest(jid, sender) => {
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Subscribe),
lang: None,
show: None,
status: None,
priority: None,
errors: Vec::new(),
});
let result = write_handle.write(presence).await;
let _ = sender.send(result);
}
CommandMessage::UnsubscribeFromContact(jid, sender) => {
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
lang: None,
show: None,
status: None,
priority: None,
errors: Vec::new(),
});
let result = write_handle.write(presence).await;
let _ = sender.send(result);
}
CommandMessage::UnsubscribeContact(jid, sender) => {
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
lang: None,
show: None,
status: None,
priority: None,
errors: Vec::new(),
});
let result = write_handle.write(presence).await;
let _ = sender.send(result);
}
CommandMessage::UnfriendContact(jid, sender) => {
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
to: Some(jid.clone()),
r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
lang: None,
show: None,
status: None,
priority: None,
errors: Vec::new(),
});
let result = write_handle.write(presence).await;
match result {
Err(_) => {
let _ = sender.send(result);
}
Ok(()) => {
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
lang: None,
show: None,
status: None,
priority: None,
errors: Vec::new(),
});
let result = write_handle.write(presence).await;
let _ = sender.send(result);
}
}
}
CommandMessage::DeleteContact(jid, sender) => {
let owned_jid;
{
owned_jid = client_jid.lock().await.clone();
}
let iq_id = Uuid::new_v4().to_string();
let set_stanza = Stanza::Iq(Iq {
from: Some(owned_jid),
id: iq_id.clone(),
to: None,
r#type: IqType::Set,
lang: None,
query: Some(iq::Query::Roster(stanza::roster::Query {
ver: None,
items: vec![stanza::roster::Item {
approved: None,
ask: false,
jid,
name: None,
subscription: Some(stanza::roster::Subscription::Remove),
groups: Vec::new(),
}],
})),
errors: Vec::new(),
});
let (send, recv) = oneshot::channel();
{
pending_iqs.lock().await.insert(iq_id.clone(), send);
}
let result = write_handle.write(set_stanza).await;
if let Err(e) = result {
sender.send(Err(RosterError::Write(e)));
return;
}
let iq_result = recv.await;
match iq_result {
Ok(i) => match i {
Ok(iq_result) => match iq_result {
Stanza::Iq(Iq {
from: _,
id,
to: _,
r#type,
lang: _,
query: _,
errors: _,
}) if id == iq_id && r#type == IqType::Result => {
sender.send(Ok(()));
return;
}
ref s @ Stanza::Iq(Iq {
from: _,
ref id,
to: _,
r#type,
lang: _,
query: _,
ref errors,
}) if *id == iq_id && r#type == IqType::Error => {
if let Some(error) = errors.first() {
sender.send(Err(RosterError::StanzaError(error.clone())));
} else {
sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
}
return;
}
s => {
sender.send(Err(RosterError::UnexpectedStanza(s)));
return;
}
},
Err(e) => {
sender.send(Err(e.into()));
return;
}
},
Err(e) => {
sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
return;
}
}
}
CommandMessage::UpdateContact(jid, contact_update, sender) => {
let owned_jid;
{
owned_jid = client_jid.lock().await.clone();
}
let iq_id = Uuid::new_v4().to_string();
let groups = Vec::from_iter(
contact_update
.groups
.into_iter()
.map(|group| stanza::roster::Group(Some(group))),
);
let set_stanza = Stanza::Iq(Iq {
from: Some(owned_jid),
id: iq_id.clone(),
to: None,
r#type: IqType::Set,
lang: None,
query: Some(iq::Query::Roster(stanza::roster::Query {
ver: None,
items: vec![stanza::roster::Item {
approved: None,
ask: false,
jid,
name: contact_update.name,
subscription: None,
groups,
}],
})),
errors: Vec::new(),
});
let (send, recv) = oneshot::channel();
{
pending_iqs.lock().await.insert(iq_id.clone(), send);
}
let result = write_handle.write(set_stanza).await;
if let Err(e) = result {
sender.send(Err(RosterError::Write(e)));
return;
}
let iq_result = recv.await;
match iq_result {
Ok(i) => match i {
Ok(iq_result) => match iq_result {
Stanza::Iq(Iq {
from: _,
id,
to: _,
r#type,
lang: _,
query: _,
errors: _,
}) if id == iq_id && r#type == IqType::Result => {
sender.send(Ok(()));
return;
}
ref s @ Stanza::Iq(Iq {
from: _,
ref id,
to: _,
r#type,
lang: _,
query: _,
ref errors,
}) if *id == iq_id && r#type == IqType::Error => {
if let Some(error) = errors.first() {
sender.send(Err(RosterError::StanzaError(error.clone())));
} else {
sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
}
return;
}
s => {
sender.send(Err(RosterError::UnexpectedStanza(s)));
return;
}
},
Err(e) => {
sender.send(Err(e.into()));
return;
}
},
Err(e) => {
sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
return;
}
}
}
CommandMessage::SetStatus(online, sender) => {
let result = db.upsert_cached_status(online.clone()).await;
if let Err(e) = result {
let _ = update_sender
.send(UpdateMessage::Error(Error::SetStatus(StatusError::Cache(
e.into(),
))))
.await;
}
let result = write_handle
.write(Stanza::Presence(online.into()))
.await
.map_err(|e| StatusError::Write(e));
// .map_err(|e| StatusError::Write(e));
let _ = sender.send(result);
}
// TODO: offline message queue
CommandMessage::SendMessage(jid, body, sender) => {
let id = Uuid::new_v4();
let owned_jid: JID;
{
// TODO: timeout
owned_jid = client_jid.lock().await.clone();
}
let message = Stanza::Message(stanza::client::message::Message {
from: Some(owned_jid.clone()),
id: Some(id.to_string()),
to: Some(jid.clone()),
// TODO: specify message type
r#type: stanza::client::message::MessageType::Chat,
// TODO: lang ?
lang: None,
subject: None,
body: Some(stanza::client::message::Body {
lang: None,
body: Some(body.body.clone()),
}),
thread: None,
});
let _ = sender.send(Ok(()));
// let _ = sender.send(Ok(message.clone()));
let result = write_handle.write(message).await;
match result {
Ok(_) => {
let mut message = Message {
id,
from: owned_jid,
body,
};
info!("send message {:?}", message);
if let Err(e) = db
.create_message_with_self_resource_and_chat(
message.clone(),
jid.clone(),
)
.await
.map_err(|e| e.into())
{
tracing::error!("{}", e);
let _ = update_sender.send(UpdateMessage::Error(Error::MessageSend(
error::MessageSendError::MessageHistory(e),
)));
}
// TODO: don't do this, have separate from from details
message.from = message.from.as_bare();
let _ = update_sender
.send(UpdateMessage::Message { to: jid, message })
.await;
}
Err(_) => {
// let _ = sender.send(result);
}
}
}
CommandMessage::SendPresence(jid, presence, sender) => {
let mut presence: stanza::client::presence::Presence = presence.into();
if let Some(jid) = jid {
presence.to = Some(jid);
};
let result = write_handle.write(Stanza::Presence(presence)).await;
// .map_err(|e| StatusError::Write(e));
let _ = sender.send(result);
}
}
}
}
// TODO: separate sender and receiver, store handle to Luz process to ensure dropping
/// Cloneable handle used to send commands to the `Luz` actor.
///
/// Cloning copies the timeout and clones the command sender, so every clone
/// talks to the same command channel.
#[derive(Debug, Clone)]
pub struct LuzHandle {
    /// Command channel into the actor.
    sender: mpsc::Sender<CommandMessage>,
    /// Maximum time the request methods wait for the actor's reply.
    timeout: Duration,
}
/// Exposes the underlying command sender, so that its methods (e.g. `send`) can
/// be called directly on a `LuzHandle`.
impl Deref for LuzHandle {
type Target = mpsc::Sender<CommandMessage>;
fn deref(&self) -> &Self::Target {
&self.sender
}
}
/// Mutable counterpart of the `Deref` impl: hands out the underlying command sender.
impl DerefMut for LuzHandle {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.sender
}
}
impl LuzHandle {
// TODO: database creation separate
/// Opens the SQLite database at `db`, spawns the `Luz` actor on the tokio
/// runtime and returns a handle to it together with the receiver for its
/// `UpdateMessage`s.
///
/// Both channels are created with a capacity of 20, and the handle's reply
/// timeout is fixed at 10 seconds.
///
/// # Errors
///
/// Returns a `DatabaseError` if connecting to the database fails.
pub async fn new(
    jid: JID,
    password: String,
    db: &str,
) -> Result<(Self, mpsc::Receiver<UpdateMessage>), DatabaseError> {
    let pool = SqlitePool::connect(db).await?;
    let (command_tx, command_rx) = mpsc::channel(20);
    let (update_tx, update_rx) = mpsc::channel(20);
    // might be bad, first supervisor shutdown notification oneshot is never used (disgusting)
    let (_initial_shutdown_tx, initial_shutdown_rx) = oneshot::channel();
    let actor = Luz::new(
        command_tx.clone(),
        command_rx,
        Arc::new(Mutex::new(jid)),
        password,
        Arc::new(Mutex::new(None)),
        initial_shutdown_rx.fuse(),
        pool,
        update_tx,
    );
    tokio::spawn(actor.run());
    let handle = Self {
        sender: command_tx,
        // TODO: configure timeout
        timeout: Duration::from_secs(10),
    };
    Ok((handle, update_rx))
}
pub async fn connect(&self) -> Result<(), ActorError> {
self.send(CommandMessage::Connect).await?;
Ok(())
}
pub async fn disconnect(&self, offline: Offline) -> Result<(), ActorError> {
self.send(CommandMessage::Disconnect(offline)).await?;
Ok(())
}
/// Asks the actor for the roster and waits up to `self.timeout` for the reply.
///
/// Queueing failures, a timeout, or a dropped reply channel surface as
/// `CommandError::Actor`.
pub async fn get_roster(&self) -> Result<Vec<Contact>, CommandError<RosterError>> {
    let (reply_tx, reply_rx) = oneshot::channel();
    if let Err(e) = self.send(CommandMessage::GetRoster(reply_tx)).await {
        return Err(CommandError::Actor(e.into()));
    }
    match timeout(self.timeout, reply_rx).await {
        Err(elapsed) => Err(CommandError::Actor(elapsed.into())),
        Ok(Err(dropped)) => Err(CommandError::Actor(dropped.into())),
        Ok(Ok(outcome)) => Ok(outcome?),
    }
}
/// Asks the actor for all chats and waits up to `self.timeout` for the reply.
///
/// Queueing failures, a timeout, or a dropped reply channel surface as
/// `CommandError::Actor`.
pub async fn get_chats(&self) -> Result<Vec<Chat>, CommandError<DatabaseError>> {
    let (reply_tx, reply_rx) = oneshot::channel();
    if let Err(e) = self.send(CommandMessage::GetChats(reply_tx)).await {
        return Err(CommandError::Actor(e.into()));
    }
    match timeout(self.timeout, reply_rx).await {
        Err(elapsed) => Err(CommandError::Actor(elapsed.into())),
        Ok(Err(dropped)) => Err(CommandError::Actor(dropped.into())),
        Ok(Ok(outcome)) => Ok(outcome?),
    }
}
/// Asks the actor for the chat with `jid` and waits up to `self.timeout` for the reply.
///
/// Queueing failures, a timeout, or a dropped reply channel surface as
/// `CommandError::Actor`.
pub async fn get_chat(&self, jid: JID) -> Result<Chat, CommandError<DatabaseError>> {
    let (reply_tx, reply_rx) = oneshot::channel();
    if let Err(e) = self.send(CommandMessage::GetChat(jid, reply_tx)).await {
        return Err(CommandError::Actor(e.into()));
    }
    match timeout(self.timeout, reply_rx).await {
        Err(elapsed) => Err(CommandError::Actor(elapsed.into())),
        Ok(Err(dropped)) => Err(CommandError::Actor(dropped.into())),
        Ok(Ok(outcome)) => Ok(outcome?),
    }
}
/// Asks the actor for the message history with `jid` and waits up to
/// `self.timeout` for the reply.
///
/// Queueing failures, a timeout, or a dropped reply channel surface as
/// `CommandError::Actor`.
pub async fn get_messages(
    &self,
    jid: JID,
) -> Result<Vec<Message>, CommandError<DatabaseError>> {
    let (reply_tx, reply_rx) = oneshot::channel();
    if let Err(e) = self.send(CommandMessage::GetMessages(jid, reply_tx)).await {
        return Err(CommandError::Actor(e.into()));
    }
    match timeout(self.timeout, reply_rx).await {
        Err(elapsed) => Err(CommandError::Actor(elapsed.into())),
        Ok(Err(dropped)) => Err(CommandError::Actor(dropped.into())),
        Ok(Ok(outcome)) => Ok(outcome?),
    }
}
/// Asks the actor to delete the chat with `jid` and waits up to `self.timeout`
/// for the reply.
///
/// Queueing failures, a timeout, or a dropped reply channel surface as
/// `CommandError::Actor`.
pub async fn delete_chat(&self, jid: JID) -> Result<(), CommandError<DatabaseError>> {
    let (reply_tx, reply_rx) = oneshot::channel();
    if let Err(e) = self.send(CommandMessage::DeleteChat(jid, reply_tx)).await {
        return Err(CommandError::Actor(e.into()));
    }
    match timeout(self.timeout, reply_rx).await {
        Err(elapsed) => Err(CommandError::Actor(elapsed.into())),
        Ok(Err(dropped)) => Err(CommandError::Actor(dropped.into())),
        Ok(Ok(outcome)) => Ok(outcome?),
    }
}
/// Asks the actor to delete the message with the given `id` and waits up to
/// `self.timeout` for the reply.
///
/// Queueing failures, a timeout, or a dropped reply channel surface as
/// `CommandError::Actor`.
pub async fn delete_message(&self, id: Uuid) -> Result<(), CommandError<DatabaseError>> {
    let (reply_tx, reply_rx) = oneshot::channel();
    if let Err(e) = self.send(CommandMessage::DeleteMessage(id, reply_tx)).await {
        return Err(CommandError::Actor(e.into()));
    }
    match timeout(self.timeout, reply_rx).await {
        Err(elapsed) => Err(CommandError::Actor(elapsed.into())),
        Ok(Err(dropped)) => Err(CommandError::Actor(dropped.into())),
        Ok(Ok(outcome)) => Ok(outcome?),
    }
}
/// Asks the actor for the stored user `jid` and waits up to `self.timeout` for
/// the reply.
///
/// Queueing failures, a timeout, or a dropped reply channel surface as
/// `CommandError::Actor`.
pub async fn get_user(&self, jid: JID) -> Result<User, CommandError<DatabaseError>> {
    let (reply_tx, reply_rx) = oneshot::channel();
    if let Err(e) = self.send(CommandMessage::GetUser(jid, reply_tx)).await {
        return Err(CommandError::Actor(e.into()));
    }
    match timeout(self.timeout, reply_rx).await {
        Err(elapsed) => Err(CommandError::Actor(elapsed.into())),
        Ok(Err(dropped)) => Err(CommandError::Actor(dropped.into())),
        Ok(Ok(outcome)) => Ok(outcome?),
    }
}
/// Asks the actor to add `jid` to the roster and waits up to `self.timeout` for
/// the reply.
///
/// Queueing failures, a timeout, or a dropped reply channel surface as
/// `CommandError::Actor`.
pub async fn add_contact(&self, jid: JID) -> Result<(), CommandError<RosterError>> {
    let (reply_tx, reply_rx) = oneshot::channel();
    if let Err(e) = self.send(CommandMessage::AddContact(jid, reply_tx)).await {
        return Err(CommandError::Actor(e.into()));
    }
    match timeout(self.timeout, reply_rx).await {
        Err(elapsed) => Err(CommandError::Actor(elapsed.into())),
        Ok(Err(dropped)) => Err(CommandError::Actor(dropped.into())),
        Ok(Ok(outcome)) => Ok(outcome?),
    }
}
/// Sends a `BuddyRequest` command for `jid` and waits up to `self.timeout` for
/// the reply.
///
/// Queueing failures, a timeout, or a dropped reply channel surface as
/// `CommandError::Actor`.
pub async fn buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
    let (reply_tx, reply_rx) = oneshot::channel();
    if let Err(e) = self.send(CommandMessage::BuddyRequest(jid, reply_tx)).await {
        return Err(CommandError::Actor(e.into()));
    }
    match timeout(self.timeout, reply_rx).await {
        Err(elapsed) => Err(CommandError::Actor(elapsed.into())),
        Ok(Err(dropped)) => Err(CommandError::Actor(dropped.into())),
        Ok(Ok(outcome)) => Ok(outcome?),
    }
}
/// Sends a `SubscriptionRequest` command for `jid` and waits up to
/// `self.timeout` for the reply.
///
/// Queueing failures, a timeout, or a dropped reply channel surface as
/// `CommandError::Actor`.
pub async fn subscription_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
    let (reply_tx, reply_rx) = oneshot::channel();
    let command = CommandMessage::SubscriptionRequest(jid, reply_tx);
    if let Err(e) = self.send(command).await {
        return Err(CommandError::Actor(e.into()));
    }
    match timeout(self.timeout, reply_rx).await {
        Err(elapsed) => Err(CommandError::Actor(elapsed.into())),
        Ok(Err(dropped)) => Err(CommandError::Actor(dropped.into())),
        Ok(Ok(outcome)) => Ok(outcome?),
    }
}
/// Sends an `AcceptBuddyRequest` command for `jid` and waits up to
/// `self.timeout` for the reply.
///
/// Queueing failures, a timeout, or a dropped reply channel surface as
/// `CommandError::Actor`.
pub async fn accept_buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
    let (reply_tx, reply_rx) = oneshot::channel();
    let command = CommandMessage::AcceptBuddyRequest(jid, reply_tx);
    if let Err(e) = self.send(command).await {
        return Err(CommandError::Actor(e.into()));
    }
    match timeout(self.timeout, reply_rx).await {
        Err(elapsed) => Err(CommandError::Actor(elapsed.into())),
        Ok(Err(dropped)) => Err(CommandError::Actor(dropped.into())),
        Ok(Ok(outcome)) => Ok(outcome?),
    }
}
/// Sends an `AcceptSubscriptionRequest` command for `jid` and waits up to
/// `self.timeout` for the reply.
///
/// Queueing failures, a timeout, or a dropped reply channel surface as
/// `CommandError::Actor`.
pub async fn accept_subscription_request(
    &self,
    jid: JID,
) -> Result<(), CommandError<WriteError>> {
    let (reply_tx, reply_rx) = oneshot::channel();
    let command = CommandMessage::AcceptSubscriptionRequest(jid, reply_tx);
    if let Err(e) = self.send(command).await {
        return Err(CommandError::Actor(e.into()));
    }
    match timeout(self.timeout, reply_rx).await {
        Err(elapsed) => Err(CommandError::Actor(elapsed.into())),
        Ok(Err(dropped)) => Err(CommandError::Actor(dropped.into())),
        Ok(Ok(outcome)) => Ok(outcome?),
    }
}
/// Sends an `UnsubscribeFromContact` command for `jid` and waits up to
/// `self.timeout` for the reply.
///
/// Queueing failures, a timeout, or a dropped reply channel surface as
/// `CommandError::Actor`.
pub async fn unsubscribe_from_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
    let (reply_tx, reply_rx) = oneshot::channel();
    let command = CommandMessage::UnsubscribeFromContact(jid, reply_tx);
    if let Err(e) = self.send(command).await {
        return Err(CommandError::Actor(e.into()));
    }
    match timeout(self.timeout, reply_rx).await {
        Err(elapsed) => Err(CommandError::Actor(elapsed.into())),
        Ok(Err(dropped)) => Err(CommandError::Actor(dropped.into())),
        Ok(Ok(outcome)) => Ok(outcome?),
    }
}
/// Sends an `UnsubscribeContact` command for `jid` and waits up to
/// `self.timeout` for the reply.
///
/// Queueing failures, a timeout, or a dropped reply channel surface as
/// `CommandError::Actor`.
pub async fn unsubscribe_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
    let (reply_tx, reply_rx) = oneshot::channel();
    let command = CommandMessage::UnsubscribeContact(jid, reply_tx);
    if let Err(e) = self.send(command).await {
        return Err(CommandError::Actor(e.into()));
    }
    match timeout(self.timeout, reply_rx).await {
        Err(elapsed) => Err(CommandError::Actor(elapsed.into())),
        Ok(Err(dropped)) => Err(CommandError::Actor(dropped.into())),
        Ok(Ok(outcome)) => Ok(outcome?),
    }
}
/// Sends an `UnfriendContact` command for `jid` and waits up to `self.timeout`
/// for the reply.
///
/// Queueing failures, a timeout, or a dropped reply channel surface as
/// `CommandError::Actor`.
pub async fn unfriend_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
    let (reply_tx, reply_rx) = oneshot::channel();
    let command = CommandMessage::UnfriendContact(jid, reply_tx);
    if let Err(e) = self.send(command).await {
        return Err(CommandError::Actor(e.into()));
    }
    match timeout(self.timeout, reply_rx).await {
        Err(elapsed) => Err(CommandError::Actor(elapsed.into())),
        Ok(Err(dropped)) => Err(CommandError::Actor(dropped.into())),
        Ok(Ok(outcome)) => Ok(outcome?),
    }
}
/// Asks the actor to remove `jid` from the roster and waits up to
/// `self.timeout` for the reply.
///
/// Queueing failures, a timeout, or a dropped reply channel surface as
/// `CommandError::Actor`.
pub async fn delete_contact(&self, jid: JID) -> Result<(), CommandError<RosterError>> {
    let (reply_tx, reply_rx) = oneshot::channel();
    let command = CommandMessage::DeleteContact(jid, reply_tx);
    if let Err(e) = self.send(command).await {
        return Err(CommandError::Actor(e.into()));
    }
    match timeout(self.timeout, reply_rx).await {
        Err(elapsed) => Err(CommandError::Actor(elapsed.into())),
        Ok(Err(dropped)) => Err(CommandError::Actor(dropped.into())),
        Ok(Ok(outcome)) => Ok(outcome?),
    }
}
/// update the contact `jid` with the contents of `update`
/// (see [`CommandMessage::UpdateContact`]).
///
/// sends the command and waits up to `self.timeout` for the reply.
///
/// # Errors
///
/// - [`CommandError::Actor`] if the command could not be sent, no reply arrived within the
///   timeout, or the reply channel was closed before a reply was sent.
/// - otherwise the [`RosterError`] carried by the reply, converted into a [`CommandError`].
pub async fn update_contact(
    &self,
    jid: JID,
    update: ContactUpdate,
) -> Result<(), CommandError<RosterError>> {
    let (reply_tx, reply_rx) = oneshot::channel();
    let command = CommandMessage::UpdateContact(jid, update, reply_tx);
    self.send(command)
        .await
        .map_err(|send_err| CommandError::Actor(send_err.into()))?;
    // outer error: timeout elapsed; inner error: reply sender was dropped.
    let reply = timeout(self.timeout, reply_rx)
        .await
        .map_err(|elapsed| CommandError::Actor(elapsed.into()))?
        .map_err(|closed| CommandError::Actor(closed.into()))?;
    Ok(reply?)
}
/// set the online status to `online` (see [`CommandMessage::SetStatus`]).
///
/// sends the command and waits up to `self.timeout` for the reply.
///
/// # Errors
///
/// - [`CommandError::Actor`] if the command could not be sent, no reply arrived within the
///   timeout, or the reply channel was closed before a reply was sent.
/// - otherwise the [`StatusError`] carried by the reply, converted into a [`CommandError`].
pub async fn set_status(&self, online: Online) -> Result<(), CommandError<StatusError>> {
    let (reply_tx, reply_rx) = oneshot::channel();
    let command = CommandMessage::SetStatus(online, reply_tx);
    self.send(command)
        .await
        .map_err(|send_err| CommandError::Actor(send_err.into()))?;
    // outer error: timeout elapsed; inner error: reply sender was dropped.
    let reply = timeout(self.timeout, reply_rx)
        .await
        .map_err(|elapsed| CommandError::Actor(elapsed.into()))?
        .map_err(|closed| CommandError::Actor(closed.into()))?;
    Ok(reply?)
}
/// send a message with the given `body` to `jid` (see [`CommandMessage::SendMessage`]).
///
/// sends the command and waits up to `self.timeout` for the reply.
///
/// # Errors
///
/// - [`CommandError::Actor`] if the command could not be sent, no reply arrived within the
///   timeout, or the reply channel was closed before a reply was sent.
/// - otherwise the [`WriteError`] carried by the reply, converted into a [`CommandError`].
pub async fn send_message(&self, jid: JID, body: Body) -> Result<(), CommandError<WriteError>> {
    let (reply_tx, reply_rx) = oneshot::channel();
    let command = CommandMessage::SendMessage(jid, body, reply_tx);
    self.send(command)
        .await
        .map_err(|send_err| CommandError::Actor(send_err.into()))?;
    // outer error: timeout elapsed; inner error: reply sender was dropped.
    let reply = timeout(self.timeout, reply_rx)
        .await
        .map_err(|elapsed| CommandError::Actor(elapsed.into()))?
        .map_err(|closed| CommandError::Actor(closed.into()))?;
    Ok(reply?)
}
}
// TODO: generate methods for each with a macro
/// commands sent to the client over its command channel.
///
/// variants that carry a `oneshot::Sender` use it to report the result of the command back to the caller.
pub enum CommandMessage {
// TODO: login invisible xep-0186
/// connect to XMPP chat server. gets roster and publishes initial presence.
Connect,
/// disconnect from XMPP chat server, sending unavailable presence then closing stream.
Disconnect(Offline),
/// get the roster. if offline, retrieve cached version from database. should be stored in application memory
GetRoster(oneshot::Sender<Result<Vec<Contact>, RosterError>>),
/// get all chats. chat will include 10 messages in their message Vec (enough for chat previews)
// TODO: paging and filtering
GetChats(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>),
/// get a specific chat by jid
GetChat(JID, oneshot::Sender<Result<Chat, DatabaseError>>),
/// get message history for chat (does appropriate mam things)
// TODO: paging and filtering
GetMessages(JID, oneshot::Sender<Result<Vec<Message>, DatabaseError>>),
/// delete a chat from your chat history, along with all the corresponding messages
DeleteChat(JID, oneshot::Sender<Result<(), DatabaseError>>),
/// delete a message from your chat history
DeleteMessage(Uuid, oneshot::Sender<Result<(), DatabaseError>>),
/// get a user from your users database
GetUser(JID, oneshot::Sender<Result<User, DatabaseError>>),
/// add a contact to your roster, with a status of none, no subscriptions.
AddContact(JID, oneshot::Sender<Result<(), RosterError>>),
/// send a friend request i.e. a subscription request with a subscription pre-approval. if not already added to roster server adds to roster.
BuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>),
/// send a subscription request, without pre-approval. if not already added to roster server adds to roster.
SubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>),
/// accept a friend request by accepting a pending subscription and sending a subscription request back. if not already added to roster adds to roster.
AcceptBuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>),
/// accept a pending subscription and doesn't send a subscription request back. if not already added to roster adds to roster.
AcceptSubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>),
/// unsubscribe to a contact, but don't remove their subscription.
UnsubscribeFromContact(JID, oneshot::Sender<Result<(), WriteError>>),
/// stop a contact from being subscribed, but stay subscribed to the contact.
UnsubscribeContact(JID, oneshot::Sender<Result<(), WriteError>>),
/// remove subscriptions to and from contact, but keep in roster.
UnfriendContact(JID, oneshot::Sender<Result<(), WriteError>>),
/// remove a contact from the contact list. will remove subscriptions if not already done then delete contact from roster.
DeleteContact(JID, oneshot::Sender<Result<(), RosterError>>),
/// update contact. contact details will be overwritten with the contents of the contactupdate struct.
UpdateContact(JID, ContactUpdate, oneshot::Sender<Result<(), RosterError>>),
/// set online status. if disconnected, will be cached so when client connects, will be sent as the initial presence.
SetStatus(Online, oneshot::Sender<Result<(), StatusError>>),
/// send presence stanza. presumably a directed presence (usually to a non-contact) when a jid is given.
// NOTE(review): the directed-presence sentence and the TODO below used to sit after this variant, where rustdoc rendered the sentence as part of `SendMessage`'s docs; confirm they belong here.
// TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting)
SendPresence(
Option<JID>,
Presence,
oneshot::Sender<Result<(), WriteError>>,
),
/// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a
/// chatroom). if disconnected, will be cached so when client connects, message will be sent.
SendMessage(JID, Body, oneshot::Sender<Result<(), WriteError>>),
}
/// updates published by the client over its update channel, for the application to apply to its own state.
#[derive(Debug, Clone)]
pub enum UpdateMessage {
/// an error reported by the client.
Error(Error),
/// presumably: the client is now online with the given status and roster — confirm against the sender.
Online(Online, Vec<Contact>),
/// presumably: the client is now offline with the given status — confirm against the sender.
Offline(Offline),
/// received roster from jabber server (replace full app roster state with this)
/// is this needed?
FullRoster(Vec<Contact>),
/// (only update app roster state, don't replace)
RosterUpdate(Contact),
/// presumably: the contact with this jid was removed from the roster — confirm against the sender.
RosterDelete(JID),
/// presences should be stored with users in the ui, not contacts, as presences can be received from anyone
Presence {
from: JID,
presence: Presence,
},
// TODO: receipts
// MessageDispatched(Uuid),
/// a chat message together with the jid in its `to` field.
// NOTE(review): whether `to` is the recipient or the chat the message belongs to is not visible here — confirm.
Message {
to: JID,
message: Message,
},
/// presumably: a subscription request was received from this jid — confirm against the sender.
// NOTE(review): this is the only place that spells the type `jid::JID`; the rest of the file uses the imported `jabber::JID`. confirm they are the same type and consider using `JID` for consistency.
SubscriptionRequest(jid::JID),
}