Store profiles in the database as we see them
No processing yet, but it's coming soon I guess
This commit is contained in:
parent
6365007fd8
commit
a18d448c38
|
@ -8,7 +8,7 @@ Heavily WIP. Doesn't work yet at all, but does read the stream of posts as they
|
||||||
|
|
||||||
- [x] Read stream of posts from Bluesky
|
- [x] Read stream of posts from Bluesky
|
||||||
- [x] Store posts in the database
|
- [x] Store posts in the database
|
||||||
- [ ] Store user profiles in the database
|
- [x] Store user profiles in the database
|
||||||
- [ ] Detect the country of residence from profile information
|
- [ ] Detect the country of residence from profile information
|
||||||
- [ ] Keep subscription state to not lose messages
|
- [ ] Keep subscription state to not lose messages
|
||||||
- [ ] Serve the feed
|
- [ ] Serve the feed
|
||||||
|
|
|
@ -1,7 +1,15 @@
|
||||||
|
CREATE TABLE IF NOT EXISTS Profile {
|
||||||
|
id INT GENERATED ALWAYS AS IDENTITY,
|
||||||
|
first_seen_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
|
||||||
|
did TEXT UNIQUE,
|
||||||
|
handle TEXT NULL DEFAULT NULL,
|
||||||
|
likely_country_of_living varchar(2) NULL DEFAULT NULL
|
||||||
|
}
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS Post (
|
CREATE TABLE IF NOT EXISTS Post (
|
||||||
id INT GENERATED ALWAYS AS IDENTITY,
|
id INT GENERATED ALWAYS AS IDENTITY,
|
||||||
indexed_at TIMESTAMP WITH TIME ZONE,
|
indexed_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
|
||||||
cid TEXT UNIQUE,
|
cid TEXT UNIQUE,
|
||||||
uri TEXT UNIQUE,
|
uri TEXT UNIQUE,
|
||||||
author_did TEXT
|
author_did TEXT REFERENCES Profile(did)
|
||||||
);
|
);
|
||||||
|
|
|
@ -44,13 +44,8 @@ pub async fn insert_post(
|
||||||
|
|
||||||
Ok(query(
|
Ok(query(
|
||||||
&insert_into("Post")
|
&insert_into("Post")
|
||||||
.columns(("indexed_at", "author_did", "cid", "uri"))
|
.columns(("author_did", "cid", "uri"))
|
||||||
.values([[
|
.values([params.next_array()])
|
||||||
"now()".to_owned(),
|
|
||||||
params.next(),
|
|
||||||
params.next(),
|
|
||||||
params.next(),
|
|
||||||
]])
|
|
||||||
.to_string(),
|
.to_string(),
|
||||||
)
|
)
|
||||||
.bind(author_did)
|
.bind(author_did)
|
||||||
|
@ -60,3 +55,20 @@ pub async fn insert_post(
|
||||||
.await
|
.await
|
||||||
.map(|_| ())?)
|
.map(|_| ())?)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn insert_profile_if_it_doesnt_exist(db: &ConnectionPool, did: &str) -> Result<bool> {
|
||||||
|
let mut params = Parameters::new();
|
||||||
|
|
||||||
|
Ok(query(
|
||||||
|
&insert_into("Profile")
|
||||||
|
.columns(("did",))
|
||||||
|
.values([params.next()])
|
||||||
|
.on_conflict()
|
||||||
|
.do_nothing()
|
||||||
|
.to_string(),
|
||||||
|
)
|
||||||
|
.bind(did)
|
||||||
|
.execute(db)
|
||||||
|
.await
|
||||||
|
.map(|result| result.rows_affected() > 0)?)
|
||||||
|
}
|
||||||
|
|
10
src/main.rs
10
src/main.rs
|
@ -5,7 +5,7 @@ mod streaming;
|
||||||
use crate::database::ConnectionPool;
|
use crate::database::ConnectionPool;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use database::insert_post;
|
use database::{insert_post, insert_profile_if_it_doesnt_exist};
|
||||||
|
|
||||||
use streaming::{Operation, OperationProcessor};
|
use streaming::{Operation, OperationProcessor};
|
||||||
|
|
||||||
|
@ -55,12 +55,12 @@ impl OperationProcessor for PostSaver {
|
||||||
|
|
||||||
println!("received insertable post from {author_did}: {text}");
|
println!("received insertable post from {author_did}: {text}");
|
||||||
|
|
||||||
|
insert_profile_if_it_doesnt_exist(&self.db_connection_pool, &author_did).await?;
|
||||||
insert_post(&self.db_connection_pool, &author_did, &cid, &uri).await?;
|
insert_post(&self.db_connection_pool, &author_did, &cid, &uri).await?;
|
||||||
|
|
||||||
// TODO: Insert profile if it doesn't exist yet
|
|
||||||
// insert_profile_if_it_doesnt_exist(&self.db_connection_pool, &author_did).await?;
|
|
||||||
}
|
}
|
||||||
Operation::DeletePost { uri: _ } => {
|
Operation::DeletePost { uri } => {
|
||||||
|
println!("received a post do delete: {uri}");
|
||||||
|
|
||||||
// TODO: Delete posts from db
|
// TODO: Delete posts from db
|
||||||
// delete_post(&self.db_connection_pool, &uri).await?;
|
// delete_post(&self.db_connection_pool, &uri).await?;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue