From 642a3d57cc79c9aae92d2d123684dc472ee15763 Mon Sep 17 00:00:00 2001 From: Aleksei Voronov Date: Sun, 24 Sep 2023 20:06:20 +0200 Subject: [PATCH] Remove ciborium in favor of custom deserialization logic Unfortunately, looks like serde is not flexible enough to support everything CBOR does, so a lot of messages cannot be deserialized properly. Other serde-based CBOR libraries suffer from the same problem. So now we have a bunch of boring deserialization logic supported by sk-cbor --- Cargo.lock | 41 ++------ Cargo.toml | 2 +- src/services/bluesky.rs | 1 + src/services/bluesky/decode.rs | 154 ++++++++++++++++++++++++++++++ src/services/bluesky/streaming.rs | 14 +-- 5 files changed, 167 insertions(+), 45 deletions(-) create mode 100644 src/services/bluesky/decode.rs diff --git a/Cargo.lock b/Cargo.lock index 7286389..1c52698 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -437,33 +437,6 @@ dependencies = [ "windows-targets", ] -[[package]] -name = "ciborium" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "effd91f6c78e5a4ace8a5d3c0b6bfaec9e2baaef55f3efc00e45fb2e477ee926" -dependencies = [ - "ciborium-io", - "ciborium-ll", - "serde", -] - -[[package]] -name = "ciborium-io" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cdf919175532b369853f5d5e20b26b43112613fd6fe7aee757e35f7a44642656" - -[[package]] -name = "ciborium-ll" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "defaa24ecc093c77630e6c15e17c51f5e187bf35ee514f4e2d67baaa96dae22b" -dependencies = [ - "ciborium-io", - "half", -] - [[package]] name = "cid" version = "0.10.1" @@ -1035,12 +1008,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "half" -version = "1.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7" - [[package]] name = "hashbrown" version = "0.12.3" @@ 
-2341,7 +2308,6 @@ dependencies = [ "axum", "chat-gpt-lib-rs", "chrono", - "ciborium", "clap", "dotenv", "env_logger", @@ -2354,6 +2320,7 @@ dependencies = [ "scooby", "serde", "serde_ipld_dagcbor", + "sk-cbor", "sqlx", "tokio", "tokio-tungstenite", @@ -3091,6 +3058,12 @@ dependencies = [ "rand_core", ] +[[package]] +name = "sk-cbor" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e94879a793aba6e65d691f345cfd172c4cc924a78259d5f9612a2cbfb78847a" + [[package]] name = "slab" version = "0.4.9" diff --git a/Cargo.toml b/Cargo.toml index ba9fe7b..857a36d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,6 @@ atrium-xrpc = "0.4.0" axum = "0.6.20" chat-gpt-lib-rs = "0.2.1" chrono = "0.4.31" -ciborium = "0.2.1" clap = { version = "4.4.4", features = ["derive"] } dotenv = "0.15.0" env_logger = "0.10.0" @@ -27,6 +26,7 @@ rs-car = "0.4.1" scooby = "0.5.0" serde = "1.0.188" serde_ipld_dagcbor = "0.4.2" +sk-cbor = "0.1.2" sqlx = { version = "0.7.1", default-features = false, features = ["postgres", "runtime-tokio-native-tls", "chrono"] } tokio = { version = "1.32.0", features = ["full"] } tokio-tungstenite = { version = "0.20.0", features = ["native-tls"] } diff --git a/src/services/bluesky.rs b/src/services/bluesky.rs index 1ffded9..a0cf0a7 100644 --- a/src/services/bluesky.rs +++ b/src/services/bluesky.rs @@ -1,6 +1,7 @@ mod client; mod proto; mod streaming; +mod decode; pub use client::Bluesky; pub use streaming::{CommitDetails, CommitProcessor, Operation}; diff --git a/src/services/bluesky/decode.rs b/src/services/bluesky/decode.rs new file mode 100644 index 0000000..b8fb4bd --- /dev/null +++ b/src/services/bluesky/decode.rs @@ -0,0 +1,154 @@ +use sk_cbor::Value; +use anyhow::{Result, Error, anyhow}; + +type CborMap = Vec<(Value, Value)>; + +pub struct PostRecord { + pub langs: Option<Vec<String>>, + pub text: String +} + +impl TryFrom<&CborMap> for PostRecord { + type Error = Error; + + fn try_from(root: &CborMap) -> Result<Self> { + let 
mut text: Option<&str> = None; + let mut langs: Option<Vec<&str>> = None; + + for (key, value) in iter_string_keys(&root) { + match key { + "text" => { text = Some(string(value)?) }, + "langs" => { langs = Some(array_of_strings(value)?) }, + _ => continue, + } + } + + Ok(PostRecord { + text: text.ok_or_else(|| anyhow!("Missing field: text"))?.to_owned(), + langs: langs.map(|v| v.into_iter().map(str::to_owned).collect()), + }) + } +} + +pub struct LikeRecord { + pub subject: Subject, +} + +impl TryFrom<&CborMap> for LikeRecord { + type Error = Error; + + fn try_from(root: &CborMap) -> Result<Self> { + let mut subject = None; + + for (key, value) in iter_string_keys(&root) { + match key { + "subject" => { subject = Some(map(value)?.try_into()?) }, + _ => continue, + } + } + + Ok(LikeRecord { + subject: subject.ok_or_else(|| anyhow!("Missing field: subject"))? + }) + } +} + +pub struct Subject { + pub cid: String, + pub uri: String, +} + +impl TryFrom<&CborMap> for Subject { + type Error = Error; + + fn try_from(root: &CborMap) -> Result<Self> { + let mut cid = None; + let mut uri = None; + + for (key, value) in iter_string_keys(&root) { + match key { + "cid" => { cid = Some(string(value)?) }, + "uri" => { uri = Some(string(value)?) }, + _ => continue, + } + } + + Ok(Subject { + cid: cid.ok_or_else(|| anyhow!("Missing field: cid"))?.to_owned(), + uri: uri.ok_or_else(|| anyhow!("Missing field: uri"))?.to_owned(), + }) + } +} + +pub struct FollowRecord { + pub subject: String, +} + +impl TryFrom<&CborMap> for FollowRecord { + type Error = Error; + + fn try_from(root: &CborMap) -> Result<Self> { + let mut subject = None; + + for (key, value) in iter_string_keys(&root) { + match key { + "subject" => { subject = Some(string(value)?) 
}, + _ => continue, + } + } + + Ok(FollowRecord { + subject: subject.ok_or_else(|| anyhow!("Missing field: subject"))?.to_owned(), + }) + } +} + +pub fn read_record<T: for<'a> TryFrom<&'a CborMap, Error = Error>>(bytes: &[u8]) -> Result<T> { + let root = match sk_cbor::read(bytes) { + Err(_) => return Err(anyhow!("Could not decode anything")), + Ok(v) => v + }; + + let root_map = match root { + Value::Map(m) => m, + _ => return Err(anyhow!("Expected root object to be a map")), + }; + + (&root_map).try_into() +} + +fn iter_string_keys(map: &CborMap) -> impl Iterator<Item = (&str, &Value)> { + map.into_iter().flat_map(|(k, v)| { + match k { + Value::TextString(k) => Some((k.as_str(), v)), + _ => None + } + }) +} + +fn map(value: &Value) -> Result<&CborMap> { + match value { + Value::Map(m) => Ok(m), + _ => Err(anyhow!("Expected a map")), + } +} + +fn string(value: &Value) -> Result<&str> { + match value { + Value::TextString(value) => Ok(value.as_str()), + _ => Err(anyhow!("Expected string")) + } +} + +fn array_of_strings(value: &Value) -> Result<Vec<&str>> { + match value { + Value::Array(vec) => { + let mut res = Vec::with_capacity(vec.len()); + for vec_value in vec { + res.push(string(&vec_value)?) 
+ } + Ok(res) + } + _ => Err(anyhow!("Expected array")) + } +} diff --git a/src/services/bluesky/streaming.rs b/src/services/bluesky/streaming.rs index d02b976..00ff1b2 100644 --- a/src/services/bluesky/streaming.rs +++ b/src/services/bluesky/streaming.rs @@ -4,7 +4,7 @@ use anyhow::Result; use async_trait::async_trait; use atrium_api::com::atproto::sync::subscribe_repos::{Commit, Message}; -use super::proto::Frame; +use super::{proto::Frame, decode::{read_record, PostRecord, LikeRecord, FollowRecord}}; const COLLECTION_POST: &str = "app.bsky.feed.post"; const COLLECTION_LIKE: &str = "app.bsky.feed.like"; @@ -109,9 +109,7 @@ async fn extract_operations(commit: &Commit) -> Result<Vec<Operation>> { match collection { COLLECTION_POST => { - use atrium_api::app::bsky::feed::post::Record; - - let record: Record = ciborium::from_reader(&mut block.as_slice())?; + let record: PostRecord = read_record(&block)?; Operation::CreatePost { author_did: commit.repo.clone(), @@ -122,9 +120,7 @@ async fn extract_operations(commit: &Commit) -> Result<Vec<Operation>> { } } COLLECTION_LIKE => { - use atrium_api::app::bsky::feed::like::Record; - - let record: Record = ciborium::from_reader(&mut block.as_slice())?; + let record: LikeRecord = read_record(&block)?; Operation::CreateLike { author_did: commit.repo.clone(), @@ -135,9 +131,7 @@ async fn extract_operations(commit: &Commit) -> Result<Vec<Operation>> { } } COLLECTION_FOLLOW => { - use atrium_api::app::bsky::graph::follow::Record; - - let record: Record = ciborium::from_reader(&mut block.as_slice())?; + let record: FollowRecord = read_record(&block)?; Operation::CreateFollow { author_did: commit.repo.clone(),