Fix publishing feeds

This basically required implementing authentication from ground up
because atrium-api is horribly deficient when it comes to it,
providing basically no real way to manage it, and what is provided
is actually broken anyway requiring additional hacks to get around

Ah well. This has been the story of using anything in Rust that's
related to Bluesky. Everything is broken.
This commit is contained in:
Aleksei Voronov 2023-10-07 18:26:20 +02:00
parent 1e0e34b9a5
commit 1bd843a05a
11 changed files with 244 additions and 29 deletions

56
Cargo.lock generated
View File

@ -266,6 +266,12 @@ version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4cbbc9d0964165b47557570cce6c952866c2678457aca742aafc9fb771d30270" checksum = "4cbbc9d0964165b47557570cce6c952866c2678457aca742aafc9fb771d30270"
[[package]]
name = "base64"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
[[package]] [[package]]
name = "base64" name = "base64"
version = "0.21.4" version = "0.21.4"
@ -1279,6 +1285,21 @@ dependencies = [
"wasm-bindgen", "wasm-bindgen",
] ]
[[package]]
name = "jwt"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6204285f77fe7d9784db3fdc449ecce1a0114927a51d5a41c4c7a292011c015f"
dependencies = [
"base64 0.13.1",
"crypto-common",
"digest",
"hmac",
"serde",
"serde_json",
"sha2",
]
[[package]] [[package]]
name = "keccak" name = "keccak"
version = "0.1.4" version = "0.1.4"
@ -2311,10 +2332,13 @@ dependencies = [
"dotenv", "dotenv",
"env_logger", "env_logger",
"futures", "futures",
"http",
"jwt",
"libipld-core", "libipld-core",
"lingua", "lingua",
"log", "log",
"once_cell", "once_cell",
"reqwest",
"rs-car", "rs-car",
"scooby", "scooby",
"serde", "serde",
@ -2757,11 +2781,11 @@ checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da"
[[package]] [[package]]
name = "reqwest" name = "reqwest"
version = "0.11.20" version = "0.11.22"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1" checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b"
dependencies = [ dependencies = [
"base64", "base64 0.21.4",
"bytes", "bytes",
"encoding_rs", "encoding_rs",
"futures-core", "futures-core",
@ -2782,6 +2806,7 @@ dependencies = [
"serde", "serde",
"serde_json", "serde_json",
"serde_urlencoded", "serde_urlencoded",
"system-configuration",
"tokio", "tokio",
"tokio-native-tls", "tokio-native-tls",
"tower-service", "tower-service",
@ -3235,7 +3260,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "864b869fdf56263f4c95c45483191ea0af340f9f3e3e7b4d57a61c7c87a970db" checksum = "864b869fdf56263f4c95c45483191ea0af340f9f3e3e7b4d57a61c7c87a970db"
dependencies = [ dependencies = [
"atoi", "atoi",
"base64", "base64 0.21.4",
"bitflags 2.4.0", "bitflags 2.4.0",
"byteorder", "byteorder",
"bytes", "bytes",
@ -3278,7 +3303,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb7ae0e6a97fb3ba33b23ac2671a5ce6e3cabe003f451abd5a56e7951d975624" checksum = "eb7ae0e6a97fb3ba33b23ac2671a5ce6e3cabe003f451abd5a56e7951d975624"
dependencies = [ dependencies = [
"atoi", "atoi",
"base64", "base64 0.21.4",
"bitflags 2.4.0", "bitflags 2.4.0",
"byteorder", "byteorder",
"chrono", "chrono",
@ -3422,6 +3447,27 @@ dependencies = [
"unicode-xid", "unicode-xid",
] ]
[[package]]
name = "system-configuration"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7"
dependencies = [
"bitflags 1.3.2",
"core-foundation",
"system-configuration-sys",
]
[[package]]
name = "system-configuration-sys"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]] [[package]]
name = "tempfile" name = "tempfile"
version = "3.8.0" version = "3.8.0"

View File

@ -18,10 +18,13 @@ clap = { version = "4.4.6", features = ["derive"] }
dotenv = "0.15.0" dotenv = "0.15.0"
env_logger = "0.10.0" env_logger = "0.10.0"
futures = "0.3.28" futures = "0.3.28"
http = "0.2.9"
jwt = "0.16.0"
libipld-core = { version = "0.16.0", features = ["serde-codec"] } libipld-core = { version = "0.16.0", features = ["serde-codec"] }
lingua = "1.5.0" lingua = "1.5.0"
log = "0.4.20" log = "0.4.20"
once_cell = "1.18.0" once_cell = "1.18.0"
reqwest = "0.11.22"
rs-car = "0.4.1" rs-car = "0.4.1"
scooby = "0.5.0" scooby = "0.5.0"
serde = "1.0.188" serde = "1.0.188"

View File

@ -19,7 +19,7 @@ Not fully complete yet, see roadmap.
- [x] Handle missing profiles in the profile classifier - [x] Handle missing profiles in the profile classifier
- [x] Add a way to mark a profile as being from a certain country manually - [x] Add a way to mark a profile as being from a certain country manually
- [x] Handle reconnecting to websocket somehow - [x] Handle reconnecting to websocket somehow
- [ ] Publish the feed - [x] Publish the feed
## Configuration ## Configuration

View File

@ -28,7 +28,7 @@ async fn main() -> Result<()> {
let database_url = let database_url =
env::var("DATABASE_URL").context("DATABASE_URL environment variable must be set")?; env::var("DATABASE_URL").context("DATABASE_URL environment variable must be set")?;
let bluesky = Bluesky::new("https://bsky.social"); let bluesky = Bluesky::unauthenticated("https://bsky.social");
let database = Database::connect(&database_url).await?; let database = Database::connect(&database_url).await?;
for handle in &args.handle { for handle in &args.handle {

View File

@ -41,19 +41,21 @@ async fn main() -> Result<()> {
let feed_generator_did = format!("did:web:{}", env::var("FEED_GENERATOR_HOSTNAME")?); let feed_generator_did = format!("did:web:{}", env::var("FEED_GENERATOR_HOSTNAME")?);
let bluesky = Bluesky::new("https://bsky.social"); println!("Logging in");
let session = bluesky.login(&handle, &password).await?; let bluesky = Bluesky::login("https://bsky.social", &handle, &password).await?;
let mut avatar = None; let mut avatar = None;
if let Some(path) = args.avatar_filename { if let Some(path) = args.avatar_filename {
let bytes = std::fs::read(path)?; let bytes = std::fs::read(path)?;
avatar = Some(bluesky.upload_blob(bytes).await?); avatar = Some(bluesky.upload_blob(bytes).await?);
println!("Uploaded avatar");
} }
bluesky bluesky
.publish_feed( .publish_feed(
&session.did, &bluesky.session().unwrap().did,
&feed_generator_did, &feed_generator_did,
&args.name, &args.name,
&args.display_name, &args.display_name,

View File

@ -2,7 +2,7 @@ extern crate nederlandskie;
use std::env; use std::env;
use anyhow::{Context, Result}; use anyhow::{Context, Result, anyhow};
use dotenv::dotenv; use dotenv::dotenv;
use nederlandskie::services::Bluesky; use nederlandskie::services::Bluesky;
@ -11,15 +11,14 @@ use nederlandskie::services::Bluesky;
async fn main() -> Result<()> { async fn main() -> Result<()> {
dotenv()?; dotenv()?;
let bluesky = Bluesky::new("https://bsky.social");
let handle = env::var("PUBLISHER_BLUESKY_HANDLE") let handle = env::var("PUBLISHER_BLUESKY_HANDLE")
.context("PUBLISHER_BLUESKY_HANDLE environment variable must be set")?; .context("PUBLISHER_BLUESKY_HANDLE environment variable must be set")?;
let password = env::var("PUBLISHER_BLUESKY_PASSWORD") let password = env::var("PUBLISHER_BLUESKY_PASSWORD")
.context("PUBLISHER_BLUESKY_PASSWORD environment variable must be set")?; .context("PUBLISHER_BLUESKY_PASSWORD environment variable must be set")?;
let session = bluesky.login(&handle, &password).await?; let bluesky = Bluesky::login("https://bsky.social", &handle, &password).await?;
let session = bluesky.session().ok_or_else(|| anyhow!("Could not log in"))?;
println!("{}", session.did); println!("{}", session.did);

View File

@ -23,7 +23,7 @@ async fn main() -> Result<()> {
info!("Initializing service clients"); info!("Initializing service clients");
let ai = Arc::new(AI::new(&config.chat_gpt_api_key, "https://api.openai.com")); let ai = Arc::new(AI::new(&config.chat_gpt_api_key, "https://api.openai.com"));
let bluesky = Arc::new(Bluesky::new("https://bsky.social")); let bluesky = Arc::new(Bluesky::unauthenticated("https://bsky.social"));
let database = Arc::new(Database::connect(&config.database_url).await?); let database = Arc::new(Database::connect(&config.database_url).await?);
info!("Initializing language detector"); info!("Initializing language detector");

View File

@ -1,7 +1,9 @@
mod client; mod client;
mod decode; mod decode;
mod proto; mod proto;
mod session;
mod streaming; mod streaming;
mod xrpc_client;
pub use client::Bluesky; pub use client::Bluesky;
pub use streaming::{CommitDetails, CommitProcessor, Operation}; pub use streaming::{CommitDetails, CommitProcessor, Operation};

View File

@ -1,18 +1,21 @@
use std::matches; use std::matches;
use std::sync::Arc;
use std::sync::Mutex;
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use atrium_api::blob::BlobRef; use atrium_api::blob::BlobRef;
use atrium_api::client::AtpServiceClient; use atrium_api::client::AtpServiceClient;
use atrium_api::client::AtpServiceWrapper; use atrium_api::client::AtpServiceWrapper;
use atrium_api::records::Record; use atrium_api::records::Record;
use atrium_xrpc::client::reqwest::ReqwestClient;
use axum::http::StatusCode; use axum::http::StatusCode;
use chrono::Utc; use chrono::Utc;
use futures::StreamExt; use futures::StreamExt;
use log::error; use log::error;
use tokio_tungstenite::{connect_async, tungstenite}; use tokio_tungstenite::{connect_async, tungstenite};
use super::session::Session;
use super::streaming::{handle_message, CommitProcessor}; use super::streaming::{handle_message, CommitProcessor};
use super::xrpc_client::AuthenticateableXrpcClient;
#[derive(Debug)] #[derive(Debug)]
pub struct ProfileDetails { pub struct ProfileDetails {
@ -20,27 +23,25 @@ pub struct ProfileDetails {
pub description: String, pub description: String,
} }
#[derive(Debug)]
pub struct SessionDetails {
pub did: String,
}
pub struct Bluesky { pub struct Bluesky {
client: AtpServiceClient<AtpServiceWrapper<ReqwestClient>>, client: AtpServiceClient<AtpServiceWrapper<AuthenticateableXrpcClient>>,
session: Option<Arc<Mutex<Session>>>
} }
impl Bluesky { impl Bluesky {
pub fn new(host: &str) -> Self { pub fn unauthenticated(host: &str) -> Self {
Self { Self {
client: AtpServiceClient::new(ReqwestClient::new(host.to_owned())), client: AtpServiceClient::new(AuthenticateableXrpcClient::new(host.to_owned())),
session: None
} }
} }
pub async fn login(&self, handle: &str, password: &str) -> Result<SessionDetails> { pub async fn login(host: &str, handle: &str, password: &str) -> Result<Self> {
use atrium_api::com::atproto::server::create_session::Input; use atrium_api::com::atproto::server::create_session::Input;
let result = self let client = AtpServiceClient::new(AuthenticateableXrpcClient::new(host.to_owned()));
.client
let result = client
.service .service
.com .com
.atproto .atproto
@ -51,10 +52,26 @@ impl Bluesky {
}) })
.await?; .await?;
Ok(SessionDetails { did: result.did }) let session = Arc::new(Mutex::new(result.try_into()?));
let authenticated_client = AtpServiceClient::new(AuthenticateableXrpcClient::with_session(
host.to_owned(),
session.clone()
));
Ok(Self {
client: authenticated_client,
session: Some(session)
})
}
pub fn session(&self) -> Option<Session> {
self.session.as_ref().and_then(|s| s.lock().ok()).map(|s| s.clone())
} }
pub async fn upload_blob(&self, blob: Vec<u8>) -> Result<BlobRef> { pub async fn upload_blob(&self, blob: Vec<u8>) -> Result<BlobRef> {
self.ensure_token_valid().await?;
let result = self let result = self
.client .client
.service .service
@ -78,6 +95,8 @@ impl Bluesky {
) -> Result<()> { ) -> Result<()> {
use atrium_api::com::atproto::repo::put_record::Input; use atrium_api::com::atproto::repo::put_record::Input;
self.ensure_token_valid().await?;
self.client self.client
.service .service
.com .com
@ -88,7 +107,7 @@ impl Bluesky {
record: Record::AppBskyFeedGenerator(Box::new( record: Record::AppBskyFeedGenerator(Box::new(
atrium_api::app::bsky::feed::generator::Record { atrium_api::app::bsky::feed::generator::Record {
avatar, avatar,
created_at: Utc::now().to_string(), created_at: Utc::now().to_rfc3339(),
description: Some(description.to_owned()), description: Some(description.to_owned()),
description_facets: None, description_facets: None,
did: feed_generator_did.to_owned(), did: feed_generator_did.to_owned(),
@ -183,6 +202,27 @@ impl Bluesky {
Ok(()) Ok(())
} }
async fn ensure_token_valid(&self) -> Result<()> {
let access_jwt_exp =
self.session.as_ref().ok_or_else(|| anyhow!("Not authenticated"))?.lock().map_err(|e| anyhow!("session mutex is poisoned: {e}"))?.access_jwt_exp;
let jwt_expired = Utc::now() > access_jwt_exp;
if jwt_expired {
let refreshed = self.client.service.com.atproto.server.refresh_session().await?;
let mut session = self.session
.as_ref()
.ok_or_else(|| anyhow!("Not authenticated"))?
.lock()
.map_err(|e| anyhow!("session mutex is poisoned: {e}"))?;
*session = refreshed.try_into()?;
}
Ok(())
}
} }
fn is_missing_record_error<T>(error: &atrium_xrpc::error::Error<T>) -> bool { fn is_missing_record_error<T>(error: &atrium_xrpc::error::Error<T>) -> bool {

View File

@ -0,0 +1,57 @@
use anyhow::{anyhow, Result};
use atrium_api::com::atproto::server::create_session::Output as CreateSessionOutput;
use atrium_api::com::atproto::server::refresh_session::Output as RefreshSessionOutput;
use chrono::{DateTime, TimeZone, Utc};
use jwt::{Header, Token};
use serde::Deserialize;
#[derive(Clone, Debug)]
pub struct Session {
pub access_jwt: String,
pub access_jwt_exp: DateTime<Utc>,
pub refresh_jwt: String,
pub did: String,
}
#[derive(Deserialize)]
struct AtprotoClaims {
exp: i64,
}
pub fn get_token_expiration(jwt_string: &str) -> Result<DateTime<Utc>> {
let token: Token<Header, AtprotoClaims, _> = Token::parse_unverified(jwt_string)?;
let expiration_time = Utc
.timestamp_millis_opt(token.claims().exp)
.earliest()
.ok_or_else(|| anyhow!("couldn't interpret expiration timestamp"))?;
Ok(expiration_time)
}
impl TryInto<Session> for CreateSessionOutput {
type Error = anyhow::Error;
fn try_into(self) -> Result<Session> {
let access_jwt_exp = get_token_expiration(&self.access_jwt)?;
Ok(Session {
access_jwt: self.access_jwt,
access_jwt_exp,
refresh_jwt: self.refresh_jwt,
did: self.did,
})
}
}
impl TryInto<Session> for RefreshSessionOutput {
type Error = anyhow::Error;
fn try_into(self) -> Result<Session> {
let access_jwt_exp = get_token_expiration(&self.access_jwt)?;
Ok(Session {
access_jwt: self.access_jwt,
access_jwt_exp,
refresh_jwt: self.refresh_jwt,
did: self.did,
})
}
}

View File

@ -0,0 +1,66 @@
use async_trait::async_trait;
use atrium_xrpc::{client::reqwest::ReqwestClient, HttpClient, XrpcClient};
use http::{Request, Response, Method};
use std::sync::{Arc, Mutex};
use super::session::Session;
pub struct AuthenticateableXrpcClient {
inner: ReqwestClient,
session: Option<Arc<Mutex<Session>>>,
}
impl AuthenticateableXrpcClient {
pub fn new(host: String) -> Self {
Self {
inner: ReqwestClient::new(host),
session: None,
}
}
pub fn with_session(host: String, session: Arc<Mutex<Session>>) -> Self {
Self {
inner: ReqwestClient::new(host),
session: Some(session),
}
}
}
#[async_trait]
impl HttpClient for AuthenticateableXrpcClient {
async fn send_http(
&self,
req: Request<Vec<u8>>,
) -> Result<Response<Vec<u8>>, Box<dyn std::error::Error + Send + Sync + 'static>> {
let (mut parts, body) = req.into_parts();
/* NOTE: This is a huge hack because auth is currently totally broken in atrium-api */
let is_request_to_refresh_session = parts.method == Method::POST && parts.uri.to_string().ends_with("com.atproto.server.refreshSession");
if let Some(token) = self.auth(is_request_to_refresh_session) {
parts.headers.insert(http::header::AUTHORIZATION, format!("Bearer {}", token).parse()?);
}
let req = Request::from_parts(parts, body);
self.inner.send_http(req).await
}
}
impl XrpcClient for AuthenticateableXrpcClient {
fn auth(&self, is_refresh: bool) -> Option<String> {
self.session
.as_ref()
.and_then(|session| session.lock().ok())
.map(|session| {
if is_refresh {
session.refresh_jwt.clone()
} else {
session.access_jwt.clone()
}
})
}
fn host(&self) -> &str {
self.inner.host()
}
}