WIP: luz actor-based client

This commit is contained in:
cel 🌸 2025-02-08 02:15:44 +00:00
parent b023c6b5f2
commit a1d96233e8
2 changed files with 486 additions and 243 deletions

View File

@ -6,6 +6,7 @@ edition = "2021"
[dependencies]
futures = "0.3.31"
jabber = { version = "0.1.0", path = "../jabber" }
peanuts = { version = "0.1.0", path = "../../peanuts" }
jid = { version = "0.1.0", path = "../jid" }
sqlx = { version = "0.8.3", features = [ "sqlite", "runtime-tokio" ] }
stanza = { version = "0.1.0", path = "../stanza" }

View File

@ -2,7 +2,7 @@ use std::{
collections::{HashMap, HashSet, VecDeque},
fmt::Pointer,
pin::pin,
sync::Arc,
sync::{atomic::AtomicBool, Arc},
task::{ready, Poll},
};
@ -28,239 +28,528 @@ use tokio::{
select,
sync::{
mpsc::{self, Receiver, Sender},
Mutex,
oneshot, Mutex,
},
task::{JoinHandle, JoinSet},
};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::{PollSendError, PollSender};
pub struct Luz {
receiver: ReceiverStream<UpdateMessage>,
sender: PollSender<CommandMessage>,
// TODO: oneshot
pending_iqs: Arc<Mutex<HashMap<String, mpsc::Sender<Iq>>>>,
// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream.
pub struct JabberWriter {
stanza_receiver: mpsc::Receiver<JabberWrite>,
control_receiver: mpsc::Receiver<JabberWriterControl>,
stream: BoundJabberWriter<Tls>,
on_crash: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
}
struct JabberWrite {
stanza: Stanza,
respond_to: oneshot::Sender<Result<(), Error>>,
}
enum JabberWriterControl {
Disconnect,
Abort(oneshot::Sender<mpsc::Receiver<JabberWrite>>),
}
impl JabberWriter {
fn new(
stanza_receiver: mpsc::Receiver<JabberWrite>,
control_receiver: mpsc::Receiver<JabberWriterControl>,
stream: BoundJabberWriter<Tls>,
supervisor: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
) -> Self {
Self {
stanza_receiver,
control_receiver,
stream,
on_crash: supervisor,
}
}
async fn write(&mut self, stanza: &Stanza) -> Result<(), peanuts::Error> {
Ok(self.stream.write(stanza).await?)
}
async fn run(mut self) {
loop {
tokio::select! {
Some(msg) = self.control_receiver.recv() => {
match msg {
JabberWriterControl::Disconnect => {
// TODO: close the stanza_receiver channel and drain out all of the remaining stanzas to send
self.stanza_receiver.close();
// TODO: put this in some kind of function to avoid code duplication
for msg in self.stanza_receiver.recv().await {
let result = self.stream.write(&msg.stanza).await;
match result {
Err(e) => match &e {
peanuts::Error::ReadError(error) => {
// make sure message is not lost from error, supervisor handles retry and reporting
self.on_crash.send((msg, self.stanza_receiver));
break;
}
_ => {
msg.respond_to.send(Err(e.into()));
}
},
_ => {
msg.respond_to.send(Ok(()));
}
}
}
self.stream.try_close().await;
break;
},
JabberWriterControl::Abort(sender) => {
sender.send(self.stanza_receiver);
break;
},
}
},
Some(msg) = self.stanza_receiver.recv() => {
let result = self.stream.write(&msg.stanza).await;
match result {
Err(e) => match &e {
peanuts::Error::ReadError(error) => {
// make sure message is not lost from error, supervisor handles retry and reporting
self.on_crash.send((msg, self.stanza_receiver));
break;
}
_ => {
msg.respond_to.send(Err(e.into()));
}
},
_ => {
msg.respond_to.send(Ok(()));
}
}
},
// TODO: check if this is ok to do
else => break,
}
}
}
}
#[derive(Clone)]
pub struct JabberWriteHandle {
sender: mpsc::Sender<JabberWrite>,
}
pub struct JabberWriterControlHandle {
sender: mpsc::Sender<JabberWriterControl>,
handle: JoinHandle<()>,
}
impl JabberWriterControlHandle {
pub fn new(
stream: BoundJabberWriter<Tls>,
supervisor: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
) -> (JabberWriteHandle, JabberWriterControlHandle) {
let (control_sender, control_receiver) = mpsc::channel(20);
let (stanza_sender, stanza_receiver) = mpsc::channel(20);
let actor = JabberWriter::new(stanza_receiver, control_receiver, stream, supervisor);
let handle = tokio::spawn(async move { actor.run().await });
(
JabberWriteHandle {
sender: stanza_sender,
},
Self {
sender: control_sender,
handle,
},
)
}
pub fn reconnect(
stream: BoundJabberWriter<Tls>,
supervisor: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
stanza_receiver: mpsc::Receiver<JabberWrite>,
) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20);
let actor = JabberWriter::new(stanza_receiver, control_receiver, stream, supervisor);
let handle = tokio::spawn(async move { actor.run().await });
Self {
sender: control_sender,
handle,
}
}
}
pub struct JabberReader {
// TODO: place iq hashmap here
control_receiver: mpsc::Receiver<JabberReaderControl>,
stream: BoundJabberReader<Tls>,
on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
db: SqlitePool,
tasks: JoinSet<()>,
jid: JID,
}
impl Luz {
pub async fn new() {}
pub async fn supervisor(
mut read_sender: Sender<UpdateMessage>,
mut write_receiver: Receiver<CommandMessage>,
pending_iqs: Arc<Mutex<HashMap<String, mpsc::Sender<Iq>>>>,
db: SqlitePool,
jid: &JID,
) {
let connection: Arc<
Mutex<Option<(JoinHandle<()>, (JoinHandle<()>, Sender<CommandMessage>))>>,
> = Arc::new(Mutex::new(None));
let mut jid = jid.clone();
let mut server = jid.domainpart.clone();
let tasks: Arc<Mutex<JoinSet<()>>> = Arc::new(Mutex::new(JoinSet::new()));
for command in write_receiver.recv().await {
match command {
CommandMessage::Connect(_) => todo!(),
CommandMessage::Disconnect => todo!(),
CommandMessage::Kill => todo!(),
CommandMessage::GetRoster => todo!(),
CommandMessage::SendMessage(jid, _) => todo!(),
}
tasks.lock().await.spawn(handle_command(connection.clone()));
}
}
}
pub async fn handle_command(
connection: Arc<Mutex<Option<(JoinHandle<()>, (JoinHandle<()>, Sender<CommandMessage>))>>>,
) {
match command {
CommandMessage::Connect(password) => {
let streams = jabber::connect_and_login(&mut jid, password, &mut server)
.await
.unwrap();
let (read, write) = streams.split();
let (sender, receiver) = mpsc::channel(20);
let reads = tokio::spawn(jabber_reads(read, read_sender.clone(), db.clone()));
let writes = tokio::spawn(jabber_writes(write, receiver, db.clone()));
// properly handle previous connection
*connection.lock().await = Some((reads, (writes, sender)));
}
CommandMessage::Disconnect => {
if let Some((reads, (writes, write_channel))) = connection.lock().await.take() {
write_channel.send(CommandMessage::Disconnect).await;
writes.await;
reads.await;
jid = jid.as_bare();
}
}
CommandMessage::GetRoster => match *connection.lock().await {
Some(_) => todo!(),
None => todo!(),
},
CommandMessage::SendMessage(jid, _) => todo!(),
}
}
pub async fn jabber_writes(
mut jabber_writer: BoundJabberWriter<Tls>,
mut write_receiver: Receiver<CommandMessage>,
/*error_sender: Sender<UpdateMessage>,*/ db: SqlitePool,
) {
}
pub async fn jabber_reads<S>(
mut read: BoundJabberReader<S>,
read_sender: Sender<UpdateMessage>,
db: SqlitePool,
) where
S: AsyncRead + Unpin + Sync,
{
while let Ok(stanza) = read.read::<Stanza>().await {
let upd = match stanza {
Stanza::Message(message) => todo!(),
Stanza::Presence(presence) => todo!(),
Stanza::Iq(iq) => match iq.r#type {
IqType::Error => todo!(),
IqType::Get => todo!(),
IqType::Result => match iq.query {
Some(q) => match q {
Query::Bind(bind) => todo!(),
Query::Ping(ping) => todo!(),
Query::Roster(query) => UpdateMessage::Roster(query.items),
Query::Unsupported => todo!(),
},
None => todo!(),
},
IqType::Set => todo!(),
},
Stanza::Error(error) => todo!(),
Stanza::OtherContent(content) => todo!(),
};
read_sender.send(upd).await.expect("read_sender");
}
todo!()
}
pub enum JabberWriteCommand {}
pub async fn funnel(
mut jabber_reads: Receiver<UpdateMessage>,
mut db_reads: Receiver<UpdateMessage>,
out: Sender<UpdateMessage>,
) {
select! {
j = jabber_reads.recv() => {
out.send(j.unwrap()).await;
},
d = db_reads.recv() => {
out.send(d.unwrap()).await;
}
}
}
pub async fn write_thread(mut write: BoundJabberWriter<S>, write_recv: Receiver<CommandMessage>) {
while let Some(cmd) = write_recv.recv().await {
match cmd {
CommandMessage::GetRoster => todo!(),
CommandMessage::SendMessage(jid, _) => todo!(),
CommandMessage::Connect => continue,
CommandMessage::Disconnect => write_thread_disconnected(write_recv),
}
}
todo!()
}
impl Client {
pub fn send_command(self: Arc<Self>, cmd: CommandMessage) {
match cmd {
CommandMessage::Connect => todo!(),
CommandMessage::Disconnect => todo!(),
CommandMessage::GetRoster => self.get_roster(),
CommandMessage::SendMessage(jid, _) => todo!(),
}
}
async fn begin_query_roster(self: Arc<Self>) -> Result<()> {
if self.connected {
self.db.start_get_roster().await;
self.write_chan.send(CommandMessage::GetRoster).await;
} else {
let roster = self.db.get_roster();
self.db_chan.send(roster).await;
}
}
}
pub struct ReadStreamThread {
// held by both read thread and write thread
pending_iqs: Arc<Mutex<HashMap<String, mpsc::Sender<Iq>>>>,
sender: mpsc::Sender<UpdateMessage>,
supervisor_control: mpsc::Sender<JabberSupervisorCommand>,
write_sender: mpsc::Sender<JabberWrite>,
tasks: JoinSet<()>,
}
pub struct WriteStreamThread {
pending_iqs: Arc<Mutex<HashMap<String, mpsc::Sender<Iq>>>>,
tasks: JoinSet<()>,
}
impl Luz {
// pub async fn new(jid: &str, dburl: &str) -> Result<Self, Error> {
// let client = Client::new(jid, dburl).await?;
// let luz = Arc::new(Mutex::new(client));
// client
// .tasks
// .spawn(Client::read_thread(luz.clone(), read_sender));
// client.tasks.spawn(Client::write_thread(write_receiver));
// let luz = luz.into_inner();
// Ok(Luz(luz))
// }
}
// instead of having a channel to send stanzas down, have a channel to send channels that you can send stanzas down, so if one channel fails (through disconnection, etc), you can move on to the next. could also have an enum to let them sit until reconnection.
impl Client {
async fn new(jid: &str, dburl: &str) -> Result<Self, Error> {
let (read_sender, read_receiver) = mpsc::channel::<UpdateMessage>(20);
let (write_sender, write_receiver) = mpsc::channel::<CommandMessage>(20);
let jid: JID = jid.try_into()?;
let server = jid.domainpart.clone();
let db = SqlitePool::connect(dburl).await?;
// let bound_streams = jabber::connect_and_login(&mut jid, password, &mut server).await?;
// let (read, write) = bound_streams.split();
// let streams = SplitStreams { read: Mutex::new(read), write: Mutex::new(write) };
let mut tasks = JoinSet::new();
let client = Self {
receiver: ReceiverStream::new(read_receiver),
sender: PollSender::new(write_sender),
pending_iqs: HashMap::new(),
impl JabberReader {
fn new(
control_receiver: mpsc::Receiver<JabberReaderControl>,
stream: BoundJabberReader<Tls>,
on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
db: SqlitePool,
sender: mpsc::Sender<UpdateMessage>,
// jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs)
supervisor_control: mpsc::Sender<JabberSupervisorCommand>,
write_sender: mpsc::Sender<JabberWrite>,
) -> Self {
Self {
control_receiver,
stream,
on_crash,
db,
jid,
server,
// TODO: check for drop abort handles
tasks,
sender,
supervisor_control,
write_sender,
tasks: JoinSet::new(),
}
}
async fn run(mut self) {
loop {
tokio::select! {
Some(msg) = self.control_receiver.recv() => {
match msg {
JabberReaderControl::Disconnect => todo!(),
JabberReaderControl::Abort(sender) => todo!(),
};
// store handles in client
// send connection streams down channel created here
Ok(client)
},
stanza = self.stream.read::<Stanza>() => {
match stanza {
Ok(_) => todo!(),
Err(_) => todo!(),
}
self.tasks.spawn();
},
else => break
}
}
}
}
pub async fn read_thread(client: Arc<Mutex<Self>>, sender: mpsc::Sender<UpdateMessage>) {}
pub async fn write_thread(receiver: mpsc::Receiver<CommandMessage>) {
for message in receiver.recv().await {}
trait Task {
async fn handle();
}
pub async fn connect(&mut self, password: &str) -> Result<(), Error> {
impl Task for Stanza {
async fn handle() {
todo!()
}
}
enum JabberReaderControl {
Disconnect,
Abort(oneshot::Sender<mpsc::Receiver<JabberWrite>>),
}
struct JabberReaderControlHandle {
sender: mpsc::Sender<JabberReaderControl>,
handle: JoinHandle<()>,
}
impl JabberReaderControlHandle {
pub fn new(
stream: BoundJabberReader<Tls>,
on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
db: SqlitePool,
sender: mpsc::Sender<UpdateMessage>,
supervisor_control: mpsc::Sender<JabberSupervisorCommand>,
jabber_write: mpsc::Sender<JabberWrite>,
) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20);
let actor = JabberReader::new(
control_receiver,
stream,
on_crash,
db,
sender,
supervisor_control,
jabber_write,
);
let handle = tokio::spawn(async move { actor.run().await });
Self {
sender: control_sender,
handle,
}
}
}
pub struct Luz {
receiver: mpsc::Receiver<CommandMessage>,
jid: Arc<Mutex<JID>>,
// TODO: use a dyn passwordprovider trait to avoid storing password in memory
password: String,
connected: Arc<Mutex<Option<(JabberWriteHandle, JabberSupervisorHandle)>>>,
db: SqlitePool,
sender: mpsc::Sender<UpdateMessage>,
tasks: JoinSet<()>,
}
impl Luz {
fn new(
receiver: mpsc::Receiver<CommandMessage>,
jid: Arc<Mutex<JID>>,
password: String,
connected: Arc<Mutex<Option<(JabberWriteHandle, JabberSupervisorHandle)>>>,
db: SqlitePool,
sender: mpsc::Sender<UpdateMessage>,
) -> Self {
Self {
jid,
password,
connected,
db,
receiver,
sender,
tasks: JoinSet::new(),
}
}
async fn run(mut self) {
while let Some(msg) = self.receiver.recv().await {
// TODO: consider separating disconnect/connect and commands apart from commandmessage
match msg {
CommandMessage::Connect => {
match self.connected.lock().await.as_ref() {
Some(_) => {
self.sender
.send(UpdateMessage::Error(Error::AlreadyConnected))
.await;
}
None => {
let mut jid = self.jid.lock().await;
let mut domain = jid.domainpart.clone();
let streams_result =
jabber::connect_and_login(&mut jid, &self.password, &mut domain)
.await;
match streams_result {
Ok(s) => {
let (writer, supervisor) =
JabberSupervisorHandle::new(s, self.sender.clone());
*self.connected.lock().await = Some((writer, supervisor));
}
Err(e) => {
self.sender.send(UpdateMessage::Error(e.into()));
}
}
}
};
}
CommandMessage::Disconnect => match self.connected.lock().await.as_mut() {
None => {
self.sender
.send(UpdateMessage::Error(Error::AlreadyDisonnected))
.await;
}
mut c => {
if let Some((_write_handle, supervisor_handle)) = c.take() {
let _ = supervisor_handle
.sender
.send(JabberSupervisorCommand::Disconnect)
.await;
} else {
unreachable!()
};
}
},
_ => {
match self.connected.lock().await.as_ref() {
Some((w, _)) => self.tasks.spawn(msg.handle_online(
w.clone(),
self.jid.clone(),
self.db.clone(),
self.sender.clone(),
)),
None => self.tasks.spawn(msg.handle_offline(
self.jid.clone(),
self.db.clone(),
self.sender.clone(),
)),
};
}
}
}
}
}
impl CommandMessage {
pub async fn handle_offline(
mut self,
jid: Arc<Mutex<JID>>,
db: SqlitePool,
sender: mpsc::Sender<UpdateMessage>,
) {
todo!()
}
pub async fn handle_online(
mut self,
jabber_write_handle: JabberWriteHandle,
// TODO: jid could lose resource by the end
jid: Arc<Mutex<JID>>,
db: SqlitePool,
sender: mpsc::Sender<UpdateMessage>,
) {
todo!()
}
}
// TODO: separate sender and receiver, store handle to Luz process to ensure dropping
pub struct LuzHandle {
sender: mpsc::Sender<CommandMessage>,
receiver: mpsc::Receiver<UpdateMessage>,
}
impl LuzHandle {
pub fn new(jid: JID, password: String, db: SqlitePool) -> Self {
let (command_sender, command_receiver) = mpsc::channel(20);
let (update_sender, update_receiver) = mpsc::channel(20);
let actor = Luz::new(
command_receiver,
Arc::new(Mutex::new(jid)),
password,
Arc::new(Mutex::new(None)),
db,
update_sender,
);
tokio::spawn(async move { actor.run().await });
Self {
sender: command_sender,
receiver: update_receiver,
}
}
}
pub struct JabberSupervisor {
connection_commands: mpsc::Receiver<JabberSupervisorCommand>,
writer_crash: oneshot::Receiver<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
reader_crash: oneshot::Receiver<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
sender: mpsc::Sender<UpdateMessage>,
writer_handle: JabberWriterControlHandle,
reader_handle: JabberReaderControlHandle,
}
pub enum JabberSupervisorCommand {
Disconnect,
}
impl JabberSupervisor {
fn new(
connection_commands: mpsc::Receiver<JabberSupervisorCommand>,
writer_crash: oneshot::Receiver<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
reader_crash: oneshot::Receiver<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
sender: mpsc::Sender<UpdateMessage>,
writer_handle: JabberWriterControlHandle,
reader_handle: JabberReaderControlHandle,
) -> Self {
Self {
connection_commands,
writer_crash,
sender,
writer_handle,
reader_handle,
reader_crash,
}
}
async fn handle_command_message(&mut self, msg: JabberSupervisorCommand) {}
async fn run(mut self) {
loop {
tokio::select! {
Some(msg) = self.connection_commands.recv() => {
self.handle_command_message(msg).await;
},
error = self.writer_crash => {
},
error = self.reader_crash => {
},
}
}
}
}
pub struct JabberSupervisorHandle {
sender: mpsc::Sender<JabberSupervisorCommand>,
handle: JoinHandle<()>,
}
impl JabberSupervisorHandle {
pub fn new(
streams: BoundJabberStream<Tls>,
sender: mpsc::Sender<UpdateMessage>,
db: SqlitePool,
update_sender: mpsc::Sender<UpdateMessage>,
) -> (JabberWriteHandle, Self) {
let (command_sender, command_receiver) = mpsc::channel(20);
let (writer_error_sender, writer_error_receiver) = oneshot::channel();
let (reader_crash_sender, reader_crash_receiver) = oneshot::channel();
let (reader, writer) = streams.split();
let (jabber_write_handle, jabber_writer_control_handle) =
JabberWriterControlHandle::new(writer, writer_error_sender);
let jabber_reader_control_handle = JabberReaderControlHandle::new(
reader,
reader_crash_sender,
db,
update_sender,
command_sender.clone(),
jabber_write_handle.sender.clone(),
);
let actor = JabberSupervisor::new(
command_receiver,
writer_error_receiver,
reader_crash_receiver,
sender,
jabber_writer_control_handle,
jabber_reader_control_handle,
);
let handle = tokio::spawn(async move { actor.run().await });
(
jabber_write_handle,
Self {
sender: command_sender,
handle,
},
)
}
}
pub enum Error {
AlreadyConnected,
PollSend(PollSendError<CommandMessage>),
Jabber(jabber::Error),
XML(peanuts::Error),
SQL(sqlx::Error),
JID(jid::ParseError),
AlreadyDisonnected,
}
impl From<peanuts::Error> for Error {
fn from(e: peanuts::Error) -> Self {
Self::XML(e)
}
}
impl From<jid::ParseError> for Error {
@ -281,17 +570,6 @@ impl From<jabber::Error> for Error {
}
}
impl Stream for Client {
type Item = UpdateMessage;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
pin!(self).receiver.poll_next_unpin(cx)
}
}
impl From<PollSendError<CommandMessage>> for Error {
fn from(e: PollSendError<CommandMessage>) -> Self {
Self::PollSend(e)
@ -299,7 +577,7 @@ impl From<PollSendError<CommandMessage>> for Error {
}
pub enum CommandMessage {
Connect(String),
Connect,
Disconnect,
/// gets the roster. if offline, retreives cached version from database. should be stored in application memory.
GetRoster,
@ -307,42 +585,6 @@ pub enum CommandMessage {
}
pub enum UpdateMessage {
Error(Error),
Roster(Vec<roster::Item>),
}
impl Client {
pub async fn process_stanza(
&mut self,
stanza: Result<Stanza, jabber::Error>,
sender: mpsc::Sender<UpdateMessage>,
) {
match stanza {
Ok(stanza) => todo!(),
Err(e) => self.process_error(e),
}
}
pub async fn iq(
&mut self,
to: Option<JID>,
r#type: IqType,
query: Option<Query>,
) -> Result<IqResponse, Error> {
self.client
.send(Stanza::Iq(Iq {
from: Some(self.client.jid()),
// TODO: generate id
id: "test".to_string(),
to,
r#type,
// TODO: lang
lang: None,
query,
errors: Vec::new(),
}))
.await?;
Ok(todo!())
}
pub async fn iq_process(&mut self, iq: Iq) {}
}