From 8e6aa698b35f62dcd3d5c627f39dde53d0b1154d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?cel=20=F0=9F=8C=B8?= Date: Wed, 12 Feb 2025 06:19:02 +0000 Subject: [PATCH] reconnection supervisor command --- TODO.md | 42 +++++++++++++------- luz/src/connection/mod.rs | 80 ++++++++++++++++++++++++++++++++++++-- luz/src/connection/read.rs | 1 - luz/src/lib.rs | 1 - 4 files changed, 105 insertions(+), 19 deletions(-) diff --git a/TODO.md b/TODO.md index 48e0556..47c86d9 100644 --- a/TODO.md +++ b/TODO.md @@ -2,22 +2,36 @@ ## next -ci/cd: doc generation -feature: error handling on stream according to rfc6120 -docs: jid -docs: jabber -docs: starttls -docs: sasl -docs: resource binding -feature: starttls -feature: sasl -feature: resource binding - -## in progress - -feature: logging +feat(luz): everything in rfc6120 and rfc6121 + feat(luz): handle_online + feat(luz): handle_offline + feat(luz): handle_stanza + feat(luz): database + feat(luz): error handling on stream according to rfc6120 + feat(luz): send message + feat(luz): receive message + feat(luz): retreive messages stored in database + feat(luz): get roster (online and offline) + feat(luz): set roster + feat(luz): reconnect supervisorcommand +feat: thiserror everywhere +feat(luz): proper stanza ids +test: proper tests +ci: doc generation +docs(jid): jid +feat(peanuts): derive macros for IntoElement and FromElement +docs(jabber): connection, starttls, sasl, binding, bound_stream, etc. +feat: proper logging for everything basically +feat(luz): passwordprovider trait, to avoid storing password in struct +feat(luz): auto-reconnect state stored in luz actor, for if e.g. server shut down +refactor(luz): dealing properly with all the joinsets and joinhandles +feat(peanuts): some kind of way to configure the reader and writer to log the raw xml written to the stream, probably by having a method that allows you to add a log writer to them. will need to investigate some kind of log namespacing. +feat(jabber): storing resource within the bound_stream connection ## done +feature: starttls +feature: sasl +feature: resource binding feature: jabber client connection feature: jid diff --git a/luz/src/connection/mod.rs b/luz/src/connection/mod.rs index f8cf18b..d05eb62 100644 --- a/luz/src/connection/mod.rs +++ b/luz/src/connection/mod.rs @@ -46,7 +46,21 @@ pub enum SupervisorCommand { Disconnect, // for if there was a stream error, require to reconnect // couldn't stream errors just cause a crash? lol - Reconnect, + Reconnect(State), +} + +pub enum State { + Write(mpsc::Receiver), + Read( + ( + SqlitePool, + mpsc::Sender, + tokio::task::JoinSet<()>, + mpsc::Sender, + WriteHandle, + Arc>>>>, + ), + ), } impl Supervisor { @@ -101,10 +115,70 @@ impl Supervisor { } break; }, - SupervisorCommand::Reconnect => { + SupervisorCommand::Reconnect(state) => { // TODO: please omfg // send abort to read stream, as already done, consider - todo!() + let (read_state, mut write_state); + match state { + // TODO: proper state things for read and write thread + State::Write(receiver) => { + write_state = receiver; + let (send, recv) = oneshot::channel(); + let _ = self.reader_handle.send(ReadControl::Abort(send)).await; + if let Ok(state) = recv.await { + read_state = state; + } else { + break + } + }, + State::Read(read) => { + read_state = read; + let (send, recv) = oneshot::channel(); + let _ = self.writer_handle.send(WriteControl::Abort(send)).await; + // TODO: need a tokio select, in case the state arrives from somewhere else + if let Ok(state) = recv.await { + write_state = state; + } else { + break + } + }, + } + + let mut jid = self.jid.lock().await; + let mut domain = jid.domainpart.clone(); + let connection = jabber::connect_and_login(&mut jid, &*self.password, &mut domain).await; + match connection { + Ok(c) => { + + let (read, write) = c.split(); + let (send, recv) = oneshot::channel(); + self.writer_crash = recv; + self.writer_handle = + WriteControlHandle::reconnect(write, send, write_state); + let (send, recv) = oneshot::channel(); + self.reader_crash = recv; + let (db, update_sender, tasks, supervisor_command, write_sender, pending_iqs) = read_state; + self.reader_handle = ReadControlHandle::reconnect( + read, + send, + db, + update_sender, + supervisor_command, + write_sender, + tasks, + pending_iqs, + ); + }, + Err(e) => { + // if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves. + write_state.close(); + while let Some(msg) = write_state.recv().await { + let _ = msg.respond_to.send(Err(Error::LostConnection)); + } + let _ = self.sender.send(UpdateMessage::Error(e.into())).await; + break; + }, + } }, } }, diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs index 3c61780..c2828ad 100644 --- a/luz/src/connection/read.rs +++ b/luz/src/connection/read.rs @@ -21,7 +21,6 @@ use super::{ }; pub struct Read { - // TODO: place iq hashmap here control_receiver: mpsc::Receiver, stream: BoundJabberReader, on_crash: oneshot::Sender<( diff --git a/luz/src/lib.rs b/luz/src/lib.rs index 4d36a81..4c5a841 100644 --- a/luz/src/lib.rs +++ b/luz/src/lib.rs @@ -138,7 +138,6 @@ impl Luz { self.jid.clone(), self.db.clone(), self.sender.clone(), - // TODO: iq hashmap self.pending_iqs.clone() )), None => self.tasks.spawn(msg.handle_offline(