Refactor streaming stuff

The processor is now called once per commit instead of once per operation.
It is also now a commit processor rather than an operation processor, so
that we can update the cursor properly.
This commit is contained in:
Aleksei Voronov 2023-09-22 12:37:10 +02:00
parent 08dc55b2cd
commit 5128bf9d4a
4 changed files with 47 additions and 38 deletions

View File

@ -2,12 +2,11 @@ use std::sync::Arc;
use anyhow::Result; use anyhow::Result;
use async_trait::async_trait; use async_trait::async_trait;
use atrium_api::com::atproto::sync::subscribe_repos::Commit;
use log::info; use log::info;
use crate::algos::Algos; use crate::algos::Algos;
use crate::config::Config; use crate::config::Config;
use crate::services::bluesky::{Bluesky, Operation, OperationProcessor}; use crate::services::bluesky::{Bluesky, CommitDetails, CommitProcessor, Operation};
use crate::services::Database; use crate::services::Database;
pub struct PostIndexer { pub struct PostIndexer {
@ -55,37 +54,39 @@ impl PostIndexer {
} }
#[async_trait] #[async_trait]
impl OperationProcessor for PostIndexer { impl CommitProcessor for PostIndexer {
async fn process_operation(&self, operation: &Operation, commit: &Commit) -> Result<()> { async fn process_commit(&self, commit: &CommitDetails) -> Result<()> {
match operation { for operation in &commit.operations {
Operation::CreatePost { match operation {
author_did, Operation::CreatePost {
cid, author_did,
uri, cid,
languages, uri,
text, languages,
} => { text,
for algo in self.algos.iter_all() { } => {
if algo.should_index_post(author_did, languages, text).await? { for algo in self.algos.iter_all() {
info!("Received insertable post from {author_did}: {text}"); if algo.should_index_post(author_did, languages, text).await? {
info!("Received insertable post from {author_did}: {text}");
self.database self.database
.insert_profile_if_it_doesnt_exist(&author_did) .insert_profile_if_it_doesnt_exist(&author_did)
.await?; .await?;
self.database.insert_post(&author_did, &cid, &uri).await?; self.database.insert_post(author_did, cid, uri).await?;
break; break;
}
} }
} }
} Operation::DeletePost { uri } => {
Operation::DeletePost { uri } => { info!("Received a post to delete: {uri}");
info!("Received a post to delete: {uri}");
// TODO: Delete posts from db // TODO: Delete posts from db
// self.database.delete_post(&self.db_connection_pool, &uri).await?; // self.database.delete_post(&self.db_connection_pool, &uri).await?;
}
} }
}; }
if commit.seq % 20 == 0 { if commit.seq % 20 == 0 {
info!( info!(

View File

@ -3,4 +3,4 @@ mod proto;
mod streaming; mod streaming;
pub use client::Bluesky; pub use client::Bluesky;
pub use streaming::{Operation, OperationProcessor}; pub use streaming::{CommitDetails, CommitProcessor, Operation};

View File

@ -6,7 +6,7 @@ use futures::StreamExt;
use log::error; use log::error;
use tokio_tungstenite::{connect_async, tungstenite}; use tokio_tungstenite::{connect_async, tungstenite};
use super::streaming::{handle_message, OperationProcessor}; use super::streaming::{handle_message, CommitProcessor};
#[derive(Debug)] #[derive(Debug)]
pub struct ProfileDetails { pub struct ProfileDetails {
@ -51,7 +51,7 @@ impl Bluesky {
}) })
} }
pub async fn subscribe_to_operations<P: OperationProcessor>( pub async fn subscribe_to_operations<P: CommitProcessor>(
&self, &self,
processor: &P, processor: &P,
cursor: Option<i32>, cursor: Option<i32>,

View File

@ -6,12 +6,16 @@ use async_trait::async_trait;
use super::proto::Frame; use super::proto::Frame;
use anyhow::anyhow; use anyhow::anyhow;
use atrium_api::app::bsky::feed::post::Record; use atrium_api::app::bsky::feed::post::Record;
use atrium_api::com::atproto::sync::subscribe_repos::Commit; use atrium_api::com::atproto::sync::subscribe_repos::{Commit, Message};
use atrium_api::com::atproto::sync::subscribe_repos::Message;
#[async_trait] #[async_trait]
pub trait OperationProcessor { pub trait CommitProcessor {
async fn process_operation(&self, operation: &Operation, commit: &Commit) -> Result<()>; async fn process_commit(&self, commit: &CommitDetails) -> Result<()>;
}
pub struct CommitDetails {
pub seq: i32,
pub operations: Vec<Operation>,
} }
#[derive(Debug)] #[derive(Debug)]
@ -28,16 +32,20 @@ pub enum Operation {
}, },
} }
pub async fn handle_message<P: OperationProcessor>(message: &[u8], processor: &P) -> Result<()> { pub async fn handle_message<P: CommitProcessor>(message: &[u8], processor: &P) -> Result<()> {
let commit = match parse_commit_from_message(&message)? { let commit = match parse_commit_from_message(&message)? {
Some(commit) => commit, Some(commit) => commit,
None => return Ok(()), None => return Ok(()),
}; };
let post_operations = extract_operations(&commit).await?; let operations = extract_operations(&commit).await?;
for operation in &post_operations {
processor.process_operation(&operation, &commit).await?; processor
} .process_commit(&CommitDetails {
seq: commit.seq,
operations: operations,
})
.await?;
Ok(()) Ok(())
} }