use circular::Buffer; use futures::{FutureExt, Stream}; use nom::Err; use std::{ collections::{BTreeMap, HashMap, HashSet}, future::Future, path::Prefix, pin::{pin, Pin}, str::{self, FromStr}, }; use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt}; static MAX_STANZA_SIZE: usize = 65536; use crate::{ element::{Content, Element, Name, NamespaceDeclaration}, error::Error, xml::{self, parsers::Parser}, Result, }; /// streaming reader that tracks depth and available namespaces at current depth pub struct Reader { inner: R, buffer: Buffer, // holds which tags we are in atm over depth // to have names reference namespaces could depth: Vec, namespace_declarations: Vec>, } impl Reader { pub fn new(reader: R) -> Self { Self { inner: reader, buffer: Buffer::with_capacity(MAX_STANZA_SIZE), depth: Vec::new(), namespace_declarations: Vec::new(), } } } impl Reader where R: AsyncRead + Unpin, { async fn read_buf<'s>(&mut self) -> Result { Ok(self.inner.read_buf(&mut self.buffer).await?) } async fn read_prolog<'s>(&'s mut self) -> Result<()> { loop { self.read_buf().await?; let input = str::from_utf8(self.buffer.data())?; match xml::Prolog::parse(input) { Ok((rest, _prolog)) => { let len = self.buffer.available_data() - rest.as_bytes().len(); self.buffer.consume(len); return Ok(()); } std::result::Result::Err(e) => match e { Err::Incomplete(_) => {} // TODO: better error Err::Error(e) => return Err(Error::ParseError(e.to_string())), Err::Failure(e) => return Err(Error::ParseError(e.to_string())), }, } } } async fn read_start_tag<'s>(&'s mut self) -> Result { loop { self.read_buf().await?; let input = str::from_utf8(self.buffer.data())?; match xml::STag::parse(input) { Ok((rest, e)) => { let len = self.buffer.available_data() - rest.as_bytes().len(); let element = Reader::::start_tag_from_xml( &mut self.depth, &mut self.namespace_declarations, e, )?; self.buffer.consume(len); return Ok(element); } std::result::Result::Err(e) => match e { Err::Incomplete(_) => {} // TODO: better error Err::Error(e) => return Err(Error::ParseError(e.to_string())), Err::Failure(e) => return Err(Error::ParseError(e.to_string())), }, } } } async fn read_end_tag<'s>(&'s mut self) -> Result<()> { loop { self.read_buf().await?; let input = str::from_utf8(self.buffer.data())?; match xml::ETag::parse(input) { Ok((rest, e)) => { let len = self.buffer.available_data() - rest.as_bytes().len(); Reader::::end_tag_from_xml( &mut self.depth, &mut self.namespace_declarations, e, )?; self.buffer.consume(len); return Ok(()); } std::result::Result::Err(e) => match e { Err::Incomplete(_) => {} // TODO: better error Err::Error(e) => return Err(Error::ParseError(e.to_string())), Err::Failure(e) => return Err(Error::ParseError(e.to_string())), }, } } } async fn read_element<'s>(&'s mut self) -> Result { loop { self.read_buf().await?; let input = str::from_utf8(self.buffer.data())?; match xml::Element::parse(input) { Ok((rest, e)) => { let len = self.buffer.available_data() - rest.as_bytes().len(); let element = Reader::::element_from_xml(&mut self.namespace_declarations, e)?; self.buffer.consume(len); return Ok(element); } std::result::Result::Err(e) => match e { Err::Incomplete(_) => {} // TODO: better error Err::Error(e) => return Err(Error::ParseError(e.to_string())), Err::Failure(e) => return Err(Error::ParseError(e.to_string())), }, } } } async fn read_content<'s>(&'s mut self) -> Result { let mut last_char = false; let mut text = String::new(); loop { self.read_buf().await?; let input = str::from_utf8(self.buffer.data())?; if last_char == false { match xml::CharData::parse(input) { Ok((rest, char_data)) => { let len = self.buffer.available_data() - rest.as_bytes().len(); text.push_str(*char_data); self.buffer.consume(len); last_char = true; } std::result::Result::Err(e) => match e { Err::Incomplete(_needed) => continue, _ => match xml::ContentItem::parse(input) { Ok((rest, content_item)) => match content_item { xml::ContentItem::Element(element) => { if !text.is_empty() { return Ok(Content::Text(text)); } else { let len = self.buffer.available_data() - rest.as_bytes().len(); let element = Self::element_from_xml( &mut self.namespace_declarations, element, )?; self.buffer.consume(len); return Ok(Content::Element(element)); } } xml::ContentItem::Reference(reference) => { let len = self.buffer.available_data() - rest.as_bytes().len(); text.push(reference.process()?); self.buffer.consume(len); continue; } xml::ContentItem::CDSect(cd_sect) => { let len = self.buffer.available_data() - rest.as_bytes().len(); text.push_str(**cd_sect); self.buffer.consume(len); continue; } xml::ContentItem::PI(_pi) => { if !text.is_empty() { return Ok(Content::Text(text)); } else { let len = self.buffer.available_data() - rest.as_bytes().len(); self.buffer.consume(len); return Ok(Content::PI); } } xml::ContentItem::Comment(comment) => { if !text.is_empty() { return Ok(Content::Text(text)); } else { let len = self.buffer.available_data() - rest.as_bytes().len(); let comment = comment.to_string(); self.buffer.consume(len); return Ok(Content::Comment(comment)); } } }, std::result::Result::Err(e) => match e { Err::Incomplete(_) => continue, // TODO: better error Err::Error(e) => return Err(Error::ParseError(e.to_string())), Err::Failure(e) => return Err(Error::ParseError(e.to_string())), }, }, }, } } else { match xml::ContentItem::parse(input) { Ok((rest, content_item)) => match content_item { xml::ContentItem::Element(element) => { // text can still be empty if !text.is_empty() { return Ok(Content::Text(text)); } else { let len = self.buffer.available_data() - rest.as_bytes().len(); let element = Self::element_from_xml( &mut self.namespace_declarations, element, )?; self.buffer.consume(len); return Ok(Content::Element(element)); } } xml::ContentItem::Reference(reference) => { let len = self.buffer.available_data() - rest.as_bytes().len(); text.push(reference.process()?); self.buffer.consume(len); last_char = false; continue; } xml::ContentItem::CDSect(cd_sect) => { let len = self.buffer.available_data() - rest.as_bytes().len(); text.push_str(**cd_sect); self.buffer.consume(len); last_char = false; continue; } xml::ContentItem::PI(_pi) => { if !text.is_empty() { return Ok(Content::Text(text)); } else { let len = self.buffer.available_data() - rest.as_bytes().len(); self.buffer.consume(len); return Ok(Content::PI); } } xml::ContentItem::Comment(comment) => { let len = self.buffer.available_data() - rest.as_bytes().len(); let comment = comment.to_string(); self.buffer.consume(len); return Ok(Content::Comment(comment)); } }, std::result::Result::Err(e) => match e { Err::Incomplete(_) => continue, // TODO: better error Err::Error(e) => return Err(Error::ParseError(e.to_string())), Err::Failure(e) => return Err(Error::ParseError(e.to_string())), }, } } } } } impl Reader { fn start_tag_from_xml( depth: &mut Vec, namespace_declarations: &mut Vec>, s_tag: xml::STag, ) -> Result { // namespace declarations on element let mut element_namespace_declarations = HashSet::new(); for (prefix, namespace) in s_tag.attributes.iter().filter_map(|attribute| { if let xml::Attribute::NamespaceDeclaration { ns_name, value } = attribute { Some((ns_name, value)) } else { None } }) { let prefix = match prefix { xml::NSAttName::PrefixedAttName(prefixed_att_name) => { Some(prefixed_att_name.to_string()) } xml::NSAttName::DefaultAttName => None, }; let namespace = NamespaceDeclaration { prefix, namespace: namespace.process()?, }; if !element_namespace_declarations.insert(namespace.clone()) { return Err(Error::DuplicateNameSpaceDeclaration(namespace)); } } // all namespaces available in the element scope (from both parent elements and element itself) let namespace_declarations_stack: Vec<&NamespaceDeclaration> = namespace_declarations .iter() .flatten() .chain(element_namespace_declarations.iter()) .collect(); // element name and default attribute namespace let element_namespace_declaration; let element_local_name = s_tag.name.local_part().to_string(); match s_tag.name.prefix() { Some(prefix) => { element_namespace_declaration = namespace_declarations_stack .iter() .rfind(|namespace| namespace.prefix.as_deref() == Some(prefix)); } None => { element_namespace_declaration = namespace_declarations_stack .iter() .rfind(|namespace| namespace.prefix == None); } } let element_default_namespace = element_namespace_declaration .ok_or_else(|| Error::UnqualifiedNamespace(s_tag.name.to_string()))? .namespace .clone(); let element_name = Name { namespace: element_default_namespace, local_name: element_local_name, }; // attributes let mut attributes = HashMap::new(); for (q_name, value) in s_tag.attributes.iter().filter_map(|attribute| { if let xml::Attribute::Attribute { name, value } = attribute { Some((name, value)) } else { None } }) { let attribute_namespace_declaration; let attribute_local_name = q_name.local_part().to_string(); match q_name.prefix() { Some(prefix) => { attribute_namespace_declaration = namespace_declarations_stack .iter() .rfind(|namespace_declaration| { namespace_declaration.prefix.as_deref() == Some(prefix) }); } None => attribute_namespace_declaration = element_namespace_declaration, } if let Some(namespace_declaration) = attribute_namespace_declaration { let name = Name { namespace: namespace_declaration.namespace.clone(), local_name: attribute_local_name, }; let value = value.process()?; // check for duplicate attribute if let Some(_value) = attributes.insert(name, value) { return Err(Error::DuplicateAttribute(q_name.to_string())); } } else { return Err(Error::UnqualifiedNamespace(q_name.to_string())); } } depth.push(element_name.clone()); namespace_declarations.push(element_namespace_declarations.clone()); return Ok(Element { name: element_name, namespace_declarations: element_namespace_declarations, attributes, content: Vec::new(), }); } fn end_tag_from_xml( depth: &mut Vec, namespace_declarations: &mut Vec>, xml_e_tag: xml::ETag, ) -> Result<()> { if let Some(s_tag_name) = depth.pop() { let e_tag_namespace_declaration; let e_tag_local_name = xml_e_tag.name.local_part().to_string(); let namespace_declarations_stack: Vec<_> = namespace_declarations.iter().flatten().collect(); match xml_e_tag.name.prefix() { Some(prefix) => { e_tag_namespace_declaration = namespace_declarations_stack .iter() .rfind(|namespace| namespace.prefix.as_deref() == Some(prefix)); } None => { e_tag_namespace_declaration = namespace_declarations_stack .iter() .rfind(|namespace| namespace.prefix == None); } } let e_tag_namespace = e_tag_namespace_declaration .ok_or_else(|| Error::UnqualifiedNamespace(xml_e_tag.name.to_string()))? .namespace .clone(); let e_tag_name = Name { namespace: e_tag_namespace, local_name: e_tag_local_name, }; if e_tag_name != s_tag_name { return Err(Error::MismatchedEndTag(s_tag_name, e_tag_name)); } if s_tag_name == e_tag_name { namespace_declarations.pop(); return Ok(()); } else { return Err(Error::MismatchedEndTag(s_tag_name, e_tag_name)); } } else { return Err(Error::NotInElement(xml_e_tag.name.to_string())); } } fn element_from_xml( namespace_declarations: &mut Vec>, element: xml::Element, ) -> Result { let xml_name; let xml_attributes; let xml_content; let xml_e_name; match element { xml::Element::Empty(empty_elem_tag) => { xml_name = empty_elem_tag.name; xml_attributes = empty_elem_tag.attributes; xml_content = None; xml_e_name = None; } xml::Element::NotEmpty(s_tag, content, e_tag) => { xml_name = s_tag.name; xml_attributes = s_tag.attributes; xml_content = Some(content); xml_e_name = Some(e_tag.name); } } // namespace declarations on element let mut element_namespace_declarations = HashSet::new(); for (prefix, namespace) in xml_attributes.iter().filter_map(|attribute| { if let xml::Attribute::NamespaceDeclaration { ns_name, value } = attribute { Some((ns_name, value)) } else { None } }) { let prefix = match prefix { xml::NSAttName::PrefixedAttName(prefixed_att_name) => { Some(prefixed_att_name.to_string()) } xml::NSAttName::DefaultAttName => None, }; let namespace = NamespaceDeclaration { prefix, namespace: namespace.process()?, }; if !element_namespace_declarations.insert(namespace.clone()) { return Err(Error::DuplicateNameSpaceDeclaration(namespace)); } } // all namespaces available in the element scope (from both parent elements and element itself) let namespace_declarations_stack: Vec<&NamespaceDeclaration> = namespace_declarations .iter() .flatten() .chain(element_namespace_declarations.iter()) .collect(); // element name and default attribute namespace let element_namespace_declaration; let element_local_name = xml_name.local_part().to_string(); match xml_name.prefix() { Some(prefix) => { element_namespace_declaration = namespace_declarations_stack .iter() .rfind(|namespace| namespace.prefix.as_deref() == Some(prefix)); } None => { element_namespace_declaration = namespace_declarations_stack .iter() .rfind(|namespace| namespace.prefix == None); } } let element_default_namespace = element_namespace_declaration .ok_or_else(|| Error::UnqualifiedNamespace(xml_name.to_string()))? .namespace .clone(); let element_name = Name { namespace: element_default_namespace, local_name: element_local_name, }; // end tag name match check if let Some(xml_e_name) = xml_e_name { let e_tag_namespace_declaration; let e_tag_local_name = xml_e_name.local_part().to_string(); match xml_e_name.prefix() { Some(prefix) => { e_tag_namespace_declaration = namespace_declarations_stack .iter() .rfind(|namespace| namespace.prefix.as_deref() == Some(prefix)); } None => { e_tag_namespace_declaration = namespace_declarations_stack .iter() .rfind(|namespace| namespace.prefix == None); } } let e_tag_namespace = e_tag_namespace_declaration .ok_or_else(|| Error::UnqualifiedNamespace(xml_name.to_string()))? .namespace .clone(); let e_tag_name = Name { namespace: e_tag_namespace, local_name: e_tag_local_name, }; if e_tag_name != element_name { return Err(Error::MismatchedEndTag(element_name, e_tag_name)); } } // attributes let mut attributes = HashMap::new(); for (q_name, value) in xml_attributes.iter().filter_map(|attribute| { if let xml::Attribute::Attribute { name, value } = attribute { Some((name, value)) } else { None } }) { let attribute_namespace_declaration; let attribute_local_name = q_name.local_part().to_string(); match q_name.prefix() { Some(prefix) => { attribute_namespace_declaration = namespace_declarations_stack .iter() .rfind(|namespace_declaration| { namespace_declaration.prefix.as_deref() == Some(prefix) }); } None => attribute_namespace_declaration = element_namespace_declaration, } if let Some(namespace_declaration) = attribute_namespace_declaration { let name = Name { namespace: namespace_declaration.namespace.clone(), local_name: attribute_local_name, }; let value = value.process()?; // check for duplicate attribute if let Some(_value) = attributes.insert(name, value) { return Err(Error::DuplicateAttribute(q_name.to_string())); } } else { return Err(Error::UnqualifiedNamespace(q_name.to_string())); } } let content; if let Some(xml_content) = xml_content { namespace_declarations.push(element_namespace_declarations.clone()); content = Self::content_from_xml(namespace_declarations, xml_content)?; namespace_declarations.pop(); } else { content = Vec::new(); } return Ok(Element { name: element_name, namespace_declarations: element_namespace_declarations, attributes, content, }); } fn content_from_xml( namespaces: &mut Vec>, xml_content: xml::Content, ) -> Result> { let mut content = Vec::new(); let mut text = xml_content.char_data.map(|str| String::from(*str)); for (content_item, char_data) in xml_content.content { match content_item { xml::ContentItem::Element(element) => { text.map(|text| content.push(Content::Text(text))); content.push(Content::Element(Self::element_from_xml( namespaces, element, )?)); text = char_data.map(|str| String::from(*str)); } xml::ContentItem::Reference(reference) => { let data = reference.process()?; if let Some(text) = &mut text { text.push(data) } else { text = Some(String::from(data)) } char_data.map(|char_data| text.as_mut().map(|s| s.push_str(*char_data))); } xml::ContentItem::CDSect(cd_sect) => { if let Some(text) = &mut text { text.push_str(**cd_sect) } else { text = Some(String::from(**cd_sect)) } char_data.map(|char_data| text.as_mut().map(|s| s.push_str(*char_data))); } // TODO: is this important? xml::ContentItem::PI(pi) => { char_data.map(|char_data| text.as_mut().map(|s| s.push_str(*char_data))); } // TODO: comments? xml::ContentItem::Comment(comment) => { char_data.map(|char_data| text.as_mut().map(|s| s.push_str(*char_data))); } } } text.map(|text| content.push(Content::Text(text))); Ok(content) } } impl Stream for Reader { type Item = Result; fn poll_next( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { let mut e = self; let mut pinned = pin!(e.read_content()); pinned.as_mut().poll(cx).map(|result| Some(result)) } } #[cfg(test)] mod test { use futures::{sink::Buffer, StreamExt}; use tokio::io::AsyncRead; use super::Reader; struct MockAsyncReader<'s> { put: bool, data: &'s str, } impl<'s> MockAsyncReader<'s> { fn new(data: &'s str) -> Self { Self { put: false, data } } } impl<'s> AsyncRead for MockAsyncReader<'s> { fn poll_read( self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>, buf: &mut tokio::io::ReadBuf<'_>, ) -> std::task::Poll> { if !self.put { buf.put_slice(self.data.as_bytes()); self.get_mut().put = true }; std::task::Poll::Ready(Ok(())) } } const TEST_DOC: &'static str = " asdf"; #[tokio::test] async fn test_element_read() { let mock = MockAsyncReader::new(TEST_DOC); let mut reader = Reader::new(mock); let element = reader.read_element().await.unwrap(); println!("{:#?}", element); } #[tokio::test] async fn test_element_stream() { let mock = MockAsyncReader::new(TEST_DOC); let mut reader = Reader::new(mock); let element = reader.read_start_tag().await.unwrap(); println!("start element: {:#?}", element); let mut content_count = 0; loop { if let Some(content) = reader.next().await { match content { Ok(content) => { content_count += 1; println!("content {}: {:#?}", content_count, content) } Err(_) => break, } } } reader.read_end_tag().await.unwrap() } }