Rewrite streaming processing in a saner way

Also add support for likes and follows
This commit is contained in:
Aleksei Voronov 2023-09-23 20:25:26 +02:00
parent 3a54e04bf4
commit dd33333649
2 changed files with 96 additions and 28 deletions

View File

@ -85,6 +85,7 @@ impl CommitProcessor for PostIndexer {
// TODO: Delete posts from db // TODO: Delete posts from db
// self.database.delete_post(&self.db_connection_pool, &uri).await?; // self.database.delete_post(&self.db_connection_pool, &uri).await?;
} }
_ => continue,
} }
} }

View File

@ -1,12 +1,17 @@
use std::collections::HashSet; use std::collections::{HashMap, HashSet};
use anyhow::Result; use anyhow::Result;
use async_trait::async_trait; use async_trait::async_trait;
use atrium_api::com::atproto::sync::subscribe_repos::{Commit, Message};
use super::proto::Frame; use super::proto::Frame;
use anyhow::anyhow;
use atrium_api::app::bsky::feed::post::Record; const COLLECTION_POST: &str = "app.bsky.feed.post";
use atrium_api::com::atproto::sync::subscribe_repos::{Commit, Message}; const COLLECTION_LIKE: &str = "app.bsky.feed.like";
const COLLECTION_FOLLOW: &str = "app.bsky.graph.follow";
const ACTION_CREATE: &str = "create";
const ACTION_DELETE: &str = "delete";
#[async_trait] #[async_trait]
pub trait CommitProcessor { pub trait CommitProcessor {
@ -27,9 +32,28 @@ pub enum Operation {
languages: HashSet<String>, languages: HashSet<String>,
text: String, text: String,
}, },
CreateLike {
author_did: String,
cid: String,
uri: String,
subject_cid: String,
subject_uri: String,
},
CreateFollow {
author_did: String,
cid: String,
uri: String,
subject: String,
},
DeletePost { DeletePost {
uri: String, uri: String,
}, },
DeleteLike {
uri: String,
},
DeleteFollow {
uri: String,
},
} }
pub async fn handle_message<P: CommitProcessor>(message: &[u8], processor: &P) -> Result<()> { pub async fn handle_message<P: CommitProcessor>(message: &[u8], processor: &P) -> Result<()> {
@ -63,35 +87,78 @@ fn parse_commit_from_message(message: &[u8]) -> Result<Option<Commit>> {
async fn extract_operations(commit: &Commit) -> Result<Vec<Operation>> { async fn extract_operations(commit: &Commit) -> Result<Vec<Operation>> {
let mut operations = Vec::new(); let mut operations = Vec::new();
let (items, _) = rs_car::car_read_all(&mut commit.blocks.as_slice(), true).await?; let (blocks, _header) = rs_car::car_read_all(&mut commit.blocks.as_slice(), true).await?;
let blocks_by_cid: HashMap<_, _> = blocks.into_iter().collect();
for op in &commit.ops { for op in &commit.ops {
let collection = op.path.split('/').next().expect("op.path is empty"); let collection = op.path.split('/').next().expect("op.path is empty");
if (op.action != "create" && op.action != "delete") || collection != "app.bsky.feed.post" { let action = op.action.as_str();
continue;
}
let uri = format!("at://{}/{}", commit.repo, op.path); let uri = format!("at://{}/{}", commit.repo, op.path);
if let Some((_, item)) = items.iter().find(|(cid, _)| Some(*cid) == op.cid) { let operation = match action {
let record: Record = ciborium::from_reader(&mut item.as_slice())?; ACTION_CREATE => {
let cid = match op.cid {
Some(cid) => cid,
None => continue,
};
operations.push(match op.action.as_str() { let block = match blocks_by_cid.get(&cid) {
"create" => Operation::CreatePost { Some(block) => block,
languages: record.langs.unwrap_or_default().iter().cloned().collect(), None => continue,
text: record.text, };
author_did: commit.repo.clone(),
cid: op match collection {
.cid COLLECTION_POST => {
.ok_or(anyhow!( use atrium_api::app::bsky::feed::post::Record;
"cid is not present for a post create operation, how is that possible"
))? let record: Record = ciborium::from_reader(&mut block.as_slice())?;
.to_string(),
uri, Operation::CreatePost {
}, author_did: commit.repo.clone(),
"delete" => Operation::DeletePost { uri }, cid: cid.to_string(),
_ => unreachable!(), uri,
}); languages: record.langs.unwrap_or_default().iter().cloned().collect(),
} text: record.text,
}
}
COLLECTION_LIKE => {
use atrium_api::app::bsky::feed::like::Record;
let record: Record = ciborium::from_reader(&mut block.as_slice())?;
Operation::CreateLike {
author_did: commit.repo.clone(),
cid: cid.to_string(),
uri,
subject_cid: record.subject.cid,
subject_uri: record.subject.uri,
}
}
COLLECTION_FOLLOW => {
use atrium_api::app::bsky::graph::follow::Record;
let record: Record = ciborium::from_reader(&mut block.as_slice())?;
Operation::CreateFollow {
author_did: commit.repo.clone(),
cid: cid.to_string(),
uri,
subject: record.subject,
}
}
_ => continue,
}
}
ACTION_DELETE => match collection {
COLLECTION_POST => Operation::DeletePost { uri },
COLLECTION_LIKE => Operation::DeleteLike { uri },
COLLECTION_FOLLOW => Operation::DeleteFollow { uri },
_ => continue,
},
_ => continue,
};
operations.push(operation)
} }
Ok(operations) Ok(operations)