Compare commits
No commits in common. "a1d96233e816e2b8378a629c6cc9e34028f2435b" and "e6c97ab82880ad4cd12b05bc1c8f2a0a3413735c" have entirely different histories.
a1d96233e8
...
e6c97ab828
|
@ -22,16 +22,71 @@ use crate::{
|
||||||
Connection, Error, JabberStream, Result, JID,
|
Connection, Error, JabberStream, Result, JID,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub async fn connect_and_login(
|
// feed it client stanzas, receive client stanzas
|
||||||
mut jid: &mut JID,
|
pub struct JabberClient {
|
||||||
password: impl AsRef<str>,
|
connection: Option<BoundJabberStream<Tls>>,
|
||||||
server: &mut String,
|
jid: JID,
|
||||||
) -> Result<BoundJabberStream<Tls>> {
|
// TODO: have reconnection be handled by another part, so creds don't need to be stored in object
|
||||||
let auth = SASLConfig::with_credentials(
|
password: Arc<SASLConfig>,
|
||||||
|
server: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl JabberClient {
|
||||||
|
pub fn new(
|
||||||
|
jid: impl TryInto<JID, Error = ParseError>,
|
||||||
|
password: impl ToString,
|
||||||
|
) -> Result<JabberClient> {
|
||||||
|
let jid = jid.try_into()?;
|
||||||
|
let sasl_config = SASLConfig::with_credentials(
|
||||||
None,
|
None,
|
||||||
jid.localpart.clone().ok_or(Error::NoLocalpart)?,
|
jid.localpart.clone().ok_or(Error::NoLocalpart)?,
|
||||||
password.as_ref().to_string(),
|
password.to_string(),
|
||||||
)?;
|
)?;
|
||||||
|
Ok(JabberClient {
|
||||||
|
connection: None,
|
||||||
|
jid: jid.clone(),
|
||||||
|
password: sasl_config,
|
||||||
|
server: jid.domainpart,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn jid(&self) -> JID {
|
||||||
|
self.jid.clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn connect(&mut self) -> Result<()> {
|
||||||
|
match &self.connection {
|
||||||
|
Some(_) => Ok(()),
|
||||||
|
None => {
|
||||||
|
self.connection = Some(
|
||||||
|
connect_and_login(&mut self.jid, self.password.clone(), &mut self.server)
|
||||||
|
.await?,
|
||||||
|
);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn into_inner(self) -> Result<BoundJabberStream<Tls>> {
|
||||||
|
self.connection.ok_or(Error::Disconnected)
|
||||||
|
}
|
||||||
|
|
||||||
|
// pub async fn send_stanza(&mut self, stanza: &Stanza) -> Result<()> {
|
||||||
|
// match &mut self.connection {
|
||||||
|
// ConnectionState::Disconnected => return Err(Error::Disconnected),
|
||||||
|
// ConnectionState::Connecting(_connecting) => return Err(Error::Connecting),
|
||||||
|
// ConnectionState::Connected(jabber_stream) => {
|
||||||
|
// Ok(jabber_stream.send_stanza(stanza).await?)
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn connect_and_login(
|
||||||
|
jid: &mut JID,
|
||||||
|
auth: Arc<SASLConfig>,
|
||||||
|
server: &mut String,
|
||||||
|
) -> Result<BoundJabberStream<Tls>> {
|
||||||
let mut conn_state = Connecting::start(&server).await?;
|
let mut conn_state = Connecting::start(&server).await?;
|
||||||
loop {
|
loop {
|
||||||
match conn_state {
|
match conn_state {
|
||||||
|
@ -122,8 +177,8 @@ pub enum InsecureConnecting {
|
||||||
mod tests {
|
mod tests {
|
||||||
use std::{sync::Arc, time::Duration};
|
use std::{sync::Arc, time::Duration};
|
||||||
|
|
||||||
|
use super::JabberClient;
|
||||||
use futures::{SinkExt, StreamExt};
|
use futures::{SinkExt, StreamExt};
|
||||||
use jid::JID;
|
|
||||||
use stanza::{
|
use stanza::{
|
||||||
client::{
|
client::{
|
||||||
iq::{Iq, IqType, Query},
|
iq::{Iq, IqType, Query},
|
||||||
|
@ -135,25 +190,21 @@ mod tests {
|
||||||
use tokio::{sync::Mutex, time::sleep};
|
use tokio::{sync::Mutex, time::sleep};
|
||||||
use tracing::info;
|
use tracing::info;
|
||||||
|
|
||||||
use super::connect_and_login;
|
|
||||||
|
|
||||||
#[test(tokio::test)]
|
#[test(tokio::test)]
|
||||||
async fn login() {
|
async fn login() {
|
||||||
let mut jid: JID = "test@blos.sm".try_into().unwrap();
|
let mut client = JabberClient::new("test@blos.sm", "slayed").unwrap();
|
||||||
let client = connect_and_login(&mut jid, "slayed", &mut "blos.sm".to_string())
|
client.connect().await.unwrap();
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
sleep(Duration::from_secs(5)).await
|
sleep(Duration::from_secs(5)).await
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test(tokio::test)]
|
#[test(tokio::test)]
|
||||||
async fn ping_parallel() {
|
async fn ping_parallel() {
|
||||||
let mut jid: JID = "test@blos.sm".try_into().unwrap();
|
let mut client = JabberClient::new("test@blos.sm", "slayed").unwrap();
|
||||||
let mut server = "blos.sm".to_string();
|
client.connect().await.unwrap();
|
||||||
let client = connect_and_login(&mut jid, "slayed", &mut server)
|
sleep(Duration::from_secs(5)).await;
|
||||||
.await
|
let jid = client.jid.clone();
|
||||||
.unwrap();
|
let server = client.server.clone();
|
||||||
let (mut read, mut write) = client.split();
|
let (mut read, mut write) = client.into_inner().unwrap().split();
|
||||||
|
|
||||||
tokio::join!(
|
tokio::join!(
|
||||||
async {
|
async {
|
||||||
|
|
|
@ -415,7 +415,7 @@ mod tests {
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::{connection::Connection};
|
use crate::{client::JabberClient, connection::Connection};
|
||||||
use futures::sink;
|
use futures::sink;
|
||||||
use test_log::test;
|
use test_log::test;
|
||||||
use tokio::time::sleep;
|
use tokio::time::sleep;
|
||||||
|
@ -468,8 +468,8 @@ mod tests {
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn sink() {
|
async fn sink() {
|
||||||
// let mut client = JabberClient::new("test@blos.sm", "slayed").unwrap();
|
let mut client = JabberClient::new("test@blos.sm", "slayed").unwrap();
|
||||||
// client.connect().await.unwrap();
|
client.connect().await.unwrap();
|
||||||
// let stream = client.inner().unwrap();
|
// let stream = client.inner().unwrap();
|
||||||
// let sink = sink::unfold(stream, |mut stream, stanza: Stanza| async move {
|
// let sink = sink::unfold(stream, |mut stream, stanza: Stanza| async move {
|
||||||
// stream.writer.write(&stanza).await?;
|
// stream.writer.write(&stanza).await?;
|
||||||
|
|
|
@ -15,7 +15,15 @@ pub use jid::JID;
|
||||||
|
|
||||||
pub type Result<T> = std::result::Result<T, Error>;
|
pub type Result<T> = std::result::Result<T, Error>;
|
||||||
|
|
||||||
pub use client::connect_and_login;
|
pub async fn login<J: AsRef<str>, P: AsRef<str>>(jid: J, password: P) -> Result<JabberStream<Tls>> {
|
||||||
|
todo!()
|
||||||
|
// Ok(Connection::connect_user(jid, password.as_ref().to_string())
|
||||||
|
// .await?
|
||||||
|
// .ensure_tls()
|
||||||
|
// .await?
|
||||||
|
// .negotiate()
|
||||||
|
// .await?)
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
|
|
@ -6,9 +6,6 @@ edition = "2021"
|
||||||
[dependencies]
|
[dependencies]
|
||||||
futures = "0.3.31"
|
futures = "0.3.31"
|
||||||
jabber = { version = "0.1.0", path = "../jabber" }
|
jabber = { version = "0.1.0", path = "../jabber" }
|
||||||
peanuts = { version = "0.1.0", path = "../../peanuts" }
|
|
||||||
jid = { version = "0.1.0", path = "../jid" }
|
|
||||||
sqlx = { version = "0.8.3", features = [ "sqlite", "runtime-tokio" ] }
|
|
||||||
stanza = { version = "0.1.0", path = "../stanza" }
|
stanza = { version = "0.1.0", path = "../stanza" }
|
||||||
tokio = "1.42.0"
|
tokio = "1.42.0"
|
||||||
tokio-stream = "0.1.17"
|
tokio-stream = "0.1.17"
|
||||||
|
|
|
@ -1,5 +0,0 @@
|
||||||
CREATE TABLE roster(
|
|
||||||
id INTEGER PRIMARY KEY,
|
|
||||||
jid TEXT NOT NULL,
|
|
||||||
nickname TEXT,
|
|
||||||
);
|
|
643
luz/src/lib.rs
643
luz/src/lib.rs
|
@ -1,21 +1,15 @@
|
||||||
use std::{
|
use std::{
|
||||||
collections::{HashMap, HashSet, VecDeque},
|
collections::{HashMap, HashSet, VecDeque},
|
||||||
fmt::Pointer,
|
|
||||||
pin::pin,
|
pin::pin,
|
||||||
sync::{atomic::AtomicBool, Arc},
|
|
||||||
task::{ready, Poll},
|
task::{ready, Poll},
|
||||||
|
thread::JoinHandle,
|
||||||
};
|
};
|
||||||
|
|
||||||
use futures::{
|
use futures::{
|
||||||
stream::{SplitSink, SplitStream},
|
stream::{SplitSink, SplitStream},
|
||||||
Sink, SinkExt, Stream, StreamExt,
|
Sink, SinkExt, Stream, StreamExt,
|
||||||
};
|
};
|
||||||
use jabber::{
|
use jabber::{client::JabberClient, JID};
|
||||||
connection::Tls,
|
|
||||||
jabber_stream::bound_stream::{BoundJabberReader, BoundJabberStream, BoundJabberWriter},
|
|
||||||
JID,
|
|
||||||
};
|
|
||||||
use sqlx::{query, Pool, Sqlite, SqlitePool};
|
|
||||||
use stanza::{
|
use stanza::{
|
||||||
client::{
|
client::{
|
||||||
iq::{Iq, IqType, Query},
|
iq::{Iq, IqType, Query},
|
||||||
|
@ -23,545 +17,58 @@ use stanza::{
|
||||||
},
|
},
|
||||||
roster,
|
roster,
|
||||||
};
|
};
|
||||||
use tokio::{
|
use tokio::sync::mpsc;
|
||||||
io::AsyncRead,
|
|
||||||
select,
|
|
||||||
sync::{
|
|
||||||
mpsc::{self, Receiver, Sender},
|
|
||||||
oneshot, Mutex,
|
|
||||||
},
|
|
||||||
task::{JoinHandle, JoinSet},
|
|
||||||
};
|
|
||||||
use tokio_stream::wrappers::ReceiverStream;
|
use tokio_stream::wrappers::ReceiverStream;
|
||||||
use tokio_util::sync::{PollSendError, PollSender};
|
use tokio_util::sync::{PollSendError, PollSender};
|
||||||
|
|
||||||
// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream.
|
pub struct Client {
|
||||||
pub struct JabberWriter {
|
client: JabberClient,
|
||||||
stanza_receiver: mpsc::Receiver<JabberWrite>,
|
pending_iqs: HashMap<String, mpsc::Sender<Iq>>,
|
||||||
control_receiver: mpsc::Receiver<JabberWriterControl>,
|
// database connection (sqlite)
|
||||||
stream: BoundJabberWriter<Tls>,
|
receiver: ReceiverStream<UpdateMessage>,
|
||||||
on_crash: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
|
sender: PollSender<CommandMessage>,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct JabberWrite {
|
impl Client {
|
||||||
stanza: Stanza,
|
pub async fn new(jid: String, password: &str) -> Result<Self, Error> {
|
||||||
respond_to: oneshot::Sender<Result<(), Error>>,
|
let (read_sender, read_receiver) = mpsc::channel::<UpdateMessage>(20);
|
||||||
}
|
let (write_sender, write_receiver) = mpsc::channel::<CommandMessage>(20);
|
||||||
|
let mut jabber_client = JabberClient::new(jid, password)?;
|
||||||
enum JabberWriterControl {
|
jabber_client.connect().await?;
|
||||||
Disconnect,
|
let (write, read) = jabber_client.split();
|
||||||
Abort(oneshot::Sender<mpsc::Receiver<JabberWrite>>),
|
let client = Self {
|
||||||
}
|
client: jabber_client,
|
||||||
|
receiver: ReceiverStream::new(read_receiver),
|
||||||
impl JabberWriter {
|
sender: PollSender::new(write_sender),
|
||||||
fn new(
|
pending_iqs: HashMap::new(),
|
||||||
stanza_receiver: mpsc::Receiver<JabberWrite>,
|
|
||||||
control_receiver: mpsc::Receiver<JabberWriterControl>,
|
|
||||||
stream: BoundJabberWriter<Tls>,
|
|
||||||
supervisor: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
|
|
||||||
) -> Self {
|
|
||||||
Self {
|
|
||||||
stanza_receiver,
|
|
||||||
control_receiver,
|
|
||||||
stream,
|
|
||||||
on_crash: supervisor,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn write(&mut self, stanza: &Stanza) -> Result<(), peanuts::Error> {
|
|
||||||
Ok(self.stream.write(stanza).await?)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn run(mut self) {
|
|
||||||
loop {
|
|
||||||
tokio::select! {
|
|
||||||
Some(msg) = self.control_receiver.recv() => {
|
|
||||||
match msg {
|
|
||||||
JabberWriterControl::Disconnect => {
|
|
||||||
// TODO: close the stanza_receiver channel and drain out all of the remaining stanzas to send
|
|
||||||
self.stanza_receiver.close();
|
|
||||||
// TODO: put this in some kind of function to avoid code duplication
|
|
||||||
for msg in self.stanza_receiver.recv().await {
|
|
||||||
let result = self.stream.write(&msg.stanza).await;
|
|
||||||
match result {
|
|
||||||
Err(e) => match &e {
|
|
||||||
peanuts::Error::ReadError(error) => {
|
|
||||||
// make sure message is not lost from error, supervisor handles retry and reporting
|
|
||||||
self.on_crash.send((msg, self.stanza_receiver));
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
msg.respond_to.send(Err(e.into()));
|
|
||||||
}
|
|
||||||
},
|
|
||||||
_ => {
|
|
||||||
msg.respond_to.send(Ok(()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
self.stream.try_close().await;
|
|
||||||
break;
|
|
||||||
},
|
|
||||||
JabberWriterControl::Abort(sender) => {
|
|
||||||
sender.send(self.stanza_receiver);
|
|
||||||
break;
|
|
||||||
},
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Some(msg) = self.stanza_receiver.recv() => {
|
|
||||||
let result = self.stream.write(&msg.stanza).await;
|
|
||||||
match result {
|
|
||||||
Err(e) => match &e {
|
|
||||||
peanuts::Error::ReadError(error) => {
|
|
||||||
// make sure message is not lost from error, supervisor handles retry and reporting
|
|
||||||
self.on_crash.send((msg, self.stanza_receiver));
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
msg.respond_to.send(Err(e.into()));
|
|
||||||
}
|
|
||||||
},
|
|
||||||
_ => {
|
|
||||||
msg.respond_to.send(Ok(()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
// TODO: check if this is ok to do
|
|
||||||
else => break,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct JabberWriteHandle {
|
|
||||||
sender: mpsc::Sender<JabberWrite>,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct JabberWriterControlHandle {
|
|
||||||
sender: mpsc::Sender<JabberWriterControl>,
|
|
||||||
handle: JoinHandle<()>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl JabberWriterControlHandle {
|
|
||||||
pub fn new(
|
|
||||||
stream: BoundJabberWriter<Tls>,
|
|
||||||
supervisor: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
|
|
||||||
) -> (JabberWriteHandle, JabberWriterControlHandle) {
|
|
||||||
let (control_sender, control_receiver) = mpsc::channel(20);
|
|
||||||
let (stanza_sender, stanza_receiver) = mpsc::channel(20);
|
|
||||||
|
|
||||||
let actor = JabberWriter::new(stanza_receiver, control_receiver, stream, supervisor);
|
|
||||||
let handle = tokio::spawn(async move { actor.run().await });
|
|
||||||
|
|
||||||
(
|
|
||||||
JabberWriteHandle {
|
|
||||||
sender: stanza_sender,
|
|
||||||
},
|
|
||||||
Self {
|
|
||||||
sender: control_sender,
|
|
||||||
handle,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn reconnect(
|
|
||||||
stream: BoundJabberWriter<Tls>,
|
|
||||||
supervisor: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
|
|
||||||
stanza_receiver: mpsc::Receiver<JabberWrite>,
|
|
||||||
) -> Self {
|
|
||||||
let (control_sender, control_receiver) = mpsc::channel(20);
|
|
||||||
|
|
||||||
let actor = JabberWriter::new(stanza_receiver, control_receiver, stream, supervisor);
|
|
||||||
let handle = tokio::spawn(async move { actor.run().await });
|
|
||||||
|
|
||||||
Self {
|
|
||||||
sender: control_sender,
|
|
||||||
handle,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct JabberReader {
|
|
||||||
// TODO: place iq hashmap here
|
|
||||||
control_receiver: mpsc::Receiver<JabberReaderControl>,
|
|
||||||
stream: BoundJabberReader<Tls>,
|
|
||||||
on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
|
|
||||||
db: SqlitePool,
|
|
||||||
sender: mpsc::Sender<UpdateMessage>,
|
|
||||||
supervisor_control: mpsc::Sender<JabberSupervisorCommand>,
|
|
||||||
write_sender: mpsc::Sender<JabberWrite>,
|
|
||||||
tasks: JoinSet<()>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl JabberReader {
|
|
||||||
fn new(
|
|
||||||
control_receiver: mpsc::Receiver<JabberReaderControl>,
|
|
||||||
stream: BoundJabberReader<Tls>,
|
|
||||||
on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
|
|
||||||
db: SqlitePool,
|
|
||||||
sender: mpsc::Sender<UpdateMessage>,
|
|
||||||
// jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs)
|
|
||||||
supervisor_control: mpsc::Sender<JabberSupervisorCommand>,
|
|
||||||
write_sender: mpsc::Sender<JabberWrite>,
|
|
||||||
) -> Self {
|
|
||||||
Self {
|
|
||||||
control_receiver,
|
|
||||||
stream,
|
|
||||||
on_crash,
|
|
||||||
db,
|
|
||||||
sender,
|
|
||||||
supervisor_control,
|
|
||||||
write_sender,
|
|
||||||
tasks: JoinSet::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn run(mut self) {
|
|
||||||
loop {
|
|
||||||
tokio::select! {
|
|
||||||
Some(msg) = self.control_receiver.recv() => {
|
|
||||||
match msg {
|
|
||||||
JabberReaderControl::Disconnect => todo!(),
|
|
||||||
JabberReaderControl::Abort(sender) => todo!(),
|
|
||||||
};
|
};
|
||||||
},
|
tokio::spawn(client.process_read(read, read_sender));
|
||||||
stanza = self.stream.read::<Stanza>() => {
|
tokio::spawn(client.process_write(write, write_receiver));
|
||||||
match stanza {
|
Ok(client)
|
||||||
Ok(_) => todo!(),
|
|
||||||
Err(_) => todo!(),
|
|
||||||
}
|
|
||||||
self.tasks.spawn();
|
|
||||||
},
|
|
||||||
else => break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
trait Task {
|
pub async fn process_read(
|
||||||
async fn handle();
|
&self,
|
||||||
}
|
mut stream: SplitStream<JabberClient>,
|
||||||
|
|
||||||
impl Task for Stanza {
|
|
||||||
async fn handle() {
|
|
||||||
todo!()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
enum JabberReaderControl {
|
|
||||||
Disconnect,
|
|
||||||
Abort(oneshot::Sender<mpsc::Receiver<JabberWrite>>),
|
|
||||||
}
|
|
||||||
|
|
||||||
struct JabberReaderControlHandle {
|
|
||||||
sender: mpsc::Sender<JabberReaderControl>,
|
|
||||||
handle: JoinHandle<()>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl JabberReaderControlHandle {
|
|
||||||
pub fn new(
|
|
||||||
stream: BoundJabberReader<Tls>,
|
|
||||||
on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
|
|
||||||
db: SqlitePool,
|
|
||||||
sender: mpsc::Sender<UpdateMessage>,
|
|
||||||
supervisor_control: mpsc::Sender<JabberSupervisorCommand>,
|
|
||||||
jabber_write: mpsc::Sender<JabberWrite>,
|
|
||||||
) -> Self {
|
|
||||||
let (control_sender, control_receiver) = mpsc::channel(20);
|
|
||||||
|
|
||||||
let actor = JabberReader::new(
|
|
||||||
control_receiver,
|
|
||||||
stream,
|
|
||||||
on_crash,
|
|
||||||
db,
|
|
||||||
sender,
|
|
||||||
supervisor_control,
|
|
||||||
jabber_write,
|
|
||||||
);
|
|
||||||
let handle = tokio::spawn(async move { actor.run().await });
|
|
||||||
|
|
||||||
Self {
|
|
||||||
sender: control_sender,
|
|
||||||
handle,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct Luz {
|
|
||||||
receiver: mpsc::Receiver<CommandMessage>,
|
|
||||||
jid: Arc<Mutex<JID>>,
|
|
||||||
// TODO: use a dyn passwordprovider trait to avoid storing password in memory
|
|
||||||
password: String,
|
|
||||||
connected: Arc<Mutex<Option<(JabberWriteHandle, JabberSupervisorHandle)>>>,
|
|
||||||
db: SqlitePool,
|
|
||||||
sender: mpsc::Sender<UpdateMessage>,
|
|
||||||
tasks: JoinSet<()>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Luz {
|
|
||||||
fn new(
|
|
||||||
receiver: mpsc::Receiver<CommandMessage>,
|
|
||||||
jid: Arc<Mutex<JID>>,
|
|
||||||
password: String,
|
|
||||||
connected: Arc<Mutex<Option<(JabberWriteHandle, JabberSupervisorHandle)>>>,
|
|
||||||
db: SqlitePool,
|
|
||||||
sender: mpsc::Sender<UpdateMessage>,
|
|
||||||
) -> Self {
|
|
||||||
Self {
|
|
||||||
jid,
|
|
||||||
password,
|
|
||||||
connected,
|
|
||||||
db,
|
|
||||||
receiver,
|
|
||||||
sender,
|
|
||||||
tasks: JoinSet::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn run(mut self) {
|
|
||||||
while let Some(msg) = self.receiver.recv().await {
|
|
||||||
// TODO: consider separating disconnect/connect and commands apart from commandmessage
|
|
||||||
match msg {
|
|
||||||
CommandMessage::Connect => {
|
|
||||||
match self.connected.lock().await.as_ref() {
|
|
||||||
Some(_) => {
|
|
||||||
self.sender
|
|
||||||
.send(UpdateMessage::Error(Error::AlreadyConnected))
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
let mut jid = self.jid.lock().await;
|
|
||||||
let mut domain = jid.domainpart.clone();
|
|
||||||
let streams_result =
|
|
||||||
jabber::connect_and_login(&mut jid, &self.password, &mut domain)
|
|
||||||
.await;
|
|
||||||
match streams_result {
|
|
||||||
Ok(s) => {
|
|
||||||
let (writer, supervisor) =
|
|
||||||
JabberSupervisorHandle::new(s, self.sender.clone());
|
|
||||||
*self.connected.lock().await = Some((writer, supervisor));
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
self.sender.send(UpdateMessage::Error(e.into()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
CommandMessage::Disconnect => match self.connected.lock().await.as_mut() {
|
|
||||||
None => {
|
|
||||||
self.sender
|
|
||||||
.send(UpdateMessage::Error(Error::AlreadyDisonnected))
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
mut c => {
|
|
||||||
if let Some((_write_handle, supervisor_handle)) = c.take() {
|
|
||||||
let _ = supervisor_handle
|
|
||||||
.sender
|
|
||||||
.send(JabberSupervisorCommand::Disconnect)
|
|
||||||
.await;
|
|
||||||
} else {
|
|
||||||
unreachable!()
|
|
||||||
};
|
|
||||||
}
|
|
||||||
},
|
|
||||||
_ => {
|
|
||||||
match self.connected.lock().await.as_ref() {
|
|
||||||
Some((w, _)) => self.tasks.spawn(msg.handle_online(
|
|
||||||
w.clone(),
|
|
||||||
self.jid.clone(),
|
|
||||||
self.db.clone(),
|
|
||||||
self.sender.clone(),
|
|
||||||
)),
|
|
||||||
None => self.tasks.spawn(msg.handle_offline(
|
|
||||||
self.jid.clone(),
|
|
||||||
self.db.clone(),
|
|
||||||
self.sender.clone(),
|
|
||||||
)),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl CommandMessage {
|
|
||||||
pub async fn handle_offline(
|
|
||||||
mut self,
|
|
||||||
jid: Arc<Mutex<JID>>,
|
|
||||||
db: SqlitePool,
|
|
||||||
sender: mpsc::Sender<UpdateMessage>,
|
sender: mpsc::Sender<UpdateMessage>,
|
||||||
) {
|
) {
|
||||||
todo!()
|
for stanza in stream.next().await {
|
||||||
|
tokio::spawn(self.process_stanza(stanza, sender.clone()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_online(
|
pub async fn process_write(
|
||||||
mut self,
|
&self,
|
||||||
jabber_write_handle: JabberWriteHandle,
|
mut sink: SplitSink<JabberClient, Stanza>,
|
||||||
// TODO: jid could lose resource by the end
|
receiver: mpsc::Receiver<CommandMessage>,
|
||||||
jid: Arc<Mutex<JID>>,
|
|
||||||
db: SqlitePool,
|
|
||||||
sender: mpsc::Sender<UpdateMessage>,
|
|
||||||
) {
|
) {
|
||||||
todo!()
|
for message in receiver.recv_many(, )
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: separate sender and receiver, store handle to Luz process to ensure dropping
|
|
||||||
pub struct LuzHandle {
|
|
||||||
sender: mpsc::Sender<CommandMessage>,
|
|
||||||
receiver: mpsc::Receiver<UpdateMessage>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl LuzHandle {
|
|
||||||
pub fn new(jid: JID, password: String, db: SqlitePool) -> Self {
|
|
||||||
let (command_sender, command_receiver) = mpsc::channel(20);
|
|
||||||
let (update_sender, update_receiver) = mpsc::channel(20);
|
|
||||||
|
|
||||||
let actor = Luz::new(
|
|
||||||
command_receiver,
|
|
||||||
Arc::new(Mutex::new(jid)),
|
|
||||||
password,
|
|
||||||
Arc::new(Mutex::new(None)),
|
|
||||||
db,
|
|
||||||
update_sender,
|
|
||||||
);
|
|
||||||
tokio::spawn(async move { actor.run().await });
|
|
||||||
|
|
||||||
Self {
|
|
||||||
sender: command_sender,
|
|
||||||
receiver: update_receiver,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct JabberSupervisor {
|
|
||||||
connection_commands: mpsc::Receiver<JabberSupervisorCommand>,
|
|
||||||
writer_crash: oneshot::Receiver<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
|
|
||||||
reader_crash: oneshot::Receiver<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
|
|
||||||
sender: mpsc::Sender<UpdateMessage>,
|
|
||||||
writer_handle: JabberWriterControlHandle,
|
|
||||||
reader_handle: JabberReaderControlHandle,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub enum JabberSupervisorCommand {
|
|
||||||
Disconnect,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl JabberSupervisor {
|
|
||||||
fn new(
|
|
||||||
connection_commands: mpsc::Receiver<JabberSupervisorCommand>,
|
|
||||||
writer_crash: oneshot::Receiver<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
|
|
||||||
reader_crash: oneshot::Receiver<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
|
|
||||||
sender: mpsc::Sender<UpdateMessage>,
|
|
||||||
writer_handle: JabberWriterControlHandle,
|
|
||||||
reader_handle: JabberReaderControlHandle,
|
|
||||||
) -> Self {
|
|
||||||
Self {
|
|
||||||
connection_commands,
|
|
||||||
writer_crash,
|
|
||||||
sender,
|
|
||||||
writer_handle,
|
|
||||||
reader_handle,
|
|
||||||
reader_crash,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_command_message(&mut self, msg: JabberSupervisorCommand) {}
|
|
||||||
|
|
||||||
async fn run(mut self) {
|
|
||||||
loop {
|
|
||||||
tokio::select! {
|
|
||||||
Some(msg) = self.connection_commands.recv() => {
|
|
||||||
self.handle_command_message(msg).await;
|
|
||||||
},
|
|
||||||
error = self.writer_crash => {
|
|
||||||
|
|
||||||
},
|
|
||||||
error = self.reader_crash => {
|
|
||||||
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct JabberSupervisorHandle {
|
|
||||||
sender: mpsc::Sender<JabberSupervisorCommand>,
|
|
||||||
handle: JoinHandle<()>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl JabberSupervisorHandle {
|
|
||||||
pub fn new(
|
|
||||||
streams: BoundJabberStream<Tls>,
|
|
||||||
sender: mpsc::Sender<UpdateMessage>,
|
|
||||||
db: SqlitePool,
|
|
||||||
update_sender: mpsc::Sender<UpdateMessage>,
|
|
||||||
) -> (JabberWriteHandle, Self) {
|
|
||||||
let (command_sender, command_receiver) = mpsc::channel(20);
|
|
||||||
let (writer_error_sender, writer_error_receiver) = oneshot::channel();
|
|
||||||
let (reader_crash_sender, reader_crash_receiver) = oneshot::channel();
|
|
||||||
|
|
||||||
let (reader, writer) = streams.split();
|
|
||||||
let (jabber_write_handle, jabber_writer_control_handle) =
|
|
||||||
JabberWriterControlHandle::new(writer, writer_error_sender);
|
|
||||||
let jabber_reader_control_handle = JabberReaderControlHandle::new(
|
|
||||||
reader,
|
|
||||||
reader_crash_sender,
|
|
||||||
db,
|
|
||||||
update_sender,
|
|
||||||
command_sender.clone(),
|
|
||||||
jabber_write_handle.sender.clone(),
|
|
||||||
);
|
|
||||||
|
|
||||||
let actor = JabberSupervisor::new(
|
|
||||||
command_receiver,
|
|
||||||
writer_error_receiver,
|
|
||||||
reader_crash_receiver,
|
|
||||||
sender,
|
|
||||||
jabber_writer_control_handle,
|
|
||||||
jabber_reader_control_handle,
|
|
||||||
);
|
|
||||||
|
|
||||||
let handle = tokio::spawn(async move { actor.run().await });
|
|
||||||
|
|
||||||
(
|
|
||||||
jabber_write_handle,
|
|
||||||
Self {
|
|
||||||
sender: command_sender,
|
|
||||||
handle,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
AlreadyConnected,
|
|
||||||
PollSend(PollSendError<CommandMessage>),
|
PollSend(PollSendError<CommandMessage>),
|
||||||
Jabber(jabber::Error),
|
Jabber(jabber::Error),
|
||||||
XML(peanuts::Error),
|
|
||||||
SQL(sqlx::Error),
|
|
||||||
JID(jid::ParseError),
|
|
||||||
AlreadyDisonnected,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<peanuts::Error> for Error {
|
|
||||||
fn from(e: peanuts::Error) -> Self {
|
|
||||||
Self::XML(e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<jid::ParseError> for Error {
|
|
||||||
fn from(e: jid::ParseError) -> Self {
|
|
||||||
Self::JID(e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<sqlx::Error> for Error {
|
|
||||||
fn from(e: sqlx::Error) -> Self {
|
|
||||||
Self::SQL(e)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<jabber::Error> for Error {
|
impl From<jabber::Error> for Error {
|
||||||
|
@ -570,6 +77,46 @@ impl From<jabber::Error> for Error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Stream for Client {
|
||||||
|
type Item = UpdateMessage;
|
||||||
|
|
||||||
|
fn poll_next(
|
||||||
|
self: std::pin::Pin<&mut Self>,
|
||||||
|
cx: &mut std::task::Context<'_>,
|
||||||
|
) -> std::task::Poll<Option<Self::Item>> {
|
||||||
|
pin!(self).receiver.poll_next_unpin(cx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Sink<CommandMessage> for Client {
|
||||||
|
type Error = Error;
|
||||||
|
|
||||||
|
fn poll_ready(
|
||||||
|
self: std::pin::Pin<&mut Self>,
|
||||||
|
cx: &mut std::task::Context<'_>,
|
||||||
|
) -> Poll<Result<(), Self::Error>> {
|
||||||
|
Poll::Ready(ready!(pin!(self).sender.poll_ready_unpin(cx)))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn start_send(self: std::pin::Pin<&mut Self>, item: CommandMessage) -> Result<(), Self::Error> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_flush(
|
||||||
|
self: std::pin::Pin<&mut Self>,
|
||||||
|
cx: &mut std::task::Context<'_>,
|
||||||
|
) -> Poll<Result<(), Self::Error>> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_close(
|
||||||
|
self: std::pin::Pin<&mut Self>,
|
||||||
|
cx: &mut std::task::Context<'_>,
|
||||||
|
) -> Poll<Result<(), Self::Error>> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl From<PollSendError<CommandMessage>> for Error {
|
impl From<PollSendError<CommandMessage>> for Error {
|
||||||
fn from(e: PollSendError<CommandMessage>) -> Self {
|
fn from(e: PollSendError<CommandMessage>) -> Self {
|
||||||
Self::PollSend(e)
|
Self::PollSend(e)
|
||||||
|
@ -578,13 +125,47 @@ impl From<PollSendError<CommandMessage>> for Error {
|
||||||
|
|
||||||
pub enum CommandMessage {
|
pub enum CommandMessage {
|
||||||
Connect,
|
Connect,
|
||||||
Disconnect,
|
|
||||||
/// gets the roster. if offline, retreives cached version from database. should be stored in application memory.
|
|
||||||
GetRoster,
|
GetRoster,
|
||||||
SendMessage(JID, String),
|
SendMessage(JID, String),
|
||||||
}
|
}
|
||||||
|
|
||||||
pub enum UpdateMessage {
|
pub enum UpdateMessage {
|
||||||
Error(Error),
|
|
||||||
Roster(Vec<roster::Item>),
|
Roster(Vec<roster::Item>),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Client {
|
||||||
|
pub async fn process_stanza(
|
||||||
|
&mut self,
|
||||||
|
stanza: Result<Stanza, jabber::Error>,
|
||||||
|
sender: mpsc::Sender<UpdateMessage>,
|
||||||
|
) {
|
||||||
|
match stanza {
|
||||||
|
Ok(stanza) => todo!(),
|
||||||
|
Err(e) => self.process_error(e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn iq(
|
||||||
|
&mut self,
|
||||||
|
to: Option<JID>,
|
||||||
|
r#type: IqType,
|
||||||
|
query: Option<Query>,
|
||||||
|
) -> Result<IqResponse, Error> {
|
||||||
|
self.client
|
||||||
|
.send(Stanza::Iq(Iq {
|
||||||
|
from: Some(self.client.jid()),
|
||||||
|
// TODO: generate id
|
||||||
|
id: "test".to_string(),
|
||||||
|
to,
|
||||||
|
r#type,
|
||||||
|
// TODO: lang
|
||||||
|
lang: None,
|
||||||
|
query,
|
||||||
|
errors: Vec::new(),
|
||||||
|
}))
|
||||||
|
.await?;
|
||||||
|
Ok(todo!())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn iq_process(&mut self, iq: Iq) {}
|
||||||
|
}
|
||||||
|
|
|
@ -10,8 +10,8 @@ pub const XMLNS: &str = "jabber:iq:roster";
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct Query {
|
pub struct Query {
|
||||||
pub ver: Option<String>,
|
ver: Option<String>,
|
||||||
pub items: Vec<Item>,
|
items: Vec<Item>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FromElement for Query {
|
impl FromElement for Query {
|
||||||
|
@ -36,16 +36,11 @@ impl IntoElement for Query {
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct Item {
|
pub struct Item {
|
||||||
/// signals subscription pre-approval (server only)
|
|
||||||
approved: Option<bool>,
|
approved: Option<bool>,
|
||||||
/// signals subscription sub-states (server only)
|
|
||||||
ask: bool,
|
ask: bool,
|
||||||
/// uniquely identifies item
|
|
||||||
jid: JID,
|
jid: JID,
|
||||||
/// handle that is determined by user, not contact
|
|
||||||
name: Option<String>,
|
name: Option<String>,
|
||||||
/// state of the presence subscription
|
subscription: Subscription,
|
||||||
subscription: Option<Subscription>,
|
|
||||||
groups: Vec<Group>,
|
groups: Vec<Group>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -68,7 +63,7 @@ impl FromElement for Item {
|
||||||
};
|
};
|
||||||
let jid = element.attribute("jid")?;
|
let jid = element.attribute("jid")?;
|
||||||
let name = element.attribute_opt("name")?;
|
let name = element.attribute_opt("name")?;
|
||||||
let subscription = element.attribute_opt("subscription")?;
|
let subscription = element.attribute_opt("subscription")?.unwrap_or_default();
|
||||||
let groups = element.pop_children()?;
|
let groups = element.pop_children()?;
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
|
@ -96,7 +91,7 @@ impl IntoElement for Item {
|
||||||
)
|
)
|
||||||
.push_attribute("jid", self.jid.clone())
|
.push_attribute("jid", self.jid.clone())
|
||||||
.push_attribute_opt("name", self.name.clone())
|
.push_attribute_opt("name", self.name.clone())
|
||||||
.push_attribute_opt("subscription", self.subscription)
|
.push_attribute("subscription", self.subscription)
|
||||||
.push_children(self.groups.clone())
|
.push_children(self.groups.clone())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -139,7 +134,6 @@ impl ToString for Subscription {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
// TODO: check if should be option or not
|
|
||||||
pub struct Group(Option<String>);
|
pub struct Group(Option<String>);
|
||||||
|
|
||||||
impl FromElement for Group {
|
impl FromElement for Group {
|
||||||
|
|
Loading…
Reference in New Issue