use anyhow::Result; use chrono::{DateTime, Utc}; use scooby::postgres::{ delete_from, insert_into, select, update, Aliasable, Joinable, Orderable, Parameters, }; use sqlx::postgres::{PgPool, PgPoolOptions, PgRow}; use sqlx::query; use sqlx::Row; pub struct Post { pub indexed_at: DateTime, pub author_did: String, pub cid: String, pub uri: String, } pub struct Database { connection_pool: PgPool, } impl Database { pub async fn connect(url: &str) -> Result { Ok(Self { connection_pool: PgPoolOptions::new().max_connections(5).connect(url).await?, }) } pub async fn insert_post(&self, author_did: &str, cid: &str, uri: &str) -> Result<()> { let mut params = Parameters::new(); Ok(query( &insert_into("Post") .columns(("author_did", "cid", "uri")) .values([params.next_array()]) .to_string(), ) .bind(author_did) .bind(cid) .bind(uri) .execute(&self.connection_pool) .await .map(|_| ())?) } pub async fn fetch_posts_by_authors_country( &self, author_country: &str, limit: usize, earlier_than: Option<(DateTime, &str)>, ) -> Result> { let mut params = Parameters::new(); let mut sql_builder = select(("p.indexed_at", "p.author_did", "p.cid", "p.uri")) .from( "Post" .as_("p") .inner_join("Profile".as_("pr")) .on("pr.did = p.author_did"), ) .where_(format!("pr.likely_country_of_living = {}", params.next())) .order_by(("p.indexed_at".desc(), "p.cid".desc())) .limit(limit); if earlier_than.is_some() { sql_builder = sql_builder .where_(format!("p.indexed_at <= {}", params.next())) .where_(format!("p.cid < {}", params.next())); } let sql_string = sql_builder.to_string(); let mut query_object = query(&sql_string).bind(author_country); if let Some((last_indexed_at, last_cid)) = earlier_than { query_object = query_object.bind(last_indexed_at).bind(last_cid); } Ok(query_object .map(|r: PgRow| Post { indexed_at: r.get("indexed_at"), author_did: r.get("author_did"), cid: r.get("cid"), uri: r.get("uri"), }) .fetch_all(&self.connection_pool) .await?) } pub async fn delete_post(&self, uri: &str) -> Result { let mut params = Parameters::new(); Ok(query( &delete_from("Post") .where_(format!("uri = {}", params.next())) .to_string(), ) .bind(uri) .execute(&self.connection_pool) .await .map(|result| result.rows_affected() > 0)?) } pub async fn insert_profile_if_it_doesnt_exist(&self, did: &str) -> Result { let mut params = Parameters::new(); Ok(query( &insert_into("Profile") .columns(("did",)) .values([params.next()]) .on_conflict() .do_nothing() .to_string(), ) .bind(did) .execute(&self.connection_pool) .await .map(|result| result.rows_affected() > 0)?) } pub async fn fetch_unprocessed_profile_dids(&self) -> Result> { Ok(query( &select("did") .from("Profile") .where_("has_been_processed = FALSE") .to_string(), ) .map(|r: PgRow| r.get(0)) .fetch_all(&self.connection_pool) .await?) } pub async fn store_profile_details( &self, did: &str, likely_country_of_living: &str, ) -> Result { let mut params = Parameters::new(); Ok(query( &update("Profile") .set("has_been_processed", "TRUE") .set("likely_country_of_living", params.next()) .where_(format!("did = {}", params.next())) .to_string(), ) .bind(likely_country_of_living) .bind(did) .execute(&self.connection_pool) .await .map(|result| result.rows_affected() > 0)?) } pub async fn force_profile_country( &self, did: &str, likely_country_of_living: &str, ) -> Result { let transaction = self.connection_pool.begin().await?; { let mut params = Parameters::new(); query( &insert_into("Profile") .columns(("did",)) .values([params.next()]) .on_conflict() .do_nothing() .to_string(), ) .bind(did) .execute(&self.connection_pool) .await?; } { let mut params = Parameters::new(); query( &update("Profile") .set("has_been_processed", "TRUE") .set("likely_country_of_living", params.next()) .where_(format!("did = {}", params.next())) .to_string(), ) .bind(likely_country_of_living) .bind(did) .execute(&self.connection_pool) .await?; } transaction.commit().await?; Ok(true) } pub async fn fetch_subscription_cursor(&self, did: &str) -> Result> { let mut params = Parameters::new(); Ok(query( &select("cursor") .from("SubscriptionState") .where_(format!("service = {}", params.next())) .to_string(), ) .bind(did) .map(|r: PgRow| r.get("cursor")) .fetch_optional(&self.connection_pool) .await?) } pub async fn create_subscription_state(&self, did: &str) -> Result { let mut params = Parameters::new(); Ok(query( &insert_into("SubscriptionState") .columns(("service", "cursor")) .values([params.next_array()]) .to_string(), ) .bind(did) .bind(0) .execute(&self.connection_pool) .await .map(|result| result.rows_affected() > 0)?) } pub async fn update_subscription_cursor(&self, did: &str, cursor: i32) -> Result { let mut params = Parameters::new(); Ok(query( &update("SubscriptionState") .set("cursor", params.next()) .where_(format!("service = {}", params.next())) .to_string(), ) .bind(cursor) .bind(did) .execute(&self.connection_pool) .await .map(|result| result.rows_affected() > 0)?) } }