Properly handle errors in post indexer and profile classifier

Reconnect to Bluesky in the indexer
Don't exit the classifier just because we couldn't fetch profiles
This commit is contained in:
Aleksei Voronov 2023-10-02 16:23:08 +02:00
parent 1ac405e5ee
commit 4a08a283d2
3 changed files with 23 additions and 7 deletions

View File

@ -16,7 +16,7 @@ Heavily WIP. Doesn't work yet at all, but does read the stream of posts as they
- [x] Handle errors in the web service gracefully - [x] Handle errors in the web service gracefully
- [x] Handle missing profiles in the profile classifier - [x] Handle missing profiles in the profile classifier
- [x] Add a way to mark a profile as being from a certain country manually - [x] Add a way to mark a profile as being from a certain country manually
- [ ] Handle reconnecting to websocket somehow - [x] Handle reconnecting to websocket somehow
- [ ] Publish the feed - [ ] Publish the feed
## Configuration ## Configuration

View File

@ -1,8 +1,9 @@
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use anyhow::Result; use anyhow::Result;
use async_trait::async_trait; use async_trait::async_trait;
use log::info; use log::{error, info};
use crate::algos::Algos; use crate::algos::Algos;
use crate::config::Config; use crate::config::Config;
@ -36,6 +37,18 @@ impl PostIndexer {
pub async fn start(&self) -> Result<()> { pub async fn start(&self) -> Result<()> {
info!("Starting"); info!("Starting");
loop {
if let Err(e) = self.process_from_last_point().await {
error!("Stopped because of an error: {}", e);
}
info!("Waiting 10 seconds before reconnecting...");
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
async fn process_from_last_point(&self) -> Result<()> {
let cursor = self let cursor = self
.database .database
.fetch_subscription_cursor(&self.config.feed_generator_did) .fetch_subscription_cursor(&self.config.feed_generator_did)

View File

@ -25,9 +25,13 @@ impl ProfileClassifier {
pub async fn start(&self) -> Result<()> { pub async fn start(&self) -> Result<()> {
info!("Starting"); info!("Starting");
loop { loop {
// TODO: Don't just exit this function when an error happens, just wait a minute or so? if let Err(e) = self.classify_unclassified_profiles().await {
self.classify_unclassified_profiles().await?; error!("Problem with classifying profiles: {}", e)
}
tokio::time::sleep(Duration::from_secs(10)).await;
} }
} }
@ -35,8 +39,9 @@ impl ProfileClassifier {
// TODO: Maybe streamify this so that each thing is processed in parallel // TODO: Maybe streamify this so that each thing is processed in parallel
let dids = self.database.fetch_unprocessed_profile_dids().await?; let dids = self.database.fetch_unprocessed_profile_dids().await?;
if dids.is_empty() { if dids.is_empty() {
info!("No profiles to process: waiting 10 seconds"); info!("No profiles to process");
} else { } else {
info!("Classifying {} new profiles", dids.len()); info!("Classifying {} new profiles", dids.len());
for did in &dids { for did in &dids {
@ -47,8 +52,6 @@ impl ProfileClassifier {
} }
} }
tokio::time::sleep(Duration::from_secs(10)).await;
Ok(()) Ok(())
} }