diff --git a/Cargo.lock b/Cargo.lock index 196a005..02ecdf3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2249,9 +2249,11 @@ dependencies = [ "chrono", "ciborium", "dotenv", + "env_logger", "futures", "libipld-core", "lingua", + "log", "once_cell", "rs-car", "scooby", diff --git a/Cargo.toml b/Cargo.toml index e13cc3c..1eab673 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,9 +15,11 @@ chat-gpt-lib-rs = "0.2.1" chrono = "0.4.31" ciborium = "0.2.1" dotenv = "0.15.0" +env_logger = "0.10.0" futures = "0.3.28" libipld-core = { version = "0.16.0", features = ["serde-codec"] } lingua = "1.5.0" +log = "0.4.20" once_cell = "1.18.0" rs-car = "0.4.1" scooby = "0.5.0" diff --git a/src/main.rs b/src/main.rs index fbda0e3..14e09ae 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,6 +7,8 @@ use std::sync::Arc; use anyhow::Result; use lingua::LanguageDetectorBuilder; +use log::info; +use env_logger::Env; use crate::algos::AlgosBuilder; use crate::algos::Nederlandskie; @@ -20,11 +22,20 @@ use crate::services::AI; #[tokio::main] async fn main() -> Result<()> { + env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); + + info!("Loading configuration"); + let config = Arc::new(Config::load()?); + info!("Initializing service clients"); + let ai = Arc::new(AI::new(&config.chat_gpt_api_key, "https://api.openai.com")); let bluesky = Arc::new(Bluesky::new("https://bsky.social")); let database = Arc::new(Database::connect(&config.database_url).await?); + + info!("Initializing language detector"); + let language_detector = Arc::new( LanguageDetectorBuilder::from_all_languages() .with_preloaded_language_models() @@ -41,6 +52,8 @@ async fn main() -> Result<()> { let profile_classifier = ProfileClassifier::new(database.clone(), ai.clone(), bluesky.clone()); let feed_server = FeedServer::new(database.clone(), config.clone(), algos.clone()); + info!("Starting everything up"); + tokio::try_join!( post_indexer.start(), profile_classifier.start(), diff --git a/src/processes/feed_server/server.rs b/src/processes/feed_server/server.rs index f7796b9..d53c34c 100644 --- a/src/processes/feed_server/server.rs +++ b/src/processes/feed_server/server.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use anyhow::Result; use axum::routing::get; use axum::{Router, Server}; +use log::info; use crate::algos::Algos; use crate::config::Config; @@ -46,6 +47,9 @@ impl FeedServer { }); let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); + + info!("Serving feed on {}", addr); + Server::bind(&addr).serve(app.into_make_service()).await?; Ok(()) } diff --git a/src/processes/post_indexer.rs b/src/processes/post_indexer.rs index 91caa6e..41d137a 100644 --- a/src/processes/post_indexer.rs +++ b/src/processes/post_indexer.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use anyhow::Result; use async_trait::async_trait; +use log::info; use crate::algos::Algos; use crate::services::bluesky::{Bluesky, Operation, OperationProcessor}; @@ -25,6 +26,7 @@ impl PostIndexer { impl PostIndexer { pub async fn start(&self) -> Result<()> { + info!("Starting"); Ok(self.bluesky.subscribe_to_operations(self).await?) } } @@ -45,7 +47,7 @@ impl OperationProcessor for PostIndexer { .iter_all() .any(|a| a.should_index_post(author_did, languages, text)) { - println!("received insertable post from {author_did}: {text}"); + info!("Received insertable post from {author_did}: {text}"); self.database .insert_profile_if_it_doesnt_exist(&author_did) @@ -54,7 +56,7 @@ impl OperationProcessor for PostIndexer { } } Operation::DeletePost { uri } => { - println!("received a post do delete: {uri}"); + info!("Received a post to delete: {uri}"); // TODO: Delete posts from db // self.database.delete_post(&self.db_connection_pool, &uri).await?; diff --git a/src/processes/profile_classifier.rs b/src/processes/profile_classifier.rs index ace45c2..058861e 100644 --- a/src/processes/profile_classifier.rs +++ b/src/processes/profile_classifier.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use std::time::Duration; use anyhow::Result; +use log::info; use crate::services::Bluesky; use crate::services::Database; @@ -23,6 +24,7 @@ impl ProfileClassifier { } pub async fn start(&self) -> Result<()> { + info!("Starting"); loop { // TODO: Don't just exit this function when an error happens, just wait a minute or so? self.classify_unclassified_profiles().await?; @@ -34,9 +36,10 @@ impl ProfileClassifier { let dids = self.database.fetch_unprocessed_profile_dids().await?; if dids.is_empty() { - println!("No profiles to process: waiting 10 seconds"); + info!("No profiles to process: waiting 10 seconds"); tokio::time::sleep(Duration::from_secs(10)).await; } else { + info!("Classifying {} new profiles", dids.len()); for did in &dids { self.fill_in_profile_details(did).await?; } @@ -52,7 +55,7 @@ impl ProfileClassifier { .infer_country_of_living(&details.display_name, &details.description) .await?; self.database.store_profile_details(did, &country).await?; - println!("Stored inferred country of living for {did}: {country}"); + info!("Stored inferred country of living for {did}: {country}"); Ok(()) } } diff --git a/src/services/bluesky/client.rs b/src/services/bluesky/client.rs index 490578d..1890ff7 100644 --- a/src/services/bluesky/client.rs +++ b/src/services/bluesky/client.rs @@ -4,6 +4,7 @@ use atrium_api::client::AtpServiceWrapper; use atrium_xrpc::client::reqwest::ReqwestClient; use futures::StreamExt; use tokio_tungstenite::{connect_async, tungstenite}; +use log::error; use super::streaming::{handle_message, OperationProcessor}; @@ -59,7 +60,7 @@ impl Bluesky { while let Some(Ok(tungstenite::Message::Binary(message))) = stream.next().await { if let Err(e) = handle_message(&message, processor).await { - println!("Error handling a message: {:?}", e); + error!("Error handling a message: {:?}", e); } }