From 4a08a283d2fa138a96830c833c94decff2b668a6 Mon Sep 17 00:00:00 2001 From: Aleksei Voronov Date: Mon, 2 Oct 2023 16:23:08 +0200 Subject: [PATCH] Properly handle errors in post indexer and profile classifier Reconnect to Bluesky in the indexer Don't exit the classifier just because we couldn't fetch profiles --- README.md | 2 +- src/processes/post_indexer.rs | 15 ++++++++++++++- src/processes/profile_classifier.rs | 13 ++++++++----- 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 14078f7..8d0d30d 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ Heavily WIP. Doesn't work yet at all, but does read the stream of posts as they - [x] Handle errors in the web service gracefully - [x] Handle missing profiles in the profile classifier - [x] Add a way to mark a profile as being from a certain country manually -- [ ] Handle reconnecting to websocket somehow +- [x] Handle reconnecting to websocket somehow - [ ] Publish the feed ## Configuration diff --git a/src/processes/post_indexer.rs b/src/processes/post_indexer.rs index 4bc9bdb..40fc4a6 100644 --- a/src/processes/post_indexer.rs +++ b/src/processes/post_indexer.rs @@ -1,8 +1,9 @@ use std::sync::Arc; +use std::time::Duration; use anyhow::Result; use async_trait::async_trait; -use log::info; +use log::{error, info}; use crate::algos::Algos; use crate::config::Config; @@ -36,6 +37,18 @@ impl PostIndexer { pub async fn start(&self) -> Result<()> { info!("Starting"); + loop { + if let Err(e) = self.process_from_last_point().await { + error!("Stopped because of an error: {}", e); + } + + info!("Waiting 10 seconds before reconnecting..."); + + tokio::time::sleep(Duration::from_secs(10)).await; + } + } + + async fn process_from_last_point(&self) -> Result<()> { let cursor = self .database .fetch_subscription_cursor(&self.config.feed_generator_did) diff --git a/src/processes/profile_classifier.rs b/src/processes/profile_classifier.rs index 8c94544..c54c12b 100644 --- a/src/processes/profile_classifier.rs +++ b/src/processes/profile_classifier.rs @@ -25,9 +25,13 @@ impl ProfileClassifier { pub async fn start(&self) -> Result<()> { info!("Starting"); + loop { - // TODO: Don't just exit this function when an error happens, just wait a minute or so? - self.classify_unclassified_profiles().await?; + if let Err(e) = self.classify_unclassified_profiles().await { + error!("Problem with classifying profiles: {}", e) + } + + tokio::time::sleep(Duration::from_secs(10)).await; } } @@ -35,8 +39,9 @@ impl ProfileClassifier { // TODO: Maybe streamify this so that each thing is processed in parallel let dids = self.database.fetch_unprocessed_profile_dids().await?; + if dids.is_empty() { - info!("No profiles to process: waiting 10 seconds"); + info!("No profiles to process"); } else { info!("Classifying {} new profiles", dids.len()); for did in &dids { @@ -47,8 +52,6 @@ impl ProfileClassifier { } } - tokio::time::sleep(Duration::from_secs(10)).await; - Ok(()) }