From dd333336491d1553154b7c84903786e53d1c2648 Mon Sep 17 00:00:00 2001 From: Aleksei Voronov Date: Sat, 23 Sep 2023 20:25:26 +0200 Subject: [PATCH] Rewrite streaming processing in a more sane way And also add support for likes and follows --- src/processes/post_indexer.rs | 1 + src/services/bluesky/streaming.rs | 123 +++++++++++++++++++++++------- 2 files changed, 96 insertions(+), 28 deletions(-) diff --git a/src/processes/post_indexer.rs b/src/processes/post_indexer.rs index 15be6ce..a06beba 100644 --- a/src/processes/post_indexer.rs +++ b/src/processes/post_indexer.rs @@ -85,6 +85,7 @@ impl CommitProcessor for PostIndexer { // TODO: Delete posts from db // self.database.delete_post(&self.db_connection_pool, &uri).await?; } + _ => continue, } } diff --git a/src/services/bluesky/streaming.rs b/src/services/bluesky/streaming.rs index d73755b..d02b976 100644 --- a/src/services/bluesky/streaming.rs +++ b/src/services/bluesky/streaming.rs @@ -1,12 +1,17 @@ -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use anyhow::Result; use async_trait::async_trait; +use atrium_api::com::atproto::sync::subscribe_repos::{Commit, Message}; use super::proto::Frame; -use anyhow::anyhow; -use atrium_api::app::bsky::feed::post::Record; -use atrium_api::com::atproto::sync::subscribe_repos::{Commit, Message}; + +const COLLECTION_POST: &str = "app.bsky.feed.post"; +const COLLECTION_LIKE: &str = "app.bsky.feed.like"; +const COLLECTION_FOLLOW: &str = "app.bsky.graph.follow"; + +const ACTION_CREATE: &str = "create"; +const ACTION_DELETE: &str = "delete"; #[async_trait] pub trait CommitProcessor { @@ -27,9 +32,28 @@ pub enum Operation { languages: HashSet, text: String, }, + CreateLike { + author_did: String, + cid: String, + uri: String, + subject_cid: String, + subject_uri: String, + }, + CreateFollow { + author_did: String, + cid: String, + uri: String, + subject: String, + }, DeletePost { uri: String, }, + DeleteLike { + uri: String, + }, + DeleteFollow { + uri: String, + }, } pub async fn handle_message(message: &[u8], processor: &P) -> Result<()> { @@ -63,35 +87,78 @@ fn parse_commit_from_message(message: &[u8]) -> Result> { async fn extract_operations(commit: &Commit) -> Result> { let mut operations = Vec::new(); - let (items, _) = rs_car::car_read_all(&mut commit.blocks.as_slice(), true).await?; + let (blocks, _header) = rs_car::car_read_all(&mut commit.blocks.as_slice(), true).await?; + let blocks_by_cid: HashMap<_, _> = blocks.into_iter().collect(); + for op in &commit.ops { let collection = op.path.split('/').next().expect("op.path is empty"); - if (op.action != "create" && op.action != "delete") || collection != "app.bsky.feed.post" { - continue; - } - + let action = op.action.as_str(); let uri = format!("at://{}/{}", commit.repo, op.path); - if let Some((_, item)) = items.iter().find(|(cid, _)| Some(*cid) == op.cid) { - let record: Record = ciborium::from_reader(&mut item.as_slice())?; + let operation = match action { + ACTION_CREATE => { + let cid = match op.cid { + Some(cid) => cid, + None => continue, + }; - operations.push(match op.action.as_str() { - "create" => Operation::CreatePost { - languages: record.langs.unwrap_or_default().iter().cloned().collect(), - text: record.text, - author_did: commit.repo.clone(), - cid: op - .cid - .ok_or(anyhow!( - "cid is not present for a post create operation, how is that possible" - ))? - .to_string(), - uri, - }, - "delete" => Operation::DeletePost { uri }, - _ => unreachable!(), - }); - } + let block = match blocks_by_cid.get(&cid) { + Some(block) => block, + None => continue, + }; + + match collection { + COLLECTION_POST => { + use atrium_api::app::bsky::feed::post::Record; + + let record: Record = ciborium::from_reader(&mut block.as_slice())?; + + Operation::CreatePost { + author_did: commit.repo.clone(), + cid: cid.to_string(), + uri, + languages: record.langs.unwrap_or_default().iter().cloned().collect(), + text: record.text, + } + } + COLLECTION_LIKE => { + use atrium_api::app::bsky::feed::like::Record; + + let record: Record = ciborium::from_reader(&mut block.as_slice())?; + + Operation::CreateLike { + author_did: commit.repo.clone(), + cid: cid.to_string(), + uri, + subject_cid: record.subject.cid, + subject_uri: record.subject.uri, + } + } + COLLECTION_FOLLOW => { + use atrium_api::app::bsky::graph::follow::Record; + + let record: Record = ciborium::from_reader(&mut block.as_slice())?; + + Operation::CreateFollow { + author_did: commit.repo.clone(), + cid: cid.to_string(), + uri, + subject: record.subject, + } + } + _ => continue, + } + } + ACTION_DELETE => match collection { + COLLECTION_POST => Operation::DeletePost { uri }, + COLLECTION_LIKE => Operation::DeleteLike { uri }, + COLLECTION_FOLLOW => Operation::DeleteFollow { uri }, + _ => continue, + }, + _ => continue, + }; + + operations.push(operation) } Ok(operations)