diff --git a/src/streaming.rs b/src/streaming.rs index eb01f03..b43aa6c 100644 --- a/src/streaming.rs +++ b/src/streaming.rs @@ -13,32 +13,29 @@ pub async fn start_stream() -> Result<()> { connect_async("wss://bsky.social/xrpc/com.atproto.sync.subscribeRepos").await?; while let Some(Ok(tungstenite::Message::Binary(message))) = stream.next().await { - let commit = match parse_commit_message(&message) { - Ok(Some(commit)) => commit, - Ok(None) => continue, - Err(e) => { - println!("Couldn't parse commit: {:?}", e); - continue; - } - }; - - let post_messages = extract_post_messages(&commit).await; - match post_messages { - Ok(post_messages) => { - if !post_messages.is_empty() { - println!("{:?}", post_messages); - } - } - Err(e) => { - println!("Coudln't extract post messages: {:?}", e); - } + if let Err(e) = handle_message(&message).await { + println!("Error handling a message: {:?}", e); } } Ok(()) } -fn parse_commit_message(message: &[u8]) -> Result> { +async fn handle_message(message: &[u8]) -> Result<()> { + let commit = match parse_commit_from_message(&message)? { + Some(commit) => commit, + None => return Ok(()), + }; + + let post_operations = extract_post_operations(&commit).await?; + for operation in &post_operations { + println!("{:?}", operation); + } + + Ok(()) +} + +fn parse_commit_from_message(message: &[u8]) -> Result> { match Frame::try_from(message)? { Frame::Message(message) => match message.body { Message::Commit(commit) => Ok(Some(*commit)), @@ -49,23 +46,21 @@ fn parse_commit_message(message: &[u8]) -> Result> { } #[derive(Debug)] -enum Action { - Create, - Delete, +enum PostOperation { + Create { + author_did: String, + cid: String, + uri: String, + languages: Vec, + text: String, + }, + Delete { + cid: String, + }, } -#[derive(Debug)] -struct PostMessage { - action: Action, - author_did: String, - cid: String, - uri: String, - languages: Vec, - text: String, -} - -async fn extract_post_messages(commit: &Commit) -> Result> { - let mut posts = Vec::new(); +async fn extract_post_operations(commit: &Commit) -> Result> { + let mut operations = Vec::new(); let (items, _) = rs_car::car_read_all(&mut commit.blocks.as_slice(), true).await?; for op in &commit.ops { @@ -74,23 +69,24 @@ async fn extract_post_messages(commit: &Commit) -> Result> { continue; } + let cid = op.cid.expect("cid is not there, what").to_string(); + if let Some((_, item)) = items.iter().find(|(cid, _)| Some(*cid) == op.cid) { let record: Record = ciborium::from_reader(&mut item.as_slice())?; - posts.push(PostMessage { - action: if op.action == "create" { - Action::Create - } else { - Action::Delete + operations.push(match op.action.as_str() { + "create" => PostOperation::Create { + languages: record.langs.unwrap_or_else(Vec::new), + text: record.text, + author_did: commit.repo.clone(), + cid, + uri: format!("at://{}/{}", commit.repo, op.path), }, - languages: record.langs.unwrap_or_else(Vec::new), - text: record.text, - author_did: commit.repo.clone(), - cid: op.cid.expect("cid is not there, what").to_string(), - uri: format!("at://{}/{}", commit.repo, op.path), - }) + "delete" => PostOperation::Delete { cid }, + _ => unreachable!(), + }); } } - Ok(posts) + Ok(operations) }