From 41c1ba15ef5865f4513db525ed595f3ce903dd26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?cel=20=F0=9F=8C=B8?= Date: Mon, 10 Feb 2025 17:48:39 +0000 Subject: [PATCH] WIP: code cleanup --- luz/src/connection/mod.rs | 166 ++++++++++ luz/src/connection/read.rs | 120 ++++++++ luz/src/connection/write.rs | 171 +++++++++++ luz/src/error.rs | 32 ++ luz/src/lib.rs | 583 ++++++------------------------------ 5 files changed, 587 insertions(+), 485 deletions(-) create mode 100644 luz/src/connection/mod.rs create mode 100644 luz/src/connection/read.rs create mode 100644 luz/src/connection/write.rs create mode 100644 luz/src/error.rs diff --git a/luz/src/connection/mod.rs b/luz/src/connection/mod.rs new file mode 100644 index 0000000..3ad2648 --- /dev/null +++ b/luz/src/connection/mod.rs @@ -0,0 +1,166 @@ +use std::ops::{Deref, DerefMut}; + +use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream}; +use read::ReadControlHandle; +use sqlx::SqlitePool; +use tokio::{ + sync::{mpsc, oneshot}, + task::{JoinHandle, JoinSet}, +}; +use write::{WriteControlHandle, WriteHandle, WriteMessage}; + +use crate::UpdateMessage; + +mod read; +pub(crate) mod write; + +pub struct Supervisor { + connection_commands: mpsc::Receiver, + writer_crash: oneshot::Receiver<(WriteMessage, mpsc::Receiver)>, + reader_crash: oneshot::Receiver<( + SqlitePool, + mpsc::Sender, + tokio::task::JoinSet<()>, + )>, + sender: mpsc::Sender, + writer_handle: WriteControlHandle, + reader_handle: ReadControlHandle, + on_shutdown: oneshot::Sender<()>, +} + +pub enum SupervisorCommand { + Disconnect, + // for if there was a stream error, require to reconnect + Reconnect, +} + +impl Supervisor { + fn new( + connection_commands: mpsc::Receiver, + writer_crash: oneshot::Receiver<(WriteMessage, mpsc::Receiver)>, + reader_crash: oneshot::Receiver<(SqlitePool, mpsc::Sender, JoinSet<()>)>, + sender: mpsc::Sender, + writer_handle: WriteControlHandle, + reader_handle: ReadControlHandle, + on_shutdown: oneshot::Sender<()>, + ) -> Self { + Self { + connection_commands, + writer_crash, + sender, + writer_handle, + reader_handle, + reader_crash, + on_shutdown, + } + } + + async fn handle_command_message(&mut self, msg: SupervisorCommand) {} + + async fn run(mut self) { + loop { + tokio::select! { + Some(msg) = self.connection_commands.recv() => { + self.handle_command_message(msg).await; + }, + error = &mut self.writer_crash => { + + }, + error = &mut self.reader_crash => { + + }, + else => break, + } + } + self.on_shutdown.send(()); + } +} + +pub struct SupervisorHandle { + sender: SupervisorSender, + handle: JoinHandle<()>, +} + +impl Deref for SupervisorHandle { + type Target = SupervisorSender; + + fn deref(&self) -> &Self::Target { + &self.sender + } +} + +impl DerefMut for SupervisorHandle { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.sender + } +} + +#[derive(Clone)] +pub struct SupervisorSender { + sender: mpsc::Sender, +} + +impl Deref for SupervisorSender { + type Target = mpsc::Sender; + + fn deref(&self) -> &Self::Target { + &self.sender + } +} + +impl DerefMut for SupervisorSender { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.sender + } +} + +impl SupervisorHandle { + pub fn new( + streams: BoundJabberStream, + update_sender: mpsc::Sender, + db: SqlitePool, + on_shutdown: oneshot::Sender<()>, + ) -> (WriteHandle, Self) { + let (command_sender, command_receiver) = mpsc::channel(20); + let (writer_error_sender, writer_error_receiver) = oneshot::channel(); + let (reader_crash_sender, reader_crash_receiver) = oneshot::channel(); + + let (reader, writer) = streams.split(); + let (write_handle, write_control_handle) = + WriteControlHandle::new(writer, writer_error_sender); + let jabber_reader_control_handle = ReadControlHandle::new( + reader, + reader_crash_sender, + db, + update_sender.clone(), + command_sender.clone(), + write_handle.clone(), + ); + + let actor = Supervisor::new( + command_receiver, + writer_error_receiver, + reader_crash_receiver, + update_sender, + write_control_handle, + jabber_reader_control_handle, + on_shutdown, + ); + + let handle = tokio::spawn(async move { actor.run().await }); + + ( + write_handle, + Self { + sender: SupervisorSender { + sender: command_sender, + }, + handle, + }, + ) + } + + pub fn sender(&self) -> SupervisorSender { + self.sender.clone() + } +} diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs new file mode 100644 index 0000000..7800d56 --- /dev/null +++ b/luz/src/connection/read.rs @@ -0,0 +1,120 @@ +use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader}; +use sqlx::SqlitePool; +use stanza::client::Stanza; +use tokio::{ + sync::{mpsc, oneshot}, + task::{JoinHandle, JoinSet}, +}; + +use crate::UpdateMessage; + +use super::{ + write::{WriteHandle, WriteMessage}, + SupervisorCommand, +}; + +pub struct Read { + // TODO: place iq hashmap here + control_receiver: mpsc::Receiver, + stream: BoundJabberReader, + on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender, JoinSet<()>)>, + db: SqlitePool, + update_sender: mpsc::Sender, + supervisor_control: mpsc::Sender, + write_handle: WriteHandle, + tasks: JoinSet<()>, +} + +impl Read { + fn new( + control_receiver: mpsc::Receiver, + stream: BoundJabberReader, + on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender, JoinSet<()>)>, + db: SqlitePool, + update_sender: mpsc::Sender, + // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs) + supervisor_control: mpsc::Sender, + write_sender: WriteHandle, + ) -> Self { + Self { + control_receiver, + stream, + on_crash, + db, + update_sender, + supervisor_control, + write_handle: write_sender, + tasks: JoinSet::new(), + } + } + + async fn run(mut self) { + loop { + tokio::select! { + Some(msg) = self.control_receiver.recv() => { + match msg { + ReadControl::Disconnect => todo!(), + ReadControl::Abort(sender) => todo!(), + }; + }, + stanza = self.stream.read::() => { + match stanza { + Ok(_) => todo!(), + Err(_) => todo!(), + } + self.tasks.spawn(); + }, + else => break + } + } + } +} + +trait Task { + async fn handle(); +} + +impl Task for Stanza { + async fn handle() { + todo!() + } +} + +enum ReadControl { + Disconnect, + Abort(oneshot::Sender>), +} + +pub struct ReadControlHandle { + sender: mpsc::Sender, + handle: JoinHandle<()>, +} + +impl ReadControlHandle { + pub fn new( + stream: BoundJabberReader, + on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender, JoinSet<()>)>, + db: SqlitePool, + sender: mpsc::Sender, + supervisor_control: mpsc::Sender, + jabber_write: WriteHandle, + ) -> Self { + let (control_sender, control_receiver) = mpsc::channel(20); + + let actor = Read::new( + control_receiver, + stream, + on_crash, + db, + sender, + supervisor_control, + jabber_write, + ); + let handle = tokio::spawn(async move { actor.run().await }); + + Self { + sender: control_sender, + handle, + } + } +} diff --git a/luz/src/connection/write.rs b/luz/src/connection/write.rs new file mode 100644 index 0000000..9c01519 --- /dev/null +++ b/luz/src/connection/write.rs @@ -0,0 +1,171 @@ +use std::ops::{Deref, DerefMut}; + +use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberWriter}; +use stanza::client::Stanza; +use tokio::{ + sync::{mpsc, oneshot}, + task::JoinHandle, +}; + +use crate::error::Error; + +// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream. +pub struct Write { + stanza_receiver: mpsc::Receiver, + control_receiver: mpsc::Receiver, + stream: BoundJabberWriter, + on_crash: oneshot::Sender<(WriteMessage, mpsc::Receiver)>, +} + +pub struct WriteMessage { + stanza: Stanza, + respond_to: oneshot::Sender>, +} + +enum WriteControl { + Disconnect, + Abort(oneshot::Sender>), +} + +impl Write { + fn new( + stanza_receiver: mpsc::Receiver, + control_receiver: mpsc::Receiver, + stream: BoundJabberWriter, + supervisor: oneshot::Sender<(WriteMessage, mpsc::Receiver)>, + ) -> Self { + Self { + stanza_receiver, + control_receiver, + stream, + on_crash: supervisor, + } + } + + async fn write(&mut self, stanza: &Stanza) -> Result<(), peanuts::Error> { + Ok(self.stream.write(stanza).await?) + } + + async fn run(mut self) { + loop { + tokio::select! { + Some(msg) = self.control_receiver.recv() => { + match msg { + WriteControl::Disconnect => { + // TODO: close the stanza_receiver channel and drain out all of the remaining stanzas to send + self.stanza_receiver.close(); + // TODO: put this in some kind of function to avoid code duplication + for msg in self.stanza_receiver.recv().await { + let result = self.stream.write(&msg.stanza).await; + match result { + Err(e) => match &e { + peanuts::Error::ReadError(error) => { + // make sure message is not lost from error, supervisor handles retry and reporting + self.on_crash.send((msg, self.stanza_receiver)); + break; + } + _ => { + msg.respond_to.send(Err(e.into())); + } + }, + _ => { + msg.respond_to.send(Ok(())); + } + } + } + self.stream.try_close().await; + break; + }, + WriteControl::Abort(sender) => { + sender.send(self.stanza_receiver); + break; + }, + } + }, + Some(msg) = self.stanza_receiver.recv() => { + let result = self.stream.write(&msg.stanza).await; + match result { + Err(e) => match &e { + peanuts::Error::ReadError(error) => { + // make sure message is not lost from error, supervisor handles retry and reporting + self.on_crash.send((msg, self.stanza_receiver)); + break; + } + _ => { + msg.respond_to.send(Err(e.into())); + } + }, + _ => { + msg.respond_to.send(Ok(())); + } + } + }, + // TODO: check if this is ok to do + else => break, + } + } + } +} + +#[derive(Clone)] +pub struct WriteHandle { + sender: mpsc::Sender, +} + +impl Deref for WriteHandle { + type Target = mpsc::Sender; + + fn deref(&self) -> &Self::Target { + &self.sender + } +} + +impl DerefMut for WriteHandle { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.sender + } +} + +pub struct WriteControlHandle { + sender: mpsc::Sender, + handle: JoinHandle<()>, +} + +impl WriteControlHandle { + pub fn new( + stream: BoundJabberWriter, + supervisor: oneshot::Sender<(WriteMessage, mpsc::Receiver)>, + ) -> (WriteHandle, Self) { + let (control_sender, control_receiver) = mpsc::channel(20); + let (stanza_sender, stanza_receiver) = mpsc::channel(20); + + let actor = Write::new(stanza_receiver, control_receiver, stream, supervisor); + let handle = tokio::spawn(async move { actor.run().await }); + + ( + WriteHandle { + sender: stanza_sender, + }, + Self { + sender: control_sender, + handle, + }, + ) + } + + pub fn reconnect( + stream: BoundJabberWriter, + supervisor: oneshot::Sender<(WriteMessage, mpsc::Receiver)>, + stanza_receiver: mpsc::Receiver, + ) -> Self { + let (control_sender, control_receiver) = mpsc::channel(20); + + let actor = Write::new(stanza_receiver, control_receiver, stream, supervisor); + let handle = tokio::spawn(async move { actor.run().await }); + + Self { + sender: control_sender, + handle, + } + } +} diff --git a/luz/src/error.rs b/luz/src/error.rs new file mode 100644 index 0000000..d9dfaba --- /dev/null +++ b/luz/src/error.rs @@ -0,0 +1,32 @@ +pub enum Error { + AlreadyConnected, + Jabber(jabber::Error), + XML(peanuts::Error), + SQL(sqlx::Error), + JID(jid::ParseError), + AlreadyDisconnected, +} + +impl From for Error { + fn from(e: peanuts::Error) -> Self { + Self::XML(e) + } +} + +impl From for Error { + fn from(e: jid::ParseError) -> Self { + Self::JID(e) + } +} + +impl From for Error { + fn from(e: sqlx::Error) -> Self { + Self::SQL(e) + } +} + +impl From for Error { + fn from(e: jabber::Error) -> Self { + Self::Jabber(e) + } +} diff --git a/luz/src/lib.rs b/luz/src/lib.rs index 6372fe0..0dfc30c 100644 --- a/luz/src/lib.rs +++ b/luz/src/lib.rs @@ -1,301 +1,33 @@ -use std::{ - collections::{HashMap, HashSet, VecDeque}, - fmt::Pointer, - pin::pin, - sync::{atomic::AtomicBool, Arc}, - task::{ready, Poll}, -}; +use std::sync::Arc; -use futures::{ - stream::{SplitSink, SplitStream}, - Sink, SinkExt, Stream, StreamExt, -}; -use jabber::{ - connection::Tls, - jabber_stream::bound_stream::{BoundJabberReader, BoundJabberStream, BoundJabberWriter}, - JID, -}; -use sqlx::{query, Pool, Sqlite, SqlitePool}; -use stanza::{ - client::{ - iq::{Iq, IqType, Query}, - Stanza, - }, - roster, -}; +use connection::SupervisorSender; +use jabber::JID; +use sqlx::SqlitePool; +use stanza::roster; use tokio::{ - io::AsyncRead, - select, - sync::{ - mpsc::{self, Receiver, Sender}, - oneshot, Mutex, - }, - task::{JoinHandle, JoinSet}, + sync::{mpsc, oneshot, Mutex}, + task::JoinSet, }; -use tokio_stream::wrappers::ReceiverStream; -use tokio_util::sync::{PollSendError, PollSender}; -// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream. -pub struct JabberWriter { - stanza_receiver: mpsc::Receiver, - control_receiver: mpsc::Receiver, - stream: BoundJabberWriter, - on_crash: oneshot::Sender<(JabberWrite, mpsc::Receiver)>, -} +use crate::connection::write::WriteHandle; +use crate::connection::{SupervisorCommand, SupervisorHandle}; +use crate::error::Error; -struct JabberWrite { - stanza: Stanza, - respond_to: oneshot::Sender>, -} - -enum JabberWriterControl { - Disconnect, - Abort(oneshot::Sender>), -} - -impl JabberWriter { - fn new( - stanza_receiver: mpsc::Receiver, - control_receiver: mpsc::Receiver, - stream: BoundJabberWriter, - supervisor: oneshot::Sender<(JabberWrite, mpsc::Receiver)>, - ) -> Self { - Self { - stanza_receiver, - control_receiver, - stream, - on_crash: supervisor, - } - } - - async fn write(&mut self, stanza: &Stanza) -> Result<(), peanuts::Error> { - Ok(self.stream.write(stanza).await?) - } - - async fn run(mut self) { - loop { - tokio::select! { - Some(msg) = self.control_receiver.recv() => { - match msg { - JabberWriterControl::Disconnect => { - // TODO: close the stanza_receiver channel and drain out all of the remaining stanzas to send - self.stanza_receiver.close(); - // TODO: put this in some kind of function to avoid code duplication - for msg in self.stanza_receiver.recv().await { - let result = self.stream.write(&msg.stanza).await; - match result { - Err(e) => match &e { - peanuts::Error::ReadError(error) => { - // make sure message is not lost from error, supervisor handles retry and reporting - self.on_crash.send((msg, self.stanza_receiver)); - break; - } - _ => { - msg.respond_to.send(Err(e.into())); - } - }, - _ => { - msg.respond_to.send(Ok(())); - } - } - } - self.stream.try_close().await; - break; - }, - JabberWriterControl::Abort(sender) => { - sender.send(self.stanza_receiver); - break; - }, - } - }, - Some(msg) = self.stanza_receiver.recv() => { - let result = self.stream.write(&msg.stanza).await; - match result { - Err(e) => match &e { - peanuts::Error::ReadError(error) => { - // make sure message is not lost from error, supervisor handles retry and reporting - self.on_crash.send((msg, self.stanza_receiver)); - break; - } - _ => { - msg.respond_to.send(Err(e.into())); - } - }, - _ => { - msg.respond_to.send(Ok(())); - } - } - }, - // TODO: check if this is ok to do - else => break, - } - } - } -} - -#[derive(Clone)] -pub struct JabberWriteHandle { - sender: mpsc::Sender, -} - -pub struct JabberWriterControlHandle { - sender: mpsc::Sender, - handle: JoinHandle<()>, -} - -impl JabberWriterControlHandle { - pub fn new( - stream: BoundJabberWriter, - supervisor: oneshot::Sender<(JabberWrite, mpsc::Receiver)>, - ) -> (JabberWriteHandle, JabberWriterControlHandle) { - let (control_sender, control_receiver) = mpsc::channel(20); - let (stanza_sender, stanza_receiver) = mpsc::channel(20); - - let actor = JabberWriter::new(stanza_receiver, control_receiver, stream, supervisor); - let handle = tokio::spawn(async move { actor.run().await }); - - ( - JabberWriteHandle { - sender: stanza_sender, - }, - Self { - sender: control_sender, - handle, - }, - ) - } - - pub fn reconnect( - stream: BoundJabberWriter, - supervisor: oneshot::Sender<(JabberWrite, mpsc::Receiver)>, - stanza_receiver: mpsc::Receiver, - ) -> Self { - let (control_sender, control_receiver) = mpsc::channel(20); - - let actor = JabberWriter::new(stanza_receiver, control_receiver, stream, supervisor); - let handle = tokio::spawn(async move { actor.run().await }); - - Self { - sender: control_sender, - handle, - } - } -} - -pub struct JabberReader { - // TODO: place iq hashmap here - control_receiver: mpsc::Receiver, - stream: BoundJabberReader, - on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender, JoinSet<()>)>, - db: SqlitePool, - sender: mpsc::Sender, - supervisor_control: mpsc::Sender, - write_sender: mpsc::Sender, - tasks: JoinSet<()>, -} - -impl JabberReader { - fn new( - control_receiver: mpsc::Receiver, - stream: BoundJabberReader, - on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender, JoinSet<()>)>, - db: SqlitePool, - sender: mpsc::Sender, - // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs) - supervisor_control: mpsc::Sender, - write_sender: mpsc::Sender, - ) -> Self { - Self { - control_receiver, - stream, - on_crash, - db, - sender, - supervisor_control, - write_sender, - tasks: JoinSet::new(), - } - } - - async fn run(mut self) { - loop { - tokio::select! { - Some(msg) = self.control_receiver.recv() => { - match msg { - JabberReaderControl::Disconnect => todo!(), - JabberReaderControl::Abort(sender) => todo!(), - }; - }, - stanza = self.stream.read::() => { - match stanza { - Ok(_) => todo!(), - Err(_) => todo!(), - } - self.tasks.spawn(); - }, - else => break - } - } - } -} - -trait Task { - async fn handle(); -} - -impl Task for Stanza { - async fn handle() { - todo!() - } -} - -enum JabberReaderControl { - Disconnect, - Abort(oneshot::Sender>), -} - -struct JabberReaderControlHandle { - sender: mpsc::Sender, - handle: JoinHandle<()>, -} - -impl JabberReaderControlHandle { - pub fn new( - stream: BoundJabberReader, - on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender, JoinSet<()>)>, - db: SqlitePool, - sender: mpsc::Sender, - supervisor_control: mpsc::Sender, - jabber_write: mpsc::Sender, - ) -> Self { - let (control_sender, control_receiver) = mpsc::channel(20); - - let actor = JabberReader::new( - control_receiver, - stream, - on_crash, - db, - sender, - supervisor_control, - jabber_write, - ); - let handle = tokio::spawn(async move { actor.run().await }); - - Self { - sender: control_sender, - handle, - } - } -} +mod connection; +mod error; pub struct Luz { receiver: mpsc::Receiver, jid: Arc>, // TODO: use a dyn passwordprovider trait to avoid storing password in memory password: String, - connected: Arc>>, + connected: Arc>>, db: SqlitePool, sender: mpsc::Sender, + /// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected + connection_supervisor_shutdown: oneshot::Receiver<()>, + // TODO: will need to have an auto reconnect state as well (e.g. in case server shut down, to try and reconnect later) + // TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway? tasks: JoinSet<()>, } @@ -304,7 +36,8 @@ impl Luz { receiver: mpsc::Receiver, jid: Arc>, password: String, - connected: Arc>>, + connected: Arc>>, + connection_supervisor_shutdown: oneshot::Receiver<()>, db: SqlitePool, sender: mpsc::Sender, ) -> Self { @@ -316,71 +49,87 @@ impl Luz { receiver, sender, tasks: JoinSet::new(), + connection_supervisor_shutdown, } } async fn run(mut self) { - while let Some(msg) = self.receiver.recv().await { - // TODO: consider separating disconnect/connect and commands apart from commandmessage - match msg { - CommandMessage::Connect => { - match self.connected.lock().await.as_ref() { - Some(_) => { - self.sender - .send(UpdateMessage::Error(Error::AlreadyConnected)) - .await; - } - None => { - let mut jid = self.jid.lock().await; - let mut domain = jid.domainpart.clone(); - let streams_result = - jabber::connect_and_login(&mut jid, &self.password, &mut domain) - .await; - match streams_result { - Ok(s) => { - let (writer, supervisor) = - JabberSupervisorHandle::new(s, self.sender.clone()); - *self.connected.lock().await = Some((writer, supervisor)); - } - Err(e) => { - self.sender.send(UpdateMessage::Error(e.into())); - } - } - } - }; + loop { + tokio::select! { + _ = &mut self.connection_supervisor_shutdown => { + *self.connected.lock().await = None } - CommandMessage::Disconnect => match self.connected.lock().await.as_mut() { - None => { - self.sender - .send(UpdateMessage::Error(Error::AlreadyDisonnected)) - .await; - } - mut c => { - if let Some((_write_handle, supervisor_handle)) = c.take() { - let _ = supervisor_handle - .sender - .send(JabberSupervisorCommand::Disconnect) - .await; - } else { - unreachable!() - }; + Some(msg) = self.receiver.recv() => { + // TODO: consider separating disconnect/connect and commands apart from commandmessage + // TODO: dispatch commands separate tasks + match msg { + CommandMessage::Connect => { + match self.connected.lock().await.as_ref() { + Some(_) => { + self.sender + .send(UpdateMessage::Error(Error::AlreadyConnected)) + .await; + } + None => { + let mut jid = self.jid.lock().await; + let mut domain = jid.domainpart.clone(); + // TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource) + let streams_result = + jabber::connect_and_login(&mut jid, &self.password, &mut domain) + .await; + match streams_result { + Ok(s) => { + let (shutdown_send, shutdown_recv) = oneshot::channel::<()>(); + let (writer, supervisor) = SupervisorHandle::new( + s, + self.sender.clone(), + self.db.clone(), + shutdown_send, + ); + self.connection_supervisor_shutdown = shutdown_recv; + *self.connected.lock().await = Some((writer, supervisor)); + } + Err(e) => { + self.sender.send(UpdateMessage::Error(e.into())); + } + } + } + }; + } + CommandMessage::Disconnect => match self.connected.lock().await.as_mut() { + None => { + self.sender + .send(UpdateMessage::Error(Error::AlreadyDisconnected)) + .await; + } + mut c => { + if let Some((_write_handle, supervisor_handle)) = c.take() { + let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await; + } else { + unreachable!() + }; + } + }, + _ => { + match self.connected.lock().await.as_ref() { + Some((w, s)) => self.tasks.spawn(msg.handle_online( + w.clone(), + s.sender(), + self.jid.clone(), + self.db.clone(), + self.sender.clone(), + // TODO: iq hashmap + )), + None => self.tasks.spawn(msg.handle_offline( + self.jid.clone(), + self.db.clone(), + self.sender.clone(), + )), + }; + } } }, - _ => { - match self.connected.lock().await.as_ref() { - Some((w, _)) => self.tasks.spawn(msg.handle_online( - w.clone(), - self.jid.clone(), - self.db.clone(), - self.sender.clone(), - )), - None => self.tasks.spawn(msg.handle_offline( - self.jid.clone(), - self.db.clone(), - self.sender.clone(), - )), - }; - } + else => break, } } } @@ -398,7 +147,8 @@ impl CommandMessage { pub async fn handle_online( mut self, - jabber_write_handle: JabberWriteHandle, + write_handle: WriteHandle, + supervisor_control: SupervisorSender, // TODO: jid could lose resource by the end jid: Arc>, db: SqlitePool, @@ -418,12 +168,15 @@ impl LuzHandle { pub fn new(jid: JID, password: String, db: SqlitePool) -> Self { let (command_sender, command_receiver) = mpsc::channel(20); let (update_sender, update_receiver) = mpsc::channel(20); + // might be bad, first supervisor shutdown notification oneshot is never used (disgusting) + let (sup_send, sup_recv) = oneshot::channel(); let actor = Luz::new( command_receiver, Arc::new(Mutex::new(jid)), password, Arc::new(Mutex::new(None)), + sup_recv, db, update_sender, ); @@ -436,146 +189,6 @@ impl LuzHandle { } } -pub struct JabberSupervisor { - connection_commands: mpsc::Receiver, - writer_crash: oneshot::Receiver<(JabberWrite, mpsc::Receiver)>, - reader_crash: oneshot::Receiver<(SqlitePool, mpsc::Sender, JoinSet<()>)>, - sender: mpsc::Sender, - writer_handle: JabberWriterControlHandle, - reader_handle: JabberReaderControlHandle, -} - -pub enum JabberSupervisorCommand { - Disconnect, -} - -impl JabberSupervisor { - fn new( - connection_commands: mpsc::Receiver, - writer_crash: oneshot::Receiver<(JabberWrite, mpsc::Receiver)>, - reader_crash: oneshot::Receiver<(SqlitePool, mpsc::Sender, JoinSet<()>)>, - sender: mpsc::Sender, - writer_handle: JabberWriterControlHandle, - reader_handle: JabberReaderControlHandle, - ) -> Self { - Self { - connection_commands, - writer_crash, - sender, - writer_handle, - reader_handle, - reader_crash, - } - } - - async fn handle_command_message(&mut self, msg: JabberSupervisorCommand) {} - - async fn run(mut self) { - loop { - tokio::select! { - Some(msg) = self.connection_commands.recv() => { - self.handle_command_message(msg).await; - }, - error = self.writer_crash => { - - }, - error = self.reader_crash => { - - }, - } - } - } -} - -pub struct JabberSupervisorHandle { - sender: mpsc::Sender, - handle: JoinHandle<()>, -} - -impl JabberSupervisorHandle { - pub fn new( - streams: BoundJabberStream, - sender: mpsc::Sender, - db: SqlitePool, - update_sender: mpsc::Sender, - ) -> (JabberWriteHandle, Self) { - let (command_sender, command_receiver) = mpsc::channel(20); - let (writer_error_sender, writer_error_receiver) = oneshot::channel(); - let (reader_crash_sender, reader_crash_receiver) = oneshot::channel(); - - let (reader, writer) = streams.split(); - let (jabber_write_handle, jabber_writer_control_handle) = - JabberWriterControlHandle::new(writer, writer_error_sender); - let jabber_reader_control_handle = JabberReaderControlHandle::new( - reader, - reader_crash_sender, - db, - update_sender, - command_sender.clone(), - jabber_write_handle.sender.clone(), - ); - - let actor = JabberSupervisor::new( - command_receiver, - writer_error_receiver, - reader_crash_receiver, - sender, - jabber_writer_control_handle, - jabber_reader_control_handle, - ); - - let handle = tokio::spawn(async move { actor.run().await }); - - ( - jabber_write_handle, - Self { - sender: command_sender, - handle, - }, - ) - } -} - -pub enum Error { - AlreadyConnected, - PollSend(PollSendError), - Jabber(jabber::Error), - XML(peanuts::Error), - SQL(sqlx::Error), - JID(jid::ParseError), - AlreadyDisonnected, -} - -impl From for Error { - fn from(e: peanuts::Error) -> Self { - Self::XML(e) - } -} - -impl From for Error { - fn from(e: jid::ParseError) -> Self { - Self::JID(e) - } -} - -impl From for Error { - fn from(e: sqlx::Error) -> Self { - Self::SQL(e) - } -} - -impl From for Error { - fn from(e: jabber::Error) -> Self { - Self::Jabber(e) - } -} - -impl From> for Error { - fn from(e: PollSendError) -> Self { - Self::PollSend(e) - } -} - pub enum CommandMessage { Connect, Disconnect,