Compare commits
3 Commits
e6c97ab828
...
a1d96233e8
Author | SHA1 | Date |
---|---|---|
|
a1d96233e8 | |
|
b023c6b5f2 | |
|
866e134371 |
|
@ -22,71 +22,16 @@ use crate::{
|
|||
Connection, Error, JabberStream, Result, JID,
|
||||
};
|
||||
|
||||
// feed it client stanzas, receive client stanzas
|
||||
pub struct JabberClient {
|
||||
connection: Option<BoundJabberStream<Tls>>,
|
||||
jid: JID,
|
||||
// TODO: have reconnection be handled by another part, so creds don't need to be stored in object
|
||||
password: Arc<SASLConfig>,
|
||||
server: String,
|
||||
}
|
||||
|
||||
impl JabberClient {
|
||||
pub fn new(
|
||||
jid: impl TryInto<JID, Error = ParseError>,
|
||||
password: impl ToString,
|
||||
) -> Result<JabberClient> {
|
||||
let jid = jid.try_into()?;
|
||||
let sasl_config = SASLConfig::with_credentials(
|
||||
None,
|
||||
jid.localpart.clone().ok_or(Error::NoLocalpart)?,
|
||||
password.to_string(),
|
||||
)?;
|
||||
Ok(JabberClient {
|
||||
connection: None,
|
||||
jid: jid.clone(),
|
||||
password: sasl_config,
|
||||
server: jid.domainpart,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn jid(&self) -> JID {
|
||||
self.jid.clone()
|
||||
}
|
||||
|
||||
pub async fn connect(&mut self) -> Result<()> {
|
||||
match &self.connection {
|
||||
Some(_) => Ok(()),
|
||||
None => {
|
||||
self.connection = Some(
|
||||
connect_and_login(&mut self.jid, self.password.clone(), &mut self.server)
|
||||
.await?,
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn into_inner(self) -> Result<BoundJabberStream<Tls>> {
|
||||
self.connection.ok_or(Error::Disconnected)
|
||||
}
|
||||
|
||||
// pub async fn send_stanza(&mut self, stanza: &Stanza) -> Result<()> {
|
||||
// match &mut self.connection {
|
||||
// ConnectionState::Disconnected => return Err(Error::Disconnected),
|
||||
// ConnectionState::Connecting(_connecting) => return Err(Error::Connecting),
|
||||
// ConnectionState::Connected(jabber_stream) => {
|
||||
// Ok(jabber_stream.send_stanza(stanza).await?)
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
}
|
||||
|
||||
pub async fn connect_and_login(
|
||||
jid: &mut JID,
|
||||
auth: Arc<SASLConfig>,
|
||||
mut jid: &mut JID,
|
||||
password: impl AsRef<str>,
|
||||
server: &mut String,
|
||||
) -> Result<BoundJabberStream<Tls>> {
|
||||
let auth = SASLConfig::with_credentials(
|
||||
None,
|
||||
jid.localpart.clone().ok_or(Error::NoLocalpart)?,
|
||||
password.as_ref().to_string(),
|
||||
)?;
|
||||
let mut conn_state = Connecting::start(&server).await?;
|
||||
loop {
|
||||
match conn_state {
|
||||
|
@ -177,8 +122,8 @@ pub enum InsecureConnecting {
|
|||
mod tests {
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use super::JabberClient;
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use jid::JID;
|
||||
use stanza::{
|
||||
client::{
|
||||
iq::{Iq, IqType, Query},
|
||||
|
@ -190,21 +135,25 @@ mod tests {
|
|||
use tokio::{sync::Mutex, time::sleep};
|
||||
use tracing::info;
|
||||
|
||||
use super::connect_and_login;
|
||||
|
||||
#[test(tokio::test)]
|
||||
async fn login() {
|
||||
let mut client = JabberClient::new("test@blos.sm", "slayed").unwrap();
|
||||
client.connect().await.unwrap();
|
||||
let mut jid: JID = "test@blos.sm".try_into().unwrap();
|
||||
let client = connect_and_login(&mut jid, "slayed", &mut "blos.sm".to_string())
|
||||
.await
|
||||
.unwrap();
|
||||
sleep(Duration::from_secs(5)).await
|
||||
}
|
||||
|
||||
#[test(tokio::test)]
|
||||
async fn ping_parallel() {
|
||||
let mut client = JabberClient::new("test@blos.sm", "slayed").unwrap();
|
||||
client.connect().await.unwrap();
|
||||
sleep(Duration::from_secs(5)).await;
|
||||
let jid = client.jid.clone();
|
||||
let server = client.server.clone();
|
||||
let (mut read, mut write) = client.into_inner().unwrap().split();
|
||||
let mut jid: JID = "test@blos.sm".try_into().unwrap();
|
||||
let mut server = "blos.sm".to_string();
|
||||
let client = connect_and_login(&mut jid, "slayed", &mut server)
|
||||
.await
|
||||
.unwrap();
|
||||
let (mut read, mut write) = client.split();
|
||||
|
||||
tokio::join!(
|
||||
async {
|
||||
|
|
|
@ -415,7 +415,7 @@ mod tests {
|
|||
use std::time::Duration;
|
||||
|
||||
use super::*;
|
||||
use crate::{client::JabberClient, connection::Connection};
|
||||
use crate::{connection::Connection};
|
||||
use futures::sink;
|
||||
use test_log::test;
|
||||
use tokio::time::sleep;
|
||||
|
@ -468,8 +468,8 @@ mod tests {
|
|||
|
||||
#[tokio::test]
|
||||
async fn sink() {
|
||||
let mut client = JabberClient::new("test@blos.sm", "slayed").unwrap();
|
||||
client.connect().await.unwrap();
|
||||
// let mut client = JabberClient::new("test@blos.sm", "slayed").unwrap();
|
||||
// client.connect().await.unwrap();
|
||||
// let stream = client.inner().unwrap();
|
||||
// let sink = sink::unfold(stream, |mut stream, stanza: Stanza| async move {
|
||||
// stream.writer.write(&stanza).await?;
|
||||
|
|
|
@ -15,15 +15,7 @@ pub use jid::JID;
|
|||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
pub async fn login<J: AsRef<str>, P: AsRef<str>>(jid: J, password: P) -> Result<JabberStream<Tls>> {
|
||||
todo!()
|
||||
// Ok(Connection::connect_user(jid, password.as_ref().to_string())
|
||||
// .await?
|
||||
// .ensure_tls()
|
||||
// .await?
|
||||
// .negotiate()
|
||||
// .await?)
|
||||
}
|
||||
pub use client::connect_and_login;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
|
|
@ -6,6 +6,9 @@ edition = "2021"
|
|||
[dependencies]
|
||||
futures = "0.3.31"
|
||||
jabber = { version = "0.1.0", path = "../jabber" }
|
||||
peanuts = { version = "0.1.0", path = "../../peanuts" }
|
||||
jid = { version = "0.1.0", path = "../jid" }
|
||||
sqlx = { version = "0.8.3", features = [ "sqlite", "runtime-tokio" ] }
|
||||
stanza = { version = "0.1.0", path = "../stanza" }
|
||||
tokio = "1.42.0"
|
||||
tokio-stream = "0.1.17"
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
CREATE TABLE roster(
|
||||
id INTEGER PRIMARY KEY,
|
||||
jid TEXT NOT NULL,
|
||||
nickname TEXT,
|
||||
);
|
647
luz/src/lib.rs
647
luz/src/lib.rs
|
@ -1,15 +1,21 @@
|
|||
use std::{
|
||||
collections::{HashMap, HashSet, VecDeque},
|
||||
fmt::Pointer,
|
||||
pin::pin,
|
||||
sync::{atomic::AtomicBool, Arc},
|
||||
task::{ready, Poll},
|
||||
thread::JoinHandle,
|
||||
};
|
||||
|
||||
use futures::{
|
||||
stream::{SplitSink, SplitStream},
|
||||
Sink, SinkExt, Stream, StreamExt,
|
||||
};
|
||||
use jabber::{client::JabberClient, JID};
|
||||
use jabber::{
|
||||
connection::Tls,
|
||||
jabber_stream::bound_stream::{BoundJabberReader, BoundJabberStream, BoundJabberWriter},
|
||||
JID,
|
||||
};
|
||||
use sqlx::{query, Pool, Sqlite, SqlitePool};
|
||||
use stanza::{
|
||||
client::{
|
||||
iq::{Iq, IqType, Query},
|
||||
|
@ -17,58 +23,545 @@ use stanza::{
|
|||
},
|
||||
roster,
|
||||
};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::{
|
||||
io::AsyncRead,
|
||||
select,
|
||||
sync::{
|
||||
mpsc::{self, Receiver, Sender},
|
||||
oneshot, Mutex,
|
||||
},
|
||||
task::{JoinHandle, JoinSet},
|
||||
};
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tokio_util::sync::{PollSendError, PollSender};
|
||||
|
||||
pub struct Client {
|
||||
client: JabberClient,
|
||||
pending_iqs: HashMap<String, mpsc::Sender<Iq>>,
|
||||
// database connection (sqlite)
|
||||
receiver: ReceiverStream<UpdateMessage>,
|
||||
sender: PollSender<CommandMessage>,
|
||||
// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream.
|
||||
pub struct JabberWriter {
|
||||
stanza_receiver: mpsc::Receiver<JabberWrite>,
|
||||
control_receiver: mpsc::Receiver<JabberWriterControl>,
|
||||
stream: BoundJabberWriter<Tls>,
|
||||
on_crash: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
|
||||
}
|
||||
|
||||
impl Client {
|
||||
pub async fn new(jid: String, password: &str) -> Result<Self, Error> {
|
||||
let (read_sender, read_receiver) = mpsc::channel::<UpdateMessage>(20);
|
||||
let (write_sender, write_receiver) = mpsc::channel::<CommandMessage>(20);
|
||||
let mut jabber_client = JabberClient::new(jid, password)?;
|
||||
jabber_client.connect().await?;
|
||||
let (write, read) = jabber_client.split();
|
||||
let client = Self {
|
||||
client: jabber_client,
|
||||
receiver: ReceiverStream::new(read_receiver),
|
||||
sender: PollSender::new(write_sender),
|
||||
pending_iqs: HashMap::new(),
|
||||
};
|
||||
tokio::spawn(client.process_read(read, read_sender));
|
||||
tokio::spawn(client.process_write(write, write_receiver));
|
||||
Ok(client)
|
||||
}
|
||||
struct JabberWrite {
|
||||
stanza: Stanza,
|
||||
respond_to: oneshot::Sender<Result<(), Error>>,
|
||||
}
|
||||
|
||||
pub async fn process_read(
|
||||
&self,
|
||||
mut stream: SplitStream<JabberClient>,
|
||||
sender: mpsc::Sender<UpdateMessage>,
|
||||
) {
|
||||
for stanza in stream.next().await {
|
||||
tokio::spawn(self.process_stanza(stanza, sender.clone()));
|
||||
enum JabberWriterControl {
|
||||
Disconnect,
|
||||
Abort(oneshot::Sender<mpsc::Receiver<JabberWrite>>),
|
||||
}
|
||||
|
||||
impl JabberWriter {
|
||||
fn new(
|
||||
stanza_receiver: mpsc::Receiver<JabberWrite>,
|
||||
control_receiver: mpsc::Receiver<JabberWriterControl>,
|
||||
stream: BoundJabberWriter<Tls>,
|
||||
supervisor: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
|
||||
) -> Self {
|
||||
Self {
|
||||
stanza_receiver,
|
||||
control_receiver,
|
||||
stream,
|
||||
on_crash: supervisor,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn process_write(
|
||||
&self,
|
||||
mut sink: SplitSink<JabberClient, Stanza>,
|
||||
async fn write(&mut self, stanza: &Stanza) -> Result<(), peanuts::Error> {
|
||||
Ok(self.stream.write(stanza).await?)
|
||||
}
|
||||
|
||||
async fn run(mut self) {
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(msg) = self.control_receiver.recv() => {
|
||||
match msg {
|
||||
JabberWriterControl::Disconnect => {
|
||||
// TODO: close the stanza_receiver channel and drain out all of the remaining stanzas to send
|
||||
self.stanza_receiver.close();
|
||||
// TODO: put this in some kind of function to avoid code duplication
|
||||
for msg in self.stanza_receiver.recv().await {
|
||||
let result = self.stream.write(&msg.stanza).await;
|
||||
match result {
|
||||
Err(e) => match &e {
|
||||
peanuts::Error::ReadError(error) => {
|
||||
// make sure message is not lost from error, supervisor handles retry and reporting
|
||||
self.on_crash.send((msg, self.stanza_receiver));
|
||||
break;
|
||||
}
|
||||
_ => {
|
||||
msg.respond_to.send(Err(e.into()));
|
||||
}
|
||||
},
|
||||
_ => {
|
||||
msg.respond_to.send(Ok(()));
|
||||
}
|
||||
}
|
||||
}
|
||||
self.stream.try_close().await;
|
||||
break;
|
||||
},
|
||||
JabberWriterControl::Abort(sender) => {
|
||||
sender.send(self.stanza_receiver);
|
||||
break;
|
||||
},
|
||||
}
|
||||
},
|
||||
Some(msg) = self.stanza_receiver.recv() => {
|
||||
let result = self.stream.write(&msg.stanza).await;
|
||||
match result {
|
||||
Err(e) => match &e {
|
||||
peanuts::Error::ReadError(error) => {
|
||||
// make sure message is not lost from error, supervisor handles retry and reporting
|
||||
self.on_crash.send((msg, self.stanza_receiver));
|
||||
break;
|
||||
}
|
||||
_ => {
|
||||
msg.respond_to.send(Err(e.into()));
|
||||
}
|
||||
},
|
||||
_ => {
|
||||
msg.respond_to.send(Ok(()));
|
||||
}
|
||||
}
|
||||
},
|
||||
// TODO: check if this is ok to do
|
||||
else => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct JabberWriteHandle {
|
||||
sender: mpsc::Sender<JabberWrite>,
|
||||
}
|
||||
|
||||
pub struct JabberWriterControlHandle {
|
||||
sender: mpsc::Sender<JabberWriterControl>,
|
||||
handle: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl JabberWriterControlHandle {
|
||||
pub fn new(
|
||||
stream: BoundJabberWriter<Tls>,
|
||||
supervisor: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
|
||||
) -> (JabberWriteHandle, JabberWriterControlHandle) {
|
||||
let (control_sender, control_receiver) = mpsc::channel(20);
|
||||
let (stanza_sender, stanza_receiver) = mpsc::channel(20);
|
||||
|
||||
let actor = JabberWriter::new(stanza_receiver, control_receiver, stream, supervisor);
|
||||
let handle = tokio::spawn(async move { actor.run().await });
|
||||
|
||||
(
|
||||
JabberWriteHandle {
|
||||
sender: stanza_sender,
|
||||
},
|
||||
Self {
|
||||
sender: control_sender,
|
||||
handle,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
pub fn reconnect(
|
||||
stream: BoundJabberWriter<Tls>,
|
||||
supervisor: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
|
||||
stanza_receiver: mpsc::Receiver<JabberWrite>,
|
||||
) -> Self {
|
||||
let (control_sender, control_receiver) = mpsc::channel(20);
|
||||
|
||||
let actor = JabberWriter::new(stanza_receiver, control_receiver, stream, supervisor);
|
||||
let handle = tokio::spawn(async move { actor.run().await });
|
||||
|
||||
Self {
|
||||
sender: control_sender,
|
||||
handle,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct JabberReader {
|
||||
// TODO: place iq hashmap here
|
||||
control_receiver: mpsc::Receiver<JabberReaderControl>,
|
||||
stream: BoundJabberReader<Tls>,
|
||||
on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
|
||||
db: SqlitePool,
|
||||
sender: mpsc::Sender<UpdateMessage>,
|
||||
supervisor_control: mpsc::Sender<JabberSupervisorCommand>,
|
||||
write_sender: mpsc::Sender<JabberWrite>,
|
||||
tasks: JoinSet<()>,
|
||||
}
|
||||
|
||||
impl JabberReader {
|
||||
fn new(
|
||||
control_receiver: mpsc::Receiver<JabberReaderControl>,
|
||||
stream: BoundJabberReader<Tls>,
|
||||
on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
|
||||
db: SqlitePool,
|
||||
sender: mpsc::Sender<UpdateMessage>,
|
||||
// jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs)
|
||||
supervisor_control: mpsc::Sender<JabberSupervisorCommand>,
|
||||
write_sender: mpsc::Sender<JabberWrite>,
|
||||
) -> Self {
|
||||
Self {
|
||||
control_receiver,
|
||||
stream,
|
||||
on_crash,
|
||||
db,
|
||||
sender,
|
||||
supervisor_control,
|
||||
write_sender,
|
||||
tasks: JoinSet::new(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn run(mut self) {
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(msg) = self.control_receiver.recv() => {
|
||||
match msg {
|
||||
JabberReaderControl::Disconnect => todo!(),
|
||||
JabberReaderControl::Abort(sender) => todo!(),
|
||||
};
|
||||
},
|
||||
stanza = self.stream.read::<Stanza>() => {
|
||||
match stanza {
|
||||
Ok(_) => todo!(),
|
||||
Err(_) => todo!(),
|
||||
}
|
||||
self.tasks.spawn();
|
||||
},
|
||||
else => break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trait Task {
|
||||
async fn handle();
|
||||
}
|
||||
|
||||
impl Task for Stanza {
|
||||
async fn handle() {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
enum JabberReaderControl {
|
||||
Disconnect,
|
||||
Abort(oneshot::Sender<mpsc::Receiver<JabberWrite>>),
|
||||
}
|
||||
|
||||
struct JabberReaderControlHandle {
|
||||
sender: mpsc::Sender<JabberReaderControl>,
|
||||
handle: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl JabberReaderControlHandle {
|
||||
pub fn new(
|
||||
stream: BoundJabberReader<Tls>,
|
||||
on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
|
||||
db: SqlitePool,
|
||||
sender: mpsc::Sender<UpdateMessage>,
|
||||
supervisor_control: mpsc::Sender<JabberSupervisorCommand>,
|
||||
jabber_write: mpsc::Sender<JabberWrite>,
|
||||
) -> Self {
|
||||
let (control_sender, control_receiver) = mpsc::channel(20);
|
||||
|
||||
let actor = JabberReader::new(
|
||||
control_receiver,
|
||||
stream,
|
||||
on_crash,
|
||||
db,
|
||||
sender,
|
||||
supervisor_control,
|
||||
jabber_write,
|
||||
);
|
||||
let handle = tokio::spawn(async move { actor.run().await });
|
||||
|
||||
Self {
|
||||
sender: control_sender,
|
||||
handle,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Luz {
|
||||
receiver: mpsc::Receiver<CommandMessage>,
|
||||
jid: Arc<Mutex<JID>>,
|
||||
// TODO: use a dyn passwordprovider trait to avoid storing password in memory
|
||||
password: String,
|
||||
connected: Arc<Mutex<Option<(JabberWriteHandle, JabberSupervisorHandle)>>>,
|
||||
db: SqlitePool,
|
||||
sender: mpsc::Sender<UpdateMessage>,
|
||||
tasks: JoinSet<()>,
|
||||
}
|
||||
|
||||
impl Luz {
|
||||
fn new(
|
||||
receiver: mpsc::Receiver<CommandMessage>,
|
||||
jid: Arc<Mutex<JID>>,
|
||||
password: String,
|
||||
connected: Arc<Mutex<Option<(JabberWriteHandle, JabberSupervisorHandle)>>>,
|
||||
db: SqlitePool,
|
||||
sender: mpsc::Sender<UpdateMessage>,
|
||||
) -> Self {
|
||||
Self {
|
||||
jid,
|
||||
password,
|
||||
connected,
|
||||
db,
|
||||
receiver,
|
||||
sender,
|
||||
tasks: JoinSet::new(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn run(mut self) {
|
||||
while let Some(msg) = self.receiver.recv().await {
|
||||
// TODO: consider separating disconnect/connect and commands apart from commandmessage
|
||||
match msg {
|
||||
CommandMessage::Connect => {
|
||||
match self.connected.lock().await.as_ref() {
|
||||
Some(_) => {
|
||||
self.sender
|
||||
.send(UpdateMessage::Error(Error::AlreadyConnected))
|
||||
.await;
|
||||
}
|
||||
None => {
|
||||
let mut jid = self.jid.lock().await;
|
||||
let mut domain = jid.domainpart.clone();
|
||||
let streams_result =
|
||||
jabber::connect_and_login(&mut jid, &self.password, &mut domain)
|
||||
.await;
|
||||
match streams_result {
|
||||
Ok(s) => {
|
||||
let (writer, supervisor) =
|
||||
JabberSupervisorHandle::new(s, self.sender.clone());
|
||||
*self.connected.lock().await = Some((writer, supervisor));
|
||||
}
|
||||
Err(e) => {
|
||||
self.sender.send(UpdateMessage::Error(e.into()));
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
CommandMessage::Disconnect => match self.connected.lock().await.as_mut() {
|
||||
None => {
|
||||
self.sender
|
||||
.send(UpdateMessage::Error(Error::AlreadyDisonnected))
|
||||
.await;
|
||||
}
|
||||
mut c => {
|
||||
if let Some((_write_handle, supervisor_handle)) = c.take() {
|
||||
let _ = supervisor_handle
|
||||
.sender
|
||||
.send(JabberSupervisorCommand::Disconnect)
|
||||
.await;
|
||||
} else {
|
||||
unreachable!()
|
||||
};
|
||||
}
|
||||
},
|
||||
_ => {
|
||||
match self.connected.lock().await.as_ref() {
|
||||
Some((w, _)) => self.tasks.spawn(msg.handle_online(
|
||||
w.clone(),
|
||||
self.jid.clone(),
|
||||
self.db.clone(),
|
||||
self.sender.clone(),
|
||||
)),
|
||||
None => self.tasks.spawn(msg.handle_offline(
|
||||
self.jid.clone(),
|
||||
self.db.clone(),
|
||||
self.sender.clone(),
|
||||
)),
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl CommandMessage {
|
||||
pub async fn handle_offline(
|
||||
mut self,
|
||||
jid: Arc<Mutex<JID>>,
|
||||
db: SqlitePool,
|
||||
sender: mpsc::Sender<UpdateMessage>,
|
||||
) {
|
||||
for message in receiver.recv_many(, )
|
||||
todo!()
|
||||
}
|
||||
|
||||
pub async fn handle_online(
|
||||
mut self,
|
||||
jabber_write_handle: JabberWriteHandle,
|
||||
// TODO: jid could lose resource by the end
|
||||
jid: Arc<Mutex<JID>>,
|
||||
db: SqlitePool,
|
||||
sender: mpsc::Sender<UpdateMessage>,
|
||||
) {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: separate sender and receiver, store handle to Luz process to ensure dropping
|
||||
pub struct LuzHandle {
|
||||
sender: mpsc::Sender<CommandMessage>,
|
||||
receiver: mpsc::Receiver<UpdateMessage>,
|
||||
}
|
||||
|
||||
impl LuzHandle {
|
||||
pub fn new(jid: JID, password: String, db: SqlitePool) -> Self {
|
||||
let (command_sender, command_receiver) = mpsc::channel(20);
|
||||
let (update_sender, update_receiver) = mpsc::channel(20);
|
||||
|
||||
let actor = Luz::new(
|
||||
command_receiver,
|
||||
Arc::new(Mutex::new(jid)),
|
||||
password,
|
||||
Arc::new(Mutex::new(None)),
|
||||
db,
|
||||
update_sender,
|
||||
);
|
||||
tokio::spawn(async move { actor.run().await });
|
||||
|
||||
Self {
|
||||
sender: command_sender,
|
||||
receiver: update_receiver,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct JabberSupervisor {
|
||||
connection_commands: mpsc::Receiver<JabberSupervisorCommand>,
|
||||
writer_crash: oneshot::Receiver<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
|
||||
reader_crash: oneshot::Receiver<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
|
||||
sender: mpsc::Sender<UpdateMessage>,
|
||||
writer_handle: JabberWriterControlHandle,
|
||||
reader_handle: JabberReaderControlHandle,
|
||||
}
|
||||
|
||||
pub enum JabberSupervisorCommand {
|
||||
Disconnect,
|
||||
}
|
||||
|
||||
impl JabberSupervisor {
|
||||
fn new(
|
||||
connection_commands: mpsc::Receiver<JabberSupervisorCommand>,
|
||||
writer_crash: oneshot::Receiver<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
|
||||
reader_crash: oneshot::Receiver<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
|
||||
sender: mpsc::Sender<UpdateMessage>,
|
||||
writer_handle: JabberWriterControlHandle,
|
||||
reader_handle: JabberReaderControlHandle,
|
||||
) -> Self {
|
||||
Self {
|
||||
connection_commands,
|
||||
writer_crash,
|
||||
sender,
|
||||
writer_handle,
|
||||
reader_handle,
|
||||
reader_crash,
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_command_message(&mut self, msg: JabberSupervisorCommand) {}
|
||||
|
||||
async fn run(mut self) {
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(msg) = self.connection_commands.recv() => {
|
||||
self.handle_command_message(msg).await;
|
||||
},
|
||||
error = self.writer_crash => {
|
||||
|
||||
},
|
||||
error = self.reader_crash => {
|
||||
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct JabberSupervisorHandle {
|
||||
sender: mpsc::Sender<JabberSupervisorCommand>,
|
||||
handle: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl JabberSupervisorHandle {
|
||||
pub fn new(
|
||||
streams: BoundJabberStream<Tls>,
|
||||
sender: mpsc::Sender<UpdateMessage>,
|
||||
db: SqlitePool,
|
||||
update_sender: mpsc::Sender<UpdateMessage>,
|
||||
) -> (JabberWriteHandle, Self) {
|
||||
let (command_sender, command_receiver) = mpsc::channel(20);
|
||||
let (writer_error_sender, writer_error_receiver) = oneshot::channel();
|
||||
let (reader_crash_sender, reader_crash_receiver) = oneshot::channel();
|
||||
|
||||
let (reader, writer) = streams.split();
|
||||
let (jabber_write_handle, jabber_writer_control_handle) =
|
||||
JabberWriterControlHandle::new(writer, writer_error_sender);
|
||||
let jabber_reader_control_handle = JabberReaderControlHandle::new(
|
||||
reader,
|
||||
reader_crash_sender,
|
||||
db,
|
||||
update_sender,
|
||||
command_sender.clone(),
|
||||
jabber_write_handle.sender.clone(),
|
||||
);
|
||||
|
||||
let actor = JabberSupervisor::new(
|
||||
command_receiver,
|
||||
writer_error_receiver,
|
||||
reader_crash_receiver,
|
||||
sender,
|
||||
jabber_writer_control_handle,
|
||||
jabber_reader_control_handle,
|
||||
);
|
||||
|
||||
let handle = tokio::spawn(async move { actor.run().await });
|
||||
|
||||
(
|
||||
jabber_write_handle,
|
||||
Self {
|
||||
sender: command_sender,
|
||||
handle,
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub enum Error {
|
||||
AlreadyConnected,
|
||||
PollSend(PollSendError<CommandMessage>),
|
||||
Jabber(jabber::Error),
|
||||
XML(peanuts::Error),
|
||||
SQL(sqlx::Error),
|
||||
JID(jid::ParseError),
|
||||
AlreadyDisonnected,
|
||||
}
|
||||
|
||||
impl From<peanuts::Error> for Error {
|
||||
fn from(e: peanuts::Error) -> Self {
|
||||
Self::XML(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<jid::ParseError> for Error {
|
||||
fn from(e: jid::ParseError) -> Self {
|
||||
Self::JID(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<sqlx::Error> for Error {
|
||||
fn from(e: sqlx::Error) -> Self {
|
||||
Self::SQL(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<jabber::Error> for Error {
|
||||
|
@ -77,46 +570,6 @@ impl From<jabber::Error> for Error {
|
|||
}
|
||||
}
|
||||
|
||||
impl Stream for Client {
|
||||
type Item = UpdateMessage;
|
||||
|
||||
fn poll_next(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Option<Self::Item>> {
|
||||
pin!(self).receiver.poll_next_unpin(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl Sink<CommandMessage> for Client {
|
||||
type Error = Error;
|
||||
|
||||
fn poll_ready(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> Poll<Result<(), Self::Error>> {
|
||||
Poll::Ready(ready!(pin!(self).sender.poll_ready_unpin(cx)))
|
||||
}
|
||||
|
||||
fn start_send(self: std::pin::Pin<&mut Self>, item: CommandMessage) -> Result<(), Self::Error> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn poll_flush(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> Poll<Result<(), Self::Error>> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn poll_close(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> Poll<Result<(), Self::Error>> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<PollSendError<CommandMessage>> for Error {
|
||||
fn from(e: PollSendError<CommandMessage>) -> Self {
|
||||
Self::PollSend(e)
|
||||
|
@ -125,47 +578,13 @@ impl From<PollSendError<CommandMessage>> for Error {
|
|||
|
||||
pub enum CommandMessage {
|
||||
Connect,
|
||||
Disconnect,
|
||||
/// gets the roster. if offline, retreives cached version from database. should be stored in application memory.
|
||||
GetRoster,
|
||||
SendMessage(JID, String),
|
||||
}
|
||||
|
||||
pub enum UpdateMessage {
|
||||
Error(Error),
|
||||
Roster(Vec<roster::Item>),
|
||||
}
|
||||
|
||||
impl Client {
|
||||
pub async fn process_stanza(
|
||||
&mut self,
|
||||
stanza: Result<Stanza, jabber::Error>,
|
||||
sender: mpsc::Sender<UpdateMessage>,
|
||||
) {
|
||||
match stanza {
|
||||
Ok(stanza) => todo!(),
|
||||
Err(e) => self.process_error(e),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn iq(
|
||||
&mut self,
|
||||
to: Option<JID>,
|
||||
r#type: IqType,
|
||||
query: Option<Query>,
|
||||
) -> Result<IqResponse, Error> {
|
||||
self.client
|
||||
.send(Stanza::Iq(Iq {
|
||||
from: Some(self.client.jid()),
|
||||
// TODO: generate id
|
||||
id: "test".to_string(),
|
||||
to,
|
||||
r#type,
|
||||
// TODO: lang
|
||||
lang: None,
|
||||
query,
|
||||
errors: Vec::new(),
|
||||
}))
|
||||
.await?;
|
||||
Ok(todo!())
|
||||
}
|
||||
|
||||
pub async fn iq_process(&mut self, iq: Iq) {}
|
||||
}
|
||||
|
|
|
@ -10,8 +10,8 @@ pub const XMLNS: &str = "jabber:iq:roster";
|
|||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Query {
|
||||
ver: Option<String>,
|
||||
items: Vec<Item>,
|
||||
pub ver: Option<String>,
|
||||
pub items: Vec<Item>,
|
||||
}
|
||||
|
||||
impl FromElement for Query {
|
||||
|
@ -36,11 +36,16 @@ impl IntoElement for Query {
|
|||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Item {
|
||||
/// signals subscription pre-approval (server only)
|
||||
approved: Option<bool>,
|
||||
/// signals subscription sub-states (server only)
|
||||
ask: bool,
|
||||
/// uniquely identifies item
|
||||
jid: JID,
|
||||
/// handle that is determined by user, not contact
|
||||
name: Option<String>,
|
||||
subscription: Subscription,
|
||||
/// state of the presence subscription
|
||||
subscription: Option<Subscription>,
|
||||
groups: Vec<Group>,
|
||||
}
|
||||
|
||||
|
@ -63,7 +68,7 @@ impl FromElement for Item {
|
|||
};
|
||||
let jid = element.attribute("jid")?;
|
||||
let name = element.attribute_opt("name")?;
|
||||
let subscription = element.attribute_opt("subscription")?.unwrap_or_default();
|
||||
let subscription = element.attribute_opt("subscription")?;
|
||||
let groups = element.pop_children()?;
|
||||
|
||||
Ok(Self {
|
||||
|
@ -91,7 +96,7 @@ impl IntoElement for Item {
|
|||
)
|
||||
.push_attribute("jid", self.jid.clone())
|
||||
.push_attribute_opt("name", self.name.clone())
|
||||
.push_attribute("subscription", self.subscription)
|
||||
.push_attribute_opt("subscription", self.subscription)
|
||||
.push_children(self.groups.clone())
|
||||
}
|
||||
}
|
||||
|
@ -134,6 +139,7 @@ impl ToString for Subscription {
|
|||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
// TODO: check if should be option or not
|
||||
pub struct Group(Option<String>);
|
||||
|
||||
impl FromElement for Group {
|
||||
|
|
Loading…
Reference in New Issue