diff --git a/README.md b/README.md index 63094ae..b40173a 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,7 @@ - [x] rfc 7590: tls - [x] xep-0368: srv records for xmpp over tls - [ ] server side downgrade protection for sasl +- [x] xep-0199: xmpp ping - [ ] xep-0030: service discovery - [ ] xep-0115: entity capabilities - [ ] xep-0163: pep diff --git a/jabber/src/client.rs b/jabber/src/client.rs index f5d5dc7..2e59d98 100644 --- a/jabber/src/client.rs +++ b/jabber/src/client.rs @@ -26,6 +26,7 @@ use crate::{ pub struct JabberClient { connection: ConnectionState, jid: JID, + // TODO: have reconnection be handled by another part, so creds don't need to be stored in object password: Arc, server: String, } @@ -49,6 +50,10 @@ impl JabberClient { }) } + pub fn jid(&self) -> JID { + self.jid.clone() + } + pub async fn connect(&mut self) -> Result<()> { match &self.connection { ConnectionState::Disconnected => { diff --git a/jabber/src/jabber_stream/bound_stream.rs b/jabber/src/jabber_stream/bound_stream.rs index c0d67b0..627158a 100644 --- a/jabber/src/jabber_stream/bound_stream.rs +++ b/jabber/src/jabber_stream/bound_stream.rs @@ -63,6 +63,7 @@ where if let Some(_write_handle) = this.write_handle { panic!("start_send called without poll_ready") } else { + // TODO: switch to buffer of one rather than thread spawning and joining *this.write_handle = Some(tokio::spawn(write(this.writer.clone(), item))); Ok(()) } diff --git a/luz/Cargo.toml b/luz/Cargo.toml index 11d4197..646d8e8 100644 --- a/luz/Cargo.toml +++ b/luz/Cargo.toml @@ -4,3 +4,9 @@ version = "0.1.0" edition = "2021" [dependencies] +futures = "0.3.31" +jabber = { version = "0.1.0", path = "../jabber" } +stanza = { version = "0.1.0", path = "../stanza" } +tokio = "1.42.0" +tokio-stream = "0.1.17" +tokio-util = "0.7.13" diff --git a/luz/src/lib.rs b/luz/src/lib.rs new file mode 100644 index 0000000..1a750a3 --- /dev/null +++ b/luz/src/lib.rs @@ -0,0 +1,171 @@ +use std::{ + collections::{HashMap, HashSet, VecDeque}, + pin::pin, + task::{ready, Poll}, + thread::JoinHandle, +}; + +use futures::{ + stream::{SplitSink, SplitStream}, + Sink, SinkExt, Stream, StreamExt, +}; +use jabber::{client::JabberClient, JID}; +use stanza::{ + client::{ + iq::{Iq, IqType, Query}, + Stanza, + }, + roster, +}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tokio_util::sync::{PollSendError, PollSender}; + +pub struct Client { + client: JabberClient, + pending_iqs: HashMap>, + // database connection (sqlite) + receiver: ReceiverStream, + sender: PollSender, +} + +impl Client { + pub async fn new(jid: String, password: &str) -> Result { + let (read_sender, read_receiver) = mpsc::channel::(20); + let (write_sender, write_receiver) = mpsc::channel::(20); + let mut jabber_client = JabberClient::new(jid, password)?; + jabber_client.connect().await?; + let (write, read) = jabber_client.split(); + let client = Self { + client: jabber_client, + receiver: ReceiverStream::new(read_receiver), + sender: PollSender::new(write_sender), + pending_iqs: HashMap::new(), + }; + tokio::spawn(client.process_read(read, read_sender)); + tokio::spawn(client.process_write(write, write_receiver)); + Ok(client) + } + + pub async fn process_read( + &self, + mut stream: SplitStream, + sender: mpsc::Sender, + ) { + for stanza in stream.next().await { + tokio::spawn(self.process_stanza(stanza, sender.clone())); + } + } + + pub async fn process_write( + &self, + mut sink: SplitSink, + receiver: mpsc::Receiver, + ) { + for message in receiver.recv_many(, ) + } +} + +pub enum Error { + PollSend(PollSendError), + Jabber(jabber::Error), +} + +impl From for Error { + fn from(e: jabber::Error) -> Self { + Self::Jabber(e) + } +} + +impl Stream for Client { + type Item = UpdateMessage; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + pin!(self).receiver.poll_next_unpin(cx) + } +} + +impl Sink for Client { + type Error = Error; + + fn poll_ready( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + Poll::Ready(ready!(pin!(self).sender.poll_ready_unpin(cx))) + } + + fn start_send(self: std::pin::Pin<&mut Self>, item: CommandMessage) -> Result<(), Self::Error> { + todo!() + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + todo!() + } + + fn poll_close( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + todo!() + } +} + +impl From> for Error { + fn from(e: PollSendError) -> Self { + Self::PollSend(e) + } +} + +pub enum CommandMessage { + Connect, + GetRoster, + SendMessage(JID, String), +} + +pub enum UpdateMessage { + Roster(Vec), +} + +impl Client { + pub async fn process_stanza( + &mut self, + stanza: Result, + sender: mpsc::Sender, + ) { + match stanza { + Ok(stanza) => todo!(), + Err(e) => self.process_error(e), + } + } + + pub async fn iq( + &mut self, + to: Option, + r#type: IqType, + query: Option, + ) -> Result { + self.client + .send(Stanza::Iq(Iq { + from: Some(self.client.jid()), + // TODO: generate id + id: "test".to_string(), + to, + r#type, + // TODO: lang + lang: None, + query, + errors: Vec::new(), + })) + .await?; + Ok(todo!()) + } + + pub async fn iq_process(&mut self, iq: Iq) {} +} diff --git a/stanza/src/client/iq.rs b/stanza/src/client/iq.rs index 6ee80ea..2e87636 100644 --- a/stanza/src/client/iq.rs +++ b/stanza/src/client/iq.rs @@ -9,6 +9,7 @@ use peanuts::{ use crate::{ bind::{self, Bind}, client::error::Error, + roster, xep_0199::{self, Ping}, }; @@ -31,6 +32,7 @@ pub struct Iq { pub enum Query { Bind(Bind), Ping(Ping), + Roster(roster::Query), Unsupported, } @@ -39,6 +41,9 @@ impl FromElement for Query { match element.identify() { (Some(bind::XMLNS), "bind") => Ok(Query::Bind(Bind::from_element(element)?)), (Some(xep_0199::XMLNS), "ping") => Ok(Query::Ping(Ping::from_element(element)?)), + (Some(roster::XMLNS), "query") => { + Ok(Query::Roster(roster::Query::from_element(element)?)) + } _ => Ok(Query::Unsupported), } } @@ -49,6 +54,7 @@ impl IntoElement for Query { match self { Query::Bind(bind) => bind.builder(), Query::Ping(ping) => ping.builder(), + Query::Roster(query) => query.builder(), // TODO: consider what to do if attempt to serialize unsupported Query::Unsupported => todo!(), } diff --git a/stanza/src/roster.rs b/stanza/src/roster.rs index b49fcc3..9209fad 100644 --- a/stanza/src/roster.rs +++ b/stanza/src/roster.rs @@ -8,6 +8,7 @@ use peanuts::{ pub const XMLNS: &str = "jabber:iq:roster"; +#[derive(Debug, Clone)] pub struct Query { ver: Option, items: Vec, @@ -33,7 +34,7 @@ impl IntoElement for Query { } } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct Item { approved: Option, ask: bool, @@ -95,7 +96,7 @@ impl IntoElement for Item { } } -#[derive(Default, Clone, Copy)] +#[derive(Default, Clone, Copy, Debug)] pub enum Subscription { Both, From, @@ -132,7 +133,7 @@ impl ToString for Subscription { } } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct Group(Option); impl FromElement for Group {