WIP: code cleanup

This commit is contained in:
cel 🌸 2025-02-10 17:48:39 +00:00
parent a1d96233e8
commit 41c1ba15ef
5 changed files with 587 additions and 485 deletions

166
luz/src/connection/mod.rs Normal file
View File

@ -0,0 +1,166 @@
use std::ops::{Deref, DerefMut};
use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream};
use read::ReadControlHandle;
use sqlx::SqlitePool;
use tokio::{
sync::{mpsc, oneshot},
task::{JoinHandle, JoinSet},
};
use write::{WriteControlHandle, WriteHandle, WriteMessage};
use crate::UpdateMessage;
mod read;
pub(crate) mod write;
pub struct Supervisor {
connection_commands: mpsc::Receiver<SupervisorCommand>,
writer_crash: oneshot::Receiver<(WriteMessage, mpsc::Receiver<WriteMessage>)>,
reader_crash: oneshot::Receiver<(
SqlitePool,
mpsc::Sender<UpdateMessage>,
tokio::task::JoinSet<()>,
)>,
sender: mpsc::Sender<UpdateMessage>,
writer_handle: WriteControlHandle,
reader_handle: ReadControlHandle,
on_shutdown: oneshot::Sender<()>,
}
pub enum SupervisorCommand {
Disconnect,
// for if there was a stream error, require to reconnect
Reconnect,
}
impl Supervisor {
fn new(
connection_commands: mpsc::Receiver<SupervisorCommand>,
writer_crash: oneshot::Receiver<(WriteMessage, mpsc::Receiver<WriteMessage>)>,
reader_crash: oneshot::Receiver<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
sender: mpsc::Sender<UpdateMessage>,
writer_handle: WriteControlHandle,
reader_handle: ReadControlHandle,
on_shutdown: oneshot::Sender<()>,
) -> Self {
Self {
connection_commands,
writer_crash,
sender,
writer_handle,
reader_handle,
reader_crash,
on_shutdown,
}
}
async fn handle_command_message(&mut self, msg: SupervisorCommand) {}
async fn run(mut self) {
loop {
tokio::select! {
Some(msg) = self.connection_commands.recv() => {
self.handle_command_message(msg).await;
},
error = &mut self.writer_crash => {
},
error = &mut self.reader_crash => {
},
else => break,
}
}
self.on_shutdown.send(());
}
}
pub struct SupervisorHandle {
sender: SupervisorSender,
handle: JoinHandle<()>,
}
impl Deref for SupervisorHandle {
type Target = SupervisorSender;
fn deref(&self) -> &Self::Target {
&self.sender
}
}
impl DerefMut for SupervisorHandle {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.sender
}
}
#[derive(Clone)]
pub struct SupervisorSender {
sender: mpsc::Sender<SupervisorCommand>,
}
impl Deref for SupervisorSender {
type Target = mpsc::Sender<SupervisorCommand>;
fn deref(&self) -> &Self::Target {
&self.sender
}
}
impl DerefMut for SupervisorSender {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.sender
}
}
impl SupervisorHandle {
pub fn new(
streams: BoundJabberStream<Tls>,
update_sender: mpsc::Sender<UpdateMessage>,
db: SqlitePool,
on_shutdown: oneshot::Sender<()>,
) -> (WriteHandle, Self) {
let (command_sender, command_receiver) = mpsc::channel(20);
let (writer_error_sender, writer_error_receiver) = oneshot::channel();
let (reader_crash_sender, reader_crash_receiver) = oneshot::channel();
let (reader, writer) = streams.split();
let (write_handle, write_control_handle) =
WriteControlHandle::new(writer, writer_error_sender);
let jabber_reader_control_handle = ReadControlHandle::new(
reader,
reader_crash_sender,
db,
update_sender.clone(),
command_sender.clone(),
write_handle.clone(),
);
let actor = Supervisor::new(
command_receiver,
writer_error_receiver,
reader_crash_receiver,
update_sender,
write_control_handle,
jabber_reader_control_handle,
on_shutdown,
);
let handle = tokio::spawn(async move { actor.run().await });
(
write_handle,
Self {
sender: SupervisorSender {
sender: command_sender,
},
handle,
},
)
}
pub fn sender(&self) -> SupervisorSender {
self.sender.clone()
}
}

120
luz/src/connection/read.rs Normal file
View File

@ -0,0 +1,120 @@
use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader};
use sqlx::SqlitePool;
use stanza::client::Stanza;
use tokio::{
sync::{mpsc, oneshot},
task::{JoinHandle, JoinSet},
};
use crate::UpdateMessage;
use super::{
write::{WriteHandle, WriteMessage},
SupervisorCommand,
};
pub struct Read {
// TODO: place iq hashmap here
control_receiver: mpsc::Receiver<ReadControl>,
stream: BoundJabberReader<Tls>,
on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
db: SqlitePool,
update_sender: mpsc::Sender<UpdateMessage>,
supervisor_control: mpsc::Sender<SupervisorCommand>,
write_handle: WriteHandle,
tasks: JoinSet<()>,
}
impl Read {
fn new(
control_receiver: mpsc::Receiver<ReadControl>,
stream: BoundJabberReader<Tls>,
on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
db: SqlitePool,
update_sender: mpsc::Sender<UpdateMessage>,
// jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs)
supervisor_control: mpsc::Sender<SupervisorCommand>,
write_sender: WriteHandle,
) -> Self {
Self {
control_receiver,
stream,
on_crash,
db,
update_sender,
supervisor_control,
write_handle: write_sender,
tasks: JoinSet::new(),
}
}
async fn run(mut self) {
loop {
tokio::select! {
Some(msg) = self.control_receiver.recv() => {
match msg {
ReadControl::Disconnect => todo!(),
ReadControl::Abort(sender) => todo!(),
};
},
stanza = self.stream.read::<Stanza>() => {
match stanza {
Ok(_) => todo!(),
Err(_) => todo!(),
}
self.tasks.spawn();
},
else => break
}
}
}
}
trait Task {
async fn handle();
}
impl Task for Stanza {
async fn handle() {
todo!()
}
}
enum ReadControl {
Disconnect,
Abort(oneshot::Sender<mpsc::Receiver<WriteMessage>>),
}
pub struct ReadControlHandle {
sender: mpsc::Sender<ReadControl>,
handle: JoinHandle<()>,
}
impl ReadControlHandle {
pub fn new(
stream: BoundJabberReader<Tls>,
on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
db: SqlitePool,
sender: mpsc::Sender<UpdateMessage>,
supervisor_control: mpsc::Sender<SupervisorCommand>,
jabber_write: WriteHandle,
) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20);
let actor = Read::new(
control_receiver,
stream,
on_crash,
db,
sender,
supervisor_control,
jabber_write,
);
let handle = tokio::spawn(async move { actor.run().await });
Self {
sender: control_sender,
handle,
}
}
}

171
luz/src/connection/write.rs Normal file
View File

@ -0,0 +1,171 @@
use std::ops::{Deref, DerefMut};
use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberWriter};
use stanza::client::Stanza;
use tokio::{
sync::{mpsc, oneshot},
task::JoinHandle,
};
use crate::error::Error;
// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream.
pub struct Write {
stanza_receiver: mpsc::Receiver<WriteMessage>,
control_receiver: mpsc::Receiver<WriteControl>,
stream: BoundJabberWriter<Tls>,
on_crash: oneshot::Sender<(WriteMessage, mpsc::Receiver<WriteMessage>)>,
}
pub struct WriteMessage {
stanza: Stanza,
respond_to: oneshot::Sender<Result<(), Error>>,
}
enum WriteControl {
Disconnect,
Abort(oneshot::Sender<mpsc::Receiver<WriteMessage>>),
}
impl Write {
fn new(
stanza_receiver: mpsc::Receiver<WriteMessage>,
control_receiver: mpsc::Receiver<WriteControl>,
stream: BoundJabberWriter<Tls>,
supervisor: oneshot::Sender<(WriteMessage, mpsc::Receiver<WriteMessage>)>,
) -> Self {
Self {
stanza_receiver,
control_receiver,
stream,
on_crash: supervisor,
}
}
async fn write(&mut self, stanza: &Stanza) -> Result<(), peanuts::Error> {
Ok(self.stream.write(stanza).await?)
}
async fn run(mut self) {
loop {
tokio::select! {
Some(msg) = self.control_receiver.recv() => {
match msg {
WriteControl::Disconnect => {
// TODO: close the stanza_receiver channel and drain out all of the remaining stanzas to send
self.stanza_receiver.close();
// TODO: put this in some kind of function to avoid code duplication
for msg in self.stanza_receiver.recv().await {
let result = self.stream.write(&msg.stanza).await;
match result {
Err(e) => match &e {
peanuts::Error::ReadError(error) => {
// make sure message is not lost from error, supervisor handles retry and reporting
self.on_crash.send((msg, self.stanza_receiver));
break;
}
_ => {
msg.respond_to.send(Err(e.into()));
}
},
_ => {
msg.respond_to.send(Ok(()));
}
}
}
self.stream.try_close().await;
break;
},
WriteControl::Abort(sender) => {
sender.send(self.stanza_receiver);
break;
},
}
},
Some(msg) = self.stanza_receiver.recv() => {
let result = self.stream.write(&msg.stanza).await;
match result {
Err(e) => match &e {
peanuts::Error::ReadError(error) => {
// make sure message is not lost from error, supervisor handles retry and reporting
self.on_crash.send((msg, self.stanza_receiver));
break;
}
_ => {
msg.respond_to.send(Err(e.into()));
}
},
_ => {
msg.respond_to.send(Ok(()));
}
}
},
// TODO: check if this is ok to do
else => break,
}
}
}
}
#[derive(Clone)]
pub struct WriteHandle {
sender: mpsc::Sender<WriteMessage>,
}
impl Deref for WriteHandle {
type Target = mpsc::Sender<WriteMessage>;
fn deref(&self) -> &Self::Target {
&self.sender
}
}
impl DerefMut for WriteHandle {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.sender
}
}
pub struct WriteControlHandle {
sender: mpsc::Sender<WriteControl>,
handle: JoinHandle<()>,
}
impl WriteControlHandle {
pub fn new(
stream: BoundJabberWriter<Tls>,
supervisor: oneshot::Sender<(WriteMessage, mpsc::Receiver<WriteMessage>)>,
) -> (WriteHandle, Self) {
let (control_sender, control_receiver) = mpsc::channel(20);
let (stanza_sender, stanza_receiver) = mpsc::channel(20);
let actor = Write::new(stanza_receiver, control_receiver, stream, supervisor);
let handle = tokio::spawn(async move { actor.run().await });
(
WriteHandle {
sender: stanza_sender,
},
Self {
sender: control_sender,
handle,
},
)
}
pub fn reconnect(
stream: BoundJabberWriter<Tls>,
supervisor: oneshot::Sender<(WriteMessage, mpsc::Receiver<WriteMessage>)>,
stanza_receiver: mpsc::Receiver<WriteMessage>,
) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20);
let actor = Write::new(stanza_receiver, control_receiver, stream, supervisor);
let handle = tokio::spawn(async move { actor.run().await });
Self {
sender: control_sender,
handle,
}
}
}

32
luz/src/error.rs Normal file
View File

@ -0,0 +1,32 @@
pub enum Error {
AlreadyConnected,
Jabber(jabber::Error),
XML(peanuts::Error),
SQL(sqlx::Error),
JID(jid::ParseError),
AlreadyDisconnected,
}
impl From<peanuts::Error> for Error {
fn from(e: peanuts::Error) -> Self {
Self::XML(e)
}
}
impl From<jid::ParseError> for Error {
fn from(e: jid::ParseError) -> Self {
Self::JID(e)
}
}
impl From<sqlx::Error> for Error {
fn from(e: sqlx::Error) -> Self {
Self::SQL(e)
}
}
impl From<jabber::Error> for Error {
fn from(e: jabber::Error) -> Self {
Self::Jabber(e)
}
}

View File

@ -1,301 +1,33 @@
use std::{
collections::{HashMap, HashSet, VecDeque},
fmt::Pointer,
pin::pin,
sync::{atomic::AtomicBool, Arc},
task::{ready, Poll},
};
use std::sync::Arc;
use futures::{
stream::{SplitSink, SplitStream},
Sink, SinkExt, Stream, StreamExt,
};
use jabber::{
connection::Tls,
jabber_stream::bound_stream::{BoundJabberReader, BoundJabberStream, BoundJabberWriter},
JID,
};
use sqlx::{query, Pool, Sqlite, SqlitePool};
use stanza::{
client::{
iq::{Iq, IqType, Query},
Stanza,
},
roster,
};
use connection::SupervisorSender;
use jabber::JID;
use sqlx::SqlitePool;
use stanza::roster;
use tokio::{
io::AsyncRead,
select,
sync::{
mpsc::{self, Receiver, Sender},
oneshot, Mutex,
},
task::{JoinHandle, JoinSet},
sync::{mpsc, oneshot, Mutex},
task::JoinSet,
};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::{PollSendError, PollSender};
// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream.
pub struct JabberWriter {
stanza_receiver: mpsc::Receiver<JabberWrite>,
control_receiver: mpsc::Receiver<JabberWriterControl>,
stream: BoundJabberWriter<Tls>,
on_crash: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
}
use crate::connection::write::WriteHandle;
use crate::connection::{SupervisorCommand, SupervisorHandle};
use crate::error::Error;
struct JabberWrite {
stanza: Stanza,
respond_to: oneshot::Sender<Result<(), Error>>,
}
enum JabberWriterControl {
Disconnect,
Abort(oneshot::Sender<mpsc::Receiver<JabberWrite>>),
}
impl JabberWriter {
fn new(
stanza_receiver: mpsc::Receiver<JabberWrite>,
control_receiver: mpsc::Receiver<JabberWriterControl>,
stream: BoundJabberWriter<Tls>,
supervisor: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
) -> Self {
Self {
stanza_receiver,
control_receiver,
stream,
on_crash: supervisor,
}
}
async fn write(&mut self, stanza: &Stanza) -> Result<(), peanuts::Error> {
Ok(self.stream.write(stanza).await?)
}
async fn run(mut self) {
loop {
tokio::select! {
Some(msg) = self.control_receiver.recv() => {
match msg {
JabberWriterControl::Disconnect => {
// TODO: close the stanza_receiver channel and drain out all of the remaining stanzas to send
self.stanza_receiver.close();
// TODO: put this in some kind of function to avoid code duplication
for msg in self.stanza_receiver.recv().await {
let result = self.stream.write(&msg.stanza).await;
match result {
Err(e) => match &e {
peanuts::Error::ReadError(error) => {
// make sure message is not lost from error, supervisor handles retry and reporting
self.on_crash.send((msg, self.stanza_receiver));
break;
}
_ => {
msg.respond_to.send(Err(e.into()));
}
},
_ => {
msg.respond_to.send(Ok(()));
}
}
}
self.stream.try_close().await;
break;
},
JabberWriterControl::Abort(sender) => {
sender.send(self.stanza_receiver);
break;
},
}
},
Some(msg) = self.stanza_receiver.recv() => {
let result = self.stream.write(&msg.stanza).await;
match result {
Err(e) => match &e {
peanuts::Error::ReadError(error) => {
// make sure message is not lost from error, supervisor handles retry and reporting
self.on_crash.send((msg, self.stanza_receiver));
break;
}
_ => {
msg.respond_to.send(Err(e.into()));
}
},
_ => {
msg.respond_to.send(Ok(()));
}
}
},
// TODO: check if this is ok to do
else => break,
}
}
}
}
#[derive(Clone)]
pub struct JabberWriteHandle {
sender: mpsc::Sender<JabberWrite>,
}
pub struct JabberWriterControlHandle {
sender: mpsc::Sender<JabberWriterControl>,
handle: JoinHandle<()>,
}
impl JabberWriterControlHandle {
pub fn new(
stream: BoundJabberWriter<Tls>,
supervisor: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
) -> (JabberWriteHandle, JabberWriterControlHandle) {
let (control_sender, control_receiver) = mpsc::channel(20);
let (stanza_sender, stanza_receiver) = mpsc::channel(20);
let actor = JabberWriter::new(stanza_receiver, control_receiver, stream, supervisor);
let handle = tokio::spawn(async move { actor.run().await });
(
JabberWriteHandle {
sender: stanza_sender,
},
Self {
sender: control_sender,
handle,
},
)
}
pub fn reconnect(
stream: BoundJabberWriter<Tls>,
supervisor: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
stanza_receiver: mpsc::Receiver<JabberWrite>,
) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20);
let actor = JabberWriter::new(stanza_receiver, control_receiver, stream, supervisor);
let handle = tokio::spawn(async move { actor.run().await });
Self {
sender: control_sender,
handle,
}
}
}
pub struct JabberReader {
// TODO: place iq hashmap here
control_receiver: mpsc::Receiver<JabberReaderControl>,
stream: BoundJabberReader<Tls>,
on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
db: SqlitePool,
sender: mpsc::Sender<UpdateMessage>,
supervisor_control: mpsc::Sender<JabberSupervisorCommand>,
write_sender: mpsc::Sender<JabberWrite>,
tasks: JoinSet<()>,
}
impl JabberReader {
fn new(
control_receiver: mpsc::Receiver<JabberReaderControl>,
stream: BoundJabberReader<Tls>,
on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
db: SqlitePool,
sender: mpsc::Sender<UpdateMessage>,
// jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs)
supervisor_control: mpsc::Sender<JabberSupervisorCommand>,
write_sender: mpsc::Sender<JabberWrite>,
) -> Self {
Self {
control_receiver,
stream,
on_crash,
db,
sender,
supervisor_control,
write_sender,
tasks: JoinSet::new(),
}
}
async fn run(mut self) {
loop {
tokio::select! {
Some(msg) = self.control_receiver.recv() => {
match msg {
JabberReaderControl::Disconnect => todo!(),
JabberReaderControl::Abort(sender) => todo!(),
};
},
stanza = self.stream.read::<Stanza>() => {
match stanza {
Ok(_) => todo!(),
Err(_) => todo!(),
}
self.tasks.spawn();
},
else => break
}
}
}
}
trait Task {
async fn handle();
}
impl Task for Stanza {
async fn handle() {
todo!()
}
}
enum JabberReaderControl {
Disconnect,
Abort(oneshot::Sender<mpsc::Receiver<JabberWrite>>),
}
struct JabberReaderControlHandle {
sender: mpsc::Sender<JabberReaderControl>,
handle: JoinHandle<()>,
}
impl JabberReaderControlHandle {
pub fn new(
stream: BoundJabberReader<Tls>,
on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
db: SqlitePool,
sender: mpsc::Sender<UpdateMessage>,
supervisor_control: mpsc::Sender<JabberSupervisorCommand>,
jabber_write: mpsc::Sender<JabberWrite>,
) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20);
let actor = JabberReader::new(
control_receiver,
stream,
on_crash,
db,
sender,
supervisor_control,
jabber_write,
);
let handle = tokio::spawn(async move { actor.run().await });
Self {
sender: control_sender,
handle,
}
}
}
mod connection;
mod error;
pub struct Luz {
receiver: mpsc::Receiver<CommandMessage>,
jid: Arc<Mutex<JID>>,
// TODO: use a dyn passwordprovider trait to avoid storing password in memory
password: String,
connected: Arc<Mutex<Option<(JabberWriteHandle, JabberSupervisorHandle)>>>,
connected: Arc<Mutex<Option<(WriteHandle, SupervisorHandle)>>>,
db: SqlitePool,
sender: mpsc::Sender<UpdateMessage>,
/// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected
connection_supervisor_shutdown: oneshot::Receiver<()>,
// TODO: will need to have an auto reconnect state as well (e.g. in case server shut down, to try and reconnect later)
// TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway?
tasks: JoinSet<()>,
}
@ -304,7 +36,8 @@ impl Luz {
receiver: mpsc::Receiver<CommandMessage>,
jid: Arc<Mutex<JID>>,
password: String,
connected: Arc<Mutex<Option<(JabberWriteHandle, JabberSupervisorHandle)>>>,
connected: Arc<Mutex<Option<(WriteHandle, SupervisorHandle)>>>,
connection_supervisor_shutdown: oneshot::Receiver<()>,
db: SqlitePool,
sender: mpsc::Sender<UpdateMessage>,
) -> Self {
@ -316,71 +49,87 @@ impl Luz {
receiver,
sender,
tasks: JoinSet::new(),
connection_supervisor_shutdown,
}
}
async fn run(mut self) {
while let Some(msg) = self.receiver.recv().await {
// TODO: consider separating disconnect/connect and commands apart from commandmessage
match msg {
CommandMessage::Connect => {
match self.connected.lock().await.as_ref() {
Some(_) => {
self.sender
.send(UpdateMessage::Error(Error::AlreadyConnected))
.await;
}
None => {
let mut jid = self.jid.lock().await;
let mut domain = jid.domainpart.clone();
let streams_result =
jabber::connect_and_login(&mut jid, &self.password, &mut domain)
.await;
match streams_result {
Ok(s) => {
let (writer, supervisor) =
JabberSupervisorHandle::new(s, self.sender.clone());
*self.connected.lock().await = Some((writer, supervisor));
}
Err(e) => {
self.sender.send(UpdateMessage::Error(e.into()));
}
}
}
};
loop {
tokio::select! {
_ = &mut self.connection_supervisor_shutdown => {
*self.connected.lock().await = None
}
CommandMessage::Disconnect => match self.connected.lock().await.as_mut() {
None => {
self.sender
.send(UpdateMessage::Error(Error::AlreadyDisonnected))
.await;
}
mut c => {
if let Some((_write_handle, supervisor_handle)) = c.take() {
let _ = supervisor_handle
.sender
.send(JabberSupervisorCommand::Disconnect)
.await;
} else {
unreachable!()
};
Some(msg) = self.receiver.recv() => {
// TODO: consider separating disconnect/connect and commands apart from commandmessage
// TODO: dispatch commands separate tasks
match msg {
CommandMessage::Connect => {
match self.connected.lock().await.as_ref() {
Some(_) => {
self.sender
.send(UpdateMessage::Error(Error::AlreadyConnected))
.await;
}
None => {
let mut jid = self.jid.lock().await;
let mut domain = jid.domainpart.clone();
// TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource)
let streams_result =
jabber::connect_and_login(&mut jid, &self.password, &mut domain)
.await;
match streams_result {
Ok(s) => {
let (shutdown_send, shutdown_recv) = oneshot::channel::<()>();
let (writer, supervisor) = SupervisorHandle::new(
s,
self.sender.clone(),
self.db.clone(),
shutdown_send,
);
self.connection_supervisor_shutdown = shutdown_recv;
*self.connected.lock().await = Some((writer, supervisor));
}
Err(e) => {
self.sender.send(UpdateMessage::Error(e.into()));
}
}
}
};
}
CommandMessage::Disconnect => match self.connected.lock().await.as_mut() {
None => {
self.sender
.send(UpdateMessage::Error(Error::AlreadyDisconnected))
.await;
}
mut c => {
if let Some((_write_handle, supervisor_handle)) = c.take() {
let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await;
} else {
unreachable!()
};
}
},
_ => {
match self.connected.lock().await.as_ref() {
Some((w, s)) => self.tasks.spawn(msg.handle_online(
w.clone(),
s.sender(),
self.jid.clone(),
self.db.clone(),
self.sender.clone(),
// TODO: iq hashmap
)),
None => self.tasks.spawn(msg.handle_offline(
self.jid.clone(),
self.db.clone(),
self.sender.clone(),
)),
};
}
}
},
_ => {
match self.connected.lock().await.as_ref() {
Some((w, _)) => self.tasks.spawn(msg.handle_online(
w.clone(),
self.jid.clone(),
self.db.clone(),
self.sender.clone(),
)),
None => self.tasks.spawn(msg.handle_offline(
self.jid.clone(),
self.db.clone(),
self.sender.clone(),
)),
};
}
else => break,
}
}
}
@ -398,7 +147,8 @@ impl CommandMessage {
pub async fn handle_online(
mut self,
jabber_write_handle: JabberWriteHandle,
write_handle: WriteHandle,
supervisor_control: SupervisorSender,
// TODO: jid could lose resource by the end
jid: Arc<Mutex<JID>>,
db: SqlitePool,
@ -418,12 +168,15 @@ impl LuzHandle {
pub fn new(jid: JID, password: String, db: SqlitePool) -> Self {
let (command_sender, command_receiver) = mpsc::channel(20);
let (update_sender, update_receiver) = mpsc::channel(20);
// might be bad, first supervisor shutdown notification oneshot is never used (disgusting)
let (sup_send, sup_recv) = oneshot::channel();
let actor = Luz::new(
command_receiver,
Arc::new(Mutex::new(jid)),
password,
Arc::new(Mutex::new(None)),
sup_recv,
db,
update_sender,
);
@ -436,146 +189,6 @@ impl LuzHandle {
}
}
pub struct JabberSupervisor {
connection_commands: mpsc::Receiver<JabberSupervisorCommand>,
writer_crash: oneshot::Receiver<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
reader_crash: oneshot::Receiver<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
sender: mpsc::Sender<UpdateMessage>,
writer_handle: JabberWriterControlHandle,
reader_handle: JabberReaderControlHandle,
}
pub enum JabberSupervisorCommand {
Disconnect,
}
impl JabberSupervisor {
fn new(
connection_commands: mpsc::Receiver<JabberSupervisorCommand>,
writer_crash: oneshot::Receiver<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
reader_crash: oneshot::Receiver<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
sender: mpsc::Sender<UpdateMessage>,
writer_handle: JabberWriterControlHandle,
reader_handle: JabberReaderControlHandle,
) -> Self {
Self {
connection_commands,
writer_crash,
sender,
writer_handle,
reader_handle,
reader_crash,
}
}
async fn handle_command_message(&mut self, msg: JabberSupervisorCommand) {}
async fn run(mut self) {
loop {
tokio::select! {
Some(msg) = self.connection_commands.recv() => {
self.handle_command_message(msg).await;
},
error = self.writer_crash => {
},
error = self.reader_crash => {
},
}
}
}
}
pub struct JabberSupervisorHandle {
sender: mpsc::Sender<JabberSupervisorCommand>,
handle: JoinHandle<()>,
}
impl JabberSupervisorHandle {
pub fn new(
streams: BoundJabberStream<Tls>,
sender: mpsc::Sender<UpdateMessage>,
db: SqlitePool,
update_sender: mpsc::Sender<UpdateMessage>,
) -> (JabberWriteHandle, Self) {
let (command_sender, command_receiver) = mpsc::channel(20);
let (writer_error_sender, writer_error_receiver) = oneshot::channel();
let (reader_crash_sender, reader_crash_receiver) = oneshot::channel();
let (reader, writer) = streams.split();
let (jabber_write_handle, jabber_writer_control_handle) =
JabberWriterControlHandle::new(writer, writer_error_sender);
let jabber_reader_control_handle = JabberReaderControlHandle::new(
reader,
reader_crash_sender,
db,
update_sender,
command_sender.clone(),
jabber_write_handle.sender.clone(),
);
let actor = JabberSupervisor::new(
command_receiver,
writer_error_receiver,
reader_crash_receiver,
sender,
jabber_writer_control_handle,
jabber_reader_control_handle,
);
let handle = tokio::spawn(async move { actor.run().await });
(
jabber_write_handle,
Self {
sender: command_sender,
handle,
},
)
}
}
pub enum Error {
AlreadyConnected,
PollSend(PollSendError<CommandMessage>),
Jabber(jabber::Error),
XML(peanuts::Error),
SQL(sqlx::Error),
JID(jid::ParseError),
AlreadyDisonnected,
}
impl From<peanuts::Error> for Error {
fn from(e: peanuts::Error) -> Self {
Self::XML(e)
}
}
impl From<jid::ParseError> for Error {
fn from(e: jid::ParseError) -> Self {
Self::JID(e)
}
}
impl From<sqlx::Error> for Error {
fn from(e: sqlx::Error) -> Self {
Self::SQL(e)
}
}
impl From<jabber::Error> for Error {
fn from(e: jabber::Error) -> Self {
Self::Jabber(e)
}
}
impl From<PollSendError<CommandMessage>> for Error {
fn from(e: PollSendError<CommandMessage>) -> Self {
Self::PollSend(e)
}
}
pub enum CommandMessage {
Connect,
Disconnect,