From 93c4979c71bceaa0afaac3bf8da660654a249d4f Mon Sep 17 00:00:00 2001 From: Aleksei Voronov Date: Thu, 21 Sep 2023 12:33:17 +0200 Subject: [PATCH] Keep subscription state in order to not lose messages This isn't a good way to do it though, because the operations processor is only called for each operation, so we end up not updating the cursor as often as we realistically should be. I'll refactor this slightly later --- README.md | 2 +- sql/01_create_tables.sql | 6 ++++ src/main.rs | 9 ++++-- src/processes/post_indexer.rs | 39 +++++++++++++++++++++-- src/services/bluesky/client.rs | 14 +++++++-- src/services/bluesky/streaming.rs | 4 +-- src/services/database.rs | 52 ++++++++++++++++++++++++++++--- 7 files changed, 110 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 479cd8b..925b058 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ Heavily WIP. Doesn't work yet at all, but does read the stream of posts as they - [x] Store posts in the database - [x] Store user profiles in the database - [x] Detect the country of residence from profile information -- [ ] Keep subscription state to not lose messages +- [x] Keep subscription state to not lose messages - [x] Serve the feed - [ ] Publish the feed - [ ] Handle deleting of posts diff --git a/sql/01_create_tables.sql b/sql/01_create_tables.sql index 83f720a..51165cd 100644 --- a/sql/01_create_tables.sql +++ b/sql/01_create_tables.sql @@ -13,3 +13,9 @@ CREATE TABLE IF NOT EXISTS Post ( uri TEXT UNIQUE, author_did TEXT REFERENCES Profile(did) ); + +CREATE TABLE IF NOT EXISTS SubscriptionState ( + id INT GENERATED ALWAYS AS IDENTITY, + service TEXT UNIQUE, + cursor INT +); diff --git a/src/main.rs b/src/main.rs index 14e09ae..e332b41 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,9 +6,9 @@ mod services; use std::sync::Arc; use anyhow::Result; +use env_logger::Env; use lingua::LanguageDetectorBuilder; use log::info; -use env_logger::Env; use crate::algos::AlgosBuilder; use 
crate::algos::Nederlandskie; @@ -48,7 +48,12 @@ async fn main() -> Result<()> { .build(), ); - let post_indexer = PostIndexer::new(database.clone(), bluesky.clone(), algos.clone()); + let post_indexer = PostIndexer::new( + database.clone(), + bluesky.clone(), + algos.clone(), + config.clone(), + ); let profile_classifier = ProfileClassifier::new(database.clone(), ai.clone(), bluesky.clone()); let feed_server = FeedServer::new(database.clone(), config.clone(), algos.clone()); diff --git a/src/processes/post_indexer.rs b/src/processes/post_indexer.rs index 41d137a..d6249e7 100644 --- a/src/processes/post_indexer.rs +++ b/src/processes/post_indexer.rs @@ -2,9 +2,11 @@ use std::sync::Arc; use anyhow::Result; use async_trait::async_trait; +use atrium_api::com::atproto::sync::subscribe_repos::Commit; use log::info; use crate::algos::Algos; +use crate::config::Config; use crate::services::bluesky::{Bluesky, Operation, OperationProcessor}; use crate::services::Database; @@ -12,14 +14,21 @@ pub struct PostIndexer { database: Arc, bluesky: Arc, algos: Arc, + config: Arc, } impl PostIndexer { - pub fn new(database: Arc, bluesky: Arc, algos: Arc) -> Self { + pub fn new( + database: Arc, + bluesky: Arc, + algos: Arc, + config: Arc, + ) -> Self { Self { database, bluesky, algos, + config, } } } @@ -27,13 +36,27 @@ impl PostIndexer { impl PostIndexer { pub async fn start(&self) -> Result<()> { info!("Starting"); - Ok(self.bluesky.subscribe_to_operations(self).await?) + + let cursor = self + .database + .fetch_subscription_cursor(&self.config.service_did) + .await?; + + if cursor.is_none() { + self.database + .create_subscription_state(&self.config.service_did) + .await?; + } + + info!("Subscribing with cursor {:?}", cursor); + + Ok(self.bluesky.subscribe_to_operations(self, cursor).await?) 
} } #[async_trait] impl OperationProcessor for PostIndexer { - async fn process_operation(&self, operation: &Operation) -> Result<()> { + async fn process_operation(&self, operation: &Operation, commit: &Commit) -> Result<()> { match operation { Operation::CreatePost { author_did, @@ -63,6 +86,16 @@ impl OperationProcessor for PostIndexer { } }; + if commit.seq % 20 == 0 { + info!( + "Updating cursor for {} to {}", + self.config.service_did, commit.seq + ); + self.database + .update_subscription_cursor(&self.config.service_did, commit.seq) + .await?; + } + Ok(()) } } diff --git a/src/services/bluesky/client.rs b/src/services/bluesky/client.rs index 1890ff7..21e55bf 100644 --- a/src/services/bluesky/client.rs +++ b/src/services/bluesky/client.rs @@ -3,8 +3,8 @@ use atrium_api::client::AtpServiceClient; use atrium_api::client::AtpServiceWrapper; use atrium_xrpc::client::reqwest::ReqwestClient; use futures::StreamExt; -use tokio_tungstenite::{connect_async, tungstenite}; use log::error; +use tokio_tungstenite::{connect_async, tungstenite}; use super::streaming::{handle_message, OperationProcessor}; @@ -54,9 +54,17 @@ impl Bluesky { pub async fn subscribe_to_operations( &self, processor: &P, + cursor: Option, ) -> Result<()> { - let (mut stream, _) = - connect_async("wss://bsky.social/xrpc/com.atproto.sync.subscribeRepos").await?; + let url = match cursor { + Some(cursor) => format!( + "wss://bsky.social/xrpc/com.atproto.sync.subscribeRepos?cursor={}", + cursor + ), + None => "wss://bsky.social/xrpc/com.atproto.sync.subscribeRepos".to_owned(), + }; + + let (mut stream, _) = connect_async(url).await?; while let Some(Ok(tungstenite::Message::Binary(message))) = stream.next().await { if let Err(e) = handle_message(&message, processor).await { diff --git a/src/services/bluesky/streaming.rs b/src/services/bluesky/streaming.rs index 10829ab..94adcd5 100644 --- a/src/services/bluesky/streaming.rs +++ b/src/services/bluesky/streaming.rs @@ -11,7 +11,7 @@ use 
atrium_api::com::atproto::sync::subscribe_repos::Message; #[async_trait] pub trait OperationProcessor { - async fn process_operation(&self, operation: &Operation) -> Result<()>; + async fn process_operation(&self, operation: &Operation, commit: &Commit) -> Result<()>; } #[derive(Debug)] @@ -36,7 +36,7 @@ pub async fn handle_message(message: &[u8], processor: &P let post_operations = extract_operations(&commit).await?; for operation in &post_operations { - processor.process_operation(&operation).await?; + processor.process_operation(&operation, &commit).await?; } Ok(()) diff --git a/src/services/database.rs b/src/services/database.rs index 8a66e81..37bba47 100644 --- a/src/services/database.rs +++ b/src/services/database.rs @@ -19,11 +19,6 @@ pub struct Profile { likely_country_of_living: Option, } -pub struct SubscriptionState { - service: String, - cursor: i64, -} - pub struct Database { connection_pool: PgPool, } @@ -144,4 +139,51 @@ impl Database { .await .map(|result| result.rows_affected() > 0)?) } + + pub async fn fetch_subscription_cursor(&self, did: &str) -> Result> { + let mut params = Parameters::new(); + + Ok(query( + &select("cursor") + .from("SubscriptionState") + .where_(format!("service = {}", params.next())) + .to_string(), + ) + .bind(did) + .map(|r: PgRow| r.get("cursor")) + .fetch_optional(&self.connection_pool) + .await?) + } + + pub async fn create_subscription_state(&self, did: &str) -> Result { + let mut params = Parameters::new(); + + Ok(query( + &insert_into("SubscriptionState") + .columns(("service", "cursor")) + .values([params.next_array()]) + .to_string(), + ) + .bind(did) + .bind(0) + .execute(&self.connection_pool) + .await + .map(|result| result.rows_affected() > 0)?) 
+ } + + pub async fn update_subscription_cursor(&self, did: &str, cursor: i32) -> Result { + let mut params = Parameters::new(); + + Ok(query( + &update("SubscriptionState") + .set("cursor", params.next()) + .where_(format!("service = {}", params.next())) + .to_string(), + ) + .bind(cursor) + .bind(did) + .execute(&self.connection_pool) + .await + .map(|result| result.rows_affected() > 0)?) + } }