use std::matches; use std::time::Duration; use anyhow::{anyhow, Result}; use atrium_api::agent::{store::MemorySessionStore, AtpAgent}; use atrium_api::blob::BlobRef; use atrium_api::records::Record; use atrium_xrpc_client::reqwest::ReqwestClient; use chrono::Utc; use http::StatusCode; use log::error; use tokio_stream::StreamExt; use tokio_tungstenite::{connect_async, tungstenite}; use super::entities::ProfileDetails; use super::streaming::{handle_message, CommitProcessor}; pub struct Bluesky { agent: AtpAgent, } impl Bluesky { pub const XRPC_HOST: &'static str = "https://bsky.social"; pub const FIREHOSE_HOST: &'static str = "wss://bsky.network"; pub const STREAMING_TIMEOUT: Duration = Duration::from_secs(60); pub fn unauthenticated() -> Self { Self { agent: AtpAgent::new( ReqwestClient::new(Self::XRPC_HOST), MemorySessionStore::default(), ), } } pub async fn login(handle: &str, password: &str) -> Result { let agent = AtpAgent::new( ReqwestClient::new(Self::XRPC_HOST), MemorySessionStore::default(), ); agent.login(handle, password).await?; Ok(Self { agent }) } pub async fn upload_blob(&self, blob: Vec) -> Result { let result = self.agent.api.com.atproto.repo.upload_blob(blob).await?; Ok(result.blob) } pub async fn publish_feed( &self, publisher_did: &str, feed_generator_did: &str, name: &str, display_name: &str, description: &str, avatar: Option, ) -> Result<()> { use atrium_api::com::atproto::repo::put_record::Input; self.agent .api .com .atproto .repo .put_record(Input { collection: "app.bsky.feed.generator".to_owned(), record: Record::AppBskyFeedGenerator(Box::new( atrium_api::app::bsky::feed::generator::Record { avatar, created_at: Utc::now().to_rfc3339(), description: Some(description.to_owned()), description_facets: None, did: feed_generator_did.to_owned(), display_name: display_name.to_owned(), labels: None, }, )), repo: publisher_did.to_owned(), rkey: name.to_owned(), swap_commit: None, swap_record: None, validate: None, }) .await?; Ok(()) } pub async fn fetch_profile_details(&self, did: &str) -> Result> { let result = self .agent .api .com .atproto .repo .get_record(atrium_api::com::atproto::repo::get_record::Parameters { collection: "app.bsky.actor.profile".to_owned(), cid: None, repo: did.to_owned(), rkey: "self".to_owned(), }) .await; let profile_data = match result { Ok(profile_data) => profile_data, Err(e) if is_missing_record_error(&e) => return Ok(None), Err(e) => return Err(e.into()), }; match profile_data.value { Record::AppBskyActorProfile(profile) => Ok(Some(ProfileDetails::from(*profile))), _ => Err(anyhow!("Wrong type of record")), } } pub async fn resolve_handle(&self, handle: &str) -> Result> { use atrium_api::com::atproto::identity::resolve_handle::Parameters; let result = self .agent .api .com .atproto .identity .resolve_handle(Parameters { handle: handle.to_owned(), }) .await; match result { Ok(result) => Ok(Some(result.did)), Err(e) if is_unable_to_resolve_handle_error(&e) => Ok(None), Err(e) => Err(e.into()), } } pub async fn subscribe_to_operations( &self, processor: &P, cursor: Option, ) -> Result<()> { let url = match cursor { Some(cursor) => format!( "{}/xrpc/com.atproto.sync.subscribeRepos?cursor={}", Self::FIREHOSE_HOST, cursor ), None => format!( "{}/xrpc/com.atproto.sync.subscribeRepos", Self::FIREHOSE_HOST ), }; let (stream, _) = connect_async(url).await?; let stream = stream.timeout(Self::STREAMING_TIMEOUT); let mut stream = Box::pin(stream); while let Some(Ok(tungstenite::Message::Binary(message))) = stream.try_next().await? { if let Err(e) = handle_message(&message, processor).await { error!("Error handling a message: {:?}", e); } } Ok(()) } } fn is_missing_record_error(error: &atrium_xrpc::error::Error) -> bool { use atrium_xrpc::error::{Error, ErrorResponseBody, XrpcError, XrpcErrorKind}; matches!(error, Error::XrpcResponse(XrpcError { status, error: Some(XrpcErrorKind::Undefined(ErrorResponseBody { error: Some(error_code), message: Some(error_message), })), }) if // FIXME: This is this way instead of pattern matching because atrium's // version of http is pegged at like 0.2.x and it does not // re-export it so we have no way of referencing the real type status.as_u16() == StatusCode::BAD_REQUEST.as_u16() && error_code == "InvalidRequest" && error_message.starts_with("Could not locate record") ) } fn is_unable_to_resolve_handle_error(error: &atrium_xrpc::error::Error) -> bool { use atrium_xrpc::error::{Error, ErrorResponseBody, XrpcError, XrpcErrorKind}; matches!(error, Error::XrpcResponse(XrpcError { status, error: Some(XrpcErrorKind::Undefined(ErrorResponseBody { error: Some(error_code), message: Some(error_message), })), }) if // FIXME: This is this way instead of pattern matching because atrium's // version of http is pegged at like 0.2.x and it does not // re-export it so we have no way of referencing the real type status.as_u16() == StatusCode::BAD_REQUEST.as_u16() && error_code == "InvalidRequest" && error_message.starts_with("Unable to resolve handle") ) }