From 5128bf9d4aa1d341594cbed8bbb3dd3ce89bfa8f Mon Sep 17 00:00:00 2001 From: Aleksei Voronov Date: Fri, 22 Sep 2023 12:37:10 +0200 Subject: [PATCH] Refactor streaming stuff Now we call the processor once per commit, and it's also now a commit processor, not an operation processor, so that we can update the cursor properly --- src/processes/post_indexer.rs | 53 ++++++++++++++++--------------- src/services/bluesky.rs | 2 +- src/services/bluesky/client.rs | 4 +-- src/services/bluesky/streaming.rs | 26 +++++++++------ 4 files changed, 47 insertions(+), 38 deletions(-) diff --git a/src/processes/post_indexer.rs b/src/processes/post_indexer.rs index 55da4d0..8d6d6f5 100644 --- a/src/processes/post_indexer.rs +++ b/src/processes/post_indexer.rs @@ -2,12 +2,11 @@ use std::sync::Arc; use anyhow::Result; use async_trait::async_trait; -use atrium_api::com::atproto::sync::subscribe_repos::Commit; use log::info; use crate::algos::Algos; use crate::config::Config; -use crate::services::bluesky::{Bluesky, Operation, OperationProcessor}; +use crate::services::bluesky::{Bluesky, CommitDetails, CommitProcessor, Operation}; use crate::services::Database; pub struct PostIndexer { @@ -55,37 +54,39 @@ impl PostIndexer { } #[async_trait] -impl OperationProcessor for PostIndexer { - async fn process_operation(&self, operation: &Operation, commit: &Commit) -> Result<()> { - match operation { - Operation::CreatePost { - author_did, - cid, - uri, - languages, - text, - } => { - for algo in self.algos.iter_all() { - if algo.should_index_post(author_did, languages, text).await? { - info!("Received insertable post from {author_did}: {text}"); +impl CommitProcessor for PostIndexer { + async fn process_commit(&self, commit: &CommitDetails) -> Result<()> { + for operation in &commit.operations { + match operation { + Operation::CreatePost { + author_did, + cid, + uri, + languages, + text, + } => { + for algo in self.algos.iter_all() { + if algo.should_index_post(author_did, languages, text).await? { + info!("Received insertable post from {author_did}: {text}"); - self.database - .insert_profile_if_it_doesnt_exist(&author_did) - .await?; + self.database + .insert_profile_if_it_doesnt_exist(&author_did) + .await?; - self.database.insert_post(&author_did, &cid, &uri).await?; + self.database.insert_post(author_did, cid, uri).await?; - break; + break; + } } } - } - Operation::DeletePost { uri } => { - info!("Received a post to delete: {uri}"); + Operation::DeletePost { uri } => { + info!("Received a post to delete: {uri}"); - // TODO: Delete posts from db - // self.database.delete_post(&self.db_connection_pool, &uri).await?; + // TODO: Delete posts from db + // self.database.delete_post(&self.db_connection_pool, &uri).await?; + } } - }; + } if commit.seq % 20 == 0 { info!( diff --git a/src/services/bluesky.rs b/src/services/bluesky.rs index 91a0f04..1ffded9 100644 --- a/src/services/bluesky.rs +++ b/src/services/bluesky.rs @@ -3,4 +3,4 @@ mod proto; mod streaming; pub use client::Bluesky; -pub use streaming::{Operation, OperationProcessor}; +pub use streaming::{CommitDetails, CommitProcessor, Operation}; diff --git a/src/services/bluesky/client.rs b/src/services/bluesky/client.rs index 21e55bf..8f865ff 100644 --- a/src/services/bluesky/client.rs +++ b/src/services/bluesky/client.rs @@ -6,7 +6,7 @@ use futures::StreamExt; use log::error; use tokio_tungstenite::{connect_async, tungstenite}; -use super::streaming::{handle_message, OperationProcessor}; +use super::streaming::{handle_message, CommitProcessor}; #[derive(Debug)] pub struct ProfileDetails { @@ -51,7 +51,7 @@ impl Bluesky { }) } - pub async fn subscribe_to_operations( + pub async fn subscribe_to_operations( &self, processor: &P, cursor: Option, diff --git a/src/services/bluesky/streaming.rs b/src/services/bluesky/streaming.rs index 94adcd5..28fd76b 100644 --- a/src/services/bluesky/streaming.rs +++ b/src/services/bluesky/streaming.rs @@ -6,12 +6,16 @@ use async_trait::async_trait; use super::proto::Frame; use anyhow::anyhow; use atrium_api::app::bsky::feed::post::Record; -use atrium_api::com::atproto::sync::subscribe_repos::Commit; -use atrium_api::com::atproto::sync::subscribe_repos::Message; +use atrium_api::com::atproto::sync::subscribe_repos::{Commit, Message}; #[async_trait] -pub trait OperationProcessor { - async fn process_operation(&self, operation: &Operation, commit: &Commit) -> Result<()>; +pub trait CommitProcessor { + async fn process_commit(&self, commit: &CommitDetails) -> Result<()>; +} + +pub struct CommitDetails { + pub seq: i32, + pub operations: Vec, } #[derive(Debug)] @@ -28,16 +32,20 @@ pub enum Operation { }, } -pub async fn handle_message(message: &[u8], processor: &P) -> Result<()> { +pub async fn handle_message(message: &[u8], processor: &P) -> Result<()> { let commit = match parse_commit_from_message(&message)? { Some(commit) => commit, None => return Ok(()), }; - let post_operations = extract_operations(&commit).await?; - for operation in &post_operations { - processor.process_operation(&operation, &commit).await?; - } + let operations = extract_operations(&commit).await?; + + processor + .process_commit(&CommitDetails { + seq: commit.seq, + operations: operations, + }) + .await?; Ok(()) }