supervision and reconnection
This commit is contained in:
parent
41c1ba15ef
commit
1ed6317272
|
@ -1,15 +1,22 @@
|
||||||
use std::ops::{Deref, DerefMut};
|
// TODO: consider if this needs to be handled by a supervisor or could be handled by luz directly
|
||||||
|
|
||||||
|
use std::{
|
||||||
|
ops::{Deref, DerefMut},
|
||||||
|
sync::Arc,
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
|
|
||||||
use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream};
|
use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream};
|
||||||
use read::ReadControlHandle;
|
use jid::JID;
|
||||||
|
use read::{ReadControl, ReadControlHandle};
|
||||||
use sqlx::SqlitePool;
|
use sqlx::SqlitePool;
|
||||||
use tokio::{
|
use tokio::{
|
||||||
sync::{mpsc, oneshot},
|
sync::{mpsc, oneshot, Mutex},
|
||||||
task::{JoinHandle, JoinSet},
|
task::{JoinHandle, JoinSet},
|
||||||
};
|
};
|
||||||
use write::{WriteControlHandle, WriteHandle, WriteMessage};
|
use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage};
|
||||||
|
|
||||||
use crate::UpdateMessage;
|
use crate::{error::Error, UpdateMessage};
|
||||||
|
|
||||||
mod read;
|
mod read;
|
||||||
pub(crate) mod write;
|
pub(crate) mod write;
|
||||||
|
@ -21,16 +28,21 @@ pub struct Supervisor {
|
||||||
SqlitePool,
|
SqlitePool,
|
||||||
mpsc::Sender<UpdateMessage>,
|
mpsc::Sender<UpdateMessage>,
|
||||||
tokio::task::JoinSet<()>,
|
tokio::task::JoinSet<()>,
|
||||||
|
mpsc::Sender<SupervisorCommand>,
|
||||||
|
WriteHandle,
|
||||||
)>,
|
)>,
|
||||||
sender: mpsc::Sender<UpdateMessage>,
|
sender: mpsc::Sender<UpdateMessage>,
|
||||||
writer_handle: WriteControlHandle,
|
writer_handle: WriteControlHandle,
|
||||||
reader_handle: ReadControlHandle,
|
reader_handle: ReadControlHandle,
|
||||||
on_shutdown: oneshot::Sender<()>,
|
on_shutdown: oneshot::Sender<()>,
|
||||||
|
jid: Arc<Mutex<JID>>,
|
||||||
|
password: Arc<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub enum SupervisorCommand {
|
pub enum SupervisorCommand {
|
||||||
Disconnect,
|
Disconnect,
|
||||||
// for if there was a stream error, require to reconnect
|
// for if there was a stream error, require to reconnect
|
||||||
|
// couldn't stream errors just cause a crash? lol
|
||||||
Reconnect,
|
Reconnect,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -38,11 +50,19 @@ impl Supervisor {
|
||||||
fn new(
|
fn new(
|
||||||
connection_commands: mpsc::Receiver<SupervisorCommand>,
|
connection_commands: mpsc::Receiver<SupervisorCommand>,
|
||||||
writer_crash: oneshot::Receiver<(WriteMessage, mpsc::Receiver<WriteMessage>)>,
|
writer_crash: oneshot::Receiver<(WriteMessage, mpsc::Receiver<WriteMessage>)>,
|
||||||
reader_crash: oneshot::Receiver<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
|
reader_crash: oneshot::Receiver<(
|
||||||
|
SqlitePool,
|
||||||
|
mpsc::Sender<UpdateMessage>,
|
||||||
|
JoinSet<()>,
|
||||||
|
mpsc::Sender<SupervisorCommand>,
|
||||||
|
WriteHandle,
|
||||||
|
)>,
|
||||||
sender: mpsc::Sender<UpdateMessage>,
|
sender: mpsc::Sender<UpdateMessage>,
|
||||||
writer_handle: WriteControlHandle,
|
writer_handle: WriteControlHandle,
|
||||||
reader_handle: ReadControlHandle,
|
reader_handle: ReadControlHandle,
|
||||||
on_shutdown: oneshot::Sender<()>,
|
on_shutdown: oneshot::Sender<()>,
|
||||||
|
jid: Arc<Mutex<JID>>,
|
||||||
|
password: Arc<String>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
connection_commands,
|
connection_commands,
|
||||||
|
@ -52,27 +72,137 @@ impl Supervisor {
|
||||||
reader_handle,
|
reader_handle,
|
||||||
reader_crash,
|
reader_crash,
|
||||||
on_shutdown,
|
on_shutdown,
|
||||||
|
jid,
|
||||||
|
password,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_command_message(&mut self, msg: SupervisorCommand) {}
|
|
||||||
|
|
||||||
async fn run(mut self) {
|
async fn run(mut self) {
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
Some(msg) = self.connection_commands.recv() => {
|
Some(msg) = self.connection_commands.recv() => {
|
||||||
self.handle_command_message(msg).await;
|
match msg {
|
||||||
|
SupervisorCommand::Disconnect => {
|
||||||
|
let _ = self.writer_handle.send(WriteControl::Disconnect).await;
|
||||||
|
let _ = self.reader_handle.send(ReadControl::Disconnect).await;
|
||||||
|
tokio::select! {
|
||||||
|
_ = async { tokio::join!(
|
||||||
|
async { let _ = (&mut self.writer_handle.handle).await; },
|
||||||
|
async { let _ = (&mut self.reader_handle.handle).await; }
|
||||||
|
) } => {},
|
||||||
|
_ = async { tokio::time::sleep(Duration::from_secs(5)) } => {
|
||||||
|
(&mut self.reader_handle.handle).abort();
|
||||||
|
(&mut self.writer_handle.handle).abort();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
},
|
||||||
|
SupervisorCommand::Reconnect => {
|
||||||
|
// TODO: please omfg
|
||||||
|
// send abort to read stream, as already done, consider
|
||||||
|
todo!()
|
||||||
|
},
|
||||||
|
}
|
||||||
},
|
},
|
||||||
error = &mut self.writer_crash => {
|
Ok((write_msg, mut write_recv)) = &mut self.writer_crash => {
|
||||||
|
// consider awaiting/aborting the read and write threads
|
||||||
|
let (send, recv) = oneshot::channel();
|
||||||
|
let _ = self.reader_handle.send(ReadControl::Abort(send)).await;
|
||||||
|
let (db, update_sender, tasks, supervisor_command, write_sender) = tokio::select! {
|
||||||
|
Ok(s) = recv => s,
|
||||||
|
Ok(s) = &mut self.reader_crash => s,
|
||||||
|
// in case, just break as irrecoverable
|
||||||
|
else => break,
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut jid = self.jid.lock().await;
|
||||||
|
let mut domain = jid.domainpart.clone();
|
||||||
|
let connection = jabber::connect_and_login(&mut jid, &*self.password, &mut domain).await;
|
||||||
|
match connection {
|
||||||
|
Ok(c) => {
|
||||||
|
let (read, write) = c.split();
|
||||||
|
let (send, recv) = oneshot::channel();
|
||||||
|
self.writer_crash = recv;
|
||||||
|
self.writer_handle =
|
||||||
|
WriteControlHandle::reconnect_retry(write, send, write_msg, write_recv);
|
||||||
|
let (send, recv) = oneshot::channel();
|
||||||
|
self.reader_crash = recv;
|
||||||
|
self.reader_handle = ReadControlHandle::reconnect(
|
||||||
|
read,
|
||||||
|
send,
|
||||||
|
db,
|
||||||
|
update_sender,
|
||||||
|
supervisor_command,
|
||||||
|
write_sender,
|
||||||
|
tasks
|
||||||
|
);
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
// if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves.
|
||||||
|
write_recv.close();
|
||||||
|
let _ = write_msg.respond_to.send(Err(Error::LostConnection));
|
||||||
|
while let Some(msg) = write_recv.recv().await {
|
||||||
|
let _ = msg.respond_to.send(Err(Error::LostConnection));
|
||||||
|
}
|
||||||
|
let _ = self.sender.send(UpdateMessage::Error(e.into())).await;
|
||||||
|
break;
|
||||||
|
},
|
||||||
|
}
|
||||||
},
|
},
|
||||||
error = &mut self.reader_crash => {
|
Ok((db, update_sender, tasks, supervisor_control, write_handle)) = &mut self.reader_crash => {
|
||||||
|
let (send, recv) = oneshot::channel();
|
||||||
|
let _ = self.writer_handle.send(WriteControl::Abort(send)).await;
|
||||||
|
let (retry_msg, mut write_receiver) = tokio::select! {
|
||||||
|
Ok(s) = recv => (None, s),
|
||||||
|
Ok(s) = &mut self.writer_crash => (Some(s.0), s.1),
|
||||||
|
// in case, just break as irrecoverable
|
||||||
|
else => break,
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut jid = self.jid.lock().await;
|
||||||
|
let mut domain = jid.domainpart.clone();
|
||||||
|
let connection = jabber::connect_and_login(&mut jid, &*self.password, &mut domain).await;
|
||||||
|
match connection {
|
||||||
|
Ok(c) => {
|
||||||
|
let (read, write) = c.split();
|
||||||
|
let (send, recv) = oneshot::channel();
|
||||||
|
self.writer_crash = recv;
|
||||||
|
if let Some(msg) = retry_msg {
|
||||||
|
self.writer_handle =
|
||||||
|
WriteControlHandle::reconnect_retry(write, send, msg, write_receiver);
|
||||||
|
} else {
|
||||||
|
self.writer_handle = WriteControlHandle::reconnect(write, send, write_receiver)
|
||||||
|
}
|
||||||
|
let (send, recv) = oneshot::channel();
|
||||||
|
self.reader_crash = recv;
|
||||||
|
self.reader_handle = ReadControlHandle::reconnect(
|
||||||
|
read,
|
||||||
|
send,
|
||||||
|
db,
|
||||||
|
update_sender,
|
||||||
|
supervisor_control,
|
||||||
|
write_handle,
|
||||||
|
tasks
|
||||||
|
);
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
// if reconnection failure, respond to all current messages with lost connection error.
|
||||||
|
write_receiver.close();
|
||||||
|
if let Some(msg) = retry_msg {
|
||||||
|
msg.respond_to.send(Err(Error::LostConnection));
|
||||||
|
}
|
||||||
|
while let Some(msg) = write_receiver.recv().await {
|
||||||
|
msg.respond_to.send(Err(Error::LostConnection));
|
||||||
|
}
|
||||||
|
let _ = self.sender.send(UpdateMessage::Error(e.into())).await;
|
||||||
|
break;
|
||||||
|
},
|
||||||
|
}
|
||||||
},
|
},
|
||||||
else => break,
|
else => break,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.on_shutdown.send(());
|
let _ = self.on_shutdown.send(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -120,6 +250,8 @@ impl SupervisorHandle {
|
||||||
update_sender: mpsc::Sender<UpdateMessage>,
|
update_sender: mpsc::Sender<UpdateMessage>,
|
||||||
db: SqlitePool,
|
db: SqlitePool,
|
||||||
on_shutdown: oneshot::Sender<()>,
|
on_shutdown: oneshot::Sender<()>,
|
||||||
|
jid: Arc<Mutex<JID>>,
|
||||||
|
password: Arc<String>,
|
||||||
) -> (WriteHandle, Self) {
|
) -> (WriteHandle, Self) {
|
||||||
let (command_sender, command_receiver) = mpsc::channel(20);
|
let (command_sender, command_receiver) = mpsc::channel(20);
|
||||||
let (writer_error_sender, writer_error_receiver) = oneshot::channel();
|
let (writer_error_sender, writer_error_receiver) = oneshot::channel();
|
||||||
|
@ -145,6 +277,8 @@ impl SupervisorHandle {
|
||||||
write_control_handle,
|
write_control_handle,
|
||||||
jabber_reader_control_handle,
|
jabber_reader_control_handle,
|
||||||
on_shutdown,
|
on_shutdown,
|
||||||
|
jid,
|
||||||
|
password,
|
||||||
);
|
);
|
||||||
|
|
||||||
let handle = tokio::spawn(async move { actor.run().await });
|
let handle = tokio::spawn(async move { actor.run().await });
|
||||||
|
|
|
@ -1,3 +1,8 @@
|
||||||
|
use std::{
|
||||||
|
ops::{Deref, DerefMut},
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
|
|
||||||
use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader};
|
use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader};
|
||||||
use sqlx::SqlitePool;
|
use sqlx::SqlitePool;
|
||||||
use stanza::client::Stanza;
|
use stanza::client::Stanza;
|
||||||
|
@ -6,7 +11,7 @@ use tokio::{
|
||||||
task::{JoinHandle, JoinSet},
|
task::{JoinHandle, JoinSet},
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::UpdateMessage;
|
use crate::{error::Error, UpdateMessage};
|
||||||
|
|
||||||
use super::{
|
use super::{
|
||||||
write::{WriteHandle, WriteMessage},
|
write::{WriteHandle, WriteMessage},
|
||||||
|
@ -17,25 +22,41 @@ pub struct Read {
|
||||||
// TODO: place iq hashmap here
|
// TODO: place iq hashmap here
|
||||||
control_receiver: mpsc::Receiver<ReadControl>,
|
control_receiver: mpsc::Receiver<ReadControl>,
|
||||||
stream: BoundJabberReader<Tls>,
|
stream: BoundJabberReader<Tls>,
|
||||||
on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
|
on_crash: oneshot::Sender<(
|
||||||
|
SqlitePool,
|
||||||
|
mpsc::Sender<UpdateMessage>,
|
||||||
|
JoinSet<()>,
|
||||||
|
mpsc::Sender<SupervisorCommand>,
|
||||||
|
WriteHandle,
|
||||||
|
)>,
|
||||||
db: SqlitePool,
|
db: SqlitePool,
|
||||||
update_sender: mpsc::Sender<UpdateMessage>,
|
update_sender: mpsc::Sender<UpdateMessage>,
|
||||||
supervisor_control: mpsc::Sender<SupervisorCommand>,
|
supervisor_control: mpsc::Sender<SupervisorCommand>,
|
||||||
write_handle: WriteHandle,
|
write_handle: WriteHandle,
|
||||||
tasks: JoinSet<()>,
|
tasks: JoinSet<()>,
|
||||||
|
disconnecting: bool,
|
||||||
|
disconnect_timedout: oneshot::Receiver<()>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Read {
|
impl Read {
|
||||||
fn new(
|
fn new(
|
||||||
control_receiver: mpsc::Receiver<ReadControl>,
|
control_receiver: mpsc::Receiver<ReadControl>,
|
||||||
stream: BoundJabberReader<Tls>,
|
stream: BoundJabberReader<Tls>,
|
||||||
on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
|
on_crash: oneshot::Sender<(
|
||||||
|
SqlitePool,
|
||||||
|
mpsc::Sender<UpdateMessage>,
|
||||||
|
JoinSet<()>,
|
||||||
|
mpsc::Sender<SupervisorCommand>,
|
||||||
|
WriteHandle,
|
||||||
|
)>,
|
||||||
db: SqlitePool,
|
db: SqlitePool,
|
||||||
update_sender: mpsc::Sender<UpdateMessage>,
|
update_sender: mpsc::Sender<UpdateMessage>,
|
||||||
// jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs)
|
// jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs)
|
||||||
supervisor_control: mpsc::Sender<SupervisorCommand>,
|
supervisor_control: mpsc::Sender<SupervisorCommand>,
|
||||||
write_sender: WriteHandle,
|
write_handle: WriteHandle,
|
||||||
|
tasks: JoinSet<()>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
|
let (send, recv) = oneshot::channel();
|
||||||
Self {
|
Self {
|
||||||
control_receiver,
|
control_receiver,
|
||||||
stream,
|
stream,
|
||||||
|
@ -43,26 +64,73 @@ impl Read {
|
||||||
db,
|
db,
|
||||||
update_sender,
|
update_sender,
|
||||||
supervisor_control,
|
supervisor_control,
|
||||||
write_handle: write_sender,
|
write_handle,
|
||||||
tasks: JoinSet::new(),
|
tasks,
|
||||||
|
disconnecting: false,
|
||||||
|
disconnect_timedout: recv,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run(mut self) {
|
async fn run(mut self) {
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
|
// if still haven't received the end tag in time, just kill itself
|
||||||
|
_ = &mut self.disconnect_timedout => {
|
||||||
|
break;
|
||||||
|
}
|
||||||
Some(msg) = self.control_receiver.recv() => {
|
Some(msg) = self.control_receiver.recv() => {
|
||||||
match msg {
|
match msg {
|
||||||
ReadControl::Disconnect => todo!(),
|
// when disconnect received,
|
||||||
ReadControl::Abort(sender) => todo!(),
|
ReadControl::Disconnect => {
|
||||||
|
let (send, recv) = oneshot::channel();
|
||||||
|
self.disconnect_timedout = recv;
|
||||||
|
self.disconnecting = true;
|
||||||
|
tokio::spawn(async {
|
||||||
|
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||||
|
let _ = send.send(());
|
||||||
|
})
|
||||||
|
},
|
||||||
|
ReadControl::Abort(sender) => {
|
||||||
|
let _ = sender.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle));
|
||||||
|
break;
|
||||||
|
},
|
||||||
};
|
};
|
||||||
},
|
},
|
||||||
stanza = self.stream.read::<Stanza>() => {
|
stanza = self.stream.read::<Stanza>() => {
|
||||||
match stanza {
|
match stanza {
|
||||||
Ok(_) => todo!(),
|
Ok(s) => {
|
||||||
Err(_) => todo!(),
|
self.tasks.spawn(handle_stanza(s, self.update_sender.clone(), self.db.clone(), self.supervisor_control.clone(), self.write_handle.clone()));
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
// TODO: NEXT write the correct error stanza depending on error, decide whether to reconnect or properly disconnect, depending on if disconnecting is true
|
||||||
|
// match e {
|
||||||
|
// peanuts::Error::ReadError(error) => todo!(),
|
||||||
|
// peanuts::Error::Utf8Error(utf8_error) => todo!(),
|
||||||
|
// peanuts::Error::ParseError(_) => todo!(),
|
||||||
|
// peanuts::Error::EntityProcessError(_) => todo!(),
|
||||||
|
// peanuts::Error::InvalidCharRef(_) => todo!(),
|
||||||
|
// peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => todo!(),
|
||||||
|
// peanuts::Error::DuplicateAttribute(_) => todo!(),
|
||||||
|
// peanuts::Error::UnqualifiedNamespace(_) => todo!(),
|
||||||
|
// peanuts::Error::MismatchedEndTag(name, name1) => todo!(),
|
||||||
|
// peanuts::Error::NotInElement(_) => todo!(),
|
||||||
|
// peanuts::Error::ExtraData(_) => todo!(),
|
||||||
|
// peanuts::Error::UndeclaredNamespace(_) => todo!(),
|
||||||
|
// peanuts::Error::IncorrectName(name) => todo!(),
|
||||||
|
// peanuts::Error::DeserializeError(_) => todo!(),
|
||||||
|
// peanuts::Error::Deserialize(deserialize_error) => todo!(),
|
||||||
|
// peanuts::Error::RootElementEnded => todo!(),
|
||||||
|
// }
|
||||||
|
// TODO: make sure this only happens when an end tag is received
|
||||||
|
if self.disconnecting == true {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
// AAAAAAAAAAAAAAAAAAAAA i should really just have this stored in the supervisor and not gaf bout passing these references around
|
||||||
|
let _ = self.on_crash.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
},
|
||||||
}
|
}
|
||||||
self.tasks.spawn();
|
|
||||||
},
|
},
|
||||||
else => break
|
else => break
|
||||||
}
|
}
|
||||||
|
@ -70,30 +138,63 @@ impl Read {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
trait Task {
|
// what do stanza processes do?
|
||||||
async fn handle();
|
// - update ui
|
||||||
|
// - access database
|
||||||
|
// - disconnect proper, reconnect
|
||||||
|
// - respond to server requests
|
||||||
|
async fn handle_stanza(
|
||||||
|
stanza: Stanza,
|
||||||
|
update_sender: mpsc::Sender<UpdateMessage>,
|
||||||
|
db: SqlitePool,
|
||||||
|
supervisor_control: mpsc::Sender<SupervisorCommand>,
|
||||||
|
write_handle: WriteHandle,
|
||||||
|
) {
|
||||||
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Task for Stanza {
|
pub enum ReadControl {
|
||||||
async fn handle() {
|
|
||||||
todo!()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
enum ReadControl {
|
|
||||||
Disconnect,
|
Disconnect,
|
||||||
Abort(oneshot::Sender<mpsc::Receiver<WriteMessage>>),
|
Abort(
|
||||||
|
oneshot::Sender<(
|
||||||
|
SqlitePool,
|
||||||
|
mpsc::Sender<UpdateMessage>,
|
||||||
|
JoinSet<()>,
|
||||||
|
mpsc::Sender<SupervisorCommand>,
|
||||||
|
WriteHandle,
|
||||||
|
)>,
|
||||||
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct ReadControlHandle {
|
pub struct ReadControlHandle {
|
||||||
sender: mpsc::Sender<ReadControl>,
|
sender: mpsc::Sender<ReadControl>,
|
||||||
handle: JoinHandle<()>,
|
pub(crate) handle: JoinHandle<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Deref for ReadControlHandle {
|
||||||
|
type Target = mpsc::Sender<ReadControl>;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.sender
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DerefMut for ReadControlHandle {
|
||||||
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
|
&mut self.sender
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ReadControlHandle {
|
impl ReadControlHandle {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
stream: BoundJabberReader<Tls>,
|
stream: BoundJabberReader<Tls>,
|
||||||
on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
|
on_crash: oneshot::Sender<(
|
||||||
|
SqlitePool,
|
||||||
|
mpsc::Sender<UpdateMessage>,
|
||||||
|
JoinSet<()>,
|
||||||
|
mpsc::Sender<SupervisorCommand>,
|
||||||
|
WriteHandle,
|
||||||
|
)>,
|
||||||
db: SqlitePool,
|
db: SqlitePool,
|
||||||
sender: mpsc::Sender<UpdateMessage>,
|
sender: mpsc::Sender<UpdateMessage>,
|
||||||
supervisor_control: mpsc::Sender<SupervisorCommand>,
|
supervisor_control: mpsc::Sender<SupervisorCommand>,
|
||||||
|
@ -109,6 +210,42 @@ impl ReadControlHandle {
|
||||||
sender,
|
sender,
|
||||||
supervisor_control,
|
supervisor_control,
|
||||||
jabber_write,
|
jabber_write,
|
||||||
|
JoinSet::new(),
|
||||||
|
);
|
||||||
|
let handle = tokio::spawn(async move { actor.run().await });
|
||||||
|
|
||||||
|
Self {
|
||||||
|
sender: control_sender,
|
||||||
|
handle,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn reconnect(
|
||||||
|
stream: BoundJabberReader<Tls>,
|
||||||
|
on_crash: oneshot::Sender<(
|
||||||
|
SqlitePool,
|
||||||
|
mpsc::Sender<UpdateMessage>,
|
||||||
|
JoinSet<()>,
|
||||||
|
mpsc::Sender<SupervisorCommand>,
|
||||||
|
WriteHandle,
|
||||||
|
)>,
|
||||||
|
db: SqlitePool,
|
||||||
|
sender: mpsc::Sender<UpdateMessage>,
|
||||||
|
supervisor_control: mpsc::Sender<SupervisorCommand>,
|
||||||
|
jabber_write: WriteHandle,
|
||||||
|
tasks: JoinSet<()>,
|
||||||
|
) -> Self {
|
||||||
|
let (control_sender, control_receiver) = mpsc::channel(20);
|
||||||
|
|
||||||
|
let actor = Read::new(
|
||||||
|
control_receiver,
|
||||||
|
stream,
|
||||||
|
on_crash,
|
||||||
|
db,
|
||||||
|
sender,
|
||||||
|
supervisor_control,
|
||||||
|
jabber_write,
|
||||||
|
tasks,
|
||||||
);
|
);
|
||||||
let handle = tokio::spawn(async move { actor.run().await });
|
let handle = tokio::spawn(async move { actor.run().await });
|
||||||
|
|
||||||
|
|
|
@ -19,10 +19,10 @@ pub struct Write {
|
||||||
|
|
||||||
pub struct WriteMessage {
|
pub struct WriteMessage {
|
||||||
stanza: Stanza,
|
stanza: Stanza,
|
||||||
respond_to: oneshot::Sender<Result<(), Error>>,
|
pub respond_to: oneshot::Sender<Result<(), Error>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
enum WriteControl {
|
pub enum WriteControl {
|
||||||
Disconnect,
|
Disconnect,
|
||||||
Abort(oneshot::Sender<mpsc::Receiver<WriteMessage>>),
|
Abort(oneshot::Sender<mpsc::Receiver<WriteMessage>>),
|
||||||
}
|
}
|
||||||
|
@ -46,38 +46,66 @@ impl Write {
|
||||||
Ok(self.stream.write(stanza).await?)
|
Ok(self.stream.write(stanza).await?)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn run_reconnected(mut self, retry_msg: WriteMessage) {
|
||||||
|
// try to retry sending the message that failed to send previously
|
||||||
|
let result = self.stream.write(&retry_msg.stanza).await;
|
||||||
|
match result {
|
||||||
|
Err(e) => match &e {
|
||||||
|
peanuts::Error::ReadError(_error) => {
|
||||||
|
// make sure message is not lost from error, supervisor handles retry and reporting
|
||||||
|
// TODO: upon reconnect, make sure we are not stuck in a reconnection loop
|
||||||
|
let _ = self.on_crash.send((retry_msg, self.stanza_receiver));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
let _ = retry_msg.respond_to.send(Err(e.into()));
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => {
|
||||||
|
let _ = retry_msg.respond_to.send(Ok(()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// return to normal loop
|
||||||
|
self.run().await
|
||||||
|
}
|
||||||
|
|
||||||
async fn run(mut self) {
|
async fn run(mut self) {
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
Some(msg) = self.control_receiver.recv() => {
|
Some(msg) = self.control_receiver.recv() => {
|
||||||
match msg {
|
match msg {
|
||||||
WriteControl::Disconnect => {
|
WriteControl::Disconnect => {
|
||||||
// TODO: close the stanza_receiver channel and drain out all of the remaining stanzas to send
|
// close the stanza_receiver channel and drain out all of the remaining stanzas to send
|
||||||
self.stanza_receiver.close();
|
self.stanza_receiver.close();
|
||||||
// TODO: put this in some kind of function to avoid code duplication
|
// TODO: put this in some kind of function to avoid code duplication
|
||||||
for msg in self.stanza_receiver.recv().await {
|
while let Some(msg) = self.stanza_receiver.recv().await {
|
||||||
let result = self.stream.write(&msg.stanza).await;
|
let result = self.stream.write(&msg.stanza).await;
|
||||||
match result {
|
match result {
|
||||||
Err(e) => match &e {
|
Err(e) => match &e {
|
||||||
peanuts::Error::ReadError(error) => {
|
peanuts::Error::ReadError(_error) => {
|
||||||
// make sure message is not lost from error, supervisor handles retry and reporting
|
// if connection lost during disconnection, just send lost connection error to the write requests
|
||||||
self.on_crash.send((msg, self.stanza_receiver));
|
let _ = msg.respond_to.send(Err(Error::LostConnection));
|
||||||
|
while let Some(msg) = self.stanza_receiver.recv().await {
|
||||||
|
let _ = msg.respond_to.send(Err(Error::LostConnection));
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
// otherwise complete sending all the stanzas currently in the queue
|
||||||
_ => {
|
_ => {
|
||||||
msg.respond_to.send(Err(e.into()));
|
let _ = msg.respond_to.send(Err(e.into()));
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
_ => {
|
_ => {
|
||||||
msg.respond_to.send(Ok(()));
|
let _ = msg.respond_to.send(Ok(()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.stream.try_close().await;
|
let _ = self.stream.try_close().await;
|
||||||
break;
|
break;
|
||||||
},
|
},
|
||||||
|
// in case of abort, stream is already fucked, just send the receiver ready for a reconnection at the same resource
|
||||||
WriteControl::Abort(sender) => {
|
WriteControl::Abort(sender) => {
|
||||||
sender.send(self.stanza_receiver);
|
let _ = sender.send(self.stanza_receiver);
|
||||||
break;
|
break;
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -86,21 +114,20 @@ impl Write {
|
||||||
let result = self.stream.write(&msg.stanza).await;
|
let result = self.stream.write(&msg.stanza).await;
|
||||||
match result {
|
match result {
|
||||||
Err(e) => match &e {
|
Err(e) => match &e {
|
||||||
peanuts::Error::ReadError(error) => {
|
peanuts::Error::ReadError(_error) => {
|
||||||
// make sure message is not lost from error, supervisor handles retry and reporting
|
// make sure message is not lost from error, supervisor handles retry and reporting
|
||||||
self.on_crash.send((msg, self.stanza_receiver));
|
let _ = self.on_crash.send((msg, self.stanza_receiver));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
msg.respond_to.send(Err(e.into()));
|
let _ = msg.respond_to.send(Err(e.into()));
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
_ => {
|
_ => {
|
||||||
msg.respond_to.send(Ok(()));
|
let _ = msg.respond_to.send(Ok(()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
// TODO: check if this is ok to do
|
|
||||||
else => break,
|
else => break,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -128,7 +155,21 @@ impl DerefMut for WriteHandle {
|
||||||
|
|
||||||
pub struct WriteControlHandle {
|
pub struct WriteControlHandle {
|
||||||
sender: mpsc::Sender<WriteControl>,
|
sender: mpsc::Sender<WriteControl>,
|
||||||
handle: JoinHandle<()>,
|
pub(crate) handle: JoinHandle<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Deref for WriteControlHandle {
|
||||||
|
type Target = mpsc::Sender<WriteControl>;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.sender
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DerefMut for WriteControlHandle {
|
||||||
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
|
&mut self.sender
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl WriteControlHandle {
|
impl WriteControlHandle {
|
||||||
|
@ -153,6 +194,23 @@ impl WriteControlHandle {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn reconnect_retry(
|
||||||
|
stream: BoundJabberWriter<Tls>,
|
||||||
|
supervisor: oneshot::Sender<(WriteMessage, mpsc::Receiver<WriteMessage>)>,
|
||||||
|
retry_msg: WriteMessage,
|
||||||
|
stanza_receiver: mpsc::Receiver<WriteMessage>,
|
||||||
|
) -> Self {
|
||||||
|
let (control_sender, control_receiver) = mpsc::channel(20);
|
||||||
|
|
||||||
|
let actor = Write::new(stanza_receiver, control_receiver, stream, supervisor);
|
||||||
|
let handle = tokio::spawn(async move { actor.run_reconnected(retry_msg).await });
|
||||||
|
|
||||||
|
Self {
|
||||||
|
sender: control_sender,
|
||||||
|
handle,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn reconnect(
|
pub fn reconnect(
|
||||||
stream: BoundJabberWriter<Tls>,
|
stream: BoundJabberWriter<Tls>,
|
||||||
supervisor: oneshot::Sender<(WriteMessage, mpsc::Receiver<WriteMessage>)>,
|
supervisor: oneshot::Sender<(WriteMessage, mpsc::Receiver<WriteMessage>)>,
|
||||||
|
|
|
@ -5,6 +5,7 @@ pub enum Error {
|
||||||
SQL(sqlx::Error),
|
SQL(sqlx::Error),
|
||||||
JID(jid::ParseError),
|
JID(jid::ParseError),
|
||||||
AlreadyDisconnected,
|
AlreadyDisconnected,
|
||||||
|
LostConnection,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<peanuts::Error> for Error {
|
impl From<peanuts::Error> for Error {
|
||||||
|
|
|
@ -20,7 +20,7 @@ pub struct Luz {
|
||||||
receiver: mpsc::Receiver<CommandMessage>,
|
receiver: mpsc::Receiver<CommandMessage>,
|
||||||
jid: Arc<Mutex<JID>>,
|
jid: Arc<Mutex<JID>>,
|
||||||
// TODO: use a dyn passwordprovider trait to avoid storing password in memory
|
// TODO: use a dyn passwordprovider trait to avoid storing password in memory
|
||||||
password: String,
|
password: Arc<String>,
|
||||||
connected: Arc<Mutex<Option<(WriteHandle, SupervisorHandle)>>>,
|
connected: Arc<Mutex<Option<(WriteHandle, SupervisorHandle)>>>,
|
||||||
db: SqlitePool,
|
db: SqlitePool,
|
||||||
sender: mpsc::Sender<UpdateMessage>,
|
sender: mpsc::Sender<UpdateMessage>,
|
||||||
|
@ -43,7 +43,7 @@ impl Luz {
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
jid,
|
jid,
|
||||||
password,
|
password: Arc::new(password),
|
||||||
connected,
|
connected,
|
||||||
db,
|
db,
|
||||||
receiver,
|
receiver,
|
||||||
|
@ -75,7 +75,7 @@ impl Luz {
|
||||||
let mut domain = jid.domainpart.clone();
|
let mut domain = jid.domainpart.clone();
|
||||||
// TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource)
|
// TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource)
|
||||||
let streams_result =
|
let streams_result =
|
||||||
jabber::connect_and_login(&mut jid, &self.password, &mut domain)
|
jabber::connect_and_login(&mut jid, &*self.password, &mut domain)
|
||||||
.await;
|
.await;
|
||||||
match streams_result {
|
match streams_result {
|
||||||
Ok(s) => {
|
Ok(s) => {
|
||||||
|
@ -85,6 +85,8 @@ impl Luz {
|
||||||
self.sender.clone(),
|
self.sender.clone(),
|
||||||
self.db.clone(),
|
self.db.clone(),
|
||||||
shutdown_send,
|
shutdown_send,
|
||||||
|
self.jid.clone(),
|
||||||
|
self.password.clone(),
|
||||||
);
|
);
|
||||||
self.connection_supervisor_shutdown = shutdown_recv;
|
self.connection_supervisor_shutdown = shutdown_recv;
|
||||||
*self.connected.lock().await = Some((writer, supervisor));
|
*self.connected.lock().await = Some((writer, supervisor));
|
||||||
|
|
Loading…
Reference in New Issue