database work

This commit is contained in:
cel 🌸 2025-02-18 01:01:17 +00:00
parent 0d9e3d27e9
commit 68a7d13670
12 changed files with 525 additions and 96 deletions

4
.helix/languages.toml Normal file
View File

@ -0,0 +1,4 @@
[language-server.rust-analyzer]
command = "rust-analyzer"
environment = { "DATABASE_URL" = "sqlite://luz/luz.db" }
config = { cargo.features = "all" }

View File

@ -4,3 +4,4 @@ version = "0.1.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
sqlx = { version = "0.8.3", features = ["sqlite"] }

View File

@ -1,6 +1,8 @@
use std::str::FromStr; use std::{error::Error, fmt::Display, str::FromStr};
#[derive(PartialEq, Debug, Clone)] use sqlx::Sqlite;
#[derive(PartialEq, Debug, Clone, sqlx::Type, sqlx::Encode)]
pub struct JID { pub struct JID {
// TODO: validate localpart (length, char] // TODO: validate localpart (length, char]
pub localpart: Option<String>, pub localpart: Option<String>,
@ -8,6 +10,33 @@ pub struct JID {
pub resourcepart: Option<String>, pub resourcepart: Option<String>,
} }
// TODO: feature gate
impl sqlx::Type<Sqlite> for JID {
fn type_info() -> <Sqlite as sqlx::Database>::TypeInfo {
<&str as sqlx::Type<Sqlite>>::type_info()
}
}
impl sqlx::Decode<'_, Sqlite> for JID {
fn decode(
value: <Sqlite as sqlx::Database>::ValueRef<'_>,
) -> Result<Self, sqlx::error::BoxDynError> {
let value = <&str as sqlx::Decode<Sqlite>>::decode(value)?;
Ok(value.parse()?)
}
}
impl sqlx::Encode<'_, Sqlite> for JID {
fn encode_by_ref(
&self,
buf: &mut <Sqlite as sqlx::Database>::ArgumentBuffer<'_>,
) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> {
let jid = self.to_string();
<String as sqlx::Encode<Sqlite>>::encode(jid, buf)
}
}
pub enum JIDError { pub enum JIDError {
NoResourcePart, NoResourcePart,
ParseError(ParseError), ParseError(ParseError),
@ -19,6 +48,19 @@ pub enum ParseError {
Malformed(String), Malformed(String),
} }
impl Display for ParseError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ParseError::Empty => f.write_str("JID parse error: Empty"),
ParseError::Malformed(j) => {
f.write_str(format!("JID parse error: malformed; got '{}'", j).as_str())
}
}
}
}
impl Error for ParseError {}
impl JID { impl JID {
pub fn new( pub fn new(
localpart: Option<String>, localpart: Option<String>,

View File

@ -8,11 +8,11 @@ futures = "0.3.31"
jabber = { version = "0.1.0", path = "../jabber" } jabber = { version = "0.1.0", path = "../jabber" }
peanuts = { version = "0.1.0", path = "../../peanuts" } peanuts = { version = "0.1.0", path = "../../peanuts" }
jid = { version = "0.1.0", path = "../jid" } jid = { version = "0.1.0", path = "../jid" }
sqlx = { version = "0.8.3", features = ["sqlite", "runtime-tokio"] } sqlx = { version = "0.8.3", features = ["sqlite", "runtime-tokio", "uuid"] }
stanza = { version = "0.1.0", path = "../stanza" } stanza = { version = "0.1.0", path = "../stanza" }
tokio = "1.42.0" tokio = "1.42.0"
tokio-stream = "0.1.17" tokio-stream = "0.1.17"
tokio-util = "0.7.13" tokio-util = "0.7.13"
tracing = "0.1.41" tracing = "0.1.41"
tracing-subscriber = "0.3.19" tracing-subscriber = "0.3.19"
uuid = "1.13.1" uuid = { version = "1.13.1", features = ["v4"] }

View File

@ -3,53 +3,53 @@ PRAGMA foreign_keys = on;
-- a user jid will never change, only a chat user will change -- a user jid will never change, only a chat user will change
-- TODO: avatar, nick, etc. -- TODO: avatar, nick, etc.
create table users( create table users(
jid jid primary key, jid text primary key not null,
-- can receive presence status from non-contacts -- can receive presence status from non-contacts
cached_status_message text cached_status_message text
); );
-- enum for subscription state -- enum for subscription state
create table subscription( create table subscription(
state text primary key state text primary key not null
); );
insert into subscription ( state ) values ('none'), ('pending-out'), ('pending-in'), ('only-out'), ('only-in'), ('out-pending-in'), ('in-pending-out'), ('buddy'); insert into subscription ( state ) values ('none'), ('pending-out'), ('pending-in'), ('only-out'), ('only-in'), ('out-pending-in'), ('in-pending-out'), ('buddy');
-- a roster contains users, with client-set nickname -- a roster contains users, with client-set nickname
CREATE TABLE roster( CREATE TABLE roster(
jid jid primary key, user_jid text primary key not null,
name TEXT, name TEXT,
subscription text not null, subscription text not null,
foreign key(subscription) references subscription(state), foreign key(subscription) references subscription(state),
foreign key(jid) references users(jid) foreign key(user_jid) references users(jid)
); );
create table groups( create table groups(
group text primary key group_name text primary key not null
); );
create table groups_roster( create table groups_roster(
group_id text, group_name text not null,
contact_jid jid, contact_jid text not null,
foreign key(group_id) references group(id), foreign key(group_name) references groups(group_name),
foreign key(contact_jid) references roster(jid), foreign key(contact_jid) references roster(user_jid) on delete cascade,
primary key(group_id, contact_id) primary key(group_name, contact_jid)
); );
-- chat includes reference to user jid chat is with -- chat includes reference to user jid chat is with
-- specifically for dms, groups should be different -- specifically for dms, groups should be different
-- can send chat message to user (creating a new chat if not already exists) -- can send chat message to user (creating a new chat if not already exists)
create table chats ( create table chats (
id uuid primary key, id text primary key not null,
user_jid jid not null unique, correspondent text not null unique,
foreign key(user_jid) references users(jid) foreign key(correspondent) references users(jid)
); );
-- messages include reference to chat they are in, and who sent them. -- messages include reference to chat they are in, and who sent them.
create table messages ( create table messages (
id uuid primary key, id text primary key not null,
body text, body text,
chat_id uuid, chat_id text not null,
-- TODO: channel stuff -- TODO: channel stuff
-- channel_id uuid, -- channel_id uuid,
-- check ((chat_id == null) <> (channel_id == null)), -- check ((chat_id == null) <> (channel_id == null)),
@ -57,13 +57,14 @@ create table messages (
-- user is the current "owner" of the message -- user is the current "owner" of the message
-- TODO: icky -- TODO: icky
from_jid jid not null, -- the user to show it coming from (not necessarily the original sender)
originally_from jid not null, from_jid text not null,
check (from_jid != original_sender), originally_from text not null,
-- check (from_jid != original_sender),
-- TODO: from can be either a jid, a moved jid (for when a contact moves, save original sender jid/user but link to new user), or imported (from another service (save details), linked to new user) -- TODO: from can be either a jid, a moved jid (for when a contact moves, save original sender jid/user but link to new user), or imported (from another service (save details), linked to new user)
-- TODO: read bool not null, -- TODO: read bool not null,
foreign key(chat_id) references chats(id), foreign key(chat_id) references chats(id) on delete cascade,
foreign key(from_jid) references users(jid), foreign key(from_jid) references users(jid),
foreign key(originally_from) references users(jid) foreign key(originally_from) references users(jid)
); );

View File

@ -1,31 +1,45 @@
use jid::JID; use jid::JID;
use uuid::Uuid; use uuid::Uuid;
use crate::{roster::Contact, user::User}; #[derive(Debug, sqlx::FromRow)]
#[derive(Debug)]
pub struct Message { pub struct Message {
id: Uuid, pub id: Uuid,
// contains full user information // does not contain full user information
from: Correspondent, #[sqlx(rename = "from_jid")]
body: Body, pub from: JID,
// TODO: originally_from
// TODO: message edits
// TODO: message timestamp
#[sqlx(flatten)]
pub body: Body,
} }
#[derive(Debug)] // TODO: user migrations
// pub enum Migrated {
// Jabber(User),
// Outside,
// }
#[derive(Debug, sqlx::FromRow)]
pub struct Body { pub struct Body {
// TODO: rich text, other contents, threads // TODO: rich text, other contents, threads
body: String, pub body: String,
} }
#[derive(sqlx::FromRow)]
pub struct Chat { pub struct Chat {
correspondent: Correspondent, correspondent: JID,
message_history: Vec<Message>, // message history is not stored in chat, retreived separately.
// pub message_history: Vec<Message>,
} }
#[derive(Debug)] impl Chat {
pub enum Correspondent { pub fn new(correspondent: JID) -> Self {
User(User), Self { correspondent }
Contact(Contact), }
pub fn correspondent(&self) -> &JID {
&self.correspondent
}
} }
// TODO: group chats // TODO: group chats

View File

@ -10,7 +10,6 @@ use std::{
use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream}; use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream};
use jid::JID; use jid::JID;
use read::{ReadControl, ReadControlHandle}; use read::{ReadControl, ReadControlHandle};
use sqlx::SqlitePool;
use stanza::client::Stanza; use stanza::client::Stanza;
use tokio::{ use tokio::{
sync::{mpsc, oneshot, Mutex}, sync::{mpsc, oneshot, Mutex},
@ -18,7 +17,7 @@ use tokio::{
}; };
use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage}; use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage};
use crate::{error::Error, UpdateMessage}; use crate::{db::Db, error::Error, UpdateMessage};
mod read; mod read;
pub(crate) mod write; pub(crate) mod write;
@ -27,7 +26,7 @@ pub struct Supervisor {
connection_commands: mpsc::Receiver<SupervisorCommand>, connection_commands: mpsc::Receiver<SupervisorCommand>,
writer_crash: oneshot::Receiver<(WriteMessage, mpsc::Receiver<WriteMessage>)>, writer_crash: oneshot::Receiver<(WriteMessage, mpsc::Receiver<WriteMessage>)>,
reader_crash: oneshot::Receiver<( reader_crash: oneshot::Receiver<(
SqlitePool, Db,
mpsc::Sender<UpdateMessage>, mpsc::Sender<UpdateMessage>,
tokio::task::JoinSet<()>, tokio::task::JoinSet<()>,
mpsc::Sender<SupervisorCommand>, mpsc::Sender<SupervisorCommand>,
@ -53,7 +52,7 @@ pub enum State {
Write(mpsc::Receiver<WriteMessage>), Write(mpsc::Receiver<WriteMessage>),
Read( Read(
( (
SqlitePool, Db,
mpsc::Sender<UpdateMessage>, mpsc::Sender<UpdateMessage>,
tokio::task::JoinSet<()>, tokio::task::JoinSet<()>,
mpsc::Sender<SupervisorCommand>, mpsc::Sender<SupervisorCommand>,
@ -68,7 +67,7 @@ impl Supervisor {
connection_commands: mpsc::Receiver<SupervisorCommand>, connection_commands: mpsc::Receiver<SupervisorCommand>,
writer_crash: oneshot::Receiver<(WriteMessage, mpsc::Receiver<WriteMessage>)>, writer_crash: oneshot::Receiver<(WriteMessage, mpsc::Receiver<WriteMessage>)>,
reader_crash: oneshot::Receiver<( reader_crash: oneshot::Receiver<(
SqlitePool, Db,
mpsc::Sender<UpdateMessage>, mpsc::Sender<UpdateMessage>,
JoinSet<()>, JoinSet<()>,
mpsc::Sender<SupervisorCommand>, mpsc::Sender<SupervisorCommand>,
@ -328,7 +327,7 @@ impl SupervisorHandle {
pub fn new( pub fn new(
streams: BoundJabberStream<Tls>, streams: BoundJabberStream<Tls>,
update_sender: mpsc::Sender<UpdateMessage>, update_sender: mpsc::Sender<UpdateMessage>,
db: SqlitePool, db: Db,
on_shutdown: oneshot::Sender<()>, on_shutdown: oneshot::Sender<()>,
jid: Arc<Mutex<JID>>, jid: Arc<Mutex<JID>>,
password: Arc<String>, password: Arc<String>,

View File

@ -6,7 +6,6 @@ use std::{
}; };
use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader}; use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader};
use sqlx::SqlitePool;
use stanza::client::Stanza; use stanza::client::Stanza;
use tokio::{ use tokio::{
sync::{mpsc, oneshot, Mutex}, sync::{mpsc, oneshot, Mutex},
@ -14,7 +13,7 @@ use tokio::{
}; };
use tracing::info; use tracing::info;
use crate::{error::Error, UpdateMessage}; use crate::{db::Db, error::Error, UpdateMessage};
use super::{ use super::{
write::{WriteHandle, WriteMessage}, write::{WriteHandle, WriteMessage},
@ -25,14 +24,14 @@ pub struct Read {
control_receiver: mpsc::Receiver<ReadControl>, control_receiver: mpsc::Receiver<ReadControl>,
stream: BoundJabberReader<Tls>, stream: BoundJabberReader<Tls>,
on_crash: oneshot::Sender<( on_crash: oneshot::Sender<(
SqlitePool, Db,
mpsc::Sender<UpdateMessage>, mpsc::Sender<UpdateMessage>,
JoinSet<()>, JoinSet<()>,
mpsc::Sender<SupervisorCommand>, mpsc::Sender<SupervisorCommand>,
WriteHandle, WriteHandle,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
)>, )>,
db: SqlitePool, db: Db,
update_sender: mpsc::Sender<UpdateMessage>, update_sender: mpsc::Sender<UpdateMessage>,
supervisor_control: mpsc::Sender<SupervisorCommand>, supervisor_control: mpsc::Sender<SupervisorCommand>,
write_handle: WriteHandle, write_handle: WriteHandle,
@ -48,14 +47,14 @@ impl Read {
control_receiver: mpsc::Receiver<ReadControl>, control_receiver: mpsc::Receiver<ReadControl>,
stream: BoundJabberReader<Tls>, stream: BoundJabberReader<Tls>,
on_crash: oneshot::Sender<( on_crash: oneshot::Sender<(
SqlitePool, Db,
mpsc::Sender<UpdateMessage>, mpsc::Sender<UpdateMessage>,
JoinSet<()>, JoinSet<()>,
mpsc::Sender<SupervisorCommand>, mpsc::Sender<SupervisorCommand>,
WriteHandle, WriteHandle,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
)>, )>,
db: SqlitePool, db: Db,
update_sender: mpsc::Sender<UpdateMessage>, update_sender: mpsc::Sender<UpdateMessage>,
// jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs) // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs)
supervisor_control: mpsc::Sender<SupervisorCommand>, supervisor_control: mpsc::Sender<SupervisorCommand>,
@ -167,7 +166,7 @@ impl Read {
async fn handle_stanza( async fn handle_stanza(
stanza: Stanza, stanza: Stanza,
update_sender: mpsc::Sender<UpdateMessage>, update_sender: mpsc::Sender<UpdateMessage>,
db: SqlitePool, db: Db,
supervisor_control: mpsc::Sender<SupervisorCommand>, supervisor_control: mpsc::Sender<SupervisorCommand>,
write_handle: WriteHandle, write_handle: WriteHandle,
) { ) {
@ -178,7 +177,7 @@ pub enum ReadControl {
Disconnect, Disconnect,
Abort( Abort(
oneshot::Sender<( oneshot::Sender<(
SqlitePool, Db,
mpsc::Sender<UpdateMessage>, mpsc::Sender<UpdateMessage>,
JoinSet<()>, JoinSet<()>,
mpsc::Sender<SupervisorCommand>, mpsc::Sender<SupervisorCommand>,
@ -211,14 +210,14 @@ impl ReadControlHandle {
pub fn new( pub fn new(
stream: BoundJabberReader<Tls>, stream: BoundJabberReader<Tls>,
on_crash: oneshot::Sender<( on_crash: oneshot::Sender<(
SqlitePool, Db,
mpsc::Sender<UpdateMessage>, mpsc::Sender<UpdateMessage>,
JoinSet<()>, JoinSet<()>,
mpsc::Sender<SupervisorCommand>, mpsc::Sender<SupervisorCommand>,
WriteHandle, WriteHandle,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
)>, )>,
db: SqlitePool, db: Db,
sender: mpsc::Sender<UpdateMessage>, sender: mpsc::Sender<UpdateMessage>,
supervisor_control: mpsc::Sender<SupervisorCommand>, supervisor_control: mpsc::Sender<SupervisorCommand>,
jabber_write: WriteHandle, jabber_write: WriteHandle,
@ -248,14 +247,14 @@ impl ReadControlHandle {
pub fn reconnect( pub fn reconnect(
stream: BoundJabberReader<Tls>, stream: BoundJabberReader<Tls>,
on_crash: oneshot::Sender<( on_crash: oneshot::Sender<(
SqlitePool, Db,
mpsc::Sender<UpdateMessage>, mpsc::Sender<UpdateMessage>,
JoinSet<()>, JoinSet<()>,
mpsc::Sender<SupervisorCommand>, mpsc::Sender<SupervisorCommand>,
WriteHandle, WriteHandle,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
)>, )>,
db: SqlitePool, db: Db,
sender: mpsc::Sender<UpdateMessage>, sender: mpsc::Sender<UpdateMessage>,
supervisor_control: mpsc::Sender<SupervisorCommand>, supervisor_control: mpsc::Sender<SupervisorCommand>,
jabber_write: WriteHandle, jabber_write: WriteHandle,

318
luz/src/db/mod.rs Normal file
View File

@ -0,0 +1,318 @@
use std::collections::HashSet;
use jid::JID;
use sqlx::{Error, SqlitePool};
use uuid::Uuid;
use crate::{
chat::{Chat, Message},
roster::Contact,
user::User,
};
#[derive(Clone)]
pub struct Db {
db: SqlitePool,
}
impl Db {
pub fn new(db: SqlitePool) -> Self {
Self { db }
}
pub async fn create_user(&self, user: User) -> Result<(), Error> {
sqlx::query!(
"insert into users ( jid, cached_status_message ) values ( ?, ? )",
user.jid,
user.cached_status_message
)
.execute(&self.db)
.await?;
Ok(())
}
pub async fn read_user(&self, user: JID) -> Result<User, Error> {
let user: User = sqlx::query_as("select * from users where jid = ?")
.bind(user)
.fetch_one(&self.db)
.await?;
Ok(user)
}
pub async fn update_user(&self, user: User) -> Result<(), Error> {
sqlx::query!(
"update users set cached_status_message = ? where jid = ?",
user.cached_status_message,
user.jid
)
.execute(&self.db)
.await?;
Ok(())
}
// TODO: should this be allowed? messages need to reference users. should probably only allow delete if every other thing referencing it has been deleted, or if you make clear to the user deleting a user will delete all messages associated with them.
// pub async fn delete_user(&self, user: JID) -> Result<(), Error> {}
/// does not create the underlying user, if underlying user does not exist, create_user() must be called separately
pub async fn create_contact(&self, contact: Contact) -> Result<(), Error> {
sqlx::query!(
"insert into roster ( user_jid, name, subscription ) values ( ?, ?, ? )",
contact.user_jid,
contact.name,
contact.subscription
)
.execute(&self.db)
.await?;
// TODO: abstract this out in to add_to_group() function ?
for group in contact.groups {
sqlx::query!(
"insert into groups (group_name) values (?) on conflict do nothing",
group
)
.execute(&self.db)
.await?;
sqlx::query!(
"insert into groups_roster (group_name, contact_jid) values (?, ?)",
group,
contact.user_jid
)
.execute(&self.db)
.await?;
}
Ok(())
}
pub async fn read_contact(&self, contact: JID) -> Result<Contact, Error> {
let mut contact: Contact = sqlx::query_as("select * from roster where user_jid = ?")
.bind(contact)
.fetch_one(&self.db)
.await?;
#[derive(sqlx::FromRow)]
struct Row {
group_name: String,
}
let groups: Vec<Row> =
sqlx::query_as("select group_name from groups_roster where contact_jid = ?")
.bind(&contact.user_jid)
.fetch_all(&self.db)
.await?;
contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name));
Ok(contact)
}
pub async fn read_contact_opt(&self, contact: &JID) -> Result<Option<Contact>, Error> {
let contact: Option<Contact> = sqlx::query_as(
"select * from roster full outer join users on jid = user_jid where jid = ?",
)
.bind(contact)
.fetch_optional(&self.db)
.await?;
if let Some(mut contact) = contact {
#[derive(sqlx::FromRow)]
struct Row {
group_name: String,
}
let groups: Vec<Row> =
sqlx::query_as("select group_name from groups_roster where contact_jid = ?")
.bind(&contact.user_jid)
.fetch_all(&self.db)
.await?;
contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name));
Ok(Some(contact))
} else {
Ok(None)
}
}
/// does not update the underlying user, to update user, update_user() must be called separately
pub async fn update_contact(&self, contact: Contact) -> Result<(), Error> {
sqlx::query!(
"update roster set name = ?, subscription = ? where user_jid = ?",
contact.name,
contact.subscription,
contact.user_jid
)
.execute(&self.db)
.await?;
sqlx::query!(
"delete from groups_roster where contact_jid = ?",
contact.user_jid
)
.execute(&self.db)
.await?;
// TODO: delete orphaned groups from groups table
for group in contact.groups {
sqlx::query!(
"insert into groups (group_name) values (?) on conflict do nothing",
group
)
.execute(&self.db)
.await?;
sqlx::query!(
"insert into groups_roster (group_name, contact_jid) values (?, ?)",
group,
contact.user_jid
)
.execute(&self.db)
.await?;
}
Ok(())
}
pub async fn delete_contact(&self, contact: JID) -> Result<(), Error> {
sqlx::query!("delete from roster where user_jid = ?", contact)
.execute(&self.db)
.await?;
// TODO: delete orphaned groups from groups table
Ok(())
}
pub async fn replace_cached_roster(&self, roster: Vec<Contact>) -> Result<(), Error> {
sqlx::query!("delete from roster").execute(&self.db).await?;
for contact in roster {
self.create_contact(contact).await?;
}
Ok(())
}
pub async fn read_cached_roster(&self) -> Result<Vec<Contact>, Error> {
let mut roster: Vec<Contact> =
sqlx::query_as("select * from roster full outer join users on jid = user_jid")
.fetch_all(&self.db)
.await?;
for contact in &mut roster {
#[derive(sqlx::FromRow)]
struct Row {
group_name: String,
}
let groups: Vec<Row> =
sqlx::query_as("select group_name from groups_roster where contact_jid = ?")
.bind(&contact.user_jid)
.fetch_all(&self.db)
.await?;
contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name));
}
Ok(roster)
}
pub async fn create_chat(&self, chat: Chat) -> Result<(), Error> {
let id = Uuid::new_v4();
let jid = chat.correspondent();
sqlx::query!(
"insert into chats (id, correspondent) values (?, ?)",
id,
jid
)
.execute(&self.db)
.await?;
Ok(())
}
// TODO: what happens if a correspondent changes from a user to a contact? maybe just have correspondent be a user, then have the client make the user show up as a contact in ui if they are in the loaded roster.
pub async fn read_chat(&self, chat: JID) -> Result<Chat, Error> {
// check if the chat correponding with the jid exists
let chat: Chat = sqlx::query_as("select correspondent from chats where correspondent = ?")
.bind(chat)
.fetch_one(&self.db)
.await?;
Ok(chat)
}
pub async fn update_chat_correspondent(
&self,
old_chat: Chat,
new_correspondent: JID,
) -> Result<Chat, Error> {
// TODO: update other chat data if it differs (for now there is only correspondent so doesn't matter)
let new_jid = &new_correspondent;
let old_jid = old_chat.correspondent();
sqlx::query!(
"update chats set correspondent = ? where correspondent = ?",
new_jid,
old_jid,
)
.execute(&self.db)
.await?;
let chat = self.read_chat(new_correspondent).await?;
Ok(chat)
}
// pub async fn update_chat
pub async fn delete_chat(&self, chat: JID) -> Result<(), Error> {
sqlx::query!("delete from chats where correspondent = ?", chat)
.execute(&self.db)
.await?;
Ok(())
}
/// TODO: sorting and filtering (for now there is no sorting)
pub async fn read_chats(&self) -> Result<Vec<Chat>, Error> {
let chats: Vec<Chat> = sqlx::query_as("select * from chats")
.fetch_all(&self.db)
.await?;
Ok(chats)
}
async fn read_chat_id(&self, chat: JID) -> Result<Uuid, Error> {
#[derive(sqlx::FromRow)]
struct Row {
id: Uuid,
}
let chat_id: Row = sqlx::query_as("select id from chats where correspondent = ?")
.bind(chat)
.fetch_one(&self.db)
.await?;
let chat_id = chat_id.id;
Ok(chat_id)
}
async fn read_chat_id_opt(&self, chat: JID) -> Result<Option<Uuid>, Error> {
#[derive(sqlx::FromRow)]
struct Row {
id: Uuid,
}
let chat_id: Option<Row> = sqlx::query_as("select id from chats where correspondent = ?")
.bind(chat)
.fetch_optional(&self.db)
.await?;
let chat_id = chat_id.map(|row| row.id);
Ok(chat_id)
}
/// if the chat doesn't already exist, it must be created by calling create_chat() before running this function.
pub async fn create_message(&self, message: Message, chat: JID) -> Result<(), Error> {
// TODO: one query
let chat_id = self.read_chat_id(chat).await?;
sqlx::query!("insert into messages (id, body, chat_id, from_jid, originally_from) values (?, ?, ?, ?, ?)", message.id, message.body.body, chat_id, message.from, message.from).execute(&self.db).await?;
Ok(())
}
pub async fn read_message(&self, message: Uuid) -> Result<Message, Error> {
let message: Message = sqlx::query_as("select * from messages where id = ?")
.bind(message)
.fetch_one(&self.db)
.await?;
Ok(message)
}
// TODO: message updates/edits pub async fn update_message(&self, message: Message) -> Result<(), Error> {}
pub async fn delete_message(&self, message: Uuid) -> Result<(), Error> {
sqlx::query!("delete from messages where id = ?", message)
.execute(&self.db)
.await?;
Ok(())
}
// TODO: paging
pub async fn read_message_history(&self, chat: JID) -> Result<Vec<Message>, Error> {
let chat_id = self.read_chat_id(chat).await?;
let messages: Vec<Message> = sqlx::query_as("select * from messages where chat_id = ?")
.bind(chat_id)
.fetch_all(&self.db)
.await?;
Ok(messages)
}
}

View File

@ -6,6 +6,7 @@ use std::{
use chat::{Body, Chat, Message}; use chat::{Body, Chat, Message};
use connection::{write::WriteMessage, SupervisorSender}; use connection::{write::WriteMessage, SupervisorSender};
use db::Db;
use jabber::JID; use jabber::JID;
use presence::{Offline, Online, Presence}; use presence::{Offline, Online, Presence};
use roster::{Contact, ContactUpdate}; use roster::{Contact, ContactUpdate};
@ -18,6 +19,7 @@ use tokio::{
sync::{mpsc, oneshot, Mutex}, sync::{mpsc, oneshot, Mutex},
task::JoinSet, task::JoinSet,
}; };
use user::User;
use uuid::Uuid; use uuid::Uuid;
use crate::connection::write::WriteHandle; use crate::connection::write::WriteHandle;
@ -26,6 +28,7 @@ use crate::error::Error;
mod chat; mod chat;
mod connection; mod connection;
mod db;
mod error; mod error;
mod presence; mod presence;
mod roster; mod roster;
@ -38,7 +41,7 @@ pub struct Luz {
password: Arc<String>, password: Arc<String>,
connected: Arc<Mutex<Option<(WriteHandle, SupervisorHandle)>>>, connected: Arc<Mutex<Option<(WriteHandle, SupervisorHandle)>>>,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
db: SqlitePool, db: Db,
sender: mpsc::Sender<UpdateMessage>, sender: mpsc::Sender<UpdateMessage>,
/// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected /// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected
connection_supervisor_shutdown: oneshot::Receiver<()>, connection_supervisor_shutdown: oneshot::Receiver<()>,
@ -61,7 +64,7 @@ impl Luz {
jid, jid,
password: Arc::new(password), password: Arc::new(password),
connected, connected,
db, db: Db::new(db),
receiver, receiver,
sender, sender,
tasks: JoinSet::new(), tasks: JoinSet::new(),
@ -164,7 +167,7 @@ impl CommandMessage {
pub async fn handle_offline( pub async fn handle_offline(
mut self, mut self,
jid: Arc<Mutex<JID>>, jid: Arc<Mutex<JID>>,
db: SqlitePool, db: Db,
sender: mpsc::Sender<UpdateMessage>, sender: mpsc::Sender<UpdateMessage>,
) { ) {
todo!() todo!()
@ -176,7 +179,7 @@ impl CommandMessage {
supervisor_control: SupervisorSender, supervisor_control: SupervisorSender,
// TODO: jid could lose resource by the end // TODO: jid could lose resource by the end
jid: Arc<Mutex<JID>>, jid: Arc<Mutex<JID>>,
db: SqlitePool, db: Db,
sender: mpsc::Sender<UpdateMessage>, sender: mpsc::Sender<UpdateMessage>,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
) { ) {
@ -277,41 +280,47 @@ pub enum CommandMessage {
/// disconnect from XMPP chat server, sending unavailable presence then closing stream. /// disconnect from XMPP chat server, sending unavailable presence then closing stream.
Disconnect(Offline), Disconnect(Offline),
/// get the roster. if offline, retreive cached version from database. should be stored in application memory /// get the roster. if offline, retreive cached version from database. should be stored in application memory
GetRoster, GetRoster(oneshot::Sender<Result<Vec<Contact>, Error>>),
/// get all chats. chat will include 10 messages in their message Vec (enough for chat previews) /// get all chats. chat will include 10 messages in their message Vec (enough for chat previews)
// TODO: paging and filtering // TODO: paging and filtering
GetChats(oneshot::Sender<Vec<Chat>>), GetChats(oneshot::Sender<Result<Vec<Chat>, Error>>),
/// get message history for chat (does appropriate mam things) /// get message history for chat (does appropriate mam things)
// TODO: paging and filtering // TODO: paging and filtering
GetMessages(JID, oneshot::Sender<Vec<Message>>), GetMessages(JID, oneshot::Sender<Result<Vec<Message>, Error>>),
/// delete a chat from your chat history, along with all the corresponding messages
DeleteChat(JID, oneshot::Sender<Result<(), Error>>),
/// delete a message from your chat history
DeleteMessage(Uuid, oneshot::Sender<Result<(), Error>>),
/// get a user from your users database
GetUser(JID, oneshot::Sender<Result<User, Error>>),
/// add a contact to your roster, with a status of none, no subscriptions. /// add a contact to your roster, with a status of none, no subscriptions.
AddContact(JID), // TODO: for all these, consider returning with oneshot::Sender<Result<(), Error>>
AddContact(JID, oneshot::Sender<Result<(), Error>>),
/// send a friend request i.e. a subscription request with a subscription pre-approval. if not already added to roster server adds to roster. /// send a friend request i.e. a subscription request with a subscription pre-approval. if not already added to roster server adds to roster.
BuddyRequest(JID), BuddyRequest(JID, oneshot::Sender<Result<(), Error>>),
/// send a subscription request, without pre-approval. if not already added to roster server adds to roster. /// send a subscription request, without pre-approval. if not already added to roster server adds to roster.
SubscriptionRequest(JID), SubscriptionRequest(JID, oneshot::Sender<Result<(), Error>>),
/// accept a friend request by accepting a pending subscription and sending a subscription request back. if not already added to roster adds to roster. /// accept a friend request by accepting a pending subscription and sending a subscription request back. if not already added to roster adds to roster.
AcceptBuddyRequest(JID), AcceptBuddyRequest(JID, oneshot::Sender<Result<(), Error>>),
/// accept a pending subscription and doesn't send a subscription request back. if not already added to roster adds to roster. /// accept a pending subscription and doesn't send a subscription request back. if not already added to roster adds to roster.
AcceptSubscriptionRequest(JID), AcceptSubscriptionRequest(JID, oneshot::Sender<Result<(), Error>>),
/// unsubscribe to a contact, but don't remove their subscription. /// unsubscribe to a contact, but don't remove their subscription.
UnsubscribeFromContact(JID), UnsubscribeFromContact(JID, oneshot::Sender<Result<(), Error>>),
/// stop a contact from being subscribed, but stay subscribed to the contact. /// stop a contact from being subscribed, but stay subscribed to the contact.
UnsubscribeContact(JID), UnsubscribeContact(JID, oneshot::Sender<Result<(), Error>>),
/// remove subscriptions to and from contact, but keep in roster. /// remove subscriptions to and from contact, but keep in roster.
UnfriendContact(JID), UnfriendContact(JID, oneshot::Sender<Result<(), Error>>),
/// remove a contact from the contact list. will remove subscriptions if not already done then delete contact from roster. /// remove a contact from the contact list. will remove subscriptions if not already done then delete contact from roster.
DeleteContact(JID), DeleteContact(JID, oneshot::Sender<Result<(), Error>>),
/// update contact /// update contact
UpdateContact(JID, ContactUpdate), UpdateContact(JID, ContactUpdate, oneshot::Sender<Result<(), Error>>),
/// set online status. if disconnected, will be cached so when client connects, will be sent as the initial presence. /// set online status. if disconnected, will be cached so when client connects, will be sent as the initial presence.
SetStatus(Online), SetStatusMessage(Option<String>, oneshot::Sender<Result<(), Error>>),
/// send a directed presence (usually to a non-contact). /// send a directed presence (usually to a non-contact).
// TODO: should probably make it so people can add non-contact auto presence sharing in the client. // TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting)
// SendDirectedPresence(JID, Online),
/// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a /// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a
/// chatroom). if disconnected, will be cached so when client connects, message will be sent. /// chatroom). if disconnected, will be cached so when client connects, message will be sent.
SendMessage(JID, Body), SendMessage(JID, Body, oneshot::Sender<Result<(), Error>>),
} }
#[derive(Debug)] #[derive(Debug)]
@ -319,15 +328,17 @@ pub enum UpdateMessage {
Error(Error), Error(Error),
Online(Online), Online(Online),
Offline(Offline), Offline(Offline),
/// received roster (replace full app roster state with this) /// received roster from jabber server (replace full app roster state with this)
FullRoster(Vec<Contact>), FullRoster(Vec<Contact>),
/// (only update app roster state) /// (only update app roster state, don't replace)
RosterUpdate(Contact), RosterUpdate(Contact),
/// presences should be stored with users in the ui, not contacts, as presences can be received from anyone
Presence { Presence {
from: JID, from: JID,
presence: Presence, presence: Presence,
}, },
MessageDispatched(Uuid), // TODO: receipts
// MessageDispatched(Uuid),
Message { Message {
to: JID, to: JID,
message: Message, message: Message,

View File

@ -1,9 +1,7 @@
use std::collections::HashSet; use std::collections::HashSet;
use jid::JID; use jid::JID;
use uuid::Uuid; use sqlx::Sqlite;
use crate::user::User;
pub enum ContactUpdate { pub enum ContactUpdate {
Name(Option<String>), Name(Option<String>),
@ -11,21 +9,18 @@ pub enum ContactUpdate {
RemoveFromGroup(String), RemoveFromGroup(String),
} }
#[derive(Debug)] #[derive(Debug, sqlx::FromRow)]
pub struct Contact { pub struct Contact {
// jid is the id used to reference everything, but not the primary key // jid is the id used to reference everything, but not the primary key
user: User, pub user_jid: JID,
subscription: Subscription, pub subscription: Subscription,
/// client user defined name /// client user defined name
name: Option<String>, pub name: Option<String>,
// TODO: avatar, nickname // TODO: avatar, nickname
/// nickname picked by contact /// nickname picked by contact
// nickname: Option<String>, // nickname: Option<String>,
groups: HashSet<String>, #[sqlx(skip)]
} pub groups: HashSet<String>,
impl Contact {
pub fn new(user: User, name: Option<String>, )
} }
#[derive(Debug)] #[derive(Debug)]
@ -38,5 +33,50 @@ enum Subscription {
OutPendingIn, OutPendingIn,
InPendingOut, InPendingOut,
Buddy, Buddy,
Remove, // TODO: perhaps don't need, just emit event to remove contact
// Remove,
}
impl sqlx::Type<Sqlite> for Subscription {
fn type_info() -> <Sqlite as sqlx::Database>::TypeInfo {
<&str as sqlx::Type<Sqlite>>::type_info()
}
}
impl sqlx::Decode<'_, Sqlite> for Subscription {
fn decode(
value: <Sqlite as sqlx::Database>::ValueRef<'_>,
) -> Result<Self, sqlx::error::BoxDynError> {
let value = <&str as sqlx::Decode<Sqlite>>::decode(value)?;
match value {
"none" => Ok(Self::None),
"pending-out" => Ok(Self::PendingOut),
"pending-in" => Ok(Self::PendingIn),
"only-out" => Ok(Self::OnlyOut),
"only-in" => Ok(Self::OnlyIn),
"out-pending-in" => Ok(Self::OutPendingIn),
"in-pending-out" => Ok(Self::InPendingOut),
"buddy" => Ok(Self::Buddy),
_ => unreachable!(),
}
}
}
impl sqlx::Encode<'_, Sqlite> for Subscription {
fn encode_by_ref(
&self,
buf: &mut <Sqlite as sqlx::Database>::ArgumentBuffer<'_>,
) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> {
let value = match self {
Subscription::None => "none",
Subscription::PendingOut => "pending-out",
Subscription::PendingIn => "pending-in",
Subscription::OnlyOut => "only-out",
Subscription::OnlyIn => "only-in",
Subscription::OutPendingIn => "out-pending-in",
Subscription::InPendingOut => "in-pending-out",
Subscription::Buddy => "buddy",
};
<&str as sqlx::Encode<Sqlite>>::encode(value, buf)
}
} }

View File

@ -1,7 +1,7 @@
use jid::JID; use jid::JID;
#[derive(Debug)] #[derive(Debug, sqlx::FromRow)]
pub struct User { pub struct User {
jid: JID, pub jid: JID,
cached_status: Option<String>, pub cached_status_message: Option<String>,
} }