use futures::{stream::BoxStream, Future, Stream, TryStreamExt}; mod console; pub mod telegram; pub use console::ConsoleSelector; pub use telegram::TelegramSelector; pub trait Selector { type Error; type Response: Future>; fn review(&self, data: String) -> Self::Response; } pub trait SelectorExt>>: Selector { type Stream: Stream>>; fn filter(self, stream: S) -> Self::Stream; } pub enum FilterError { Model(T), Filter(U), } impl< E: 'static, T: Selector + Sync + Send + Clone + 'static, S: Send + Stream> + 'static, > SelectorExt for T where T::Response: Send, { type Stream = BoxStream<'static, Result>>; fn filter(self, stream: S) -> Self::Stream { Box::pin( stream .map_err(FilterError::Model) .try_filter_map(move |item| { let this = self.clone(); async move { this.review(item.clone()) .await .map_err(FilterError::Filter) .map(move |keep| if keep { Some(item) } else { None }) } }), ) } }