implement CLIENT

This commit is contained in:
cel 🌸 2025-02-20 21:08:16 +00:00
parent c0d2aae038
commit 2e6ad369c5
9 changed files with 339 additions and 22 deletions

View File

@ -14,7 +14,7 @@ create table subscription(
state text primary key not null state text primary key not null
); );
insert into subscription ( state ) values ('none'), ('pending-out'), ('pending-in'), ('only-out'), ('only-in'), ('out-pending-in'), ('in-pending-out'), ('buddy'); insert into subscription ( state ) values ('none'), ('pending-out'), ('pending-in'), ('pending-in-pending-out'), ('only-out'), ('only-in'), ('out-pending-in'), ('in-pending-out'), ('buddy');
-- a roster contains users, with client-set nickname -- a roster contains users, with client-set nickname
CREATE TABLE roster( CREATE TABLE roster(

View File

@ -1,7 +1,7 @@
use jid::JID; use jid::JID;
use uuid::Uuid; use uuid::Uuid;
#[derive(Debug, sqlx::FromRow)] #[derive(Debug, sqlx::FromRow, Clone)]
pub struct Message { pub struct Message {
pub id: Uuid, pub id: Uuid,
// does not contain full user information // does not contain full user information
@ -20,7 +20,7 @@ pub struct Message {
// Outside, // Outside,
// } // }
#[derive(Debug, sqlx::FromRow)] #[derive(Debug, sqlx::FromRow, Clone)]
pub struct Body { pub struct Body {
// TODO: rich text, other contents, threads // TODO: rich text, other contents, threads
pub body: String, pub body: String,

View File

@ -1,6 +1,7 @@
use std::{ use std::{
collections::HashMap, collections::HashMap,
ops::{Deref, DerefMut}, ops::{Deref, DerefMut},
str::FromStr,
sync::Arc, sync::Arc,
time::Duration, time::Duration,
}; };
@ -12,10 +13,14 @@ use tokio::{
task::{JoinHandle, JoinSet}, task::{JoinHandle, JoinSet},
}; };
use tracing::info; use tracing::info;
use uuid::Uuid;
use crate::{ use crate::{
chat::{Body, Message},
db::Db, db::Db,
error::{Error, Reason}, error::{Error, IqError, PresenceError, Reason, RecvMessageError},
presence::{Offline, Online, Presence, Show},
roster::Contact,
UpdateMessage, UpdateMessage,
}; };
@ -116,7 +121,7 @@ impl Read {
println!("read stanza"); println!("read stanza");
match s { match s {
Ok(s) => { Ok(s) => {
self.tasks.spawn(handle_stanza(s, self.update_sender.clone(), self.db.clone(), self.supervisor_control.clone(), self.write_handle.clone())); self.tasks.spawn(handle_stanza(s, self.update_sender.clone(), self.db.clone(), self.supervisor_control.clone(), self.write_handle.clone(), self.pending_iqs.clone()));
}, },
Err(e) => { Err(e) => {
println!("error: {:?}", e); println!("error: {:?}", e);
@ -173,8 +178,197 @@ async fn handle_stanza(
db: Db, db: Db,
supervisor_control: mpsc::Sender<SupervisorCommand>, supervisor_control: mpsc::Sender<SupervisorCommand>,
write_handle: WriteHandle, write_handle: WriteHandle,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
) { ) {
println!("{:?}", stanza) match stanza {
Stanza::Message(stanza_message) => {
if let Some(from) = stanza_message.from {
// TODO: group chat messages
let message = Message {
id: stanza_message
.id
// TODO: proper id storage
.map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4()))
.unwrap_or_else(|| Uuid::new_v4()),
from: from.clone(),
body: Body {
// TODO: should this be an option?
body: stanza_message
.body
.map(|body| body.body)
.unwrap_or_default()
.unwrap_or_default(),
},
};
// TODO: can this be more efficient?
let result = db
.create_message_with_user_and_chat(message.clone(), from.clone())
.await;
if let Err(e) = result {
let _ = update_sender
.send(UpdateMessage::Error(Error::CacheUpdate(e.into())))
.await;
}
let _ = update_sender
.send(UpdateMessage::Message { to: from, message })
.await;
} else {
let _ = update_sender
.send(UpdateMessage::Error(Error::RecvMessage(
RecvMessageError::MissingFrom,
)))
.await;
}
}
Stanza::Presence(presence) => {
if let Some(from) = presence.from {
match presence.r#type {
Some(r#type) => match r#type {
// error processing a presence from somebody
stanza::client::presence::PresenceType::Error => {
// TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option.
let _ = update_sender
.send(UpdateMessage::Error(Error::Presence(PresenceError::Error(
Reason::Stanza(presence.errors.first().cloned()),
))))
.await;
}
// should not happen (error to server)
stanza::client::presence::PresenceType::Probe => {
// TODO: should probably write an error and restart stream
let _ = update_sender
.send(UpdateMessage::Error(Error::Presence(
PresenceError::Unsupported,
)))
.await;
}
stanza::client::presence::PresenceType::Subscribe => {
// may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event
let _ = update_sender
.send(UpdateMessage::SubscriptionRequest(from))
.await;
}
stanza::client::presence::PresenceType::Unavailable => {
let offline = Offline {
status: presence.status.map(|status| status.status.0),
};
let _ = update_sender
.send(UpdateMessage::Presence {
from,
presence: Presence::Offline(offline),
})
.await;
}
// for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them.
stanza::client::presence::PresenceType::Subscribed => {}
stanza::client::presence::PresenceType::Unsubscribe => {}
stanza::client::presence::PresenceType::Unsubscribed => {}
},
None => {
let online = Online {
show: presence.show.map(|show| match show {
stanza::client::presence::Show::Away => Show::Away,
stanza::client::presence::Show::Chat => Show::Chat,
stanza::client::presence::Show::Dnd => Show::DoNotDisturb,
stanza::client::presence::Show::Xa => Show::ExtendedAway,
}),
status: presence.status.map(|status| status.status.0),
priority: presence.priority.map(|priority| priority.0),
};
let _ = update_sender
.send(UpdateMessage::Presence {
from,
presence: Presence::Online(online),
})
.await;
}
}
} else {
let _ = update_sender
.send(UpdateMessage::Error(Error::Presence(
PresenceError::MissingFrom,
)))
.await;
}
}
Stanza::Iq(iq) => match iq.r#type {
stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => {
let send;
{
send = pending_iqs.lock().await.remove(&iq.id);
}
if let Some(send) = send {
send.send(Ok(Stanza::Iq(iq)));
} else {
let _ = update_sender
.send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId(
iq.id,
))))
.await;
}
}
// TODO: send unsupported to server
// TODO: proper errors i am so tired please
stanza::client::iq::IqType::Get => {}
stanza::client::iq::IqType::Set => {
if let Some(query) = iq.query {
match query {
stanza::client::iq::Query::Roster(mut query) => {
// TODO: there should only be one
if let Some(item) = query.items.pop() {
match item.subscription {
Some(stanza::roster::Subscription::Remove) => {
db.delete_contact(item.jid.clone()).await;
update_sender
.send(UpdateMessage::RosterDelete(item.jid))
.await;
// TODO: send result
}
_ => {
let contact: Contact = item.into();
if let Err(e) = db.upsert_contact(contact.clone()).await {
let _ = update_sender
.send(UpdateMessage::Error(Error::CacheUpdate(
e.into(),
)))
.await;
}
let _ = update_sender
.send(UpdateMessage::RosterUpdate(contact))
.await;
// TODO: send result
// write_handle.write(Stanza::Iq(stanza::client::iq::Iq {
// from: ,
// id: todo!(),
// to: todo!(),
// r#type: todo!(),
// lang: todo!(),
// query: todo!(),
// errors: todo!(),
// }));
}
}
}
}
// TODO: send unsupported to server
_ => {}
}
} else {
// TODO: send error (unsupported) to server
}
}
},
Stanza::Error(error) => {
let _ = update_sender
.send(UpdateMessage::Error(Error::Stream(error)))
.await;
// TODO: reconnect
}
Stanza::OtherContent(content) => {
let _ = update_sender.send(UpdateMessage::Error(Error::UnrecognizedContent(content)));
// TODO: send error to write_thread
}
}
} }
pub enum ReadControl { pub enum ReadControl {

View File

@ -160,6 +160,48 @@ impl Db {
Ok(()) Ok(())
} }
pub async fn upsert_contact(&self, contact: Contact) -> Result<(), Error> {
sqlx::query!(
"insert into users ( jid ) values ( ? ) on conflict do nothing",
contact.user_jid,
)
.execute(&self.db)
.await?;
sqlx::query!(
"insert into roster ( user_jid, name, subscription ) values ( ?, ?, ? ) on conflict do update set name = ?, subscription = ?",
contact.user_jid,
contact.name,
contact.subscription,
contact.name,
contact.subscription
)
.execute(&self.db)
.await?;
sqlx::query!(
"delete from groups_roster where contact_jid = ?",
contact.user_jid
)
.execute(&self.db)
.await?;
// TODO: delete orphaned groups from groups table
for group in contact.groups {
sqlx::query!(
"insert into groups (group_name) values (?) on conflict do nothing",
group
)
.execute(&self.db)
.await?;
sqlx::query!(
"insert into groups_roster (group_name, contact_jid) values (?, ?)",
group,
contact.user_jid
)
.execute(&self.db)
.await?;
}
Ok(())
}
pub async fn delete_contact(&self, contact: JID) -> Result<(), Error> { pub async fn delete_contact(&self, contact: JID) -> Result<(), Error> {
sqlx::query!("delete from roster where user_jid = ?", contact) sqlx::query!("delete from roster where user_jid = ?", contact)
.execute(&self.db) .execute(&self.db)
@ -290,6 +332,29 @@ impl Db {
Ok(()) Ok(())
} }
pub async fn create_message_with_user_and_chat(
&self,
message: Message,
chat: JID,
) -> Result<(), Error> {
sqlx::query!(
"insert into users (jid) values (?) on conflict do nothing",
chat
)
.execute(&self.db)
.await?;
let id = Uuid::new_v4();
sqlx::query!(
"insert into chats (id, correspondent) values (?, ?) on conflict do nothing",
id,
chat
)
.execute(&self.db)
.await?;
self.create_message(message, chat).await?;
Ok(())
}
pub async fn read_message(&self, message: Uuid) -> Result<Message, Error> { pub async fn read_message(&self, message: Uuid) -> Result<Message, Error> {
let message: Message = sqlx::query_as("select * from messages where id = ?") let message: Message = sqlx::query_as("select * from messages where id = ?")
.bind(message) .bind(message)

View File

@ -6,12 +6,35 @@ pub enum Error {
AlreadyConnected, AlreadyConnected,
// TODO: change to Connecting(ConnectingError) // TODO: change to Connecting(ConnectingError)
Connection(ConnectionError), Connection(ConnectionError),
Presence(Reason), Presence(PresenceError),
SetStatus(Reason),
Roster(Reason), Roster(Reason),
Stream(stanza::stream::Error),
SendMessage(Reason), SendMessage(Reason),
RecvMessage(RecvMessageError),
AlreadyDisconnected, AlreadyDisconnected,
LostConnection, LostConnection,
// TODO: should all cache update errors include the context?
CacheUpdate(Reason), CacheUpdate(Reason),
UnrecognizedContent(peanuts::element::Content),
Iq(IqError),
}
#[derive(Debug)]
pub enum PresenceError {
Error(Reason),
Unsupported,
MissingFrom,
}
#[derive(Debug)]
pub enum IqError {
NoMatchingId(String),
}
#[derive(Debug)]
pub enum RecvMessageError {
MissingFrom,
} }
#[derive(Debug)] #[derive(Debug)]
@ -40,7 +63,7 @@ pub struct StatusError(pub Reason);
impl From<StatusError> for Error { impl From<StatusError> for Error {
fn from(e: StatusError) -> Self { fn from(e: StatusError) -> Self {
Error::Presence(e.0) Error::SetStatus(e.0)
} }
} }

View File

@ -20,7 +20,7 @@ use tokio::{
sync::{mpsc, oneshot, Mutex}, sync::{mpsc, oneshot, Mutex},
task::JoinSet, task::JoinSet,
}; };
use tracing::{info, Instrument}; use tracing::{debug, info, Instrument};
use user::User; use user::User;
use uuid::Uuid; use uuid::Uuid;
@ -28,7 +28,7 @@ use crate::connection::write::WriteHandle;
use crate::connection::{SupervisorCommand, SupervisorHandle}; use crate::connection::{SupervisorCommand, SupervisorHandle};
use crate::error::Error; use crate::error::Error;
mod chat; pub mod chat;
mod connection; mod connection;
mod db; mod db;
mod error; mod error;
@ -103,12 +103,19 @@ impl Luz {
.await; .await;
} }
None => { None => {
let mut jid = self.jid.lock().await; let streams_result;
let mut domain = jid.domainpart.clone(); {
// TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource) let mut jid = self.jid.lock().await;
let streams_result = let mut domain = jid.domainpart.clone();
jabber::connect_and_login(&mut jid, &*self.password, &mut domain) // TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource)
.await; streams_result = jabber::connect_and_login(
&mut jid,
&*self.password,
&mut domain,
)
.await;
debug!("connected and logged in as {}", jid);
}
match streams_result { match streams_result {
Ok(s) => { Ok(s) => {
let (shutdown_send, shutdown_recv) = oneshot::channel::<()>(); let (shutdown_send, shutdown_recv) = oneshot::channel::<()>();
@ -124,6 +131,7 @@ impl Luz {
self.connection_supervisor_shutdown = shutdown_recv; self.connection_supervisor_shutdown = shutdown_recv;
// TODO: get roster and send initial presence // TODO: get roster and send initial presence
let (send, recv) = oneshot::channel(); let (send, recv) = oneshot::channel();
debug!("getting roster");
CommandMessage::GetRoster(send) CommandMessage::GetRoster(send)
.handle_online( .handle_online(
writer.clone(), writer.clone(),
@ -134,7 +142,9 @@ impl Luz {
self.pending_iqs.clone(), self.pending_iqs.clone(),
) )
.await; .await;
debug!("sent roster req");
let roster = recv.await; let roster = recv.await;
debug!("got roster");
match roster { match roster {
Ok(r) => { Ok(r) => {
match r { match r {
@ -371,9 +381,11 @@ impl CommandMessage {
CommandMessage::GetRoster(result_sender) => { CommandMessage::GetRoster(result_sender) => {
// TODO: jid resource should probably be stored within the connection // TODO: jid resource should probably be stored within the connection
let owned_jid: JID; let owned_jid: JID;
debug!("before client_jid lock");
{ {
owned_jid = client_jid.lock().await.clone(); owned_jid = client_jid.lock().await.clone();
} }
debug!("after client_jid lock");
let iq_id = Uuid::new_v4().to_string(); let iq_id = Uuid::new_v4().to_string();
let (send, iq_recv) = oneshot::channel(); let (send, iq_recv) = oneshot::channel();
{ {
@ -1062,6 +1074,7 @@ pub enum UpdateMessage {
FullRoster(Vec<Contact>), FullRoster(Vec<Contact>),
/// (only update app roster state, don't replace) /// (only update app roster state, don't replace)
RosterUpdate(Contact), RosterUpdate(Contact),
RosterDelete(JID),
/// presences should be stored with users in the ui, not contacts, as presences can be received from anyone /// presences should be stored with users in the ui, not contacts, as presences can be received from anyone
Presence { Presence {
from: JID, from: JID,
@ -1073,4 +1086,5 @@ pub enum UpdateMessage {
to: JID, to: JID,
message: Message, message: Message,
}, },
SubscriptionRequest(jid::JID),
} }

View File

@ -1,8 +1,13 @@
use std::time::Duration; use std::{str::FromStr, time::Duration};
use jid::JID;
use luz::{CommandMessage, LuzHandle}; use luz::{CommandMessage, LuzHandle};
use sqlx::SqlitePool; use sqlx::SqlitePool;
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
sync::oneshot,
};
use tracing::info;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
@ -13,10 +18,23 @@ async fn main() {
tokio::spawn(async move { tokio::spawn(async move {
while let Some(msg) = recv.recv().await { while let Some(msg) = recv.recv().await {
println!("{:#?}", msg) info!("{:#?}", msg)
} }
}); });
luz.send(CommandMessage::Connect).await.unwrap(); luz.send(CommandMessage::Connect).await.unwrap();
tokio::time::sleep(Duration::from_secs(15)).await; let (send, recv) = oneshot::channel();
tokio::time::sleep(Duration::from_secs(5)).await;
info!("sending message");
luz.send(CommandMessage::SendMessage(
JID::from_str("cel@blos.sm").unwrap(),
luz::chat::Body {
body: "hallo!!!".to_string(),
},
send,
))
.await
.unwrap();
recv.await.unwrap().unwrap();
println!("sent message");
} }

View File

@ -6,7 +6,7 @@ pub struct Online {
pub show: Option<Show>, pub show: Option<Show>,
pub status: Option<String>, pub status: Option<String>,
#[sqlx(skip)] #[sqlx(skip)]
priority: Option<i8>, pub priority: Option<i8>,
} }
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
@ -55,7 +55,7 @@ impl sqlx::Encode<'_, Sqlite> for Show {
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct Offline { pub struct Offline {
status: Option<String>, pub status: Option<String>,
} }
#[derive(Debug)] #[derive(Debug)]

View File

@ -27,6 +27,7 @@ pub enum Subscription {
None, None,
PendingOut, PendingOut,
PendingIn, PendingIn,
PendingInPendingOut,
OnlyOut, OnlyOut,
OnlyIn, OnlyIn,
OutPendingIn, OutPendingIn,
@ -51,6 +52,7 @@ impl sqlx::Decode<'_, Sqlite> for Subscription {
"none" => Ok(Self::None), "none" => Ok(Self::None),
"pending-out" => Ok(Self::PendingOut), "pending-out" => Ok(Self::PendingOut),
"pending-in" => Ok(Self::PendingIn), "pending-in" => Ok(Self::PendingIn),
"pending-in-pending-out" => Ok(Self::PendingInPendingOut),
"only-out" => Ok(Self::OnlyOut), "only-out" => Ok(Self::OnlyOut),
"only-in" => Ok(Self::OnlyIn), "only-in" => Ok(Self::OnlyIn),
"out-pending-in" => Ok(Self::OutPendingIn), "out-pending-in" => Ok(Self::OutPendingIn),
@ -70,6 +72,7 @@ impl sqlx::Encode<'_, Sqlite> for Subscription {
Subscription::None => "none", Subscription::None => "none",
Subscription::PendingOut => "pending-out", Subscription::PendingOut => "pending-out",
Subscription::PendingIn => "pending-in", Subscription::PendingIn => "pending-in",
Subscription::PendingInPendingOut => "pending-in-pending-out",
Subscription::OnlyOut => "only-out", Subscription::OnlyOut => "only-out",
Subscription::OnlyIn => "only-in", Subscription::OnlyIn => "only-in",
Subscription::OutPendingIn => "out-pending-in", Subscription::OutPendingIn => "out-pending-in",