diff --git a/README.md b/README.md index 479cd8b..925b058 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ Heavily WIP. Doesn't work yet at all, but does read the stream of posts as they - [x] Store posts in the database - [x] Store user profiles in the database - [x] Detect the country of residence from profile information -- [ ] Keep subscription state to not lose messages +- [x] Keep subscription state to not lose messages - [x] Serve the feed - [ ] Publish the feed - [ ] Handle deleting of posts diff --git a/sql/01_create_tables.sql b/sql/01_create_tables.sql index 83f720a..51165cd 100644 --- a/sql/01_create_tables.sql +++ b/sql/01_create_tables.sql @@ -13,3 +13,9 @@ CREATE TABLE IF NOT EXISTS Post ( uri TEXT UNIQUE, author_did TEXT REFERENCES Profile(did) ); + +CREATE TABLE IF NOT EXISTS SubscriptionState ( + id INT GENERATED ALWAYS AS IDENTITY, + service TEXT UNIQUE, + cursor INT +); diff --git a/src/main.rs b/src/main.rs index 14e09ae..e332b41 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,9 +6,9 @@ mod services; use std::sync::Arc; use anyhow::Result; +use env_logger::Env; use lingua::LanguageDetectorBuilder; use log::info; -use env_logger::Env; use crate::algos::AlgosBuilder; use crate::algos::Nederlandskie; @@ -48,7 +48,12 @@ async fn main() -> Result<()> { .build(), ); - let post_indexer = PostIndexer::new(database.clone(), bluesky.clone(), algos.clone()); + let post_indexer = PostIndexer::new( + database.clone(), + bluesky.clone(), + algos.clone(), + config.clone(), + ); let profile_classifier = ProfileClassifier::new(database.clone(), ai.clone(), bluesky.clone()); let feed_server = FeedServer::new(database.clone(), config.clone(), algos.clone()); diff --git a/src/processes/post_indexer.rs b/src/processes/post_indexer.rs index 41d137a..d6249e7 100644 --- a/src/processes/post_indexer.rs +++ b/src/processes/post_indexer.rs @@ -2,9 +2,11 @@ use std::sync::Arc; use anyhow::Result; use async_trait::async_trait; +use atrium_api::com::atproto::sync::subscribe_repos::Commit; use log::info; use crate::algos::Algos; +use crate::config::Config; use crate::services::bluesky::{Bluesky, Operation, OperationProcessor}; use crate::services::Database; @@ -12,14 +14,21 @@ pub struct PostIndexer { database: Arc, bluesky: Arc, algos: Arc, + config: Arc, } impl PostIndexer { - pub fn new(database: Arc, bluesky: Arc, algos: Arc) -> Self { + pub fn new( + database: Arc, + bluesky: Arc, + algos: Arc, + config: Arc, + ) -> Self { Self { database, bluesky, algos, + config, } } } @@ -27,13 +36,27 @@ impl PostIndexer { impl PostIndexer { pub async fn start(&self) -> Result<()> { info!("Starting"); - Ok(self.bluesky.subscribe_to_operations(self).await?) + + let cursor = self + .database + .fetch_subscription_cursor(&self.config.service_did) + .await?; + + if cursor.is_none() { + self.database + .create_subscription_state(&self.config.service_did) + .await?; + } + + info!("Subscribing with cursor {:?}", cursor); + + Ok(self.bluesky.subscribe_to_operations(self, cursor).await?) } } #[async_trait] impl OperationProcessor for PostIndexer { - async fn process_operation(&self, operation: &Operation) -> Result<()> { + async fn process_operation(&self, operation: &Operation, commit: &Commit) -> Result<()> { match operation { Operation::CreatePost { author_did, @@ -63,6 +86,16 @@ impl OperationProcessor for PostIndexer { } }; + if commit.seq % 20 == 0 { + info!( + "Updating cursor for {} to {}", + self.config.service_did, commit.seq + ); + self.database + .update_subscription_cursor(&self.config.service_did, commit.seq) + .await?; + } + Ok(()) } } diff --git a/src/services/bluesky/client.rs b/src/services/bluesky/client.rs index 1890ff7..21e55bf 100644 --- a/src/services/bluesky/client.rs +++ b/src/services/bluesky/client.rs @@ -3,8 +3,8 @@ use atrium_api::client::AtpServiceClient; use atrium_api::client::AtpServiceWrapper; use atrium_xrpc::client::reqwest::ReqwestClient; use futures::StreamExt; -use tokio_tungstenite::{connect_async, tungstenite}; use log::error; +use tokio_tungstenite::{connect_async, tungstenite}; use super::streaming::{handle_message, OperationProcessor}; @@ -54,9 +54,17 @@ impl Bluesky { pub async fn subscribe_to_operations( &self, processor: &P, + cursor: Option, ) -> Result<()> { - let (mut stream, _) = - connect_async("wss://bsky.social/xrpc/com.atproto.sync.subscribeRepos").await?; + let url = match cursor { + Some(cursor) => format!( + "wss://bsky.social/xrpc/com.atproto.sync.subscribeRepos?cursor={}", + cursor + ), + None => "wss://bsky.social/xrpc/com.atproto.sync.subscribeRepos".to_owned(), + }; + + let (mut stream, _) = connect_async(url).await?; while let Some(Ok(tungstenite::Message::Binary(message))) = stream.next().await { if let Err(e) = handle_message(&message, processor).await { diff --git a/src/services/bluesky/streaming.rs b/src/services/bluesky/streaming.rs index 10829ab..94adcd5 100644 --- a/src/services/bluesky/streaming.rs +++ b/src/services/bluesky/streaming.rs @@ -11,7 +11,7 @@ use atrium_api::com::atproto::sync::subscribe_repos::Message; #[async_trait] pub trait OperationProcessor { - async fn process_operation(&self, operation: &Operation) -> Result<()>; + async fn process_operation(&self, operation: &Operation, commit: &Commit) -> Result<()>; } #[derive(Debug)] @@ -36,7 +36,7 @@ pub async fn handle_message(message: &[u8], processor: &P let post_operations = extract_operations(&commit).await?; for operation in &post_operations { - processor.process_operation(&operation).await?; + processor.process_operation(&operation, &commit).await?; } Ok(()) diff --git a/src/services/database.rs b/src/services/database.rs index 8a66e81..37bba47 100644 --- a/src/services/database.rs +++ b/src/services/database.rs @@ -19,11 +19,6 @@ pub struct Profile { likely_country_of_living: Option, } -pub struct SubscriptionState { - service: String, - cursor: i64, -} - pub struct Database { connection_pool: PgPool, } @@ -144,4 +139,51 @@ impl Database { .await .map(|result| result.rows_affected() > 0)?) } + + pub async fn fetch_subscription_cursor(&self, did: &str) -> Result> { + let mut params = Parameters::new(); + + Ok(query( + &select("cursor") + .from("SubscriptionState") + .where_(format!("service = {}", params.next())) + .to_string(), + ) + .bind(did) + .map(|r: PgRow| r.get("cursor")) + .fetch_optional(&self.connection_pool) + .await?) + } + + pub async fn create_subscription_state(&self, did: &str) -> Result { + let mut params = Parameters::new(); + + Ok(query( + &insert_into("SubscriptionState") + .columns(("service", "cursor")) + .values([params.next_array()]) + .to_string(), + ) + .bind(did) + .bind(0) + .execute(&self.connection_pool) + .await + .map(|result| result.rows_affected() > 0)?) + } + + pub async fn update_subscription_cursor(&self, did: &str, cursor: i32) -> Result { + let mut params = Parameters::new(); + + Ok(query( + &update("SubscriptionState") + .set("cursor", params.next()) + .where_(format!("service = {}", params.next())) + .to_string(), + ) + .bind(cursor) + .bind(did) + .execute(&self.connection_pool) + .await + .map(|result| result.rows_affected() > 0)?) + } }