Restructure Bluesky-related code a bit

- Put internal stuff (cbor, ipld deserialization, xprc client) into internals module
- Move various record types into separate modules under entities
- Also move session into entities as well
- Simplify CBOR conversion stuff by liberal usage of TryFrom

This will all make it a little easier to implement additional things, like filtering out replies
This commit is contained in:
Aleksei Voronov 2023-10-15 11:47:48 +02:00
parent f008057f8a
commit 5eeb0e45b1
13 changed files with 186 additions and 165 deletions

View File

@ -1,9 +1,7 @@
mod client; mod client;
mod decode; mod entities;
mod proto; mod internals;
mod session;
mod streaming; mod streaming;
mod xrpc_client;
pub use client::Bluesky; pub use client::Bluesky;
pub use streaming::{CommitDetails, CommitProcessor, Operation}; pub use streaming::{CommitDetails, CommitProcessor, Operation};

View File

@ -13,9 +13,9 @@ use futures::StreamExt;
use log::error; use log::error;
use tokio_tungstenite::{connect_async, tungstenite}; use tokio_tungstenite::{connect_async, tungstenite};
use super::session::Session; use super::entities::Session;
use super::internals::xrpc::AuthenticateableXrpcClient;
use super::streaming::{handle_message, CommitProcessor}; use super::streaming::{handle_message, CommitProcessor};
use super::xrpc_client::AuthenticateableXrpcClient;
#[derive(Debug)] #[derive(Debug)]
pub struct ProfileDetails { pub struct ProfileDetails {

View File

@ -1,156 +0,0 @@
use anyhow::{anyhow, Error, Result};
use sk_cbor::Value;
type CborMap = Vec<(Value, Value)>;
pub struct PostRecord {
pub langs: Option<Vec<String>>,
pub text: String,
}
impl TryFrom<&CborMap> for PostRecord {
type Error = Error;
fn try_from(root: &CborMap) -> Result<Self> {
let mut text: Option<&str> = None;
let mut langs: Option<Vec<&str>> = None;
for (key, value) in iter_string_keys(root) {
match key {
"text" => text = Some(string(value)?),
"langs" => langs = Some(array_of_strings(value)?),
_ => continue,
}
}
Ok(PostRecord {
text: text
.ok_or_else(|| anyhow!("Missing field: text"))?
.to_owned(),
langs: langs.map(|v| v.into_iter().map(str::to_owned).collect()),
})
}
}
pub struct LikeRecord {
pub subject: Subject,
}
impl TryFrom<&CborMap> for LikeRecord {
type Error = Error;
fn try_from(root: &CborMap) -> Result<Self> {
let mut subject = None;
for (key, value) in iter_string_keys(root) {
match key {
"subject" => subject = Some(map(value)?.try_into()?),
_ => continue,
}
}
Ok(LikeRecord {
subject: subject.ok_or_else(|| anyhow!("Missing field: subject"))?,
})
}
}
pub struct Subject {
pub cid: String,
pub uri: String,
}
impl TryFrom<&CborMap> for Subject {
type Error = Error;
fn try_from(root: &CborMap) -> Result<Self> {
let mut cid = None;
let mut uri = None;
for (key, value) in iter_string_keys(root) {
match key {
"cid" => cid = Some(string(value)?),
"uri" => uri = Some(string(value)?),
_ => continue,
}
}
Ok(Subject {
cid: cid.ok_or_else(|| anyhow!("Missing field: cid"))?.to_owned(),
uri: uri.ok_or_else(|| anyhow!("Missing field: uri"))?.to_owned(),
})
}
}
pub struct FollowRecord {
pub subject: String,
}
impl TryFrom<&CborMap> for FollowRecord {
type Error = Error;
fn try_from(root: &CborMap) -> Result<Self> {
let mut subject = None;
for (key, value) in iter_string_keys(root) {
match key {
"subject" => subject = Some(string(value)?),
_ => continue,
}
}
Ok(FollowRecord {
subject: subject
.ok_or_else(|| anyhow!("Missing field: subject"))?
.to_owned(),
})
}
}
pub fn read_record<T: for<'a> TryFrom<&'a CborMap, Error = Error>>(bytes: &[u8]) -> Result<T> {
let root = match sk_cbor::read(bytes) {
Err(_) => return Err(anyhow!("Could not decode anything")),
Ok(v) => v,
};
let root_map = match root {
Value::Map(m) => m,
_ => return Err(anyhow!("Expected root object to be a map")),
};
(&root_map).try_into()
}
fn iter_string_keys(map: &CborMap) -> impl Iterator<Item = (&str, &Value)> {
map.iter().flat_map(|(k, v)| match k {
Value::TextString(k) => Some((k.as_str(), v)),
_ => None,
})
}
fn map(value: &Value) -> Result<&CborMap> {
match value {
Value::Map(m) => Ok(m),
_ => Err(anyhow!("Expected a map")),
}
}
fn string(value: &Value) -> Result<&str> {
match value {
Value::TextString(value) => Ok(value.as_str()),
_ => Err(anyhow!("Expected string")),
}
}
fn array_of_strings(value: &Value) -> Result<Vec<&str>> {
match value {
Value::Array(vec) => {
let mut res = Vec::with_capacity(vec.len());
for vec_value in vec {
res.push(string(vec_value)?)
}
Ok(res)
}
_ => Err(anyhow!("Expected array")),
}
}

View File

@ -0,0 +1,9 @@
mod follow;
mod like;
mod post;
mod session;
pub use follow::FollowRecord;
pub use like::LikeRecord;
pub use post::PostRecord;
pub use session::Session;

View File

@ -0,0 +1,24 @@
use std::collections::HashMap;
use anyhow::{anyhow, Error, Result};
use crate::services::bluesky::internals::cbor::CborValue;
pub struct FollowRecord {
pub subject: String,
}
impl TryFrom<CborValue> for FollowRecord {
type Error = Error;
fn try_from(root: CborValue) -> Result<Self> {
let mut map: HashMap<_, _> = root.try_into()?;
Ok(FollowRecord {
subject: map
.remove("subject")
.ok_or_else(|| anyhow!("Missing field: subject"))?
.try_into()?,
})
}
}

View File

@ -0,0 +1,48 @@
use std::collections::HashMap;
use anyhow::{anyhow, Error, Result};
use crate::services::bluesky::internals::cbor::CborValue;
pub struct LikeRecord {
pub subject: Subject,
}
impl TryFrom<CborValue> for LikeRecord {
type Error = Error;
fn try_from(root: CborValue) -> Result<Self> {
let mut map: HashMap<_, _> = root.try_into()?;
Ok(LikeRecord {
subject: map
.remove("subject")
.ok_or_else(|| anyhow!("Missing field: subject"))?
.try_into()?,
})
}
}
pub struct Subject {
pub cid: String,
pub uri: String,
}
impl TryFrom<CborValue> for Subject {
type Error = Error;
fn try_from(root: CborValue) -> Result<Self> {
let mut map: HashMap<_, _> = root.try_into()?;
Ok(Subject {
cid: map
.remove("cid")
.ok_or_else(|| anyhow!("Missing field: cid"))?
.try_into()?,
uri: map
.remove("uri")
.ok_or_else(|| anyhow!("Missing field: uri"))?
.try_into()?,
})
}
}

View File

@ -0,0 +1,26 @@
use std::collections::HashMap;
use anyhow::{anyhow, Error, Result};
use crate::services::bluesky::internals::cbor::CborValue;
pub struct PostRecord {
pub langs: Option<Vec<String>>,
pub text: String,
}
impl TryFrom<CborValue> for PostRecord {
type Error = Error;
fn try_from(root: CborValue) -> Result<Self> {
let mut map: HashMap<_, _> = root.try_into()?;
Ok(PostRecord {
text: map
.remove("text")
.ok_or_else(|| anyhow!("Missing field: text"))?
.try_into()?,
langs: map.remove("langs").map(|value| value.try_into()).transpose()?,
})
}
}

View File

@ -0,0 +1,3 @@
pub mod cbor;
pub mod ipld;
pub mod xrpc;

View File

@ -0,0 +1,68 @@
use std::collections::HashMap;
use anyhow::{anyhow, Error, Result};
use sk_cbor::Value;
pub struct CborValue {
inner: Value,
}
impl CborValue {
pub fn new(inner: Value) -> CborValue {
CborValue { inner }
}
}
impl TryFrom<CborValue> for HashMap<String, CborValue> {
type Error = Error;
fn try_from(value: CborValue) -> Result<Self> {
match value.inner {
Value::Map(entries) => {
let mut result = HashMap::with_capacity(entries.len());
for (key, value) in entries {
result.insert(CborValue::new(key).try_into()?, CborValue::new(value));
}
Ok(result)
}
_ => Err(anyhow!("Not a map")),
}
}
}
impl TryFrom<CborValue> for String {
type Error = Error;
fn try_from(value: CborValue) -> Result<Self> {
match value.inner {
Value::TextString(value) => Ok(value),
_ => Err(anyhow!("Expected string")),
}
}
}
impl TryFrom<CborValue> for Vec<String> {
type Error = Error;
fn try_from(value: CborValue) -> Result<Self> {
match value.inner {
Value::Array(vec) => {
let mut res = Vec::with_capacity(vec.len());
for vec_value in vec {
res.push(CborValue::new(vec_value).try_into()?)
}
Ok(res)
}
_ => Err(anyhow!("Expected array")),
}
}
}
pub fn read_record<T: TryFrom<CborValue, Error = Error>>(bytes: &[u8]) -> Result<T> {
let root = match sk_cbor::read(bytes) {
Err(_) => return Err(anyhow!("Could not decode anything")),
Ok(v) => v,
};
CborValue::new(root).try_into()
}

View File

@ -3,7 +3,7 @@ use atrium_xrpc::{client::reqwest::ReqwestClient, HttpClient, XrpcClient};
use http::{Method, Request, Response}; use http::{Method, Request, Response};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use super::session::Session; use crate::services::bluesky::entities::Session;
pub struct AuthenticateableXrpcClient { pub struct AuthenticateableXrpcClient {
inner: ReqwestClient, inner: ReqwestClient,

View File

@ -6,8 +6,9 @@ use atrium_api::com::atproto::sync::subscribe_repos::{Commit, Message};
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use super::{ use super::{
decode::{read_record, FollowRecord, LikeRecord, PostRecord}, entities::{FollowRecord, LikeRecord, PostRecord},
proto::Frame, internals::cbor::read_record,
internals::ipld::Frame,
}; };
const COLLECTION_POST: &str = "app.bsky.feed.post"; const COLLECTION_POST: &str = "app.bsky.feed.post";