2022-12-18 22:25:53 +00:00
|
|
|
use std::io;
|
|
|
|
|
2022-12-05 13:52:48 +00:00
|
|
|
use crate::{
|
|
|
|
entities::{event::Event, prelude::Notification, status::Status},
|
|
|
|
errors::Result,
|
|
|
|
Error,
|
|
|
|
};
|
2022-12-18 22:25:53 +00:00
|
|
|
use futures::{stream::try_unfold, TryStream, TryStreamExt};
|
2022-12-07 20:58:28 +00:00
|
|
|
use log::{as_debug, as_serde, debug, error, info, trace};
|
2022-12-18 22:25:53 +00:00
|
|
|
use reqwest::Response;
|
|
|
|
use tokio::io::AsyncBufReadExt;
|
|
|
|
use tokio_util::io::StreamReader;
|
2022-12-05 13:52:48 +00:00
|
|
|
|
2022-12-18 22:25:53 +00:00
|
|
|
/// Return a stream of events from the given response by parsing Server-Sent
|
|
|
|
/// Events as they come in.
|
|
|
|
///
|
|
|
|
/// See https://docs.joinmastodon.org/methods/streaming/ for more info
|
2022-12-05 13:52:48 +00:00
|
|
|
pub fn event_stream(
|
2022-12-18 22:25:53 +00:00
|
|
|
response: Response,
|
2022-12-07 20:58:28 +00:00
|
|
|
location: String,
|
2022-12-18 22:25:53 +00:00
|
|
|
) -> impl TryStream<Ok = Event, Error = Error> {
|
|
|
|
let stream = StreamReader::new(response.bytes_stream().map_err(|err| {
|
|
|
|
error!(err = as_debug!(err); "error reading stream");
|
|
|
|
io::Error::new(io::ErrorKind::BrokenPipe, format!("{err:?}"))
|
|
|
|
}));
|
|
|
|
let lines_iter = stream.lines();
|
|
|
|
try_unfold((lines_iter, location), |mut this| async move {
|
|
|
|
let (ref mut lines_iter, ref location) = this;
|
2022-12-05 13:52:48 +00:00
|
|
|
let mut lines = vec![];
|
2022-12-18 22:25:53 +00:00
|
|
|
while let Some(line) = lines_iter.next_line().await? {
|
|
|
|
debug!(message = line, location = &location; "received websocket message");
|
|
|
|
let line = line.trim().to_string();
|
|
|
|
if line.starts_with(":") || line.is_empty() {
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
lines.push(line);
|
|
|
|
if let Ok(event) = make_event(&lines) {
|
|
|
|
info!(event = as_serde!(event), location = location; "received websocket event");
|
|
|
|
lines.clear();
|
|
|
|
return Ok(Some((event, this)));
|
|
|
|
} else {
|
|
|
|
continue;
|
2022-12-05 13:52:48 +00:00
|
|
|
}
|
|
|
|
}
|
2022-12-18 22:25:53 +00:00
|
|
|
Ok(None)
|
|
|
|
})
|
2022-12-05 13:52:48 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
fn make_event(lines: &[String]) -> Result<Event> {
|
|
|
|
let event;
|
|
|
|
let data;
|
|
|
|
if let Some(event_line) = lines.iter().find(|line| line.starts_with("event:")) {
|
|
|
|
event = event_line[6..].trim().to_string();
|
|
|
|
data = lines
|
|
|
|
.iter()
|
|
|
|
.find(|line| line.starts_with("data:"))
|
|
|
|
.map(|x| x[5..].trim().to_string());
|
|
|
|
} else {
|
|
|
|
#[derive(Deserialize)]
|
|
|
|
struct Message {
|
|
|
|
pub event: String,
|
|
|
|
pub payload: Option<String>,
|
|
|
|
}
|
|
|
|
let message = serde_json::from_str::<Message>(&lines[0])?;
|
|
|
|
event = message.event;
|
|
|
|
data = message.payload;
|
|
|
|
}
|
|
|
|
let event: &str = &event;
|
2022-12-07 20:58:28 +00:00
|
|
|
trace!(event = event, payload = data; "websocket message parsed");
|
2022-12-05 13:52:48 +00:00
|
|
|
Ok(match event {
|
|
|
|
"notification" => {
|
|
|
|
let data = data
|
|
|
|
.ok_or_else(|| Error::Other("Missing `data` line for notification".to_string()))?;
|
|
|
|
let notification = serde_json::from_str::<Notification>(&data)?;
|
|
|
|
Event::Notification(notification)
|
|
|
|
},
|
|
|
|
"update" => {
|
|
|
|
let data =
|
|
|
|
data.ok_or_else(|| Error::Other("Missing `data` line for update".to_string()))?;
|
|
|
|
let status = serde_json::from_str::<Status>(&data)?;
|
|
|
|
Event::Update(status)
|
|
|
|
},
|
|
|
|
"delete" => {
|
|
|
|
let data =
|
|
|
|
data.ok_or_else(|| Error::Other("Missing `data` line for delete".to_string()))?;
|
|
|
|
Event::Delete(data)
|
|
|
|
},
|
|
|
|
"filters_changed" => Event::FiltersChanged,
|
|
|
|
_ => return Err(Error::Other(format!("Unknown event `{}`", event))),
|
|
|
|
})
|
|
|
|
}
|