actors complete ?

cel 🌸 2025-02-11 22:20:16 +00:00
parent 3634828531
commit ad0054ea56
7 changed files with 129 additions and 26 deletions

luz/.gitignore (vendored, new file, +1)
View File

@@ -0,0 +1 @@
luz.db

View File

@@ -13,3 +13,5 @@ stanza = { version = "0.1.0", path = "../stanza" }
tokio = "1.42.0"
tokio-stream = "0.1.17"
tokio-util = "0.7.13"
tracing = "0.1.41"
tracing-subscriber = "0.3.19"

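The new dependencies in this hunk are tracing and tracing-subscriber; main.rs below installs the fmt subscriber, and the println! calls scattered through the actors could later become structured events. A minimal sketch of that pattern, purely illustrative and not part of this commit:

fn main() {
    // install the fmt subscriber once, as main.rs does below
    tracing_subscriber::fmt::init();

    // structured events instead of println! in the actors (hypothetical placement)
    tracing::info!("started read thread");
    tracing::debug!(stanza = "<iq/>", "read stanza");
}
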
View File

@@ -80,10 +80,15 @@ impl Read {
}
async fn run(mut self) {
println!("started read thread");
// let stanza = self.stream.read::<Stanza>().await;
// println!("{:?}", stanza);
loop {
tokio::select! {
// if the closing stream tag still hasn't arrived in time, just shut the read actor down
_ = &mut self.disconnect_timedout => {
// TODO: is this okay? what if the notification thread dies?
Ok(()) = &mut self.disconnect_timedout => {
println!("disconnect_timedout");
break;
}
Some(msg) = self.control_receiver.recv() => {
@@ -99,17 +104,19 @@ impl Read {
})
},
ReadControl::Abort(sender) => {
let _ = sender.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs));
let _ = sender.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs.clone()));
break;
},
};
},
stanza = self.stream.read::<Stanza>() => {
match stanza {
s = self.stream.read::<Stanza>() => {
println!("read stanza");
match s {
Ok(s) => {
self.tasks.spawn(handle_stanza(s, self.update_sender.clone(), self.db.clone(), self.supervisor_control.clone(), self.write_handle.clone()));
},
Err(e) => {
println!("error: {:?}", e);
// TODO: next, write the correct error stanza for each kind of error, and decide whether to reconnect or properly disconnect based on whether disconnecting is true
// match e {
// peanuts::Error::ReadError(error) => todo!(),
@@ -134,7 +141,7 @@ impl Read {
break;
} else {
// TODO: this state should really just live in the supervisor instead of being passed around by reference
let _ = self.on_crash.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs));
let _ = self.on_crash.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs.clone()));
}
break;
},
@@ -142,11 +149,12 @@ impl Read {
},
else => break
}
// when it aborts, must clear iq map no matter what
let mut iqs = self.pending_iqs.lock().await;
for (_id, sender) in iqs.drain() {
let _ = sender.send(Err(Error::LostConnection));
}
}
println!("stopping read thread");
// when it aborts, must clear iq map no matter what
let mut iqs = self.pending_iqs.lock().await;
for (_id, sender) in iqs.drain() {
let _ = sender.send(Err(Error::LostConnection));
}
}
}
@@ -163,7 +171,7 @@ async fn handle_stanza(
supervisor_control: mpsc::Sender<SupervisorCommand>,
write_handle: WriteHandle,
) {
todo!()
println!("{:?}", stanza)
}
pub enum ReadControl {

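One detail worth spelling out in the read loop above: switching the branch from _ = &mut self.disconnect_timedout to Ok(()) = &mut self.disconnect_timedout relies on tokio::select! pattern matching. A branch whose result does not match its pattern is disabled rather than taken, so a dropped timeout sender, which completes the oneshot with Err(RecvError), no longer tears the read actor down. A standalone sketch of that behaviour, with illustrative channel names:

use tokio::sync::{mpsc, oneshot};

#[tokio::main]
async fn main() {
    let (timeout_tx, mut timeout_rx) = oneshot::channel::<()>();
    // dropping the sender resolves the receiver to Err(RecvError)
    drop(timeout_tx);

    let (msg_tx, mut msg_rx) = mpsc::channel::<&str>(1);
    msg_tx.send("<message/>").await.unwrap();

    tokio::select! {
        // Ok(()) only matches a real timeout signal; the Err from the
        // dropped sender just disables this branch
        Ok(()) = &mut timeout_rx => println!("disconnect_timedout"),
        Some(stanza) = msg_rx.recv() => println!("read stanza: {stanza}"),
        else => println!("all branches disabled"),
    }
}
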
View File

@@ -18,7 +18,7 @@ pub struct Write {
}
pub struct WriteMessage {
stanza: Stanza,
pub stanza: Stanza,
pub respond_to: oneshot::Sender<Result<(), Error>>,
}

View File

@@ -1,3 +1,4 @@
#[derive(Debug)]
pub enum Error {
AlreadyConnected,
Jabber(jabber::Error),

View File

@@ -1,9 +1,19 @@
use std::{collections::HashMap, sync::Arc};
use std::{
collections::HashMap,
ops::{Deref, DerefMut},
sync::Arc,
};
use connection::SupervisorSender;
use connection::{write::WriteMessage, SupervisorSender};
use jabber::JID;
use sqlx::SqlitePool;
use stanza::{client::Stanza, roster};
use stanza::{
client::{
iq::{self, Iq, IqType},
Stanza,
},
roster::{self, Query},
};
use tokio::{
sync::{mpsc, oneshot, Mutex},
task::JoinSet,
@@ -58,6 +68,7 @@ impl Luz {
async fn run(mut self) {
loop {
tokio::select! {
// this is okay, since the supervisor (and connection) doesn't exist yet when Luz is created, but it's a bit messy
_ = &mut self.connection_supervisor_shutdown => {
*self.connected.lock().await = None
}
@@ -66,7 +77,8 @@
// TODO: dispatch commands in separate tasks
match msg {
CommandMessage::Connect => {
match self.connected.lock().await.as_ref() {
let mut connection_lock = self.connected.lock().await;
match connection_lock.as_ref() {
Some(_) => {
self.sender
.send(UpdateMessage::Error(Error::AlreadyConnected))
@@ -92,7 +104,10 @@
self.pending_iqs.clone(),
);
self.connection_supervisor_shutdown = shutdown_recv;
*self.connected.lock().await = Some((writer, supervisor));
*connection_lock = Some((writer, supervisor));
self.sender
.send(UpdateMessage::Connected)
.await;
}
Err(e) => {
self.sender.send(UpdateMessage::Error(e.into()));
@@ -161,18 +176,70 @@ impl CommandMessage {
sender: mpsc::Sender<UpdateMessage>,
pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
) {
todo!()
match self {
CommandMessage::Connect => unreachable!(),
CommandMessage::Disconnect => unreachable!(),
CommandMessage::GetRoster => {
// TODO: jid resource should probably be stored within the connection
let owned_jid: JID;
{
owned_jid = jid.lock().await.clone();
}
let stanza = Stanza::Iq(Iq {
from: Some(owned_jid),
id: "getting-roster".to_string(),
to: None,
r#type: IqType::Get,
lang: None,
query: Some(iq::Query::Roster(roster::Query {
ver: None,
items: Vec::new(),
})),
errors: Vec::new(),
});
let (send, recv) = oneshot::channel();
let _ = write_handle
.send(WriteMessage {
stanza,
respond_to: send,
})
.await;
match recv.await {
Ok(Ok(())) => println!("roster request sent"),
e => println!("error: {:?}", e),
};
}
CommandMessage::SendMessage(jid, _) => todo!(),
}
}
}
// TODO: separate sender and receiver, store handle to Luz process to ensure dropping
// #[derive(Clone)]
pub struct LuzHandle {
sender: mpsc::Sender<CommandMessage>,
receiver: mpsc::Receiver<UpdateMessage>,
}
impl Deref for LuzHandle {
type Target = mpsc::Sender<CommandMessage>;
fn deref(&self) -> &Self::Target {
&self.sender
}
}
impl DerefMut for LuzHandle {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.sender
}
}
impl LuzHandle {
pub fn new(jid: JID, password: String, db: SqlitePool) -> Self {
pub fn new(
jid: JID,
password: String,
db: SqlitePool,
) -> (Self, mpsc::Receiver<UpdateMessage>) {
let (command_sender, command_receiver) = mpsc::channel(20);
let (update_sender, update_receiver) = mpsc::channel(20);
// might be bad: the first supervisor shutdown notification oneshot is never used
@@ -189,10 +256,12 @@ impl LuzHandle {
);
tokio::spawn(async move { actor.run().await });
Self {
sender: command_sender,
receiver: update_receiver,
}
(
Self {
sender: command_sender,
},
update_receiver,
)
}
}
@@ -204,7 +273,9 @@ pub enum CommandMessage {
SendMessage(JID, String),
}
#[derive(Debug)]
pub enum UpdateMessage {
Error(Error),
Connected,
Roster(Vec<roster::Item>),
}

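The GetRoster handler above only awaits the write acknowledgement; the pending_iqs map that this commit now drains on disconnect hints at how a reply would eventually be routed back to the caller. A hypothetical sketch of that request/response pattern, reusing the crate's WriteHandle, WriteMessage, Stanza, Iq and Error types; the send_iq helper itself is an assumption for illustration, not part of this commit:

// park a oneshot under the IQ id, send the stanza, then wait for the read
// actor to resolve the reply (or for the disconnect drain to resolve it
// with Err(Error::LostConnection))
async fn send_iq(
    write_handle: &WriteHandle,
    pending_iqs: &Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
    iq: Iq,
) -> Result<Stanza, Error> {
    let (reply_send, reply_recv) = oneshot::channel();
    pending_iqs.lock().await.insert(iq.id.clone(), reply_send);

    let (ack_send, ack_recv) = oneshot::channel();
    let _ = write_handle
        .send(WriteMessage { stanza: Stanza::Iq(iq), respond_to: ack_send })
        .await;
    // the write actor reports whether the stanza actually went out
    ack_recv.await.map_err(|_| Error::LostConnection)??;

    // resolved by the read actor on a matching reply, or by the drain on disconnect
    reply_recv.await.map_err(|_| Error::LostConnection)?
}
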
View File

@@ -1,3 +1,23 @@
fn main() {
println!("Hello, world!");
use std::time::Duration;
use luz::{CommandMessage, LuzHandle};
use sqlx::SqlitePool;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
let db = SqlitePool::connect("./luz.db").await.unwrap();
let (luz, mut recv) =
LuzHandle::new("test@blos.sm".try_into().unwrap(), "slayed".to_string(), db);
tokio::spawn(async move {
while let Some(msg) = recv.recv().await {
println!("{:#?}", msg)
}
});
luz.send(CommandMessage::Connect).await.unwrap();
luz.send(CommandMessage::GetRoster).await.unwrap();
tokio::time::sleep(Duration::from_secs(15)).await;
}