Simplify Bluesky api usage

`atrium-api` now includes an `AtpAgent` which takes care of creating
and refreshing sessions automatically, so we no longer need our
custom xrpc client and session management logic.

This is nice.
This commit is contained in:
Aleksei Voronov 2023-11-06 08:53:23 +01:00
parent 524598a40b
commit 35ee1b0a1f
8 changed files with 21 additions and 234 deletions

29
Cargo.lock generated
View File

@ -267,12 +267,6 @@ version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4cbbc9d0964165b47557570cce6c952866c2678457aca742aafc9fb771d30270" checksum = "4cbbc9d0964165b47557570cce6c952866c2678457aca742aafc9fb771d30270"
[[package]]
name = "base64"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
[[package]] [[package]]
name = "base64" name = "base64"
version = "0.21.4" version = "0.21.4"
@ -1286,21 +1280,6 @@ dependencies = [
"wasm-bindgen", "wasm-bindgen",
] ]
[[package]]
name = "jwt"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6204285f77fe7d9784db3fdc449ecce1a0114927a51d5a41c4c7a292011c015f"
dependencies = [
"base64 0.13.1",
"crypto-common",
"digest",
"hmac",
"serde",
"serde_json",
"sha2",
]
[[package]] [[package]]
name = "keccak" name = "keccak"
version = "0.1.4" version = "0.1.4"
@ -2334,12 +2313,10 @@ dependencies = [
"env_logger", "env_logger",
"futures", "futures",
"http", "http",
"jwt",
"libipld-core", "libipld-core",
"lingua", "lingua",
"log", "log",
"once_cell", "once_cell",
"reqwest",
"rs-car", "rs-car",
"scooby", "scooby",
"serde", "serde",
@ -2786,7 +2763,7 @@ version = "0.11.22"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b"
dependencies = [ dependencies = [
"base64 0.21.4", "base64",
"bytes", "bytes",
"encoding_rs", "encoding_rs",
"futures-core", "futures-core",
@ -3261,7 +3238,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "864b869fdf56263f4c95c45483191ea0af340f9f3e3e7b4d57a61c7c87a970db" checksum = "864b869fdf56263f4c95c45483191ea0af340f9f3e3e7b4d57a61c7c87a970db"
dependencies = [ dependencies = [
"atoi", "atoi",
"base64 0.21.4", "base64",
"bitflags 2.4.0", "bitflags 2.4.0",
"byteorder", "byteorder",
"bytes", "bytes",
@ -3304,7 +3281,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb7ae0e6a97fb3ba33b23ac2671a5ce6e3cabe003f451abd5a56e7951d975624" checksum = "eb7ae0e6a97fb3ba33b23ac2671a5ce6e3cabe003f451abd5a56e7951d975624"
dependencies = [ dependencies = [
"atoi", "atoi",
"base64 0.21.4", "base64",
"bitflags 2.4.0", "bitflags 2.4.0",
"byteorder", "byteorder",
"chrono", "chrono",

View File

@ -19,12 +19,10 @@ dotenv = "0.15.0"
env_logger = "0.10.0" env_logger = "0.10.0"
futures = "0.3.29" futures = "0.3.29"
http = "0.2.9" http = "0.2.9"
jwt = "0.16.0"
libipld-core = { version = "0.16.0", features = ["serde-codec"] } libipld-core = { version = "0.16.0", features = ["serde-codec"] }
lingua = "1.5.0" lingua = "1.5.0"
log = "0.4.20" log = "0.4.20"
once_cell = "1.18.0" once_cell = "1.18.0"
reqwest = "0.11.22"
rs-car = "0.4.1" rs-car = "0.4.1"
scooby = "0.5.0" scooby = "0.5.0"
serde = "1.0.190" serde = "1.0.190"

View File

@ -4,5 +4,5 @@ mod internals;
mod streaming; mod streaming;
pub use client::Bluesky; pub use client::Bluesky;
pub use entities::{FollowRecord, LikeRecord, PostRecord, Session}; pub use entities::{FollowRecord, LikeRecord, PostRecord};
pub use streaming::{CommitDetails, CommitProcessor, Operation}; pub use streaming::{CommitDetails, CommitProcessor, Operation};

View File

@ -1,24 +1,21 @@
use std::matches; use std::matches;
use std::sync::Arc;
use std::sync::Mutex;
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use atrium_api::blob::BlobRef; use atrium_api::blob::BlobRef;
use atrium_api::client::AtpServiceClient;
use atrium_api::records::Record; use atrium_api::records::Record;
use atrium_api::agent::{AtpAgent, Session};
use atrium_xrpc::client::reqwest::ReqwestClient;
use axum::http::StatusCode; use axum::http::StatusCode;
use chrono::Utc; use chrono::Utc;
use futures::StreamExt; use futures::StreamExt;
use log::error; use log::error;
use tokio_tungstenite::{connect_async, tungstenite}; use tokio_tungstenite::{connect_async, tungstenite};
use super::entities::{ProfileDetails, Session}; use super::entities::{ProfileDetails};
use super::internals::xrpc::AuthenticateableXrpcClient;
use super::streaming::{handle_message, CommitProcessor}; use super::streaming::{handle_message, CommitProcessor};
pub struct Bluesky { pub struct Bluesky {
client: AtpServiceClient<AuthenticateableXrpcClient>, agent: AtpAgent<ReqwestClient>,
session: Option<Arc<Mutex<Session>>>,
} }
impl Bluesky { impl Bluesky {
@ -27,56 +24,25 @@ impl Bluesky {
pub fn unauthenticated() -> Self { pub fn unauthenticated() -> Self {
Self { Self {
client: AtpServiceClient::new(AuthenticateableXrpcClient::new( agent: AtpAgent::new(ReqwestClient::new(Self::XRPC_HOST.to_owned()))
Self::XRPC_HOST.to_owned(),
)),
session: None,
} }
} }
pub async fn login(handle: &str, password: &str) -> Result<Self> { pub async fn login(handle: &str, password: &str) -> Result<Self> {
use atrium_api::com::atproto::server::create_session::Input; let agent = AtpAgent::new(ReqwestClient::new(Self::XRPC_HOST.to_owned()));
agent.login(handle, password).await?;
let client = Ok(Self { agent })
AtpServiceClient::new(AuthenticateableXrpcClient::new(Self::XRPC_HOST.to_owned()));
let result = client
.service
.com
.atproto
.server
.create_session(Input {
identifier: handle.to_owned(),
password: password.to_owned(),
})
.await?;
let session = Arc::new(Mutex::new(result.try_into()?));
let authenticated_client = AtpServiceClient::new(AuthenticateableXrpcClient::with_session(
Self::XRPC_HOST.to_owned(),
session.clone(),
));
Ok(Self {
client: authenticated_client,
session: Some(session),
})
} }
pub fn session(&self) -> Option<Session> { pub fn session(&self) -> Option<Session> {
self.session self.agent.get_session()
.as_ref()
.and_then(|s| s.lock().ok())
.map(|s| s.clone())
} }
pub async fn upload_blob(&self, blob: Vec<u8>) -> Result<BlobRef> { pub async fn upload_blob(&self, blob: Vec<u8>) -> Result<BlobRef> {
self.ensure_token_valid().await?;
let result = self let result = self
.client .agent
.service .api
.com .com
.atproto .atproto
.repo .repo
@ -97,10 +63,8 @@ impl Bluesky {
) -> Result<()> { ) -> Result<()> {
use atrium_api::com::atproto::repo::put_record::Input; use atrium_api::com::atproto::repo::put_record::Input;
self.ensure_token_valid().await?; self.agent
.api
self.client
.service
.com .com
.atproto .atproto
.repo .repo
@ -130,8 +94,8 @@ impl Bluesky {
pub async fn fetch_profile_details(&self, did: &str) -> Result<Option<ProfileDetails>> { pub async fn fetch_profile_details(&self, did: &str) -> Result<Option<ProfileDetails>> {
let result = self let result = self
.client .agent
.service .api
.com .com
.atproto .atproto
.repo .repo
@ -159,8 +123,8 @@ impl Bluesky {
use atrium_api::com::atproto::identity::resolve_handle::Parameters; use atrium_api::com::atproto::identity::resolve_handle::Parameters;
let result = self let result = self
.client .agent
.service .api
.com .com
.atproto .atproto
.identity .identity
@ -203,40 +167,6 @@ impl Bluesky {
Ok(()) Ok(())
} }
async fn ensure_token_valid(&self) -> Result<()> {
let access_jwt_exp = self
.session
.as_ref()
.ok_or_else(|| anyhow!("Not authenticated"))?
.lock()
.map_err(|e| anyhow!("session mutex is poisoned: {e}"))?
.access_jwt_exp;
let jwt_expired = Utc::now() > access_jwt_exp;
if jwt_expired {
let refreshed = self
.client
.service
.com
.atproto
.server
.refresh_session()
.await?;
let mut session = self
.session
.as_ref()
.ok_or_else(|| anyhow!("Not authenticated"))?
.lock()
.map_err(|e| anyhow!("session mutex is poisoned: {e}"))?;
*session = refreshed.try_into()?;
}
Ok(())
}
} }
fn is_missing_record_error<T>(error: &atrium_xrpc::error::Error<T>) -> bool { fn is_missing_record_error<T>(error: &atrium_xrpc::error::Error<T>) -> bool {

View File

@ -2,10 +2,8 @@ mod follow;
mod like; mod like;
mod post; mod post;
mod profile; mod profile;
mod session;
pub use follow::FollowRecord; pub use follow::FollowRecord;
pub use like::LikeRecord; pub use like::LikeRecord;
pub use post::PostRecord; pub use post::PostRecord;
pub use profile::ProfileDetails; pub use profile::ProfileDetails;
pub use session::Session;

View File

@ -1,57 +0,0 @@
use anyhow::{anyhow, Result};
use atrium_api::com::atproto::server::create_session::Output as CreateSessionOutput;
use atrium_api::com::atproto::server::refresh_session::Output as RefreshSessionOutput;
use chrono::{DateTime, TimeZone, Utc};
use jwt::{Header, Token};
use serde::Deserialize;
#[derive(Clone, Debug)]
pub struct Session {
pub access_jwt: String,
pub access_jwt_exp: DateTime<Utc>,
pub refresh_jwt: String,
pub did: String,
}
#[derive(Deserialize)]
struct AtprotoClaims {
exp: i64,
}
pub fn get_token_expiration(jwt_string: &str) -> Result<DateTime<Utc>> {
let token: Token<Header, AtprotoClaims, _> = Token::parse_unverified(jwt_string)?;
let expiration_time = Utc
.timestamp_millis_opt(token.claims().exp)
.earliest()
.ok_or_else(|| anyhow!("couldn't interpret expiration timestamp"))?;
Ok(expiration_time)
}
impl TryInto<Session> for CreateSessionOutput {
type Error = anyhow::Error;
fn try_into(self) -> Result<Session> {
let access_jwt_exp = get_token_expiration(&self.access_jwt)?;
Ok(Session {
access_jwt: self.access_jwt,
access_jwt_exp,
refresh_jwt: self.refresh_jwt,
did: self.did,
})
}
}
impl TryInto<Session> for RefreshSessionOutput {
type Error = anyhow::Error;
fn try_into(self) -> Result<Session> {
let access_jwt_exp = get_token_expiration(&self.access_jwt)?;
Ok(Session {
access_jwt: self.access_jwt,
access_jwt_exp,
refresh_jwt: self.refresh_jwt,
did: self.did,
})
}
}

View File

@ -1,3 +1,2 @@
pub mod cbor; pub mod cbor;
pub mod ipld; pub mod ipld;
pub mod xrpc;

View File

@ -1,58 +0,0 @@
use std::error::Error;
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use atrium_xrpc::{client::reqwest::ReqwestClient, HttpClient, XrpcClient};
use http::{Request, Response};
use crate::services::bluesky::entities::Session;
pub struct AuthenticateableXrpcClient {
inner: ReqwestClient,
session: Option<Arc<Mutex<Session>>>,
}
impl AuthenticateableXrpcClient {
pub fn new(host: String) -> Self {
Self {
inner: ReqwestClient::new(host),
session: None,
}
}
pub fn with_session(host: String, session: Arc<Mutex<Session>>) -> Self {
Self {
inner: ReqwestClient::new(host),
session: Some(session),
}
}
}
#[async_trait]
impl HttpClient for AuthenticateableXrpcClient {
async fn send_http(
&self,
req: Request<Vec<u8>>,
) -> Result<Response<Vec<u8>>, Box<dyn Error + Send + Sync + 'static>> {
self.inner.send_http(req).await
}
}
impl XrpcClient for AuthenticateableXrpcClient {
fn auth(&self, is_refresh: bool) -> Option<String> {
self.session
.as_ref()
.and_then(|session| session.lock().ok())
.map(|session| {
if is_refresh {
session.refresh_jwt.clone()
} else {
session.access_jwt.clone()
}
})
}
fn host(&self) -> &str {
self.inner.host()
}
}