Refactor streaming and start inserting Russian posts into db

This commit is contained in:
Aleksei Voronov 2023-08-31 09:42:56 +02:00
parent 1b80bf6ab5
commit e5b3db1470
6 changed files with 160 additions and 32 deletions

16
Cargo.lock generated
View File

@ -1180,6 +1180,7 @@ name = "nederlandskie"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait",
"atrium-api", "atrium-api",
"atrium-xrpc", "atrium-xrpc",
"chrono", "chrono",
@ -1906,6 +1907,7 @@ dependencies = [
"indexmap 2.0.0", "indexmap 2.0.0",
"log", "log",
"memchr", "memchr",
"native-tls",
"once_cell", "once_cell",
"paste", "paste",
"percent-encoding", "percent-encoding",
@ -1915,6 +1917,8 @@ dependencies = [
"smallvec", "smallvec",
"sqlformat", "sqlformat",
"thiserror", "thiserror",
"tokio",
"tokio-stream",
"tracing", "tracing",
"url", "url",
] ]
@ -1954,6 +1958,7 @@ dependencies = [
"sqlx-sqlite", "sqlx-sqlite",
"syn 1.0.109", "syn 1.0.109",
"tempfile", "tempfile",
"tokio",
"url", "url",
] ]
@ -2212,6 +2217,17 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "tokio-stream"
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "tokio-tungstenite" name = "tokio-tungstenite"
version = "0.20.0" version = "0.20.0"

View File

@ -7,6 +7,7 @@ edition = "2021"
[dependencies] [dependencies]
anyhow = "1.0.75" anyhow = "1.0.75"
async-trait = "0.1.73"
atrium-api = "0.4.0" atrium-api = "0.4.0"
atrium-xrpc = "0.3.0" atrium-xrpc = "0.3.0"
chrono = "0.4.26" chrono = "0.4.26"
@ -16,6 +17,6 @@ libipld-core = { version = "0.16.0", features = ["serde-codec"] }
rs-car = "0.4.1" rs-car = "0.4.1"
scooby = "0.4.0" scooby = "0.4.0"
serde_ipld_dagcbor = "0.4.0" serde_ipld_dagcbor = "0.4.0"
sqlx = { version = "0.7.1", features = ["chrono"] } sqlx = { version = "0.7.1", default-features = false, features = ["postgres", "runtime-tokio-native-tls", "chrono"] }
tokio = { version = "1.32.0", features = ["full"] } tokio = { version = "1.32.0", features = ["full"] }
tokio-tungstenite = { version = "0.20.0", features = ["native-tls"] } tokio-tungstenite = { version = "0.20.0", features = ["native-tls"] }

7
sql/01_create_tables.sql Normal file
View File

@ -0,0 +1,7 @@
CREATE TABLE IF NOT EXISTS Post (
id INT GENERATED ALWAYS AS IDENTITY,
indexed_at TIMESTAMP WITH TIME ZONE,
cid TEXT UNIQUE,
uri TEXT UNIQUE,
author_did TEXT
);

View File

@ -1,4 +1,11 @@
use anyhow::Result;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use scooby::postgres::{insert_into, Parameters};
use sqlx::postgres::{PgPool, PgPoolOptions};
use sqlx::query;
pub type ConnectionPool = PgPool;
pub struct Post { pub struct Post {
indexed_at: DateTime<Utc>, indexed_at: DateTime<Utc>,
@ -10,11 +17,41 @@ pub struct Post {
pub struct Profile { pub struct Profile {
first_seen_at: DateTime<Utc>, first_seen_at: DateTime<Utc>,
did: String, did: String,
handle: String, handle: Option<String>,
likely_country_of_living: String, likely_country_of_living: Option<String>,
} }
pub struct SubscriptionState { pub struct SubscriptionState {
service: String, service: String,
cursor: i64, cursor: i64,
} }
pub async fn make_connection_pool() -> Result<ConnectionPool> {
// TODO: get options from env vars
Ok(PgPoolOptions::new()
.max_connections(5)
.connect("postgres://postgres:password@localhost/nederlandskie").await?)
}
pub async fn insert_post(
db: &ConnectionPool,
author_did: &str,
cid: &str,
uri: &str,
) -> Result<()> {
let mut params = Parameters::new();
Ok(query(
&insert_into("Post")
.columns(("indexed_at", "author_did", "cid", "uri"))
.values([["now()".to_owned(), params.next(), params.next(), params.next()]])
.to_string(),
)
.bind(author_did)
.bind(cid)
.bind(uri)
.execute(db)
.await
.map(|_| ())?)
}

View File

@ -2,11 +2,70 @@ mod database;
mod frames; mod frames;
mod streaming; mod streaming;
use crate::streaming::start_stream; use crate::database::ConnectionPool;
use anyhow::Result;
use async_trait::async_trait;
use database::insert_post;
use streaming::{Operation, OperationProcessor};
use crate::database::make_connection_pool;
use crate::streaming::start_processing_operations_with;
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<()> {
start_stream().await?; let db_connection_pool = make_connection_pool().await?;
// FIXME: This struct shouldn't really exist, but I couldn't find a way to replace
// this whole nonsense with a closure, which is what this whole thing should be in
// first place.
let post_saver = PostSaver { db_connection_pool };
start_processing_operations_with(post_saver).await?;
Ok(()) Ok(())
} }
struct PostSaver {
db_connection_pool: ConnectionPool,
}
#[async_trait]
impl OperationProcessor for PostSaver {
async fn process_operation(&self, operation: &Operation) -> Result<()> {
match operation {
Operation::CreatePost {
author_did,
cid,
uri,
languages,
text,
} => {
// TODO: Configure this via env vars
if !languages.contains("ru") {
return Ok(())
}
// BlueSky gets confused a lot about Russian vs Ukrainian, so skip posts
// that may be in Ukrainian regardless of whether Russian is in the list
// TODO: Configure this via env vars
if languages.contains("uk") {
return Ok(());
}
println!("received insertable post from {author_did}: {text}");
insert_post(&self.db_connection_pool, &author_did, &cid, &uri).await?;
// TODO: Insert profile if it doesn't exist yet
// insert_profile_if_it_doesnt_exist(&self.db_connection_pool, &author_did).await?;
}
Operation::DeletePost { uri: _ } => {
// TODO: Delete posts from db
// delete_post(&self.db_connection_pool, &uri).await?;
}
};
Ok(())
}
}

View File

@ -1,3 +1,6 @@
use std::collections::HashSet;
use async_trait::async_trait;
use anyhow::Result; use anyhow::Result;
use crate::frames::Frame; use crate::frames::Frame;
@ -8,12 +11,31 @@ use atrium_api::com::atproto::sync::subscribe_repos::Message;
use futures::StreamExt; use futures::StreamExt;
use tokio_tungstenite::{connect_async, tungstenite}; use tokio_tungstenite::{connect_async, tungstenite};
pub async fn start_stream() -> Result<()> { #[async_trait]
pub trait OperationProcessor {
async fn process_operation(&self, operation: &Operation) -> Result<()>;
}
#[derive(Debug)]
pub enum Operation {
CreatePost {
author_did: String,
cid: String,
uri: String,
languages: HashSet<String>,
text: String,
},
DeletePost {
uri: String,
},
}
pub async fn start_processing_operations_with<P: OperationProcessor>(processor: P) -> Result<()> {
let (mut stream, _) = let (mut stream, _) =
connect_async("wss://bsky.social/xrpc/com.atproto.sync.subscribeRepos").await?; connect_async("wss://bsky.social/xrpc/com.atproto.sync.subscribeRepos").await?;
while let Some(Ok(tungstenite::Message::Binary(message))) = stream.next().await { while let Some(Ok(tungstenite::Message::Binary(message))) = stream.next().await {
if let Err(e) = handle_message(&message).await { if let Err(e) = handle_message(&message, &processor).await {
println!("Error handling a message: {:?}", e); println!("Error handling a message: {:?}", e);
} }
} }
@ -21,15 +43,15 @@ pub async fn start_stream() -> Result<()> {
Ok(()) Ok(())
} }
async fn handle_message(message: &[u8]) -> Result<()> { async fn handle_message<P: OperationProcessor>(message: &[u8], processor: &P) -> Result<()> {
let commit = match parse_commit_from_message(&message)? { let commit = match parse_commit_from_message(&message)? {
Some(commit) => commit, Some(commit) => commit,
None => return Ok(()), None => return Ok(()),
}; };
let post_operations = extract_post_operations(&commit).await?; let post_operations = extract_operations(&commit).await?;
for operation in &post_operations { for operation in &post_operations {
println!("{:?}", operation); processor.process_operation(&operation).await?;
} }
Ok(()) Ok(())
@ -45,21 +67,7 @@ fn parse_commit_from_message(message: &[u8]) -> Result<Option<Commit>> {
} }
} }
#[derive(Debug)] async fn extract_operations(commit: &Commit) -> Result<Vec<Operation>> {
enum PostOperation {
Create {
author_did: String,
cid: String,
uri: String,
languages: Vec<String>,
text: String,
},
Delete {
cid: String,
},
}
async fn extract_post_operations(commit: &Commit) -> Result<Vec<PostOperation>> {
let mut operations = Vec::new(); let mut operations = Vec::new();
let (items, _) = rs_car::car_read_all(&mut commit.blocks.as_slice(), true).await?; let (items, _) = rs_car::car_read_all(&mut commit.blocks.as_slice(), true).await?;
@ -69,20 +77,20 @@ async fn extract_post_operations(commit: &Commit) -> Result<Vec<PostOperation>>
continue; continue;
} }
let cid = op.cid.ok_or(anyhow!("cid is not there, how is that possible"))?.to_string(); let uri = format!("at://{}/{}", commit.repo, op.path);
if let Some((_, item)) = items.iter().find(|(cid, _)| Some(*cid) == op.cid) { if let Some((_, item)) = items.iter().find(|(cid, _)| Some(*cid) == op.cid) {
let record: Record = ciborium::from_reader(&mut item.as_slice())?; let record: Record = ciborium::from_reader(&mut item.as_slice())?;
operations.push(match op.action.as_str() { operations.push(match op.action.as_str() {
"create" => PostOperation::Create { "create" => Operation::CreatePost {
languages: record.langs.unwrap_or_else(Vec::new), languages: record.langs.unwrap_or_else(Vec::new).iter().cloned().collect(),
text: record.text, text: record.text,
author_did: commit.repo.clone(), author_did: commit.repo.clone(),
cid, cid: op.cid.ok_or(anyhow!("cid is not present for a post create operation, how is that possible"))?.to_string(),
uri: format!("at://{}/{}", commit.repo, op.path), uri,
}, },
"delete" => PostOperation::Delete { cid }, "delete" => Operation::DeletePost { uri },
_ => unreachable!(), _ => unreachable!(),
}); });
} }