diff --git a/Cargo.lock b/Cargo.lock index 2cd42f7..fdc1d4c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -267,12 +267,6 @@ version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4cbbc9d0964165b47557570cce6c952866c2678457aca742aafc9fb771d30270" -[[package]] -name = "base64" -version = "0.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" - [[package]] name = "base64" version = "0.21.4" @@ -1286,21 +1280,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "jwt" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6204285f77fe7d9784db3fdc449ecce1a0114927a51d5a41c4c7a292011c015f" -dependencies = [ - "base64 0.13.1", - "crypto-common", - "digest", - "hmac", - "serde", - "serde_json", - "sha2", -] - [[package]] name = "keccak" version = "0.1.4" @@ -2334,12 +2313,10 @@ dependencies = [ "env_logger", "futures", "http", - "jwt", "libipld-core", "lingua", "log", "once_cell", - "reqwest", "rs-car", "scooby", "serde", @@ -2786,7 +2763,7 @@ version = "0.11.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" dependencies = [ - "base64 0.21.4", + "base64", "bytes", "encoding_rs", "futures-core", @@ -3261,7 +3238,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "864b869fdf56263f4c95c45483191ea0af340f9f3e3e7b4d57a61c7c87a970db" dependencies = [ "atoi", - "base64 0.21.4", + "base64", "bitflags 2.4.0", "byteorder", "bytes", @@ -3304,7 +3281,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb7ae0e6a97fb3ba33b23ac2671a5ce6e3cabe003f451abd5a56e7951d975624" dependencies = [ "atoi", - "base64 0.21.4", + "base64", "bitflags 2.4.0", "byteorder", "chrono", diff --git a/Cargo.toml b/Cargo.toml index afa5f4b..d848691 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,12 +19,10 @@ dotenv = "0.15.0" env_logger = "0.10.0" futures = "0.3.29" http = "0.2.9" -jwt = "0.16.0" libipld-core = { version = "0.16.0", features = ["serde-codec"] } lingua = "1.5.0" log = "0.4.20" once_cell = "1.18.0" -reqwest = "0.11.22" rs-car = "0.4.1" scooby = "0.5.0" serde = "1.0.190" diff --git a/src/services/bluesky.rs b/src/services/bluesky.rs index 76d08f3..ee1882f 100644 --- a/src/services/bluesky.rs +++ b/src/services/bluesky.rs @@ -4,5 +4,5 @@ mod internals; mod streaming; pub use client::Bluesky; -pub use entities::{FollowRecord, LikeRecord, PostRecord, Session}; +pub use entities::{FollowRecord, LikeRecord, PostRecord}; pub use streaming::{CommitDetails, CommitProcessor, Operation}; diff --git a/src/services/bluesky/client.rs b/src/services/bluesky/client.rs index 66f3539..4ea151a 100644 --- a/src/services/bluesky/client.rs +++ b/src/services/bluesky/client.rs @@ -1,24 +1,21 @@ use std::matches; -use std::sync::Arc; -use std::sync::Mutex; use anyhow::{anyhow, Result}; use atrium_api::blob::BlobRef; -use atrium_api::client::AtpServiceClient; use atrium_api::records::Record; +use atrium_api::agent::{AtpAgent, Session}; +use atrium_xrpc::client::reqwest::ReqwestClient; use axum::http::StatusCode; use chrono::Utc; use futures::StreamExt; use log::error; use tokio_tungstenite::{connect_async, tungstenite}; -use super::entities::{ProfileDetails, Session}; -use super::internals::xrpc::AuthenticateableXrpcClient; +use super::entities::{ProfileDetails}; use super::streaming::{handle_message, CommitProcessor}; pub struct Bluesky { - client: AtpServiceClient, - session: Option>>, + agent: AtpAgent, } impl Bluesky { @@ -27,56 +24,25 @@ impl Bluesky { pub fn unauthenticated() -> Self { Self { - client: AtpServiceClient::new(AuthenticateableXrpcClient::new( - Self::XRPC_HOST.to_owned(), - )), - session: None, + agent: AtpAgent::new(ReqwestClient::new(Self::XRPC_HOST.to_owned())) } } pub async fn login(handle: &str, password: &str) -> Result { - use atrium_api::com::atproto::server::create_session::Input; + let agent = AtpAgent::new(ReqwestClient::new(Self::XRPC_HOST.to_owned())); + agent.login(handle, password).await?; - let client = - AtpServiceClient::new(AuthenticateableXrpcClient::new(Self::XRPC_HOST.to_owned())); - - let result = client - .service - .com - .atproto - .server - .create_session(Input { - identifier: handle.to_owned(), - password: password.to_owned(), - }) - .await?; - - let session = Arc::new(Mutex::new(result.try_into()?)); - - let authenticated_client = AtpServiceClient::new(AuthenticateableXrpcClient::with_session( - Self::XRPC_HOST.to_owned(), - session.clone(), - )); - - Ok(Self { - client: authenticated_client, - session: Some(session), - }) + Ok(Self { agent }) } pub fn session(&self) -> Option { - self.session - .as_ref() - .and_then(|s| s.lock().ok()) - .map(|s| s.clone()) + self.agent.get_session() } pub async fn upload_blob(&self, blob: Vec) -> Result { - self.ensure_token_valid().await?; - let result = self - .client - .service + .agent + .api .com .atproto .repo @@ -97,10 +63,8 @@ impl Bluesky { ) -> Result<()> { use atrium_api::com::atproto::repo::put_record::Input; - self.ensure_token_valid().await?; - - self.client - .service + self.agent + .api .com .atproto .repo @@ -130,8 +94,8 @@ impl Bluesky { pub async fn fetch_profile_details(&self, did: &str) -> Result> { let result = self - .client - .service + .agent + .api .com .atproto .repo @@ -159,8 +123,8 @@ impl Bluesky { use atrium_api::com::atproto::identity::resolve_handle::Parameters; let result = self - .client - .service + .agent + .api .com .atproto .identity @@ -203,40 +167,6 @@ impl Bluesky { Ok(()) } - - async fn ensure_token_valid(&self) -> Result<()> { - let access_jwt_exp = self - .session - .as_ref() - .ok_or_else(|| anyhow!("Not authenticated"))? - .lock() - .map_err(|e| anyhow!("session mutex is poisoned: {e}"))? - .access_jwt_exp; - - let jwt_expired = Utc::now() > access_jwt_exp; - - if jwt_expired { - let refreshed = self - .client - .service - .com - .atproto - .server - .refresh_session() - .await?; - - let mut session = self - .session - .as_ref() - .ok_or_else(|| anyhow!("Not authenticated"))? - .lock() - .map_err(|e| anyhow!("session mutex is poisoned: {e}"))?; - - *session = refreshed.try_into()?; - } - - Ok(()) - } } fn is_missing_record_error(error: &atrium_xrpc::error::Error) -> bool { diff --git a/src/services/bluesky/entities.rs b/src/services/bluesky/entities.rs index 2a80c54..a7c3855 100644 --- a/src/services/bluesky/entities.rs +++ b/src/services/bluesky/entities.rs @@ -2,10 +2,8 @@ mod follow; mod like; mod post; mod profile; -mod session; pub use follow::FollowRecord; pub use like::LikeRecord; pub use post::PostRecord; pub use profile::ProfileDetails; -pub use session::Session; diff --git a/src/services/bluesky/entities/session.rs b/src/services/bluesky/entities/session.rs deleted file mode 100644 index 1d3aa7d..0000000 --- a/src/services/bluesky/entities/session.rs +++ /dev/null @@ -1,57 +0,0 @@ -use anyhow::{anyhow, Result}; -use atrium_api::com::atproto::server::create_session::Output as CreateSessionOutput; -use atrium_api::com::atproto::server::refresh_session::Output as RefreshSessionOutput; -use chrono::{DateTime, TimeZone, Utc}; -use jwt::{Header, Token}; -use serde::Deserialize; - -#[derive(Clone, Debug)] -pub struct Session { - pub access_jwt: String, - pub access_jwt_exp: DateTime, - pub refresh_jwt: String, - pub did: String, -} - -#[derive(Deserialize)] -struct AtprotoClaims { - exp: i64, -} - -pub fn get_token_expiration(jwt_string: &str) -> Result> { - let token: Token = Token::parse_unverified(jwt_string)?; - let expiration_time = Utc - .timestamp_millis_opt(token.claims().exp) - .earliest() - .ok_or_else(|| anyhow!("couldn't interpret expiration timestamp"))?; - - Ok(expiration_time) -} - -impl TryInto for CreateSessionOutput { - type Error = anyhow::Error; - - fn try_into(self) -> Result { - let access_jwt_exp = get_token_expiration(&self.access_jwt)?; - Ok(Session { - access_jwt: self.access_jwt, - access_jwt_exp, - refresh_jwt: self.refresh_jwt, - did: self.did, - }) - } -} - -impl TryInto for RefreshSessionOutput { - type Error = anyhow::Error; - - fn try_into(self) -> Result { - let access_jwt_exp = get_token_expiration(&self.access_jwt)?; - Ok(Session { - access_jwt: self.access_jwt, - access_jwt_exp, - refresh_jwt: self.refresh_jwt, - did: self.did, - }) - } -} diff --git a/src/services/bluesky/internals.rs b/src/services/bluesky/internals.rs index 9b3ebb0..0e6d4c2 100644 --- a/src/services/bluesky/internals.rs +++ b/src/services/bluesky/internals.rs @@ -1,3 +1,2 @@ pub mod cbor; pub mod ipld; -pub mod xrpc; diff --git a/src/services/bluesky/internals/xrpc.rs b/src/services/bluesky/internals/xrpc.rs deleted file mode 100644 index efc0fed..0000000 --- a/src/services/bluesky/internals/xrpc.rs +++ /dev/null @@ -1,58 +0,0 @@ -use std::error::Error; -use std::sync::{Arc, Mutex}; - -use async_trait::async_trait; -use atrium_xrpc::{client::reqwest::ReqwestClient, HttpClient, XrpcClient}; -use http::{Request, Response}; - -use crate::services::bluesky::entities::Session; - -pub struct AuthenticateableXrpcClient { - inner: ReqwestClient, - session: Option>>, -} - -impl AuthenticateableXrpcClient { - pub fn new(host: String) -> Self { - Self { - inner: ReqwestClient::new(host), - session: None, - } - } - - pub fn with_session(host: String, session: Arc>) -> Self { - Self { - inner: ReqwestClient::new(host), - session: Some(session), - } - } -} - -#[async_trait] -impl HttpClient for AuthenticateableXrpcClient { - async fn send_http( - &self, - req: Request>, - ) -> Result>, Box> { - self.inner.send_http(req).await - } -} - -impl XrpcClient for AuthenticateableXrpcClient { - fn auth(&self, is_refresh: bool) -> Option { - self.session - .as_ref() - .and_then(|session| session.lock().ok()) - .map(|session| { - if is_refresh { - session.refresh_jwt.clone() - } else { - session.access_jwt.clone() - } - }) - } - - fn host(&self) -> &str { - self.inner.host() - } -}