From 1ac405e5ee463f174769e0d1eafea301360a0b45 Mon Sep 17 00:00:00 2001 From: Aleksei Voronov Date: Wed, 27 Sep 2023 13:22:26 +0200 Subject: [PATCH] Add a way to manually mark a certain profile as being from a specific country --- README.md | 2 +- src/bin/force_profile_country.rs | 50 ++++++++++++++++++++++++++++++++ src/services/bluesky/client.rs | 37 +++++++++++++++++++++++ src/services/database.rs | 43 +++++++++++++++++++++++++++ 4 files changed, 131 insertions(+), 1 deletion(-) create mode 100644 src/bin/force_profile_country.rs diff --git a/README.md b/README.md index 31b7f7e..14078f7 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ Heavily WIP. Doesn't work yet at all, but does read the stream of posts as they - [x] Handle deleting of posts - [x] Handle errors in the web service gracefully - [x] Handle missing profiles in the profile classifier -- [ ] Add a way to mark a profile as being from a certain country manually +- [x] Add a way to mark a profile as being from a certain country manually - [ ] Handle reconnecting to websocket somehow - [ ] Publish the feed diff --git a/src/bin/force_profile_country.rs b/src/bin/force_profile_country.rs new file mode 100644 index 0000000..5bb0590 --- /dev/null +++ b/src/bin/force_profile_country.rs @@ -0,0 +1,50 @@ +extern crate nederlandskie; + +use std::env; + +use anyhow::{anyhow, Context, Result}; +use clap::Parser; +use dotenv::dotenv; + +use nederlandskie::services::{Bluesky, Database}; + +#[derive(Parser, Debug)] +struct Args { + /// Handle of the user to force the country for + #[arg(long)] + handle: String, + + /// Country to use, two letters + #[arg(long)] + country: String, +} + +#[tokio::main] +async fn main() -> Result<()> { + dotenv()?; + + let args = Args::parse(); + + let database_url = + env::var("DATABASE_URL").context("DATABASE_URL environment variable must be set")?; + + let bluesky = Bluesky::new("https://bsky.social"); + + let did = bluesky + .resolve_handle(&args.handle) + .await? + .ok_or_else(|| anyhow!("No such user: {}", args.handle))?; + + println!("Resolved handle '{}' to did '{}'", args.handle, did); + + let database = Database::connect(&database_url).await?; + + database.force_profile_country(&did, &args.country).await?; + + println!( + "Stored '{}' as the country for profile with did '{}'", + args.country, did + ); + + Ok(()) +} diff --git a/src/services/bluesky/client.rs b/src/services/bluesky/client.rs index 8a195a5..89bec1d 100644 --- a/src/services/bluesky/client.rs +++ b/src/services/bluesky/client.rs @@ -139,6 +139,27 @@ impl Bluesky { })) } + pub async fn resolve_handle(&self, handle: &str) -> Result> { + use atrium_api::com::atproto::identity::resolve_handle::Parameters; + + let result = self + .client + .service + .com + .atproto + .identity + .resolve_handle(Parameters { + handle: handle.to_owned(), + }) + .await; + + match result { + Ok(result) => Ok(Some(result.did)), + Err(e) if is_unable_to_resolve_handle_error(&e) => Ok(None), + Err(e) => Err(e.into()), + } + } + pub async fn subscribe_to_operations( &self, processor: &P, @@ -179,3 +200,19 @@ fn is_missing_record_error(error: &atrium_xrpc::error::Error) -> bool { && error_message.starts_with("Could not locate record") ) } + +fn is_unable_to_resolve_handle_error(error: &atrium_xrpc::error::Error) -> bool { + use atrium_xrpc::error::{Error, ErrorResponseBody, XrpcError, XrpcErrorKind}; + + matches!(error, + Error::XrpcResponse(XrpcError { + status: StatusCode::BAD_REQUEST, + error: + Some(XrpcErrorKind::Undefined(ErrorResponseBody { + error: Some(error_code), + message: Some(error_message), + })), + }) if error_code == "InvalidRequest" + && error_message.starts_with("Unable to resolve handle") + ) +} diff --git a/src/services/database.rs b/src/services/database.rs index ccbd3e4..0b41496 100644 --- a/src/services/database.rs +++ b/src/services/database.rs @@ -149,6 +149,49 @@ impl Database { .map(|result| result.rows_affected() > 0)?) } + pub async fn force_profile_country( + &self, + did: &str, + likely_country_of_living: &str, + ) -> Result { + let transaction = self.connection_pool.begin().await?; + + { + let mut params = Parameters::new(); + + query( + &insert_into("Profile") + .columns(("did",)) + .values([params.next()]) + .on_conflict() + .do_nothing() + .to_string(), + ) + .bind(did) + .execute(&self.connection_pool) + .await?; + } + + { + let mut params = Parameters::new(); + query( + &update("Profile") + .set("has_been_processed", "TRUE") + .set("likely_country_of_living", params.next()) + .where_(format!("did = {}", params.next())) + .to_string(), + ) + .bind(likely_country_of_living) + .bind(did) + .execute(&self.connection_pool) + .await?; + } + + transaction.commit().await?; + + Ok(true) + } + pub async fn fetch_subscription_cursor(&self, did: &str) -> Result> { let mut params = Parameters::new();