Keep subscription state in order to not lose messages

This isn't a good way to do it though, because opreations processor is only called for each operation,
so we end up not updating the cursor as often as we realistically should be.

I'll refactor this slightly later
This commit is contained in:
Aleksei Voronov 2023-09-21 12:33:17 +02:00
parent 62b00ceed7
commit 93c4979c71
7 changed files with 110 additions and 16 deletions

View File

@ -10,7 +10,7 @@ Heavily WIP. Doesn't work yet at all, but does read the stream of posts as they
- [x] Store posts in the database
- [x] Store user profiles in the database
- [x] Detect the country of residence from profile information
- [ ] Keep subscription state to not lose messages
- [x] Keep subscription state to not lose messages
- [x] Serve the feed
- [ ] Publish the feed
- [ ] Handle deleting of posts

View File

@ -13,3 +13,9 @@ CREATE TABLE IF NOT EXISTS Post (
uri TEXT UNIQUE,
author_did TEXT REFERENCES Profile(did)
);
CREATE TABLE IF NOT EXISTS SubscriptionState (
id INT GENERATED ALWAYS AS IDENTITY,
service TEXT UNIQUE,
cursor INT
);

View File

@ -6,9 +6,9 @@ mod services;
use std::sync::Arc;
use anyhow::Result;
use env_logger::Env;
use lingua::LanguageDetectorBuilder;
use log::info;
use env_logger::Env;
use crate::algos::AlgosBuilder;
use crate::algos::Nederlandskie;
@ -48,7 +48,12 @@ async fn main() -> Result<()> {
.build(),
);
let post_indexer = PostIndexer::new(database.clone(), bluesky.clone(), algos.clone());
let post_indexer = PostIndexer::new(
database.clone(),
bluesky.clone(),
algos.clone(),
config.clone(),
);
let profile_classifier = ProfileClassifier::new(database.clone(), ai.clone(), bluesky.clone());
let feed_server = FeedServer::new(database.clone(), config.clone(), algos.clone());

View File

@ -2,9 +2,11 @@ use std::sync::Arc;
use anyhow::Result;
use async_trait::async_trait;
use atrium_api::com::atproto::sync::subscribe_repos::Commit;
use log::info;
use crate::algos::Algos;
use crate::config::Config;
use crate::services::bluesky::{Bluesky, Operation, OperationProcessor};
use crate::services::Database;
@ -12,14 +14,21 @@ pub struct PostIndexer {
database: Arc<Database>,
bluesky: Arc<Bluesky>,
algos: Arc<Algos>,
config: Arc<Config>,
}
impl PostIndexer {
pub fn new(database: Arc<Database>, bluesky: Arc<Bluesky>, algos: Arc<Algos>) -> Self {
pub fn new(
database: Arc<Database>,
bluesky: Arc<Bluesky>,
algos: Arc<Algos>,
config: Arc<Config>,
) -> Self {
Self {
database,
bluesky,
algos,
config,
}
}
}
@ -27,13 +36,27 @@ impl PostIndexer {
impl PostIndexer {
pub async fn start(&self) -> Result<()> {
info!("Starting");
Ok(self.bluesky.subscribe_to_operations(self).await?)
let cursor = self
.database
.fetch_subscription_cursor(&self.config.service_did)
.await?;
if cursor.is_none() {
self.database
.create_subscription_state(&self.config.service_did)
.await?;
}
info!("Subscribing with cursor {:?}", cursor);
Ok(self.bluesky.subscribe_to_operations(self, cursor).await?)
}
}
#[async_trait]
impl OperationProcessor for PostIndexer {
async fn process_operation(&self, operation: &Operation) -> Result<()> {
async fn process_operation(&self, operation: &Operation, commit: &Commit) -> Result<()> {
match operation {
Operation::CreatePost {
author_did,
@ -63,6 +86,16 @@ impl OperationProcessor for PostIndexer {
}
};
if commit.seq % 20 == 0 {
info!(
"Updating cursor for {} to {}",
self.config.service_did, commit.seq
);
self.database
.update_subscription_cursor(&self.config.service_did, commit.seq)
.await?;
}
Ok(())
}
}

View File

@ -3,8 +3,8 @@ use atrium_api::client::AtpServiceClient;
use atrium_api::client::AtpServiceWrapper;
use atrium_xrpc::client::reqwest::ReqwestClient;
use futures::StreamExt;
use tokio_tungstenite::{connect_async, tungstenite};
use log::error;
use tokio_tungstenite::{connect_async, tungstenite};
use super::streaming::{handle_message, OperationProcessor};
@ -54,9 +54,17 @@ impl Bluesky {
pub async fn subscribe_to_operations<P: OperationProcessor>(
&self,
processor: &P,
cursor: Option<i32>,
) -> Result<()> {
let (mut stream, _) =
connect_async("wss://bsky.social/xrpc/com.atproto.sync.subscribeRepos").await?;
let url = match cursor {
Some(cursor) => format!(
"wss://bsky.social/xrpc/com.atproto.sync.subscribeRepos?cursor={}",
cursor
),
None => "wss://bsky.social/xrpc/com.atproto.sync.subscribeRepos".to_owned(),
};
let (mut stream, _) = connect_async(url).await?;
while let Some(Ok(tungstenite::Message::Binary(message))) = stream.next().await {
if let Err(e) = handle_message(&message, processor).await {

View File

@ -11,7 +11,7 @@ use atrium_api::com::atproto::sync::subscribe_repos::Message;
#[async_trait]
pub trait OperationProcessor {
async fn process_operation(&self, operation: &Operation) -> Result<()>;
async fn process_operation(&self, operation: &Operation, commit: &Commit) -> Result<()>;
}
#[derive(Debug)]
@ -36,7 +36,7 @@ pub async fn handle_message<P: OperationProcessor>(message: &[u8], processor: &P
let post_operations = extract_operations(&commit).await?;
for operation in &post_operations {
processor.process_operation(&operation).await?;
processor.process_operation(&operation, &commit).await?;
}
Ok(())

View File

@ -19,11 +19,6 @@ pub struct Profile {
likely_country_of_living: Option<String>,
}
pub struct SubscriptionState {
service: String,
cursor: i64,
}
pub struct Database {
connection_pool: PgPool,
}
@ -144,4 +139,51 @@ impl Database {
.await
.map(|result| result.rows_affected() > 0)?)
}
pub async fn fetch_subscription_cursor(&self, did: &str) -> Result<Option<i32>> {
let mut params = Parameters::new();
Ok(query(
&select("cursor")
.from("SubscriptionState")
.where_(format!("service = {}", params.next()))
.to_string(),
)
.bind(did)
.map(|r: PgRow| r.get("cursor"))
.fetch_optional(&self.connection_pool)
.await?)
}
pub async fn create_subscription_state(&self, did: &str) -> Result<bool> {
let mut params = Parameters::new();
Ok(query(
&insert_into("SubscriptionState")
.columns(("service", "cursor"))
.values([params.next_array()])
.to_string(),
)
.bind(did)
.bind(0)
.execute(&self.connection_pool)
.await
.map(|result| result.rows_affected() > 0)?)
}
pub async fn update_subscription_cursor(&self, did: &str, cursor: i32) -> Result<bool> {
let mut params = Parameters::new();
Ok(query(
&update("SubscriptionState")
.set("cursor", params.next())
.where_(format!("service = {}", params.next()))
.to_string(),
)
.bind(cursor)
.bind(did)
.execute(&self.connection_pool)
.await
.map(|result| result.rows_affected() > 0)?)
}
}