diff --git a/luz/migrations/20240113011930_luz.sql b/luz/migrations/20240113011930_luz.sql index 028ae24..7b33dd3 100644 --- a/luz/migrations/20240113011930_luz.sql +++ b/luz/migrations/20240113011930_luz.sql @@ -3,12 +3,37 @@ PRAGMA foreign_keys = on; -- a user jid will never change, only a chat user will change -- TODO: avatar, nick, etc. create table users( + -- TODO: enforce bare jid jid text primary key not null, -- can receive presence status from non-contacts cached_status_message text -- TODO: last_seen ); +-- -- links to messages, jabber users, stores jid history, etc. +-- create table identities( +-- id text primary key not null +-- ); + +-- create table identities_users( +-- id text not null, +-- jid text not null, +-- -- whichever has the newest timestamp is the active one. +-- -- what to do when somebody moves, but then the old jid is used again without having explicitly moved back? create new identity to assign ownership to? +-- -- merging of identities? +-- activated_timestamp not null, +-- foreign key(id) references identities(id), +-- foreign key(jid) references users(jid), +-- primary key(activated timestamp, id, jid) +-- ); + +create table resources( + bare_jid text not null, + resource text not null, + foreign key(bare_jid) references users(jid), + primary key(bare_jid, resource) +); + -- enum for subscription state create table subscription( state text primary key not null @@ -61,15 +86,19 @@ create table messages ( -- TODO: icky -- the user to show it coming from (not necessarily the original sender) + -- from_identity text not null, + -- original sender details (only from jabber supported for now) from_jid text not null, - originally_from text not null, + -- resource can be null + from_resource text, -- check (from_jid != original_sender), -- TODO: from can be either a jid, a moved jid (for when a contact moves, save original sender jid/user but link to new user), or imported (from another service (save details), linked to new user) -- TODO: read bool not null, foreign key(chat_id) references chats(id) on delete cascade, + -- foreign key(from_identity) references identities(id), foreign key(from_jid) references users(jid), - foreign key(originally_from) references users(jid) + foreign key(from_jid, from_resource) references resources(bare_jid, resource) ); -- enum for subscription state diff --git a/luz/src/chat.rs b/luz/src/chat.rs index 4fb8579..7bb99e1 100644 --- a/luz/src/chat.rs +++ b/luz/src/chat.rs @@ -28,7 +28,7 @@ pub struct Body { #[derive(sqlx::FromRow)] pub struct Chat { - correspondent: JID, + pub correspondent: JID, // message history is not stored in chat, retreived separately. // pub message_history: Vec, } diff --git a/luz/src/connection/mod.rs b/luz/src/connection/mod.rs index fda2b90..95aae1a 100644 --- a/luz/src/connection/mod.rs +++ b/luz/src/connection/mod.rs @@ -15,6 +15,7 @@ use tokio::{ sync::{mpsc, oneshot, Mutex}, task::{JoinHandle, JoinSet}, }; +use tracing::info; use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage}; use crate::{ @@ -104,8 +105,10 @@ impl Supervisor { Some(msg) = self.connection_commands.recv() => { match msg { SupervisorCommand::Disconnect => { + info!("disconnecting"); let _ = self.writer_handle.send(WriteControl::Disconnect).await; let _ = self.reader_handle.send(ReadControl::Disconnect).await; + info!("sent disconnect command"); tokio::select! { _ = async { tokio::join!( async { let _ = (&mut self.writer_handle.handle).await; }, @@ -116,6 +119,7 @@ impl Supervisor { (&mut self.writer_handle.handle).abort(); } } + info!("disconnected"); break; }, SupervisorCommand::Reconnect(state) => { diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs index 46f1dc9..4390e00 100644 --- a/luz/src/connection/read.rs +++ b/luz/src/connection/read.rs @@ -202,9 +202,10 @@ async fn handle_stanza( }; // TODO: can this be more efficient? let result = db - .create_message_with_user_and_chat(message.clone(), from.clone()) + .create_message_with_user_resource_and_chat(message.clone(), from.clone()) .await; if let Err(e) = result { + tracing::error!("messagecreate"); let _ = update_sender .send(UpdateMessage::Error(Error::CacheUpdate(e.into()))) .await; diff --git a/luz/src/db/mod.rs b/luz/src/db/mod.rs index 4202163..3a1d73d 100644 --- a/luz/src/db/mod.rs +++ b/luz/src/db/mod.rs @@ -213,14 +213,14 @@ impl Db { pub async fn replace_cached_roster(&self, roster: Vec) -> Result<(), Error> { sqlx::query!("delete from roster").execute(&self.db).await?; for contact in roster { - self.create_contact(contact).await?; + self.upsert_contact(contact).await?; } Ok(()) } pub async fn read_cached_roster(&self) -> Result, Error> { let mut roster: Vec = - sqlx::query_as("select * from roster full outer join users on jid = user_jid") + sqlx::query_as("select * from roster join users on jid = user_jid") .fetch_all(&self.db) .await?; for contact in &mut roster { @@ -303,6 +303,7 @@ impl Db { struct Row { id: Uuid, } + let chat = chat.as_bare(); let chat_id: Row = sqlx::query_as("select id from chats where correspondent = ?") .bind(chat) .fetch_one(&self.db) @@ -327,19 +328,24 @@ impl Db { /// if the chat doesn't already exist, it must be created by calling create_chat() before running this function. pub async fn create_message(&self, message: Message, chat: JID) -> Result<(), Error> { // TODO: one query + let bare_jid = message.from.as_bare(); + let resource = message.from.resourcepart; let chat_id = self.read_chat_id(chat).await?; - sqlx::query!("insert into messages (id, body, chat_id, from_jid, originally_from) values (?, ?, ?, ?, ?)", message.id, message.body.body, chat_id, message.from, message.from).execute(&self.db).await?; + sqlx::query!("insert into messages (id, body, chat_id, from_jid, from_resource) values (?, ?, ?, ?, ?)", message.id, message.body.body, chat_id, bare_jid, resource).execute(&self.db).await?; Ok(()) } - pub async fn create_message_with_user_and_chat( + // create direct message + pub async fn create_message_with_user_resource_and_chat( &self, message: Message, chat: JID, ) -> Result<(), Error> { + let bare_chat = chat.as_bare(); + let resource = &chat.resourcepart; sqlx::query!( "insert into users (jid) values (?) on conflict do nothing", - chat + bare_chat ) .execute(&self.db) .await?; @@ -347,10 +353,19 @@ impl Db { sqlx::query!( "insert into chats (id, correspondent) values (?, ?) on conflict do nothing", id, - chat + bare_chat ) .execute(&self.db) .await?; + if let Some(resource) = resource { + sqlx::query!( + "insert into resources (bare_jid, resource) values (?, ?) on conflict do nothing", + bare_chat, + resource + ) + .execute(&self.db) + .await?; + } self.create_message(message, chat).await?; Ok(()) } diff --git a/luz/src/error.rs b/luz/src/error.rs index 4fdce79..f0b956e 100644 --- a/luz/src/error.rs +++ b/luz/src/error.rs @@ -18,6 +18,14 @@ pub enum Error { CacheUpdate(Reason), UnrecognizedContent(peanuts::element::Content), Iq(IqError), + Cloned, +} + +// TODO: this is horrifying, maybe just use tracing to forward error events??? +impl Clone for Error { + fn clone(&self) -> Self { + Error::Cloned + } } #[derive(Debug)] @@ -37,7 +45,7 @@ pub enum RecvMessageError { MissingFrom, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum ConnectionError { ConnectionFailed(Reason), RosterRetreival(Reason), @@ -45,6 +53,7 @@ pub enum ConnectionError { NoCachedStatus(Reason), } +#[derive(Debug)] pub struct RosterError(pub Reason); impl From for Error { @@ -88,6 +97,14 @@ pub enum Reason { UnexpectedStanza(Stanza), Disconnected, ChannelSend, + Cloned, +} + +// TODO: same here +impl Clone for Reason { + fn clone(&self) -> Self { + Reason::Cloned + } } impl From for Reason { diff --git a/luz/src/lib.rs b/luz/src/lib.rs index 901553b..4c95ab6 100644 --- a/luz/src/lib.rs +++ b/luz/src/lib.rs @@ -32,9 +32,9 @@ pub mod chat; mod connection; mod db; mod error; -mod presence; -mod roster; -mod user; +pub mod presence; +pub mod roster; +pub mod user; pub struct Luz { command_sender: mpsc::Sender, @@ -82,7 +82,9 @@ impl Luz { loop { let msg = tokio::select! { // this is okay, as when created the supervisor (and connection) doesn't exist, but a bit messy + // THIS IS NOT OKAY LOLLLL _ = &mut self.connection_supervisor_shutdown => { + info!("got this"); *self.connected.lock().await = None; continue; } @@ -247,11 +249,13 @@ impl Luz { // TODO: send unavailable presence if let Some((_write_handle, supervisor_handle)) = c.take() { let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await; + c = None; } else { unreachable!() }; } } + info!("lock released") } _ => { match self.connected.lock().await.as_ref() { @@ -962,10 +966,19 @@ impl CommandMessage { // TODO: separate sender and receiver, store handle to Luz process to ensure dropping // #[derive(Clone)] +#[derive(Debug)] pub struct LuzHandle { sender: mpsc::Sender, } +impl Clone for LuzHandle { + fn clone(&self) -> Self { + Self { + sender: self.sender.clone(), + } + } +} + impl Deref for LuzHandle { type Target = mpsc::Sender; @@ -981,11 +994,12 @@ impl DerefMut for LuzHandle { } impl LuzHandle { - pub fn new( + pub async fn new( jid: JID, password: String, - db: SqlitePool, - ) -> (Self, mpsc::Receiver) { + db: &str, + ) -> Result<(Self, mpsc::Receiver), Reason> { + let db = SqlitePool::connect(db).await?; let (command_sender, command_receiver) = mpsc::channel(20); let (update_sender, update_receiver) = mpsc::channel(20); // might be bad, first supervisor shutdown notification oneshot is never used (disgusting) @@ -1003,12 +1017,12 @@ impl LuzHandle { ); tokio::spawn(async move { actor.run().await }); - ( + Ok(( Self { sender: command_sender, }, update_receiver, - ) + )) } } @@ -1064,7 +1078,7 @@ pub enum CommandMessage { SendMessage(JID, Body, oneshot::Sender>), } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum UpdateMessage { Error(Error), Online(Online, Vec), diff --git a/luz/src/presence.rs b/luz/src/presence.rs index 1df20a7..40d79c5 100644 --- a/luz/src/presence.rs +++ b/luz/src/presence.rs @@ -4,6 +4,7 @@ use stanza::client::presence::String1024; #[derive(Debug, Default, sqlx::FromRow, Clone)] pub struct Online { pub show: Option, + #[sqlx(rename = "message")] pub status: Option, #[sqlx(skip)] pub priority: Option, @@ -53,12 +54,12 @@ impl sqlx::Encode<'_, Sqlite> for Show { } } -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct Offline { pub status: Option, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum Presence { Online(Online), Offline(Offline), diff --git a/luz/src/roster.rs b/luz/src/roster.rs index 0e43a8a..43c32f5 100644 --- a/luz/src/roster.rs +++ b/luz/src/roster.rs @@ -58,7 +58,7 @@ impl sqlx::Decode<'_, Sqlite> for Subscription { "out-pending-in" => Ok(Self::OutPendingIn), "in-pending-out" => Ok(Self::InPendingOut), "buddy" => Ok(Self::Buddy), - _ => unreachable!(), + _ => panic!("unexpected subscription `{value}`"), } } }