From 610d51c59332f61ce81b52d16899c1885890334e Mon Sep 17 00:00:00 2001 From: "D. Scott Boggs" Date: Sun, 18 Dec 2022 17:25:53 -0500 Subject: [PATCH 1/7] use SSE for streaming events The Mastodon API doesn't use WebSockets for sending events, it uses SSE. That is to say, it sends events as lines in a continually-streamed response. --- Cargo.toml | 3 ++ src/event_stream.rs | 80 +++++++++++++++++++-------------------------- src/mastodon.rs | 56 ++++++++++++++++++------------- 3 files changed, 71 insertions(+), 68 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 138f53f..8e4eb07 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,6 +62,9 @@ optional = true version = "1.22.0" features = ["rt-multi-thread", "macros"] +[dependencies.tokio-util] +version = "0.7.4" +features = ["io"] [dev-dependencies] tokio-test = "0.4.2" diff --git a/src/event_stream.rs b/src/event_stream.rs index 611b194..963025d 100644 --- a/src/event_stream.rs +++ b/src/event_stream.rs @@ -1,61 +1,49 @@ +use std::io; + use crate::{ entities::{event::Event, prelude::Notification, status::Status}, errors::Result, Error, }; -use futures::{stream::try_unfold, TryStream}; +use futures::{stream::try_unfold, TryStream, TryStreamExt}; use log::{as_debug, as_serde, debug, error, info, trace}; -use tungstenite::Message; +use reqwest::Response; +use tokio::io::AsyncBufReadExt; +use tokio_util::io::StreamReader; -/// Returns a stream of events at the given url location. +/// Return a stream of events from the given response by parsing Server-Sent +/// Events as they come in. +/// +/// See https://docs.joinmastodon.org/methods/streaming/ for more info pub fn event_stream( + response: Response, location: String, -) -> Result>> { - trace!(location = location; "connecting to websocket for events"); - let (client, response) = tungstenite::connect(&location)?; - let status = response.status(); - if !status.is_success() { - error!( - status = as_debug!(status), - body = response.body().as_ref().map(|it| String::from_utf8_lossy(it.as_slice())).unwrap_or("(empty body)".into()), - location = &location; - "error connecting to websocket" - ); - return Err(Error::Api(crate::ApiError { - error: status.canonical_reason().map(String::from), - error_description: None, - })); - } - debug!(location = &location, status = as_debug!(status); "successfully connected to websocket"); - Ok(try_unfold((client, location), |mut this| async move { - let (ref mut client, ref location) = this; +) -> impl TryStream { + let stream = StreamReader::new(response.bytes_stream().map_err(|err| { + error!(err = as_debug!(err); "error reading stream"); + io::Error::new(io::ErrorKind::BrokenPipe, format!("{err:?}")) + })); + let lines_iter = stream.lines(); + try_unfold((lines_iter, location), |mut this| async move { + let (ref mut lines_iter, ref location) = this; let mut lines = vec![]; - loop { - match client.read_message() { - Ok(Message::Text(line)) => { - debug!(message = line, location = &location; "received websocket message"); - let line = line.trim().to_string(); - if line.starts_with(":") || line.is_empty() { - continue; - } - lines.push(line); - if let Ok(event) = make_event(&lines) { - info!(event = as_serde!(event), location = location; "received websocket event"); - lines.clear(); - return Ok(Some((event, this))); - } else { - continue; - } - }, - Ok(Message::Ping(data)) => { - debug!(metadata = as_serde!(data); "received ping, ponging"); - client.write_message(Message::Pong(data))?; - }, - Ok(message) => return Err(message.into()), - Err(err) => return Err(err.into()), + while let Some(line) = lines_iter.next_line().await? { + debug!(message = line, location = &location; "received websocket message"); + let line = line.trim().to_string(); + if line.starts_with(":") || line.is_empty() { + continue; + } + lines.push(line); + if let Ok(event) = make_event(&lines) { + info!(event = as_serde!(event), location = location; "received websocket event"); + lines.clear(); + return Ok(Some((event, this))); + } else { + continue; } } - })) + Ok(None) + }) } fn make_event(lines: &[String]) -> Result { diff --git a/src/mastodon.rs b/src/mastodon.rs index 1b3ee0a..b7fb9c7 100644 --- a/src/mastodon.rs +++ b/src/mastodon.rs @@ -21,7 +21,7 @@ use crate::{ UpdateCredsRequest, UpdatePushRequest, }; -use futures::TryStream; +use futures::{stream::try_unfold, TryStream}; use log::{as_debug, as_serde, debug, error, info, trace}; use reqwest::{Client, RequestBuilder}; use url::Url; @@ -394,29 +394,41 @@ impl Mastodon { /// }); /// ``` pub async fn streaming_user(&self) -> Result> { - let call_id = Uuid::new_v4(); - let mut url: Url = self.route("/api/v1/streaming").parse()?; - url.query_pairs_mut() - .append_pair("access_token", &self.data.token) - .append_pair("stream", "user"); + let url = self.route("/api/v1/streaming/user"); + let response = self.authenticated(self.client.get(&url)).send().await?; debug!( - url = url.as_str(), call_id = as_debug!(call_id); - "making user streaming API request" + status = log_serde!(response Status), url = &url, + headers = log_serde!(response Headers); + "received API response" ); - let response = reqwest::get(url.as_str()).await?; - let mut url: Url = response.url().as_str().parse()?; - info!( - url = url.as_str(), call_id = as_debug!(call_id), - status = response.status().as_str(); - "received url from streaming API request" - ); - let new_scheme = match url.scheme() { - "http" => "ws", - "https" => "wss", - x => return Err(Error::Other(format!("Bad URL scheme: {}", x))), - }; - url.set_scheme(new_scheme) - .map_err(|_| Error::Other("Bad URL scheme!".to_string()))?; + Ok(event_stream(response.error_for_status()?, url)) + } + + // pub async fn streaming_user(&self) -> Result> { let call_id = Uuid::new_v4(); + // let mut url: Url = self.route("/api/v1/streaming").parse()?; + // url.query_pairs_mut() + // .append_pair("access_token", &self.data.token) + // .append_pair("stream", "user"); + // debug!( + // url = url.as_str(), call_id = as_debug!(call_id); + // "making user streaming API request" + // ); + // let response = reqwest::get(url.as_str()).await?; + // let mut url: Url = response.url().as_str().parse()?; + // info!( + // url = url.as_str(), call_id = as_debug!(call_id), + // status = response.status().as_str(); + // "received url from streaming API request" + // ); + // let new_scheme = match url.scheme() { + // "http" => "ws", + // "https" => "wss", + // x => return Err(Error::Other(format!("Bad URL scheme: {}", x))), + // }; + // url.set_scheme(new_scheme) + // .map_err(|_| Error::Other("Bad URL scheme!".to_string()))?; + // } /// Set the bearer authentication token fn authenticated(&self, request: RequestBuilder) -> RequestBuilder { From c5141972e405e30ec333a7a8768a6a33f4e242a1 Mon Sep 17 00:00:00 2001 From: "D. Scott Boggs" Date: Sun, 18 Dec 2022 17:30:23 -0500 Subject: [PATCH 2/7] Add macro for streaming routes --- src/macros.rs | 44 +++++++++++++++++++++++++++++++ src/mastodon.rs | 70 +++++-------------------------------------------- 2 files changed, 50 insertions(+), 64 deletions(-) diff --git a/src/macros.rs b/src/macros.rs index ddcc555..2446143 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -438,3 +438,47 @@ macro_rules! paged_routes_with_id { () => {} } + +macro_rules! streaming { + {$($stream:ident@$fn_name:ident ($desc:tt),)*} => { + $( + doc_comment! { + concat!( + $desc, + "\n\nExample:\n\n", + " +use elefren::prelude::*; +use elefren::entities::event::Event; +use futures_util::{pin_mut, StreamExt, TryStreamExt}; + +tokio_test::block_on(async { + let data = Data::default(); + let client = Mastodon::from(data); + let stream = client.", + stringify!($fn_name), + "().await.unwrap(); + stream.try_for_each(|event| async move { + match event { + Event::Update(ref status) => { /* .. */ }, + Event::Notification(ref notification) => { /* .. */ }, + Event::Delete(ref id) => { /* .. */ }, + Event::FiltersChanged => { /* .. */ }, + } + Ok(()) + }).await.unwrap(); +});" + ), + pub async fn $fn_name(&self) -> Result> { + let url = self.route(concat!("/api/v1/streaming/", stringify!($stream))); + let response = self.authenticated(self.client.get(&url)).send().await?; + debug!( + status = log_serde!(response Status), url = &url, + headers = log_serde!(response Headers); + "received API response" + ); + Ok(event_stream(response.error_for_status()?, url)) + } + } + )* + }; +} diff --git a/src/mastodon.rs b/src/mastodon.rs index b7fb9c7..83152f1 100644 --- a/src/mastodon.rs +++ b/src/mastodon.rs @@ -21,8 +21,8 @@ use crate::{ UpdateCredsRequest, UpdatePushRequest, }; -use futures::{stream::try_unfold, TryStream}; -use log::{as_debug, as_serde, debug, error, info, trace}; +use futures::TryStream; +use log::{as_debug, as_serde, debug, error, trace}; use reqwest::{Client, RequestBuilder}; use url::Url; use uuid::Uuid; @@ -129,6 +129,10 @@ impl Mastodon { (post) unendorse_user: "accounts/{}/unpin" => Relationship, } + streaming! { + user@stream_user ("returns events that are relevant to the authorized user, i.e. home timeline & notifications"), + } + /// Create a new Mastodon Client pub fn new(client: Client, data: Data) -> Self { Mastodon(Arc::new(MastodonClient { @@ -368,68 +372,6 @@ impl Mastodon { self.following(&me.id).await } - /// returns events that are relevant to the authorized user, i.e. home - /// timeline & notifications - /// - /// // Example - /// - /// ```no_run - /// use elefren::prelude::*; - /// use elefren::entities::event::Event; - /// use futures_util::{pin_mut, StreamExt, TryStreamExt}; - /// - /// tokio_test::block_on(async { - /// let data = Data::default(); - /// let client = Mastodon::from(data); - /// let stream = client.streaming_user().await.unwrap(); - /// stream.try_for_each(|event| async move { - /// match event { - /// Event::Update(ref status) => { /* .. */ }, - /// Event::Notification(ref notification) => { /* .. */ }, - /// Event::Delete(ref id) => { /* .. */ }, - /// Event::FiltersChanged => { /* .. */ }, - /// } - /// Ok(()) - /// }).await.unwrap(); - /// }); - /// ``` - pub async fn streaming_user(&self) -> Result> { - let url = self.route("/api/v1/streaming/user"); - let response = self.authenticated(self.client.get(&url)).send().await?; - debug!( - status = log_serde!(response Status), url = &url, - headers = log_serde!(response Headers); - "received API response" - ); - Ok(event_stream(response.error_for_status()?, url)) - } - - // pub async fn streaming_user(&self) -> Result> { let call_id = Uuid::new_v4(); - // let mut url: Url = self.route("/api/v1/streaming").parse()?; - // url.query_pairs_mut() - // .append_pair("access_token", &self.data.token) - // .append_pair("stream", "user"); - // debug!( - // url = url.as_str(), call_id = as_debug!(call_id); - // "making user streaming API request" - // ); - // let response = reqwest::get(url.as_str()).await?; - // let mut url: Url = response.url().as_str().parse()?; - // info!( - // url = url.as_str(), call_id = as_debug!(call_id), - // status = response.status().as_str(); - // "received url from streaming API request" - // ); - // let new_scheme = match url.scheme() { - // "http" => "ws", - // "https" => "wss", - // x => return Err(Error::Other(format!("Bad URL scheme: {}", x))), - // }; - // url.set_scheme(new_scheme) - // .map_err(|_| Error::Other("Bad URL scheme!".to_string()))?; - // } - /// Set the bearer authentication token fn authenticated(&self, request: RequestBuilder) -> RequestBuilder { request.bearer_auth(&self.data.token) From 334e620d3cb08963b84e3e2895479e707827356f Mon Sep 17 00:00:00 2001 From: "D. Scott Boggs" Date: Sun, 18 Dec 2022 18:00:58 -0500 Subject: [PATCH 3/7] Add methods for remaining streams --- src/macros.rs | 82 +++++++++++++++++++++++++++++++++++++------------ src/mastodon.rs | 13 +++++++- 2 files changed, 75 insertions(+), 20 deletions(-) diff --git a/src/macros.rs b/src/macros.rs index 2446143..18f14c7 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -440,13 +440,12 @@ macro_rules! paged_routes_with_id { } macro_rules! streaming { - {$($stream:ident@$fn_name:ident ($desc:tt),)*} => { - $( - doc_comment! { - concat!( - $desc, - "\n\nExample:\n\n", - " + ($stream:literal@$fn_name:ident ($desc:tt), $($rest:tt)*) => { + doc_comment! { + concat!( + $desc, + "\n\nExample:\n\n", + " use elefren::prelude::*; use elefren::entities::event::Event; use futures_util::{pin_mut, StreamExt, TryStreamExt}; @@ -467,18 +466,63 @@ tokio_test::block_on(async { Ok(()) }).await.unwrap(); });" - ), - pub async fn $fn_name(&self) -> Result> { - let url = self.route(concat!("/api/v1/streaming/", stringify!($stream))); - let response = self.authenticated(self.client.get(&url)).send().await?; - debug!( - status = log_serde!(response Status), url = &url, - headers = log_serde!(response Headers); - "received API response" - ); - Ok(event_stream(response.error_for_status()?, url)) - } + ), + pub async fn $fn_name(&self) -> Result> { + let url = self.route(concat!("/api/v1/streaming/", stringify!($stream))); + let response = self.authenticated(self.client.get(&url)).send().await?; + debug!( + status = log_serde!(response Status), url = &url, + headers = log_serde!(response Headers); + "received API response" + ); + Ok(event_stream(response.error_for_status()?, url)) } - )* + } + streaming! { $($rest)* } }; + ($stream:literal($param:ident: $param_type:ty, like $param_doc_val:literal)@$fn_name:ident ($desc:tt), $($rest:tt)*) => { + doc_comment! { + concat!( + $desc, + "\n\nExample:\n\n", + " +use elefren::prelude::*; +use elefren::entities::event::Event; +use futures_util::{pin_mut, StreamExt, TryStreamExt}; + +tokio_test::block_on(async { + let data = Data::default(); + let client = Mastodon::from(data); + let stream = client.", + stringify!($fn_name), + "(", + $param_doc_val, + ").await.unwrap(); + stream.try_for_each(|event| async move { + match event { + Event::Update(ref status) => { /* .. */ }, + Event::Notification(ref notification) => { /* .. */ }, + Event::Delete(ref id) => { /* .. */ }, + Event::FiltersChanged => { /* .. */ }, + } + Ok(()) + }).await.unwrap(); +});" + ), + pub async fn $fn_name(&self, $param: $param_type) -> Result> { + let mut url: Url = self.route(concat!("/api/v1/streaming/", stringify!($stream))).parse()?; + url.query_pairs_mut().append_pair(stringify!($param), $param.as_ref()); + let url = url.to_string(); + let response = self.authenticated(self.client.get(url.as_str())).send().await?; + debug!( + status = log_serde!(response Status), url = as_debug!(url), + headers = log_serde!(response Headers); + "received API response" + ); + Ok(event_stream(response.error_for_status()?, url)) + } + } + streaming! { $($rest)* } + }; + () => {} } diff --git a/src/mastodon.rs b/src/mastodon.rs index 83152f1..5182281 100644 --- a/src/mastodon.rs +++ b/src/mastodon.rs @@ -130,7 +130,18 @@ impl Mastodon { } streaming! { - user@stream_user ("returns events that are relevant to the authorized user, i.e. home timeline & notifications"), + "user"@stream_user ("returns events that are relevant to the authorized user, i.e. home timeline & notifications"), + "public"@stream_public ("All public posts known to the server. Analogous to the federated timeline."), + "public:media"@stream_public_media ("All public posts known to the server, filtered for media attachments. Analogous to the federated timeline with 'only media' enabled."), + "public:local"@stream_local ("All public posts originating from this server."), + "public:local:media"@stream_local_media ("All public posts originating from this server, filtered for media attachments. Analogous to the local timeline with 'only media' enabled."), + "public:remote"@stream_remote ("All public posts originating from other servers."), + "public:remote:media"@stream_remote_media ("All public posts originating from other servers, filtered for media attachments."), + "hashtag"(tag: impl AsRef, like "#bots")@stream_hashtag ("All public posts using a certain hashtag."), + "hashtag:local"(tag: impl AsRef, like "#bots")@stream_local_hashtag ("All public posts using a certain hashtag, originating from this server."), + "user:notification"@stream_notifications ("Notifications for the current user."), + "list"(list: impl AsRef, like "12345")@stream_list ("Updates to a specific list."), + "direct"@stream_direct ("Updates to direct conversations."), } /// Create a new Mastodon Client From 0f268f7d6f7266d83dccd815f2cb1021b729f493 Mon Sep 17 00:00:00 2001 From: "D. Scott Boggs" Date: Thu, 22 Dec 2022 09:33:51 -0500 Subject: [PATCH 4/7] update readme badge I just needed to make *some* change to trigger CI --- README.md | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 8163b4d..96d2fe7 100644 --- a/README.md +++ b/README.md @@ -2,12 +2,11 @@ ## A Wrapper for the Mastodon API. -[![Build Status](https://travis-ci.org/pwoolcoc/elefren.svg?branch=master)](https://travis-ci.org/pwoolcoc/elefren) -[![Build Status](https://ci.appveyor.com/api/projects/status/qeigk3nmmps52wxv?svg=true)](https://ci.appveyor.com/project/pwoolcoc/elefren) -[![Coverage Status](https://coveralls.io/repos/github/pwoolcoc/elefren/badge.svg?branch=master&service=github)](https://coveralls.io/github/pwoolcoc/elefren?branch=master) -[![crates.io](https://img.shields.io/crates/v/elefren.svg)](https://crates.io/crates/elefren) -[![Docs](https://docs.rs/elefren/badge.svg)](https://docs.rs/elefren) -[![MIT/APACHE-2.0](https://img.shields.io/crates/l/elefren.svg)](https://crates.io/crates/elefren) +[![Build Status](https://github.com/dscottboggs/elefren/actions/workflows/rust.yml/badge.svg)](https://travis-ci.org/pwoolcoc/elefren) + + + + [Documentation](https://docs.rs/elefren/) From ca7f9c86a6128c2a912cb3c99e1a4b77b1f67711 Mon Sep 17 00:00:00 2001 From: "D. Scott Boggs" Date: Thu, 22 Dec 2022 12:27:30 -0500 Subject: [PATCH 5/7] Removed more references to websockets --- README.md | 37 +++++++++++++++++-------------------- src/errors.rs | 9 --------- src/event_stream.rs | 8 ++++---- src/lib.rs | 2 +- 4 files changed, 22 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index 96d2fe7..55c7cee 100644 --- a/README.md +++ b/README.md @@ -45,14 +45,15 @@ use elefren::prelude::*; use elefren::helpers::toml; // requires `features = ["toml"]` use elefren::helpers::cli; -fn main() -> Result<(), Box> { +#[tokio::main] +async fn main() -> Result<(), Box> { let mastodon = if let Ok(data) = toml::from_file("mastodon-data.toml") { Mastodon::from(data) } else { register()? }; - let you = mastodon.verify_credentials()?; + let you = mastodon.verify_credentials().await?; println!("{:#?}", you); @@ -80,25 +81,21 @@ use elefren::entities::event::Event; use std::error::Error; -fn main() -> Result<(), Box> { - let data = Data { - base: "".into(), - client_id: "".into(), - client_secret: "".into(), - redirect: "".into(), - token: "".into(), - }; +#[tokio::main] +async fn main() -> Result<(), Box> { + let client = Mastodon::from(Data::default()); - let client = Mastodon::from(data); - - for event in client.streaming_user()? { - match event { - Event::Update(ref status) => { /* .. */ }, - Event::Notification(ref notification) => { /* .. */ }, - Event::Delete(ref id) => { /* .. */ }, - Event::FiltersChanged => { /* .. */ }, - } - } + client.stream_user() + .await? + .try_for_each(|event| { + match event { + Event::Update(ref status) => { /* .. */ }, + Event::Notification(ref notification) => { /* .. */ }, + Event::Delete(ref id) => { /* .. */ }, + Event::FiltersChanged => { /* .. */ }, + } + }) + .await?; Ok(()) } ``` diff --git a/src/errors.rs b/src/errors.rs index 754d274..cbe85a0 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -12,7 +12,6 @@ use serde_urlencoded::ser::Error as UrlEncodedError; use tomlcrate::de::Error as TomlDeError; #[cfg(feature = "toml")] use tomlcrate::ser::Error as TomlSerError; -use tungstenite::{error::Error as WebSocketError, Message as WebSocketMessage}; use url::ParseError as UrlError; /// Convience type over `std::result::Result` with `Error` as the error type. @@ -62,16 +61,12 @@ pub enum Error { Envy(EnvyError), /// Error serializing to a query string SerdeQs(SerdeQsError), - /// WebSocket error - WebSocket(WebSocketError), /// An integer conversion was attempted, but the value didn't fit into the /// target type. /// /// At the time of writing, this can only be triggered when a file is /// larger than the system's usize allows. IntConversion(TryFromIntError), - /// A stream message was received that wasn't recognized - UnrecognizedStreamMessage(WebSocketMessage), /// Other errors Other(String), } @@ -100,14 +95,12 @@ impl error::Error for Error { #[cfg(feature = "env")] Error::Envy(ref e) => e, Error::SerdeQs(ref e) => e, - Error::WebSocket(ref e) => e, Error::IntConversion(ref e) => e, Error::Client(..) | Error::Server(..) => return None, Error::ClientIdRequired => return None, Error::ClientSecretRequired => return None, Error::AccessTokenRequired => return None, Error::MissingField(_) => return None, - Error::UnrecognizedStreamMessage(_) => return None, Error::Other(..) => return None, }) } @@ -157,10 +150,8 @@ from! { HeaderParseError => HeaderParseError, #[cfg(feature = "env")] EnvyError => Envy, SerdeQsError => SerdeQs, - WebSocketError => WebSocket, String => Other, TryFromIntError => IntConversion, - WebSocketMessage => UnrecognizedStreamMessage, } #[macro_export] diff --git a/src/event_stream.rs b/src/event_stream.rs index 963025d..9f5c11d 100644 --- a/src/event_stream.rs +++ b/src/event_stream.rs @@ -14,7 +14,7 @@ use tokio_util::io::StreamReader; /// Return a stream of events from the given response by parsing Server-Sent /// Events as they come in. /// -/// See https://docs.joinmastodon.org/methods/streaming/ for more info +/// See for more info pub fn event_stream( response: Response, location: String, @@ -28,14 +28,14 @@ pub fn event_stream( let (ref mut lines_iter, ref location) = this; let mut lines = vec![]; while let Some(line) = lines_iter.next_line().await? { - debug!(message = line, location = &location; "received websocket message"); + debug!(message = line, location = &location; "received message"); let line = line.trim().to_string(); if line.starts_with(":") || line.is_empty() { continue; } lines.push(line); if let Ok(event) = make_event(&lines) { - info!(event = as_serde!(event), location = location; "received websocket event"); + info!(event = as_serde!(event), location = location; "received event"); lines.clear(); return Ok(Some((event, this))); } else { @@ -66,7 +66,7 @@ fn make_event(lines: &[String]) -> Result { data = message.payload; } let event: &str = &event; - trace!(event = event, payload = data; "websocket message parsed"); + trace!(event = event, payload = data; "SSE message parsed"); Ok(match event { "notification" => { let data = data diff --git a/src/lib.rs b/src/lib.rs index b778e96..0ada43a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,7 +39,7 @@ //! let data = Data::default(); //! let client = Mastodon::from(data); //! tokio_test::block_on(async { -//! let stream = client.streaming_user().await.unwrap(); +//! let stream = client.stream_user().await.unwrap(); //! stream.try_for_each(|event| async move { //! match event { //! Event::Update(ref status) => { /* .. */ }, From 94b7cd40871415c6d14562100974bdf5939736e9 Mon Sep 17 00:00:00 2001 From: "D. Scott Boggs" Date: Thu, 22 Dec 2022 12:28:08 -0500 Subject: [PATCH 6/7] Rearrange macro; fix quoting issue --- src/macros.rs | 6 +++--- src/mastodon.rs | 36 ++++++++++++++++++++++++------------ 2 files changed, 27 insertions(+), 15 deletions(-) diff --git a/src/macros.rs b/src/macros.rs index 18f14c7..2a74c85 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -440,7 +440,7 @@ macro_rules! paged_routes_with_id { } macro_rules! streaming { - ($stream:literal@$fn_name:ident ($desc:tt), $($rest:tt)*) => { + ($desc:tt $fn_name:ident@$stream:literal, $($rest:tt)*) => { doc_comment! { concat!( $desc, @@ -468,7 +468,7 @@ tokio_test::block_on(async { });" ), pub async fn $fn_name(&self) -> Result> { - let url = self.route(concat!("/api/v1/streaming/", stringify!($stream))); + let url = self.route(&format!("/api/v1/streaming/{}", $stream)); let response = self.authenticated(self.client.get(&url)).send().await?; debug!( status = log_serde!(response Status), url = &url, @@ -480,7 +480,7 @@ tokio_test::block_on(async { } streaming! { $($rest)* } }; - ($stream:literal($param:ident: $param_type:ty, like $param_doc_val:literal)@$fn_name:ident ($desc:tt), $($rest:tt)*) => { + ($desc:tt $fn_name:ident($param:ident: $param_type:ty, like $param_doc_val:literal)@$stream:literal, $($rest:tt)*) => { doc_comment! { concat!( $desc, diff --git a/src/mastodon.rs b/src/mastodon.rs index 5182281..31adfa0 100644 --- a/src/mastodon.rs +++ b/src/mastodon.rs @@ -130,18 +130,30 @@ impl Mastodon { } streaming! { - "user"@stream_user ("returns events that are relevant to the authorized user, i.e. home timeline & notifications"), - "public"@stream_public ("All public posts known to the server. Analogous to the federated timeline."), - "public:media"@stream_public_media ("All public posts known to the server, filtered for media attachments. Analogous to the federated timeline with 'only media' enabled."), - "public:local"@stream_local ("All public posts originating from this server."), - "public:local:media"@stream_local_media ("All public posts originating from this server, filtered for media attachments. Analogous to the local timeline with 'only media' enabled."), - "public:remote"@stream_remote ("All public posts originating from other servers."), - "public:remote:media"@stream_remote_media ("All public posts originating from other servers, filtered for media attachments."), - "hashtag"(tag: impl AsRef, like "#bots")@stream_hashtag ("All public posts using a certain hashtag."), - "hashtag:local"(tag: impl AsRef, like "#bots")@stream_local_hashtag ("All public posts using a certain hashtag, originating from this server."), - "user:notification"@stream_notifications ("Notifications for the current user."), - "list"(list: impl AsRef, like "12345")@stream_list ("Updates to a specific list."), - "direct"@stream_direct ("Updates to direct conversations."), + "returns events that are relevant to the authorized user, i.e. home timeline & notifications" + stream_user@"user", + "All public posts known to the server. Analogous to the federated timeline." + stream_public@"public", + "All public posts known to the server, filtered for media attachments. Analogous to the federated timeline with 'only media' enabled." + stream_public_media@"public:media", + "All public posts originating from this server." + stream_local@"public:local", + "All public posts originating from this server, filtered for media attachments. Analogous to the local timeline with 'only media' enabled." + stream_local_media@"public:local:media", + "All public posts originating from other servers." + stream_remote@"public:remote", + "All public posts originating from other servers, filtered for media attachments." + stream_remote_media@"public:remote:media", + "All public posts using a certain hashtag." + stream_hashtag(tag: impl AsRef, like "#bots")@"hashtag", + "All public posts using a certain hashtag, originating from this server." + stream_local_hashtag(tag: impl AsRef, like "#bots")@"hashtag:local", + "Notifications for the current user." + stream_notifications@"user:notification", + "Updates to a specific list." + stream_list(list: impl AsRef, like "12345")@"list", + "Updates to direct conversations." + stream_direct@"direct", } /// Create a new Mastodon Client From 59dfc7085bbd43bbbaa932822c8c2a73e7e5bab9 Mon Sep 17 00:00:00 2001 From: "D. Scott Boggs" Date: Thu, 22 Dec 2022 12:28:18 -0500 Subject: [PATCH 7/7] this file isn't needed anymore --- src/mastodon_client.rs | 370 ----------------------------------------- 1 file changed, 370 deletions(-) delete mode 100644 src/mastodon_client.rs diff --git a/src/mastodon_client.rs b/src/mastodon_client.rs deleted file mode 100644 index 45e9c00..0000000 --- a/src/mastodon_client.rs +++ /dev/null @@ -1,370 +0,0 @@ -use std::borrow::Cow; - -use crate::{ - entities::prelude::*, - errors::Result, - http_send::{HttpSend, HttpSender}, - page::Page, - requests::{ - AddFilterRequest, - AddPushRequest, - StatusesRequest, - UpdateCredsRequest, - UpdatePushRequest, - }, - status_builder::NewStatus, -}; - -/// Represents the set of methods that a Mastodon Client can do, so that -/// implementations might be swapped out for testing -#[allow(unused)] -pub trait MastodonClient { - /// Type that wraps streaming API streams - type Stream: Iterator; - - /// GET /api/v1/favourites - fn favourites(&self) -> Result> { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/blocks - fn blocks(&self) -> Result> { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/domain_blocks - fn domain_blocks(&self) -> Result> { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/follow_requests - fn follow_requests(&self) -> Result> { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/timelines/home - fn get_home_timeline(&self) -> Result> { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/custom_emojis - fn get_emojis(&self) -> Result> { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/mutes - fn mutes(&self) -> Result> { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/notifications - fn notifications(&self) -> Result> { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/reports - fn reports(&self) -> Result> { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/accounts/:id/followers - fn followers(&self, id: &str) -> Result> { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/accounts/:id/following - fn following(&self, id: &str) -> Result> { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/statuses/:id/reblogged_by - fn reblogged_by(&self, id: &str) -> Result> { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/statuses/:id/favourited_by - fn favourited_by(&self, id: &str) -> Result> { - unimplemented!("This method was not implemented"); - } - /// DELETE /api/v1/domain_blocks - fn unblock_domain(&self, domain: String) -> Result { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/instance - fn instance(&self) -> Result { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/accounts/verify_credentials - fn verify_credentials(&self) -> Result { - unimplemented!("This method was not implemented"); - } - /// POST /api/v1/reports - fn report(&self, account_id: &str, status_ids: Vec<&str>, comment: String) -> Result { - unimplemented!("This method was not implemented"); - } - /// POST /api/v1/domain_blocks - fn block_domain(&self, domain: String) -> Result { - unimplemented!("This method was not implemented"); - } - /// POST /api/v1/accounts/follow_requests/authorize - fn authorize_follow_request(&self, id: &str) -> Result { - unimplemented!("This method was not implemented"); - } - /// POST /api/v1/accounts/follow_requests/reject - fn reject_follow_request(&self, id: &str) -> Result { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/search - fn search<'a>(&self, q: &'a str, resolve: bool) -> Result { - unimplemented!("This method was not implemented"); - } - /// GET /api/v2/search - fn search_v2<'a>(&self, q: &'a str, resolve: bool) -> Result { - unimplemented!("This method was not implemented"); - } - /// POST /api/v1/follows - fn follows(&self, uri: Cow<'static, str>) -> Result { - unimplemented!("This method was not implemented"); - } - /// POST /api/v1/media - fn media(&self, file: Cow<'static, str>) -> Result { - unimplemented!("This method was not implemented"); - } - /// POST /api/v1/notifications/clear - fn clear_notifications(&self) -> Result { - unimplemented!("This method was not implemented"); - } - /// POST /api/v1/notifications/dismiss - fn dismiss_notification(&self, id: &str) -> Result { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/accounts/:id - fn get_account(&self, id: &str) -> Result { - unimplemented!("This method was not implemented"); - } - /// POST /api/v1/accounts/:id/follow - fn follow(&self, id: &str) -> Result { - unimplemented!("This method was not implemented"); - } - /// POST /api/v1/accounts/:id/unfollow - fn unfollow(&self, id: &str) -> Result { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/accounts/:id/block - fn block(&self, id: &str) -> Result { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/accounts/:id/unblock - fn unblock(&self, id: &str) -> Result { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/accounts/:id/mute - fn mute(&self, id: &str) -> Result { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/accounts/:id/unmute - fn unmute(&self, id: &str) -> Result { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/notifications/:id - fn get_notification(&self, id: &str) -> Result { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/statuses/:id - fn get_status(&self, id: &str) -> Result { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/statuses/:id/context - fn get_context(&self, id: &str) -> Result { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/statuses/:id/card - fn get_card(&self, id: &str) -> Result { - unimplemented!("This method was not implemented"); - } - /// POST /api/v1/statuses/:id/reblog - fn reblog(&self, id: &str) -> Result { - unimplemented!("This method was not implemented"); - } - /// POST /api/v1/statuses/:id/unreblog - fn unreblog(&self, id: &str) -> Result { - unimplemented!("This method was not implemented"); - } - /// POST /api/v1/statuses/:id/favourite - fn favourite(&self, id: &str) -> Result { - unimplemented!("This method was not implemented"); - } - /// POST /api/v1/statuses/:id/unfavourite - fn unfavourite(&self, id: &str) -> Result { - unimplemented!("This method was not implemented"); - } - /// DELETE /api/v1/statuses/:id - fn delete_status(&self, id: &str) -> Result { - unimplemented!("This method was not implemented"); - } - /// PATCH /api/v1/accounts/update_credentials - fn update_credentials(&self, builder: &mut UpdateCredsRequest) -> Result { - unimplemented!("This method was not implemented"); - } - /// POST /api/v1/statuses - fn new_status(&self, status: NewStatus) -> Result { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/timelines/public - fn get_public_timeline(&self, local: bool) -> Result> { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/timelines/tag/:hashtag - fn get_tagged_timeline(&self, hashtag: String, local: bool) -> Result> { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/accounts/:id/statuses - fn statuses<'a, 'b: 'a, S>(&'b self, id: &'b str, request: S) -> Result> - where - S: Into>>, - { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/accounts/relationships - fn relationships(&self, ids: &[&str]) -> Result> { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/accounts/search?q=:query&limit=:limit&following=:following - fn search_accounts( - &self, - query: &str, - limit: Option, - following: bool, - ) -> Result> { - unimplemented!("This method was not implemented"); - } - /// POST /api/v1/push/subscription - fn add_push_subscription(&self, request: &AddPushRequest) -> Result { - unimplemented!("This method was not implemented"); - } - /// PUT /api/v1/push/subscription - fn update_push_data(&self, request: &UpdatePushRequest) -> Result { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/push/subscription - fn get_push_subscription(&self) -> Result { - unimplemented!("This method was not implemented"); - } - /// DELETE /api/v1/push/subscription - fn delete_push_subscription(&self) -> Result { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/filters - fn get_filters(&self) -> Result> { - unimplemented!("This method was not implemented"); - } - /// POST /api/v1/filters - fn add_filter(&self, request: &mut AddFilterRequest) -> Result { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/filters/:id - fn get_filter(&self, id: &str) -> Result { - unimplemented!("This method was not implemented"); - } - /// PUT /api/v1/filters/:id - fn update_filter(&self, id: &str, request: &mut AddFilterRequest) -> Result { - unimplemented!("This method was not implemented"); - } - /// DELETE /api/v1/filters/:id - fn delete_filter(&self, id: &str) -> Result { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/suggestions - fn get_follow_suggestions(&self) -> Result> { - unimplemented!("This method was not implemented"); - } - /// DELETE /api/v1/suggestions/:account_id - fn delete_from_suggestions(&self, id: &str) -> Result { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/endorsements - fn get_endorsements(&self) -> Result> { - unimplemented!("This method was not implemented"); - } - /// POST /api/v1/accounts/:id/pin - fn endorse_user(&self, id: &str) -> Result { - unimplemented!("This method was not implemented"); - } - /// POST /api/v1/accounts/:id/unpin - fn unendorse_user(&self, id: &str) -> Result { - unimplemented!("This method was not implemented"); - } - /// Shortcut for: `let me = client.verify_credentials(); client.followers()` - /// - /// ```no_run - /// use elefren::prelude::*; - /// let data = Data::default(); - /// let client = Mastodon::from(data); - /// let follows_me = client.follows_me().unwrap(); - /// ``` - fn follows_me(&self) -> Result> { - unimplemented!("This method was not implemented"); - } - /// Shortcut for - /// `let me = client.verify_credentials(); client.following(&me.id)` - /// - /// ```no_run - /// use elefren::prelude::*; - /// let data = Data::default(); - /// let client = Mastodon::from(data); - /// let follows_me = client.followed_by_me().unwrap(); - /// ``` - fn followed_by_me(&self) -> Result> { - unimplemented!("This method was not implemented"); - } - - /// Returns events that are relevant to the authorized user, i.e. home - /// timeline and notifications - fn streaming_user(&self) -> Result { - unimplemented!("This method was not implemented"); - } - - /// Returns all public statuses - fn streaming_public(&self) -> Result { - unimplemented!("This method was not implemented"); - } - - /// Returns all local statuses - fn streaming_local(&self) -> Result { - unimplemented!("This method was not implemented"); - } - - /// Returns all public statuses for a particular hashtag - fn streaming_public_hashtag(&self, hashtag: &str) -> Result { - unimplemented!("This method was not implemented"); - } - - /// Returns all local statuses for a particular hashtag - fn streaming_local_hashtag(&self, hashtag: &str) -> Result { - unimplemented!("This method was not implemented"); - } - - /// Returns statuses for a list - fn streaming_list(&self, list_id: &str) -> Result { - unimplemented!("This method was not implemented"); - } - - /// Returns all direct messages - fn streaming_direct(&self) -> Result { - unimplemented!("This method was not implemented"); - } -} - -/// Trait that represents clients that can make unauthenticated calls to a -/// mastodon instance -#[allow(unused)] -pub trait MastodonUnauthenticated { - /// GET /api/v1/statuses/:id - fn get_status(&self, id: &str) -> Result { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/statuses/:id/context - fn get_context(&self, id: &str) -> Result { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/statuses/:id/card - fn get_card(&self, id: &str) -> Result { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/statuses/:id/reblogged_by - fn reblogged_by(&self, id: &str) -> Result> { - unimplemented!("This method was not implemented"); - } - /// GET /api/v1/statuses/:id/favourited_by - fn favourited_by(&self, id: &str) -> Result> { - unimplemented!("This method was not implemented"); - } -}