From a18d448c38b5145ca01969b59bd63b0c897203ed Mon Sep 17 00:00:00 2001 From: Aleksei Voronov Date: Thu, 31 Aug 2023 15:11:14 +0200 Subject: [PATCH] Store profiles in the database as we see them No processing yet, but it's coming soon I guess --- README.md | 2 +- sql/01_create_tables.sql | 12 ++++++++++-- src/database.rs | 26 +++++++++++++++++++------- src/main.rs | 10 +++++----- 4 files changed, 35 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 24d463c..0d32ac0 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ Heavily WIP. Doesn't work yet at all, but does read the stream of posts as they - [x] Read stream of posts from Bluesky - [x] Store posts in the database -- [ ] Store user profiles in the database +- [x] Store user profiles in the database - [ ] Detect the country of residence from profile information - [ ] Keep subscription state to not lose messages - [ ] Serve the feed diff --git a/sql/01_create_tables.sql b/sql/01_create_tables.sql index a11b567..d626fec 100644 --- a/sql/01_create_tables.sql +++ b/sql/01_create_tables.sql @@ -1,7 +1,15 @@ +CREATE TABLE IF NOT EXISTS Profile { + id INT GENERATED ALWAYS AS IDENTITY, + first_seen_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + did TEXT UNIQUE, + handle TEXT NULL DEFAULT NULL, + likely_country_of_living varchar(2) NULL DEFAULT NULL +} + CREATE TABLE IF NOT EXISTS Post ( id INT GENERATED ALWAYS AS IDENTITY, - indexed_at TIMESTAMP WITH TIME ZONE, + indexed_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), cid TEXT UNIQUE, uri TEXT UNIQUE, - author_did TEXT + author_did TEXT REFERENCES Profile(did) ); diff --git a/src/database.rs b/src/database.rs index 6c3338a..55d173f 100644 --- a/src/database.rs +++ b/src/database.rs @@ -44,13 +44,8 @@ pub async fn insert_post( Ok(query( &insert_into("Post") - .columns(("indexed_at", "author_did", "cid", "uri")) - .values([[ - "now()".to_owned(), - params.next(), - params.next(), - params.next(), - ]]) + .columns(("author_did", "cid", "uri")) + .values([params.next_array()]) .to_string(), ) .bind(author_did) @@ -60,3 +55,20 @@ pub async fn insert_post( .await .map(|_| ())?) } + +pub async fn insert_profile_if_it_doesnt_exist(db: &ConnectionPool, did: &str) -> Result { + let mut params = Parameters::new(); + + Ok(query( + &insert_into("Profile") + .columns(("did",)) + .values([params.next()]) + .on_conflict() + .do_nothing() + .to_string(), + ) + .bind(did) + .execute(db) + .await + .map(|result| result.rows_affected() > 0)?) +} diff --git a/src/main.rs b/src/main.rs index dd266ce..5b368b5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,7 +5,7 @@ mod streaming; use crate::database::ConnectionPool; use anyhow::Result; use async_trait::async_trait; -use database::insert_post; +use database::{insert_post, insert_profile_if_it_doesnt_exist}; use streaming::{Operation, OperationProcessor}; @@ -55,12 +55,12 @@ impl OperationProcessor for PostSaver { println!("received insertable post from {author_did}: {text}"); + insert_profile_if_it_doesnt_exist(&self.db_connection_pool, &author_did).await?; insert_post(&self.db_connection_pool, &author_did, &cid, &uri).await?; - - // TODO: Insert profile if it doesn't exist yet - // insert_profile_if_it_doesnt_exist(&self.db_connection_pool, &author_did).await?; } - Operation::DeletePost { uri: _ } => { + Operation::DeletePost { uri } => { + println!("received a post do delete: {uri}"); + // TODO: Delete posts from db // delete_post(&self.db_connection_pool, &uri).await?; }