Pass client instance to the event stream
This commit is contained in:
parent
6f5a2a5e90
commit
1884ea272c
|
@ -15,7 +15,7 @@ async fn run() -> Result<()> {
|
||||||
let stream = mastodon.stream_user().await?;
|
let stream = mastodon.stream_user().await?;
|
||||||
info!("watching mastodon for events. This will run forever, press Ctrl+C to kill the program.");
|
info!("watching mastodon for events. This will run forever, press Ctrl+C to kill the program.");
|
||||||
stream
|
stream
|
||||||
.try_for_each(|event| async move {
|
.try_for_each(|(event, _client)| async move {
|
||||||
match event {
|
match event {
|
||||||
// fill in how you want to handle events here.
|
// fill in how you want to handle events here.
|
||||||
_ => warn!(event = as_serde!(event); "unrecognized event received"),
|
_ => warn!(event = as_serde!(event); "unrecognized event received"),
|
||||||
|
|
|
@ -1,10 +1,6 @@
|
||||||
use std::io;
|
use std::io;
|
||||||
|
|
||||||
use crate::{
|
use crate::{errors::Result, prelude::*, Error};
|
||||||
entities::{event::Event, prelude::Notification, status::Status},
|
|
||||||
errors::Result,
|
|
||||||
Error,
|
|
||||||
};
|
|
||||||
use futures::{stream::try_unfold, TryStream, TryStreamExt};
|
use futures::{stream::try_unfold, TryStream, TryStreamExt};
|
||||||
use log::{as_debug, as_serde, debug, error, info, trace};
|
use log::{as_debug, as_serde, debug, error, info, trace};
|
||||||
use reqwest::Response;
|
use reqwest::Response;
|
||||||
|
@ -18,14 +14,15 @@ use tokio_util::io::StreamReader;
|
||||||
pub fn event_stream(
|
pub fn event_stream(
|
||||||
response: Response,
|
response: Response,
|
||||||
location: String,
|
location: String,
|
||||||
) -> impl TryStream<Ok = Event, Error = Error> {
|
client: &Mastodon,
|
||||||
|
) -> impl TryStream<Ok = (Event, Mastodon), Error = Error> + '_ {
|
||||||
let stream = StreamReader::new(response.bytes_stream().map_err(|err| {
|
let stream = StreamReader::new(response.bytes_stream().map_err(|err| {
|
||||||
error!(err = as_debug!(err); "error reading stream");
|
error!(err = as_debug!(err); "error reading stream");
|
||||||
io::Error::new(io::ErrorKind::BrokenPipe, format!("{err:?}"))
|
io::Error::new(io::ErrorKind::BrokenPipe, format!("{err:?}"))
|
||||||
}));
|
}));
|
||||||
let lines_iter = stream.lines();
|
let lines_iter = stream.lines();
|
||||||
try_unfold((lines_iter, location), |mut this| async move {
|
try_unfold((lines_iter, location, client), |mut this| async move {
|
||||||
let (ref mut lines_iter, ref location) = this;
|
let (ref mut lines_iter, ref location, client) = this;
|
||||||
let mut lines = vec![];
|
let mut lines = vec![];
|
||||||
while let Some(line) = lines_iter.next_line().await? {
|
while let Some(line) = lines_iter.next_line().await? {
|
||||||
debug!(message = line, location = &location; "received message");
|
debug!(message = line, location = &location; "received message");
|
||||||
|
@ -37,7 +34,7 @@ pub fn event_stream(
|
||||||
if let Ok(event) = make_event(&lines) {
|
if let Ok(event) = make_event(&lines) {
|
||||||
info!(event = as_serde!(event), location = location; "received event");
|
info!(event = as_serde!(event), location = location; "received event");
|
||||||
lines.clear();
|
lines.clear();
|
||||||
return Ok(Some((event, this)));
|
return Ok(Some(((event, client.clone()), this)));
|
||||||
} else {
|
} else {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -46,7 +43,7 @@ pub fn event_stream(
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn make_event(lines: &[String]) -> Result<Event> {
|
pub(crate) fn make_event(lines: &[String]) -> Result<Event> {
|
||||||
let event;
|
let event;
|
||||||
let data;
|
let data;
|
||||||
if let Some(event_line) = lines.iter().find(|line| line.starts_with("event:")) {
|
if let Some(event_line) = lines.iter().find(|line| line.starts_with("event:")) {
|
||||||
|
|
|
@ -40,7 +40,7 @@
|
||||||
//! let client = Mastodon::from(data);
|
//! let client = Mastodon::from(data);
|
||||||
//! tokio_test::block_on(async {
|
//! tokio_test::block_on(async {
|
||||||
//! let stream = client.stream_user().await.unwrap();
|
//! let stream = client.stream_user().await.unwrap();
|
||||||
//! stream.try_for_each(|event| async move {
|
//! stream.try_for_each(|(event, _client)| async move {
|
||||||
//! match event {
|
//! match event {
|
||||||
//! Event::Update(ref status) => { /* .. */ },
|
//! Event::Update(ref status) => { /* .. */ },
|
||||||
//! Event::Notification(ref notification) => { /* .. */ },
|
//! Event::Notification(ref notification) => { /* .. */ },
|
||||||
|
|
|
@ -527,7 +527,8 @@ tokio_test::block_on(async {
|
||||||
}).await.unwrap();
|
}).await.unwrap();
|
||||||
});"
|
});"
|
||||||
),
|
),
|
||||||
pub async fn $fn_name(&self) -> Result<impl TryStream<Ok=Event, Error=Error>> {
|
pub async fn $fn_name(&self) -> Result<impl TryStream<Ok=(Event, Mastodon), Error=Error> + '_> {
|
||||||
|
use $crate::event_stream::event_stream;
|
||||||
let url = self.route(&format!("/api/v1/streaming/{}", $stream));
|
let url = self.route(&format!("/api/v1/streaming/{}", $stream));
|
||||||
let response = self.authenticated(self.client.get(&url)).header("Accept", "application/json").send().await?;
|
let response = self.authenticated(self.client.get(&url)).header("Accept", "application/json").send().await?;
|
||||||
debug!(
|
debug!(
|
||||||
|
@ -537,7 +538,7 @@ tokio_test::block_on(async {
|
||||||
);
|
);
|
||||||
let status = response.status();
|
let status = response.status();
|
||||||
if status.is_success() {
|
if status.is_success() {
|
||||||
Ok(event_stream(response, url))
|
Ok(event_stream(response, url, self))
|
||||||
} else {
|
} else {
|
||||||
let response = response.json().await?;
|
let response = response.json().await?;
|
||||||
Err(Error::Api{ status, response })
|
Err(Error::Api{ status, response })
|
||||||
|
@ -575,7 +576,8 @@ tokio_test::block_on(async {
|
||||||
}).await.unwrap();
|
}).await.unwrap();
|
||||||
});"
|
});"
|
||||||
),
|
),
|
||||||
pub async fn $fn_name(&self, $param: $param_type) -> Result<impl TryStream<Ok=Event, Error=Error>> {
|
pub async fn $fn_name(&self, $param: $param_type) -> Result<impl TryStream<Ok=(Event, Mastodon), Error=Error> + '_> {
|
||||||
|
use $crate::event_stream::event_stream;
|
||||||
let mut url: Url = self.route(concat!("/api/v1/streaming/", stringify!($stream))).parse()?;
|
let mut url: Url = self.route(concat!("/api/v1/streaming/", stringify!($stream))).parse()?;
|
||||||
url.query_pairs_mut().append_pair(stringify!($param), $param.as_ref());
|
url.query_pairs_mut().append_pair(stringify!($param), $param.as_ref());
|
||||||
let url = url.to_string();
|
let url = url.to_string();
|
||||||
|
@ -587,7 +589,7 @@ tokio_test::block_on(async {
|
||||||
);
|
);
|
||||||
let status = response.status();
|
let status = response.status();
|
||||||
if status.is_success() {
|
if status.is_success() {
|
||||||
Ok(event_stream(response, url))
|
Ok(event_stream(response, url, self))
|
||||||
} else {
|
} else {
|
||||||
let response = response.json().await?;
|
let response = response.json().await?;
|
||||||
Err(Error::Api{ status, response })
|
Err(Error::Api{ status, response })
|
||||||
|
|
|
@ -9,7 +9,6 @@ use crate::{
|
||||||
Empty,
|
Empty,
|
||||||
},
|
},
|
||||||
errors::{Error, Result},
|
errors::{Error, Result},
|
||||||
event_stream::event_stream,
|
|
||||||
helpers::read_response::read_response,
|
helpers::read_response::read_response,
|
||||||
log_serde,
|
log_serde,
|
||||||
polling_time::PollingTime,
|
polling_time::PollingTime,
|
||||||
|
|
Loading…
Reference in New Issue