This commit is contained in:
cel 🌸 2025-02-24 08:41:58 +00:00
parent 2e6ad369c5
commit 66e37108cd
9 changed files with 104 additions and 23 deletions

View File

@ -3,12 +3,37 @@ PRAGMA foreign_keys = on;
-- a user jid will never change, only a chat user will change -- a user jid will never change, only a chat user will change
-- TODO: avatar, nick, etc. -- TODO: avatar, nick, etc.
create table users( create table users(
-- TODO: enforce bare jid
jid text primary key not null, jid text primary key not null,
-- can receive presence status from non-contacts -- can receive presence status from non-contacts
cached_status_message text cached_status_message text
-- TODO: last_seen -- TODO: last_seen
); );
-- -- links to messages, jabber users, stores jid history, etc.
-- create table identities(
-- id text primary key not null
-- );
-- create table identities_users(
-- id text not null,
-- jid text not null,
-- -- whichever has the newest timestamp is the active one.
-- -- what to do when somebody moves, but then the old jid is used again without having explicitly moved back? create new identity to assign ownership to?
-- -- merging of identities?
-- activated_timestamp not null,
-- foreign key(id) references identities(id),
-- foreign key(jid) references users(jid),
-- primary key(activated timestamp, id, jid)
-- );
create table resources(
bare_jid text not null,
resource text not null,
foreign key(bare_jid) references users(jid),
primary key(bare_jid, resource)
);
-- enum for subscription state -- enum for subscription state
create table subscription( create table subscription(
state text primary key not null state text primary key not null
@ -61,15 +86,19 @@ create table messages (
-- TODO: icky -- TODO: icky
-- the user to show it coming from (not necessarily the original sender) -- the user to show it coming from (not necessarily the original sender)
-- from_identity text not null,
-- original sender details (only from jabber supported for now)
from_jid text not null, from_jid text not null,
originally_from text not null, -- resource can be null
from_resource text,
-- check (from_jid != original_sender), -- check (from_jid != original_sender),
-- TODO: from can be either a jid, a moved jid (for when a contact moves, save original sender jid/user but link to new user), or imported (from another service (save details), linked to new user) -- TODO: from can be either a jid, a moved jid (for when a contact moves, save original sender jid/user but link to new user), or imported (from another service (save details), linked to new user)
-- TODO: read bool not null, -- TODO: read bool not null,
foreign key(chat_id) references chats(id) on delete cascade, foreign key(chat_id) references chats(id) on delete cascade,
-- foreign key(from_identity) references identities(id),
foreign key(from_jid) references users(jid), foreign key(from_jid) references users(jid),
foreign key(originally_from) references users(jid) foreign key(from_jid, from_resource) references resources(bare_jid, resource)
); );
-- enum for subscription state -- enum for subscription state

View File

@ -28,7 +28,7 @@ pub struct Body {
#[derive(sqlx::FromRow)] #[derive(sqlx::FromRow)]
pub struct Chat { pub struct Chat {
correspondent: JID, pub correspondent: JID,
// message history is not stored in chat, retreived separately. // message history is not stored in chat, retreived separately.
// pub message_history: Vec<Message>, // pub message_history: Vec<Message>,
} }

View File

@ -15,6 +15,7 @@ use tokio::{
sync::{mpsc, oneshot, Mutex}, sync::{mpsc, oneshot, Mutex},
task::{JoinHandle, JoinSet}, task::{JoinHandle, JoinSet},
}; };
use tracing::info;
use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage}; use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage};
use crate::{ use crate::{
@ -104,8 +105,10 @@ impl Supervisor {
Some(msg) = self.connection_commands.recv() => { Some(msg) = self.connection_commands.recv() => {
match msg { match msg {
SupervisorCommand::Disconnect => { SupervisorCommand::Disconnect => {
info!("disconnecting");
let _ = self.writer_handle.send(WriteControl::Disconnect).await; let _ = self.writer_handle.send(WriteControl::Disconnect).await;
let _ = self.reader_handle.send(ReadControl::Disconnect).await; let _ = self.reader_handle.send(ReadControl::Disconnect).await;
info!("sent disconnect command");
tokio::select! { tokio::select! {
_ = async { tokio::join!( _ = async { tokio::join!(
async { let _ = (&mut self.writer_handle.handle).await; }, async { let _ = (&mut self.writer_handle.handle).await; },
@ -116,6 +119,7 @@ impl Supervisor {
(&mut self.writer_handle.handle).abort(); (&mut self.writer_handle.handle).abort();
} }
} }
info!("disconnected");
break; break;
}, },
SupervisorCommand::Reconnect(state) => { SupervisorCommand::Reconnect(state) => {

View File

@ -202,9 +202,10 @@ async fn handle_stanza(
}; };
// TODO: can this be more efficient? // TODO: can this be more efficient?
let result = db let result = db
.create_message_with_user_and_chat(message.clone(), from.clone()) .create_message_with_user_resource_and_chat(message.clone(), from.clone())
.await; .await;
if let Err(e) = result { if let Err(e) = result {
tracing::error!("messagecreate");
let _ = update_sender let _ = update_sender
.send(UpdateMessage::Error(Error::CacheUpdate(e.into()))) .send(UpdateMessage::Error(Error::CacheUpdate(e.into())))
.await; .await;

View File

@ -213,14 +213,14 @@ impl Db {
pub async fn replace_cached_roster(&self, roster: Vec<Contact>) -> Result<(), Error> { pub async fn replace_cached_roster(&self, roster: Vec<Contact>) -> Result<(), Error> {
sqlx::query!("delete from roster").execute(&self.db).await?; sqlx::query!("delete from roster").execute(&self.db).await?;
for contact in roster { for contact in roster {
self.create_contact(contact).await?; self.upsert_contact(contact).await?;
} }
Ok(()) Ok(())
} }
pub async fn read_cached_roster(&self) -> Result<Vec<Contact>, Error> { pub async fn read_cached_roster(&self) -> Result<Vec<Contact>, Error> {
let mut roster: Vec<Contact> = let mut roster: Vec<Contact> =
sqlx::query_as("select * from roster full outer join users on jid = user_jid") sqlx::query_as("select * from roster join users on jid = user_jid")
.fetch_all(&self.db) .fetch_all(&self.db)
.await?; .await?;
for contact in &mut roster { for contact in &mut roster {
@ -303,6 +303,7 @@ impl Db {
struct Row { struct Row {
id: Uuid, id: Uuid,
} }
let chat = chat.as_bare();
let chat_id: Row = sqlx::query_as("select id from chats where correspondent = ?") let chat_id: Row = sqlx::query_as("select id from chats where correspondent = ?")
.bind(chat) .bind(chat)
.fetch_one(&self.db) .fetch_one(&self.db)
@ -327,19 +328,24 @@ impl Db {
/// if the chat doesn't already exist, it must be created by calling create_chat() before running this function. /// if the chat doesn't already exist, it must be created by calling create_chat() before running this function.
pub async fn create_message(&self, message: Message, chat: JID) -> Result<(), Error> { pub async fn create_message(&self, message: Message, chat: JID) -> Result<(), Error> {
// TODO: one query // TODO: one query
let bare_jid = message.from.as_bare();
let resource = message.from.resourcepart;
let chat_id = self.read_chat_id(chat).await?; let chat_id = self.read_chat_id(chat).await?;
sqlx::query!("insert into messages (id, body, chat_id, from_jid, originally_from) values (?, ?, ?, ?, ?)", message.id, message.body.body, chat_id, message.from, message.from).execute(&self.db).await?; sqlx::query!("insert into messages (id, body, chat_id, from_jid, from_resource) values (?, ?, ?, ?, ?)", message.id, message.body.body, chat_id, bare_jid, resource).execute(&self.db).await?;
Ok(()) Ok(())
} }
pub async fn create_message_with_user_and_chat( // create direct message
pub async fn create_message_with_user_resource_and_chat(
&self, &self,
message: Message, message: Message,
chat: JID, chat: JID,
) -> Result<(), Error> { ) -> Result<(), Error> {
let bare_chat = chat.as_bare();
let resource = &chat.resourcepart;
sqlx::query!( sqlx::query!(
"insert into users (jid) values (?) on conflict do nothing", "insert into users (jid) values (?) on conflict do nothing",
chat bare_chat
) )
.execute(&self.db) .execute(&self.db)
.await?; .await?;
@ -347,10 +353,19 @@ impl Db {
sqlx::query!( sqlx::query!(
"insert into chats (id, correspondent) values (?, ?) on conflict do nothing", "insert into chats (id, correspondent) values (?, ?) on conflict do nothing",
id, id,
chat bare_chat
) )
.execute(&self.db) .execute(&self.db)
.await?; .await?;
if let Some(resource) = resource {
sqlx::query!(
"insert into resources (bare_jid, resource) values (?, ?) on conflict do nothing",
bare_chat,
resource
)
.execute(&self.db)
.await?;
}
self.create_message(message, chat).await?; self.create_message(message, chat).await?;
Ok(()) Ok(())
} }

View File

@ -18,6 +18,14 @@ pub enum Error {
CacheUpdate(Reason), CacheUpdate(Reason),
UnrecognizedContent(peanuts::element::Content), UnrecognizedContent(peanuts::element::Content),
Iq(IqError), Iq(IqError),
Cloned,
}
// TODO: this is horrifying, maybe just use tracing to forward error events???
impl Clone for Error {
fn clone(&self) -> Self {
Error::Cloned
}
} }
#[derive(Debug)] #[derive(Debug)]
@ -37,7 +45,7 @@ pub enum RecvMessageError {
MissingFrom, MissingFrom,
} }
#[derive(Debug)] #[derive(Debug, Clone)]
pub enum ConnectionError { pub enum ConnectionError {
ConnectionFailed(Reason), ConnectionFailed(Reason),
RosterRetreival(Reason), RosterRetreival(Reason),
@ -45,6 +53,7 @@ pub enum ConnectionError {
NoCachedStatus(Reason), NoCachedStatus(Reason),
} }
#[derive(Debug)]
pub struct RosterError(pub Reason); pub struct RosterError(pub Reason);
impl From<RosterError> for Error { impl From<RosterError> for Error {
@ -88,6 +97,14 @@ pub enum Reason {
UnexpectedStanza(Stanza), UnexpectedStanza(Stanza),
Disconnected, Disconnected,
ChannelSend, ChannelSend,
Cloned,
}
// TODO: same here
impl Clone for Reason {
fn clone(&self) -> Self {
Reason::Cloned
}
} }
impl From<oneshot::error::RecvError> for Reason { impl From<oneshot::error::RecvError> for Reason {

View File

@ -32,9 +32,9 @@ pub mod chat;
mod connection; mod connection;
mod db; mod db;
mod error; mod error;
mod presence; pub mod presence;
mod roster; pub mod roster;
mod user; pub mod user;
pub struct Luz { pub struct Luz {
command_sender: mpsc::Sender<CommandMessage>, command_sender: mpsc::Sender<CommandMessage>,
@ -82,7 +82,9 @@ impl Luz {
loop { loop {
let msg = tokio::select! { let msg = tokio::select! {
// this is okay, as when created the supervisor (and connection) doesn't exist, but a bit messy // this is okay, as when created the supervisor (and connection) doesn't exist, but a bit messy
// THIS IS NOT OKAY LOLLLL
_ = &mut self.connection_supervisor_shutdown => { _ = &mut self.connection_supervisor_shutdown => {
info!("got this");
*self.connected.lock().await = None; *self.connected.lock().await = None;
continue; continue;
} }
@ -247,11 +249,13 @@ impl Luz {
// TODO: send unavailable presence // TODO: send unavailable presence
if let Some((_write_handle, supervisor_handle)) = c.take() { if let Some((_write_handle, supervisor_handle)) = c.take() {
let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await; let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await;
c = None;
} else { } else {
unreachable!() unreachable!()
}; };
} }
} }
info!("lock released")
} }
_ => { _ => {
match self.connected.lock().await.as_ref() { match self.connected.lock().await.as_ref() {
@ -962,10 +966,19 @@ impl CommandMessage {
// TODO: separate sender and receiver, store handle to Luz process to ensure dropping // TODO: separate sender and receiver, store handle to Luz process to ensure dropping
// #[derive(Clone)] // #[derive(Clone)]
#[derive(Debug)]
pub struct LuzHandle { pub struct LuzHandle {
sender: mpsc::Sender<CommandMessage>, sender: mpsc::Sender<CommandMessage>,
} }
impl Clone for LuzHandle {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
}
}
}
impl Deref for LuzHandle { impl Deref for LuzHandle {
type Target = mpsc::Sender<CommandMessage>; type Target = mpsc::Sender<CommandMessage>;
@ -981,11 +994,12 @@ impl DerefMut for LuzHandle {
} }
impl LuzHandle { impl LuzHandle {
pub fn new( pub async fn new(
jid: JID, jid: JID,
password: String, password: String,
db: SqlitePool, db: &str,
) -> (Self, mpsc::Receiver<UpdateMessage>) { ) -> Result<(Self, mpsc::Receiver<UpdateMessage>), Reason> {
let db = SqlitePool::connect(db).await?;
let (command_sender, command_receiver) = mpsc::channel(20); let (command_sender, command_receiver) = mpsc::channel(20);
let (update_sender, update_receiver) = mpsc::channel(20); let (update_sender, update_receiver) = mpsc::channel(20);
// might be bad, first supervisor shutdown notification oneshot is never used (disgusting) // might be bad, first supervisor shutdown notification oneshot is never used (disgusting)
@ -1003,12 +1017,12 @@ impl LuzHandle {
); );
tokio::spawn(async move { actor.run().await }); tokio::spawn(async move { actor.run().await });
( Ok((
Self { Self {
sender: command_sender, sender: command_sender,
}, },
update_receiver, update_receiver,
) ))
} }
} }
@ -1064,7 +1078,7 @@ pub enum CommandMessage {
SendMessage(JID, Body, oneshot::Sender<Result<(), Reason>>), SendMessage(JID, Body, oneshot::Sender<Result<(), Reason>>),
} }
#[derive(Debug)] #[derive(Debug, Clone)]
pub enum UpdateMessage { pub enum UpdateMessage {
Error(Error), Error(Error),
Online(Online, Vec<Contact>), Online(Online, Vec<Contact>),

View File

@ -4,6 +4,7 @@ use stanza::client::presence::String1024;
#[derive(Debug, Default, sqlx::FromRow, Clone)] #[derive(Debug, Default, sqlx::FromRow, Clone)]
pub struct Online { pub struct Online {
pub show: Option<Show>, pub show: Option<Show>,
#[sqlx(rename = "message")]
pub status: Option<String>, pub status: Option<String>,
#[sqlx(skip)] #[sqlx(skip)]
pub priority: Option<i8>, pub priority: Option<i8>,
@ -53,12 +54,12 @@ impl sqlx::Encode<'_, Sqlite> for Show {
} }
} }
#[derive(Debug, Default)] #[derive(Debug, Default, Clone)]
pub struct Offline { pub struct Offline {
pub status: Option<String>, pub status: Option<String>,
} }
#[derive(Debug)] #[derive(Debug, Clone)]
pub enum Presence { pub enum Presence {
Online(Online), Online(Online),
Offline(Offline), Offline(Offline),

View File

@ -58,7 +58,7 @@ impl sqlx::Decode<'_, Sqlite> for Subscription {
"out-pending-in" => Ok(Self::OutPendingIn), "out-pending-in" => Ok(Self::OutPendingIn),
"in-pending-out" => Ok(Self::InPendingOut), "in-pending-out" => Ok(Self::InPendingOut),
"buddy" => Ok(Self::Buddy), "buddy" => Ok(Self::Buddy),
_ => unreachable!(), _ => panic!("unexpected subscription `{value}`"),
} }
} }
} }