WIP: luz initial client
This commit is contained in:
parent
866e134371
commit
b023c6b5f2
|
@ -415,7 +415,7 @@ mod tests {
|
|||
use std::time::Duration;
|
||||
|
||||
use super::*;
|
||||
use crate::{client::JabberClient, connection::Connection};
|
||||
use crate::{connection::Connection};
|
||||
use futures::sink;
|
||||
use test_log::test;
|
||||
use tokio::time::sleep;
|
||||
|
@ -468,8 +468,8 @@ mod tests {
|
|||
|
||||
#[tokio::test]
|
||||
async fn sink() {
|
||||
let mut client = JabberClient::new("test@blos.sm", "slayed").unwrap();
|
||||
client.connect().await.unwrap();
|
||||
// let mut client = JabberClient::new("test@blos.sm", "slayed").unwrap();
|
||||
// client.connect().await.unwrap();
|
||||
// let stream = client.inner().unwrap();
|
||||
// let sink = sink::unfold(stream, |mut stream, stanza: Stanza| async move {
|
||||
// stream.writer.write(&stanza).await?;
|
||||
|
|
|
@ -6,6 +6,8 @@ edition = "2021"
|
|||
[dependencies]
|
||||
futures = "0.3.31"
|
||||
jabber = { version = "0.1.0", path = "../jabber" }
|
||||
jid = { version = "0.1.0", path = "../jid" }
|
||||
sqlx = { version = "0.8.3", features = [ "sqlite", "runtime-tokio" ] }
|
||||
stanza = { version = "0.1.0", path = "../stanza" }
|
||||
tokio = "1.42.0"
|
||||
tokio-stream = "0.1.17"
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
CREATE TABLE roster(
|
||||
id INTEGER PRIMARY KEY,
|
||||
jid TEXT NOT NULL,
|
||||
nickname TEXT,
|
||||
);
|
293
luz/src/lib.rs
293
luz/src/lib.rs
|
@ -1,15 +1,21 @@
|
|||
use std::{
|
||||
collections::{HashMap, HashSet, VecDeque},
|
||||
fmt::Pointer,
|
||||
pin::pin,
|
||||
sync::Arc,
|
||||
task::{ready, Poll},
|
||||
thread::JoinHandle,
|
||||
};
|
||||
|
||||
use futures::{
|
||||
stream::{SplitSink, SplitStream},
|
||||
Sink, SinkExt, Stream, StreamExt,
|
||||
};
|
||||
use jabber::{client::JabberClient, JID};
|
||||
use jabber::{
|
||||
connection::Tls,
|
||||
jabber_stream::bound_stream::{BoundJabberReader, BoundJabberStream, BoundJabberWriter},
|
||||
JID,
|
||||
};
|
||||
use sqlx::{query, Pool, Sqlite, SqlitePool};
|
||||
use stanza::{
|
||||
client::{
|
||||
iq::{Iq, IqType, Query},
|
||||
|
@ -17,58 +23,256 @@ use stanza::{
|
|||
},
|
||||
roster,
|
||||
};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::{
|
||||
io::AsyncRead,
|
||||
select,
|
||||
sync::{
|
||||
mpsc::{self, Receiver, Sender},
|
||||
Mutex,
|
||||
},
|
||||
task::{JoinHandle, JoinSet},
|
||||
};
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tokio_util::sync::{PollSendError, PollSender};
|
||||
|
||||
pub struct Client {
|
||||
client: JabberClient,
|
||||
pending_iqs: HashMap<String, mpsc::Sender<Iq>>,
|
||||
// database connection (sqlite)
|
||||
pub struct Luz {
|
||||
receiver: ReceiverStream<UpdateMessage>,
|
||||
sender: PollSender<CommandMessage>,
|
||||
// TODO: oneshot
|
||||
pending_iqs: Arc<Mutex<HashMap<String, mpsc::Sender<Iq>>>>,
|
||||
db: SqlitePool,
|
||||
tasks: JoinSet<()>,
|
||||
jid: JID,
|
||||
}
|
||||
|
||||
impl Luz {
|
||||
pub async fn new() {}
|
||||
|
||||
pub async fn supervisor(
|
||||
mut read_sender: Sender<UpdateMessage>,
|
||||
mut write_receiver: Receiver<CommandMessage>,
|
||||
pending_iqs: Arc<Mutex<HashMap<String, mpsc::Sender<Iq>>>>,
|
||||
db: SqlitePool,
|
||||
jid: &JID,
|
||||
) {
|
||||
let connection: Arc<
|
||||
Mutex<Option<(JoinHandle<()>, (JoinHandle<()>, Sender<CommandMessage>))>>,
|
||||
> = Arc::new(Mutex::new(None));
|
||||
let mut jid = jid.clone();
|
||||
let mut server = jid.domainpart.clone();
|
||||
let tasks: Arc<Mutex<JoinSet<()>>> = Arc::new(Mutex::new(JoinSet::new()));
|
||||
for command in write_receiver.recv().await {
|
||||
match command {
|
||||
CommandMessage::Connect(_) => todo!(),
|
||||
CommandMessage::Disconnect => todo!(),
|
||||
CommandMessage::Kill => todo!(),
|
||||
CommandMessage::GetRoster => todo!(),
|
||||
CommandMessage::SendMessage(jid, _) => todo!(),
|
||||
}
|
||||
tasks.lock().await.spawn(handle_command(connection.clone()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn handle_command(
|
||||
connection: Arc<Mutex<Option<(JoinHandle<()>, (JoinHandle<()>, Sender<CommandMessage>))>>>,
|
||||
) {
|
||||
match command {
|
||||
CommandMessage::Connect(password) => {
|
||||
let streams = jabber::connect_and_login(&mut jid, password, &mut server)
|
||||
.await
|
||||
.unwrap();
|
||||
let (read, write) = streams.split();
|
||||
let (sender, receiver) = mpsc::channel(20);
|
||||
let reads = tokio::spawn(jabber_reads(read, read_sender.clone(), db.clone()));
|
||||
let writes = tokio::spawn(jabber_writes(write, receiver, db.clone()));
|
||||
// properly handle previous connection
|
||||
*connection.lock().await = Some((reads, (writes, sender)));
|
||||
}
|
||||
CommandMessage::Disconnect => {
|
||||
if let Some((reads, (writes, write_channel))) = connection.lock().await.take() {
|
||||
write_channel.send(CommandMessage::Disconnect).await;
|
||||
writes.await;
|
||||
reads.await;
|
||||
jid = jid.as_bare();
|
||||
}
|
||||
}
|
||||
CommandMessage::GetRoster => match *connection.lock().await {
|
||||
Some(_) => todo!(),
|
||||
None => todo!(),
|
||||
},
|
||||
CommandMessage::SendMessage(jid, _) => todo!(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn jabber_writes(
|
||||
mut jabber_writer: BoundJabberWriter<Tls>,
|
||||
mut write_receiver: Receiver<CommandMessage>,
|
||||
/*error_sender: Sender<UpdateMessage>,*/ db: SqlitePool,
|
||||
) {
|
||||
}
|
||||
|
||||
pub async fn jabber_reads<S>(
|
||||
mut read: BoundJabberReader<S>,
|
||||
read_sender: Sender<UpdateMessage>,
|
||||
db: SqlitePool,
|
||||
) where
|
||||
S: AsyncRead + Unpin + Sync,
|
||||
{
|
||||
while let Ok(stanza) = read.read::<Stanza>().await {
|
||||
let upd = match stanza {
|
||||
Stanza::Message(message) => todo!(),
|
||||
Stanza::Presence(presence) => todo!(),
|
||||
Stanza::Iq(iq) => match iq.r#type {
|
||||
IqType::Error => todo!(),
|
||||
IqType::Get => todo!(),
|
||||
IqType::Result => match iq.query {
|
||||
Some(q) => match q {
|
||||
Query::Bind(bind) => todo!(),
|
||||
Query::Ping(ping) => todo!(),
|
||||
Query::Roster(query) => UpdateMessage::Roster(query.items),
|
||||
Query::Unsupported => todo!(),
|
||||
},
|
||||
None => todo!(),
|
||||
},
|
||||
IqType::Set => todo!(),
|
||||
},
|
||||
Stanza::Error(error) => todo!(),
|
||||
Stanza::OtherContent(content) => todo!(),
|
||||
};
|
||||
read_sender.send(upd).await.expect("read_sender");
|
||||
}
|
||||
todo!()
|
||||
}
|
||||
|
||||
pub enum JabberWriteCommand {}
|
||||
|
||||
pub async fn funnel(
|
||||
mut jabber_reads: Receiver<UpdateMessage>,
|
||||
mut db_reads: Receiver<UpdateMessage>,
|
||||
out: Sender<UpdateMessage>,
|
||||
) {
|
||||
select! {
|
||||
j = jabber_reads.recv() => {
|
||||
out.send(j.unwrap()).await;
|
||||
},
|
||||
d = db_reads.recv() => {
|
||||
out.send(d.unwrap()).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn write_thread(mut write: BoundJabberWriter<S>, write_recv: Receiver<CommandMessage>) {
|
||||
while let Some(cmd) = write_recv.recv().await {
|
||||
match cmd {
|
||||
CommandMessage::GetRoster => todo!(),
|
||||
CommandMessage::SendMessage(jid, _) => todo!(),
|
||||
CommandMessage::Connect => continue,
|
||||
CommandMessage::Disconnect => write_thread_disconnected(write_recv),
|
||||
}
|
||||
}
|
||||
todo!()
|
||||
}
|
||||
|
||||
impl Client {
|
||||
pub async fn new(jid: String, password: &str) -> Result<Self, Error> {
|
||||
pub fn send_command(self: Arc<Self>, cmd: CommandMessage) {
|
||||
match cmd {
|
||||
CommandMessage::Connect => todo!(),
|
||||
CommandMessage::Disconnect => todo!(),
|
||||
CommandMessage::GetRoster => self.get_roster(),
|
||||
CommandMessage::SendMessage(jid, _) => todo!(),
|
||||
}
|
||||
}
|
||||
async fn begin_query_roster(self: Arc<Self>) -> Result<()> {
|
||||
if self.connected {
|
||||
self.db.start_get_roster().await;
|
||||
self.write_chan.send(CommandMessage::GetRoster).await;
|
||||
} else {
|
||||
let roster = self.db.get_roster();
|
||||
self.db_chan.send(roster).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ReadStreamThread {
|
||||
// held by both read thread and write thread
|
||||
pending_iqs: Arc<Mutex<HashMap<String, mpsc::Sender<Iq>>>>,
|
||||
tasks: JoinSet<()>,
|
||||
}
|
||||
|
||||
pub struct WriteStreamThread {
|
||||
pending_iqs: Arc<Mutex<HashMap<String, mpsc::Sender<Iq>>>>,
|
||||
tasks: JoinSet<()>,
|
||||
}
|
||||
|
||||
impl Luz {
|
||||
// pub async fn new(jid: &str, dburl: &str) -> Result<Self, Error> {
|
||||
// let client = Client::new(jid, dburl).await?;
|
||||
// let luz = Arc::new(Mutex::new(client));
|
||||
// client
|
||||
// .tasks
|
||||
// .spawn(Client::read_thread(luz.clone(), read_sender));
|
||||
// client.tasks.spawn(Client::write_thread(write_receiver));
|
||||
// let luz = luz.into_inner();
|
||||
// Ok(Luz(luz))
|
||||
// }
|
||||
}
|
||||
|
||||
// instead of having a channel to send stanzas down, have a channel to send channels that you can send stanzas down, so if one channel fails (through disconnection, etc), you can move on to the next. could also have an enum to let them sit until reconnection.
|
||||
impl Client {
|
||||
async fn new(jid: &str, dburl: &str) -> Result<Self, Error> {
|
||||
let (read_sender, read_receiver) = mpsc::channel::<UpdateMessage>(20);
|
||||
let (write_sender, write_receiver) = mpsc::channel::<CommandMessage>(20);
|
||||
let mut jabber_client = JabberClient::new(jid, password)?;
|
||||
jabber_client.connect().await?;
|
||||
let (write, read) = jabber_client.split();
|
||||
let jid: JID = jid.try_into()?;
|
||||
let server = jid.domainpart.clone();
|
||||
let db = SqlitePool::connect(dburl).await?;
|
||||
// let bound_streams = jabber::connect_and_login(&mut jid, password, &mut server).await?;
|
||||
// let (read, write) = bound_streams.split();
|
||||
// let streams = SplitStreams { read: Mutex::new(read), write: Mutex::new(write) };
|
||||
let mut tasks = JoinSet::new();
|
||||
let client = Self {
|
||||
client: jabber_client,
|
||||
receiver: ReceiverStream::new(read_receiver),
|
||||
sender: PollSender::new(write_sender),
|
||||
pending_iqs: HashMap::new(),
|
||||
db,
|
||||
jid,
|
||||
server,
|
||||
// TODO: check for drop abort handles
|
||||
tasks,
|
||||
};
|
||||
tokio::spawn(client.process_read(read, read_sender));
|
||||
tokio::spawn(client.process_write(write, write_receiver));
|
||||
// store handles in client
|
||||
// send connection streams down channel created here
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
pub async fn process_read(
|
||||
&self,
|
||||
mut stream: SplitStream<JabberClient>,
|
||||
sender: mpsc::Sender<UpdateMessage>,
|
||||
) {
|
||||
for stanza in stream.next().await {
|
||||
tokio::spawn(self.process_stanza(stanza, sender.clone()));
|
||||
}
|
||||
pub async fn read_thread(client: Arc<Mutex<Self>>, sender: mpsc::Sender<UpdateMessage>) {}
|
||||
|
||||
pub async fn write_thread(receiver: mpsc::Receiver<CommandMessage>) {
|
||||
for message in receiver.recv().await {}
|
||||
}
|
||||
|
||||
pub async fn process_write(
|
||||
&self,
|
||||
mut sink: SplitSink<JabberClient, Stanza>,
|
||||
receiver: mpsc::Receiver<CommandMessage>,
|
||||
) {
|
||||
for message in receiver.recv_many(, )
|
||||
pub async fn connect(&mut self, password: &str) -> Result<(), Error> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
pub enum Error {
|
||||
PollSend(PollSendError<CommandMessage>),
|
||||
Jabber(jabber::Error),
|
||||
SQL(sqlx::Error),
|
||||
JID(jid::ParseError),
|
||||
}
|
||||
|
||||
impl From<jid::ParseError> for Error {
|
||||
fn from(e: jid::ParseError) -> Self {
|
||||
Self::JID(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<sqlx::Error> for Error {
|
||||
fn from(e: sqlx::Error) -> Self {
|
||||
Self::SQL(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<jabber::Error> for Error {
|
||||
|
@ -88,35 +292,6 @@ impl Stream for Client {
|
|||
}
|
||||
}
|
||||
|
||||
impl Sink<CommandMessage> for Client {
|
||||
type Error = Error;
|
||||
|
||||
fn poll_ready(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> Poll<Result<(), Self::Error>> {
|
||||
Poll::Ready(ready!(pin!(self).sender.poll_ready_unpin(cx)))
|
||||
}
|
||||
|
||||
fn start_send(self: std::pin::Pin<&mut Self>, item: CommandMessage) -> Result<(), Self::Error> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn poll_flush(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> Poll<Result<(), Self::Error>> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn poll_close(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> Poll<Result<(), Self::Error>> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<PollSendError<CommandMessage>> for Error {
|
||||
fn from(e: PollSendError<CommandMessage>) -> Self {
|
||||
Self::PollSend(e)
|
||||
|
@ -124,7 +299,9 @@ impl From<PollSendError<CommandMessage>> for Error {
|
|||
}
|
||||
|
||||
pub enum CommandMessage {
|
||||
Connect,
|
||||
Connect(String),
|
||||
Disconnect,
|
||||
/// gets the roster. if offline, retreives cached version from database. should be stored in application memory.
|
||||
GetRoster,
|
||||
SendMessage(JID, String),
|
||||
}
|
||||
|
|
|
@ -10,8 +10,8 @@ pub const XMLNS: &str = "jabber:iq:roster";
|
|||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Query {
|
||||
ver: Option<String>,
|
||||
items: Vec<Item>,
|
||||
pub ver: Option<String>,
|
||||
pub items: Vec<Item>,
|
||||
}
|
||||
|
||||
impl FromElement for Query {
|
||||
|
@ -36,11 +36,16 @@ impl IntoElement for Query {
|
|||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Item {
|
||||
/// signals subscription pre-approval (server only)
|
||||
approved: Option<bool>,
|
||||
/// signals subscription sub-states (server only)
|
||||
ask: bool,
|
||||
/// uniquely identifies item
|
||||
jid: JID,
|
||||
/// handle that is determined by user, not contact
|
||||
name: Option<String>,
|
||||
subscription: Subscription,
|
||||
/// state of the presence subscription
|
||||
subscription: Option<Subscription>,
|
||||
groups: Vec<Group>,
|
||||
}
|
||||
|
||||
|
@ -63,7 +68,7 @@ impl FromElement for Item {
|
|||
};
|
||||
let jid = element.attribute("jid")?;
|
||||
let name = element.attribute_opt("name")?;
|
||||
let subscription = element.attribute_opt("subscription")?.unwrap_or_default();
|
||||
let subscription = element.attribute_opt("subscription")?;
|
||||
let groups = element.pop_children()?;
|
||||
|
||||
Ok(Self {
|
||||
|
@ -91,7 +96,7 @@ impl IntoElement for Item {
|
|||
)
|
||||
.push_attribute("jid", self.jid.clone())
|
||||
.push_attribute_opt("name", self.name.clone())
|
||||
.push_attribute("subscription", self.subscription)
|
||||
.push_attribute_opt("subscription", self.subscription)
|
||||
.push_children(self.groups.clone())
|
||||
}
|
||||
}
|
||||
|
@ -134,6 +139,7 @@ impl ToString for Subscription {
|
|||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
// TODO: check if should be option or not
|
||||
pub struct Group(Option<String>);
|
||||
|
||||
impl FromElement for Group {
|
||||
|
|
Loading…
Reference in New Issue