61 lines
1.6 KiB
Rust
61 lines
1.6 KiB
Rust
use futures::{stream::BoxStream, Future, Stream, TryStreamExt};
|
|
use std::fmt::Debug;
|
|
use thiserror::Error;
|
|
|
|
mod any;
|
|
pub mod telegram;
|
|
pub use any::AnySelector;
|
|
pub use telegram::TelegramSelector;
|
|
|
|
pub trait Selector {
|
|
type Error;
|
|
type Response: Future<Output = Result<bool, Self::Error>>;
|
|
|
|
fn review(&self, data: String) -> Self::Response;
|
|
}
|
|
|
|
pub trait SelectorExt<E: Debug, S: Stream<Item = Result<String, E>>>: Selector
|
|
where
|
|
Self::Error: Debug,
|
|
{
|
|
type Stream: Stream<Item = Result<String, FilterError<E, Self::Error>>>;
|
|
|
|
fn filter(self, stream: S) -> Self::Stream;
|
|
}
|
|
|
|
#[derive(Debug, Error)]
|
|
pub enum FilterError<T: Debug, U: Debug> {
|
|
#[error("model sampling failed: {0:?}")]
|
|
Model(T),
|
|
#[error("filtering failed: {0:?}")]
|
|
Filter(U),
|
|
}
|
|
|
|
impl<
|
|
E: Debug + 'static,
|
|
T: Selector + Sync + Send + Clone + 'static,
|
|
S: Send + Stream<Item = Result<String, E>> + 'static,
|
|
> SelectorExt<E, S> for T
|
|
where
|
|
T::Response: Send,
|
|
T::Error: Debug,
|
|
{
|
|
type Stream = BoxStream<'static, Result<String, FilterError<E, Self::Error>>>;
|
|
|
|
fn filter(self, stream: S) -> Self::Stream {
|
|
Box::pin(
|
|
stream
|
|
.map_err(FilterError::Model)
|
|
.try_filter_map(move |item| {
|
|
let this = self.clone();
|
|
async move {
|
|
this.review(item.clone())
|
|
.await
|
|
.map_err(FilterError::Filter)
|
|
.map(move |keep| if keep { Some(item) } else { None })
|
|
}
|
|
}),
|
|
)
|
|
}
|
|
}
|