fix crash by fusing oneshot
This commit is contained in:
		
							parent
							
								
									7dc1b1f35d
								
							
						
					
					
						commit
						eda4bd92ff
					
				| 
						 | 
					@ -1 +1,2 @@
 | 
				
			||||||
luz.db
 | 
					luz.db
 | 
				
			||||||
 | 
					.sqlx/
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -8,6 +8,7 @@ use chat::{Body, Chat, Message};
 | 
				
			||||||
use connection::{write::WriteMessage, SupervisorSender};
 | 
					use connection::{write::WriteMessage, SupervisorSender};
 | 
				
			||||||
use db::Db;
 | 
					use db::Db;
 | 
				
			||||||
use error::{ConnectionError, Reason, RosterError, StatusError};
 | 
					use error::{ConnectionError, Reason, RosterError, StatusError};
 | 
				
			||||||
 | 
					use futures::{future::Fuse, FutureExt};
 | 
				
			||||||
use jabber::JID;
 | 
					use jabber::JID;
 | 
				
			||||||
use presence::{Offline, Online, Presence};
 | 
					use presence::{Offline, Online, Presence};
 | 
				
			||||||
use roster::{Contact, ContactUpdate};
 | 
					use roster::{Contact, ContactUpdate};
 | 
				
			||||||
| 
						 | 
					@ -47,7 +48,7 @@ pub struct Luz {
 | 
				
			||||||
    db: Db,
 | 
					    db: Db,
 | 
				
			||||||
    sender: mpsc::Sender<UpdateMessage>,
 | 
					    sender: mpsc::Sender<UpdateMessage>,
 | 
				
			||||||
    /// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected
 | 
					    /// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected
 | 
				
			||||||
    connection_supervisor_shutdown: oneshot::Receiver<()>,
 | 
					    connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>,
 | 
				
			||||||
    // TODO: will need to have an auto reconnect state as well (e.g. in case server shut down, to try and reconnect later)
 | 
					    // TODO: will need to have an auto reconnect state as well (e.g. in case server shut down, to try and reconnect later)
 | 
				
			||||||
    // TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway?
 | 
					    // TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway?
 | 
				
			||||||
    tasks: JoinSet<()>,
 | 
					    tasks: JoinSet<()>,
 | 
				
			||||||
| 
						 | 
					@ -60,7 +61,7 @@ impl Luz {
 | 
				
			||||||
        jid: Arc<Mutex<JID>>,
 | 
					        jid: Arc<Mutex<JID>>,
 | 
				
			||||||
        password: String,
 | 
					        password: String,
 | 
				
			||||||
        connected: Arc<Mutex<Option<(WriteHandle, SupervisorHandle)>>>,
 | 
					        connected: Arc<Mutex<Option<(WriteHandle, SupervisorHandle)>>>,
 | 
				
			||||||
        connection_supervisor_shutdown: oneshot::Receiver<()>,
 | 
					        connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>,
 | 
				
			||||||
        db: SqlitePool,
 | 
					        db: SqlitePool,
 | 
				
			||||||
        sender: mpsc::Sender<UpdateMessage>,
 | 
					        sender: mpsc::Sender<UpdateMessage>,
 | 
				
			||||||
    ) -> Self {
 | 
					    ) -> Self {
 | 
				
			||||||
| 
						 | 
					@ -82,9 +83,8 @@ impl Luz {
 | 
				
			||||||
        loop {
 | 
					        loop {
 | 
				
			||||||
            let msg = tokio::select! {
 | 
					            let msg = tokio::select! {
 | 
				
			||||||
                // this is okay, as when created the supervisor (and connection) doesn't exist, but a bit messy
 | 
					                // this is okay, as when created the supervisor (and connection) doesn't exist, but a bit messy
 | 
				
			||||||
                // THIS IS NOT OKAY LOLLLL
 | 
					                // THIS IS NOT OKAY LOLLLL - apparently fusing is the best option???
 | 
				
			||||||
                _ = &mut self.connection_supervisor_shutdown => {
 | 
					                _ = &mut self.connection_supervisor_shutdown => {
 | 
				
			||||||
                    info!("got this");
 | 
					 | 
				
			||||||
                    *self.connected.lock().await = None;
 | 
					                    *self.connected.lock().await = None;
 | 
				
			||||||
                    continue;
 | 
					                    continue;
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
| 
						 | 
					@ -130,6 +130,7 @@ impl Luz {
 | 
				
			||||||
                                        self.password.clone(),
 | 
					                                        self.password.clone(),
 | 
				
			||||||
                                        self.pending_iqs.clone(),
 | 
					                                        self.pending_iqs.clone(),
 | 
				
			||||||
                                    );
 | 
					                                    );
 | 
				
			||||||
 | 
					                                    let shutdown_recv = shutdown_recv.fuse();
 | 
				
			||||||
                                    self.connection_supervisor_shutdown = shutdown_recv;
 | 
					                                    self.connection_supervisor_shutdown = shutdown_recv;
 | 
				
			||||||
                                    // TODO: get roster and send initial presence
 | 
					                                    // TODO: get roster and send initial presence
 | 
				
			||||||
                                    let (send, recv) = oneshot::channel();
 | 
					                                    let (send, recv) = oneshot::channel();
 | 
				
			||||||
| 
						 | 
					@ -237,7 +238,7 @@ impl Luz {
 | 
				
			||||||
                        }
 | 
					                        }
 | 
				
			||||||
                    };
 | 
					                    };
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
                CommandMessage::Disconnect(_offline) => {
 | 
					                CommandMessage::Disconnect(offline) => {
 | 
				
			||||||
                    match self.connected.lock().await.as_mut() {
 | 
					                    match self.connected.lock().await.as_mut() {
 | 
				
			||||||
                        None => {
 | 
					                        None => {
 | 
				
			||||||
                            let _ = self
 | 
					                            let _ = self
 | 
				
			||||||
| 
						 | 
					@ -247,15 +248,19 @@ impl Luz {
 | 
				
			||||||
                        }
 | 
					                        }
 | 
				
			||||||
                        mut c => {
 | 
					                        mut c => {
 | 
				
			||||||
                            // TODO: send unavailable presence
 | 
					                            // TODO: send unavailable presence
 | 
				
			||||||
                            if let Some((_write_handle, supervisor_handle)) = c.take() {
 | 
					                            if let Some((write_handle, supervisor_handle)) = c.take() {
 | 
				
			||||||
 | 
					                                let offline_presence: stanza::client::presence::Presence =
 | 
				
			||||||
 | 
					                                    offline.clone().into();
 | 
				
			||||||
 | 
					                                let stanza = Stanza::Presence(offline_presence);
 | 
				
			||||||
 | 
					                                // TODO: timeout and error check
 | 
				
			||||||
 | 
					                                write_handle.write(stanza).await;
 | 
				
			||||||
                                let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await;
 | 
					                                let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await;
 | 
				
			||||||
                                c = None;
 | 
					                                let _ = self.sender.send(UpdateMessage::Offline(offline)).await;
 | 
				
			||||||
                            } else {
 | 
					                            } else {
 | 
				
			||||||
                                unreachable!()
 | 
					                                unreachable!()
 | 
				
			||||||
                            };
 | 
					                            };
 | 
				
			||||||
                        }
 | 
					                        }
 | 
				
			||||||
                    }
 | 
					                    }
 | 
				
			||||||
                    info!("lock released")
 | 
					 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
                _ => {
 | 
					                _ => {
 | 
				
			||||||
                    match self.connected.lock().await.as_ref() {
 | 
					                    match self.connected.lock().await.as_ref() {
 | 
				
			||||||
| 
						 | 
					@ -1004,6 +1009,7 @@ impl LuzHandle {
 | 
				
			||||||
        let (update_sender, update_receiver) = mpsc::channel(20);
 | 
					        let (update_sender, update_receiver) = mpsc::channel(20);
 | 
				
			||||||
        // might be bad, first supervisor shutdown notification oneshot is never used (disgusting)
 | 
					        // might be bad, first supervisor shutdown notification oneshot is never used (disgusting)
 | 
				
			||||||
        let (sup_send, sup_recv) = oneshot::channel();
 | 
					        let (sup_send, sup_recv) = oneshot::channel();
 | 
				
			||||||
 | 
					        let mut sup_recv = sup_recv.fuse();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let actor = Luz::new(
 | 
					        let actor = Luz::new(
 | 
				
			||||||
            command_sender.clone(),
 | 
					            command_sender.clone(),
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -12,9 +12,13 @@ use tracing::info;
 | 
				
			||||||
#[tokio::main]
 | 
					#[tokio::main]
 | 
				
			||||||
async fn main() {
 | 
					async fn main() {
 | 
				
			||||||
    tracing_subscriber::fmt::init();
 | 
					    tracing_subscriber::fmt::init();
 | 
				
			||||||
    let db = SqlitePool::connect("./luz.db").await.unwrap();
 | 
					    let (luz, mut recv) = LuzHandle::new(
 | 
				
			||||||
    let (luz, mut recv) =
 | 
					        "test@blos.sm".try_into().unwrap(),
 | 
				
			||||||
        LuzHandle::new("test@blos.sm".try_into().unwrap(), "slayed".to_string(), db);
 | 
					        "slayed".to_string(),
 | 
				
			||||||
 | 
					        "./luz.db",
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					    .await
 | 
				
			||||||
 | 
					    .unwrap();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    tokio::spawn(async move {
 | 
					    tokio::spawn(async move {
 | 
				
			||||||
        while let Some(msg) = recv.recv().await {
 | 
					        while let Some(msg) = recv.recv().await {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -91,3 +91,22 @@ impl From<Online> for stanza::client::presence::Presence {
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					impl From<Offline> for stanza::client::presence::Presence {
 | 
				
			||||||
 | 
					    fn from(value: Offline) -> Self {
 | 
				
			||||||
 | 
					        Self {
 | 
				
			||||||
 | 
					            from: None,
 | 
				
			||||||
 | 
					            id: None,
 | 
				
			||||||
 | 
					            to: None,
 | 
				
			||||||
 | 
					            r#type: Some(stanza::client::presence::PresenceType::Unavailable),
 | 
				
			||||||
 | 
					            lang: None,
 | 
				
			||||||
 | 
					            show: None,
 | 
				
			||||||
 | 
					            status: value.status.map(|status| stanza::client::presence::Status {
 | 
				
			||||||
 | 
					                lang: None,
 | 
				
			||||||
 | 
					                status: String1024(status),
 | 
				
			||||||
 | 
					            }),
 | 
				
			||||||
 | 
					            priority: None,
 | 
				
			||||||
 | 
					            errors: Vec::new(),
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue