diff --git a/Cargo.lock b/Cargo.lock index 1c41e07..c03d954 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1180,6 +1180,7 @@ name = "nederlandskie" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "atrium-api", "atrium-xrpc", "chrono", @@ -1906,6 +1907,7 @@ dependencies = [ "indexmap 2.0.0", "log", "memchr", + "native-tls", "once_cell", "paste", "percent-encoding", @@ -1915,6 +1917,8 @@ dependencies = [ "smallvec", "sqlformat", "thiserror", + "tokio", + "tokio-stream", "tracing", "url", ] @@ -1954,6 +1958,7 @@ dependencies = [ "sqlx-sqlite", "syn 1.0.109", "tempfile", + "tokio", "url", ] @@ -2212,6 +2217,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-tungstenite" version = "0.20.0" diff --git a/Cargo.toml b/Cargo.toml index 69452c3..09cfea3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] anyhow = "1.0.75" +async-trait = "0.1.73" atrium-api = "0.4.0" atrium-xrpc = "0.3.0" chrono = "0.4.26" @@ -16,6 +17,6 @@ libipld-core = { version = "0.16.0", features = ["serde-codec"] } rs-car = "0.4.1" scooby = "0.4.0" serde_ipld_dagcbor = "0.4.0" -sqlx = { version = "0.7.1", features = ["chrono"] } +sqlx = { version = "0.7.1", default-features = false, features = ["postgres", "runtime-tokio-native-tls", "chrono"] } tokio = { version = "1.32.0", features = ["full"] } tokio-tungstenite = { version = "0.20.0", features = ["native-tls"] } diff --git a/sql/01_create_tables.sql b/sql/01_create_tables.sql new file mode 100644 index 0000000..a11b567 --- /dev/null +++ b/sql/01_create_tables.sql @@ -0,0 +1,7 @@ +CREATE TABLE IF NOT EXISTS Post ( + id INT GENERATED ALWAYS AS IDENTITY, + indexed_at TIMESTAMP WITH TIME ZONE, + cid TEXT UNIQUE, + uri TEXT UNIQUE, + author_did TEXT +); diff --git a/src/database.rs b/src/database.rs index d8a07ac..0cb4f08 100644 --- a/src/database.rs +++ b/src/database.rs @@ -1,4 +1,11 @@ +use anyhow::Result; use chrono::{DateTime, Utc}; +use scooby::postgres::{insert_into, Parameters}; + +use sqlx::postgres::{PgPool, PgPoolOptions}; +use sqlx::query; + +pub type ConnectionPool = PgPool; pub struct Post { indexed_at: DateTime, @@ -10,11 +17,41 @@ pub struct Post { pub struct Profile { first_seen_at: DateTime, did: String, - handle: String, - likely_country_of_living: String, + handle: Option, + likely_country_of_living: Option, } pub struct SubscriptionState { service: String, cursor: i64, } + +pub async fn make_connection_pool() -> Result { + // TODO: get options from env vars + Ok(PgPoolOptions::new() + .max_connections(5) + .connect("postgres://postgres:password@localhost/nederlandskie").await?) + +} + +pub async fn insert_post( + db: &ConnectionPool, + author_did: &str, + cid: &str, + uri: &str, +) -> Result<()> { + let mut params = Parameters::new(); + + Ok(query( + &insert_into("Post") + .columns(("indexed_at", "author_did", "cid", "uri")) + .values([["now()".to_owned(), params.next(), params.next(), params.next()]]) + .to_string(), + ) + .bind(author_did) + .bind(cid) + .bind(uri) + .execute(db) + .await + .map(|_| ())?) +} diff --git a/src/main.rs b/src/main.rs index 7823bb8..fccbe7d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,11 +2,70 @@ mod database; mod frames; mod streaming; -use crate::streaming::start_stream; +use crate::database::ConnectionPool; +use anyhow::Result; +use async_trait::async_trait; +use database::insert_post; + +use streaming::{Operation, OperationProcessor}; + +use crate::database::make_connection_pool; +use crate::streaming::start_processing_operations_with; #[tokio::main] -async fn main() -> Result<(), Box> { - start_stream().await?; +async fn main() -> Result<()> { + let db_connection_pool = make_connection_pool().await?; + + // FIXME: This struct shouldn't really exist, but I couldn't find a way to replace + // this whole nonsense with a closure, which is what this whole thing should be in + // first place. + let post_saver = PostSaver { db_connection_pool }; + + start_processing_operations_with(post_saver).await?; Ok(()) } + +struct PostSaver { + db_connection_pool: ConnectionPool, +} + +#[async_trait] +impl OperationProcessor for PostSaver { + async fn process_operation(&self, operation: &Operation) -> Result<()> { + match operation { + Operation::CreatePost { + author_did, + cid, + uri, + languages, + text, + } => { + // TODO: Configure this via env vars + if !languages.contains("ru") { + return Ok(()) + } + + // BlueSky gets confused a lot about Russian vs Ukrainian, so skip posts + // that may be in Ukrainian regardless of whether Russian is in the list + // TODO: Configure this via env vars + if languages.contains("uk") { + return Ok(()); + } + + println!("received insertable post from {author_did}: {text}"); + + insert_post(&self.db_connection_pool, &author_did, &cid, &uri).await?; + + // TODO: Insert profile if it doesn't exist yet + // insert_profile_if_it_doesnt_exist(&self.db_connection_pool, &author_did).await?; + } + Operation::DeletePost { uri: _ } => { + // TODO: Delete posts from db + // delete_post(&self.db_connection_pool, &uri).await?; + } + }; + + Ok(()) + } +} diff --git a/src/streaming.rs b/src/streaming.rs index 36feb4d..fa3aed9 100644 --- a/src/streaming.rs +++ b/src/streaming.rs @@ -1,3 +1,6 @@ +use std::collections::HashSet; + +use async_trait::async_trait; use anyhow::Result; use crate::frames::Frame; @@ -8,12 +11,31 @@ use atrium_api::com::atproto::sync::subscribe_repos::Message; use futures::StreamExt; use tokio_tungstenite::{connect_async, tungstenite}; -pub async fn start_stream() -> Result<()> { +#[async_trait] +pub trait OperationProcessor { + async fn process_operation(&self, operation: &Operation) -> Result<()>; +} + +#[derive(Debug)] +pub enum Operation { + CreatePost { + author_did: String, + cid: String, + uri: String, + languages: HashSet, + text: String, + }, + DeletePost { + uri: String, + }, +} + +pub async fn start_processing_operations_with(processor: P) -> Result<()> { let (mut stream, _) = connect_async("wss://bsky.social/xrpc/com.atproto.sync.subscribeRepos").await?; while let Some(Ok(tungstenite::Message::Binary(message))) = stream.next().await { - if let Err(e) = handle_message(&message).await { + if let Err(e) = handle_message(&message, &processor).await { println!("Error handling a message: {:?}", e); } } @@ -21,15 +43,15 @@ pub async fn start_stream() -> Result<()> { Ok(()) } -async fn handle_message(message: &[u8]) -> Result<()> { +async fn handle_message(message: &[u8], processor: &P) -> Result<()> { let commit = match parse_commit_from_message(&message)? { Some(commit) => commit, None => return Ok(()), }; - let post_operations = extract_post_operations(&commit).await?; + let post_operations = extract_operations(&commit).await?; for operation in &post_operations { - println!("{:?}", operation); + processor.process_operation(&operation).await?; } Ok(()) @@ -45,21 +67,7 @@ fn parse_commit_from_message(message: &[u8]) -> Result> { } } -#[derive(Debug)] -enum PostOperation { - Create { - author_did: String, - cid: String, - uri: String, - languages: Vec, - text: String, - }, - Delete { - cid: String, - }, -} - -async fn extract_post_operations(commit: &Commit) -> Result> { +async fn extract_operations(commit: &Commit) -> Result> { let mut operations = Vec::new(); let (items, _) = rs_car::car_read_all(&mut commit.blocks.as_slice(), true).await?; @@ -69,20 +77,20 @@ async fn extract_post_operations(commit: &Commit) -> Result> continue; } - let cid = op.cid.ok_or(anyhow!("cid is not there, how is that possible"))?.to_string(); + let uri = format!("at://{}/{}", commit.repo, op.path); if let Some((_, item)) = items.iter().find(|(cid, _)| Some(*cid) == op.cid) { let record: Record = ciborium::from_reader(&mut item.as_slice())?; operations.push(match op.action.as_str() { - "create" => PostOperation::Create { - languages: record.langs.unwrap_or_else(Vec::new), + "create" => Operation::CreatePost { + languages: record.langs.unwrap_or_else(Vec::new).iter().cloned().collect(), text: record.text, author_did: commit.repo.clone(), - cid, - uri: format!("at://{}/{}", commit.repo, op.path), + cid: op.cid.ok_or(anyhow!("cid is not present for a post create operation, how is that possible"))?.to_string(), + uri, }, - "delete" => PostOperation::Delete { cid }, + "delete" => Operation::DeletePost { uri }, _ => unreachable!(), }); }