WIP: roster retrieval

This commit is contained in:
cel 🌸 2025-02-18 06:14:43 +00:00
parent 68a7d13670
commit 5dd488550f
11 changed files with 466 additions and 75 deletions

View File

@ -6,6 +6,7 @@ create table users(
jid text primary key not null, jid text primary key not null,
-- can receive presence status from non-contacts -- can receive presence status from non-contacts
cached_status_message text cached_status_message text
-- TODO: last_seen
); );
-- enum for subscription state -- enum for subscription state
@ -55,6 +56,8 @@ create table messages (
-- check ((chat_id == null) <> (channel_id == null)), -- check ((chat_id == null) <> (channel_id == null)),
-- check ((chat_id == null) or (channel_id == null)), -- check ((chat_id == null) or (channel_id == null)),
-- user is the current "owner" of the message -- user is the current "owner" of the message
-- TODO: queued messages offline
-- TODO: timestamp
-- TODO: icky -- TODO: icky
-- the user to show it coming from (not necessarily the original sender) -- the user to show it coming from (not necessarily the original sender)
@ -68,3 +71,19 @@ create table messages (
foreign key(from_jid) references users(jid), foreign key(from_jid) references users(jid),
foreign key(originally_from) references users(jid) foreign key(originally_from) references users(jid)
); );
-- enum for subscription state
create table show (
state text primary key not null
);
insert into show ( state ) values ('away'), ('chat'), ('do-not-disturb'), ('extended-away');
create table cached_status (
id integer primary key not null,
show text,
message text,
foreign key(show) references show(state)
);
insert into cached_status (id) values (0);

View File

@ -17,7 +17,11 @@ use tokio::{
}; };
use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage}; use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage};
use crate::{db::Db, error::Error, UpdateMessage}; use crate::{
db::Db,
error::{Error, Reason},
UpdateMessage,
};
mod read; mod read;
pub(crate) mod write; pub(crate) mod write;
@ -31,7 +35,7 @@ pub struct Supervisor {
tokio::task::JoinSet<()>, tokio::task::JoinSet<()>,
mpsc::Sender<SupervisorCommand>, mpsc::Sender<SupervisorCommand>,
WriteHandle, WriteHandle,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
)>, )>,
sender: mpsc::Sender<UpdateMessage>, sender: mpsc::Sender<UpdateMessage>,
writer_handle: WriteControlHandle, writer_handle: WriteControlHandle,
@ -57,7 +61,7 @@ pub enum State {
tokio::task::JoinSet<()>, tokio::task::JoinSet<()>,
mpsc::Sender<SupervisorCommand>, mpsc::Sender<SupervisorCommand>,
WriteHandle, WriteHandle,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
), ),
), ),
} }
@ -72,7 +76,7 @@ impl Supervisor {
JoinSet<()>, JoinSet<()>,
mpsc::Sender<SupervisorCommand>, mpsc::Sender<SupervisorCommand>,
WriteHandle, WriteHandle,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
)>, )>,
sender: mpsc::Sender<UpdateMessage>, sender: mpsc::Sender<UpdateMessage>,
writer_handle: WriteControlHandle, writer_handle: WriteControlHandle,
@ -172,9 +176,10 @@ impl Supervisor {
// if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves. // if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves.
write_state.close(); write_state.close();
while let Some(msg) = write_state.recv().await { while let Some(msg) = write_state.recv().await {
let _ = msg.respond_to.send(Err(Error::LostConnection)); let _ = msg.respond_to.send(Err(Reason::LostConnection));
} }
let _ = self.sender.send(UpdateMessage::Error(e.into())).await; // TODO: is this the correct error?
let _ = self.sender.send(UpdateMessage::Error(Error::LostConnection)).await;
break; break;
}, },
} }
@ -218,11 +223,12 @@ impl Supervisor {
Err(e) => { Err(e) => {
// if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves. // if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves.
write_recv.close(); write_recv.close();
let _ = write_msg.respond_to.send(Err(Error::LostConnection)); let _ = write_msg.respond_to.send(Err(Reason::LostConnection));
while let Some(msg) = write_recv.recv().await { while let Some(msg) = write_recv.recv().await {
let _ = msg.respond_to.send(Err(Error::LostConnection)); let _ = msg.respond_to.send(Err(Reason::LostConnection));
} }
let _ = self.sender.send(UpdateMessage::Error(e.into())).await; // TODO: is this the correct error to send?
let _ = self.sender.send(UpdateMessage::Error(Error::LostConnection)).await;
break; break;
}, },
} }
@ -268,12 +274,13 @@ impl Supervisor {
// if reconnection failure, respond to all current messages with lost connection error. // if reconnection failure, respond to all current messages with lost connection error.
write_receiver.close(); write_receiver.close();
if let Some(msg) = retry_msg { if let Some(msg) = retry_msg {
msg.respond_to.send(Err(Error::LostConnection)); msg.respond_to.send(Err(Reason::LostConnection));
} }
while let Some(msg) = write_receiver.recv().await { while let Some(msg) = write_receiver.recv().await {
msg.respond_to.send(Err(Error::LostConnection)); msg.respond_to.send(Err(Reason::LostConnection));
} }
let _ = self.sender.send(UpdateMessage::Error(e.into())).await; // TODO: is this the correct error?
let _ = self.sender.send(UpdateMessage::Error(Error::LostConnection)).await;
break; break;
}, },
} }
@ -331,7 +338,7 @@ impl SupervisorHandle {
on_shutdown: oneshot::Sender<()>, on_shutdown: oneshot::Sender<()>,
jid: Arc<Mutex<JID>>, jid: Arc<Mutex<JID>>,
password: Arc<String>, password: Arc<String>,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
) -> (WriteHandle, Self) { ) -> (WriteHandle, Self) {
let (command_sender, command_receiver) = mpsc::channel(20); let (command_sender, command_receiver) = mpsc::channel(20);
let (writer_error_sender, writer_error_receiver) = oneshot::channel(); let (writer_error_sender, writer_error_receiver) = oneshot::channel();

View File

@ -13,7 +13,11 @@ use tokio::{
}; };
use tracing::info; use tracing::info;
use crate::{db::Db, error::Error, UpdateMessage}; use crate::{
db::Db,
error::{Error, Reason},
UpdateMessage,
};
use super::{ use super::{
write::{WriteHandle, WriteMessage}, write::{WriteHandle, WriteMessage},
@ -29,7 +33,7 @@ pub struct Read {
JoinSet<()>, JoinSet<()>,
mpsc::Sender<SupervisorCommand>, mpsc::Sender<SupervisorCommand>,
WriteHandle, WriteHandle,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
)>, )>,
db: Db, db: Db,
update_sender: mpsc::Sender<UpdateMessage>, update_sender: mpsc::Sender<UpdateMessage>,
@ -39,7 +43,7 @@ pub struct Read {
disconnecting: bool, disconnecting: bool,
disconnect_timedout: oneshot::Receiver<()>, disconnect_timedout: oneshot::Receiver<()>,
// TODO: use proper stanza ids // TODO: use proper stanza ids
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
} }
impl Read { impl Read {
@ -52,7 +56,7 @@ impl Read {
JoinSet<()>, JoinSet<()>,
mpsc::Sender<SupervisorCommand>, mpsc::Sender<SupervisorCommand>,
WriteHandle, WriteHandle,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
)>, )>,
db: Db, db: Db,
update_sender: mpsc::Sender<UpdateMessage>, update_sender: mpsc::Sender<UpdateMessage>,
@ -60,7 +64,7 @@ impl Read {
supervisor_control: mpsc::Sender<SupervisorCommand>, supervisor_control: mpsc::Sender<SupervisorCommand>,
write_handle: WriteHandle, write_handle: WriteHandle,
tasks: JoinSet<()>, tasks: JoinSet<()>,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
) -> Self { ) -> Self {
let (send, recv) = oneshot::channel(); let (send, recv) = oneshot::channel();
Self { Self {
@ -153,7 +157,7 @@ impl Read {
// when it aborts, must clear iq map no matter what // when it aborts, must clear iq map no matter what
let mut iqs = self.pending_iqs.lock().await; let mut iqs = self.pending_iqs.lock().await;
for (_id, sender) in iqs.drain() { for (_id, sender) in iqs.drain() {
let _ = sender.send(Err(Error::LostConnection)); let _ = sender.send(Err(Reason::LostConnection));
} }
} }
} }
@ -182,7 +186,7 @@ pub enum ReadControl {
JoinSet<()>, JoinSet<()>,
mpsc::Sender<SupervisorCommand>, mpsc::Sender<SupervisorCommand>,
WriteHandle, WriteHandle,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
)>, )>,
), ),
} }
@ -215,13 +219,13 @@ impl ReadControlHandle {
JoinSet<()>, JoinSet<()>,
mpsc::Sender<SupervisorCommand>, mpsc::Sender<SupervisorCommand>,
WriteHandle, WriteHandle,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
)>, )>,
db: Db, db: Db,
sender: mpsc::Sender<UpdateMessage>, sender: mpsc::Sender<UpdateMessage>,
supervisor_control: mpsc::Sender<SupervisorCommand>, supervisor_control: mpsc::Sender<SupervisorCommand>,
jabber_write: WriteHandle, jabber_write: WriteHandle,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
) -> Self { ) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20); let (control_sender, control_receiver) = mpsc::channel(20);
@ -252,14 +256,14 @@ impl ReadControlHandle {
JoinSet<()>, JoinSet<()>,
mpsc::Sender<SupervisorCommand>, mpsc::Sender<SupervisorCommand>,
WriteHandle, WriteHandle,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
)>, )>,
db: Db, db: Db,
sender: mpsc::Sender<UpdateMessage>, sender: mpsc::Sender<UpdateMessage>,
supervisor_control: mpsc::Sender<SupervisorCommand>, supervisor_control: mpsc::Sender<SupervisorCommand>,
jabber_write: WriteHandle, jabber_write: WriteHandle,
tasks: JoinSet<()>, tasks: JoinSet<()>,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
) -> Self { ) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20); let (control_sender, control_receiver) = mpsc::channel(20);

View File

@ -7,7 +7,7 @@ use tokio::{
task::JoinHandle, task::JoinHandle,
}; };
use crate::error::Error; use crate::error::{Error, Reason};
// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream. // actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream.
pub struct Write { pub struct Write {
@ -19,7 +19,7 @@ pub struct Write {
pub struct WriteMessage { pub struct WriteMessage {
pub stanza: Stanza, pub stanza: Stanza,
pub respond_to: oneshot::Sender<Result<(), Error>>, pub respond_to: oneshot::Sender<Result<(), Reason>>,
} }
pub enum WriteControl { pub enum WriteControl {
@ -84,9 +84,9 @@ impl Write {
Err(e) => match &e { Err(e) => match &e {
peanuts::Error::ReadError(_error) => { peanuts::Error::ReadError(_error) => {
// if connection lost during disconnection, just send lost connection error to the write requests // if connection lost during disconnection, just send lost connection error to the write requests
let _ = msg.respond_to.send(Err(Error::LostConnection)); let _ = msg.respond_to.send(Err(Reason::LostConnection));
while let Some(msg) = self.stanza_receiver.recv().await { while let Some(msg) = self.stanza_receiver.recv().await {
let _ = msg.respond_to.send(Err(Error::LostConnection)); let _ = msg.respond_to.send(Err(Reason::LostConnection));
} }
break; break;
} }

View File

@ -6,6 +6,7 @@ use uuid::Uuid;
use crate::{ use crate::{
chat::{Chat, Message}, chat::{Chat, Message},
presence::Online,
roster::Contact, roster::Contact,
user::User, user::User,
}; };
@ -315,4 +316,29 @@ impl Db {
.await?; .await?;
Ok(messages) Ok(messages)
} }
pub async fn read_cached_status(&self) -> Result<Online, Error> {
let online: Online = sqlx::query_as("select * from cached_status where id = 0")
.fetch_one(&self.db)
.await?;
Ok(online)
}
pub async fn upsert_cached_status(&self, status: Online) -> Result<(), Error> {
sqlx::query!(
"insert into cached_status (id, show, message) values (0, ?, ?) on conflict do update set show = ?, message = ?",
status.show,
status.status,
status.show,
status.status
).execute(&self.db).await?;
Ok(())
}
pub async fn delete_cached_status(&self) -> Result<(), Error> {
sqlx::query!("update cached_status set show = null, message = null where id = 0")
.execute(&self.db)
.await?;
Ok(())
}
} }

View File

@ -1,6 +1,11 @@
use stanza::client::Stanza;
use tokio::sync::oneshot::{self};
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
AlreadyConnected, AlreadyConnected,
// TODO: change to Connecting(ConnectingError)
Connection(ConnectionError),
Presence(Reason), Presence(Reason),
Roster(Reason), Roster(Reason),
SendMessage(Reason), SendMessage(Reason),
@ -8,6 +13,42 @@ pub enum Error {
LostConnection, LostConnection,
} }
#[derive(Debug)]
pub enum ConnectionError {
ConnectionFailed(Reason),
RosterRetreival(Reason),
SendPresence(Reason),
NoCachedStatus(Reason),
}
pub struct RosterError(pub Reason);
impl From<RosterError> for Error {
fn from(e: RosterError) -> Self {
Self::Roster(e.0)
}
}
impl From<RosterError> for ConnectionError {
fn from(e: RosterError) -> Self {
Self::RosterRetreival(e.0)
}
}
pub struct StatusError(Reason);
impl From<StatusError> for Error {
fn from(e: StatusError) -> Self {
Error::Presence(e.0)
}
}
impl From<StatusError> for ConnectionError {
fn from(e: StatusError) -> Self {
Self::SendPresence(e.0)
}
}
#[derive(Debug)] #[derive(Debug)]
pub enum Reason { pub enum Reason {
// TODO: organisastion of error into internal error thing // TODO: organisastion of error into internal error thing
@ -19,27 +60,35 @@ pub enum Reason {
SQL(sqlx::Error), SQL(sqlx::Error),
// JID(jid::ParseError), // JID(jid::ParseError),
LostConnection, LostConnection,
OneshotRecv(oneshot::error::RecvError),
UnexpectedStanza(Stanza),
} }
impl From<peanuts::Error> for Error { impl From<oneshot::error::RecvError> for Reason {
fn from(e: oneshot::error::RecvError) -> Reason {
Self::OneshotRecv(e)
}
}
impl From<peanuts::Error> for Reason {
fn from(e: peanuts::Error) -> Self { fn from(e: peanuts::Error) -> Self {
Self::XML(e) Self::XML(e)
} }
} }
// impl From<jid::ParseError> for Error { // impl From<jid::ParseError> for Reason {
// fn from(e: jid::ParseError) -> Self { // fn from(e: jid::ParseError) -> Self {
// Self::JID(e) // Self::JID(e)
// } // }
// } // }
impl From<sqlx::Error> for Error { impl From<sqlx::Error> for Reason {
fn from(e: sqlx::Error) -> Self { fn from(e: sqlx::Error) -> Self {
Self::SQL(e) Self::SQL(e)
} }
} }
impl From<jabber::Error> for Error { impl From<jabber::Error> for Reason {
fn from(e: jabber::Error) -> Self { fn from(e: jabber::Error) -> Self {
Self::Jabber(e) Self::Jabber(e)
} }

View File

@ -7,6 +7,7 @@ use std::{
use chat::{Body, Chat, Message}; use chat::{Body, Chat, Message};
use connection::{write::WriteMessage, SupervisorSender}; use connection::{write::WriteMessage, SupervisorSender};
use db::Db; use db::Db;
use error::{ConnectionError, Reason, RosterError, StatusError};
use jabber::JID; use jabber::JID;
use presence::{Offline, Online, Presence}; use presence::{Offline, Online, Presence};
use roster::{Contact, ContactUpdate}; use roster::{Contact, ContactUpdate};
@ -19,6 +20,7 @@ use tokio::{
sync::{mpsc, oneshot, Mutex}, sync::{mpsc, oneshot, Mutex},
task::JoinSet, task::JoinSet,
}; };
use tracing::info;
use user::User; use user::User;
use uuid::Uuid; use uuid::Uuid;
@ -35,12 +37,13 @@ mod roster;
mod user; mod user;
pub struct Luz { pub struct Luz {
command_sender: mpsc::Sender<CommandMessage>,
receiver: mpsc::Receiver<CommandMessage>, receiver: mpsc::Receiver<CommandMessage>,
jid: Arc<Mutex<JID>>, jid: Arc<Mutex<JID>>,
// TODO: use a dyn passwordprovider trait to avoid storing password in memory // TODO: use a dyn passwordprovider trait to avoid storing password in memory
password: Arc<String>, password: Arc<String>,
connected: Arc<Mutex<Option<(WriteHandle, SupervisorHandle)>>>, connected: Arc<Mutex<Option<(WriteHandle, SupervisorHandle)>>>,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
db: Db, db: Db,
sender: mpsc::Sender<UpdateMessage>, sender: mpsc::Sender<UpdateMessage>,
/// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected /// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected
@ -52,6 +55,7 @@ pub struct Luz {
impl Luz { impl Luz {
fn new( fn new(
command_sender: mpsc::Sender<CommandMessage>,
receiver: mpsc::Receiver<CommandMessage>, receiver: mpsc::Receiver<CommandMessage>,
jid: Arc<Mutex<JID>>, jid: Arc<Mutex<JID>>,
password: String, password: String,
@ -70,6 +74,7 @@ impl Luz {
tasks: JoinSet::new(), tasks: JoinSet::new(),
connection_supervisor_shutdown, connection_supervisor_shutdown,
pending_iqs: Arc::new(Mutex::new(HashMap::new())), pending_iqs: Arc::new(Mutex::new(HashMap::new())),
command_sender,
} }
} }
@ -117,30 +122,127 @@ impl Luz {
self.pending_iqs.clone(), self.pending_iqs.clone(),
); );
self.connection_supervisor_shutdown = shutdown_recv; self.connection_supervisor_shutdown = shutdown_recv;
*connection_lock = Some((writer, supervisor)); // TODO: get roster and send initial presence
self.sender.send(UpdateMessage::Connected(todo!())).await; let (send, recv) = oneshot::channel();
CommandMessage::GetRoster(send)
.handle_online(
writer.clone(),
supervisor.sender(),
self.jid.clone(),
self.db.clone(),
self.sender.clone(),
self.pending_iqs.clone(),
)
.await;
let roster = recv.await;
match roster {
Ok(r) => {
match r {
Ok(roster) => {
let online = self.db.read_cached_status().await;
let online = match online {
Ok(online) => online,
Err(e) => {
let _ = self
.sender
.send(UpdateMessage::Error(
Error::Connection(
ConnectionError::NoCachedStatus(
e.into(),
),
),
))
.await;
Online::default()
}
};
let (send, recv) = oneshot::channel();
CommandMessage::SetStatus(online.clone(), send)
.handle_online(
writer.clone(),
supervisor.sender(),
self.jid.clone(),
self.db.clone(),
self.sender.clone(),
self.pending_iqs.clone(),
)
.await;
let set_status = recv.await;
match set_status {
Ok(s) => match s {
Ok(()) => {
*connection_lock =
Some((writer, supervisor));
let _ = self
.sender
.send(UpdateMessage::Online(
online, roster,
))
.await;
continue;
}
Err(e) => {
let _ = self
.sender
.send(UpdateMessage::Error(
Error::Connection(e.into()),
))
.await;
}
},
Err(e) => {
let _ = self.sender.send(UpdateMessage::Error(Error::Connection(ConnectionError::SendPresence(e.into())))).await;
}
}
}
Err(e) => {
let _ = self
.sender
.send(UpdateMessage::Error(
Error::Connection(e.into()),
))
.await;
}
}
}
Err(e) => {
let _ = self
.sender
.send(UpdateMessage::Error(Error::Connection(
ConnectionError::RosterRetreival(e.into()),
)))
.await;
}
}
} }
Err(e) => { Err(e) => {
self.sender.send(UpdateMessage::Error(e.into())); let _ =
self.sender.send(UpdateMessage::Error(Error::Connection(
ConnectionError::ConnectionFailed(e.into()),
)));
} }
} }
} }
}; };
} }
CommandMessage::Disconnect => match self.connected.lock().await.as_mut() { CommandMessage::Disconnect(_offline) => {
None => { match self.connected.lock().await.as_mut() {
self.sender None => {
.send(UpdateMessage::Error(Error::AlreadyDisconnected)) let _ = self
.await; .sender
.send(UpdateMessage::Error(Error::AlreadyDisconnected))
.await;
}
mut c => {
// TODO: send unavailable presence
if let Some((_write_handle, supervisor_handle)) = c.take() {
let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await;
} else {
unreachable!()
};
}
} }
mut c => { }
if let Some((_write_handle, supervisor_handle)) = c.take() {
let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await;
} else {
unreachable!()
};
}
},
_ => { _ => {
match self.connected.lock().await.as_ref() { match self.connected.lock().await.as_ref() {
Some((w, s)) => self.tasks.spawn(msg.handle_online( Some((w, s)) => self.tasks.spawn(msg.handle_online(
@ -168,9 +270,41 @@ impl CommandMessage {
mut self, mut self,
jid: Arc<Mutex<JID>>, jid: Arc<Mutex<JID>>,
db: Db, db: Db,
sender: mpsc::Sender<UpdateMessage>, update_sender: mpsc::Sender<UpdateMessage>,
) { ) {
todo!() match self {
CommandMessage::Connect => unreachable!(),
CommandMessage::Disconnect(offline) => unreachable!(),
CommandMessage::GetRoster(sender) => {
let roster = db.read_cached_roster().await;
match roster {
Ok(roster) => {
let _ = sender.send(Ok(roster));
}
Err(e) => {
let _ = sender.send(Err(RosterError(e.into())));
}
}
}
CommandMessage::GetChats(sender) => todo!(),
CommandMessage::GetChat(jid, sender) => todo!(),
CommandMessage::GetMessages(jid, sender) => todo!(),
CommandMessage::DeleteChat(jid, sender) => todo!(),
CommandMessage::DeleteMessage(uuid, sender) => todo!(),
CommandMessage::GetUser(jid, sender) => todo!(),
CommandMessage::AddContact(jid, sender) => todo!(),
CommandMessage::BuddyRequest(jid, sender) => todo!(),
CommandMessage::SubscriptionRequest(jid, sender) => todo!(),
CommandMessage::AcceptBuddyRequest(jid, sender) => todo!(),
CommandMessage::AcceptSubscriptionRequest(jid, sender) => todo!(),
CommandMessage::UnsubscribeFromContact(jid, sender) => todo!(),
CommandMessage::UnsubscribeContact(jid, sender) => todo!(),
CommandMessage::UnfriendContact(jid, sender) => todo!(),
CommandMessage::DeleteContact(jid, sender) => todo!(),
CommandMessage::UpdateContact(jid, contact_update, sender) => todo!(),
CommandMessage::SetStatus(online, sender) => todo!(),
CommandMessage::SendMessage(jid, body, sender) => todo!(),
}
} }
pub async fn handle_online( pub async fn handle_online(
@ -181,20 +315,25 @@ impl CommandMessage {
jid: Arc<Mutex<JID>>, jid: Arc<Mutex<JID>>,
db: Db, db: Db,
sender: mpsc::Sender<UpdateMessage>, sender: mpsc::Sender<UpdateMessage>,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
) { ) {
match self { match self {
CommandMessage::Connect => unreachable!(), CommandMessage::Connect => unreachable!(),
CommandMessage::Disconnect => unreachable!(), CommandMessage::Disconnect(_) => unreachable!(),
CommandMessage::GetRoster => { CommandMessage::GetRoster(result_sender) => {
// TODO: jid resource should probably be stored within the connection // TODO: jid resource should probably be stored within the connection
let owned_jid: JID; let owned_jid: JID;
{ {
owned_jid = jid.lock().await.clone(); owned_jid = jid.lock().await.clone();
} }
let iq_id = Uuid::new_v4().to_string();
let (send, iq_recv) = oneshot::channel();
{
pending_iqs.lock().await.insert(iq_id.clone(), send);
}
let stanza = Stanza::Iq(Iq { let stanza = Stanza::Iq(Iq {
from: Some(owned_jid), from: Some(owned_jid),
id: "getting-roster".to_string(), id: iq_id.to_string(),
to: None, to: None,
r#type: IqType::Get, r#type: IqType::Get,
lang: None, lang: None,
@ -211,13 +350,69 @@ impl CommandMessage {
respond_to: send, respond_to: send,
}) })
.await; .await;
// TODO: timeout
match recv.await { match recv.await {
Ok(Ok(())) => println!("roster request sent"), Ok(Ok(())) => info!("roster request sent"),
e => println!("error: {:?}", e), Ok(Err(e)) => {
// TODO: log errors if fail to send
let _ = result_sender.send(Err(RosterError(e.into())));
return;
}
Err(e) => {
let _ = result_sender.send(Err(RosterError(e.into())));
return;
}
}; };
// TODO: timeout
match iq_recv.await {
Ok(Ok(stanza)) => match stanza {
Stanza::Iq(Iq {
from,
id,
to,
r#type,
lang,
query: Some(iq::Query::Roster(stanza::roster::Query { ver, items })),
errors,
}) if id == iq_id && r#type == IqType::Result => {
let contacts: Vec<Contact> =
items.into_iter().map(|item| item.into()).collect();
result_sender.send(Ok(contacts));
return;
}
s => {
result_sender.send(Err(RosterError(Reason::UnexpectedStanza(s))));
return;
}
},
Ok(Err(e)) => {
result_sender.send(Err(RosterError(e.into())));
return;
}
Err(e) => {
result_sender.send(Err(RosterError(e.into())));
return;
}
}
} }
CommandMessage::SendMessage { id, to, body } => todo!(), CommandMessage::GetChats(sender) => todo!(),
_ => todo!(), CommandMessage::GetChat(jid, sender) => todo!(),
CommandMessage::GetMessages(jid, sender) => todo!(),
CommandMessage::DeleteChat(jid, sender) => todo!(),
CommandMessage::DeleteMessage(uuid, sender) => todo!(),
CommandMessage::GetUser(jid, sender) => todo!(),
CommandMessage::AddContact(jid, sender) => todo!(),
CommandMessage::BuddyRequest(jid, sender) => todo!(),
CommandMessage::SubscriptionRequest(jid, sender) => todo!(),
CommandMessage::AcceptBuddyRequest(jid, sender) => todo!(),
CommandMessage::AcceptSubscriptionRequest(jid, sender) => todo!(),
CommandMessage::UnsubscribeFromContact(jid, sender) => todo!(),
CommandMessage::UnsubscribeContact(jid, sender) => todo!(),
CommandMessage::UnfriendContact(jid, sender) => todo!(),
CommandMessage::DeleteContact(jid, sender) => todo!(),
CommandMessage::UpdateContact(jid, contact_update, sender) => todo!(),
CommandMessage::SetStatus(online, sender) => todo!(),
CommandMessage::SendMessage(jid, body, sender) => todo!(),
} }
} }
} }
@ -254,6 +449,7 @@ impl LuzHandle {
let (sup_send, sup_recv) = oneshot::channel(); let (sup_send, sup_recv) = oneshot::channel();
let actor = Luz::new( let actor = Luz::new(
command_sender.clone(),
command_receiver, command_receiver,
Arc::new(Mutex::new(jid)), Arc::new(Mutex::new(jid)),
password, password,
@ -280,10 +476,12 @@ pub enum CommandMessage {
/// disconnect from XMPP chat server, sending unavailable presence then closing stream. /// disconnect from XMPP chat server, sending unavailable presence then closing stream.
Disconnect(Offline), Disconnect(Offline),
/// get the roster. if offline, retreive cached version from database. should be stored in application memory /// get the roster. if offline, retreive cached version from database. should be stored in application memory
GetRoster(oneshot::Sender<Result<Vec<Contact>, Error>>), GetRoster(oneshot::Sender<Result<Vec<Contact>, RosterError>>),
/// get all chats. chat will include 10 messages in their message Vec (enough for chat previews) /// get all chats. chat will include 10 messages in their message Vec (enough for chat previews)
// TODO: paging and filtering // TODO: paging and filtering
GetChats(oneshot::Sender<Result<Vec<Chat>, Error>>), GetChats(oneshot::Sender<Result<Vec<Chat>, Error>>),
/// get a specific chat by jid
GetChat(JID, oneshot::Sender<Result<Chat, Error>>),
/// get message history for chat (does appropriate mam things) /// get message history for chat (does appropriate mam things)
// TODO: paging and filtering // TODO: paging and filtering
GetMessages(JID, oneshot::Sender<Result<Vec<Message>, Error>>), GetMessages(JID, oneshot::Sender<Result<Vec<Message>, Error>>),
@ -315,7 +513,7 @@ pub enum CommandMessage {
/// update contact /// update contact
UpdateContact(JID, ContactUpdate, oneshot::Sender<Result<(), Error>>), UpdateContact(JID, ContactUpdate, oneshot::Sender<Result<(), Error>>),
/// set online status. if disconnected, will be cached so when client connects, will be sent as the initial presence. /// set online status. if disconnected, will be cached so when client connects, will be sent as the initial presence.
SetStatusMessage(Option<String>, oneshot::Sender<Result<(), Error>>), SetStatus(Online, oneshot::Sender<Result<(), StatusError>>),
/// send a directed presence (usually to a non-contact). /// send a directed presence (usually to a non-contact).
// TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting) // TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting)
/// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a /// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a
@ -326,9 +524,10 @@ pub enum CommandMessage {
#[derive(Debug)] #[derive(Debug)]
pub enum UpdateMessage { pub enum UpdateMessage {
Error(Error), Error(Error),
Online(Online), Online(Online, Vec<Contact>),
Offline(Offline), Offline(Offline),
/// received roster from jabber server (replace full app roster state with this) /// received roster from jabber server (replace full app roster state with this)
/// is this needed?
FullRoster(Vec<Contact>), FullRoster(Vec<Contact>),
/// (only update app roster state, don't replace) /// (only update app roster state, don't replace)
RosterUpdate(Contact), RosterUpdate(Contact),

View File

@ -18,6 +18,5 @@ async fn main() {
}); });
luz.send(CommandMessage::Connect).await.unwrap(); luz.send(CommandMessage::Connect).await.unwrap();
luz.send(CommandMessage::GetRoster).await.unwrap();
tokio::time::sleep(Duration::from_secs(15)).await; tokio::time::sleep(Duration::from_secs(15)).await;
} }

View File

@ -1,12 +1,57 @@
use stanza::client::presence::Show; use sqlx::Sqlite;
#[derive(Debug, Default)] #[derive(Debug, Default, sqlx::FromRow, Clone)]
pub struct Online { pub struct Online {
show: Option<Show>, pub show: Option<Show>,
status: Option<String>, pub status: Option<String>,
#[sqlx(skip)]
priority: Option<i8>, priority: Option<i8>,
} }
#[derive(Debug, Clone, Copy)]
pub enum Show {
Away,
Chat,
DoNotDisturb,
ExtendedAway,
}
impl sqlx::Type<Sqlite> for Show {
fn type_info() -> <Sqlite as sqlx::Database>::TypeInfo {
<&str as sqlx::Type<Sqlite>>::type_info()
}
}
impl sqlx::Decode<'_, Sqlite> for Show {
fn decode(
value: <Sqlite as sqlx::Database>::ValueRef<'_>,
) -> Result<Self, sqlx::error::BoxDynError> {
let value = <&str as sqlx::Decode<Sqlite>>::decode(value)?;
match value {
"away" => Ok(Self::Away),
"chat" => Ok(Self::Chat),
"do-not-disturb" => Ok(Self::DoNotDisturb),
"extended-away" => Ok(Self::ExtendedAway),
_ => unreachable!(),
}
}
}
impl sqlx::Encode<'_, Sqlite> for Show {
fn encode_by_ref(
&self,
buf: &mut <Sqlite as sqlx::Database>::ArgumentBuffer<'_>,
) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> {
let value = match self {
Show::Away => "away",
Show::Chat => "chat",
Show::DoNotDisturb => "do-not-disturb",
Show::ExtendedAway => "extended-away",
};
<&str as sqlx::Encode<Sqlite>>::encode(value, buf)
}
}
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct Offline { pub struct Offline {
status: Option<String>, status: Option<String>,

View File

@ -24,7 +24,7 @@ pub struct Contact {
} }
#[derive(Debug)] #[derive(Debug)]
enum Subscription { pub enum Subscription {
None, None,
PendingOut, PendingOut,
PendingIn, PendingIn,
@ -80,3 +80,46 @@ impl sqlx::Encode<'_, Sqlite> for Subscription {
<&str as sqlx::Encode<Sqlite>>::encode(value, buf) <&str as sqlx::Encode<Sqlite>>::encode(value, buf)
} }
} }
// none
// >
// >>
// <
// <<
// ><
// >><
// ><<
// >><<
impl From<stanza::roster::Item> for Contact {
fn from(value: stanza::roster::Item) -> Self {
let subscription = match value.ask {
true => match value.subscription {
Some(s) => match s {
stanza::roster::Subscription::Both => Subscription::Buddy,
stanza::roster::Subscription::From => Subscription::InPendingOut,
stanza::roster::Subscription::None => Subscription::PendingOut,
stanza::roster::Subscription::Remove => Subscription::PendingOut,
stanza::roster::Subscription::To => Subscription::OnlyOut,
},
None => Subscription::PendingOut,
},
false => match value.subscription {
Some(s) => match s {
stanza::roster::Subscription::Both => Subscription::Buddy,
stanza::roster::Subscription::From => Subscription::OnlyIn,
stanza::roster::Subscription::None => Subscription::None,
stanza::roster::Subscription::Remove => Subscription::None,
stanza::roster::Subscription::To => Subscription::OnlyOut,
},
None => Subscription::None,
},
};
Contact {
user_jid: value.jid,
subscription,
name: value.name,
groups: HashSet::from_iter(value.groups.into_iter().filter_map(|group| group.0)),
}
}
}

View File

@ -37,16 +37,16 @@ impl IntoElement for Query {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Item { pub struct Item {
/// signals subscription pre-approval (server only) /// signals subscription pre-approval (server only)
approved: Option<bool>, pub approved: Option<bool>,
/// signals subscription sub-states (server only) /// signals subscription sub-states (server only)
ask: bool, pub ask: bool,
/// uniquely identifies item /// uniquely identifies item
jid: JID, pub jid: JID,
/// handle that is determined by user, not contact /// handle that is determined by user, not contact
name: Option<String>, pub name: Option<String>,
/// state of the presence subscription /// state of the presence subscription
subscription: Option<Subscription>, pub subscription: Option<Subscription>,
groups: Vec<Group>, pub groups: Vec<Group>,
} }
impl FromElement for Item { impl FromElement for Item {
@ -140,7 +140,7 @@ impl ToString for Subscription {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
// TODO: check if should be option or not // TODO: check if should be option or not
pub struct Group(Option<String>); pub struct Group(pub Option<String>);
impl FromElement for Group { impl FromElement for Group {
fn from_element(mut element: peanuts::Element) -> peanuts::element::DeserializeResult<Self> { fn from_element(mut element: peanuts::Element) -> peanuts::element::DeserializeResult<Self> {