nederlandskie/src/services/bluesky/streaming.rs

96 lines
2.9 KiB
Rust
Raw Normal View History

use std::collections::HashSet;
use anyhow::Result;
2023-08-31 08:56:14 +01:00
use async_trait::async_trait;
use super::proto::Frame;
use anyhow::anyhow;
use atrium_api::app::bsky::feed::post::Record;
use atrium_api::com::atproto::sync::subscribe_repos::Commit;
use atrium_api::com::atproto::sync::subscribe_repos::Message;
#[async_trait]
pub trait OperationProcessor {
async fn process_operation(&self, operation: &Operation, commit: &Commit) -> Result<()>;
}
#[derive(Debug)]
pub enum Operation {
CreatePost {
author_did: String,
cid: String,
uri: String,
languages: HashSet<String>,
text: String,
},
DeletePost {
uri: String,
},
}
pub async fn handle_message<P: OperationProcessor>(message: &[u8], processor: &P) -> Result<()> {
2023-08-19 19:20:27 +01:00
let commit = match parse_commit_from_message(&message)? {
Some(commit) => commit,
None => return Ok(()),
};
let post_operations = extract_operations(&commit).await?;
2023-08-19 19:20:27 +01:00
for operation in &post_operations {
processor.process_operation(&operation, &commit).await?;
2023-08-19 19:20:27 +01:00
}
Ok(())
}
fn parse_commit_from_message(message: &[u8]) -> Result<Option<Commit>> {
match Frame::try_from(message)? {
Frame::Message(message) => match message.body {
Message::Commit(commit) => Ok(Some(*commit)),
_ => Ok(None),
},
Frame::Error(err) => panic!("Frame error: {err:?}"),
}
}
async fn extract_operations(commit: &Commit) -> Result<Vec<Operation>> {
2023-08-19 19:20:27 +01:00
let mut operations = Vec::new();
let (items, _) = rs_car::car_read_all(&mut commit.blocks.as_slice(), true).await?;
for op in &commit.ops {
let collection = op.path.split('/').next().expect("op.path is empty");
if (op.action != "create" && op.action != "delete") || collection != "app.bsky.feed.post" {
continue;
}
let uri = format!("at://{}/{}", commit.repo, op.path);
2023-08-19 19:20:27 +01:00
if let Some((_, item)) = items.iter().find(|(cid, _)| Some(*cid) == op.cid) {
let record: Record = ciborium::from_reader(&mut item.as_slice())?;
2023-08-19 19:20:27 +01:00
operations.push(match op.action.as_str() {
"create" => Operation::CreatePost {
2023-08-31 08:56:14 +01:00
languages: record
.langs
.unwrap_or_else(Vec::new)
.iter()
.cloned()
.collect(),
2023-08-19 19:20:27 +01:00
text: record.text,
author_did: commit.repo.clone(),
2023-08-31 08:56:14 +01:00
cid: op
.cid
.ok_or(anyhow!(
"cid is not present for a post create operation, how is that possible"
))?
.to_string(),
uri,
},
"delete" => Operation::DeletePost { uri },
2023-08-19 19:20:27 +01:00
_ => unreachable!(),
});
}
}
2023-08-19 19:20:27 +01:00
Ok(operations)
}