Remove ciborium in favor of custom deserialization logic

Unfortunately, looks like serde is not flexible enough to support everything CBOR does,
so a lot of messages cannot be deserialized properly. Other serde-based CBOR libraries
suffer from the same problem.

So now we have a bunch of boring deserialization logic supported by sk-cbor
This commit is contained in:
Aleksei Voronov 2023-09-24 20:06:20 +02:00
parent ffccdc40fe
commit 642a3d57cc
5 changed files with 167 additions and 45 deletions

41
Cargo.lock generated
View File

@ -437,33 +437,6 @@ dependencies = [
"windows-targets", "windows-targets",
] ]
[[package]]
name = "ciborium"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "effd91f6c78e5a4ace8a5d3c0b6bfaec9e2baaef55f3efc00e45fb2e477ee926"
dependencies = [
"ciborium-io",
"ciborium-ll",
"serde",
]
[[package]]
name = "ciborium-io"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdf919175532b369853f5d5e20b26b43112613fd6fe7aee757e35f7a44642656"
[[package]]
name = "ciborium-ll"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "defaa24ecc093c77630e6c15e17c51f5e187bf35ee514f4e2d67baaa96dae22b"
dependencies = [
"ciborium-io",
"half",
]
[[package]] [[package]]
name = "cid" name = "cid"
version = "0.10.1" version = "0.10.1"
@ -1035,12 +1008,6 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "half"
version = "1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7"
[[package]] [[package]]
name = "hashbrown" name = "hashbrown"
version = "0.12.3" version = "0.12.3"
@ -2341,7 +2308,6 @@ dependencies = [
"axum", "axum",
"chat-gpt-lib-rs", "chat-gpt-lib-rs",
"chrono", "chrono",
"ciborium",
"clap", "clap",
"dotenv", "dotenv",
"env_logger", "env_logger",
@ -2354,6 +2320,7 @@ dependencies = [
"scooby", "scooby",
"serde", "serde",
"serde_ipld_dagcbor", "serde_ipld_dagcbor",
"sk-cbor",
"sqlx", "sqlx",
"tokio", "tokio",
"tokio-tungstenite", "tokio-tungstenite",
@ -3091,6 +3058,12 @@ dependencies = [
"rand_core", "rand_core",
] ]
[[package]]
name = "sk-cbor"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e94879a793aba6e65d691f345cfd172c4cc924a78259d5f9612a2cbfb78847a"
[[package]] [[package]]
name = "slab" name = "slab"
version = "0.4.9" version = "0.4.9"

View File

@ -14,7 +14,6 @@ atrium-xrpc = "0.4.0"
axum = "0.6.20" axum = "0.6.20"
chat-gpt-lib-rs = "0.2.1" chat-gpt-lib-rs = "0.2.1"
chrono = "0.4.31" chrono = "0.4.31"
ciborium = "0.2.1"
clap = { version = "4.4.4", features = ["derive"] } clap = { version = "4.4.4", features = ["derive"] }
dotenv = "0.15.0" dotenv = "0.15.0"
env_logger = "0.10.0" env_logger = "0.10.0"
@ -27,6 +26,7 @@ rs-car = "0.4.1"
scooby = "0.5.0" scooby = "0.5.0"
serde = "1.0.188" serde = "1.0.188"
serde_ipld_dagcbor = "0.4.2" serde_ipld_dagcbor = "0.4.2"
sk-cbor = "0.1.2"
sqlx = { version = "0.7.1", default-features = false, features = ["postgres", "runtime-tokio-native-tls", "chrono"] } sqlx = { version = "0.7.1", default-features = false, features = ["postgres", "runtime-tokio-native-tls", "chrono"] }
tokio = { version = "1.32.0", features = ["full"] } tokio = { version = "1.32.0", features = ["full"] }
tokio-tungstenite = { version = "0.20.0", features = ["native-tls"] } tokio-tungstenite = { version = "0.20.0", features = ["native-tls"] }

View File

@ -1,6 +1,7 @@
mod client; mod client;
mod proto; mod proto;
mod streaming; mod streaming;
mod decode;
pub use client::Bluesky; pub use client::Bluesky;
pub use streaming::{CommitDetails, CommitProcessor, Operation}; pub use streaming::{CommitDetails, CommitProcessor, Operation};

View File

@ -0,0 +1,154 @@
use sk_cbor::Value;
use anyhow::{Result, Error, anyhow};
type CborMap = Vec<(Value, Value)>;
pub struct PostRecord {
pub langs: Option<Vec<String>>,
pub text: String
}
impl TryFrom<&CborMap> for PostRecord {
type Error = Error;
fn try_from(root: &CborMap) -> Result<Self> {
let mut text: Option<&str> = None;
let mut langs: Option<Vec<&str>> = None;
for (key, value) in iter_string_keys(&root) {
match key {
"text" => { text = Some(string(value)?) },
"langs" => { langs = Some(array_of_strings(value)?) },
_ => continue,
}
}
Ok(PostRecord {
text: text.ok_or_else(|| anyhow!("Missing field: text"))?.to_owned(),
langs: langs.map(|v| v.into_iter().map(str::to_owned).collect()),
})
}
}
pub struct LikeRecord {
pub subject: Subject,
}
impl TryFrom<&CborMap> for LikeRecord {
type Error = Error;
fn try_from(root: &CborMap) -> Result<Self> {
let mut subject = None;
for (key, value) in iter_string_keys(&root) {
match key {
"subject" => { subject = Some(map(value)?.try_into()?) },
_ => continue,
}
}
Ok(LikeRecord {
subject: subject.ok_or_else(|| anyhow!("Missing field: subject"))?
})
}
}
pub struct Subject {
pub cid: String,
pub uri: String,
}
impl TryFrom<&CborMap> for Subject {
type Error = Error;
fn try_from(root: &CborMap) -> Result<Self> {
let mut cid = None;
let mut uri = None;
for (key, value) in iter_string_keys(&root) {
match key {
"cid" => { cid = Some(string(value)?) },
"uri" => { uri = Some(string(value)?) },
_ => continue,
}
}
Ok(Subject {
cid: cid.ok_or_else(|| anyhow!("Missing field: cid"))?.to_owned(),
uri: uri.ok_or_else(|| anyhow!("Missing field: uri"))?.to_owned(),
})
}
}
pub struct FollowRecord {
pub subject: String,
}
impl TryFrom<&CborMap> for FollowRecord {
type Error = Error;
fn try_from(root: &CborMap) -> Result<Self> {
let mut subject = None;
for (key, value) in iter_string_keys(&root) {
match key {
"subject" => { subject = Some(string(value)?) },
_ => continue,
}
}
Ok(FollowRecord {
subject: subject.ok_or_else(|| anyhow!("Missing field: subject"))?.to_owned(),
})
}
}
pub fn read_record<T: for<'a> TryFrom<&'a CborMap, Error = Error>>(bytes: &[u8]) -> Result<T> {
let root = match sk_cbor::read(bytes) {
Err(_) => return Err(anyhow!("Could not decode anything")),
Ok(v) => v
};
let root_map = match root {
Value::Map(m) => m,
_ => return Err(anyhow!("Expected root object to be a map")),
};
(&root_map).try_into()
}
fn iter_string_keys(map: &CborMap) -> impl Iterator<Item = (&str, &Value)> {
map.into_iter().flat_map(|(k, v)| {
match k {
Value::TextString(k) => Some((k.as_str(), v)),
_ => None
}
})
}
fn map(value: &Value) -> Result<&CborMap> {
match value {
Value::Map(m) => Ok(m),
_ => Err(anyhow!("Expected a map")),
}
}
fn string(value: &Value) -> Result<&str> {
match value {
Value::TextString(value) => Ok(value.as_str()),
_ => Err(anyhow!("Expected string"))
}
}
fn array_of_strings(value: &Value) -> Result<Vec<&str>> {
match value {
Value::Array(vec) => {
let mut res = Vec::with_capacity(vec.len());
for vec_value in vec {
res.push(string(&vec_value)?)
}
Ok(res)
}
_ => Err(anyhow!("Expected array"))
}
}

View File

@ -4,7 +4,7 @@ use anyhow::Result;
use async_trait::async_trait; use async_trait::async_trait;
use atrium_api::com::atproto::sync::subscribe_repos::{Commit, Message}; use atrium_api::com::atproto::sync::subscribe_repos::{Commit, Message};
use super::proto::Frame; use super::{proto::Frame, decode::{read_record, PostRecord, LikeRecord, FollowRecord}};
const COLLECTION_POST: &str = "app.bsky.feed.post"; const COLLECTION_POST: &str = "app.bsky.feed.post";
const COLLECTION_LIKE: &str = "app.bsky.feed.like"; const COLLECTION_LIKE: &str = "app.bsky.feed.like";
@ -109,9 +109,7 @@ async fn extract_operations(commit: &Commit) -> Result<Vec<Operation>> {
match collection { match collection {
COLLECTION_POST => { COLLECTION_POST => {
use atrium_api::app::bsky::feed::post::Record; let record: PostRecord = read_record(&block)?;
let record: Record = ciborium::from_reader(&mut block.as_slice())?;
Operation::CreatePost { Operation::CreatePost {
author_did: commit.repo.clone(), author_did: commit.repo.clone(),
@ -122,9 +120,7 @@ async fn extract_operations(commit: &Commit) -> Result<Vec<Operation>> {
} }
} }
COLLECTION_LIKE => { COLLECTION_LIKE => {
use atrium_api::app::bsky::feed::like::Record; let record: LikeRecord = read_record(&block)?;
let record: Record = ciborium::from_reader(&mut block.as_slice())?;
Operation::CreateLike { Operation::CreateLike {
author_did: commit.repo.clone(), author_did: commit.repo.clone(),
@ -135,9 +131,7 @@ async fn extract_operations(commit: &Commit) -> Result<Vec<Operation>> {
} }
} }
COLLECTION_FOLLOW => { COLLECTION_FOLLOW => {
use atrium_api::app::bsky::graph::follow::Record; let record: FollowRecord = read_record(&block)?;
let record: Record = ciborium::from_reader(&mut block.as_slice())?;
Operation::CreateFollow { Operation::CreateFollow {
author_did: commit.repo.clone(), author_did: commit.repo.clone(),