implement handle_online() and handle_offline() for CommandMessage
This commit is contained in:
parent
945f140616
commit
c0d2aae038
|
@ -139,6 +139,20 @@ pub struct WriteHandle {
|
||||||
sender: mpsc::Sender<WriteMessage>,
|
sender: mpsc::Sender<WriteMessage>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl WriteHandle {
|
||||||
|
pub async fn write(&self, stanza: Stanza) -> Result<(), Reason> {
|
||||||
|
let (send, recv) = oneshot::channel();
|
||||||
|
self.send(WriteMessage {
|
||||||
|
stanza,
|
||||||
|
respond_to: send,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.map_err(|_| Reason::ChannelSend)?;
|
||||||
|
// TODO: timeout
|
||||||
|
recv.await?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Deref for WriteHandle {
|
impl Deref for WriteHandle {
|
||||||
type Target = mpsc::Sender<WriteMessage>;
|
type Target = mpsc::Sender<WriteMessage>;
|
||||||
|
|
||||||
|
|
|
@ -36,7 +36,7 @@ impl From<RosterError> for ConnectionError {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct StatusError(Reason);
|
pub struct StatusError(pub Reason);
|
||||||
|
|
||||||
impl From<StatusError> for Error {
|
impl From<StatusError> for Error {
|
||||||
fn from(e: StatusError) -> Self {
|
fn from(e: StatusError) -> Self {
|
||||||
|
@ -55,7 +55,7 @@ pub enum Reason {
|
||||||
// TODO: organisastion of error into internal error thing
|
// TODO: organisastion of error into internal error thing
|
||||||
Timeout,
|
Timeout,
|
||||||
Stream(stanza::stream_error::Error),
|
Stream(stanza::stream_error::Error),
|
||||||
Stanza(stanza::stanza_error::Error),
|
Stanza(Option<stanza::client::error::Error>),
|
||||||
Jabber(jabber::Error),
|
Jabber(jabber::Error),
|
||||||
XML(peanuts::Error),
|
XML(peanuts::Error),
|
||||||
SQL(sqlx::Error),
|
SQL(sqlx::Error),
|
||||||
|
@ -63,6 +63,8 @@ pub enum Reason {
|
||||||
LostConnection,
|
LostConnection,
|
||||||
OneshotRecv(oneshot::error::RecvError),
|
OneshotRecv(oneshot::error::RecvError),
|
||||||
UnexpectedStanza(Stanza),
|
UnexpectedStanza(Stanza),
|
||||||
|
Disconnected,
|
||||||
|
ChannelSend,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<oneshot::error::RecvError> for Reason {
|
impl From<oneshot::error::RecvError> for Reason {
|
||||||
|
|
640
luz/src/lib.rs
640
luz/src/lib.rs
|
@ -20,7 +20,7 @@ use tokio::{
|
||||||
sync::{mpsc, oneshot, Mutex},
|
sync::{mpsc, oneshot, Mutex},
|
||||||
task::JoinSet,
|
task::JoinSet,
|
||||||
};
|
};
|
||||||
use tracing::info;
|
use tracing::{info, Instrument};
|
||||||
use user::User;
|
use user::User;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
@ -286,24 +286,72 @@ impl CommandMessage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
CommandMessage::GetChats(sender) => todo!(),
|
CommandMessage::GetChats(sender) => {
|
||||||
CommandMessage::GetChat(jid, sender) => todo!(),
|
let chats = db.read_chats().await.map_err(|e| e.into());
|
||||||
CommandMessage::GetMessages(jid, sender) => todo!(),
|
sender.send(chats);
|
||||||
CommandMessage::DeleteChat(jid, sender) => todo!(),
|
}
|
||||||
CommandMessage::DeleteMessage(uuid, sender) => todo!(),
|
CommandMessage::GetChat(jid, sender) => {
|
||||||
CommandMessage::GetUser(jid, sender) => todo!(),
|
let chats = db.read_chat(jid).await.map_err(|e| e.into());
|
||||||
CommandMessage::AddContact(jid, sender) => todo!(),
|
sender.send(chats);
|
||||||
CommandMessage::BuddyRequest(jid, sender) => todo!(),
|
}
|
||||||
CommandMessage::SubscriptionRequest(jid, sender) => todo!(),
|
CommandMessage::GetMessages(jid, sender) => {
|
||||||
CommandMessage::AcceptBuddyRequest(jid, sender) => todo!(),
|
let messages = db.read_message_history(jid).await.map_err(|e| e.into());
|
||||||
CommandMessage::AcceptSubscriptionRequest(jid, sender) => todo!(),
|
sender.send(messages);
|
||||||
CommandMessage::UnsubscribeFromContact(jid, sender) => todo!(),
|
}
|
||||||
CommandMessage::UnsubscribeContact(jid, sender) => todo!(),
|
CommandMessage::DeleteChat(jid, sender) => {
|
||||||
CommandMessage::UnfriendContact(jid, sender) => todo!(),
|
let result = db.delete_chat(jid).await.map_err(|e| e.into());
|
||||||
CommandMessage::DeleteContact(jid, sender) => todo!(),
|
sender.send(result);
|
||||||
CommandMessage::UpdateContact(jid, contact_update, sender) => todo!(),
|
}
|
||||||
CommandMessage::SetStatus(online, sender) => todo!(),
|
CommandMessage::DeleteMessage(uuid, sender) => {
|
||||||
CommandMessage::SendMessage(jid, body, sender) => todo!(),
|
let result = db.delete_message(uuid).await.map_err(|e| e.into());
|
||||||
|
sender.send(result);
|
||||||
|
}
|
||||||
|
CommandMessage::GetUser(jid, sender) => {
|
||||||
|
let user = db.read_user(jid).await.map_err(|e| e.into());
|
||||||
|
sender.send(user);
|
||||||
|
}
|
||||||
|
// TODO: offline queue to modify roster
|
||||||
|
CommandMessage::AddContact(jid, sender) => {
|
||||||
|
sender.send(Err(Reason::Disconnected));
|
||||||
|
}
|
||||||
|
CommandMessage::BuddyRequest(jid, sender) => {
|
||||||
|
sender.send(Err(Reason::Disconnected));
|
||||||
|
}
|
||||||
|
CommandMessage::SubscriptionRequest(jid, sender) => {
|
||||||
|
sender.send(Err(Reason::Disconnected));
|
||||||
|
}
|
||||||
|
CommandMessage::AcceptBuddyRequest(jid, sender) => {
|
||||||
|
sender.send(Err(Reason::Disconnected));
|
||||||
|
}
|
||||||
|
CommandMessage::AcceptSubscriptionRequest(jid, sender) => {
|
||||||
|
sender.send(Err(Reason::Disconnected));
|
||||||
|
}
|
||||||
|
CommandMessage::UnsubscribeFromContact(jid, sender) => {
|
||||||
|
sender.send(Err(Reason::Disconnected));
|
||||||
|
}
|
||||||
|
CommandMessage::UnsubscribeContact(jid, sender) => {
|
||||||
|
sender.send(Err(Reason::Disconnected));
|
||||||
|
}
|
||||||
|
CommandMessage::UnfriendContact(jid, sender) => {
|
||||||
|
sender.send(Err(Reason::Disconnected));
|
||||||
|
}
|
||||||
|
CommandMessage::DeleteContact(jid, sender) => {
|
||||||
|
sender.send(Err(Reason::Disconnected));
|
||||||
|
}
|
||||||
|
CommandMessage::UpdateContact(jid, contact_update, sender) => {
|
||||||
|
sender.send(Err(Reason::Disconnected));
|
||||||
|
}
|
||||||
|
CommandMessage::SetStatus(online, sender) => {
|
||||||
|
let result = db
|
||||||
|
.upsert_cached_status(online)
|
||||||
|
.await
|
||||||
|
.map_err(|e| StatusError(e.into()));
|
||||||
|
sender.send(result);
|
||||||
|
}
|
||||||
|
// TODO: offline message queue
|
||||||
|
CommandMessage::SendMessage(jid, body, sender) => {
|
||||||
|
sender.send(Err(Reason::Disconnected));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -312,7 +360,7 @@ impl CommandMessage {
|
||||||
write_handle: WriteHandle,
|
write_handle: WriteHandle,
|
||||||
supervisor_control: SupervisorSender,
|
supervisor_control: SupervisorSender,
|
||||||
// TODO: jid could lose resource by the end
|
// TODO: jid could lose resource by the end
|
||||||
jid: Arc<Mutex<JID>>,
|
client_jid: Arc<Mutex<JID>>,
|
||||||
db: Db,
|
db: Db,
|
||||||
update_sender: mpsc::Sender<UpdateMessage>,
|
update_sender: mpsc::Sender<UpdateMessage>,
|
||||||
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
|
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
|
||||||
|
@ -324,7 +372,7 @@ impl CommandMessage {
|
||||||
// TODO: jid resource should probably be stored within the connection
|
// TODO: jid resource should probably be stored within the connection
|
||||||
let owned_jid: JID;
|
let owned_jid: JID;
|
||||||
{
|
{
|
||||||
owned_jid = jid.lock().await.clone();
|
owned_jid = client_jid.lock().await.clone();
|
||||||
}
|
}
|
||||||
let iq_id = Uuid::new_v4().to_string();
|
let iq_id = Uuid::new_v4().to_string();
|
||||||
let (send, iq_recv) = oneshot::channel();
|
let (send, iq_recv) = oneshot::channel();
|
||||||
|
@ -400,24 +448,502 @@ impl CommandMessage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
CommandMessage::GetChats(sender) => todo!(),
|
CommandMessage::GetChats(sender) => {
|
||||||
CommandMessage::GetChat(jid, sender) => todo!(),
|
let chats = db.read_chats().await.map_err(|e| e.into());
|
||||||
CommandMessage::GetMessages(jid, sender) => todo!(),
|
sender.send(chats);
|
||||||
CommandMessage::DeleteChat(jid, sender) => todo!(),
|
}
|
||||||
CommandMessage::DeleteMessage(uuid, sender) => todo!(),
|
CommandMessage::GetChat(jid, sender) => {
|
||||||
CommandMessage::GetUser(jid, sender) => todo!(),
|
let chats = db.read_chat(jid).await.map_err(|e| e.into());
|
||||||
CommandMessage::AddContact(jid, sender) => todo!(),
|
sender.send(chats);
|
||||||
CommandMessage::BuddyRequest(jid, sender) => todo!(),
|
}
|
||||||
CommandMessage::SubscriptionRequest(jid, sender) => todo!(),
|
CommandMessage::GetMessages(jid, sender) => {
|
||||||
CommandMessage::AcceptBuddyRequest(jid, sender) => todo!(),
|
let messages = db.read_message_history(jid).await.map_err(|e| e.into());
|
||||||
CommandMessage::AcceptSubscriptionRequest(jid, sender) => todo!(),
|
sender.send(messages);
|
||||||
CommandMessage::UnsubscribeFromContact(jid, sender) => todo!(),
|
}
|
||||||
CommandMessage::UnsubscribeContact(jid, sender) => todo!(),
|
CommandMessage::DeleteChat(jid, sender) => {
|
||||||
CommandMessage::UnfriendContact(jid, sender) => todo!(),
|
let result = db.delete_chat(jid).await.map_err(|e| e.into());
|
||||||
CommandMessage::DeleteContact(jid, sender) => todo!(),
|
sender.send(result);
|
||||||
CommandMessage::UpdateContact(jid, contact_update, sender) => todo!(),
|
}
|
||||||
CommandMessage::SetStatus(online, sender) => todo!(),
|
CommandMessage::DeleteMessage(uuid, sender) => {
|
||||||
CommandMessage::SendMessage(jid, body, sender) => todo!(),
|
let result = db.delete_message(uuid).await.map_err(|e| e.into());
|
||||||
|
sender.send(result);
|
||||||
|
}
|
||||||
|
CommandMessage::GetUser(jid, sender) => {
|
||||||
|
let user = db.read_user(jid).await.map_err(|e| e.into());
|
||||||
|
sender.send(user);
|
||||||
|
}
|
||||||
|
// TODO: offline queue to modify roster
|
||||||
|
CommandMessage::AddContact(jid, sender) => {
|
||||||
|
let owned_jid;
|
||||||
|
{
|
||||||
|
owned_jid = client_jid.lock().await.clone();
|
||||||
|
}
|
||||||
|
let iq_id = Uuid::new_v4().to_string();
|
||||||
|
let set_stanza = Stanza::Iq(Iq {
|
||||||
|
from: Some(owned_jid),
|
||||||
|
id: iq_id.clone(),
|
||||||
|
to: None,
|
||||||
|
r#type: IqType::Set,
|
||||||
|
lang: None,
|
||||||
|
query: Some(iq::Query::Roster(stanza::roster::Query {
|
||||||
|
ver: None,
|
||||||
|
items: vec![stanza::roster::Item {
|
||||||
|
approved: None,
|
||||||
|
ask: false,
|
||||||
|
jid,
|
||||||
|
name: None,
|
||||||
|
subscription: None,
|
||||||
|
groups: Vec::new(),
|
||||||
|
}],
|
||||||
|
})),
|
||||||
|
errors: Vec::new(),
|
||||||
|
});
|
||||||
|
let (send, recv) = oneshot::channel();
|
||||||
|
{
|
||||||
|
pending_iqs.lock().await.insert(iq_id.clone(), send);
|
||||||
|
}
|
||||||
|
// TODO: write_handle send helper function
|
||||||
|
let result = write_handle.write(set_stanza).await;
|
||||||
|
if let Err(_) = result {
|
||||||
|
sender.send(result);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let iq_result = recv.await;
|
||||||
|
match iq_result {
|
||||||
|
Ok(i) => match i {
|
||||||
|
Ok(iq_result) => match iq_result {
|
||||||
|
Stanza::Iq(Iq {
|
||||||
|
from: _,
|
||||||
|
id,
|
||||||
|
to: _,
|
||||||
|
r#type,
|
||||||
|
lang: _,
|
||||||
|
query: _,
|
||||||
|
errors: _,
|
||||||
|
}) if id == iq_id && r#type == IqType::Result => {
|
||||||
|
sender.send(Ok(()));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Stanza::Iq(Iq {
|
||||||
|
from: _,
|
||||||
|
id,
|
||||||
|
to: _,
|
||||||
|
r#type,
|
||||||
|
lang: _,
|
||||||
|
query: _,
|
||||||
|
errors,
|
||||||
|
}) if id == iq_id && r#type == IqType::Error => {
|
||||||
|
if let Some(error) = errors.first() {
|
||||||
|
sender.send(Err(Reason::Stanza(Some(error.clone()))));
|
||||||
|
} else {
|
||||||
|
sender.send(Err(Reason::Stanza(None)));
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
s => {
|
||||||
|
sender.send(Err(Reason::UnexpectedStanza(s)));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
sender.send(Err(e.into()));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
sender.send(Err(e.into()));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
CommandMessage::BuddyRequest(jid, sender) => {
|
||||||
|
let presence = Stanza::Presence(stanza::client::presence::Presence {
|
||||||
|
from: None,
|
||||||
|
id: None,
|
||||||
|
to: Some(jid.clone()),
|
||||||
|
r#type: Some(stanza::client::presence::PresenceType::Subscribe),
|
||||||
|
lang: None,
|
||||||
|
show: None,
|
||||||
|
status: None,
|
||||||
|
priority: None,
|
||||||
|
errors: Vec::new(),
|
||||||
|
});
|
||||||
|
let result = write_handle.write(presence).await;
|
||||||
|
match result {
|
||||||
|
Err(_) => {
|
||||||
|
let _ = sender.send(result);
|
||||||
|
}
|
||||||
|
Ok(()) => {
|
||||||
|
let presence = Stanza::Presence(stanza::client::presence::Presence {
|
||||||
|
from: None,
|
||||||
|
id: None,
|
||||||
|
to: Some(jid),
|
||||||
|
r#type: Some(stanza::client::presence::PresenceType::Subscribed),
|
||||||
|
lang: None,
|
||||||
|
show: None,
|
||||||
|
status: None,
|
||||||
|
priority: None,
|
||||||
|
errors: Vec::new(),
|
||||||
|
});
|
||||||
|
let result = write_handle.write(presence).await;
|
||||||
|
let _ = sender.send(result);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
CommandMessage::SubscriptionRequest(jid, sender) => {
|
||||||
|
// TODO: i should probably have builders
|
||||||
|
let presence = Stanza::Presence(stanza::client::presence::Presence {
|
||||||
|
from: None,
|
||||||
|
id: None,
|
||||||
|
to: Some(jid),
|
||||||
|
r#type: Some(stanza::client::presence::PresenceType::Subscribe),
|
||||||
|
lang: None,
|
||||||
|
show: None,
|
||||||
|
status: None,
|
||||||
|
priority: None,
|
||||||
|
errors: Vec::new(),
|
||||||
|
});
|
||||||
|
let result = write_handle.write(presence).await;
|
||||||
|
let _ = sender.send(result);
|
||||||
|
}
|
||||||
|
CommandMessage::AcceptBuddyRequest(jid, sender) => {
|
||||||
|
let presence = Stanza::Presence(stanza::client::presence::Presence {
|
||||||
|
from: None,
|
||||||
|
id: None,
|
||||||
|
to: Some(jid.clone()),
|
||||||
|
r#type: Some(stanza::client::presence::PresenceType::Subscribed),
|
||||||
|
lang: None,
|
||||||
|
show: None,
|
||||||
|
status: None,
|
||||||
|
priority: None,
|
||||||
|
errors: Vec::new(),
|
||||||
|
});
|
||||||
|
let result = write_handle.write(presence).await;
|
||||||
|
match result {
|
||||||
|
Err(_) => {
|
||||||
|
let _ = sender.send(result);
|
||||||
|
}
|
||||||
|
Ok(()) => {
|
||||||
|
let presence = Stanza::Presence(stanza::client::presence::Presence {
|
||||||
|
from: None,
|
||||||
|
id: None,
|
||||||
|
to: Some(jid),
|
||||||
|
r#type: Some(stanza::client::presence::PresenceType::Subscribe),
|
||||||
|
lang: None,
|
||||||
|
show: None,
|
||||||
|
status: None,
|
||||||
|
priority: None,
|
||||||
|
errors: Vec::new(),
|
||||||
|
});
|
||||||
|
let result = write_handle.write(presence).await;
|
||||||
|
let _ = sender.send(result);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
CommandMessage::AcceptSubscriptionRequest(jid, sender) => {
|
||||||
|
let presence = Stanza::Presence(stanza::client::presence::Presence {
|
||||||
|
from: None,
|
||||||
|
id: None,
|
||||||
|
to: Some(jid),
|
||||||
|
r#type: Some(stanza::client::presence::PresenceType::Subscribe),
|
||||||
|
lang: None,
|
||||||
|
show: None,
|
||||||
|
status: None,
|
||||||
|
priority: None,
|
||||||
|
errors: Vec::new(),
|
||||||
|
});
|
||||||
|
let result = write_handle.write(presence).await;
|
||||||
|
let _ = sender.send(result);
|
||||||
|
}
|
||||||
|
CommandMessage::UnsubscribeFromContact(jid, sender) => {
|
||||||
|
let presence = Stanza::Presence(stanza::client::presence::Presence {
|
||||||
|
from: None,
|
||||||
|
id: None,
|
||||||
|
to: Some(jid),
|
||||||
|
r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
|
||||||
|
lang: None,
|
||||||
|
show: None,
|
||||||
|
status: None,
|
||||||
|
priority: None,
|
||||||
|
errors: Vec::new(),
|
||||||
|
});
|
||||||
|
let result = write_handle.write(presence).await;
|
||||||
|
let _ = sender.send(result);
|
||||||
|
}
|
||||||
|
CommandMessage::UnsubscribeContact(jid, sender) => {
|
||||||
|
let presence = Stanza::Presence(stanza::client::presence::Presence {
|
||||||
|
from: None,
|
||||||
|
id: None,
|
||||||
|
to: Some(jid),
|
||||||
|
r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
|
||||||
|
lang: None,
|
||||||
|
show: None,
|
||||||
|
status: None,
|
||||||
|
priority: None,
|
||||||
|
errors: Vec::new(),
|
||||||
|
});
|
||||||
|
let result = write_handle.write(presence).await;
|
||||||
|
let _ = sender.send(result);
|
||||||
|
}
|
||||||
|
CommandMessage::UnfriendContact(jid, sender) => {
|
||||||
|
let presence = Stanza::Presence(stanza::client::presence::Presence {
|
||||||
|
from: None,
|
||||||
|
id: None,
|
||||||
|
to: Some(jid.clone()),
|
||||||
|
r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
|
||||||
|
lang: None,
|
||||||
|
show: None,
|
||||||
|
status: None,
|
||||||
|
priority: None,
|
||||||
|
errors: Vec::new(),
|
||||||
|
});
|
||||||
|
let result = write_handle.write(presence).await;
|
||||||
|
match result {
|
||||||
|
Err(_) => {
|
||||||
|
let _ = sender.send(result);
|
||||||
|
}
|
||||||
|
Ok(()) => {
|
||||||
|
let presence = Stanza::Presence(stanza::client::presence::Presence {
|
||||||
|
from: None,
|
||||||
|
id: None,
|
||||||
|
to: Some(jid),
|
||||||
|
r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
|
||||||
|
lang: None,
|
||||||
|
show: None,
|
||||||
|
status: None,
|
||||||
|
priority: None,
|
||||||
|
errors: Vec::new(),
|
||||||
|
});
|
||||||
|
let result = write_handle.write(presence).await;
|
||||||
|
let _ = sender.send(result);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
CommandMessage::DeleteContact(jid, sender) => {
|
||||||
|
let owned_jid;
|
||||||
|
{
|
||||||
|
owned_jid = client_jid.lock().await.clone();
|
||||||
|
}
|
||||||
|
let iq_id = Uuid::new_v4().to_string();
|
||||||
|
let set_stanza = Stanza::Iq(Iq {
|
||||||
|
from: Some(owned_jid),
|
||||||
|
id: iq_id.clone(),
|
||||||
|
to: None,
|
||||||
|
r#type: IqType::Set,
|
||||||
|
lang: None,
|
||||||
|
query: Some(iq::Query::Roster(stanza::roster::Query {
|
||||||
|
ver: None,
|
||||||
|
items: vec![stanza::roster::Item {
|
||||||
|
approved: None,
|
||||||
|
ask: false,
|
||||||
|
jid,
|
||||||
|
name: None,
|
||||||
|
subscription: Some(stanza::roster::Subscription::Remove),
|
||||||
|
groups: Vec::new(),
|
||||||
|
}],
|
||||||
|
})),
|
||||||
|
errors: Vec::new(),
|
||||||
|
});
|
||||||
|
let (send, recv) = oneshot::channel();
|
||||||
|
{
|
||||||
|
pending_iqs.lock().await.insert(iq_id.clone(), send);
|
||||||
|
}
|
||||||
|
let result = write_handle.write(set_stanza).await;
|
||||||
|
if let Err(_) = result {
|
||||||
|
sender.send(result);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let iq_result = recv.await;
|
||||||
|
match iq_result {
|
||||||
|
Ok(i) => match i {
|
||||||
|
Ok(iq_result) => match iq_result {
|
||||||
|
Stanza::Iq(Iq {
|
||||||
|
from: _,
|
||||||
|
id,
|
||||||
|
to: _,
|
||||||
|
r#type,
|
||||||
|
lang: _,
|
||||||
|
query: _,
|
||||||
|
errors: _,
|
||||||
|
}) if id == iq_id && r#type == IqType::Result => {
|
||||||
|
sender.send(Ok(()));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Stanza::Iq(Iq {
|
||||||
|
from: _,
|
||||||
|
id,
|
||||||
|
to: _,
|
||||||
|
r#type,
|
||||||
|
lang: _,
|
||||||
|
query: _,
|
||||||
|
errors,
|
||||||
|
}) if id == iq_id && r#type == IqType::Error => {
|
||||||
|
if let Some(error) = errors.first() {
|
||||||
|
sender.send(Err(Reason::Stanza(Some(error.clone()))));
|
||||||
|
} else {
|
||||||
|
sender.send(Err(Reason::Stanza(None)));
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
s => {
|
||||||
|
sender.send(Err(Reason::UnexpectedStanza(s)));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
sender.send(Err(e.into()));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
sender.send(Err(e.into()));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
CommandMessage::UpdateContact(jid, contact_update, sender) => {
|
||||||
|
let owned_jid;
|
||||||
|
{
|
||||||
|
owned_jid = client_jid.lock().await.clone();
|
||||||
|
}
|
||||||
|
let iq_id = Uuid::new_v4().to_string();
|
||||||
|
let groups = Vec::from_iter(
|
||||||
|
contact_update
|
||||||
|
.groups
|
||||||
|
.into_iter()
|
||||||
|
.map(|group| stanza::roster::Group(Some(group))),
|
||||||
|
);
|
||||||
|
let set_stanza = Stanza::Iq(Iq {
|
||||||
|
from: Some(owned_jid),
|
||||||
|
id: iq_id.clone(),
|
||||||
|
to: None,
|
||||||
|
r#type: IqType::Set,
|
||||||
|
lang: None,
|
||||||
|
query: Some(iq::Query::Roster(stanza::roster::Query {
|
||||||
|
ver: None,
|
||||||
|
items: vec![stanza::roster::Item {
|
||||||
|
approved: None,
|
||||||
|
ask: false,
|
||||||
|
jid,
|
||||||
|
name: contact_update.name,
|
||||||
|
subscription: None,
|
||||||
|
groups,
|
||||||
|
}],
|
||||||
|
})),
|
||||||
|
errors: Vec::new(),
|
||||||
|
});
|
||||||
|
let (send, recv) = oneshot::channel();
|
||||||
|
{
|
||||||
|
pending_iqs.lock().await.insert(iq_id.clone(), send);
|
||||||
|
}
|
||||||
|
let result = write_handle.write(set_stanza).await;
|
||||||
|
if let Err(_) = result {
|
||||||
|
sender.send(result);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let iq_result = recv.await;
|
||||||
|
match iq_result {
|
||||||
|
Ok(i) => match i {
|
||||||
|
Ok(iq_result) => match iq_result {
|
||||||
|
Stanza::Iq(Iq {
|
||||||
|
from: _,
|
||||||
|
id,
|
||||||
|
to: _,
|
||||||
|
r#type,
|
||||||
|
lang: _,
|
||||||
|
query: _,
|
||||||
|
errors: _,
|
||||||
|
}) if id == iq_id && r#type == IqType::Result => {
|
||||||
|
sender.send(Ok(()));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Stanza::Iq(Iq {
|
||||||
|
from: _,
|
||||||
|
id,
|
||||||
|
to: _,
|
||||||
|
r#type,
|
||||||
|
lang: _,
|
||||||
|
query: _,
|
||||||
|
errors,
|
||||||
|
}) if id == iq_id && r#type == IqType::Error => {
|
||||||
|
if let Some(error) = errors.first() {
|
||||||
|
sender.send(Err(Reason::Stanza(Some(error.clone()))));
|
||||||
|
} else {
|
||||||
|
sender.send(Err(Reason::Stanza(None)));
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
s => {
|
||||||
|
sender.send(Err(Reason::UnexpectedStanza(s)));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
sender.send(Err(e.into()));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
sender.send(Err(e.into()));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
CommandMessage::SetStatus(online, sender) => {
|
||||||
|
let result = db.upsert_cached_status(online.clone()).await;
|
||||||
|
if let Err(e) = result {
|
||||||
|
let _ = update_sender
|
||||||
|
.send(UpdateMessage::Error(Error::CacheUpdate(e.into())))
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
let result = write_handle
|
||||||
|
.write(Stanza::Presence(online.into()))
|
||||||
|
.await
|
||||||
|
.map_err(|e| StatusError(e));
|
||||||
|
let _ = sender.send(result);
|
||||||
|
}
|
||||||
|
// TODO: offline message queue
|
||||||
|
CommandMessage::SendMessage(jid, body, sender) => {
|
||||||
|
let id = Uuid::new_v4();
|
||||||
|
let owned_jid: JID;
|
||||||
|
{
|
||||||
|
// TODO: timeout
|
||||||
|
owned_jid = client_jid.lock().await.clone();
|
||||||
|
}
|
||||||
|
let message = Stanza::Message(stanza::client::message::Message {
|
||||||
|
from: Some(owned_jid.clone()),
|
||||||
|
id: Some(id.to_string()),
|
||||||
|
to: Some(jid.clone()),
|
||||||
|
// TODO: specify message type
|
||||||
|
r#type: stanza::client::message::MessageType::Chat,
|
||||||
|
// TODO: lang ?
|
||||||
|
lang: None,
|
||||||
|
subject: None,
|
||||||
|
body: Some(stanza::client::message::Body {
|
||||||
|
lang: None,
|
||||||
|
body: Some(body.body.clone()),
|
||||||
|
}),
|
||||||
|
thread: None,
|
||||||
|
});
|
||||||
|
let result = write_handle.write(message).await;
|
||||||
|
match result {
|
||||||
|
Ok(_) => {
|
||||||
|
let message = Message {
|
||||||
|
id,
|
||||||
|
from: owned_jid,
|
||||||
|
body,
|
||||||
|
};
|
||||||
|
if let Err(e) = db.create_message(message, jid).await.map_err(|e| e.into())
|
||||||
|
{
|
||||||
|
let _ = update_sender.send(UpdateMessage::Error(Error::CacheUpdate(e)));
|
||||||
|
}
|
||||||
|
let _ = sender.send(Ok(()));
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
let _ = sender.send(result);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -484,46 +1010,46 @@ pub enum CommandMessage {
|
||||||
GetRoster(oneshot::Sender<Result<Vec<Contact>, RosterError>>),
|
GetRoster(oneshot::Sender<Result<Vec<Contact>, RosterError>>),
|
||||||
/// get all chats. chat will include 10 messages in their message Vec (enough for chat previews)
|
/// get all chats. chat will include 10 messages in their message Vec (enough for chat previews)
|
||||||
// TODO: paging and filtering
|
// TODO: paging and filtering
|
||||||
GetChats(oneshot::Sender<Result<Vec<Chat>, Error>>),
|
GetChats(oneshot::Sender<Result<Vec<Chat>, Reason>>),
|
||||||
/// get a specific chat by jid
|
/// get a specific chat by jid
|
||||||
GetChat(JID, oneshot::Sender<Result<Chat, Error>>),
|
GetChat(JID, oneshot::Sender<Result<Chat, Reason>>),
|
||||||
/// get message history for chat (does appropriate mam things)
|
/// get message history for chat (does appropriate mam things)
|
||||||
// TODO: paging and filtering
|
// TODO: paging and filtering
|
||||||
GetMessages(JID, oneshot::Sender<Result<Vec<Message>, Error>>),
|
GetMessages(JID, oneshot::Sender<Result<Vec<Message>, Reason>>),
|
||||||
/// delete a chat from your chat history, along with all the corresponding messages
|
/// delete a chat from your chat history, along with all the corresponding messages
|
||||||
DeleteChat(JID, oneshot::Sender<Result<(), Error>>),
|
DeleteChat(JID, oneshot::Sender<Result<(), Reason>>),
|
||||||
/// delete a message from your chat history
|
/// delete a message from your chat history
|
||||||
DeleteMessage(Uuid, oneshot::Sender<Result<(), Error>>),
|
DeleteMessage(Uuid, oneshot::Sender<Result<(), Reason>>),
|
||||||
/// get a user from your users database
|
/// get a user from your users database
|
||||||
GetUser(JID, oneshot::Sender<Result<User, Error>>),
|
GetUser(JID, oneshot::Sender<Result<User, Reason>>),
|
||||||
/// add a contact to your roster, with a status of none, no subscriptions.
|
/// add a contact to your roster, with a status of none, no subscriptions.
|
||||||
// TODO: for all these, consider returning with oneshot::Sender<Result<(), Error>>
|
// TODO: for all these, consider returning with oneshot::Sender<Result<(), Error>>
|
||||||
AddContact(JID, oneshot::Sender<Result<(), Error>>),
|
AddContact(JID, oneshot::Sender<Result<(), Reason>>),
|
||||||
/// send a friend request i.e. a subscription request with a subscription pre-approval. if not already added to roster server adds to roster.
|
/// send a friend request i.e. a subscription request with a subscription pre-approval. if not already added to roster server adds to roster.
|
||||||
BuddyRequest(JID, oneshot::Sender<Result<(), Error>>),
|
BuddyRequest(JID, oneshot::Sender<Result<(), Reason>>),
|
||||||
/// send a subscription request, without pre-approval. if not already added to roster server adds to roster.
|
/// send a subscription request, without pre-approval. if not already added to roster server adds to roster.
|
||||||
SubscriptionRequest(JID, oneshot::Sender<Result<(), Error>>),
|
SubscriptionRequest(JID, oneshot::Sender<Result<(), Reason>>),
|
||||||
/// accept a friend request by accepting a pending subscription and sending a subscription request back. if not already added to roster adds to roster.
|
/// accept a friend request by accepting a pending subscription and sending a subscription request back. if not already added to roster adds to roster.
|
||||||
AcceptBuddyRequest(JID, oneshot::Sender<Result<(), Error>>),
|
AcceptBuddyRequest(JID, oneshot::Sender<Result<(), Reason>>),
|
||||||
/// accept a pending subscription and doesn't send a subscription request back. if not already added to roster adds to roster.
|
/// accept a pending subscription and doesn't send a subscription request back. if not already added to roster adds to roster.
|
||||||
AcceptSubscriptionRequest(JID, oneshot::Sender<Result<(), Error>>),
|
AcceptSubscriptionRequest(JID, oneshot::Sender<Result<(), Reason>>),
|
||||||
/// unsubscribe to a contact, but don't remove their subscription.
|
/// unsubscribe to a contact, but don't remove their subscription.
|
||||||
UnsubscribeFromContact(JID, oneshot::Sender<Result<(), Error>>),
|
UnsubscribeFromContact(JID, oneshot::Sender<Result<(), Reason>>),
|
||||||
/// stop a contact from being subscribed, but stay subscribed to the contact.
|
/// stop a contact from being subscribed, but stay subscribed to the contact.
|
||||||
UnsubscribeContact(JID, oneshot::Sender<Result<(), Error>>),
|
UnsubscribeContact(JID, oneshot::Sender<Result<(), Reason>>),
|
||||||
/// remove subscriptions to and from contact, but keep in roster.
|
/// remove subscriptions to and from contact, but keep in roster.
|
||||||
UnfriendContact(JID, oneshot::Sender<Result<(), Error>>),
|
UnfriendContact(JID, oneshot::Sender<Result<(), Reason>>),
|
||||||
/// remove a contact from the contact list. will remove subscriptions if not already done then delete contact from roster.
|
/// remove a contact from the contact list. will remove subscriptions if not already done then delete contact from roster.
|
||||||
DeleteContact(JID, oneshot::Sender<Result<(), Error>>),
|
DeleteContact(JID, oneshot::Sender<Result<(), Reason>>),
|
||||||
/// update contact
|
/// update contact. contact details will be overwritten with the contents of the contactupdate struct.
|
||||||
UpdateContact(JID, ContactUpdate, oneshot::Sender<Result<(), Error>>),
|
UpdateContact(JID, ContactUpdate, oneshot::Sender<Result<(), Reason>>),
|
||||||
/// set online status. if disconnected, will be cached so when client connects, will be sent as the initial presence.
|
/// set online status. if disconnected, will be cached so when client connects, will be sent as the initial presence.
|
||||||
SetStatus(Online, oneshot::Sender<Result<(), StatusError>>),
|
SetStatus(Online, oneshot::Sender<Result<(), StatusError>>),
|
||||||
/// send a directed presence (usually to a non-contact).
|
/// send a directed presence (usually to a non-contact).
|
||||||
// TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting)
|
// TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting)
|
||||||
/// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a
|
/// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a
|
||||||
/// chatroom). if disconnected, will be cached so when client connects, message will be sent.
|
/// chatroom). if disconnected, will be cached so when client connects, message will be sent.
|
||||||
SendMessage(JID, Body, oneshot::Sender<Result<(), Error>>),
|
SendMessage(JID, Body, oneshot::Sender<Result<(), Reason>>),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
use sqlx::Sqlite;
|
use sqlx::Sqlite;
|
||||||
|
use stanza::client::presence::String1024;
|
||||||
|
|
||||||
#[derive(Debug, Default, sqlx::FromRow, Clone)]
|
#[derive(Debug, Default, sqlx::FromRow, Clone)]
|
||||||
pub struct Online {
|
pub struct Online {
|
||||||
|
@ -62,3 +63,30 @@ pub enum Presence {
|
||||||
Online(Online),
|
Online(Online),
|
||||||
Offline(Offline),
|
Offline(Offline),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<Online> for stanza::client::presence::Presence {
|
||||||
|
fn from(value: Online) -> Self {
|
||||||
|
Self {
|
||||||
|
from: None,
|
||||||
|
id: None,
|
||||||
|
to: None,
|
||||||
|
r#type: None,
|
||||||
|
lang: None,
|
||||||
|
show: value.show.map(|show| match show {
|
||||||
|
Show::Away => stanza::client::presence::Show::Away,
|
||||||
|
Show::Chat => stanza::client::presence::Show::Chat,
|
||||||
|
Show::DoNotDisturb => stanza::client::presence::Show::Dnd,
|
||||||
|
Show::ExtendedAway => stanza::client::presence::Show::Xa,
|
||||||
|
}),
|
||||||
|
// TODO: enforce message length in status message
|
||||||
|
status: value.status.map(|status| stanza::client::presence::Status {
|
||||||
|
lang: None,
|
||||||
|
status: String1024(status),
|
||||||
|
}),
|
||||||
|
priority: value
|
||||||
|
.priority
|
||||||
|
.map(|priority| stanza::client::presence::Priority(priority)),
|
||||||
|
errors: Vec::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -3,10 +3,9 @@ use std::collections::HashSet;
|
||||||
use jid::JID;
|
use jid::JID;
|
||||||
use sqlx::Sqlite;
|
use sqlx::Sqlite;
|
||||||
|
|
||||||
pub enum ContactUpdate {
|
pub struct ContactUpdate {
|
||||||
Name(Option<String>),
|
pub name: Option<String>,
|
||||||
AddToGroup(String),
|
pub groups: HashSet<String>,
|
||||||
RemoveFromGroup(String),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, sqlx::FromRow, Clone)]
|
#[derive(Debug, sqlx::FromRow, Clone)]
|
||||||
|
|
|
@ -10,16 +10,16 @@ use super::XMLNS;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Message {
|
pub struct Message {
|
||||||
from: Option<JID>,
|
pub from: Option<JID>,
|
||||||
id: Option<String>,
|
pub id: Option<String>,
|
||||||
to: Option<JID>,
|
pub to: Option<JID>,
|
||||||
// can be omitted, if so default to normal
|
// can be omitted, if so default to normal
|
||||||
r#type: MessageType,
|
pub r#type: MessageType,
|
||||||
lang: Option<String>,
|
pub lang: Option<String>,
|
||||||
// children
|
// children
|
||||||
subject: Option<Subject>,
|
pub subject: Option<Subject>,
|
||||||
body: Option<Body>,
|
pub body: Option<Body>,
|
||||||
thread: Option<Thread>,
|
pub thread: Option<Thread>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FromElement for Message {
|
impl FromElement for Message {
|
||||||
|
@ -109,8 +109,8 @@ impl ToString for MessageType {
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct Body {
|
pub struct Body {
|
||||||
lang: Option<String>,
|
pub lang: Option<String>,
|
||||||
body: Option<String>,
|
pub body: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FromElement for Body {
|
impl FromElement for Body {
|
||||||
|
|
|
@ -10,18 +10,18 @@ use super::{error::Error, XMLNS};
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Presence {
|
pub struct Presence {
|
||||||
from: Option<JID>,
|
pub from: Option<JID>,
|
||||||
id: Option<String>,
|
pub id: Option<String>,
|
||||||
to: Option<JID>,
|
pub to: Option<JID>,
|
||||||
r#type: Option<PresenceType>,
|
pub r#type: Option<PresenceType>,
|
||||||
lang: Option<String>,
|
pub lang: Option<String>,
|
||||||
// children
|
// children
|
||||||
show: Option<Show>,
|
pub show: Option<Show>,
|
||||||
status: Option<Status>,
|
pub status: Option<Status>,
|
||||||
priority: Option<Priority>,
|
pub priority: Option<Priority>,
|
||||||
// TODO: ##other
|
// TODO: ##other
|
||||||
// other: Vec<Other>,
|
// other: Vec<Other>,
|
||||||
errors: Vec<Error>,
|
pub errors: Vec<Error>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FromElement for Presence {
|
impl FromElement for Presence {
|
||||||
|
@ -165,8 +165,8 @@ impl ToString for Show {
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct Status {
|
pub struct Status {
|
||||||
lang: Option<String>,
|
pub lang: Option<String>,
|
||||||
status: String1024,
|
pub status: String1024,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FromElement for Status {
|
impl FromElement for Status {
|
||||||
|
|
Loading…
Reference in New Issue