add iq hashmap for iq requests
This commit is contained in:
parent
1ed6317272
commit
3634828531
|
@ -1,6 +1,7 @@
|
||||||
// TODO: consider if this needs to be handled by a supervisor or could be handled by luz directly
|
// TODO: consider if this needs to be handled by a supervisor or could be handled by luz directly
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
|
collections::HashMap,
|
||||||
ops::{Deref, DerefMut},
|
ops::{Deref, DerefMut},
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
time::Duration,
|
time::Duration,
|
||||||
|
@ -10,6 +11,7 @@ use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream};
|
||||||
use jid::JID;
|
use jid::JID;
|
||||||
use read::{ReadControl, ReadControlHandle};
|
use read::{ReadControl, ReadControlHandle};
|
||||||
use sqlx::SqlitePool;
|
use sqlx::SqlitePool;
|
||||||
|
use stanza::client::Stanza;
|
||||||
use tokio::{
|
use tokio::{
|
||||||
sync::{mpsc, oneshot, Mutex},
|
sync::{mpsc, oneshot, Mutex},
|
||||||
task::{JoinHandle, JoinSet},
|
task::{JoinHandle, JoinSet},
|
||||||
|
@ -30,6 +32,7 @@ pub struct Supervisor {
|
||||||
tokio::task::JoinSet<()>,
|
tokio::task::JoinSet<()>,
|
||||||
mpsc::Sender<SupervisorCommand>,
|
mpsc::Sender<SupervisorCommand>,
|
||||||
WriteHandle,
|
WriteHandle,
|
||||||
|
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
|
||||||
)>,
|
)>,
|
||||||
sender: mpsc::Sender<UpdateMessage>,
|
sender: mpsc::Sender<UpdateMessage>,
|
||||||
writer_handle: WriteControlHandle,
|
writer_handle: WriteControlHandle,
|
||||||
|
@ -56,6 +59,7 @@ impl Supervisor {
|
||||||
JoinSet<()>,
|
JoinSet<()>,
|
||||||
mpsc::Sender<SupervisorCommand>,
|
mpsc::Sender<SupervisorCommand>,
|
||||||
WriteHandle,
|
WriteHandle,
|
||||||
|
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
|
||||||
)>,
|
)>,
|
||||||
sender: mpsc::Sender<UpdateMessage>,
|
sender: mpsc::Sender<UpdateMessage>,
|
||||||
writer_handle: WriteControlHandle,
|
writer_handle: WriteControlHandle,
|
||||||
|
@ -108,7 +112,7 @@ impl Supervisor {
|
||||||
// consider awaiting/aborting the read and write threads
|
// consider awaiting/aborting the read and write threads
|
||||||
let (send, recv) = oneshot::channel();
|
let (send, recv) = oneshot::channel();
|
||||||
let _ = self.reader_handle.send(ReadControl::Abort(send)).await;
|
let _ = self.reader_handle.send(ReadControl::Abort(send)).await;
|
||||||
let (db, update_sender, tasks, supervisor_command, write_sender) = tokio::select! {
|
let (db, update_sender, tasks, supervisor_command, write_sender, pending_iqs) = tokio::select! {
|
||||||
Ok(s) = recv => s,
|
Ok(s) = recv => s,
|
||||||
Ok(s) = &mut self.reader_crash => s,
|
Ok(s) = &mut self.reader_crash => s,
|
||||||
// in case, just break as irrecoverable
|
// in case, just break as irrecoverable
|
||||||
|
@ -134,7 +138,8 @@ impl Supervisor {
|
||||||
update_sender,
|
update_sender,
|
||||||
supervisor_command,
|
supervisor_command,
|
||||||
write_sender,
|
write_sender,
|
||||||
tasks
|
tasks,
|
||||||
|
pending_iqs,
|
||||||
);
|
);
|
||||||
},
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
@ -149,7 +154,7 @@ impl Supervisor {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Ok((db, update_sender, tasks, supervisor_control, write_handle)) = &mut self.reader_crash => {
|
Ok((db, update_sender, tasks, supervisor_control, write_handle, pending_iqs)) = &mut self.reader_crash => {
|
||||||
let (send, recv) = oneshot::channel();
|
let (send, recv) = oneshot::channel();
|
||||||
let _ = self.writer_handle.send(WriteControl::Abort(send)).await;
|
let _ = self.writer_handle.send(WriteControl::Abort(send)).await;
|
||||||
let (retry_msg, mut write_receiver) = tokio::select! {
|
let (retry_msg, mut write_receiver) = tokio::select! {
|
||||||
|
@ -182,7 +187,8 @@ impl Supervisor {
|
||||||
update_sender,
|
update_sender,
|
||||||
supervisor_control,
|
supervisor_control,
|
||||||
write_handle,
|
write_handle,
|
||||||
tasks
|
tasks,
|
||||||
|
pending_iqs,
|
||||||
);
|
);
|
||||||
},
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
@ -252,6 +258,7 @@ impl SupervisorHandle {
|
||||||
on_shutdown: oneshot::Sender<()>,
|
on_shutdown: oneshot::Sender<()>,
|
||||||
jid: Arc<Mutex<JID>>,
|
jid: Arc<Mutex<JID>>,
|
||||||
password: Arc<String>,
|
password: Arc<String>,
|
||||||
|
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
|
||||||
) -> (WriteHandle, Self) {
|
) -> (WriteHandle, Self) {
|
||||||
let (command_sender, command_receiver) = mpsc::channel(20);
|
let (command_sender, command_receiver) = mpsc::channel(20);
|
||||||
let (writer_error_sender, writer_error_receiver) = oneshot::channel();
|
let (writer_error_sender, writer_error_receiver) = oneshot::channel();
|
||||||
|
@ -267,6 +274,7 @@ impl SupervisorHandle {
|
||||||
update_sender.clone(),
|
update_sender.clone(),
|
||||||
command_sender.clone(),
|
command_sender.clone(),
|
||||||
write_handle.clone(),
|
write_handle.clone(),
|
||||||
|
pending_iqs,
|
||||||
);
|
);
|
||||||
|
|
||||||
let actor = Supervisor::new(
|
let actor = Supervisor::new(
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
use std::{
|
use std::{
|
||||||
|
collections::HashMap,
|
||||||
ops::{Deref, DerefMut},
|
ops::{Deref, DerefMut},
|
||||||
|
sync::Arc,
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -7,7 +9,7 @@ use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader};
|
||||||
use sqlx::SqlitePool;
|
use sqlx::SqlitePool;
|
||||||
use stanza::client::Stanza;
|
use stanza::client::Stanza;
|
||||||
use tokio::{
|
use tokio::{
|
||||||
sync::{mpsc, oneshot},
|
sync::{mpsc, oneshot, Mutex},
|
||||||
task::{JoinHandle, JoinSet},
|
task::{JoinHandle, JoinSet},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -28,6 +30,7 @@ pub struct Read {
|
||||||
JoinSet<()>,
|
JoinSet<()>,
|
||||||
mpsc::Sender<SupervisorCommand>,
|
mpsc::Sender<SupervisorCommand>,
|
||||||
WriteHandle,
|
WriteHandle,
|
||||||
|
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
|
||||||
)>,
|
)>,
|
||||||
db: SqlitePool,
|
db: SqlitePool,
|
||||||
update_sender: mpsc::Sender<UpdateMessage>,
|
update_sender: mpsc::Sender<UpdateMessage>,
|
||||||
|
@ -36,6 +39,8 @@ pub struct Read {
|
||||||
tasks: JoinSet<()>,
|
tasks: JoinSet<()>,
|
||||||
disconnecting: bool,
|
disconnecting: bool,
|
||||||
disconnect_timedout: oneshot::Receiver<()>,
|
disconnect_timedout: oneshot::Receiver<()>,
|
||||||
|
// TODO: use proper stanza ids
|
||||||
|
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Read {
|
impl Read {
|
||||||
|
@ -48,6 +53,7 @@ impl Read {
|
||||||
JoinSet<()>,
|
JoinSet<()>,
|
||||||
mpsc::Sender<SupervisorCommand>,
|
mpsc::Sender<SupervisorCommand>,
|
||||||
WriteHandle,
|
WriteHandle,
|
||||||
|
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
|
||||||
)>,
|
)>,
|
||||||
db: SqlitePool,
|
db: SqlitePool,
|
||||||
update_sender: mpsc::Sender<UpdateMessage>,
|
update_sender: mpsc::Sender<UpdateMessage>,
|
||||||
|
@ -55,6 +61,7 @@ impl Read {
|
||||||
supervisor_control: mpsc::Sender<SupervisorCommand>,
|
supervisor_control: mpsc::Sender<SupervisorCommand>,
|
||||||
write_handle: WriteHandle,
|
write_handle: WriteHandle,
|
||||||
tasks: JoinSet<()>,
|
tasks: JoinSet<()>,
|
||||||
|
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let (send, recv) = oneshot::channel();
|
let (send, recv) = oneshot::channel();
|
||||||
Self {
|
Self {
|
||||||
|
@ -68,6 +75,7 @@ impl Read {
|
||||||
tasks,
|
tasks,
|
||||||
disconnecting: false,
|
disconnecting: false,
|
||||||
disconnect_timedout: recv,
|
disconnect_timedout: recv,
|
||||||
|
pending_iqs,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -91,7 +99,7 @@ impl Read {
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
ReadControl::Abort(sender) => {
|
ReadControl::Abort(sender) => {
|
||||||
let _ = sender.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle));
|
let _ = sender.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs));
|
||||||
break;
|
break;
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
@ -126,7 +134,7 @@ impl Read {
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
// AAAAAAAAAAAAAAAAAAAAA i should really just have this stored in the supervisor and not gaf bout passing these references around
|
// AAAAAAAAAAAAAAAAAAAAA i should really just have this stored in the supervisor and not gaf bout passing these references around
|
||||||
let _ = self.on_crash.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle));
|
let _ = self.on_crash.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs));
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
},
|
},
|
||||||
|
@ -134,6 +142,11 @@ impl Read {
|
||||||
},
|
},
|
||||||
else => break
|
else => break
|
||||||
}
|
}
|
||||||
|
// when it aborts, must clear iq map no matter what
|
||||||
|
let mut iqs = self.pending_iqs.lock().await;
|
||||||
|
for (_id, sender) in iqs.drain() {
|
||||||
|
let _ = sender.send(Err(Error::LostConnection));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -162,6 +175,7 @@ pub enum ReadControl {
|
||||||
JoinSet<()>,
|
JoinSet<()>,
|
||||||
mpsc::Sender<SupervisorCommand>,
|
mpsc::Sender<SupervisorCommand>,
|
||||||
WriteHandle,
|
WriteHandle,
|
||||||
|
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
|
||||||
)>,
|
)>,
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
@ -194,11 +208,13 @@ impl ReadControlHandle {
|
||||||
JoinSet<()>,
|
JoinSet<()>,
|
||||||
mpsc::Sender<SupervisorCommand>,
|
mpsc::Sender<SupervisorCommand>,
|
||||||
WriteHandle,
|
WriteHandle,
|
||||||
|
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
|
||||||
)>,
|
)>,
|
||||||
db: SqlitePool,
|
db: SqlitePool,
|
||||||
sender: mpsc::Sender<UpdateMessage>,
|
sender: mpsc::Sender<UpdateMessage>,
|
||||||
supervisor_control: mpsc::Sender<SupervisorCommand>,
|
supervisor_control: mpsc::Sender<SupervisorCommand>,
|
||||||
jabber_write: WriteHandle,
|
jabber_write: WriteHandle,
|
||||||
|
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let (control_sender, control_receiver) = mpsc::channel(20);
|
let (control_sender, control_receiver) = mpsc::channel(20);
|
||||||
|
|
||||||
|
@ -211,6 +227,7 @@ impl ReadControlHandle {
|
||||||
supervisor_control,
|
supervisor_control,
|
||||||
jabber_write,
|
jabber_write,
|
||||||
JoinSet::new(),
|
JoinSet::new(),
|
||||||
|
pending_iqs,
|
||||||
);
|
);
|
||||||
let handle = tokio::spawn(async move { actor.run().await });
|
let handle = tokio::spawn(async move { actor.run().await });
|
||||||
|
|
||||||
|
@ -228,12 +245,14 @@ impl ReadControlHandle {
|
||||||
JoinSet<()>,
|
JoinSet<()>,
|
||||||
mpsc::Sender<SupervisorCommand>,
|
mpsc::Sender<SupervisorCommand>,
|
||||||
WriteHandle,
|
WriteHandle,
|
||||||
|
Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
|
||||||
)>,
|
)>,
|
||||||
db: SqlitePool,
|
db: SqlitePool,
|
||||||
sender: mpsc::Sender<UpdateMessage>,
|
sender: mpsc::Sender<UpdateMessage>,
|
||||||
supervisor_control: mpsc::Sender<SupervisorCommand>,
|
supervisor_control: mpsc::Sender<SupervisorCommand>,
|
||||||
jabber_write: WriteHandle,
|
jabber_write: WriteHandle,
|
||||||
tasks: JoinSet<()>,
|
tasks: JoinSet<()>,
|
||||||
|
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let (control_sender, control_receiver) = mpsc::channel(20);
|
let (control_sender, control_receiver) = mpsc::channel(20);
|
||||||
|
|
||||||
|
@ -246,6 +265,7 @@ impl ReadControlHandle {
|
||||||
supervisor_control,
|
supervisor_control,
|
||||||
jabber_write,
|
jabber_write,
|
||||||
tasks,
|
tasks,
|
||||||
|
pending_iqs,
|
||||||
);
|
);
|
||||||
let handle = tokio::spawn(async move { actor.run().await });
|
let handle = tokio::spawn(async move { actor.run().await });
|
||||||
|
|
||||||
|
|
|
@ -1,9 +1,9 @@
|
||||||
use std::sync::Arc;
|
use std::{collections::HashMap, sync::Arc};
|
||||||
|
|
||||||
use connection::SupervisorSender;
|
use connection::SupervisorSender;
|
||||||
use jabber::JID;
|
use jabber::JID;
|
||||||
use sqlx::SqlitePool;
|
use sqlx::SqlitePool;
|
||||||
use stanza::roster;
|
use stanza::{client::Stanza, roster};
|
||||||
use tokio::{
|
use tokio::{
|
||||||
sync::{mpsc, oneshot, Mutex},
|
sync::{mpsc, oneshot, Mutex},
|
||||||
task::JoinSet,
|
task::JoinSet,
|
||||||
|
@ -22,6 +22,7 @@ pub struct Luz {
|
||||||
// TODO: use a dyn passwordprovider trait to avoid storing password in memory
|
// TODO: use a dyn passwordprovider trait to avoid storing password in memory
|
||||||
password: Arc<String>,
|
password: Arc<String>,
|
||||||
connected: Arc<Mutex<Option<(WriteHandle, SupervisorHandle)>>>,
|
connected: Arc<Mutex<Option<(WriteHandle, SupervisorHandle)>>>,
|
||||||
|
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
|
||||||
db: SqlitePool,
|
db: SqlitePool,
|
||||||
sender: mpsc::Sender<UpdateMessage>,
|
sender: mpsc::Sender<UpdateMessage>,
|
||||||
/// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected
|
/// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected
|
||||||
|
@ -50,6 +51,7 @@ impl Luz {
|
||||||
sender,
|
sender,
|
||||||
tasks: JoinSet::new(),
|
tasks: JoinSet::new(),
|
||||||
connection_supervisor_shutdown,
|
connection_supervisor_shutdown,
|
||||||
|
pending_iqs: Arc::new(Mutex::new(HashMap::new())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -87,6 +89,7 @@ impl Luz {
|
||||||
shutdown_send,
|
shutdown_send,
|
||||||
self.jid.clone(),
|
self.jid.clone(),
|
||||||
self.password.clone(),
|
self.password.clone(),
|
||||||
|
self.pending_iqs.clone(),
|
||||||
);
|
);
|
||||||
self.connection_supervisor_shutdown = shutdown_recv;
|
self.connection_supervisor_shutdown = shutdown_recv;
|
||||||
*self.connected.lock().await = Some((writer, supervisor));
|
*self.connected.lock().await = Some((writer, supervisor));
|
||||||
|
@ -121,6 +124,7 @@ impl Luz {
|
||||||
self.db.clone(),
|
self.db.clone(),
|
||||||
self.sender.clone(),
|
self.sender.clone(),
|
||||||
// TODO: iq hashmap
|
// TODO: iq hashmap
|
||||||
|
self.pending_iqs.clone()
|
||||||
)),
|
)),
|
||||||
None => self.tasks.spawn(msg.handle_offline(
|
None => self.tasks.spawn(msg.handle_offline(
|
||||||
self.jid.clone(),
|
self.jid.clone(),
|
||||||
|
@ -155,6 +159,7 @@ impl CommandMessage {
|
||||||
jid: Arc<Mutex<JID>>,
|
jid: Arc<Mutex<JID>>,
|
||||||
db: SqlitePool,
|
db: SqlitePool,
|
||||||
sender: mpsc::Sender<UpdateMessage>,
|
sender: mpsc::Sender<UpdateMessage>,
|
||||||
|
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
|
||||||
) {
|
) {
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue