fix stream header
This commit is contained in:
parent
1ff3cbc281
commit
6fefb35aca
|
@ -1351,6 +1351,7 @@ version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
|
"base64",
|
||||||
"config_struct",
|
"config_struct",
|
||||||
"desec",
|
"desec",
|
||||||
"enum-display",
|
"enum-display",
|
||||||
|
@ -1358,6 +1359,7 @@ dependencies = [
|
||||||
"log",
|
"log",
|
||||||
"pretty_env_logger",
|
"pretty_env_logger",
|
||||||
"quick-xml",
|
"quick-xml",
|
||||||
|
"rand",
|
||||||
"rcgen",
|
"rcgen",
|
||||||
"rsdns",
|
"rsdns",
|
||||||
"rustls-pemfile",
|
"rustls-pemfile",
|
||||||
|
|
|
@ -22,3 +22,5 @@ desec = { path = "../desec" }
|
||||||
rcgen = "0.11.1"
|
rcgen = "0.11.1"
|
||||||
rsdns = { version = "0.15.0", features = ["net-tokio"] }
|
rsdns = { version = "0.15.0", features = ["net-tokio"] }
|
||||||
rustls-pemfile = "1.0.3"
|
rustls-pemfile = "1.0.3"
|
||||||
|
rand = "0.8.5"
|
||||||
|
base64 = "0.21.2"
|
||||||
|
|
|
@ -1,16 +1,12 @@
|
||||||
use log::info;
|
use base64::Engine;
|
||||||
|
use log::{debug, info};
|
||||||
use quick_xml::{
|
use quick_xml::{
|
||||||
events::{attributes::Attributes, BytesDecl, BytesEnd, BytesStart, Event},
|
events::{attributes::Attributes, BytesDecl, BytesEnd, BytesStart, Event},
|
||||||
Writer,
|
Writer,
|
||||||
};
|
};
|
||||||
use rustls_pemfile::Item;
|
|
||||||
use tokio::io::AsyncWrite;
|
use tokio::io::AsyncWrite;
|
||||||
|
|
||||||
use crate::{
|
use crate::{error::StreamError, tag};
|
||||||
error::StreamError,
|
|
||||||
feature::Feature,
|
|
||||||
tag::{self, Tag},
|
|
||||||
};
|
|
||||||
|
|
||||||
type Result<T> = std::result::Result<T, StreamError>;
|
type Result<T> = std::result::Result<T, StreamError>;
|
||||||
|
|
||||||
|
@ -35,20 +31,33 @@ pub async fn error<W: AsyncWrite + Unpin>(writer: W, err: StreamError) -> Result
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn write_stream_header<W: AsyncWrite + Unpin>(writer: W, req: StreamAttrs) -> Result<()> {
|
pub async fn write_stream_header<W: AsyncWrite + Unpin>(
|
||||||
|
writer: W,
|
||||||
|
req: StreamHeader,
|
||||||
|
) -> Result<()> {
|
||||||
let mut writer = Writer::new(writer);
|
let mut writer = Writer::new(writer);
|
||||||
writer
|
writer
|
||||||
.write_event_async(Event::Decl(BytesDecl::new("1.0", Some("utf-8"), None)))
|
.write_event_async(Event::Decl(BytesDecl::new("1.0", Some("utf-8"), None)))
|
||||||
.await?;
|
.await?;
|
||||||
|
let id = req.id.unwrap_or_default();
|
||||||
|
let (from, to) = (req.from.unwrap_or_default(), req.to.unwrap_or_default());
|
||||||
|
|
||||||
|
let mut attrs = vec![
|
||||||
|
("id", id.as_str()),
|
||||||
|
("xmlns:stream", "http://etherx.jabber.org/streams"),
|
||||||
|
("xmlns", "jabber:client"),
|
||||||
|
("xml:lang", "en"),
|
||||||
|
("version", "1.0"),
|
||||||
|
];
|
||||||
|
if !to.is_empty() {
|
||||||
|
attrs.push(("to", to.as_str()));
|
||||||
|
}
|
||||||
|
if !from.is_empty() {
|
||||||
|
attrs.push(("from", from.as_str()));
|
||||||
|
}
|
||||||
writer
|
writer
|
||||||
.write_event_async(Event::Start(
|
.write_event_async(Event::Start(
|
||||||
BytesStart::new("stream:stream").with_attributes(vec![
|
BytesStart::new("stream:stream").with_attributes(attrs),
|
||||||
("from", req.from.as_str()),
|
|
||||||
("to", req.to.as_str()),
|
|
||||||
("xmlns:stream", "http://etherx.jabber.org/streams"),
|
|
||||||
("xml:lang", "en"),
|
|
||||||
("version", "1.0"),
|
|
||||||
]),
|
|
||||||
))
|
))
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
@ -56,13 +65,14 @@ pub async fn write_stream_header<W: AsyncWrite + Unpin>(writer: W, req: StreamAt
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct StreamAttrs {
|
pub struct StreamHeader {
|
||||||
pub from: String,
|
pub from: Option<String>,
|
||||||
pub to: String,
|
pub to: Option<String>,
|
||||||
|
pub id: Option<String>,
|
||||||
pub namespace: XMLNamespace,
|
pub namespace: XMLNamespace,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TryFrom<Attributes<'_>> for StreamAttrs {
|
impl TryFrom<Attributes<'_>> for StreamHeader {
|
||||||
type Error = StreamError;
|
type Error = StreamError;
|
||||||
|
|
||||||
fn try_from(value: Attributes<'_>) -> std::result::Result<Self, Self::Error> {
|
fn try_from(value: Attributes<'_>) -> std::result::Result<Self, Self::Error> {
|
||||||
|
@ -86,15 +96,18 @@ impl TryFrom<Attributes<'_>> for StreamAttrs {
|
||||||
},
|
},
|
||||||
other => {
|
other => {
|
||||||
info!(
|
info!(
|
||||||
"ignoring key {}",
|
"ignoring key {} with value << {} >>",
|
||||||
String::from_utf8(other.to_vec()).unwrap_or_default()
|
String::from_utf8(other.to_vec()).unwrap_or_default(),
|
||||||
|
String::from_utf8(v.value.to_vec()).unwrap_or_default(),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(StreamAttrs {
|
debug!("from: {from:?}; to: {to:?}");
|
||||||
from: from.ok_or(StreamError::InvalidFrom)?,
|
Ok(StreamHeader {
|
||||||
to: to.ok_or(StreamError::HostUnknown)?,
|
from: from,
|
||||||
|
to: Some(to.ok_or(StreamError::HostUnknown)?),
|
||||||
|
id: None,
|
||||||
namespace: ns.ok_or(StreamError::BadNamespacePrefix)?,
|
namespace: ns.ok_or(StreamError::BadNamespacePrefix)?,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -104,3 +117,11 @@ impl TryFrom<Attributes<'_>> for StreamAttrs {
|
||||||
pub enum XMLNamespace {
|
pub enum XMLNamespace {
|
||||||
JabberClient,
|
JabberClient,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn gen_id() -> String {
|
||||||
|
let buf: [u8; 16] = rand::random();
|
||||||
|
let mut output = String::new();
|
||||||
|
base64::engine::general_purpose::STANDARD_NO_PAD.encode_string(buf, &mut output);
|
||||||
|
|
||||||
|
output
|
||||||
|
}
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
use std::{net::SocketAddr, sync::Arc};
|
use std::{net::SocketAddr, sync::Arc};
|
||||||
|
|
||||||
use log::{error, info};
|
use log::{debug, error, info};
|
||||||
use quick_xml::{
|
use quick_xml::{
|
||||||
events::{BytesEnd, BytesStart, Event},
|
events::{BytesEnd, BytesStart, Event},
|
||||||
Reader, Writer,
|
Reader, Writer,
|
||||||
|
@ -12,7 +12,7 @@ use tokio::{
|
||||||
use tokio_rustls::rustls;
|
use tokio_rustls::rustls;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
common::{self, StreamAttrs, XMLNamespace},
|
common::{self, StreamHeader, XMLNamespace},
|
||||||
error::StreamError,
|
error::StreamError,
|
||||||
feature::Feature,
|
feature::Feature,
|
||||||
negotiator::{self, TcpConnOrTLS},
|
negotiator::{self, TcpConnOrTLS},
|
||||||
|
@ -33,6 +33,7 @@ type Result<T> = std::result::Result<T, StreamError>;
|
||||||
const FEATURES: &'static [Feature] = &[Feature::start_tls(true)];
|
const FEATURES: &'static [Feature] = &[Feature::start_tls(true)];
|
||||||
|
|
||||||
pub struct StreamStart {
|
pub struct StreamStart {
|
||||||
|
id: String,
|
||||||
reader: Reader<BufReader<ReadHalf<TcpStream>>>,
|
reader: Reader<BufReader<ReadHalf<TcpStream>>>,
|
||||||
writer: Writer<WriteHalf<TcpStream>>,
|
writer: Writer<WriteHalf<TcpStream>>,
|
||||||
buffer: Vec<u8>,
|
buffer: Vec<u8>,
|
||||||
|
@ -53,6 +54,7 @@ impl StreamStart {
|
||||||
writer,
|
writer,
|
||||||
hostname,
|
hostname,
|
||||||
tls_config,
|
tls_config,
|
||||||
|
id: common::gen_id(),
|
||||||
buffer: vec![],
|
buffer: vec![],
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -90,7 +92,7 @@ impl StreamStart {
|
||||||
match event {
|
match event {
|
||||||
Event::Start(start) => {
|
Event::Start(start) => {
|
||||||
if start.name().as_ref() == tag::STREAM_ELEMENT_NAME {
|
if start.name().as_ref() == tag::STREAM_ELEMENT_NAME {
|
||||||
let attrs: StreamAttrs = match start.attributes().try_into() {
|
let attrs: StreamHeader = match start.attributes().try_into() {
|
||||||
Ok(a) => a,
|
Ok(a) => a,
|
||||||
Err(err) => return StartTLSResult::Failure(self, err.into()),
|
Err(err) => return StartTLSResult::Failure(self, err.into()),
|
||||||
};
|
};
|
||||||
|
@ -110,9 +112,10 @@ impl StreamStart {
|
||||||
info!("starting negotiation with: {attrs:?}");
|
info!("starting negotiation with: {attrs:?}");
|
||||||
if let Err(err) = common::write_stream_header(
|
if let Err(err) = common::write_stream_header(
|
||||||
self.writer.get_mut(),
|
self.writer.get_mut(),
|
||||||
StreamAttrs {
|
StreamHeader {
|
||||||
from: attrs.to.clone(),
|
from: Some(self.hostname.clone()),
|
||||||
to: attrs.from,
|
to: attrs.from,
|
||||||
|
id: Some(self.id.clone()),
|
||||||
namespace: XMLNamespace::JabberClient,
|
namespace: XMLNamespace::JabberClient,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
@ -120,9 +123,10 @@ impl StreamStart {
|
||||||
{
|
{
|
||||||
return StartTLSResult::Failure(self, err);
|
return StartTLSResult::Failure(self, err);
|
||||||
};
|
};
|
||||||
if attrs.to != self.hostname {
|
if attrs.to.unwrap_or_default() != self.hostname {
|
||||||
return StartTLSResult::Failure(self, StreamError::HostUnknown);
|
return StartTLSResult::Failure(self, StreamError::HostUnknown);
|
||||||
}
|
}
|
||||||
|
debug!("sending features");
|
||||||
if let Err(err) = self.send_features().await {
|
if let Err(err) = self.send_features().await {
|
||||||
return StartTLSResult::Failure(self, err);
|
return StartTLSResult::Failure(self, err);
|
||||||
};
|
};
|
||||||
|
|
|
@ -1,15 +1,28 @@
|
||||||
use quick_xml::{Reader, Writer};
|
use log::{error, info};
|
||||||
|
use quick_xml::{
|
||||||
|
events::{BytesEnd, BytesStart, Event},
|
||||||
|
Reader, Writer,
|
||||||
|
};
|
||||||
use tokio::{
|
use tokio::{
|
||||||
io::{BufReader, ReadHalf, WriteHalf},
|
io::{AsyncWriteExt, BufReader, ReadHalf, WriteHalf},
|
||||||
net::TcpStream,
|
net::TcpStream,
|
||||||
};
|
};
|
||||||
use tokio_rustls::server::TlsStream;
|
use tokio_rustls::server::TlsStream;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
common::{self, StreamHeader},
|
||||||
|
error::StreamError,
|
||||||
|
feature::Feature,
|
||||||
|
tag::{self, Tag},
|
||||||
|
};
|
||||||
|
type Result<T> = std::result::Result<T, StreamError>;
|
||||||
pub struct TLSStream {
|
pub struct TLSStream {
|
||||||
|
id: String,
|
||||||
reader: Reader<BufReader<ReadHalf<TlsStream<TcpStream>>>>,
|
reader: Reader<BufReader<ReadHalf<TlsStream<TcpStream>>>>,
|
||||||
writer: Writer<WriteHalf<TlsStream<TcpStream>>>,
|
writer: Writer<WriteHalf<TlsStream<TcpStream>>>,
|
||||||
buffer: Vec<u8>,
|
buffer: Vec<u8>,
|
||||||
hostname: String,
|
hostname: String,
|
||||||
|
features: Vec<Feature>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TLSStream {
|
impl TLSStream {
|
||||||
|
@ -24,9 +37,89 @@ impl TLSStream {
|
||||||
reader,
|
reader,
|
||||||
writer,
|
writer,
|
||||||
hostname,
|
hostname,
|
||||||
|
id: common::gen_id(),
|
||||||
buffer: vec![],
|
buffer: vec![],
|
||||||
|
features: vec![],
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn start_stream(self) {}
|
pub async fn start_stream(mut self) {
|
||||||
|
if let Err(err) = self.handle_stream().await {
|
||||||
|
if let Err(err2) = common::error(self.writer.get_mut(), err).await {
|
||||||
|
error!("error writing error: {err2}");
|
||||||
|
return;
|
||||||
|
} else {
|
||||||
|
info!("wrote error {err}")
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Err(e) = self.writer.get_mut().write_all(b"</stream:stream>").await {
|
||||||
|
error!("writing end to stream: {e}")
|
||||||
|
}
|
||||||
|
if let Err(e) = self.writer.get_mut().shutdown().await {
|
||||||
|
error!("shutting down stream: {e}")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_stream(&mut self) -> Result<()> {
|
||||||
|
let header = self.get_stream_header().await?;
|
||||||
|
if !header
|
||||||
|
.to
|
||||||
|
.unwrap_or_default()
|
||||||
|
.eq_ignore_ascii_case(&self.hostname)
|
||||||
|
{
|
||||||
|
return Err(StreamError::HostUnknown);
|
||||||
|
}
|
||||||
|
common::write_stream_header(
|
||||||
|
self.writer.get_mut(),
|
||||||
|
StreamHeader {
|
||||||
|
from: Some(self.hostname.clone()),
|
||||||
|
to: header.from,
|
||||||
|
id: Some(self.id.clone()),
|
||||||
|
namespace: common::XMLNamespace::JabberClient,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
self.send_features().await?;
|
||||||
|
self.negotiate_features().await?;
|
||||||
|
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn negotiate_features(&mut self) -> Result<()> {
|
||||||
|
let available_features: Vec<&Feature> = (&self.features).into_iter().collect();
|
||||||
|
while available_features.len() > 0 {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_features(&mut self) -> Result<()> {
|
||||||
|
self.writer
|
||||||
|
.write_event_async(Event::Start(BytesStart::new(tag::FEATURE)))
|
||||||
|
.await?;
|
||||||
|
for feature in &self.features {
|
||||||
|
feature.write_tag(self.writer.get_mut()).await?;
|
||||||
|
}
|
||||||
|
self.writer
|
||||||
|
.write_event_async(Event::End(BytesEnd::new(tag::FEATURE)))
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_stream_header(&mut self) -> Result<StreamHeader> {
|
||||||
|
loop {
|
||||||
|
match self.reader.read_event_into_async(&mut self.buffer).await? {
|
||||||
|
Event::Start(start) => {
|
||||||
|
if start.name().as_ref() == tag::STREAM_ELEMENT_NAME {
|
||||||
|
break Ok(start.attributes().try_into()?);
|
||||||
|
} else {
|
||||||
|
break Err(StreamError::BadFormat);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Event::Decl(_) => continue,
|
||||||
|
_ => break Err(StreamError::BadFormat),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue