50 lines
1.3 KiB
Rust
50 lines
1.3 KiB
Rust
|
use futures::{stream::BoxStream, Future, Stream, TryStreamExt};
|
||
|
|
||
|
mod console;
|
||
|
pub use console::ConsoleSelector;
|
||
|
|
||
|
pub trait Selector {
|
||
|
type Error;
|
||
|
type Response: Future<Output = Result<bool, Self::Error>>;
|
||
|
|
||
|
fn review(&self, data: String) -> Self::Response;
|
||
|
}
|
||
|
|
||
|
pub trait SelectorExt<E, S: Stream<Item = Result<String, E>>>: Selector {
|
||
|
type Stream: Stream<Item = Result<String, FilterError<E, Self::Error>>>;
|
||
|
|
||
|
fn filter(self, stream: S) -> Self::Stream;
|
||
|
}
|
||
|
|
||
|
pub enum FilterError<T, U> {
|
||
|
Model(T),
|
||
|
Filter(U),
|
||
|
}
|
||
|
|
||
|
impl<
|
||
|
E: 'static,
|
||
|
T: Selector + Sync + Send + Clone + 'static,
|
||
|
S: Send + Stream<Item = Result<String, E>> + 'static,
|
||
|
> SelectorExt<E, S> for T
|
||
|
where
|
||
|
T::Response: Send,
|
||
|
{
|
||
|
type Stream = BoxStream<'static, Result<String, FilterError<E, Self::Error>>>;
|
||
|
|
||
|
fn filter(self, stream: S) -> Self::Stream {
|
||
|
Box::pin(
|
||
|
stream
|
||
|
.map_err(FilterError::Model)
|
||
|
.try_filter_map(move |item| {
|
||
|
let this = self.clone();
|
||
|
async move {
|
||
|
this.review(item.clone())
|
||
|
.await
|
||
|
.map_err(FilterError::Filter)
|
||
|
.map(move |keep| if keep { Some(item) } else { None })
|
||
|
}
|
||
|
}),
|
||
|
)
|
||
|
}
|
||
|
}
|