Initial version of things: just reads the stream of messages and prints it out and that's that

This commit is contained in:
Aleksei Voronov 2023-08-18 21:11:49 +02:00
commit 13cef8786c
7 changed files with 2944 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/target

2610
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

21
Cargo.toml Normal file
View File

@ -0,0 +1,21 @@
[package]
name = "nederlandskie"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.75"
atrium-api = "0.4.0"
atrium-xrpc = "0.3.0"
chrono = "0.4.26"
ciborium = "0.2.1"
futures = "0.3.28"
libipld-core = { version = "0.16.0", features = ["serde-codec"] }
rs-car = "0.4.1"
scooby = "0.4.0"
serde_ipld_dagcbor = "0.4.0"
sqlx = { version = "0.7.1", features = ["chrono"] }
tokio = { version = "1.32.0", features = ["full"] }
tokio-tungstenite = { version = "0.20.0", features = ["native-tls"] }

20
src/database.rs Normal file
View File

@ -0,0 +1,20 @@
use chrono::{DateTime, Utc};
pub struct Post {
indexed_at: DateTime<Utc>,
author_did: String,
cid: String,
uri: String,
}
pub struct Profile {
first_seen_at: DateTime<Utc>,
did: String,
handle: String,
likely_country_of_living: String,
}
pub struct SubscriptionState {
service: String,
cursor: i64,
}

184
src/frames.rs Normal file
View File

@ -0,0 +1,184 @@
use atrium_api::com::atproto::sync::subscribe_repos::{
Commit, Handle, Info, Message, Migrate, Tombstone,
};
use libipld_core::ipld::Ipld;
use std::io::Cursor;
// original definition:
//```
// export enum FrameType {
// Message = 1,
// Error = -1,
// }
// export const messageFrameHeader = z.object({
// op: z.literal(FrameType.Message), // Frame op
// t: z.string().optional(), // Message body type discriminator
// })
// export type MessageFrameHeader = z.infer<typeof messageFrameHeader>
// export const errorFrameHeader = z.object({
// op: z.literal(FrameType.Error),
// })
// export type ErrorFrameHeader = z.infer<typeof errorFrameHeader>
// ```
#[derive(Debug, Clone, PartialEq, Eq)]
enum FrameHeader {
Message(Option<String>),
Error,
}
impl TryFrom<Ipld> for FrameHeader {
type Error = anyhow::Error;
fn try_from(value: Ipld) -> Result<Self, <FrameHeader as TryFrom<Ipld>>::Error> {
if let Ipld::Map(map) = value {
if let Some(Ipld::Integer(i)) = map.get("op") {
match i {
1 => {
let t = if let Some(Ipld::String(s)) = map.get("t") {
Some(s.clone())
} else {
None
};
return Ok(FrameHeader::Message(t));
}
-1 => return Ok(FrameHeader::Error),
_ => {}
}
}
}
Err(anyhow::anyhow!("invalid frame type"))
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Frame {
Message(Box<MessageFrame>),
Error(ErrorFrame),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MessageFrame {
pub body: Message,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ErrorFrame {
// TODO
// body: Value,
}
impl TryFrom<&[u8]> for Frame {
type Error = anyhow::Error;
fn try_from(value: &[u8]) -> Result<Self, <Frame as TryFrom<&[u8]>>::Error> {
let mut cursor = Cursor::new(value);
let (left, right) = match serde_ipld_dagcbor::from_reader::<Ipld, _>(&mut cursor) {
Err(serde_ipld_dagcbor::DecodeError::TrailingData) => {
value.split_at(cursor.position() as usize)
}
_ => {
// TODO
return Err(anyhow::anyhow!("invalid frame type"));
}
};
let ipld = serde_ipld_dagcbor::from_slice::<Ipld>(left)?;
let header = FrameHeader::try_from(ipld)?;
match header {
FrameHeader::Message(t) => match t.as_deref() {
Some("#commit") => Ok(Frame::Message(Box::new(MessageFrame {
body: Message::Commit(Box::new(serde_ipld_dagcbor::from_slice::<Commit>(
right,
)?)),
}))),
Some("#handle") => Ok(Frame::Message(Box::new(MessageFrame {
body: Message::Handle(Box::new(serde_ipld_dagcbor::from_slice::<Handle>(
right,
)?)),
}))),
Some("#info") => Ok(Frame::Message(Box::new(MessageFrame {
body: Message::Info(Box::new(serde_ipld_dagcbor::from_slice::<Info>(right)?)),
}))),
Some("#migrate") => Ok(Frame::Message(Box::new(MessageFrame {
body: Message::Migrate(Box::new(serde_ipld_dagcbor::from_slice::<Migrate>(
right,
)?)),
}))),
Some("#tombstone") => Ok(Frame::Message(Box::new(MessageFrame {
body: Message::Tombstone(Box::new(
serde_ipld_dagcbor::from_slice::<Tombstone>(right)?,
)),
}))),
_ => {
let tag = t.as_deref();
Err(anyhow::anyhow!("frame not implemented: tag={tag:?}"))
}
},
FrameHeader::Error => Ok(Frame::Error(ErrorFrame {})),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn serialized_data(s: &str) -> Vec<u8> {
assert!(s.len() % 2 == 0);
let b2u = |b: u8| match b {
b'0'..=b'9' => b - b'0',
b'a'..=b'f' => b - b'a' + 10,
_ => unreachable!(),
};
s.as_bytes()
.chunks(2)
.map(|b| (b2u(b[0]) << 4) + b2u(b[1]))
.collect()
}
#[test]
fn deserialize_message_frame_header() {
// {"op": 1, "t": "#commit"}
let data = serialized_data("a2626f700161746723636f6d6d6974");
let ipld = serde_ipld_dagcbor::from_slice::<Ipld>(&data).expect("failed to deserialize");
let result = FrameHeader::try_from(ipld);
assert_eq!(
result.expect("failed to deserialize"),
FrameHeader::Message(Some(String::from("#commit")))
);
}
#[test]
fn deserialize_error_frame_header() {
// {"op": -1}
let data = serialized_data("a1626f7020");
let ipld = serde_ipld_dagcbor::from_slice::<Ipld>(&data).expect("failed to deserialize");
let result = FrameHeader::try_from(ipld);
assert_eq!(result.expect("failed to deserialize"), FrameHeader::Error);
}
#[test]
fn deserialize_invalid_frame_header() {
{
// {"op": 2, "t": "#commit"}
let data = serialized_data("a2626f700261746723636f6d6d6974");
let ipld =
serde_ipld_dagcbor::from_slice::<Ipld>(&data).expect("failed to deserialize");
let result = FrameHeader::try_from(ipld);
assert_eq!(
result.expect_err("must be failed").to_string(),
"invalid frame type"
);
}
{
// {"op": -2}
let data = serialized_data("a1626f7021");
let ipld =
serde_ipld_dagcbor::from_slice::<Ipld>(&data).expect("failed to deserialize");
let result = FrameHeader::try_from(ipld);
assert_eq!(
result.expect_err("must be failed").to_string(),
"invalid frame type"
);
}
}
}

12
src/main.rs Normal file
View File

@ -0,0 +1,12 @@
mod database;
mod frames;
mod streaming;
use crate::streaming::start_stream;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
start_stream().await?;
Ok(())
}

96
src/streaming.rs Normal file
View File

@ -0,0 +1,96 @@
use anyhow::Result;
use crate::frames::Frame;
use anyhow::anyhow;
use atrium_api::app::bsky::feed::post::Record;
use atrium_api::com::atproto::sync::subscribe_repos::Commit;
use atrium_api::com::atproto::sync::subscribe_repos::Message;
use futures::StreamExt;
use tokio_tungstenite::{connect_async, tungstenite};
pub async fn start_stream() -> Result<()> {
let (mut stream, _) =
connect_async("wss://bsky.social/xrpc/com.atproto.sync.subscribeRepos").await?;
while let Some(Ok(tungstenite::Message::Binary(message))) = stream.next().await {
let commit = match parse_commit_message(&message) {
Ok(Some(commit)) => commit,
Ok(None) => continue,
Err(e) => {
println!("Couldn't parse commit: {:?}", e);
continue;
}
};
let post_messages = extract_post_messages(&commit).await;
match post_messages {
Ok(post_messages) => {
if !post_messages.is_empty() {
println!("{:?}", post_messages);
}
}
Err(e) => {
println!("Coudln't extract post messages: {:?}", e);
}
}
}
Ok(())
}
fn parse_commit_message(message: &[u8]) -> Result<Option<Commit>> {
match Frame::try_from(message)? {
Frame::Message(message) => match message.body {
Message::Commit(commit) => Ok(Some(*commit)),
_ => Ok(None),
},
Frame::Error(err) => panic!("Frame error: {err:?}"),
}
}
#[derive(Debug)]
enum Action {
Create,
Delete,
}
#[derive(Debug)]
struct PostMessage {
action: Action,
author_did: String,
cid: String,
uri: String,
languages: Vec<String>,
text: String,
}
async fn extract_post_messages(commit: &Commit) -> Result<Vec<PostMessage>> {
let mut posts = Vec::new();
let (items, _) = rs_car::car_read_all(&mut commit.blocks.as_slice(), true).await?;
for op in &commit.ops {
let collection = op.path.split('/').next().expect("op.path is empty");
if (op.action != "create" && op.action != "delete") || collection != "app.bsky.feed.post" {
continue;
}
if let Some((_, item)) = items.iter().find(|(cid, _)| Some(*cid) == op.cid) {
let record: Record = ciborium::from_reader(&mut item.as_slice())?;
posts.push(PostMessage {
action: if op.action == "create" {
Action::Create
} else {
Action::Delete
},
languages: record.langs.unwrap_or_else(Vec::new),
text: record.text,
author_did: commit.repo.clone(),
cid: op.cid.expect("cid is not there, what").to_string(),
uri: format!("at://{}/{}", commit.repo, op.path),
})
}
}
Ok(posts)
}