Compare commits
3 Commits
e6c97ab828
...
a1d96233e8
Author | SHA1 | Date |
---|---|---|
|
a1d96233e8 | |
|
b023c6b5f2 | |
|
866e134371 |
|
@ -22,71 +22,16 @@ use crate::{
|
||||||
Connection, Error, JabberStream, Result, JID,
|
Connection, Error, JabberStream, Result, JID,
|
||||||
};
|
};
|
||||||
|
|
||||||
// feed it client stanzas, receive client stanzas
|
|
||||||
pub struct JabberClient {
|
|
||||||
connection: Option<BoundJabberStream<Tls>>,
|
|
||||||
jid: JID,
|
|
||||||
// TODO: have reconnection be handled by another part, so creds don't need to be stored in object
|
|
||||||
password: Arc<SASLConfig>,
|
|
||||||
server: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl JabberClient {
|
|
||||||
pub fn new(
|
|
||||||
jid: impl TryInto<JID, Error = ParseError>,
|
|
||||||
password: impl ToString,
|
|
||||||
) -> Result<JabberClient> {
|
|
||||||
let jid = jid.try_into()?;
|
|
||||||
let sasl_config = SASLConfig::with_credentials(
|
|
||||||
None,
|
|
||||||
jid.localpart.clone().ok_or(Error::NoLocalpart)?,
|
|
||||||
password.to_string(),
|
|
||||||
)?;
|
|
||||||
Ok(JabberClient {
|
|
||||||
connection: None,
|
|
||||||
jid: jid.clone(),
|
|
||||||
password: sasl_config,
|
|
||||||
server: jid.domainpart,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn jid(&self) -> JID {
|
|
||||||
self.jid.clone()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn connect(&mut self) -> Result<()> {
|
|
||||||
match &self.connection {
|
|
||||||
Some(_) => Ok(()),
|
|
||||||
None => {
|
|
||||||
self.connection = Some(
|
|
||||||
connect_and_login(&mut self.jid, self.password.clone(), &mut self.server)
|
|
||||||
.await?,
|
|
||||||
);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn into_inner(self) -> Result<BoundJabberStream<Tls>> {
|
|
||||||
self.connection.ok_or(Error::Disconnected)
|
|
||||||
}
|
|
||||||
|
|
||||||
// pub async fn send_stanza(&mut self, stanza: &Stanza) -> Result<()> {
|
|
||||||
// match &mut self.connection {
|
|
||||||
// ConnectionState::Disconnected => return Err(Error::Disconnected),
|
|
||||||
// ConnectionState::Connecting(_connecting) => return Err(Error::Connecting),
|
|
||||||
// ConnectionState::Connected(jabber_stream) => {
|
|
||||||
// Ok(jabber_stream.send_stanza(stanza).await?)
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn connect_and_login(
|
pub async fn connect_and_login(
|
||||||
jid: &mut JID,
|
mut jid: &mut JID,
|
||||||
auth: Arc<SASLConfig>,
|
password: impl AsRef<str>,
|
||||||
server: &mut String,
|
server: &mut String,
|
||||||
) -> Result<BoundJabberStream<Tls>> {
|
) -> Result<BoundJabberStream<Tls>> {
|
||||||
|
let auth = SASLConfig::with_credentials(
|
||||||
|
None,
|
||||||
|
jid.localpart.clone().ok_or(Error::NoLocalpart)?,
|
||||||
|
password.as_ref().to_string(),
|
||||||
|
)?;
|
||||||
let mut conn_state = Connecting::start(&server).await?;
|
let mut conn_state = Connecting::start(&server).await?;
|
||||||
loop {
|
loop {
|
||||||
match conn_state {
|
match conn_state {
|
||||||
|
@ -177,8 +122,8 @@ pub enum InsecureConnecting {
|
||||||
mod tests {
|
mod tests {
|
||||||
use std::{sync::Arc, time::Duration};
|
use std::{sync::Arc, time::Duration};
|
||||||
|
|
||||||
use super::JabberClient;
|
|
||||||
use futures::{SinkExt, StreamExt};
|
use futures::{SinkExt, StreamExt};
|
||||||
|
use jid::JID;
|
||||||
use stanza::{
|
use stanza::{
|
||||||
client::{
|
client::{
|
||||||
iq::{Iq, IqType, Query},
|
iq::{Iq, IqType, Query},
|
||||||
|
@ -190,21 +135,25 @@ mod tests {
|
||||||
use tokio::{sync::Mutex, time::sleep};
|
use tokio::{sync::Mutex, time::sleep};
|
||||||
use tracing::info;
|
use tracing::info;
|
||||||
|
|
||||||
|
use super::connect_and_login;
|
||||||
|
|
||||||
#[test(tokio::test)]
|
#[test(tokio::test)]
|
||||||
async fn login() {
|
async fn login() {
|
||||||
let mut client = JabberClient::new("test@blos.sm", "slayed").unwrap();
|
let mut jid: JID = "test@blos.sm".try_into().unwrap();
|
||||||
client.connect().await.unwrap();
|
let client = connect_and_login(&mut jid, "slayed", &mut "blos.sm".to_string())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
sleep(Duration::from_secs(5)).await
|
sleep(Duration::from_secs(5)).await
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test(tokio::test)]
|
#[test(tokio::test)]
|
||||||
async fn ping_parallel() {
|
async fn ping_parallel() {
|
||||||
let mut client = JabberClient::new("test@blos.sm", "slayed").unwrap();
|
let mut jid: JID = "test@blos.sm".try_into().unwrap();
|
||||||
client.connect().await.unwrap();
|
let mut server = "blos.sm".to_string();
|
||||||
sleep(Duration::from_secs(5)).await;
|
let client = connect_and_login(&mut jid, "slayed", &mut server)
|
||||||
let jid = client.jid.clone();
|
.await
|
||||||
let server = client.server.clone();
|
.unwrap();
|
||||||
let (mut read, mut write) = client.into_inner().unwrap().split();
|
let (mut read, mut write) = client.split();
|
||||||
|
|
||||||
tokio::join!(
|
tokio::join!(
|
||||||
async {
|
async {
|
||||||
|
|
|
@ -415,7 +415,7 @@ mod tests {
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::{client::JabberClient, connection::Connection};
|
use crate::{connection::Connection};
|
||||||
use futures::sink;
|
use futures::sink;
|
||||||
use test_log::test;
|
use test_log::test;
|
||||||
use tokio::time::sleep;
|
use tokio::time::sleep;
|
||||||
|
@ -468,8 +468,8 @@ mod tests {
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn sink() {
|
async fn sink() {
|
||||||
let mut client = JabberClient::new("test@blos.sm", "slayed").unwrap();
|
// let mut client = JabberClient::new("test@blos.sm", "slayed").unwrap();
|
||||||
client.connect().await.unwrap();
|
// client.connect().await.unwrap();
|
||||||
// let stream = client.inner().unwrap();
|
// let stream = client.inner().unwrap();
|
||||||
// let sink = sink::unfold(stream, |mut stream, stanza: Stanza| async move {
|
// let sink = sink::unfold(stream, |mut stream, stanza: Stanza| async move {
|
||||||
// stream.writer.write(&stanza).await?;
|
// stream.writer.write(&stanza).await?;
|
||||||
|
|
|
@ -15,15 +15,7 @@ pub use jid::JID;
|
||||||
|
|
||||||
pub type Result<T> = std::result::Result<T, Error>;
|
pub type Result<T> = std::result::Result<T, Error>;
|
||||||
|
|
||||||
pub async fn login<J: AsRef<str>, P: AsRef<str>>(jid: J, password: P) -> Result<JabberStream<Tls>> {
|
pub use client::connect_and_login;
|
||||||
todo!()
|
|
||||||
// Ok(Connection::connect_user(jid, password.as_ref().to_string())
|
|
||||||
// .await?
|
|
||||||
// .ensure_tls()
|
|
||||||
// .await?
|
|
||||||
// .negotiate()
|
|
||||||
// .await?)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
|
|
@ -6,6 +6,9 @@ edition = "2021"
|
||||||
[dependencies]
|
[dependencies]
|
||||||
futures = "0.3.31"
|
futures = "0.3.31"
|
||||||
jabber = { version = "0.1.0", path = "../jabber" }
|
jabber = { version = "0.1.0", path = "../jabber" }
|
||||||
|
peanuts = { version = "0.1.0", path = "../../peanuts" }
|
||||||
|
jid = { version = "0.1.0", path = "../jid" }
|
||||||
|
sqlx = { version = "0.8.3", features = [ "sqlite", "runtime-tokio" ] }
|
||||||
stanza = { version = "0.1.0", path = "../stanza" }
|
stanza = { version = "0.1.0", path = "../stanza" }
|
||||||
tokio = "1.42.0"
|
tokio = "1.42.0"
|
||||||
tokio-stream = "0.1.17"
|
tokio-stream = "0.1.17"
|
||||||
|
|
|
@ -0,0 +1,5 @@
|
||||||
|
CREATE TABLE roster(
|
||||||
|
id INTEGER PRIMARY KEY,
|
||||||
|
jid TEXT NOT NULL,
|
||||||
|
nickname TEXT,
|
||||||
|
);
|
647
luz/src/lib.rs
647
luz/src/lib.rs
|
@ -1,15 +1,21 @@
|
||||||
use std::{
|
use std::{
|
||||||
collections::{HashMap, HashSet, VecDeque},
|
collections::{HashMap, HashSet, VecDeque},
|
||||||
|
fmt::Pointer,
|
||||||
pin::pin,
|
pin::pin,
|
||||||
|
sync::{atomic::AtomicBool, Arc},
|
||||||
task::{ready, Poll},
|
task::{ready, Poll},
|
||||||
thread::JoinHandle,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
use futures::{
|
use futures::{
|
||||||
stream::{SplitSink, SplitStream},
|
stream::{SplitSink, SplitStream},
|
||||||
Sink, SinkExt, Stream, StreamExt,
|
Sink, SinkExt, Stream, StreamExt,
|
||||||
};
|
};
|
||||||
use jabber::{client::JabberClient, JID};
|
use jabber::{
|
||||||
|
connection::Tls,
|
||||||
|
jabber_stream::bound_stream::{BoundJabberReader, BoundJabberStream, BoundJabberWriter},
|
||||||
|
JID,
|
||||||
|
};
|
||||||
|
use sqlx::{query, Pool, Sqlite, SqlitePool};
|
||||||
use stanza::{
|
use stanza::{
|
||||||
client::{
|
client::{
|
||||||
iq::{Iq, IqType, Query},
|
iq::{Iq, IqType, Query},
|
||||||
|
@ -17,58 +23,545 @@ use stanza::{
|
||||||
},
|
},
|
||||||
roster,
|
roster,
|
||||||
};
|
};
|
||||||
use tokio::sync::mpsc;
|
use tokio::{
|
||||||
|
io::AsyncRead,
|
||||||
|
select,
|
||||||
|
sync::{
|
||||||
|
mpsc::{self, Receiver, Sender},
|
||||||
|
oneshot, Mutex,
|
||||||
|
},
|
||||||
|
task::{JoinHandle, JoinSet},
|
||||||
|
};
|
||||||
use tokio_stream::wrappers::ReceiverStream;
|
use tokio_stream::wrappers::ReceiverStream;
|
||||||
use tokio_util::sync::{PollSendError, PollSender};
|
use tokio_util::sync::{PollSendError, PollSender};
|
||||||
|
|
||||||
pub struct Client {
|
// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream.
|
||||||
client: JabberClient,
|
pub struct JabberWriter {
|
||||||
pending_iqs: HashMap<String, mpsc::Sender<Iq>>,
|
stanza_receiver: mpsc::Receiver<JabberWrite>,
|
||||||
// database connection (sqlite)
|
control_receiver: mpsc::Receiver<JabberWriterControl>,
|
||||||
receiver: ReceiverStream<UpdateMessage>,
|
stream: BoundJabberWriter<Tls>,
|
||||||
sender: PollSender<CommandMessage>,
|
on_crash: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Client {
|
struct JabberWrite {
|
||||||
pub async fn new(jid: String, password: &str) -> Result<Self, Error> {
|
stanza: Stanza,
|
||||||
let (read_sender, read_receiver) = mpsc::channel::<UpdateMessage>(20);
|
respond_to: oneshot::Sender<Result<(), Error>>,
|
||||||
let (write_sender, write_receiver) = mpsc::channel::<CommandMessage>(20);
|
}
|
||||||
let mut jabber_client = JabberClient::new(jid, password)?;
|
|
||||||
jabber_client.connect().await?;
|
|
||||||
let (write, read) = jabber_client.split();
|
|
||||||
let client = Self {
|
|
||||||
client: jabber_client,
|
|
||||||
receiver: ReceiverStream::new(read_receiver),
|
|
||||||
sender: PollSender::new(write_sender),
|
|
||||||
pending_iqs: HashMap::new(),
|
|
||||||
};
|
|
||||||
tokio::spawn(client.process_read(read, read_sender));
|
|
||||||
tokio::spawn(client.process_write(write, write_receiver));
|
|
||||||
Ok(client)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn process_read(
|
enum JabberWriterControl {
|
||||||
&self,
|
Disconnect,
|
||||||
mut stream: SplitStream<JabberClient>,
|
Abort(oneshot::Sender<mpsc::Receiver<JabberWrite>>),
|
||||||
sender: mpsc::Sender<UpdateMessage>,
|
}
|
||||||
) {
|
|
||||||
for stanza in stream.next().await {
|
impl JabberWriter {
|
||||||
tokio::spawn(self.process_stanza(stanza, sender.clone()));
|
fn new(
|
||||||
|
stanza_receiver: mpsc::Receiver<JabberWrite>,
|
||||||
|
control_receiver: mpsc::Receiver<JabberWriterControl>,
|
||||||
|
stream: BoundJabberWriter<Tls>,
|
||||||
|
supervisor: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
stanza_receiver,
|
||||||
|
control_receiver,
|
||||||
|
stream,
|
||||||
|
on_crash: supervisor,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn process_write(
|
async fn write(&mut self, stanza: &Stanza) -> Result<(), peanuts::Error> {
|
||||||
&self,
|
Ok(self.stream.write(stanza).await?)
|
||||||
mut sink: SplitSink<JabberClient, Stanza>,
|
}
|
||||||
|
|
||||||
|
async fn run(mut self) {
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
Some(msg) = self.control_receiver.recv() => {
|
||||||
|
match msg {
|
||||||
|
JabberWriterControl::Disconnect => {
|
||||||
|
// TODO: close the stanza_receiver channel and drain out all of the remaining stanzas to send
|
||||||
|
self.stanza_receiver.close();
|
||||||
|
// TODO: put this in some kind of function to avoid code duplication
|
||||||
|
for msg in self.stanza_receiver.recv().await {
|
||||||
|
let result = self.stream.write(&msg.stanza).await;
|
||||||
|
match result {
|
||||||
|
Err(e) => match &e {
|
||||||
|
peanuts::Error::ReadError(error) => {
|
||||||
|
// make sure message is not lost from error, supervisor handles retry and reporting
|
||||||
|
self.on_crash.send((msg, self.stanza_receiver));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
msg.respond_to.send(Err(e.into()));
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => {
|
||||||
|
msg.respond_to.send(Ok(()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.stream.try_close().await;
|
||||||
|
break;
|
||||||
|
},
|
||||||
|
JabberWriterControl::Abort(sender) => {
|
||||||
|
sender.send(self.stanza_receiver);
|
||||||
|
break;
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Some(msg) = self.stanza_receiver.recv() => {
|
||||||
|
let result = self.stream.write(&msg.stanza).await;
|
||||||
|
match result {
|
||||||
|
Err(e) => match &e {
|
||||||
|
peanuts::Error::ReadError(error) => {
|
||||||
|
// make sure message is not lost from error, supervisor handles retry and reporting
|
||||||
|
self.on_crash.send((msg, self.stanza_receiver));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
msg.respond_to.send(Err(e.into()));
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => {
|
||||||
|
msg.respond_to.send(Ok(()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
// TODO: check if this is ok to do
|
||||||
|
else => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct JabberWriteHandle {
|
||||||
|
sender: mpsc::Sender<JabberWrite>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct JabberWriterControlHandle {
|
||||||
|
sender: mpsc::Sender<JabberWriterControl>,
|
||||||
|
handle: JoinHandle<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl JabberWriterControlHandle {
|
||||||
|
pub fn new(
|
||||||
|
stream: BoundJabberWriter<Tls>,
|
||||||
|
supervisor: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
|
||||||
|
) -> (JabberWriteHandle, JabberWriterControlHandle) {
|
||||||
|
let (control_sender, control_receiver) = mpsc::channel(20);
|
||||||
|
let (stanza_sender, stanza_receiver) = mpsc::channel(20);
|
||||||
|
|
||||||
|
let actor = JabberWriter::new(stanza_receiver, control_receiver, stream, supervisor);
|
||||||
|
let handle = tokio::spawn(async move { actor.run().await });
|
||||||
|
|
||||||
|
(
|
||||||
|
JabberWriteHandle {
|
||||||
|
sender: stanza_sender,
|
||||||
|
},
|
||||||
|
Self {
|
||||||
|
sender: control_sender,
|
||||||
|
handle,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn reconnect(
|
||||||
|
stream: BoundJabberWriter<Tls>,
|
||||||
|
supervisor: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
|
||||||
|
stanza_receiver: mpsc::Receiver<JabberWrite>,
|
||||||
|
) -> Self {
|
||||||
|
let (control_sender, control_receiver) = mpsc::channel(20);
|
||||||
|
|
||||||
|
let actor = JabberWriter::new(stanza_receiver, control_receiver, stream, supervisor);
|
||||||
|
let handle = tokio::spawn(async move { actor.run().await });
|
||||||
|
|
||||||
|
Self {
|
||||||
|
sender: control_sender,
|
||||||
|
handle,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct JabberReader {
|
||||||
|
// TODO: place iq hashmap here
|
||||||
|
control_receiver: mpsc::Receiver<JabberReaderControl>,
|
||||||
|
stream: BoundJabberReader<Tls>,
|
||||||
|
on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
|
||||||
|
db: SqlitePool,
|
||||||
|
sender: mpsc::Sender<UpdateMessage>,
|
||||||
|
supervisor_control: mpsc::Sender<JabberSupervisorCommand>,
|
||||||
|
write_sender: mpsc::Sender<JabberWrite>,
|
||||||
|
tasks: JoinSet<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl JabberReader {
|
||||||
|
fn new(
|
||||||
|
control_receiver: mpsc::Receiver<JabberReaderControl>,
|
||||||
|
stream: BoundJabberReader<Tls>,
|
||||||
|
on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
|
||||||
|
db: SqlitePool,
|
||||||
|
sender: mpsc::Sender<UpdateMessage>,
|
||||||
|
// jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs)
|
||||||
|
supervisor_control: mpsc::Sender<JabberSupervisorCommand>,
|
||||||
|
write_sender: mpsc::Sender<JabberWrite>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
control_receiver,
|
||||||
|
stream,
|
||||||
|
on_crash,
|
||||||
|
db,
|
||||||
|
sender,
|
||||||
|
supervisor_control,
|
||||||
|
write_sender,
|
||||||
|
tasks: JoinSet::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(mut self) {
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
Some(msg) = self.control_receiver.recv() => {
|
||||||
|
match msg {
|
||||||
|
JabberReaderControl::Disconnect => todo!(),
|
||||||
|
JabberReaderControl::Abort(sender) => todo!(),
|
||||||
|
};
|
||||||
|
},
|
||||||
|
stanza = self.stream.read::<Stanza>() => {
|
||||||
|
match stanza {
|
||||||
|
Ok(_) => todo!(),
|
||||||
|
Err(_) => todo!(),
|
||||||
|
}
|
||||||
|
self.tasks.spawn();
|
||||||
|
},
|
||||||
|
else => break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
trait Task {
|
||||||
|
async fn handle();
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Task for Stanza {
|
||||||
|
async fn handle() {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
enum JabberReaderControl {
|
||||||
|
Disconnect,
|
||||||
|
Abort(oneshot::Sender<mpsc::Receiver<JabberWrite>>),
|
||||||
|
}
|
||||||
|
|
||||||
|
struct JabberReaderControlHandle {
|
||||||
|
sender: mpsc::Sender<JabberReaderControl>,
|
||||||
|
handle: JoinHandle<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl JabberReaderControlHandle {
|
||||||
|
pub fn new(
|
||||||
|
stream: BoundJabberReader<Tls>,
|
||||||
|
on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
|
||||||
|
db: SqlitePool,
|
||||||
|
sender: mpsc::Sender<UpdateMessage>,
|
||||||
|
supervisor_control: mpsc::Sender<JabberSupervisorCommand>,
|
||||||
|
jabber_write: mpsc::Sender<JabberWrite>,
|
||||||
|
) -> Self {
|
||||||
|
let (control_sender, control_receiver) = mpsc::channel(20);
|
||||||
|
|
||||||
|
let actor = JabberReader::new(
|
||||||
|
control_receiver,
|
||||||
|
stream,
|
||||||
|
on_crash,
|
||||||
|
db,
|
||||||
|
sender,
|
||||||
|
supervisor_control,
|
||||||
|
jabber_write,
|
||||||
|
);
|
||||||
|
let handle = tokio::spawn(async move { actor.run().await });
|
||||||
|
|
||||||
|
Self {
|
||||||
|
sender: control_sender,
|
||||||
|
handle,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Luz {
|
||||||
|
receiver: mpsc::Receiver<CommandMessage>,
|
||||||
|
jid: Arc<Mutex<JID>>,
|
||||||
|
// TODO: use a dyn passwordprovider trait to avoid storing password in memory
|
||||||
|
password: String,
|
||||||
|
connected: Arc<Mutex<Option<(JabberWriteHandle, JabberSupervisorHandle)>>>,
|
||||||
|
db: SqlitePool,
|
||||||
|
sender: mpsc::Sender<UpdateMessage>,
|
||||||
|
tasks: JoinSet<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Luz {
|
||||||
|
fn new(
|
||||||
receiver: mpsc::Receiver<CommandMessage>,
|
receiver: mpsc::Receiver<CommandMessage>,
|
||||||
|
jid: Arc<Mutex<JID>>,
|
||||||
|
password: String,
|
||||||
|
connected: Arc<Mutex<Option<(JabberWriteHandle, JabberSupervisorHandle)>>>,
|
||||||
|
db: SqlitePool,
|
||||||
|
sender: mpsc::Sender<UpdateMessage>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
jid,
|
||||||
|
password,
|
||||||
|
connected,
|
||||||
|
db,
|
||||||
|
receiver,
|
||||||
|
sender,
|
||||||
|
tasks: JoinSet::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(mut self) {
|
||||||
|
while let Some(msg) = self.receiver.recv().await {
|
||||||
|
// TODO: consider separating disconnect/connect and commands apart from commandmessage
|
||||||
|
match msg {
|
||||||
|
CommandMessage::Connect => {
|
||||||
|
match self.connected.lock().await.as_ref() {
|
||||||
|
Some(_) => {
|
||||||
|
self.sender
|
||||||
|
.send(UpdateMessage::Error(Error::AlreadyConnected))
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
let mut jid = self.jid.lock().await;
|
||||||
|
let mut domain = jid.domainpart.clone();
|
||||||
|
let streams_result =
|
||||||
|
jabber::connect_and_login(&mut jid, &self.password, &mut domain)
|
||||||
|
.await;
|
||||||
|
match streams_result {
|
||||||
|
Ok(s) => {
|
||||||
|
let (writer, supervisor) =
|
||||||
|
JabberSupervisorHandle::new(s, self.sender.clone());
|
||||||
|
*self.connected.lock().await = Some((writer, supervisor));
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
self.sender.send(UpdateMessage::Error(e.into()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
CommandMessage::Disconnect => match self.connected.lock().await.as_mut() {
|
||||||
|
None => {
|
||||||
|
self.sender
|
||||||
|
.send(UpdateMessage::Error(Error::AlreadyDisonnected))
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
mut c => {
|
||||||
|
if let Some((_write_handle, supervisor_handle)) = c.take() {
|
||||||
|
let _ = supervisor_handle
|
||||||
|
.sender
|
||||||
|
.send(JabberSupervisorCommand::Disconnect)
|
||||||
|
.await;
|
||||||
|
} else {
|
||||||
|
unreachable!()
|
||||||
|
};
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => {
|
||||||
|
match self.connected.lock().await.as_ref() {
|
||||||
|
Some((w, _)) => self.tasks.spawn(msg.handle_online(
|
||||||
|
w.clone(),
|
||||||
|
self.jid.clone(),
|
||||||
|
self.db.clone(),
|
||||||
|
self.sender.clone(),
|
||||||
|
)),
|
||||||
|
None => self.tasks.spawn(msg.handle_offline(
|
||||||
|
self.jid.clone(),
|
||||||
|
self.db.clone(),
|
||||||
|
self.sender.clone(),
|
||||||
|
)),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CommandMessage {
|
||||||
|
pub async fn handle_offline(
|
||||||
|
mut self,
|
||||||
|
jid: Arc<Mutex<JID>>,
|
||||||
|
db: SqlitePool,
|
||||||
|
sender: mpsc::Sender<UpdateMessage>,
|
||||||
) {
|
) {
|
||||||
for message in receiver.recv_many(, )
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn handle_online(
|
||||||
|
mut self,
|
||||||
|
jabber_write_handle: JabberWriteHandle,
|
||||||
|
// TODO: jid could lose resource by the end
|
||||||
|
jid: Arc<Mutex<JID>>,
|
||||||
|
db: SqlitePool,
|
||||||
|
sender: mpsc::Sender<UpdateMessage>,
|
||||||
|
) {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: separate sender and receiver, store handle to Luz process to ensure dropping
|
||||||
|
pub struct LuzHandle {
|
||||||
|
sender: mpsc::Sender<CommandMessage>,
|
||||||
|
receiver: mpsc::Receiver<UpdateMessage>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LuzHandle {
|
||||||
|
pub fn new(jid: JID, password: String, db: SqlitePool) -> Self {
|
||||||
|
let (command_sender, command_receiver) = mpsc::channel(20);
|
||||||
|
let (update_sender, update_receiver) = mpsc::channel(20);
|
||||||
|
|
||||||
|
let actor = Luz::new(
|
||||||
|
command_receiver,
|
||||||
|
Arc::new(Mutex::new(jid)),
|
||||||
|
password,
|
||||||
|
Arc::new(Mutex::new(None)),
|
||||||
|
db,
|
||||||
|
update_sender,
|
||||||
|
);
|
||||||
|
tokio::spawn(async move { actor.run().await });
|
||||||
|
|
||||||
|
Self {
|
||||||
|
sender: command_sender,
|
||||||
|
receiver: update_receiver,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct JabberSupervisor {
|
||||||
|
connection_commands: mpsc::Receiver<JabberSupervisorCommand>,
|
||||||
|
writer_crash: oneshot::Receiver<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
|
||||||
|
reader_crash: oneshot::Receiver<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
|
||||||
|
sender: mpsc::Sender<UpdateMessage>,
|
||||||
|
writer_handle: JabberWriterControlHandle,
|
||||||
|
reader_handle: JabberReaderControlHandle,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum JabberSupervisorCommand {
|
||||||
|
Disconnect,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl JabberSupervisor {
|
||||||
|
fn new(
|
||||||
|
connection_commands: mpsc::Receiver<JabberSupervisorCommand>,
|
||||||
|
writer_crash: oneshot::Receiver<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
|
||||||
|
reader_crash: oneshot::Receiver<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
|
||||||
|
sender: mpsc::Sender<UpdateMessage>,
|
||||||
|
writer_handle: JabberWriterControlHandle,
|
||||||
|
reader_handle: JabberReaderControlHandle,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
connection_commands,
|
||||||
|
writer_crash,
|
||||||
|
sender,
|
||||||
|
writer_handle,
|
||||||
|
reader_handle,
|
||||||
|
reader_crash,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_command_message(&mut self, msg: JabberSupervisorCommand) {}
|
||||||
|
|
||||||
|
async fn run(mut self) {
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
Some(msg) = self.connection_commands.recv() => {
|
||||||
|
self.handle_command_message(msg).await;
|
||||||
|
},
|
||||||
|
error = self.writer_crash => {
|
||||||
|
|
||||||
|
},
|
||||||
|
error = self.reader_crash => {
|
||||||
|
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct JabberSupervisorHandle {
|
||||||
|
sender: mpsc::Sender<JabberSupervisorCommand>,
|
||||||
|
handle: JoinHandle<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl JabberSupervisorHandle {
|
||||||
|
pub fn new(
|
||||||
|
streams: BoundJabberStream<Tls>,
|
||||||
|
sender: mpsc::Sender<UpdateMessage>,
|
||||||
|
db: SqlitePool,
|
||||||
|
update_sender: mpsc::Sender<UpdateMessage>,
|
||||||
|
) -> (JabberWriteHandle, Self) {
|
||||||
|
let (command_sender, command_receiver) = mpsc::channel(20);
|
||||||
|
let (writer_error_sender, writer_error_receiver) = oneshot::channel();
|
||||||
|
let (reader_crash_sender, reader_crash_receiver) = oneshot::channel();
|
||||||
|
|
||||||
|
let (reader, writer) = streams.split();
|
||||||
|
let (jabber_write_handle, jabber_writer_control_handle) =
|
||||||
|
JabberWriterControlHandle::new(writer, writer_error_sender);
|
||||||
|
let jabber_reader_control_handle = JabberReaderControlHandle::new(
|
||||||
|
reader,
|
||||||
|
reader_crash_sender,
|
||||||
|
db,
|
||||||
|
update_sender,
|
||||||
|
command_sender.clone(),
|
||||||
|
jabber_write_handle.sender.clone(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let actor = JabberSupervisor::new(
|
||||||
|
command_receiver,
|
||||||
|
writer_error_receiver,
|
||||||
|
reader_crash_receiver,
|
||||||
|
sender,
|
||||||
|
jabber_writer_control_handle,
|
||||||
|
jabber_reader_control_handle,
|
||||||
|
);
|
||||||
|
|
||||||
|
let handle = tokio::spawn(async move { actor.run().await });
|
||||||
|
|
||||||
|
(
|
||||||
|
jabber_write_handle,
|
||||||
|
Self {
|
||||||
|
sender: command_sender,
|
||||||
|
handle,
|
||||||
|
},
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
|
AlreadyConnected,
|
||||||
PollSend(PollSendError<CommandMessage>),
|
PollSend(PollSendError<CommandMessage>),
|
||||||
Jabber(jabber::Error),
|
Jabber(jabber::Error),
|
||||||
|
XML(peanuts::Error),
|
||||||
|
SQL(sqlx::Error),
|
||||||
|
JID(jid::ParseError),
|
||||||
|
AlreadyDisonnected,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<peanuts::Error> for Error {
|
||||||
|
fn from(e: peanuts::Error) -> Self {
|
||||||
|
Self::XML(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<jid::ParseError> for Error {
|
||||||
|
fn from(e: jid::ParseError) -> Self {
|
||||||
|
Self::JID(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<sqlx::Error> for Error {
|
||||||
|
fn from(e: sqlx::Error) -> Self {
|
||||||
|
Self::SQL(e)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<jabber::Error> for Error {
|
impl From<jabber::Error> for Error {
|
||||||
|
@ -77,46 +570,6 @@ impl From<jabber::Error> for Error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Stream for Client {
|
|
||||||
type Item = UpdateMessage;
|
|
||||||
|
|
||||||
fn poll_next(
|
|
||||||
self: std::pin::Pin<&mut Self>,
|
|
||||||
cx: &mut std::task::Context<'_>,
|
|
||||||
) -> std::task::Poll<Option<Self::Item>> {
|
|
||||||
pin!(self).receiver.poll_next_unpin(cx)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Sink<CommandMessage> for Client {
|
|
||||||
type Error = Error;
|
|
||||||
|
|
||||||
fn poll_ready(
|
|
||||||
self: std::pin::Pin<&mut Self>,
|
|
||||||
cx: &mut std::task::Context<'_>,
|
|
||||||
) -> Poll<Result<(), Self::Error>> {
|
|
||||||
Poll::Ready(ready!(pin!(self).sender.poll_ready_unpin(cx)))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn start_send(self: std::pin::Pin<&mut Self>, item: CommandMessage) -> Result<(), Self::Error> {
|
|
||||||
todo!()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_flush(
|
|
||||||
self: std::pin::Pin<&mut Self>,
|
|
||||||
cx: &mut std::task::Context<'_>,
|
|
||||||
) -> Poll<Result<(), Self::Error>> {
|
|
||||||
todo!()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_close(
|
|
||||||
self: std::pin::Pin<&mut Self>,
|
|
||||||
cx: &mut std::task::Context<'_>,
|
|
||||||
) -> Poll<Result<(), Self::Error>> {
|
|
||||||
todo!()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<PollSendError<CommandMessage>> for Error {
|
impl From<PollSendError<CommandMessage>> for Error {
|
||||||
fn from(e: PollSendError<CommandMessage>) -> Self {
|
fn from(e: PollSendError<CommandMessage>) -> Self {
|
||||||
Self::PollSend(e)
|
Self::PollSend(e)
|
||||||
|
@ -125,47 +578,13 @@ impl From<PollSendError<CommandMessage>> for Error {
|
||||||
|
|
||||||
pub enum CommandMessage {
|
pub enum CommandMessage {
|
||||||
Connect,
|
Connect,
|
||||||
|
Disconnect,
|
||||||
|
/// gets the roster. if offline, retreives cached version from database. should be stored in application memory.
|
||||||
GetRoster,
|
GetRoster,
|
||||||
SendMessage(JID, String),
|
SendMessage(JID, String),
|
||||||
}
|
}
|
||||||
|
|
||||||
pub enum UpdateMessage {
|
pub enum UpdateMessage {
|
||||||
|
Error(Error),
|
||||||
Roster(Vec<roster::Item>),
|
Roster(Vec<roster::Item>),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Client {
|
|
||||||
pub async fn process_stanza(
|
|
||||||
&mut self,
|
|
||||||
stanza: Result<Stanza, jabber::Error>,
|
|
||||||
sender: mpsc::Sender<UpdateMessage>,
|
|
||||||
) {
|
|
||||||
match stanza {
|
|
||||||
Ok(stanza) => todo!(),
|
|
||||||
Err(e) => self.process_error(e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn iq(
|
|
||||||
&mut self,
|
|
||||||
to: Option<JID>,
|
|
||||||
r#type: IqType,
|
|
||||||
query: Option<Query>,
|
|
||||||
) -> Result<IqResponse, Error> {
|
|
||||||
self.client
|
|
||||||
.send(Stanza::Iq(Iq {
|
|
||||||
from: Some(self.client.jid()),
|
|
||||||
// TODO: generate id
|
|
||||||
id: "test".to_string(),
|
|
||||||
to,
|
|
||||||
r#type,
|
|
||||||
// TODO: lang
|
|
||||||
lang: None,
|
|
||||||
query,
|
|
||||||
errors: Vec::new(),
|
|
||||||
}))
|
|
||||||
.await?;
|
|
||||||
Ok(todo!())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn iq_process(&mut self, iq: Iq) {}
|
|
||||||
}
|
|
||||||
|
|
|
@ -10,8 +10,8 @@ pub const XMLNS: &str = "jabber:iq:roster";
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct Query {
|
pub struct Query {
|
||||||
ver: Option<String>,
|
pub ver: Option<String>,
|
||||||
items: Vec<Item>,
|
pub items: Vec<Item>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FromElement for Query {
|
impl FromElement for Query {
|
||||||
|
@ -36,11 +36,16 @@ impl IntoElement for Query {
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct Item {
|
pub struct Item {
|
||||||
|
/// signals subscription pre-approval (server only)
|
||||||
approved: Option<bool>,
|
approved: Option<bool>,
|
||||||
|
/// signals subscription sub-states (server only)
|
||||||
ask: bool,
|
ask: bool,
|
||||||
|
/// uniquely identifies item
|
||||||
jid: JID,
|
jid: JID,
|
||||||
|
/// handle that is determined by user, not contact
|
||||||
name: Option<String>,
|
name: Option<String>,
|
||||||
subscription: Subscription,
|
/// state of the presence subscription
|
||||||
|
subscription: Option<Subscription>,
|
||||||
groups: Vec<Group>,
|
groups: Vec<Group>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -63,7 +68,7 @@ impl FromElement for Item {
|
||||||
};
|
};
|
||||||
let jid = element.attribute("jid")?;
|
let jid = element.attribute("jid")?;
|
||||||
let name = element.attribute_opt("name")?;
|
let name = element.attribute_opt("name")?;
|
||||||
let subscription = element.attribute_opt("subscription")?.unwrap_or_default();
|
let subscription = element.attribute_opt("subscription")?;
|
||||||
let groups = element.pop_children()?;
|
let groups = element.pop_children()?;
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
|
@ -91,7 +96,7 @@ impl IntoElement for Item {
|
||||||
)
|
)
|
||||||
.push_attribute("jid", self.jid.clone())
|
.push_attribute("jid", self.jid.clone())
|
||||||
.push_attribute_opt("name", self.name.clone())
|
.push_attribute_opt("name", self.name.clone())
|
||||||
.push_attribute("subscription", self.subscription)
|
.push_attribute_opt("subscription", self.subscription)
|
||||||
.push_children(self.groups.clone())
|
.push_children(self.groups.clone())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -134,6 +139,7 @@ impl ToString for Subscription {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
|
// TODO: check if should be option or not
|
||||||
pub struct Group(Option<String>);
|
pub struct Group(Option<String>);
|
||||||
|
|
||||||
impl FromElement for Group {
|
impl FromElement for Group {
|
||||||
|
|
Loading…
Reference in New Issue