Add read_response helper

This fixes a bug where a chunked response would be partially read and then hang forever waiting for another chunk, and adds additional debug logging to the request process.
This commit is contained in:
D. Scott Boggs 2022-12-07 16:20:20 -05:00
parent c9fc25a0c9
commit 648de8c8e5
6 changed files with 76 additions and 17 deletions

View File

@ -1,6 +1,6 @@
[package]
name = "elefren"
version = "0.24.0"
version = "1.0.0"
authors = ["Aaron Power <theaaronepower@gmail.com>", "Paul Woolcock <paul@woolcock.us>", "D. Scott Boggs <scott@tams.tech>"]
description = "A wrapper around the Mastodon API."
readme = "README.md"

View File

@ -0,0 +1,57 @@
use std::time::Duration;
use crate::errors::Result;
use futures::pin_mut;
use futures_util::StreamExt;
use log::{as_serde, debug, trace, warn};
use reqwest::Response;
use serde::{Deserialize, Serialize};
use tokio::time::timeout;
/// Adapter for reading JSON data from a response with better logging and a
/// fail-safe timeout.
pub async fn read_response<T>(response: Response) -> Result<T>
where
T: for<'de> Deserialize<'de> + Serialize,
{
let mut bytes = vec![];
let url = response.url().clone();
// let status = log_serde!(response Status);
// let headers = log_serde!(response Headers);
let stream = response.bytes_stream();
pin_mut!(stream);
loop {
if let Ok(data) = timeout(Duration::from_secs(10), stream.next()).await {
// as of here, we did not time out
let Some(data) = data else { break; };
// as of here, we have not hit the end of the stream yet
let data = data?;
// as of here, we did not hit an error while reading the body
bytes.extend_from_slice(&data);
debug!(
data = String::from_utf8_lossy(&data), url = url.as_str(),
bytes_received_so_far = bytes.len();
"data chunk received"
);
} else {
warn!(
url = url.as_str(), // status = status, headers = headers,
data_received = bytes.len();
"API response timed out"
);
break;
}
}
trace!(
url = url.as_str(), // status = status, headers = headers,
data_received = bytes.len();
"parsing response"
);
let result = serde_json::from_slice(bytes.as_slice())?;
debug!(
url = url.as_str(), // status = status, headers = headers,
result = as_serde!(result);
"result parsed successfully"
);
Ok(result)
}

View File

@ -242,7 +242,7 @@ macro_rules! route {
match response.error_for_status() {
Ok(response) => {
let response = response.json().await?;
let response = read_response(response).await?;
debug!(response = as_serde!(response), url = url, method = stringify!($method), call_id = as_debug!(call_id); "received API response");
Ok(response)
}
@ -334,7 +334,7 @@ macro_rules! route {
match response.error_for_status() {
Ok(response) => {
let response = response.json().await?;
let response = read_response(response).await?;
debug!(response = as_serde!(response), url = $url, method = stringify!($method), call_id = as_debug!(call_id); "received API response");
Ok(response)
}

View File

@ -10,6 +10,7 @@ use crate::{
},
errors::{Error, Result},
event_stream::event_stream,
helpers::read_response::read_response,
AddFilterRequest,
AddPushRequest,
Data,
@ -164,7 +165,7 @@ impl Mastodon {
return Err(Error::Server(status.clone()));
}
Ok(response.json().await?)
Ok(read_response(response).await?)
}
/// Update the user credentials
@ -181,7 +182,7 @@ impl Mastodon {
return Err(Error::Server(status.clone()));
}
Ok(response.json().await?)
Ok(read_response(response).await?)
}
/// Post a new status to the account.
@ -310,7 +311,7 @@ impl Mastodon {
match response.error_for_status() {
Ok(response) => {
let status = response.status();
let response = response.json().await?;
let response = read_response(response).await?;
debug!(status = as_debug!(status), response = as_serde!(response); "received API response");
Ok(response)
},
@ -337,7 +338,7 @@ impl Mastodon {
match response.error_for_status() {
Ok(response) => {
let status = response.status();
let response = response.json().await?;
let response = read_response(response).await?;
debug!(status = as_debug!(status), response = as_serde!(response); "received API response");
Ok(response)
},
@ -392,13 +393,13 @@ impl Mastodon {
.append_pair("access_token", &self.data.token)
.append_pair("stream", "user");
debug!(
url = as_debug!(url), call_id = as_debug!(call_id);
url = url.as_str(), call_id = as_debug!(call_id);
"making user streaming API request"
);
let response = reqwest::get(url.as_str()).await?;
let mut url: Url = response.url().as_str().parse()?;
info!(
url = as_debug!(url), call_id = as_debug!(call_id),
url = url.as_str(), call_id = as_debug!(call_id),
status = response.status().as_str();
"received url from streaming API request"
);

View File

@ -1,5 +1,5 @@
use super::{Mastodon, Result};
use crate::{entities::itemsiter::ItemsIter, format_err};
use crate::{entities::itemsiter::ItemsIter, format_err, helpers::read_response::read_response};
use futures::Stream;
use hyper_old_types::header::{parsing, Link, RelationType};
use log::{as_debug, as_serde, debug, error, trace};
@ -21,7 +21,7 @@ macro_rules! pages {
};
debug!(
url = as_debug!(url), method = "get",
url = url.as_str(), method = "get",
call_id = as_debug!(self.call_id),
direction = stringify!($direction);
"making API request"
@ -31,7 +31,7 @@ macro_rules! pages {
match response.error_for_status() {
Ok(response) => {
let (prev, next) = get_links(&response, self.call_id)?;
let response = response.json().await?;
let response = read_response(response).await?;
debug!(
url = url, method = "get", next = as_debug!(next),
prev = as_debug!(prev), call_id = as_debug!(self.call_id),
@ -108,7 +108,7 @@ impl<'a, T: for<'de> Deserialize<'de> + Serialize> Page<T> {
/// Create a new Page.
pub(crate) async fn new(mastodon: Mastodon, response: Response, call_id: Uuid) -> Result<Self> {
let (prev, next) = get_links(&response, call_id)?;
let initial_items = response.json().await?;
let initial_items = read_response(response).await?;
debug!(
initial_items = as_serde!(initial_items), prev = as_debug!(prev),
next = as_debug!(next), call_id = as_debug!(call_id);
@ -171,7 +171,7 @@ fn get_links(response: &Response, call_id: Uuid) -> Result<(Option<Url>, Option<
if relations.contains(&RelationType::Next) {
// next = Some(Url::parse(value.link())?);
next = if let Ok(url) = Url::parse(value.link()) {
trace!(next = as_debug!(url), call_id = as_debug!(call_id); "parsed link header");
trace!(next = url.as_str(), call_id = as_debug!(call_id); "parsed link header");
Some(url)
} else {
// HACK: url::ParseError::into isn't working for some reason.
@ -181,7 +181,7 @@ fn get_links(response: &Response, call_id: Uuid) -> Result<(Option<Url>, Option<
if relations.contains(&RelationType::Prev) {
prev = if let Ok(url) = Url::parse(value.link()) {
trace!(prev = as_debug!(url), call_id = as_debug!(call_id); "parsed link header");
trace!(prev = url.as_str(), call_id = as_debug!(call_id); "parsed link header");
Some(url)
} else {
// HACK: url::ParseError::into isn't working for some reason.

View File

@ -7,6 +7,7 @@ use uuid::Uuid;
use crate::{
apps::{App, AppBuilder},
helpers::read_response::read_response,
log_serde,
scopes::Scopes,
Data,
@ -191,7 +192,7 @@ impl<'a> Registration<'a> {
match response.error_for_status() {
Ok(response) => {
let response = response.json().await?;
let response = read_response(response).await?;
debug!(
response = as_serde!(response), app = as_serde!(app),
url = url, method = stringify!($method),
@ -351,7 +352,7 @@ impl Registered {
headers = log_serde!(response Headers);
"received API response"
);
let token: AccessToken = response.json().await?;
let token: AccessToken = read_response(response).await?;
debug!(url = url, body = as_serde!(token); "parsed response body");
let data = self.registered(token.access_token);
trace!(auth_data = as_serde!(data); "registered");