From 1884ea272c34f30aedb6c6d257d533601cfde583 Mon Sep 17 00:00:00 2001 From: "D. Scott Boggs" Date: Mon, 13 Feb 2023 11:54:49 -0500 Subject: [PATCH] Pass client instance to the event stream --- examples/log_events.rs | 2 +- src/event_stream.rs | 17 +++++++---------- src/lib.rs | 2 +- src/macros.rs | 10 ++++++---- src/mastodon.rs | 1 - 5 files changed, 15 insertions(+), 17 deletions(-) diff --git a/examples/log_events.rs b/examples/log_events.rs index 3dd8a80..8858f4d 100644 --- a/examples/log_events.rs +++ b/examples/log_events.rs @@ -15,7 +15,7 @@ async fn run() -> Result<()> { let stream = mastodon.stream_user().await?; info!("watching mastodon for events. This will run forever, press Ctrl+C to kill the program."); stream - .try_for_each(|event| async move { + .try_for_each(|(event, _client)| async move { match event { // fill in how you want to handle events here. _ => warn!(event = as_serde!(event); "unrecognized event received"), diff --git a/src/event_stream.rs b/src/event_stream.rs index c35ac9a..d6cb7c0 100644 --- a/src/event_stream.rs +++ b/src/event_stream.rs @@ -1,10 +1,6 @@ use std::io; -use crate::{ - entities::{event::Event, prelude::Notification, status::Status}, - errors::Result, - Error, -}; +use crate::{errors::Result, prelude::*, Error}; use futures::{stream::try_unfold, TryStream, TryStreamExt}; use log::{as_debug, as_serde, debug, error, info, trace}; use reqwest::Response; @@ -18,14 +14,15 @@ use tokio_util::io::StreamReader; pub fn event_stream( response: Response, location: String, -) -> impl TryStream { + client: &Mastodon, +) -> impl TryStream + '_ { let stream = StreamReader::new(response.bytes_stream().map_err(|err| { error!(err = as_debug!(err); "error reading stream"); io::Error::new(io::ErrorKind::BrokenPipe, format!("{err:?}")) })); let lines_iter = stream.lines(); - try_unfold((lines_iter, location), |mut this| async move { - let (ref mut lines_iter, ref location) = this; + try_unfold((lines_iter, location, client), |mut this| async move { + let (ref mut lines_iter, ref location, client) = this; let mut lines = vec![]; while let Some(line) = lines_iter.next_line().await? { debug!(message = line, location = &location; "received message"); @@ -37,7 +34,7 @@ pub fn event_stream( if let Ok(event) = make_event(&lines) { info!(event = as_serde!(event), location = location; "received event"); lines.clear(); - return Ok(Some((event, this))); + return Ok(Some(((event, client.clone()), this))); } else { continue; } @@ -46,7 +43,7 @@ pub fn event_stream( }) } -fn make_event(lines: &[String]) -> Result { +pub(crate) fn make_event(lines: &[String]) -> Result { let event; let data; if let Some(event_line) = lines.iter().find(|line| line.starts_with("event:")) { diff --git a/src/lib.rs b/src/lib.rs index 60dc78e..1188636 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,7 +40,7 @@ //! let client = Mastodon::from(data); //! tokio_test::block_on(async { //! let stream = client.stream_user().await.unwrap(); -//! stream.try_for_each(|event| async move { +//! stream.try_for_each(|(event, _client)| async move { //! match event { //! Event::Update(ref status) => { /* .. */ }, //! Event::Notification(ref notification) => { /* .. */ }, diff --git a/src/macros.rs b/src/macros.rs index 1ebbfba..24373a1 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -527,7 +527,8 @@ tokio_test::block_on(async { }).await.unwrap(); });" ), - pub async fn $fn_name(&self) -> Result> { + pub async fn $fn_name(&self) -> Result + '_> { + use $crate::event_stream::event_stream; let url = self.route(&format!("/api/v1/streaming/{}", $stream)); let response = self.authenticated(self.client.get(&url)).header("Accept", "application/json").send().await?; debug!( @@ -537,7 +538,7 @@ tokio_test::block_on(async { ); let status = response.status(); if status.is_success() { - Ok(event_stream(response, url)) + Ok(event_stream(response, url, self)) } else { let response = response.json().await?; Err(Error::Api{ status, response }) @@ -575,7 +576,8 @@ tokio_test::block_on(async { }).await.unwrap(); });" ), - pub async fn $fn_name(&self, $param: $param_type) -> Result> { + pub async fn $fn_name(&self, $param: $param_type) -> Result + '_> { + use $crate::event_stream::event_stream; let mut url: Url = self.route(concat!("/api/v1/streaming/", stringify!($stream))).parse()?; url.query_pairs_mut().append_pair(stringify!($param), $param.as_ref()); let url = url.to_string(); @@ -587,7 +589,7 @@ tokio_test::block_on(async { ); let status = response.status(); if status.is_success() { - Ok(event_stream(response, url)) + Ok(event_stream(response, url, self)) } else { let response = response.json().await?; Err(Error::Api{ status, response }) diff --git a/src/mastodon.rs b/src/mastodon.rs index 68f976d..d337f56 100644 --- a/src/mastodon.rs +++ b/src/mastodon.rs @@ -9,7 +9,6 @@ use crate::{ Empty, }, errors::{Error, Result}, - event_stream::event_stream, helpers::read_response::read_response, log_serde, polling_time::PollingTime,