diff --git a/luz/migrations/20240113011930_luz.sql b/luz/migrations/20240113011930_luz.sql index 082cc4b..028ae24 100644 --- a/luz/migrations/20240113011930_luz.sql +++ b/luz/migrations/20240113011930_luz.sql @@ -14,7 +14,7 @@ create table subscription( state text primary key not null ); -insert into subscription ( state ) values ('none'), ('pending-out'), ('pending-in'), ('only-out'), ('only-in'), ('out-pending-in'), ('in-pending-out'), ('buddy'); +insert into subscription ( state ) values ('none'), ('pending-out'), ('pending-in'), ('pending-in-pending-out'), ('only-out'), ('only-in'), ('out-pending-in'), ('in-pending-out'), ('buddy'); -- a roster contains users, with client-set nickname CREATE TABLE roster( diff --git a/luz/src/chat.rs b/luz/src/chat.rs index ff76ce1..4fb8579 100644 --- a/luz/src/chat.rs +++ b/luz/src/chat.rs @@ -1,7 +1,7 @@ use jid::JID; use uuid::Uuid; -#[derive(Debug, sqlx::FromRow)] +#[derive(Debug, sqlx::FromRow, Clone)] pub struct Message { pub id: Uuid, // does not contain full user information @@ -20,7 +20,7 @@ pub struct Message { // Outside, // } -#[derive(Debug, sqlx::FromRow)] +#[derive(Debug, sqlx::FromRow, Clone)] pub struct Body { // TODO: rich text, other contents, threads pub body: String, diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs index 8f8c4a0..46f1dc9 100644 --- a/luz/src/connection/read.rs +++ b/luz/src/connection/read.rs @@ -1,6 +1,7 @@ use std::{ collections::HashMap, ops::{Deref, DerefMut}, + str::FromStr, sync::Arc, time::Duration, }; @@ -12,10 +13,14 @@ use tokio::{ task::{JoinHandle, JoinSet}, }; use tracing::info; +use uuid::Uuid; use crate::{ + chat::{Body, Message}, db::Db, - error::{Error, Reason}, + error::{Error, IqError, PresenceError, Reason, RecvMessageError}, + presence::{Offline, Online, Presence, Show}, + roster::Contact, UpdateMessage, }; @@ -116,7 +121,7 @@ impl Read { println!("read stanza"); match s { Ok(s) => { - self.tasks.spawn(handle_stanza(s, self.update_sender.clone(), self.db.clone(), self.supervisor_control.clone(), self.write_handle.clone())); + self.tasks.spawn(handle_stanza(s, self.update_sender.clone(), self.db.clone(), self.supervisor_control.clone(), self.write_handle.clone(), self.pending_iqs.clone())); }, Err(e) => { println!("error: {:?}", e); @@ -173,8 +178,197 @@ async fn handle_stanza( db: Db, supervisor_control: mpsc::Sender, write_handle: WriteHandle, + pending_iqs: Arc>>>>, ) { - println!("{:?}", stanza) + match stanza { + Stanza::Message(stanza_message) => { + if let Some(from) = stanza_message.from { + // TODO: group chat messages + let message = Message { + id: stanza_message + .id + // TODO: proper id storage + .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4())) + .unwrap_or_else(|| Uuid::new_v4()), + from: from.clone(), + body: Body { + // TODO: should this be an option? + body: stanza_message + .body + .map(|body| body.body) + .unwrap_or_default() + .unwrap_or_default(), + }, + }; + // TODO: can this be more efficient? + let result = db + .create_message_with_user_and_chat(message.clone(), from.clone()) + .await; + if let Err(e) = result { + let _ = update_sender + .send(UpdateMessage::Error(Error::CacheUpdate(e.into()))) + .await; + } + let _ = update_sender + .send(UpdateMessage::Message { to: from, message }) + .await; + } else { + let _ = update_sender + .send(UpdateMessage::Error(Error::RecvMessage( + RecvMessageError::MissingFrom, + ))) + .await; + } + } + Stanza::Presence(presence) => { + if let Some(from) = presence.from { + match presence.r#type { + Some(r#type) => match r#type { + // error processing a presence from somebody + stanza::client::presence::PresenceType::Error => { + // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option. + let _ = update_sender + .send(UpdateMessage::Error(Error::Presence(PresenceError::Error( + Reason::Stanza(presence.errors.first().cloned()), + )))) + .await; + } + // should not happen (error to server) + stanza::client::presence::PresenceType::Probe => { + // TODO: should probably write an error and restart stream + let _ = update_sender + .send(UpdateMessage::Error(Error::Presence( + PresenceError::Unsupported, + ))) + .await; + } + stanza::client::presence::PresenceType::Subscribe => { + // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event + let _ = update_sender + .send(UpdateMessage::SubscriptionRequest(from)) + .await; + } + stanza::client::presence::PresenceType::Unavailable => { + let offline = Offline { + status: presence.status.map(|status| status.status.0), + }; + let _ = update_sender + .send(UpdateMessage::Presence { + from, + presence: Presence::Offline(offline), + }) + .await; + } + // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them. + stanza::client::presence::PresenceType::Subscribed => {} + stanza::client::presence::PresenceType::Unsubscribe => {} + stanza::client::presence::PresenceType::Unsubscribed => {} + }, + None => { + let online = Online { + show: presence.show.map(|show| match show { + stanza::client::presence::Show::Away => Show::Away, + stanza::client::presence::Show::Chat => Show::Chat, + stanza::client::presence::Show::Dnd => Show::DoNotDisturb, + stanza::client::presence::Show::Xa => Show::ExtendedAway, + }), + status: presence.status.map(|status| status.status.0), + priority: presence.priority.map(|priority| priority.0), + }; + let _ = update_sender + .send(UpdateMessage::Presence { + from, + presence: Presence::Online(online), + }) + .await; + } + } + } else { + let _ = update_sender + .send(UpdateMessage::Error(Error::Presence( + PresenceError::MissingFrom, + ))) + .await; + } + } + Stanza::Iq(iq) => match iq.r#type { + stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => { + let send; + { + send = pending_iqs.lock().await.remove(&iq.id); + } + if let Some(send) = send { + send.send(Ok(Stanza::Iq(iq))); + } else { + let _ = update_sender + .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId( + iq.id, + )))) + .await; + } + } + // TODO: send unsupported to server + // TODO: proper errors i am so tired please + stanza::client::iq::IqType::Get => {} + stanza::client::iq::IqType::Set => { + if let Some(query) = iq.query { + match query { + stanza::client::iq::Query::Roster(mut query) => { + // TODO: there should only be one + if let Some(item) = query.items.pop() { + match item.subscription { + Some(stanza::roster::Subscription::Remove) => { + db.delete_contact(item.jid.clone()).await; + update_sender + .send(UpdateMessage::RosterDelete(item.jid)) + .await; + // TODO: send result + } + _ => { + let contact: Contact = item.into(); + if let Err(e) = db.upsert_contact(contact.clone()).await { + let _ = update_sender + .send(UpdateMessage::Error(Error::CacheUpdate( + e.into(), + ))) + .await; + } + let _ = update_sender + .send(UpdateMessage::RosterUpdate(contact)) + .await; + // TODO: send result + // write_handle.write(Stanza::Iq(stanza::client::iq::Iq { + // from: , + // id: todo!(), + // to: todo!(), + // r#type: todo!(), + // lang: todo!(), + // query: todo!(), + // errors: todo!(), + // })); + } + } + } + } + // TODO: send unsupported to server + _ => {} + } + } else { + // TODO: send error (unsupported) to server + } + } + }, + Stanza::Error(error) => { + let _ = update_sender + .send(UpdateMessage::Error(Error::Stream(error))) + .await; + // TODO: reconnect + } + Stanza::OtherContent(content) => { + let _ = update_sender.send(UpdateMessage::Error(Error::UnrecognizedContent(content))); + // TODO: send error to write_thread + } + } } pub enum ReadControl { diff --git a/luz/src/db/mod.rs b/luz/src/db/mod.rs index 7557f70..4202163 100644 --- a/luz/src/db/mod.rs +++ b/luz/src/db/mod.rs @@ -160,6 +160,48 @@ impl Db { Ok(()) } + pub async fn upsert_contact(&self, contact: Contact) -> Result<(), Error> { + sqlx::query!( + "insert into users ( jid ) values ( ? ) on conflict do nothing", + contact.user_jid, + ) + .execute(&self.db) + .await?; + sqlx::query!( + "insert into roster ( user_jid, name, subscription ) values ( ?, ?, ? ) on conflict do update set name = ?, subscription = ?", + contact.user_jid, + contact.name, + contact.subscription, + contact.name, + contact.subscription + ) + .execute(&self.db) + .await?; + sqlx::query!( + "delete from groups_roster where contact_jid = ?", + contact.user_jid + ) + .execute(&self.db) + .await?; + // TODO: delete orphaned groups from groups table + for group in contact.groups { + sqlx::query!( + "insert into groups (group_name) values (?) on conflict do nothing", + group + ) + .execute(&self.db) + .await?; + sqlx::query!( + "insert into groups_roster (group_name, contact_jid) values (?, ?)", + group, + contact.user_jid + ) + .execute(&self.db) + .await?; + } + Ok(()) + } + pub async fn delete_contact(&self, contact: JID) -> Result<(), Error> { sqlx::query!("delete from roster where user_jid = ?", contact) .execute(&self.db) @@ -290,6 +332,29 @@ impl Db { Ok(()) } + pub async fn create_message_with_user_and_chat( + &self, + message: Message, + chat: JID, + ) -> Result<(), Error> { + sqlx::query!( + "insert into users (jid) values (?) on conflict do nothing", + chat + ) + .execute(&self.db) + .await?; + let id = Uuid::new_v4(); + sqlx::query!( + "insert into chats (id, correspondent) values (?, ?) on conflict do nothing", + id, + chat + ) + .execute(&self.db) + .await?; + self.create_message(message, chat).await?; + Ok(()) + } + pub async fn read_message(&self, message: Uuid) -> Result { let message: Message = sqlx::query_as("select * from messages where id = ?") .bind(message) diff --git a/luz/src/error.rs b/luz/src/error.rs index fbbcd2b..4fdce79 100644 --- a/luz/src/error.rs +++ b/luz/src/error.rs @@ -6,12 +6,35 @@ pub enum Error { AlreadyConnected, // TODO: change to Connecting(ConnectingError) Connection(ConnectionError), - Presence(Reason), + Presence(PresenceError), + SetStatus(Reason), Roster(Reason), + Stream(stanza::stream::Error), SendMessage(Reason), + RecvMessage(RecvMessageError), AlreadyDisconnected, LostConnection, + // TODO: should all cache update errors include the context? CacheUpdate(Reason), + UnrecognizedContent(peanuts::element::Content), + Iq(IqError), +} + +#[derive(Debug)] +pub enum PresenceError { + Error(Reason), + Unsupported, + MissingFrom, +} + +#[derive(Debug)] +pub enum IqError { + NoMatchingId(String), +} + +#[derive(Debug)] +pub enum RecvMessageError { + MissingFrom, } #[derive(Debug)] @@ -40,7 +63,7 @@ pub struct StatusError(pub Reason); impl From for Error { fn from(e: StatusError) -> Self { - Error::Presence(e.0) + Error::SetStatus(e.0) } } diff --git a/luz/src/lib.rs b/luz/src/lib.rs index cffffb2..901553b 100644 --- a/luz/src/lib.rs +++ b/luz/src/lib.rs @@ -20,7 +20,7 @@ use tokio::{ sync::{mpsc, oneshot, Mutex}, task::JoinSet, }; -use tracing::{info, Instrument}; +use tracing::{debug, info, Instrument}; use user::User; use uuid::Uuid; @@ -28,7 +28,7 @@ use crate::connection::write::WriteHandle; use crate::connection::{SupervisorCommand, SupervisorHandle}; use crate::error::Error; -mod chat; +pub mod chat; mod connection; mod db; mod error; @@ -103,12 +103,19 @@ impl Luz { .await; } None => { - let mut jid = self.jid.lock().await; - let mut domain = jid.domainpart.clone(); - // TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource) - let streams_result = - jabber::connect_and_login(&mut jid, &*self.password, &mut domain) - .await; + let streams_result; + { + let mut jid = self.jid.lock().await; + let mut domain = jid.domainpart.clone(); + // TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource) + streams_result = jabber::connect_and_login( + &mut jid, + &*self.password, + &mut domain, + ) + .await; + debug!("connected and logged in as {}", jid); + } match streams_result { Ok(s) => { let (shutdown_send, shutdown_recv) = oneshot::channel::<()>(); @@ -124,6 +131,7 @@ impl Luz { self.connection_supervisor_shutdown = shutdown_recv; // TODO: get roster and send initial presence let (send, recv) = oneshot::channel(); + debug!("getting roster"); CommandMessage::GetRoster(send) .handle_online( writer.clone(), @@ -134,7 +142,9 @@ impl Luz { self.pending_iqs.clone(), ) .await; + debug!("sent roster req"); let roster = recv.await; + debug!("got roster"); match roster { Ok(r) => { match r { @@ -371,9 +381,11 @@ impl CommandMessage { CommandMessage::GetRoster(result_sender) => { // TODO: jid resource should probably be stored within the connection let owned_jid: JID; + debug!("before client_jid lock"); { owned_jid = client_jid.lock().await.clone(); } + debug!("after client_jid lock"); let iq_id = Uuid::new_v4().to_string(); let (send, iq_recv) = oneshot::channel(); { @@ -1062,6 +1074,7 @@ pub enum UpdateMessage { FullRoster(Vec), /// (only update app roster state, don't replace) RosterUpdate(Contact), + RosterDelete(JID), /// presences should be stored with users in the ui, not contacts, as presences can be received from anyone Presence { from: JID, @@ -1073,4 +1086,5 @@ pub enum UpdateMessage { to: JID, message: Message, }, + SubscriptionRequest(jid::JID), } diff --git a/luz/src/main.rs b/luz/src/main.rs index 5e9cd13..9779351 100644 --- a/luz/src/main.rs +++ b/luz/src/main.rs @@ -1,8 +1,13 @@ -use std::time::Duration; +use std::{str::FromStr, time::Duration}; +use jid::JID; use luz::{CommandMessage, LuzHandle}; use sqlx::SqlitePool; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + sync::oneshot, +}; +use tracing::info; #[tokio::main] async fn main() { @@ -13,10 +18,23 @@ async fn main() { tokio::spawn(async move { while let Some(msg) = recv.recv().await { - println!("{:#?}", msg) + info!("{:#?}", msg) } }); luz.send(CommandMessage::Connect).await.unwrap(); - tokio::time::sleep(Duration::from_secs(15)).await; + let (send, recv) = oneshot::channel(); + tokio::time::sleep(Duration::from_secs(5)).await; + info!("sending message"); + luz.send(CommandMessage::SendMessage( + JID::from_str("cel@blos.sm").unwrap(), + luz::chat::Body { + body: "hallo!!!".to_string(), + }, + send, + )) + .await + .unwrap(); + recv.await.unwrap().unwrap(); + println!("sent message"); } diff --git a/luz/src/presence.rs b/luz/src/presence.rs index 563121b..1df20a7 100644 --- a/luz/src/presence.rs +++ b/luz/src/presence.rs @@ -6,7 +6,7 @@ pub struct Online { pub show: Option, pub status: Option, #[sqlx(skip)] - priority: Option, + pub priority: Option, } #[derive(Debug, Clone, Copy)] @@ -55,7 +55,7 @@ impl sqlx::Encode<'_, Sqlite> for Show { #[derive(Debug, Default)] pub struct Offline { - status: Option, + pub status: Option, } #[derive(Debug)] diff --git a/luz/src/roster.rs b/luz/src/roster.rs index e3db00f..0e43a8a 100644 --- a/luz/src/roster.rs +++ b/luz/src/roster.rs @@ -27,6 +27,7 @@ pub enum Subscription { None, PendingOut, PendingIn, + PendingInPendingOut, OnlyOut, OnlyIn, OutPendingIn, @@ -51,6 +52,7 @@ impl sqlx::Decode<'_, Sqlite> for Subscription { "none" => Ok(Self::None), "pending-out" => Ok(Self::PendingOut), "pending-in" => Ok(Self::PendingIn), + "pending-in-pending-out" => Ok(Self::PendingInPendingOut), "only-out" => Ok(Self::OnlyOut), "only-in" => Ok(Self::OnlyIn), "out-pending-in" => Ok(Self::OutPendingIn), @@ -70,6 +72,7 @@ impl sqlx::Encode<'_, Sqlite> for Subscription { Subscription::None => "none", Subscription::PendingOut => "pending-out", Subscription::PendingIn => "pending-in", + Subscription::PendingInPendingOut => "pending-in-pending-out", Subscription::OnlyOut => "only-out", Subscription::OnlyIn => "only-in", Subscription::OutPendingIn => "out-pending-in",