use SSE for streaming events

The Mastodon API doesn't use WebSockets for sending events, it uses SSE.
That is to say, it sends events as lines in a continually-streamed
response.
This commit is contained in:
D. Scott Boggs 2022-12-18 17:25:53 -05:00
parent c811f42054
commit 610d51c593
3 changed files with 71 additions and 68 deletions

View File

@ -62,6 +62,9 @@ optional = true
version = "1.22.0"
features = ["rt-multi-thread", "macros"]
[dependencies.tokio-util]
version = "0.7.4"
features = ["io"]
[dev-dependencies]
tokio-test = "0.4.2"

View File

@ -1,61 +1,49 @@
use std::io;
use crate::{
entities::{event::Event, prelude::Notification, status::Status},
errors::Result,
Error,
};
use futures::{stream::try_unfold, TryStream};
use futures::{stream::try_unfold, TryStream, TryStreamExt};
use log::{as_debug, as_serde, debug, error, info, trace};
use tungstenite::Message;
use reqwest::Response;
use tokio::io::AsyncBufReadExt;
use tokio_util::io::StreamReader;
/// Returns a stream of events at the given url location.
/// Return a stream of events from the given response by parsing Server-Sent
/// Events as they come in.
///
/// See https://docs.joinmastodon.org/methods/streaming/ for more info
pub fn event_stream(
response: Response,
location: String,
) -> Result<impl TryStream<Ok = Event, Error = Error, Item = Result<Event>>> {
trace!(location = location; "connecting to websocket for events");
let (client, response) = tungstenite::connect(&location)?;
let status = response.status();
if !status.is_success() {
error!(
status = as_debug!(status),
body = response.body().as_ref().map(|it| String::from_utf8_lossy(it.as_slice())).unwrap_or("(empty body)".into()),
location = &location;
"error connecting to websocket"
);
return Err(Error::Api(crate::ApiError {
error: status.canonical_reason().map(String::from),
error_description: None,
}));
}
debug!(location = &location, status = as_debug!(status); "successfully connected to websocket");
Ok(try_unfold((client, location), |mut this| async move {
let (ref mut client, ref location) = this;
) -> impl TryStream<Ok = Event, Error = Error> {
let stream = StreamReader::new(response.bytes_stream().map_err(|err| {
error!(err = as_debug!(err); "error reading stream");
io::Error::new(io::ErrorKind::BrokenPipe, format!("{err:?}"))
}));
let lines_iter = stream.lines();
try_unfold((lines_iter, location), |mut this| async move {
let (ref mut lines_iter, ref location) = this;
let mut lines = vec![];
loop {
match client.read_message() {
Ok(Message::Text(line)) => {
debug!(message = line, location = &location; "received websocket message");
let line = line.trim().to_string();
if line.starts_with(":") || line.is_empty() {
continue;
}
lines.push(line);
if let Ok(event) = make_event(&lines) {
info!(event = as_serde!(event), location = location; "received websocket event");
lines.clear();
return Ok(Some((event, this)));
} else {
continue;
}
},
Ok(Message::Ping(data)) => {
debug!(metadata = as_serde!(data); "received ping, ponging");
client.write_message(Message::Pong(data))?;
},
Ok(message) => return Err(message.into()),
Err(err) => return Err(err.into()),
while let Some(line) = lines_iter.next_line().await? {
debug!(message = line, location = &location; "received websocket message");
let line = line.trim().to_string();
if line.starts_with(":") || line.is_empty() {
continue;
}
lines.push(line);
if let Ok(event) = make_event(&lines) {
info!(event = as_serde!(event), location = location; "received websocket event");
lines.clear();
return Ok(Some((event, this)));
} else {
continue;
}
}
}))
Ok(None)
})
}
fn make_event(lines: &[String]) -> Result<Event> {

View File

@ -21,7 +21,7 @@ use crate::{
UpdateCredsRequest,
UpdatePushRequest,
};
use futures::TryStream;
use futures::{stream::try_unfold, TryStream};
use log::{as_debug, as_serde, debug, error, info, trace};
use reqwest::{Client, RequestBuilder};
use url::Url;
@ -394,29 +394,41 @@ impl Mastodon {
/// });
/// ```
pub async fn streaming_user(&self) -> Result<impl TryStream<Ok = Event, Error = Error>> {
let call_id = Uuid::new_v4();
let mut url: Url = self.route("/api/v1/streaming").parse()?;
url.query_pairs_mut()
.append_pair("access_token", &self.data.token)
.append_pair("stream", "user");
let url = self.route("/api/v1/streaming/user");
let response = self.authenticated(self.client.get(&url)).send().await?;
debug!(
url = url.as_str(), call_id = as_debug!(call_id);
"making user streaming API request"
status = log_serde!(response Status), url = &url,
headers = log_serde!(response Headers);
"received API response"
);
let response = reqwest::get(url.as_str()).await?;
let mut url: Url = response.url().as_str().parse()?;
info!(
url = url.as_str(), call_id = as_debug!(call_id),
status = response.status().as_str();
"received url from streaming API request"
);
let new_scheme = match url.scheme() {
"http" => "ws",
"https" => "wss",
x => return Err(Error::Other(format!("Bad URL scheme: {}", x))),
};
url.set_scheme(new_scheme)
.map_err(|_| Error::Other("Bad URL scheme!".to_string()))?;
Ok(event_stream(response.error_for_status()?, url))
}
// pub async fn streaming_user(&self) -> Result<impl TryStream<Ok = Event, Error
// = Error>> { let call_id = Uuid::new_v4();
// let mut url: Url = self.route("/api/v1/streaming").parse()?;
// url.query_pairs_mut()
// .append_pair("access_token", &self.data.token)
// .append_pair("stream", "user");
// debug!(
// url = url.as_str(), call_id = as_debug!(call_id);
// "making user streaming API request"
// );
// let response = reqwest::get(url.as_str()).await?;
// let mut url: Url = response.url().as_str().parse()?;
// info!(
// url = url.as_str(), call_id = as_debug!(call_id),
// status = response.status().as_str();
// "received url from streaming API request"
// );
// let new_scheme = match url.scheme() {
// "http" => "ws",
// "https" => "wss",
// x => return Err(Error::Other(format!("Bad URL scheme: {}", x))),
// };
// url.set_scheme(new_scheme)
// .map_err(|_| Error::Other("Bad URL scheme!".to_string()))?;
// }
/// Set the bearer authentication token
fn authenticated(&self, request: RequestBuilder) -> RequestBuilder {