Add macro for streaming routes
This commit is contained in:
		
							parent
							
								
									610d51c593
								
							
						
					
					
						commit
						c5141972e4
					
				| 
						 | 
					@ -438,3 +438,47 @@ macro_rules! paged_routes_with_id {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    () => {}
 | 
					    () => {}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					macro_rules! streaming {
 | 
				
			||||||
 | 
					    {$($stream:ident@$fn_name:ident ($desc:tt),)*} => {
 | 
				
			||||||
 | 
					        $(
 | 
				
			||||||
 | 
					            doc_comment! {
 | 
				
			||||||
 | 
					                concat!(
 | 
				
			||||||
 | 
					                    $desc,
 | 
				
			||||||
 | 
					                    "\n\nExample:\n\n",
 | 
				
			||||||
 | 
					                    "
 | 
				
			||||||
 | 
					use elefren::prelude::*;
 | 
				
			||||||
 | 
					use elefren::entities::event::Event;
 | 
				
			||||||
 | 
					use futures_util::{pin_mut, StreamExt, TryStreamExt};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					tokio_test::block_on(async {
 | 
				
			||||||
 | 
					    let data = Data::default();
 | 
				
			||||||
 | 
					    let client = Mastodon::from(data);
 | 
				
			||||||
 | 
					    let stream = client.",
 | 
				
			||||||
 | 
					                    stringify!($fn_name),
 | 
				
			||||||
 | 
					                    "().await.unwrap();
 | 
				
			||||||
 | 
					    stream.try_for_each(|event| async move {
 | 
				
			||||||
 | 
					        match event {
 | 
				
			||||||
 | 
					            Event::Update(ref status) => { /* .. */ },
 | 
				
			||||||
 | 
					            Event::Notification(ref notification) => { /* .. */ },
 | 
				
			||||||
 | 
					            Event::Delete(ref id) => { /* .. */ },
 | 
				
			||||||
 | 
					            Event::FiltersChanged => { /* .. */ },
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        Ok(())
 | 
				
			||||||
 | 
					    }).await.unwrap();
 | 
				
			||||||
 | 
					});"
 | 
				
			||||||
 | 
					                ),
 | 
				
			||||||
 | 
					                pub async fn $fn_name(&self) -> Result<impl TryStream<Ok=Event, Error=Error>> {
 | 
				
			||||||
 | 
					                    let url = self.route(concat!("/api/v1/streaming/", stringify!($stream)));
 | 
				
			||||||
 | 
					                    let response = self.authenticated(self.client.get(&url)).send().await?;
 | 
				
			||||||
 | 
					                    debug!(
 | 
				
			||||||
 | 
					                        status = log_serde!(response Status), url = &url,
 | 
				
			||||||
 | 
					                        headers = log_serde!(response Headers);
 | 
				
			||||||
 | 
					                        "received API response"
 | 
				
			||||||
 | 
					                    );
 | 
				
			||||||
 | 
					                    Ok(event_stream(response.error_for_status()?, url))
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        )*
 | 
				
			||||||
 | 
					    };
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -21,8 +21,8 @@ use crate::{
 | 
				
			||||||
    UpdateCredsRequest,
 | 
					    UpdateCredsRequest,
 | 
				
			||||||
    UpdatePushRequest,
 | 
					    UpdatePushRequest,
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
use futures::{stream::try_unfold, TryStream};
 | 
					use futures::TryStream;
 | 
				
			||||||
use log::{as_debug, as_serde, debug, error, info, trace};
 | 
					use log::{as_debug, as_serde, debug, error, trace};
 | 
				
			||||||
use reqwest::{Client, RequestBuilder};
 | 
					use reqwest::{Client, RequestBuilder};
 | 
				
			||||||
use url::Url;
 | 
					use url::Url;
 | 
				
			||||||
use uuid::Uuid;
 | 
					use uuid::Uuid;
 | 
				
			||||||
| 
						 | 
					@ -129,6 +129,10 @@ impl Mastodon {
 | 
				
			||||||
        (post) unendorse_user: "accounts/{}/unpin" => Relationship,
 | 
					        (post) unendorse_user: "accounts/{}/unpin" => Relationship,
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    streaming! {
 | 
				
			||||||
 | 
					        user@stream_user ("returns events that are relevant to the authorized user, i.e. home timeline & notifications"),
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    /// Create a new Mastodon Client
 | 
					    /// Create a new Mastodon Client
 | 
				
			||||||
    pub fn new(client: Client, data: Data) -> Self {
 | 
					    pub fn new(client: Client, data: Data) -> Self {
 | 
				
			||||||
        Mastodon(Arc::new(MastodonClient {
 | 
					        Mastodon(Arc::new(MastodonClient {
 | 
				
			||||||
| 
						 | 
					@ -368,68 +372,6 @@ impl Mastodon {
 | 
				
			||||||
        self.following(&me.id).await
 | 
					        self.following(&me.id).await
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    /// returns events that are relevant to the authorized user, i.e. home
 | 
					 | 
				
			||||||
    /// timeline & notifications
 | 
					 | 
				
			||||||
    ///
 | 
					 | 
				
			||||||
    /// // Example
 | 
					 | 
				
			||||||
    ///
 | 
					 | 
				
			||||||
    /// ```no_run
 | 
					 | 
				
			||||||
    /// use elefren::prelude::*;
 | 
					 | 
				
			||||||
    /// use elefren::entities::event::Event;
 | 
					 | 
				
			||||||
    /// use futures_util::{pin_mut, StreamExt, TryStreamExt};
 | 
					 | 
				
			||||||
    ///
 | 
					 | 
				
			||||||
    /// tokio_test::block_on(async {
 | 
					 | 
				
			||||||
    ///     let data = Data::default();
 | 
					 | 
				
			||||||
    ///     let client = Mastodon::from(data);
 | 
					 | 
				
			||||||
    ///     let stream = client.streaming_user().await.unwrap();
 | 
					 | 
				
			||||||
    ///     stream.try_for_each(|event| async move {
 | 
					 | 
				
			||||||
    ///         match event {
 | 
					 | 
				
			||||||
    ///             Event::Update(ref status) => { /* .. */ },
 | 
					 | 
				
			||||||
    ///             Event::Notification(ref notification) => { /* .. */ },
 | 
					 | 
				
			||||||
    ///             Event::Delete(ref id) => { /* .. */ },
 | 
					 | 
				
			||||||
    ///             Event::FiltersChanged => { /* .. */ },
 | 
					 | 
				
			||||||
    ///         }
 | 
					 | 
				
			||||||
    ///         Ok(())
 | 
					 | 
				
			||||||
    ///     }).await.unwrap();
 | 
					 | 
				
			||||||
    /// });
 | 
					 | 
				
			||||||
    /// ```
 | 
					 | 
				
			||||||
    pub async fn streaming_user(&self) -> Result<impl TryStream<Ok = Event, Error = Error>> {
 | 
					 | 
				
			||||||
        let url = self.route("/api/v1/streaming/user");
 | 
					 | 
				
			||||||
        let response = self.authenticated(self.client.get(&url)).send().await?;
 | 
					 | 
				
			||||||
        debug!(
 | 
					 | 
				
			||||||
            status = log_serde!(response Status), url = &url,
 | 
					 | 
				
			||||||
            headers = log_serde!(response Headers);
 | 
					 | 
				
			||||||
            "received API response"
 | 
					 | 
				
			||||||
        );
 | 
					 | 
				
			||||||
        Ok(event_stream(response.error_for_status()?, url))
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    // pub async fn streaming_user(&self) -> Result<impl TryStream<Ok = Event, Error
 | 
					 | 
				
			||||||
    // = Error>> {     let call_id = Uuid::new_v4();
 | 
					 | 
				
			||||||
    //     let mut url: Url = self.route("/api/v1/streaming").parse()?;
 | 
					 | 
				
			||||||
    //     url.query_pairs_mut()
 | 
					 | 
				
			||||||
    //         .append_pair("access_token", &self.data.token)
 | 
					 | 
				
			||||||
    //         .append_pair("stream", "user");
 | 
					 | 
				
			||||||
    //     debug!(
 | 
					 | 
				
			||||||
    //         url = url.as_str(), call_id = as_debug!(call_id);
 | 
					 | 
				
			||||||
    //         "making user streaming API request"
 | 
					 | 
				
			||||||
    //     );
 | 
					 | 
				
			||||||
    //     let response = reqwest::get(url.as_str()).await?;
 | 
					 | 
				
			||||||
    //     let mut url: Url = response.url().as_str().parse()?;
 | 
					 | 
				
			||||||
    //     info!(
 | 
					 | 
				
			||||||
    //         url = url.as_str(), call_id = as_debug!(call_id),
 | 
					 | 
				
			||||||
    //         status = response.status().as_str();
 | 
					 | 
				
			||||||
    //         "received url from streaming API request"
 | 
					 | 
				
			||||||
    //     );
 | 
					 | 
				
			||||||
    //     let new_scheme = match url.scheme() {
 | 
					 | 
				
			||||||
    //         "http" => "ws",
 | 
					 | 
				
			||||||
    //         "https" => "wss",
 | 
					 | 
				
			||||||
    //         x => return Err(Error::Other(format!("Bad URL scheme: {}", x))),
 | 
					 | 
				
			||||||
    //     };
 | 
					 | 
				
			||||||
    //     url.set_scheme(new_scheme)
 | 
					 | 
				
			||||||
    //         .map_err(|_| Error::Other("Bad URL scheme!".to_string()))?;
 | 
					 | 
				
			||||||
    // }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    /// Set the bearer authentication token
 | 
					    /// Set the bearer authentication token
 | 
				
			||||||
    fn authenticated(&self, request: RequestBuilder) -> RequestBuilder {
 | 
					    fn authenticated(&self, request: RequestBuilder) -> RequestBuilder {
 | 
				
			||||||
        request.bearer_auth(&self.data.token)
 | 
					        request.bearer_auth(&self.data.token)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue