From 5eeb0e45b1a8ac0ec516a7c8eb44f8b752608420 Mon Sep 17 00:00:00 2001 From: Aleksei Voronov Date: Sun, 15 Oct 2023 11:47:48 +0200 Subject: [PATCH] Restructure Bluesky-related code a bit - Put internal stuff (cbor, ipld deserialization, xprc client) into internals module - Move various record types into separate modules under entities - Also move session into entities as well - Simplify CBOR conversion stuff by liberal usage of TryFrom This will all make it a little easier to implement additional things, like filtering out replies --- src/services/bluesky.rs | 6 +- src/services/bluesky/client.rs | 4 +- src/services/bluesky/decode.rs | 156 ------------------ src/services/bluesky/entities.rs | 9 + src/services/bluesky/entities/follow.rs | 24 +++ src/services/bluesky/entities/like.rs | 48 ++++++ src/services/bluesky/entities/post.rs | 26 +++ .../bluesky/{ => entities}/session.rs | 0 src/services/bluesky/internals.rs | 3 + src/services/bluesky/internals/cbor.rs | 68 ++++++++ .../bluesky/{proto.rs => internals/ipld.rs} | 0 .../{xrpc_client.rs => internals/xrpc.rs} | 2 +- src/services/bluesky/streaming.rs | 5 +- 13 files changed, 186 insertions(+), 165 deletions(-) delete mode 100644 src/services/bluesky/decode.rs create mode 100644 src/services/bluesky/entities.rs create mode 100644 src/services/bluesky/entities/follow.rs create mode 100644 src/services/bluesky/entities/like.rs create mode 100644 src/services/bluesky/entities/post.rs rename src/services/bluesky/{ => entities}/session.rs (100%) create mode 100644 src/services/bluesky/internals.rs create mode 100644 src/services/bluesky/internals/cbor.rs rename src/services/bluesky/{proto.rs => internals/ipld.rs} (100%) rename src/services/bluesky/{xrpc_client.rs => internals/xrpc.rs} (97%) diff --git a/src/services/bluesky.rs b/src/services/bluesky.rs index 1abd439..27b59bb 100644 --- a/src/services/bluesky.rs +++ b/src/services/bluesky.rs @@ -1,9 +1,7 @@ mod client; -mod decode; -mod proto; -mod session; +mod entities; +mod internals; mod streaming; -mod xrpc_client; pub use client::Bluesky; pub use streaming::{CommitDetails, CommitProcessor, Operation}; diff --git a/src/services/bluesky/client.rs b/src/services/bluesky/client.rs index 250aa2f..cc50af1 100644 --- a/src/services/bluesky/client.rs +++ b/src/services/bluesky/client.rs @@ -13,9 +13,9 @@ use futures::StreamExt; use log::error; use tokio_tungstenite::{connect_async, tungstenite}; -use super::session::Session; +use super::entities::Session; +use super::internals::xrpc::AuthenticateableXrpcClient; use super::streaming::{handle_message, CommitProcessor}; -use super::xrpc_client::AuthenticateableXrpcClient; #[derive(Debug)] pub struct ProfileDetails { diff --git a/src/services/bluesky/decode.rs b/src/services/bluesky/decode.rs deleted file mode 100644 index ffeb2b5..0000000 --- a/src/services/bluesky/decode.rs +++ /dev/null @@ -1,156 +0,0 @@ -use anyhow::{anyhow, Error, Result}; -use sk_cbor::Value; - -type CborMap = Vec<(Value, Value)>; - -pub struct PostRecord { - pub langs: Option>, - pub text: String, -} - -impl TryFrom<&CborMap> for PostRecord { - type Error = Error; - - fn try_from(root: &CborMap) -> Result { - let mut text: Option<&str> = None; - let mut langs: Option> = None; - - for (key, value) in iter_string_keys(root) { - match key { - "text" => text = Some(string(value)?), - "langs" => langs = Some(array_of_strings(value)?), - _ => continue, - } - } - - Ok(PostRecord { - text: text - .ok_or_else(|| anyhow!("Missing field: text"))? - .to_owned(), - langs: langs.map(|v| v.into_iter().map(str::to_owned).collect()), - }) - } -} - -pub struct LikeRecord { - pub subject: Subject, -} - -impl TryFrom<&CborMap> for LikeRecord { - type Error = Error; - - fn try_from(root: &CborMap) -> Result { - let mut subject = None; - - for (key, value) in iter_string_keys(root) { - match key { - "subject" => subject = Some(map(value)?.try_into()?), - _ => continue, - } - } - - Ok(LikeRecord { - subject: subject.ok_or_else(|| anyhow!("Missing field: subject"))?, - }) - } -} - -pub struct Subject { - pub cid: String, - pub uri: String, -} - -impl TryFrom<&CborMap> for Subject { - type Error = Error; - - fn try_from(root: &CborMap) -> Result { - let mut cid = None; - let mut uri = None; - - for (key, value) in iter_string_keys(root) { - match key { - "cid" => cid = Some(string(value)?), - "uri" => uri = Some(string(value)?), - _ => continue, - } - } - - Ok(Subject { - cid: cid.ok_or_else(|| anyhow!("Missing field: cid"))?.to_owned(), - uri: uri.ok_or_else(|| anyhow!("Missing field: uri"))?.to_owned(), - }) - } -} - -pub struct FollowRecord { - pub subject: String, -} - -impl TryFrom<&CborMap> for FollowRecord { - type Error = Error; - - fn try_from(root: &CborMap) -> Result { - let mut subject = None; - - for (key, value) in iter_string_keys(root) { - match key { - "subject" => subject = Some(string(value)?), - _ => continue, - } - } - - Ok(FollowRecord { - subject: subject - .ok_or_else(|| anyhow!("Missing field: subject"))? - .to_owned(), - }) - } -} - -pub fn read_record TryFrom<&'a CborMap, Error = Error>>(bytes: &[u8]) -> Result { - let root = match sk_cbor::read(bytes) { - Err(_) => return Err(anyhow!("Could not decode anything")), - Ok(v) => v, - }; - - let root_map = match root { - Value::Map(m) => m, - _ => return Err(anyhow!("Expected root object to be a map")), - }; - - (&root_map).try_into() -} - -fn iter_string_keys(map: &CborMap) -> impl Iterator { - map.iter().flat_map(|(k, v)| match k { - Value::TextString(k) => Some((k.as_str(), v)), - _ => None, - }) -} - -fn map(value: &Value) -> Result<&CborMap> { - match value { - Value::Map(m) => Ok(m), - _ => Err(anyhow!("Expected a map")), - } -} - -fn string(value: &Value) -> Result<&str> { - match value { - Value::TextString(value) => Ok(value.as_str()), - _ => Err(anyhow!("Expected string")), - } -} - -fn array_of_strings(value: &Value) -> Result> { - match value { - Value::Array(vec) => { - let mut res = Vec::with_capacity(vec.len()); - for vec_value in vec { - res.push(string(vec_value)?) - } - Ok(res) - } - _ => Err(anyhow!("Expected array")), - } -} diff --git a/src/services/bluesky/entities.rs b/src/services/bluesky/entities.rs new file mode 100644 index 0000000..e734a80 --- /dev/null +++ b/src/services/bluesky/entities.rs @@ -0,0 +1,9 @@ +mod follow; +mod like; +mod post; +mod session; + +pub use follow::FollowRecord; +pub use like::LikeRecord; +pub use post::PostRecord; +pub use session::Session; diff --git a/src/services/bluesky/entities/follow.rs b/src/services/bluesky/entities/follow.rs new file mode 100644 index 0000000..8c96f13 --- /dev/null +++ b/src/services/bluesky/entities/follow.rs @@ -0,0 +1,24 @@ +use std::collections::HashMap; + +use anyhow::{anyhow, Error, Result}; + +use crate::services::bluesky::internals::cbor::CborValue; + +pub struct FollowRecord { + pub subject: String, +} + +impl TryFrom for FollowRecord { + type Error = Error; + + fn try_from(root: CborValue) -> Result { + let mut map: HashMap<_, _> = root.try_into()?; + + Ok(FollowRecord { + subject: map + .remove("subject") + .ok_or_else(|| anyhow!("Missing field: subject"))? + .try_into()?, + }) + } +} diff --git a/src/services/bluesky/entities/like.rs b/src/services/bluesky/entities/like.rs new file mode 100644 index 0000000..40282d3 --- /dev/null +++ b/src/services/bluesky/entities/like.rs @@ -0,0 +1,48 @@ +use std::collections::HashMap; + +use anyhow::{anyhow, Error, Result}; + +use crate::services::bluesky::internals::cbor::CborValue; + +pub struct LikeRecord { + pub subject: Subject, +} + +impl TryFrom for LikeRecord { + type Error = Error; + + fn try_from(root: CborValue) -> Result { + let mut map: HashMap<_, _> = root.try_into()?; + + Ok(LikeRecord { + subject: map + .remove("subject") + .ok_or_else(|| anyhow!("Missing field: subject"))? + .try_into()?, + }) + } +} + +pub struct Subject { + pub cid: String, + pub uri: String, +} + +impl TryFrom for Subject { + type Error = Error; + + fn try_from(root: CborValue) -> Result { + let mut map: HashMap<_, _> = root.try_into()?; + + Ok(Subject { + cid: map + .remove("cid") + .ok_or_else(|| anyhow!("Missing field: cid"))? + .try_into()?, + uri: map + .remove("uri") + .ok_or_else(|| anyhow!("Missing field: uri"))? + .try_into()?, + }) + } +} diff --git a/src/services/bluesky/entities/post.rs b/src/services/bluesky/entities/post.rs new file mode 100644 index 0000000..1facfef --- /dev/null +++ b/src/services/bluesky/entities/post.rs @@ -0,0 +1,26 @@ +use std::collections::HashMap; + +use anyhow::{anyhow, Error, Result}; + +use crate::services::bluesky::internals::cbor::CborValue; + +pub struct PostRecord { + pub langs: Option>, + pub text: String, +} + +impl TryFrom for PostRecord { + type Error = Error; + + fn try_from(root: CborValue) -> Result { + let mut map: HashMap<_, _> = root.try_into()?; + + Ok(PostRecord { + text: map + .remove("text") + .ok_or_else(|| anyhow!("Missing field: text"))? + .try_into()?, + langs: map.remove("langs").map(|value| value.try_into()).transpose()?, + }) + } +} diff --git a/src/services/bluesky/session.rs b/src/services/bluesky/entities/session.rs similarity index 100% rename from src/services/bluesky/session.rs rename to src/services/bluesky/entities/session.rs diff --git a/src/services/bluesky/internals.rs b/src/services/bluesky/internals.rs new file mode 100644 index 0000000..9b3ebb0 --- /dev/null +++ b/src/services/bluesky/internals.rs @@ -0,0 +1,3 @@ +pub mod cbor; +pub mod ipld; +pub mod xrpc; diff --git a/src/services/bluesky/internals/cbor.rs b/src/services/bluesky/internals/cbor.rs new file mode 100644 index 0000000..fe82ea2 --- /dev/null +++ b/src/services/bluesky/internals/cbor.rs @@ -0,0 +1,68 @@ +use std::collections::HashMap; + +use anyhow::{anyhow, Error, Result}; +use sk_cbor::Value; + +pub struct CborValue { + inner: Value, +} + +impl CborValue { + pub fn new(inner: Value) -> CborValue { + CborValue { inner } + } +} + +impl TryFrom for HashMap { + type Error = Error; + + fn try_from(value: CborValue) -> Result { + match value.inner { + Value::Map(entries) => { + let mut result = HashMap::with_capacity(entries.len()); + for (key, value) in entries { + result.insert(CborValue::new(key).try_into()?, CborValue::new(value)); + } + Ok(result) + } + _ => Err(anyhow!("Not a map")), + } + } +} + +impl TryFrom for String { + type Error = Error; + + fn try_from(value: CborValue) -> Result { + match value.inner { + Value::TextString(value) => Ok(value), + _ => Err(anyhow!("Expected string")), + } + } +} + +impl TryFrom for Vec { + type Error = Error; + + fn try_from(value: CborValue) -> Result { + match value.inner { + Value::Array(vec) => { + let mut res = Vec::with_capacity(vec.len()); + for vec_value in vec { + res.push(CborValue::new(vec_value).try_into()?) + } + Ok(res) + } + _ => Err(anyhow!("Expected array")), + } + } +} + +pub fn read_record>(bytes: &[u8]) -> Result { + let root = match sk_cbor::read(bytes) { + Err(_) => return Err(anyhow!("Could not decode anything")), + Ok(v) => v, + }; + + CborValue::new(root).try_into() +} diff --git a/src/services/bluesky/proto.rs b/src/services/bluesky/internals/ipld.rs similarity index 100% rename from src/services/bluesky/proto.rs rename to src/services/bluesky/internals/ipld.rs diff --git a/src/services/bluesky/xrpc_client.rs b/src/services/bluesky/internals/xrpc.rs similarity index 97% rename from src/services/bluesky/xrpc_client.rs rename to src/services/bluesky/internals/xrpc.rs index 888000f..b3fa58a 100644 --- a/src/services/bluesky/xrpc_client.rs +++ b/src/services/bluesky/internals/xrpc.rs @@ -3,7 +3,7 @@ use atrium_xrpc::{client::reqwest::ReqwestClient, HttpClient, XrpcClient}; use http::{Method, Request, Response}; use std::sync::{Arc, Mutex}; -use super::session::Session; +use crate::services::bluesky::entities::Session; pub struct AuthenticateableXrpcClient { inner: ReqwestClient, diff --git a/src/services/bluesky/streaming.rs b/src/services/bluesky/streaming.rs index d84e879..aea8590 100644 --- a/src/services/bluesky/streaming.rs +++ b/src/services/bluesky/streaming.rs @@ -6,8 +6,9 @@ use atrium_api::com::atproto::sync::subscribe_repos::{Commit, Message}; use chrono::{DateTime, Utc}; use super::{ - decode::{read_record, FollowRecord, LikeRecord, PostRecord}, - proto::Frame, + entities::{FollowRecord, LikeRecord, PostRecord}, + internals::cbor::read_record, + internals::ipld::Frame, }; const COLLECTION_POST: &str = "app.bsky.feed.post";