diff --git a/Cargo.lock b/Cargo.lock index 07dcb68..8b7ac54 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2365,7 +2365,6 @@ dependencies = [ "clap", "dotenv", "env_logger", - "futures", "http 1.0.0", "libipld-core", "lingua", @@ -2378,6 +2377,7 @@ dependencies = [ "sk-cbor", "sqlx", "tokio", + "tokio-stream", "tokio-tungstenite", ] diff --git a/Cargo.toml b/Cargo.toml index efad474..737316d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,6 @@ chrono = "0.4.31" clap = { version = "4.4.16", features = ["derive"] } dotenv = "0.15.0" env_logger = "0.10.1" -futures = "0.3.30" http = "1.0.0" libipld-core = { version = "0.16.0", features = ["serde-codec"] } lingua = "1.6.2" @@ -31,4 +30,5 @@ serde_ipld_dagcbor = "0.4.2" sk-cbor = "0.1.2" sqlx = { version = "0.7.3", default-features = false, features = ["postgres", "runtime-tokio-native-tls", "chrono"] } tokio = { version = "1.35.1", features = ["full"] } +tokio-stream = "0.1.14" tokio-tungstenite = { version = "0.21.0", features = ["native-tls"] } diff --git a/src/services/bluesky/client.rs b/src/services/bluesky/client.rs index 3c166e5..3214593 100644 --- a/src/services/bluesky/client.rs +++ b/src/services/bluesky/client.rs @@ -1,4 +1,5 @@ use std::matches; +use std::time::Duration; use anyhow::{anyhow, Result}; use atrium_api::agent::{store::MemorySessionStore, AtpAgent}; @@ -6,9 +7,9 @@ use atrium_api::blob::BlobRef; use atrium_api::records::Record; use atrium_xrpc_client::reqwest::ReqwestClient; use chrono::Utc; -use futures::StreamExt; use http::StatusCode; use log::error; +use tokio_stream::StreamExt; use tokio_tungstenite::{connect_async, tungstenite}; use super::entities::ProfileDetails; @@ -21,6 +22,7 @@ pub struct Bluesky { impl Bluesky { pub const XRPC_HOST: &'static str = "https://bsky.social"; pub const FIREHOSE_HOST: &'static str = "wss://bsky.network"; + pub const STREAMING_TIMEOUT: Duration = Duration::from_secs(60); pub fn unauthenticated() -> Self { Self { @@ -152,9 +154,11 @@ impl Bluesky { ), }; - let (mut stream, _) = connect_async(url).await?; + let (stream, _) = connect_async(url).await?; + let stream = stream.timeout(Self::STREAMING_TIMEOUT); + let mut stream = Box::pin(stream); - while let Some(Ok(tungstenite::Message::Binary(message))) = stream.next().await { + while let Some(Ok(tungstenite::Message::Binary(message))) = stream.try_next().await? { if let Err(e) = handle_message(&message, processor).await { error!("Error handling a message: {:?}", e); }