refactor(luz): create Db separate from LuzHandle

This commit is contained in:
cel 🌸 2025-03-01 09:12:32 +00:00
parent 357311cb93
commit 542a8e299c
4 changed files with 102 additions and 52 deletions

View File

@ -1,11 +1,12 @@
use std::collections::HashSet;
use std::{collections::HashSet, path::Path};
use jid::JID;
use sqlx::{Error, SqlitePool};
use sqlx::{migrate, Error, SqlitePool};
use uuid::Uuid;
use crate::{
chat::{Chat, Message},
error::{DatabaseError, DatabaseOpenError},
presence::Online,
roster::Contact,
user::User,
@ -16,12 +17,38 @@ pub struct Db {
db: SqlitePool,
}
// TODO: turn into trait
impl Db {
pub fn new(db: SqlitePool) -> Self {
pub async fn create_connect_and_migrate(
path: impl AsRef<Path>,
) -> Result<Self, DatabaseOpenError> {
if let Some(dir) = path.as_ref().parent() {
if dir.is_dir() {
} else {
tokio::fs::create_dir_all(dir).await?;
}
let _file = tokio::fs::OpenOptions::new()
.append(true)
.create(true)
.open(path.as_ref())
.await?;
}
let url = format!(
"sqlite://{}",
path.as_ref()
.to_str()
.ok_or(DatabaseOpenError::InvalidPath)?
);
let db = SqlitePool::connect(&url).await?;
migrate!().run(&db).await?;
Ok(Self { db })
}
pub(crate) fn new(db: SqlitePool) -> Self {
Self { db }
}
pub async fn create_user(&self, user: User) -> Result<(), Error> {
pub(crate) async fn create_user(&self, user: User) -> Result<(), Error> {
sqlx::query!(
"insert into users ( jid, cached_status_message ) values ( ?, ? )",
user.jid,
@ -32,7 +59,7 @@ impl Db {
Ok(())
}
pub async fn read_user(&self, user: JID) -> Result<User, Error> {
pub(crate) async fn read_user(&self, user: JID) -> Result<User, Error> {
let user: User = sqlx::query_as("select * from users where jid = ?")
.bind(user)
.fetch_one(&self.db)
@ -40,7 +67,7 @@ impl Db {
Ok(user)
}
pub async fn update_user(&self, user: User) -> Result<(), Error> {
pub(crate) async fn update_user(&self, user: User) -> Result<(), Error> {
sqlx::query!(
"update users set cached_status_message = ? where jid = ?",
user.cached_status_message,
@ -52,10 +79,10 @@ impl Db {
}
// TODO: should this be allowed? messages need to reference users. should probably only allow delete if every other thing referencing it has been deleted, or if you make clear to the user deleting a user will delete all messages associated with them.
// pub async fn delete_user(&self, user: JID) -> Result<(), Error> {}
// pub(crate) async fn delete_user(&self, user: JID) -> Result<(), Error> {}
/// does not create the underlying user, if underlying user does not exist, create_user() must be called separately
pub async fn create_contact(&self, contact: Contact) -> Result<(), Error> {
pub(crate) async fn create_contact(&self, contact: Contact) -> Result<(), Error> {
sqlx::query!(
"insert into roster ( user_jid, name, subscription ) values ( ?, ?, ? )",
contact.user_jid,
@ -83,7 +110,7 @@ impl Db {
Ok(())
}
pub async fn read_contact(&self, contact: JID) -> Result<Contact, Error> {
pub(crate) async fn read_contact(&self, contact: JID) -> Result<Contact, Error> {
let mut contact: Contact = sqlx::query_as("select * from roster where user_jid = ?")
.bind(contact)
.fetch_one(&self.db)
@ -101,7 +128,7 @@ impl Db {
Ok(contact)
}
pub async fn read_contact_opt(&self, contact: &JID) -> Result<Option<Contact>, Error> {
pub(crate) async fn read_contact_opt(&self, contact: &JID) -> Result<Option<Contact>, Error> {
let contact: Option<Contact> = sqlx::query_as(
"select * from roster full outer join users on jid = user_jid where jid = ?",
)
@ -126,7 +153,7 @@ impl Db {
}
/// does not update the underlying user, to update user, update_user() must be called separately
pub async fn update_contact(&self, contact: Contact) -> Result<(), Error> {
pub(crate) async fn update_contact(&self, contact: Contact) -> Result<(), Error> {
sqlx::query!(
"update roster set name = ?, subscription = ? where user_jid = ?",
contact.name,
@ -160,7 +187,7 @@ impl Db {
Ok(())
}
pub async fn upsert_contact(&self, contact: Contact) -> Result<(), Error> {
pub(crate) async fn upsert_contact(&self, contact: Contact) -> Result<(), Error> {
sqlx::query!(
"insert into users ( jid ) values ( ? ) on conflict do nothing",
contact.user_jid,
@ -202,7 +229,7 @@ impl Db {
Ok(())
}
pub async fn delete_contact(&self, contact: JID) -> Result<(), Error> {
pub(crate) async fn delete_contact(&self, contact: JID) -> Result<(), Error> {
sqlx::query!("delete from roster where user_jid = ?", contact)
.execute(&self.db)
.await?;
@ -210,7 +237,7 @@ impl Db {
Ok(())
}
pub async fn replace_cached_roster(&self, roster: Vec<Contact>) -> Result<(), Error> {
pub(crate) async fn replace_cached_roster(&self, roster: Vec<Contact>) -> Result<(), Error> {
sqlx::query!("delete from roster").execute(&self.db).await?;
for contact in roster {
self.upsert_contact(contact).await?;
@ -218,7 +245,7 @@ impl Db {
Ok(())
}
pub async fn read_cached_roster(&self) -> Result<Vec<Contact>, Error> {
pub(crate) async fn read_cached_roster(&self) -> Result<Vec<Contact>, Error> {
let mut roster: Vec<Contact> =
sqlx::query_as("select * from roster join users on jid = user_jid")
.fetch_all(&self.db)
@ -238,7 +265,7 @@ impl Db {
Ok(roster)
}
pub async fn create_chat(&self, chat: Chat) -> Result<(), Error> {
pub(crate) async fn create_chat(&self, chat: Chat) -> Result<(), Error> {
let id = Uuid::new_v4();
let jid = chat.correspondent();
sqlx::query!(
@ -253,7 +280,7 @@ impl Db {
// TODO: what happens if a correspondent changes from a user to a contact? maybe just have correspondent be a user, then have the client make the user show up as a contact in ui if they are in the loaded roster.
pub async fn read_chat(&self, chat: JID) -> Result<Chat, Error> {
pub(crate) async fn read_chat(&self, chat: JID) -> Result<Chat, Error> {
// check if the chat correponding with the jid exists
let chat: Chat = sqlx::query_as("select correspondent from chats where correspondent = ?")
.bind(chat)
@ -262,7 +289,7 @@ impl Db {
Ok(chat)
}
pub async fn update_chat_correspondent(
pub(crate) async fn update_chat_correspondent(
&self,
old_chat: Chat,
new_correspondent: JID,
@ -281,9 +308,9 @@ impl Db {
Ok(chat)
}
// pub async fn update_chat
// pub(crate) async fn update_chat
pub async fn delete_chat(&self, chat: JID) -> Result<(), Error> {
pub(crate) async fn delete_chat(&self, chat: JID) -> Result<(), Error> {
sqlx::query!("delete from chats where correspondent = ?", chat)
.execute(&self.db)
.await?;
@ -291,7 +318,7 @@ impl Db {
}
/// TODO: sorting and filtering (for now there is no sorting)
pub async fn read_chats(&self) -> Result<Vec<Chat>, Error> {
pub(crate) async fn read_chats(&self) -> Result<Vec<Chat>, Error> {
let chats: Vec<Chat> = sqlx::query_as("select * from chats")
.fetch_all(&self.db)
.await?;
@ -326,7 +353,7 @@ impl Db {
}
/// if the chat doesn't already exist, it must be created by calling create_chat() before running this function.
pub async fn create_message(&self, message: Message, chat: JID) -> Result<(), Error> {
pub(crate) async fn create_message(&self, message: Message, chat: JID) -> Result<(), Error> {
// TODO: one query
let bare_jid = message.from.as_bare();
let resource = message.from.resourcepart;
@ -335,7 +362,7 @@ impl Db {
Ok(())
}
pub async fn create_message_with_self_resource_and_chat(
pub(crate) async fn create_message_with_self_resource_and_chat(
&self,
message: Message,
chat: JID,
@ -371,7 +398,7 @@ impl Db {
}
// create direct message from incoming
pub async fn create_message_with_user_resource_and_chat(
pub(crate) async fn create_message_with_user_resource_and_chat(
&self,
message: Message,
chat: JID,
@ -405,7 +432,7 @@ impl Db {
Ok(())
}
pub async fn read_message(&self, message: Uuid) -> Result<Message, Error> {
pub(crate) async fn read_message(&self, message: Uuid) -> Result<Message, Error> {
let message: Message = sqlx::query_as("select * from messages where id = ?")
.bind(message)
.fetch_one(&self.db)
@ -413,9 +440,9 @@ impl Db {
Ok(message)
}
// TODO: message updates/edits pub async fn update_message(&self, message: Message) -> Result<(), Error> {}
// TODO: message updates/edits pub(crate) async fn update_message(&self, message: Message) -> Result<(), Error> {}
pub async fn delete_message(&self, message: Uuid) -> Result<(), Error> {
pub(crate) async fn delete_message(&self, message: Uuid) -> Result<(), Error> {
sqlx::query!("delete from messages where id = ?", message)
.execute(&self.db)
.await?;
@ -423,7 +450,7 @@ impl Db {
}
// TODO: paging
pub async fn read_message_history(&self, chat: JID) -> Result<Vec<Message>, Error> {
pub(crate) async fn read_message_history(&self, chat: JID) -> Result<Vec<Message>, Error> {
let chat_id = self.read_chat_id(chat).await?;
let messages: Vec<Message> = sqlx::query_as("select * from messages where chat_id = ?")
.bind(chat_id)
@ -432,14 +459,14 @@ impl Db {
Ok(messages)
}
pub async fn read_cached_status(&self) -> Result<Online, Error> {
pub(crate) async fn read_cached_status(&self) -> Result<Online, Error> {
let online: Online = sqlx::query_as("select * from cached_status where id = 0")
.fetch_one(&self.db)
.await?;
Ok(online)
}
pub async fn upsert_cached_status(&self, status: Online) -> Result<(), Error> {
pub(crate) async fn upsert_cached_status(&self, status: Online) -> Result<(), Error> {
sqlx::query!(
"insert into cached_status (id, show, message) values (0, ?, ?) on conflict do update set show = ?, message = ?",
status.show,
@ -450,7 +477,7 @@ impl Db {
Ok(())
}
pub async fn delete_cached_status(&self) -> Result<(), Error> {
pub(crate) async fn delete_cached_status(&self) -> Result<(), Error> {
sqlx::query!("update cached_status set show = null, message = null where id = 0")
.execute(&self.db)
.await?;

View File

@ -110,12 +110,42 @@ pub enum RosterError {
#[error("database error: {0}")]
pub struct DatabaseError(Arc<sqlx::Error>);
#[derive(Debug, Error, Clone)]
pub enum DatabaseOpenError {
#[error("error: {0}")]
Error(Arc<sqlx::Error>),
#[error("migration: {0}")]
Migration(Arc<sqlx::migrate::MigrateError>),
#[error("io: {0}")]
Io(Arc<tokio::io::Error>),
#[error("invalid path")]
InvalidPath,
}
impl From<sqlx::Error> for DatabaseError {
fn from(e: sqlx::Error) -> Self {
Self(Arc::new(e))
}
}
impl From<sqlx::Error> for DatabaseOpenError {
fn from(e: sqlx::Error) -> Self {
Self::Error(Arc::new(e))
}
}
impl From<sqlx::migrate::MigrateError> for DatabaseOpenError {
fn from(e: sqlx::migrate::MigrateError) -> Self {
Self::Migration(Arc::new(e))
}
}
impl From<tokio::io::Error> for DatabaseOpenError {
fn from(e: tokio::io::Error) -> Self {
Self::Io(Arc::new(e))
}
}
#[derive(Debug, Error, Clone)]
pub enum StatusError {
#[error("cache: {0}")]

View File

@ -36,8 +36,8 @@ use crate::error::Error;
pub mod chat;
mod connection;
mod db;
mod error;
pub mod db;
pub mod error;
pub mod presence;
pub mod roster;
pub mod user;
@ -67,14 +67,14 @@ impl Luz {
password: String,
connected: Arc<Mutex<Option<(WriteHandle, SupervisorHandle)>>>,
connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>,
db: SqlitePool,
db: Db,
sender: mpsc::Sender<UpdateMessage>,
) -> Self {
Self {
jid,
password: Arc::new(password),
connected,
db: Db::new(db),
db,
receiver,
sender,
tasks: JoinSet::new(),
@ -1068,12 +1068,7 @@ impl DerefMut for LuzHandle {
impl LuzHandle {
// TODO: database creation separate
pub async fn new(
jid: JID,
password: String,
db: &str,
) -> Result<(Self, mpsc::Receiver<UpdateMessage>), DatabaseError> {
let db = SqlitePool::connect(db).await?;
pub fn new(jid: JID, password: String, db: Db) -> (Self, mpsc::Receiver<UpdateMessage>) {
let (command_sender, command_receiver) = mpsc::channel(20);
let (update_sender, update_receiver) = mpsc::channel(20);
// might be bad, first supervisor shutdown notification oneshot is never used (disgusting)
@ -1092,14 +1087,14 @@ impl LuzHandle {
);
tokio::spawn(async move { actor.run().await });
Ok((
(
Self {
sender: command_sender,
// TODO: configure timeout
timeout: Duration::from_secs(10),
},
update_receiver,
))
)
}
pub async fn connect(&self) -> Result<(), ActorError> {

View File

@ -1,7 +1,7 @@
use std::{str::FromStr, time::Duration};
use std::{path::Path, str::FromStr, time::Duration};
use jid::JID;
use luz::{CommandMessage, LuzHandle};
use luz::{db::Db, CommandMessage, LuzHandle};
use sqlx::SqlitePool;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
@ -12,13 +12,11 @@ use tracing::info;
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
let (luz, mut recv) = LuzHandle::new(
"test@blos.sm".try_into().unwrap(),
"slayed".to_string(),
"./luz.db",
)
.await
.unwrap();
let db = Db::create_connect_and_migrate(Path::new("./luz.db"))
.await
.unwrap();
let (luz, mut recv) =
LuzHandle::new("test@blos.sm".try_into().unwrap(), "slayed".to_string(), db).unwrap();
tokio::spawn(async move {
while let Some(msg) = recv.recv().await {