reconnection supervisor command

This commit is contained in:
cel 🌸 2025-02-12 06:19:02 +00:00
parent ec41f1d4ff
commit 8e6aa698b3
4 changed files with 105 additions and 19 deletions

42
TODO.md
View File

@ -2,22 +2,36 @@
## next
ci/cd: doc generation
feature: error handling on stream according to rfc6120
docs: jid
docs: jabber
docs: starttls
docs: sasl
docs: resource binding
feature: starttls
feature: sasl
feature: resource binding
## in progress
feature: logging
feat(luz): everything in rfc6120 and rfc6121
feat(luz): handle_online
feat(luz): handle_offline
feat(luz): handle_stanza
feat(luz): database
feat(luz): error handling on stream according to rfc6120
feat(luz): send message
feat(luz): receive message
feat(luz): retreive messages stored in database
feat(luz): get roster (online and offline)
feat(luz): set roster
feat(luz): reconnect supervisorcommand
feat: thiserror everywhere
feat(luz): proper stanza ids
test: proper tests
ci: doc generation
docs(jid): jid
feat(peanuts): derive macros for IntoElement and FromElement
docs(jabber): connection, starttls, sasl, binding, bound_stream, etc.
feat: proper logging for everything basically
feat(luz): passwordprovider trait, to avoid storing password in struct
feat(luz): auto-reconnect state stored in luz actor, for if e.g. server shut down
refactor(luz): dealing properly with all the joinsets and joinhandles
feat(peanuts): some kind of way to configure the reader and writer to log the raw xml written to the stream, probably by having a method that allows you to add a log writer to them. will need to investigate some kind of log namespacing.
feat(jabber): storing resource within the bound_stream connection
## done
feature: starttls
feature: sasl
feature: resource binding
feature: jabber client connection
feature: jid

View File

@ -46,7 +46,21 @@ pub enum SupervisorCommand {
Disconnect,
// for if there was a stream error, require to reconnect
// couldn't stream errors just cause a crash? lol
Reconnect,
Reconnect(State),
}
pub enum State {
Write(mpsc::Receiver<WriteMessage>),
Read(
(
SqlitePool,
mpsc::Sender<UpdateMessage>,
tokio::task::JoinSet<()>,
mpsc::Sender<SupervisorCommand>,
WriteHandle,
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
),
),
}
impl Supervisor {
@ -101,10 +115,70 @@ impl Supervisor {
}
break;
},
SupervisorCommand::Reconnect => {
SupervisorCommand::Reconnect(state) => {
// TODO: please omfg
// send abort to read stream, as already done, consider
todo!()
let (read_state, mut write_state);
match state {
// TODO: proper state things for read and write thread
State::Write(receiver) => {
write_state = receiver;
let (send, recv) = oneshot::channel();
let _ = self.reader_handle.send(ReadControl::Abort(send)).await;
if let Ok(state) = recv.await {
read_state = state;
} else {
break
}
},
State::Read(read) => {
read_state = read;
let (send, recv) = oneshot::channel();
let _ = self.writer_handle.send(WriteControl::Abort(send)).await;
// TODO: need a tokio select, in case the state arrives from somewhere else
if let Ok(state) = recv.await {
write_state = state;
} else {
break
}
},
}
let mut jid = self.jid.lock().await;
let mut domain = jid.domainpart.clone();
let connection = jabber::connect_and_login(&mut jid, &*self.password, &mut domain).await;
match connection {
Ok(c) => {
let (read, write) = c.split();
let (send, recv) = oneshot::channel();
self.writer_crash = recv;
self.writer_handle =
WriteControlHandle::reconnect(write, send, write_state);
let (send, recv) = oneshot::channel();
self.reader_crash = recv;
let (db, update_sender, tasks, supervisor_command, write_sender, pending_iqs) = read_state;
self.reader_handle = ReadControlHandle::reconnect(
read,
send,
db,
update_sender,
supervisor_command,
write_sender,
tasks,
pending_iqs,
);
},
Err(e) => {
// if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves.
write_state.close();
while let Some(msg) = write_state.recv().await {
let _ = msg.respond_to.send(Err(Error::LostConnection));
}
let _ = self.sender.send(UpdateMessage::Error(e.into())).await;
break;
},
}
},
}
},

View File

@ -21,7 +21,6 @@ use super::{
};
pub struct Read {
// TODO: place iq hashmap here
control_receiver: mpsc::Receiver<ReadControl>,
stream: BoundJabberReader<Tls>,
on_crash: oneshot::Sender<(

View File

@ -138,7 +138,6 @@ impl Luz {
self.jid.clone(),
self.db.clone(),
self.sender.clone(),
// TODO: iq hashmap
self.pending_iqs.clone()
)),
None => self.tasks.spawn(msg.handle_offline(