Refactor things

Make the overall design a little more flexible if one wants
to easily implement multiple feeds, and also kinda closer to
the original template provided by Bluesky devs
This commit is contained in:
Aleksei Voronov 2023-09-16 21:07:00 +02:00
parent fb17aa3e6a
commit 14b9f846da
9 changed files with 113 additions and 30 deletions

1
Cargo.lock generated
View File

@ -1303,6 +1303,7 @@ dependencies = [
"dotenv", "dotenv",
"futures", "futures",
"libipld-core", "libipld-core",
"once_cell",
"rs-car", "rs-car",
"scooby", "scooby",
"serde", "serde",

View File

@ -17,6 +17,7 @@ ciborium = "0.2.1"
dotenv = "0.15.0" dotenv = "0.15.0"
futures = "0.3.28" futures = "0.3.28"
libipld-core = { version = "0.16.0", features = ["serde-codec"] } libipld-core = { version = "0.16.0", features = ["serde-codec"] }
once_cell = "1.18.0"
rs-car = "0.4.1" rs-car = "0.4.1"
scooby = "0.5.0" scooby = "0.5.0"
serde = "1.0.188" serde = "1.0.188"

45
src/algos.rs Normal file
View File

@ -0,0 +1,45 @@
mod nederlandskie;
use std::collections::{HashMap, HashSet};
use anyhow::Result;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use once_cell::sync::Lazy;
use crate::services::database::{Database, Post};
use self::nederlandskie::Nederlandskie;
#[async_trait]
pub trait Algo {
fn should_index_post(&self, author_did: &str, languages: &HashSet<String>, text: &str) -> bool;
async fn fetch_posts(
&self,
database: &Database,
limit: i32,
earlier_than: Option<(DateTime<Utc>, &str)>,
) -> Result<Vec<Post>>;
}
pub type AnyAlgo = Box<dyn Algo + Sync + Send>;
pub type AlgosMap = HashMap<&'static str, AnyAlgo>;
static ALL_ALGOS: Lazy<AlgosMap> = Lazy::new(|| {
let mut m = AlgosMap::new();
m.insert("nederlandskie", Box::new(Nederlandskie));
m
});
pub fn iter_names() -> impl Iterator<Item = &'static str> {
ALL_ALGOS.keys().map(|s| *s)
}
pub fn iter_all() -> impl Iterator<Item = &'static AnyAlgo> {
ALL_ALGOS.values()
}
pub fn get_by_name(name: &str) -> Option<&'static AnyAlgo> {
ALL_ALGOS.get(name)
}

View File

@ -0,0 +1,37 @@
use std::collections::HashSet;
use anyhow::Result;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use super::Algo;
use crate::services::{database::Post, Database};
pub struct Nederlandskie;
/// An algorithm that serves posts written in Russian by people living in Netherlands
#[async_trait]
impl Algo for Nederlandskie {
fn should_index_post(
&self,
_author_did: &str,
languages: &HashSet<String>,
_text: &str,
) -> bool {
// BlueSky gets confused a lot about Russian vs Ukrainian, so skip posts
// that may be in Ukrainian regardless of whether Russian is in the list
languages.contains("ru") && !languages.contains("uk")
}
async fn fetch_posts(
&self,
database: &Database,
limit: i32,
earlier_than: Option<(DateTime<Utc>, &str)>,
) -> Result<Vec<Post>> {
Ok(database
.fetch_posts_by_authors_country("nl", limit as usize, earlier_than)
.await?)
}
}

View File

@ -1,3 +1,4 @@
mod algos;
mod config; mod config;
mod processes; mod processes;
mod services; mod services;
@ -8,9 +9,9 @@ use crate::config::Config;
use crate::processes::FeedServer; use crate::processes::FeedServer;
use crate::processes::PostIndexer; use crate::processes::PostIndexer;
use crate::processes::ProfileClassifier; use crate::processes::ProfileClassifier;
use crate::services::AI;
use crate::services::Bluesky; use crate::services::Bluesky;
use crate::services::Database; use crate::services::Database;
use crate::services::AI;
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {

View File

@ -3,6 +3,7 @@ use atrium_api::app::bsky::feed::describe_feed_generator::{
}; };
use axum::{extract::State, Json}; use axum::{extract::State, Json};
use crate::algos;
use crate::processes::feed_server::state::FeedServerState; use crate::processes::feed_server::state::FeedServerState;
pub async fn describe_feed_generator( pub async fn describe_feed_generator(
@ -10,12 +11,14 @@ pub async fn describe_feed_generator(
) -> Json<FeedGeneratorDescription> { ) -> Json<FeedGeneratorDescription> {
Json(FeedGeneratorDescription { Json(FeedGeneratorDescription {
did: state.config.service_did.clone(), did: state.config.service_did.clone(),
feeds: vec![Feed { feeds: algos::iter_names()
.map(|name| Feed {
uri: format!( uri: format!(
"at://{}/app.bsky.feed.generator/{}", "at://{}/app.bsky.feed.generator/{}",
state.config.publisher_did, "nederlandskie" state.config.publisher_did, name
), ),
}], })
.collect(),
links: None, links: None,
}) })
} }

View File

@ -7,13 +7,18 @@ use axum::extract::{Query, State};
use axum::Json; use axum::Json;
use chrono::{DateTime, TimeZone, Utc}; use chrono::{DateTime, TimeZone, Utc};
use crate::algos;
use crate::processes::feed_server::state::FeedServerState; use crate::processes::feed_server::state::FeedServerState;
pub async fn get_feed_skeleton( pub async fn get_feed_skeleton(
State(state): State<FeedServerState>, State(state): State<FeedServerState>,
query: Query<FeedSkeletonQuery>, query: Query<FeedSkeletonQuery>,
) -> Json<FeedSkeleton> { ) -> Json<FeedSkeleton> {
let limit = query.limit.unwrap_or(20) as usize; let algo = algos::get_by_name(&query.feed)
.ok_or_else(|| anyhow!("Feed {} not found", query.feed))
.unwrap(); // TODO: handle error
let limit = query.limit.unwrap_or(20);
let earlier_than = query let earlier_than = query
.cursor .cursor
.as_deref() .as_deref()
@ -21,11 +26,10 @@ pub async fn get_feed_skeleton(
.transpose() .transpose()
.unwrap(); // TODO: handle error .unwrap(); // TODO: handle error
let posts = state let posts = algo
.database .fetch_posts(&state.database, limit, earlier_than)
.fetch_posts_by_authors_country("ru", limit, earlier_than)
.await .await
.unwrap(); .unwrap(); // TODO: handle error
let feed = posts let feed = posts
.iter() .iter()

View File

@ -1,6 +1,7 @@
use anyhow::Result; use anyhow::Result;
use async_trait::async_trait; use async_trait::async_trait;
use crate::algos;
use crate::services::bluesky::{Bluesky, Operation, OperationProcessor}; use crate::services::bluesky::{Bluesky, Operation, OperationProcessor};
use crate::services::Database; use crate::services::Database;
@ -32,18 +33,7 @@ impl<'a> OperationProcessor for PostIndexer<'a> {
languages, languages,
text, text,
} => { } => {
// TODO: Configure this via env vars if algos::iter_all().any(|a| a.should_index_post(author_did, languages, text)) {
if !languages.contains("ru") {
return Ok(());
}
// BlueSky gets confused a lot about Russian vs Ukrainian, so skip posts
// that may be in Ukrainian regardless of whether Russian is in the list
// TODO: Configure this via env vars
if languages.contains("uk") {
return Ok(());
}
println!("received insertable post from {author_did}: {text}"); println!("received insertable post from {author_did}: {text}");
self.database self.database
@ -51,6 +41,7 @@ impl<'a> OperationProcessor for PostIndexer<'a> {
.await?; .await?;
self.database.insert_post(&author_did, &cid, &uri).await?; self.database.insert_post(&author_did, &cid, &uri).await?;
} }
}
Operation::DeletePost { uri } => { Operation::DeletePost { uri } => {
println!("received a post do delete: {uri}"); println!("received a post do delete: {uri}");

View File

@ -1,6 +1,6 @@
mod ai; mod ai;
pub mod bluesky; pub mod bluesky;
mod database; pub mod database;
pub use ai::AI; pub use ai::AI;
pub use bluesky::Bluesky; pub use bluesky::Bluesky;