Add a way to manually mark a certain profile as being from a specific country
This commit is contained in:
		
							parent
							
								
									db8a85624f
								
							
						
					
					
						commit
						1ac405e5ee
					
				| 
						 | 
				
			
			@ -15,7 +15,7 @@ Heavily WIP. Doesn't work yet at all, but does read the stream of posts as they
 | 
			
		|||
- [x] Handle deleting of posts
 | 
			
		||||
- [x] Handle errors in the web service gracefully
 | 
			
		||||
- [x] Handle missing profiles in the profile classifier
 | 
			
		||||
- [ ] Add a way to mark a profile as being from a certain country manually
 | 
			
		||||
- [x] Add a way to mark a profile as being from a certain country manually
 | 
			
		||||
- [ ] Handle reconnecting to websocket somehow
 | 
			
		||||
- [ ] Publish the feed
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,50 @@
 | 
			
		|||
extern crate nederlandskie;
 | 
			
		||||
 | 
			
		||||
use std::env;
 | 
			
		||||
 | 
			
		||||
use anyhow::{anyhow, Context, Result};
 | 
			
		||||
use clap::Parser;
 | 
			
		||||
use dotenv::dotenv;
 | 
			
		||||
 | 
			
		||||
use nederlandskie::services::{Bluesky, Database};
 | 
			
		||||
 | 
			
		||||
#[derive(Parser, Debug)]
 | 
			
		||||
struct Args {
 | 
			
		||||
    /// Handle of the user to force the country for
 | 
			
		||||
    #[arg(long)]
 | 
			
		||||
    handle: String,
 | 
			
		||||
 | 
			
		||||
    /// Country to use, two letters
 | 
			
		||||
    #[arg(long)]
 | 
			
		||||
    country: String,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[tokio::main]
 | 
			
		||||
async fn main() -> Result<()> {
 | 
			
		||||
    dotenv()?;
 | 
			
		||||
 | 
			
		||||
    let args = Args::parse();
 | 
			
		||||
 | 
			
		||||
    let database_url =
 | 
			
		||||
        env::var("DATABASE_URL").context("DATABASE_URL environment variable must be set")?;
 | 
			
		||||
 | 
			
		||||
    let bluesky = Bluesky::new("https://bsky.social");
 | 
			
		||||
 | 
			
		||||
    let did = bluesky
 | 
			
		||||
        .resolve_handle(&args.handle)
 | 
			
		||||
        .await?
 | 
			
		||||
        .ok_or_else(|| anyhow!("No such user: {}", args.handle))?;
 | 
			
		||||
 | 
			
		||||
    println!("Resolved handle '{}' to did '{}'", args.handle, did);
 | 
			
		||||
 | 
			
		||||
    let database = Database::connect(&database_url).await?;
 | 
			
		||||
 | 
			
		||||
    database.force_profile_country(&did, &args.country).await?;
 | 
			
		||||
 | 
			
		||||
    println!(
 | 
			
		||||
        "Stored '{}' as the country for profile with did '{}'",
 | 
			
		||||
        args.country, did
 | 
			
		||||
    );
 | 
			
		||||
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -139,6 +139,27 @@ impl Bluesky {
 | 
			
		|||
        }))
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn resolve_handle(&self, handle: &str) -> Result<Option<String>> {
 | 
			
		||||
        use atrium_api::com::atproto::identity::resolve_handle::Parameters;
 | 
			
		||||
 | 
			
		||||
        let result = self
 | 
			
		||||
            .client
 | 
			
		||||
            .service
 | 
			
		||||
            .com
 | 
			
		||||
            .atproto
 | 
			
		||||
            .identity
 | 
			
		||||
            .resolve_handle(Parameters {
 | 
			
		||||
                handle: handle.to_owned(),
 | 
			
		||||
            })
 | 
			
		||||
            .await;
 | 
			
		||||
 | 
			
		||||
        match result {
 | 
			
		||||
            Ok(result) => Ok(Some(result.did)),
 | 
			
		||||
            Err(e) if is_unable_to_resolve_handle_error(&e) => Ok(None),
 | 
			
		||||
            Err(e) => Err(e.into()),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn subscribe_to_operations<P: CommitProcessor>(
 | 
			
		||||
        &self,
 | 
			
		||||
        processor: &P,
 | 
			
		||||
| 
						 | 
				
			
			@ -179,3 +200,19 @@ fn is_missing_record_error<T>(error: &atrium_xrpc::error::Error<T>) -> bool {
 | 
			
		|||
            && error_message.starts_with("Could not locate record")
 | 
			
		||||
    )
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn is_unable_to_resolve_handle_error<T>(error: &atrium_xrpc::error::Error<T>) -> bool {
 | 
			
		||||
    use atrium_xrpc::error::{Error, ErrorResponseBody, XrpcError, XrpcErrorKind};
 | 
			
		||||
 | 
			
		||||
    matches!(error,
 | 
			
		||||
        Error::XrpcResponse(XrpcError {
 | 
			
		||||
            status: StatusCode::BAD_REQUEST,
 | 
			
		||||
            error:
 | 
			
		||||
                Some(XrpcErrorKind::Undefined(ErrorResponseBody {
 | 
			
		||||
                    error: Some(error_code),
 | 
			
		||||
                    message: Some(error_message),
 | 
			
		||||
                })),
 | 
			
		||||
        }) if error_code == "InvalidRequest"
 | 
			
		||||
            && error_message.starts_with("Unable to resolve handle")
 | 
			
		||||
    )
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -149,6 +149,49 @@ impl Database {
 | 
			
		|||
        .map(|result| result.rows_affected() > 0)?)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn force_profile_country(
 | 
			
		||||
        &self,
 | 
			
		||||
        did: &str,
 | 
			
		||||
        likely_country_of_living: &str,
 | 
			
		||||
    ) -> Result<bool> {
 | 
			
		||||
        let transaction = self.connection_pool.begin().await?;
 | 
			
		||||
 | 
			
		||||
        {
 | 
			
		||||
            let mut params = Parameters::new();
 | 
			
		||||
 | 
			
		||||
            query(
 | 
			
		||||
                &insert_into("Profile")
 | 
			
		||||
                    .columns(("did",))
 | 
			
		||||
                    .values([params.next()])
 | 
			
		||||
                    .on_conflict()
 | 
			
		||||
                    .do_nothing()
 | 
			
		||||
                    .to_string(),
 | 
			
		||||
            )
 | 
			
		||||
            .bind(did)
 | 
			
		||||
            .execute(&self.connection_pool)
 | 
			
		||||
            .await?;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        {
 | 
			
		||||
            let mut params = Parameters::new();
 | 
			
		||||
            query(
 | 
			
		||||
                &update("Profile")
 | 
			
		||||
                    .set("has_been_processed", "TRUE")
 | 
			
		||||
                    .set("likely_country_of_living", params.next())
 | 
			
		||||
                    .where_(format!("did = {}", params.next()))
 | 
			
		||||
                    .to_string(),
 | 
			
		||||
            )
 | 
			
		||||
            .bind(likely_country_of_living)
 | 
			
		||||
            .bind(did)
 | 
			
		||||
            .execute(&self.connection_pool)
 | 
			
		||||
            .await?;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        transaction.commit().await?;
 | 
			
		||||
 | 
			
		||||
        Ok(true)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn fetch_subscription_cursor(&self, did: &str) -> Result<Option<i32>> {
 | 
			
		||||
        let mut params = Parameters::new();
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue