Upgrade everything to latest versions

This includes a bunch of small changes to adapt to how atrium-api has changed
over time. They're not functional or interesting, just some type-level
adjustments that are needed.

Some more complicated logic was changed in how profile details are parsed,
since atrium's way of doing things is weird and hard to understand so I just
manually grab stuff from the object map instead of relying on atrium's types.
This is similar to how CBOR parsing is done.

Boring maintenance stuff.
This commit is contained in:
Aleksei Voronov 2024-08-18 13:35:41 +02:00
parent 149cd44227
commit b8d1fd7695
15 changed files with 1095 additions and 863 deletions

1640
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -7,28 +7,28 @@ default-run = "nederlandskie"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.79"
async-trait = "0.1.77"
atrium-api = "0.15.0"
atrium-xrpc = "0.8.0"
atrium-xrpc-client = "0.2.0"
axum = "0.7.4"
chat-gpt-lib-rs = "0.3.2"
chrono = "0.4.31"
clap = { version = "4.4.16", features = ["derive"] }
anyhow = "1.0.86"
async-trait = "0.1.81"
atrium-api = "0.24.2"
atrium-xrpc = "0.11.3"
atrium-xrpc-client = "0.5.6"
axum = "0.7.5"
chat-gpt-lib-rs = "0.5.1"
chrono = "0.4.38"
clap = { version = "4.5.16", features = ["derive"] }
dotenv = "0.15.0"
env_logger = "0.10.1"
http = "1.0.0"
libipld-core = { version = "0.16.0", features = ["serde-codec"] }
env_logger = "0.11.5"
http = "1.1.0"
ipld-core = "0.4.1"
lingua = "1.6.2"
log = "0.4.20"
log = "0.4.22"
once_cell = "1.19.0"
rs-car = "0.4.1"
scooby = "0.5.0"
serde = "1.0.195"
serde_ipld_dagcbor = "0.4.2"
serde = "1.0.208"
serde_ipld_dagcbor = "0.6.1"
sk-cbor = "0.1.2"
sqlx = { version = "0.7.3", default-features = false, features = ["postgres", "runtime-tokio-native-tls", "chrono"] }
tokio = { version = "1.35.1", features = ["full"] }
tokio-stream = "0.1.14"
tokio-tungstenite = { version = "0.21.0", features = ["native-tls"] }
sqlx = { version = "0.8.0", default-features = false, features = ["postgres", "runtime-tokio-native-tls", "chrono"] }
tokio = { version = "1.39.2", features = ["full"] }
tokio-stream = "0.1.15"
tokio-tungstenite = { version = "0.23.1", features = ["native-tls"] }

View File

@ -19,7 +19,7 @@ pub trait Algo {
async fn fetch_posts(
&self,
database: &Database,
limit: i32,
limit: u8,
earlier_than: Option<(DateTime<Utc>, &str)>,
) -> Result<Vec<database::Post>>;
}

View File

@ -50,7 +50,7 @@ impl Algo for Nederlandskie {
async fn fetch_posts(
&self,
database: &Database,
limit: i32,
limit: u8,
earlier_than: Option<(DateTime<Utc>, &str)>,
) -> Result<Vec<database::Post>> {
Ok(database

View File

@ -1,12 +1,13 @@
use anyhow::Result;
use atrium_api::types::string::Did;
use dotenv::dotenv;
use std::env;
pub struct Config {
pub chat_gpt_api_key: String,
pub database_url: String,
pub feed_generator_did: String,
pub publisher_did: String,
pub feed_generator_did: Did,
pub publisher_did: Did,
pub feed_generator_hostname: String,
}
@ -18,8 +19,12 @@ impl Config {
chat_gpt_api_key: env::var("CHAT_GPT_API_KEY")?,
database_url: env::var("DATABASE_URL")?,
feed_generator_hostname: env::var("FEED_GENERATOR_HOSTNAME")?,
feed_generator_did: format!("did:web:{}", env::var("FEED_GENERATOR_HOSTNAME")?),
publisher_did: env::var("PUBLISHER_DID")?,
feed_generator_did: format!("did:web:{}", env::var("FEED_GENERATOR_HOSTNAME")?)
.parse()
.map_err(anyhow::Error::msg)?,
publisher_did: env::var("PUBLISHER_DID")?
.parse()
.map_err(anyhow::Error::msg)?,
})
}
}

View File

@ -1,8 +1,9 @@
use std::sync::Arc;
use atrium_api::app::bsky::feed::describe_feed_generator::{
Feed, Output as FeedGeneratorDescription,
FeedData, OutputData as FeedGeneratorDescription,
};
use atrium_api::types::Object;
use axum::{extract::State, Json};
use crate::{algos::Algos, config::Config};
@ -15,12 +16,14 @@ pub async fn describe_feed_generator(
did: config.feed_generator_did.clone(),
feeds: algos
.iter_names()
.map(|name| Feed {
.map(|name| FeedData {
uri: format!(
"at://{}/app.bsky.feed.generator/{}",
config.publisher_did, name
config.publisher_did.as_ref(),
name
),
})
.map(Object::from)
.collect(),
links: None,
})

View File

@ -25,7 +25,7 @@ pub struct Service {
pub async fn did_json(State(config): State<Arc<Config>>) -> Json<Did> {
Json(Did {
context: vec!["https://www.w3.org/ns/did/v1".to_owned()],
id: config.feed_generator_did.clone(),
id: config.feed_generator_did.to_string(),
service: vec![Service {
id: "#bsky_fg".to_owned(),
type_: "BskyFeedGenerator".to_owned(),

View File

@ -1,10 +1,11 @@
use std::sync::Arc;
use anyhow::anyhow;
use atrium_api::app::bsky::feed::defs::SkeletonFeedPost;
use atrium_api::app::bsky::feed::defs::SkeletonFeedPostData;
use atrium_api::app::bsky::feed::get_feed_skeleton::{
Output as FeedSkeleton, Parameters as FeedSkeletonQuery,
OutputData as FeedSkeleton, Parameters as FeedSkeletonQuery,
};
use atrium_api::types::{LimitedNonZeroU8, Object};
use axum::extract::{Query, State};
use axum::Json;
use chrono::{DateTime, TimeZone, Utc};
@ -28,17 +29,23 @@ pub async fn get_feed_skeleton(
.get_by_name(feed_name)
.ok_or_else(|| AppError::FeedNotFound(feed_name.to_owned()))?;
let limit = query.limit.unwrap_or(20);
let limit = query
.limit
.unwrap_or(LimitedNonZeroU8::try_from(20).expect("this default limit should always work"));
let earlier_than = query.cursor.as_deref().map(parse_cursor).transpose()?;
let posts = algo.fetch_posts(&database, limit, earlier_than).await?;
let posts = algo
.fetch_posts(&database, limit.into(), earlier_than)
.await?;
let feed = posts
.iter()
.map(|p| SkeletonFeedPost {
.map(|p| SkeletonFeedPostData {
post: p.uri.clone(),
feed_context: None,
reason: None,
})
.map(Object::from)
.collect();
let cursor = posts.last().map(|p| make_cursor(&p.indexed_at, &p.cid));

View File

@ -103,7 +103,9 @@ impl CommitProcessor for PostIndexer {
if commit.seq % 20 == 0 {
debug!(
"Updating cursor for {} to {} ({})",
self.config.feed_generator_did, commit.seq, commit.time
self.config.feed_generator_did.as_str(),
commit.seq,
commit.time
);
self.database
.update_subscription_cursor(

View File

@ -37,7 +37,7 @@ impl AI {
response
.choices
.get(0)
.first()
.map(|choice| choice.message.content.to_lowercase())
.ok_or_else(|| anyhow!("No choices received from ChatGPT, weird"))
}

View File

@ -1,12 +1,12 @@
use std::fmt::Debug;
use std::matches;
use std::time::Duration;
use anyhow::{anyhow, Result};
use anyhow::Result;
use atrium_api::agent::{store::MemorySessionStore, AtpAgent};
use atrium_api::blob::BlobRef;
use atrium_api::records::Record;
use atrium_api::types::string::Datetime;
use atrium_api::types::{BlobRef, Collection, Object, TryIntoUnknown};
use atrium_xrpc_client::reqwest::ReqwestClient;
use chrono::Utc;
use http::StatusCode;
use log::error;
use tokio_stream::StreamExt;
@ -46,7 +46,7 @@ impl Bluesky {
pub async fn upload_blob(&self, blob: Vec<u8>) -> Result<BlobRef> {
let result = self.agent.api.com.atproto.repo.upload_blob(blob).await?;
Ok(result.blob)
Ok(result.data.blob)
}
pub async fn publish_feed(
@ -58,66 +58,71 @@ impl Bluesky {
description: &str,
avatar: Option<BlobRef>,
) -> Result<()> {
use atrium_api::com::atproto::repo::put_record::Input;
use atrium_api::com::atproto::repo::put_record::InputData;
self.agent
.api
.com
.atproto
.repo
.put_record(Input {
collection: "app.bsky.feed.generator".to_owned(),
record: Record::AppBskyFeedGenerator(Box::new(
atrium_api::app::bsky::feed::generator::Record {
.put_record(
InputData {
collection: atrium_api::app::bsky::feed::Generator::nsid(),
record: atrium_api::app::bsky::feed::generator::RecordData {
avatar,
created_at: Utc::now().to_rfc3339(),
created_at: Datetime::now(),
description: Some(description.to_owned()),
description_facets: None,
did: feed_generator_did.to_owned(),
did: feed_generator_did.parse().map_err(anyhow::Error::msg)?,
display_name: display_name.to_owned(),
labels: None,
},
)),
repo: publisher_did.to_owned(),
accepts_interactions: None,
}
.try_into_unknown()?,
repo: publisher_did.parse().map_err(anyhow::Error::msg)?,
rkey: name.to_owned(),
swap_commit: None,
swap_record: None,
validate: None,
})
}
.into(),
)
.await?;
Ok(())
}
pub async fn fetch_profile_details(&self, did: &str) -> Result<Option<ProfileDetails>> {
use atrium_api::com::atproto::repo::get_record::ParametersData;
let result = self
.agent
.api
.com
.atproto
.repo
.get_record(atrium_api::com::atproto::repo::get_record::Parameters {
collection: "app.bsky.actor.profile".to_owned(),
.get_record(
ParametersData {
collection: atrium_api::app::bsky::actor::Profile::nsid(),
cid: None,
repo: did.to_owned(),
repo: did.parse().map_err(anyhow::Error::msg)?,
rkey: "self".to_owned(),
})
}
.into(),
)
.await;
let profile_data = match result {
Ok(profile_data) => profile_data,
let profile_output = match result {
Ok(profile_output) => profile_output,
Err(e) if is_missing_record_error(&e) => return Ok(None),
Err(e) => return Err(e.into()),
};
match profile_data.value {
Record::AppBskyActorProfile(profile) => Ok(Some(ProfileDetails::from(*profile))),
_ => Err(anyhow!("Wrong type of record")),
}
Ok(Some(ProfileDetails::try_from(profile_output.data.value)?))
}
pub async fn resolve_handle(&self, handle: &str) -> Result<Option<String>> {
use atrium_api::com::atproto::identity::resolve_handle::Parameters;
use atrium_api::com::atproto::identity::resolve_handle::ParametersData;
let result = self
.agent
@ -125,13 +130,13 @@ impl Bluesky {
.com
.atproto
.identity
.resolve_handle(Parameters {
handle: handle.to_owned(),
})
.resolve_handle(Object::from(ParametersData {
handle: handle.parse().map_err(anyhow::Error::msg)?,
}))
.await;
match result {
Ok(result) => Ok(Some(result.did)),
Ok(result) => Ok(Some(result.did.to_string())),
Err(e) if is_unable_to_resolve_handle_error(&e) => Ok(None),
Err(e) => Err(e.into()),
}
@ -168,7 +173,10 @@ impl Bluesky {
}
}
fn is_missing_record_error<T>(error: &atrium_xrpc::error::Error<T>) -> bool {
fn is_missing_record_error<T>(error: &atrium_xrpc::error::Error<T>) -> bool
where
T: Debug,
{
use atrium_xrpc::error::{Error, ErrorResponseBody, XrpcError, XrpcErrorKind};
matches!(error,
@ -189,7 +197,10 @@ fn is_missing_record_error<T>(error: &atrium_xrpc::error::Error<T>) -> bool {
)
}
fn is_unable_to_resolve_handle_error<T>(error: &atrium_xrpc::error::Error<T>) -> bool {
fn is_unable_to_resolve_handle_error<T>(error: &atrium_xrpc::error::Error<T>) -> bool
where
T: Debug,
{
use atrium_xrpc::error::{Error, ErrorResponseBody, XrpcError, XrpcErrorKind};
matches!(error,

View File

@ -1,4 +1,10 @@
use atrium_api::app::bsky::actor::profile::Record as ProfileRecord;
use std::ops::Deref;
use atrium_api::{
app::bsky::actor::profile::RecordData as ProfileRecordData,
types::Unknown,
};
use ipld_core::ipld::Ipld;
#[derive(Debug)]
pub struct ProfileDetails {
@ -6,11 +12,33 @@ pub struct ProfileDetails {
pub description: String,
}
impl From<ProfileRecord> for ProfileDetails {
fn from(value: ProfileRecord) -> Self {
impl From<ProfileRecordData> for ProfileDetails {
fn from(value: ProfileRecordData) -> Self {
Self {
display_name: value.display_name.unwrap_or_default(),
description: value.description.unwrap_or_default(),
}
}
}
impl TryFrom<Unknown> for ProfileDetails {
type Error = anyhow::Error;
fn try_from(value: Unknown) -> Result<Self, Self::Error> {
let string_or_empty = |value: &Unknown, key: &str| match value {
Unknown::Object(map) => match map.get(key) {
Some(x) => match x.deref() {
Ipld::String(s) => s.clone(),
_ => "".to_owned(),
},
_ => "".to_owned(),
},
_ => "".to_owned(),
};
Ok(ProfileDetails {
display_name: string_or_empty(&value, "displayName"),
description: string_or_empty(&value, "description"),
})
}
}

View File

@ -1,7 +1,4 @@
use atrium_api::com::atproto::sync::subscribe_repos::{
Commit, Handle, Info, Message, Migrate, Tombstone,
};
use libipld_core::ipld::Ipld;
use ipld_core::ipld::Ipld;
use std::io::Cursor;
// original definition:
@ -52,13 +49,13 @@ impl TryFrom<Ipld> for FrameHeader {
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Frame {
Message(Box<MessageFrame>),
Message(Option<String>, MessageFrame),
Error(ErrorFrame),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MessageFrame {
pub body: Message,
pub body: Vec<u8>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
@ -81,39 +78,16 @@ impl TryFrom<&[u8]> for Frame {
return Err(anyhow::anyhow!("invalid frame type"));
}
};
let ipld = serde_ipld_dagcbor::from_slice::<Ipld>(left)?;
let header = FrameHeader::try_from(ipld)?;
match header {
FrameHeader::Message(t) => match t.as_deref() {
Some("#commit") => Ok(Frame::Message(Box::new(MessageFrame {
body: Message::Commit(Box::new(serde_ipld_dagcbor::from_slice::<Commit>(
right,
)?)),
}))),
Some("#handle") => Ok(Frame::Message(Box::new(MessageFrame {
body: Message::Handle(Box::new(serde_ipld_dagcbor::from_slice::<Handle>(
right,
)?)),
}))),
Some("#info") => Ok(Frame::Message(Box::new(MessageFrame {
body: Message::Info(Box::new(serde_ipld_dagcbor::from_slice::<Info>(right)?)),
}))),
Some("#migrate") => Ok(Frame::Message(Box::new(MessageFrame {
body: Message::Migrate(Box::new(serde_ipld_dagcbor::from_slice::<Migrate>(
right,
)?)),
}))),
Some("#tombstone") => Ok(Frame::Message(Box::new(MessageFrame {
body: Message::Tombstone(Box::new(
serde_ipld_dagcbor::from_slice::<Tombstone>(right)?,
)),
}))),
_ => {
let tag = t.as_deref();
Err(anyhow::anyhow!("frame not implemented: tag={tag:?}"))
}
let header = FrameHeader::try_from(serde_ipld_dagcbor::from_slice::<Ipld>(left)?)?;
if let FrameHeader::Message(t) = &header {
Ok(Frame::Message(
t.clone(),
MessageFrame {
body: right.to_vec(),
},
FrameHeader::Error => Ok(Frame::Error(ErrorFrame {})),
))
} else {
Ok(Frame::Error(ErrorFrame {}))
}
}
}

View File

@ -2,7 +2,8 @@ use std::collections::HashMap;
use anyhow::Result;
use async_trait::async_trait;
use atrium_api::com::atproto::sync::subscribe_repos::{Commit, Message};
use atrium_api::com::atproto::sync::subscribe_repos::Commit;
use atrium_api::types::Collection;
use chrono::{DateTime, Utc};
use super::{
@ -11,10 +12,6 @@ use super::{
internals::ipld::Frame,
};
const COLLECTION_POST: &str = "app.bsky.feed.post";
const COLLECTION_LIKE: &str = "app.bsky.feed.like";
const COLLECTION_FOLLOW: &str = "app.bsky.graph.follow";
const ACTION_CREATE: &str = "create";
const ACTION_DELETE: &str = "delete";
@ -24,7 +21,7 @@ pub trait CommitProcessor {
}
pub struct CommitDetails {
pub seq: i32,
pub seq: i64,
pub time: DateTime<Utc>,
pub operations: Vec<Operation>,
}
@ -71,7 +68,7 @@ pub async fn handle_message<P: CommitProcessor>(message: &[u8], processor: &P) -
processor
.process_commit(&CommitDetails {
seq: commit.seq,
time: commit.time.parse()?,
time: (*commit.time.as_ref()).into(),
operations,
})
.await?;
@ -81,10 +78,14 @@ pub async fn handle_message<P: CommitProcessor>(message: &[u8], processor: &P) -
fn parse_commit_from_message(message: &[u8]) -> Result<Option<Commit>> {
match Frame::try_from(message)? {
Frame::Message(message) => match message.body {
Message::Commit(commit) => Ok(Some(*commit)),
_ => Ok(None),
},
Frame::Message(Some(t), message) => {
if t == "#commit" {
Ok(serde_ipld_dagcbor::from_reader(message.body.as_slice())?)
} else {
Ok(None)
}
}
Frame::Message(None, _) => Ok(None),
Frame::Error(err) => panic!("Frame error: {err:?}"),
}
}
@ -93,17 +94,20 @@ async fn extract_operations(commit: &Commit) -> Result<Vec<Operation>> {
let mut operations = Vec::new();
let (blocks, _header) = rs_car::car_read_all(&mut commit.blocks.as_slice(), true).await?;
let blocks_by_cid: HashMap<_, _> = blocks.into_iter().collect();
let blocks_by_cid: HashMap<_, _> = blocks
.into_iter()
.map(|(cid, block)| (cid.to_string(), block))
.collect();
for op in &commit.ops {
let collection = op.path.split('/').next().expect("op.path is empty");
let action = op.action.as_str();
let uri = format!("at://{}/{}", commit.repo, op.path);
let uri = format!("at://{}/{}", commit.repo.as_str(), op.path);
let operation = match action {
ACTION_CREATE => {
let cid = match op.cid {
Some(cid) => cid,
let cid = match &op.cid {
Some(cid_link) => cid_link.0.to_string(),
None => continue,
};
@ -113,31 +117,31 @@ async fn extract_operations(commit: &Commit) -> Result<Vec<Operation>> {
};
match collection {
COLLECTION_POST => {
atrium_api::app::bsky::feed::Post::NSID => {
let post: PostRecord = read_record(block)?;
Operation::CreatePost {
author_did: commit.repo.clone(),
author_did: commit.repo.to_string(),
cid: cid.to_string(),
uri,
post,
}
}
COLLECTION_LIKE => {
atrium_api::app::bsky::feed::Like::NSID => {
let like: LikeRecord = read_record(block)?;
Operation::CreateLike {
author_did: commit.repo.clone(),
author_did: commit.repo.to_string(),
cid: cid.to_string(),
uri,
like,
}
}
COLLECTION_FOLLOW => {
atrium_api::app::bsky::graph::Follow::NSID => {
let follow: FollowRecord = read_record(block)?;
Operation::CreateFollow {
author_did: commit.repo.clone(),
author_did: commit.repo.to_string(),
cid: cid.to_string(),
uri,
follow,
@ -147,9 +151,9 @@ async fn extract_operations(commit: &Commit) -> Result<Vec<Operation>> {
}
}
ACTION_DELETE => match collection {
COLLECTION_POST => Operation::DeletePost { uri },
COLLECTION_LIKE => Operation::DeleteLike { uri },
COLLECTION_FOLLOW => Operation::DeleteFollow { uri },
atrium_api::app::bsky::feed::Post::NSID => Operation::DeletePost { uri },
atrium_api::app::bsky::feed::Like::NSID => Operation::DeleteLike { uri },
atrium_api::app::bsky::graph::Follow::NSID => Operation::DeleteFollow { uri },
_ => continue,
},
_ => continue,

View File

@ -251,7 +251,7 @@ impl Database {
&self,
host: &str,
did: &str,
cursor: i32,
cursor: i64,
) -> Result<bool> {
let mut params = Parameters::new();