From 648de8c8e541ca981f3d857e60eb6e97ad76eed3 Mon Sep 17 00:00:00 2001 From: "D. Scott Boggs" Date: Wed, 7 Dec 2022 16:20:20 -0500 Subject: [PATCH] Add read_response helper This fixes a bug where a chunked response would be partially read and then hang forever waiting for another chunk, and adds additional debug logging to the request process. --- Cargo.toml | 2 +- src/helpers/read_response.rs | 57 ++++++++++++++++++++++++++++++++++++ src/macros.rs | 4 +-- src/mastodon.rs | 13 ++++---- src/page.rs | 12 ++++---- src/registration.rs | 5 ++-- 6 files changed, 76 insertions(+), 17 deletions(-) create mode 100644 src/helpers/read_response.rs diff --git a/Cargo.toml b/Cargo.toml index 5256a91..138f53f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "elefren" -version = "0.24.0" +version = "1.0.0" authors = ["Aaron Power ", "Paul Woolcock ", "D. Scott Boggs "] description = "A wrapper around the Mastodon API." readme = "README.md" diff --git a/src/helpers/read_response.rs b/src/helpers/read_response.rs new file mode 100644 index 0000000..66e6be8 --- /dev/null +++ b/src/helpers/read_response.rs @@ -0,0 +1,57 @@ +use std::time::Duration; + +use crate::errors::Result; +use futures::pin_mut; +use futures_util::StreamExt; +use log::{as_serde, debug, trace, warn}; +use reqwest::Response; +use serde::{Deserialize, Serialize}; +use tokio::time::timeout; + +/// Adapter for reading JSON data from a response with better logging and a +/// fail-safe timeout. +pub async fn read_response(response: Response) -> Result +where + T: for<'de> Deserialize<'de> + Serialize, +{ + let mut bytes = vec![]; + let url = response.url().clone(); + // let status = log_serde!(response Status); + // let headers = log_serde!(response Headers); + let stream = response.bytes_stream(); + pin_mut!(stream); + loop { + if let Ok(data) = timeout(Duration::from_secs(10), stream.next()).await { + // as of here, we did not time out + let Some(data) = data else { break; }; + // as of here, we have not hit the end of the stream yet + let data = data?; + // as of here, we did not hit an error while reading the body + bytes.extend_from_slice(&data); + debug!( + data = String::from_utf8_lossy(&data), url = url.as_str(), + bytes_received_so_far = bytes.len(); + "data chunk received" + ); + } else { + warn!( + url = url.as_str(), // status = status, headers = headers, + data_received = bytes.len(); + "API response timed out" + ); + break; + } + } + trace!( + url = url.as_str(), // status = status, headers = headers, + data_received = bytes.len(); + "parsing response" + ); + let result = serde_json::from_slice(bytes.as_slice())?; + debug!( + url = url.as_str(), // status = status, headers = headers, + result = as_serde!(result); + "result parsed successfully" + ); + Ok(result) +} diff --git a/src/macros.rs b/src/macros.rs index 7f11407..cf0e26a 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -242,7 +242,7 @@ macro_rules! route { match response.error_for_status() { Ok(response) => { - let response = response.json().await?; + let response = read_response(response).await?; debug!(response = as_serde!(response), url = url, method = stringify!($method), call_id = as_debug!(call_id); "received API response"); Ok(response) } @@ -334,7 +334,7 @@ macro_rules! route { match response.error_for_status() { Ok(response) => { - let response = response.json().await?; + let response = read_response(response).await?; debug!(response = as_serde!(response), url = $url, method = stringify!($method), call_id = as_debug!(call_id); "received API response"); Ok(response) } diff --git a/src/mastodon.rs b/src/mastodon.rs index 2024a46..4b7b55a 100644 --- a/src/mastodon.rs +++ b/src/mastodon.rs @@ -10,6 +10,7 @@ use crate::{ }, errors::{Error, Result}, event_stream::event_stream, + helpers::read_response::read_response, AddFilterRequest, AddPushRequest, Data, @@ -164,7 +165,7 @@ impl Mastodon { return Err(Error::Server(status.clone())); } - Ok(response.json().await?) + Ok(read_response(response).await?) } /// Update the user credentials @@ -181,7 +182,7 @@ impl Mastodon { return Err(Error::Server(status.clone())); } - Ok(response.json().await?) + Ok(read_response(response).await?) } /// Post a new status to the account. @@ -310,7 +311,7 @@ impl Mastodon { match response.error_for_status() { Ok(response) => { let status = response.status(); - let response = response.json().await?; + let response = read_response(response).await?; debug!(status = as_debug!(status), response = as_serde!(response); "received API response"); Ok(response) }, @@ -337,7 +338,7 @@ impl Mastodon { match response.error_for_status() { Ok(response) => { let status = response.status(); - let response = response.json().await?; + let response = read_response(response).await?; debug!(status = as_debug!(status), response = as_serde!(response); "received API response"); Ok(response) }, @@ -392,13 +393,13 @@ impl Mastodon { .append_pair("access_token", &self.data.token) .append_pair("stream", "user"); debug!( - url = as_debug!(url), call_id = as_debug!(call_id); + url = url.as_str(), call_id = as_debug!(call_id); "making user streaming API request" ); let response = reqwest::get(url.as_str()).await?; let mut url: Url = response.url().as_str().parse()?; info!( - url = as_debug!(url), call_id = as_debug!(call_id), + url = url.as_str(), call_id = as_debug!(call_id), status = response.status().as_str(); "received url from streaming API request" ); diff --git a/src/page.rs b/src/page.rs index 056f7f4..76f3fc0 100644 --- a/src/page.rs +++ b/src/page.rs @@ -1,5 +1,5 @@ use super::{Mastodon, Result}; -use crate::{entities::itemsiter::ItemsIter, format_err}; +use crate::{entities::itemsiter::ItemsIter, format_err, helpers::read_response::read_response}; use futures::Stream; use hyper_old_types::header::{parsing, Link, RelationType}; use log::{as_debug, as_serde, debug, error, trace}; @@ -21,7 +21,7 @@ macro_rules! pages { }; debug!( - url = as_debug!(url), method = "get", + url = url.as_str(), method = "get", call_id = as_debug!(self.call_id), direction = stringify!($direction); "making API request" @@ -31,7 +31,7 @@ macro_rules! pages { match response.error_for_status() { Ok(response) => { let (prev, next) = get_links(&response, self.call_id)?; - let response = response.json().await?; + let response = read_response(response).await?; debug!( url = url, method = "get", next = as_debug!(next), prev = as_debug!(prev), call_id = as_debug!(self.call_id), @@ -108,7 +108,7 @@ impl<'a, T: for<'de> Deserialize<'de> + Serialize> Page { /// Create a new Page. pub(crate) async fn new(mastodon: Mastodon, response: Response, call_id: Uuid) -> Result { let (prev, next) = get_links(&response, call_id)?; - let initial_items = response.json().await?; + let initial_items = read_response(response).await?; debug!( initial_items = as_serde!(initial_items), prev = as_debug!(prev), next = as_debug!(next), call_id = as_debug!(call_id); @@ -171,7 +171,7 @@ fn get_links(response: &Response, call_id: Uuid) -> Result<(Option, Option< if relations.contains(&RelationType::Next) { // next = Some(Url::parse(value.link())?); next = if let Ok(url) = Url::parse(value.link()) { - trace!(next = as_debug!(url), call_id = as_debug!(call_id); "parsed link header"); + trace!(next = url.as_str(), call_id = as_debug!(call_id); "parsed link header"); Some(url) } else { // HACK: url::ParseError::into isn't working for some reason. @@ -181,7 +181,7 @@ fn get_links(response: &Response, call_id: Uuid) -> Result<(Option, Option< if relations.contains(&RelationType::Prev) { prev = if let Ok(url) = Url::parse(value.link()) { - trace!(prev = as_debug!(url), call_id = as_debug!(call_id); "parsed link header"); + trace!(prev = url.as_str(), call_id = as_debug!(call_id); "parsed link header"); Some(url) } else { // HACK: url::ParseError::into isn't working for some reason. diff --git a/src/registration.rs b/src/registration.rs index 060e186..66dff17 100644 --- a/src/registration.rs +++ b/src/registration.rs @@ -7,6 +7,7 @@ use uuid::Uuid; use crate::{ apps::{App, AppBuilder}, + helpers::read_response::read_response, log_serde, scopes::Scopes, Data, @@ -191,7 +192,7 @@ impl<'a> Registration<'a> { match response.error_for_status() { Ok(response) => { - let response = response.json().await?; + let response = read_response(response).await?; debug!( response = as_serde!(response), app = as_serde!(app), url = url, method = stringify!($method), @@ -351,7 +352,7 @@ impl Registered { headers = log_serde!(response Headers); "received API response" ); - let token: AccessToken = response.json().await?; + let token: AccessToken = read_response(response).await?; debug!(url = url, body = as_serde!(token); "parsed response body"); let data = self.registered(token.access_token); trace!(auth_data = as_serde!(data); "registered");