From 96915ca986f1534ed93bb6a87c21267d9363d72c Mon Sep 17 00:00:00 2001 From: Aleksei Voronov Date: Sun, 15 Oct 2023 11:55:25 +0200 Subject: [PATCH] Pass around bluesky objects instead of restructuring them everywhere Makes thing a little simpler in many places, especially once I start adding more fields to posts --- src/algos.rs | 15 ++++++--------- src/algos/nederlandskie.rs | 11 +++++------ src/processes/post_indexer.rs | 7 +++---- src/services/bluesky.rs | 1 + src/services/bluesky/entities/follow.rs | 1 + src/services/bluesky/entities/like.rs | 2 ++ src/services/bluesky/entities/post.rs | 6 +++++- src/services/bluesky/streaming.rs | 24 ++++++++++-------------- 8 files changed, 33 insertions(+), 34 deletions(-) diff --git a/src/algos.rs b/src/algos.rs index ca89486..8abf20c 100644 --- a/src/algos.rs +++ b/src/algos.rs @@ -1,30 +1,27 @@ mod nederlandskie; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use anyhow::Result; use async_trait::async_trait; use chrono::{DateTime, Utc}; -use crate::services::database::{Database, Post}; +use crate::services::bluesky; +use crate::services::database::{self, Database}; pub use self::nederlandskie::Nederlandskie; #[async_trait] pub trait Algo { - async fn should_index_post( - &self, - author_did: &str, - languages: &HashSet, - text: &str, - ) -> Result; + async fn should_index_post(&self, author_did: &str, post: &bluesky::PostRecord) + -> Result; async fn fetch_posts( &self, database: &Database, limit: i32, earlier_than: Option<(DateTime, &str)>, - ) -> Result>; + ) -> Result>; } pub type AnyAlgo = Box; diff --git a/src/algos/nederlandskie.rs b/src/algos/nederlandskie.rs index 6004333..cc220b0 100644 --- a/src/algos/nederlandskie.rs +++ b/src/algos/nederlandskie.rs @@ -1,4 +1,3 @@ -use std::collections::HashSet; use std::sync::Arc; use anyhow::Result; @@ -9,7 +8,8 @@ use lingua::LanguageDetector; use super::Algo; -use crate::services::{database::Post, Database}; +use crate::services::bluesky; +use crate::services::database::{self, Database}; pub struct Nederlandskie { language_detector: Arc, @@ -27,10 +27,9 @@ impl Algo for Nederlandskie { async fn should_index_post( &self, _author_did: &str, - _languages: &HashSet, - text: &str, + post: &bluesky::PostRecord, ) -> Result { - Ok(self.language_detector.detect_language_of(text) == Some(Russian)) + Ok(self.language_detector.detect_language_of(&post.text) == Some(Russian)) } async fn fetch_posts( @@ -38,7 +37,7 @@ impl Algo for Nederlandskie { database: &Database, limit: i32, earlier_than: Option<(DateTime, &str)>, - ) -> Result> { + ) -> Result> { Ok(database .fetch_posts_by_authors_country("nl", limit as usize, earlier_than) .await?) diff --git a/src/processes/post_indexer.rs b/src/processes/post_indexer.rs index ba88f11..848d9d1 100644 --- a/src/processes/post_indexer.rs +++ b/src/processes/post_indexer.rs @@ -75,12 +75,11 @@ impl CommitProcessor for PostIndexer { author_did, cid, uri, - languages, - text, + post, } => { for algo in self.algos.iter_all() { - if algo.should_index_post(author_did, languages, text).await? { - info!("Received insertable post from {author_did}: {text}"); + if algo.should_index_post(author_did, post).await? { + info!("Received insertable post from {author_did}: {}", post.text); self.database .insert_profile_if_it_doesnt_exist(author_did) diff --git a/src/services/bluesky.rs b/src/services/bluesky.rs index 27b59bb..76d08f3 100644 --- a/src/services/bluesky.rs +++ b/src/services/bluesky.rs @@ -4,4 +4,5 @@ mod internals; mod streaming; pub use client::Bluesky; +pub use entities::{FollowRecord, LikeRecord, PostRecord, Session}; pub use streaming::{CommitDetails, CommitProcessor, Operation}; diff --git a/src/services/bluesky/entities/follow.rs b/src/services/bluesky/entities/follow.rs index 8c96f13..6316be7 100644 --- a/src/services/bluesky/entities/follow.rs +++ b/src/services/bluesky/entities/follow.rs @@ -4,6 +4,7 @@ use anyhow::{anyhow, Error, Result}; use crate::services::bluesky::internals::cbor::CborValue; +#[derive(Debug)] pub struct FollowRecord { pub subject: String, } diff --git a/src/services/bluesky/entities/like.rs b/src/services/bluesky/entities/like.rs index 40282d3..c98cef7 100644 --- a/src/services/bluesky/entities/like.rs +++ b/src/services/bluesky/entities/like.rs @@ -4,6 +4,7 @@ use anyhow::{anyhow, Error, Result}; use crate::services::bluesky::internals::cbor::CborValue; +#[derive(Debug)] pub struct LikeRecord { pub subject: Subject, } @@ -23,6 +24,7 @@ impl TryFrom for LikeRecord { } } +#[derive(Debug)] pub struct Subject { pub cid: String, pub uri: String, diff --git a/src/services/bluesky/entities/post.rs b/src/services/bluesky/entities/post.rs index 1facfef..60e18f6 100644 --- a/src/services/bluesky/entities/post.rs +++ b/src/services/bluesky/entities/post.rs @@ -4,6 +4,7 @@ use anyhow::{anyhow, Error, Result}; use crate::services::bluesky::internals::cbor::CborValue; +#[derive(Debug)] pub struct PostRecord { pub langs: Option>, pub text: String, @@ -20,7 +21,10 @@ impl TryFrom for PostRecord { .remove("text") .ok_or_else(|| anyhow!("Missing field: text"))? .try_into()?, - langs: map.remove("langs").map(|value| value.try_into()).transpose()?, + langs: map + .remove("langs") + .map(|value| value.try_into()) + .transpose()?, }) } } diff --git a/src/services/bluesky/streaming.rs b/src/services/bluesky/streaming.rs index aea8590..99eeafe 100644 --- a/src/services/bluesky/streaming.rs +++ b/src/services/bluesky/streaming.rs @@ -1,4 +1,4 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use anyhow::Result; use async_trait::async_trait; @@ -35,21 +35,19 @@ pub enum Operation { author_did: String, cid: String, uri: String, - languages: HashSet, - text: String, + post: PostRecord, }, CreateLike { author_did: String, cid: String, uri: String, - subject_cid: String, - subject_uri: String, + like: LikeRecord, }, CreateFollow { author_did: String, cid: String, uri: String, - subject: String, + follow: FollowRecord, }, DeletePost { uri: String, @@ -116,35 +114,33 @@ async fn extract_operations(commit: &Commit) -> Result> { match collection { COLLECTION_POST => { - let record: PostRecord = read_record(block)?; + let post: PostRecord = read_record(block)?; Operation::CreatePost { author_did: commit.repo.clone(), cid: cid.to_string(), uri, - languages: record.langs.unwrap_or_default().iter().cloned().collect(), - text: record.text, + post, } } COLLECTION_LIKE => { - let record: LikeRecord = read_record(block)?; + let like: LikeRecord = read_record(block)?; Operation::CreateLike { author_did: commit.repo.clone(), cid: cid.to_string(), uri, - subject_cid: record.subject.cid, - subject_uri: record.subject.uri, + like, } } COLLECTION_FOLLOW => { - let record: FollowRecord = read_record(block)?; + let follow: FollowRecord = read_record(block)?; Operation::CreateFollow { author_did: commit.repo.clone(), cid: cid.to_string(), uri, - subject: record.subject, + follow, } } _ => continue,