2023-08-31 08:42:56 +01:00
|
|
|
use std::collections::HashSet;
|
|
|
|
|
|
|
|
use async_trait::async_trait;
|
2023-08-18 20:11:49 +01:00
|
|
|
use anyhow::Result;
|
|
|
|
|
|
|
|
use crate::frames::Frame;
|
|
|
|
use anyhow::anyhow;
|
|
|
|
use atrium_api::app::bsky::feed::post::Record;
|
|
|
|
use atrium_api::com::atproto::sync::subscribe_repos::Commit;
|
|
|
|
use atrium_api::com::atproto::sync::subscribe_repos::Message;
|
|
|
|
use futures::StreamExt;
|
|
|
|
use tokio_tungstenite::{connect_async, tungstenite};
|
|
|
|
|
2023-08-31 08:42:56 +01:00
|
|
|
#[async_trait]
|
|
|
|
pub trait OperationProcessor {
|
|
|
|
async fn process_operation(&self, operation: &Operation) -> Result<()>;
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
pub enum Operation {
|
|
|
|
CreatePost {
|
|
|
|
author_did: String,
|
|
|
|
cid: String,
|
|
|
|
uri: String,
|
|
|
|
languages: HashSet<String>,
|
|
|
|
text: String,
|
|
|
|
},
|
|
|
|
DeletePost {
|
|
|
|
uri: String,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn start_processing_operations_with<P: OperationProcessor>(processor: P) -> Result<()> {
|
2023-08-18 20:11:49 +01:00
|
|
|
let (mut stream, _) =
|
|
|
|
connect_async("wss://bsky.social/xrpc/com.atproto.sync.subscribeRepos").await?;
|
|
|
|
|
|
|
|
while let Some(Ok(tungstenite::Message::Binary(message))) = stream.next().await {
|
2023-08-31 08:42:56 +01:00
|
|
|
if let Err(e) = handle_message(&message, &processor).await {
|
2023-08-19 19:20:27 +01:00
|
|
|
println!("Error handling a message: {:?}", e);
|
2023-08-18 20:11:49 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2023-08-31 08:42:56 +01:00
|
|
|
async fn handle_message<P: OperationProcessor>(message: &[u8], processor: &P) -> Result<()> {
|
2023-08-19 19:20:27 +01:00
|
|
|
let commit = match parse_commit_from_message(&message)? {
|
|
|
|
Some(commit) => commit,
|
|
|
|
None => return Ok(()),
|
|
|
|
};
|
|
|
|
|
2023-08-31 08:42:56 +01:00
|
|
|
let post_operations = extract_operations(&commit).await?;
|
2023-08-19 19:20:27 +01:00
|
|
|
for operation in &post_operations {
|
2023-08-31 08:42:56 +01:00
|
|
|
processor.process_operation(&operation).await?;
|
2023-08-19 19:20:27 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
fn parse_commit_from_message(message: &[u8]) -> Result<Option<Commit>> {
|
2023-08-18 20:11:49 +01:00
|
|
|
match Frame::try_from(message)? {
|
|
|
|
Frame::Message(message) => match message.body {
|
|
|
|
Message::Commit(commit) => Ok(Some(*commit)),
|
|
|
|
_ => Ok(None),
|
|
|
|
},
|
|
|
|
Frame::Error(err) => panic!("Frame error: {err:?}"),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-08-31 08:42:56 +01:00
|
|
|
async fn extract_operations(commit: &Commit) -> Result<Vec<Operation>> {
|
2023-08-19 19:20:27 +01:00
|
|
|
let mut operations = Vec::new();
|
2023-08-18 20:11:49 +01:00
|
|
|
|
|
|
|
let (items, _) = rs_car::car_read_all(&mut commit.blocks.as_slice(), true).await?;
|
|
|
|
for op in &commit.ops {
|
|
|
|
let collection = op.path.split('/').next().expect("op.path is empty");
|
|
|
|
if (op.action != "create" && op.action != "delete") || collection != "app.bsky.feed.post" {
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
2023-08-31 08:42:56 +01:00
|
|
|
let uri = format!("at://{}/{}", commit.repo, op.path);
|
2023-08-19 19:20:27 +01:00
|
|
|
|
2023-08-18 20:11:49 +01:00
|
|
|
if let Some((_, item)) = items.iter().find(|(cid, _)| Some(*cid) == op.cid) {
|
|
|
|
let record: Record = ciborium::from_reader(&mut item.as_slice())?;
|
|
|
|
|
2023-08-19 19:20:27 +01:00
|
|
|
operations.push(match op.action.as_str() {
|
2023-08-31 08:42:56 +01:00
|
|
|
"create" => Operation::CreatePost {
|
|
|
|
languages: record.langs.unwrap_or_else(Vec::new).iter().cloned().collect(),
|
2023-08-19 19:20:27 +01:00
|
|
|
text: record.text,
|
|
|
|
author_did: commit.repo.clone(),
|
2023-08-31 08:42:56 +01:00
|
|
|
cid: op.cid.ok_or(anyhow!("cid is not present for a post create operation, how is that possible"))?.to_string(),
|
|
|
|
uri,
|
2023-08-18 20:11:49 +01:00
|
|
|
},
|
2023-08-31 08:42:56 +01:00
|
|
|
"delete" => Operation::DeletePost { uri },
|
2023-08-19 19:20:27 +01:00
|
|
|
_ => unreachable!(),
|
|
|
|
});
|
2023-08-18 20:11:49 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-08-19 19:20:27 +01:00
|
|
|
Ok(operations)
|
2023-08-18 20:11:49 +01:00
|
|
|
}
|