diff --git a/Cargo.toml b/Cargo.toml index 138f53f..8e4eb07 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,6 +62,9 @@ optional = true version = "1.22.0" features = ["rt-multi-thread", "macros"] +[dependencies.tokio-util] +version = "0.7.4" +features = ["io"] [dev-dependencies] tokio-test = "0.4.2" diff --git a/src/event_stream.rs b/src/event_stream.rs index 611b194..963025d 100644 --- a/src/event_stream.rs +++ b/src/event_stream.rs @@ -1,61 +1,49 @@ +use std::io; + use crate::{ entities::{event::Event, prelude::Notification, status::Status}, errors::Result, Error, }; -use futures::{stream::try_unfold, TryStream}; +use futures::{stream::try_unfold, TryStream, TryStreamExt}; use log::{as_debug, as_serde, debug, error, info, trace}; -use tungstenite::Message; +use reqwest::Response; +use tokio::io::AsyncBufReadExt; +use tokio_util::io::StreamReader; -/// Returns a stream of events at the given url location. +/// Return a stream of events from the given response by parsing Server-Sent +/// Events as they come in. +/// +/// See https://docs.joinmastodon.org/methods/streaming/ for more info pub fn event_stream( + response: Response, location: String, -) -> Result>> { - trace!(location = location; "connecting to websocket for events"); - let (client, response) = tungstenite::connect(&location)?; - let status = response.status(); - if !status.is_success() { - error!( - status = as_debug!(status), - body = response.body().as_ref().map(|it| String::from_utf8_lossy(it.as_slice())).unwrap_or("(empty body)".into()), - location = &location; - "error connecting to websocket" - ); - return Err(Error::Api(crate::ApiError { - error: status.canonical_reason().map(String::from), - error_description: None, - })); - } - debug!(location = &location, status = as_debug!(status); "successfully connected to websocket"); - Ok(try_unfold((client, location), |mut this| async move { - let (ref mut client, ref location) = this; +) -> impl TryStream { + let stream = StreamReader::new(response.bytes_stream().map_err(|err| { + error!(err = as_debug!(err); "error reading stream"); + io::Error::new(io::ErrorKind::BrokenPipe, format!("{err:?}")) + })); + let lines_iter = stream.lines(); + try_unfold((lines_iter, location), |mut this| async move { + let (ref mut lines_iter, ref location) = this; let mut lines = vec![]; - loop { - match client.read_message() { - Ok(Message::Text(line)) => { - debug!(message = line, location = &location; "received websocket message"); - let line = line.trim().to_string(); - if line.starts_with(":") || line.is_empty() { - continue; - } - lines.push(line); - if let Ok(event) = make_event(&lines) { - info!(event = as_serde!(event), location = location; "received websocket event"); - lines.clear(); - return Ok(Some((event, this))); - } else { - continue; - } - }, - Ok(Message::Ping(data)) => { - debug!(metadata = as_serde!(data); "received ping, ponging"); - client.write_message(Message::Pong(data))?; - }, - Ok(message) => return Err(message.into()), - Err(err) => return Err(err.into()), + while let Some(line) = lines_iter.next_line().await? { + debug!(message = line, location = &location; "received websocket message"); + let line = line.trim().to_string(); + if line.starts_with(":") || line.is_empty() { + continue; + } + lines.push(line); + if let Ok(event) = make_event(&lines) { + info!(event = as_serde!(event), location = location; "received websocket event"); + lines.clear(); + return Ok(Some((event, this))); + } else { + continue; } } - })) + Ok(None) + }) } fn make_event(lines: &[String]) -> Result { diff --git a/src/mastodon.rs b/src/mastodon.rs index 1b3ee0a..b7fb9c7 100644 --- a/src/mastodon.rs +++ b/src/mastodon.rs @@ -21,7 +21,7 @@ use crate::{ UpdateCredsRequest, UpdatePushRequest, }; -use futures::TryStream; +use futures::{stream::try_unfold, TryStream}; use log::{as_debug, as_serde, debug, error, info, trace}; use reqwest::{Client, RequestBuilder}; use url::Url; @@ -394,29 +394,41 @@ impl Mastodon { /// }); /// ``` pub async fn streaming_user(&self) -> Result> { - let call_id = Uuid::new_v4(); - let mut url: Url = self.route("/api/v1/streaming").parse()?; - url.query_pairs_mut() - .append_pair("access_token", &self.data.token) - .append_pair("stream", "user"); + let url = self.route("/api/v1/streaming/user"); + let response = self.authenticated(self.client.get(&url)).send().await?; debug!( - url = url.as_str(), call_id = as_debug!(call_id); - "making user streaming API request" + status = log_serde!(response Status), url = &url, + headers = log_serde!(response Headers); + "received API response" ); - let response = reqwest::get(url.as_str()).await?; - let mut url: Url = response.url().as_str().parse()?; - info!( - url = url.as_str(), call_id = as_debug!(call_id), - status = response.status().as_str(); - "received url from streaming API request" - ); - let new_scheme = match url.scheme() { - "http" => "ws", - "https" => "wss", - x => return Err(Error::Other(format!("Bad URL scheme: {}", x))), - }; - url.set_scheme(new_scheme) - .map_err(|_| Error::Other("Bad URL scheme!".to_string()))?; + Ok(event_stream(response.error_for_status()?, url)) + } + + // pub async fn streaming_user(&self) -> Result> { let call_id = Uuid::new_v4(); + // let mut url: Url = self.route("/api/v1/streaming").parse()?; + // url.query_pairs_mut() + // .append_pair("access_token", &self.data.token) + // .append_pair("stream", "user"); + // debug!( + // url = url.as_str(), call_id = as_debug!(call_id); + // "making user streaming API request" + // ); + // let response = reqwest::get(url.as_str()).await?; + // let mut url: Url = response.url().as_str().parse()?; + // info!( + // url = url.as_str(), call_id = as_debug!(call_id), + // status = response.status().as_str(); + // "received url from streaming API request" + // ); + // let new_scheme = match url.scheme() { + // "http" => "ws", + // "https" => "wss", + // x => return Err(Error::Other(format!("Bad URL scheme: {}", x))), + // }; + // url.set_scheme(new_scheme) + // .map_err(|_| Error::Other("Bad URL scheme!".to_string()))?; + // } /// Set the bearer authentication token fn authenticated(&self, request: RequestBuilder) -> RequestBuilder {