implement connection

This commit is contained in:
cel 🌸 2023-10-21 01:28:54 +01:00
parent ba94ee66fa
commit e893869df9
11 changed files with 403 additions and 605 deletions

View File

@ -9,6 +9,7 @@ edition = "2021"
[dependencies] [dependencies]
async-recursion = "1.0.4" async-recursion = "1.0.4"
async-trait = "0.1.68" async-trait = "0.1.68"
lazy_static = "1.4.0"
nanoid = "0.4.0" nanoid = "0.4.0"
quick-xml = { git = "https://github.com/tafia/quick-xml.git", features = ["async-tokio", "serialize"] } quick-xml = { git = "https://github.com/tafia/quick-xml.git", features = ["async-tokio", "serialize"] }
# TODO: remove unneeded features # TODO: remove unneeded features

30
TODO.md
View File

@ -1,11 +1,23 @@
# TODO # TODO
[ ] recognise starttls required ## next
[ ] logging
[ ] documentation ci/cd: doc generation
[ ] error handling feature: error handling on stream according to rfc6120
[ ] remove unwraps docs: jid
[ ] proper error types docs: jabber
[ ] stream error type docs: starttls
[ ] change stanzas from owned to borrowed types with lifetimes docs: sasl
[ ] Into<Element> trait with event() and content() functions docs: resource binding
feature: logging
feature: starttls
feature: sasl
feature: resource binding
## in progress
## done
feature: jabber client connection
feature: jid

View File

@ -1,205 +0,0 @@
use std::{collections::BTreeMap, str};
use quick_xml::{
events::{BytesDecl, Event},
NsReader, Writer,
};
use rsasl::prelude::{Mechname, SASLClient};
use tokio::io::{BufReader, ReadHalf, WriteHalf};
use tokio::net::TcpStream;
use tokio_native_tls::TlsStream;
use crate::Jabber;
use crate::JabberError;
use crate::Result;
pub struct JabberClient<'j> {
pub reader: NsReader<BufReader<ReadHalf<TlsStream<TcpStream>>>>,
pub writer: Writer<WriteHalf<TlsStream<TcpStream>>>,
jabber: &'j mut Jabber<'j>,
}
impl<'j> JabberClient<'j> {
pub fn new(
reader: NsReader<BufReader<ReadHalf<TlsStream<TcpStream>>>>,
writer: Writer<WriteHalf<TlsStream<TcpStream>>>,
jabber: &'j mut Jabber<'j>,
) -> Self {
Self {
reader,
writer,
jabber,
}
}
pub async fn start_stream(&mut self) -> Result<()> {
// client to server
let declaration = BytesDecl::new("1.0", None, None);
let server = &self.jabber.server.to_owned().try_into()?;
let stream_element =
Stream::new_client(&self.jabber.jid, server, None, Some("en".to_string()));
self.writer
.write_event_async(Event::Decl(declaration))
.await;
let stream_element: Element<'_> = stream_element.into();
stream_element
.write_start(&mut self.writer, &BTreeMap::new())
.await?;
// server to client
let mut buf = Vec::new();
self.reader.read_event_into_async(&mut buf).await?;
let _stream_response = Element::read_start(&mut self.reader).await?;
Ok(())
}
pub async fn get_features(&mut self) -> Result<Vec<StreamFeature>> {
Element::read(&mut self.reader).await?.try_into()
}
pub async fn negotiate(&mut self) -> Result<()> {
loop {
println!("negotiate loop");
let features = self.get_features().await?;
println!("features: {:?}", features);
match &features[0] {
StreamFeature::Sasl(sasl) => {
println!("sasl?");
self.sasl(&sasl).await?;
}
StreamFeature::Bind => {
self.bind().await?;
return Ok(());
}
x => println!("{:?}", x),
}
}
}
pub async fn watch(&mut self) -> Result<()> {
loop {
let element = Element::read(&mut self.reader).await?;
println!("{:#?}", element);
}
}
pub async fn sasl(&mut self, mechanisms: &Vec<String>) -> Result<()> {
println!("{:?}", mechanisms);
let sasl = SASLClient::new(self.jabber.auth.clone());
let mut offered_mechs: Vec<&Mechname> = Vec::new();
for mechanism in mechanisms {
offered_mechs.push(Mechname::parse(mechanism.as_bytes())?)
}
println!("{:?}", offered_mechs);
let mut session = sasl.start_suggested(&offered_mechs)?;
let selected_mechanism = session.get_mechname().as_str().to_owned();
println!("selected mech: {:?}", selected_mechanism);
let mut data: Option<Vec<u8>> = None;
if !session.are_we_first() {
// if not first mention the mechanism then get challenge data
// mention mechanism
let auth = Auth {
mechanism: selected_mechanism.as_str(),
sasl_data: "=",
};
Into::<Element>::into(auth).write(&mut self.writer).await?;
// get challenge data
let challenge = &Element::read(&mut self.reader).await?;
let challenge: Challenge = challenge.try_into()?;
println!("challenge: {:?}", challenge);
data = Some(challenge.sasl_data.to_owned());
println!("we didn't go first");
} else {
// if first, mention mechanism and send data
let mut sasl_data = Vec::new();
session.step64(None, &mut sasl_data).unwrap();
let auth = Auth {
mechanism: selected_mechanism.as_str(),
sasl_data: str::from_utf8(&sasl_data)?,
};
println!("{:?}", auth);
Into::<Element>::into(auth).write(&mut self.writer).await?;
let server_response = Element::read(&mut self.reader).await?;
println!("server_response: {:#?}", server_response);
match TryInto::<Challenge>::try_into(&server_response) {
Ok(challenge) => data = Some(challenge.sasl_data.to_owned()),
Err(_) => {
let success = TryInto::<Success>::try_into(&server_response)?;
if let Some(sasl_data) = success.sasl_data {
data = Some(sasl_data.to_owned())
}
}
}
println!("we went first");
}
// stepping the authentication exchange to completion
if data != None {
println!("data: {:?}", data);
let mut sasl_data = Vec::new();
while {
// decide if need to send more data over
let state = session
.step64(data.as_deref(), &mut sasl_data)
.expect("step errored!");
state.is_running()
} {
// While we aren't finished, receive more data from the other party
let response = Response {
sasl_data: str::from_utf8(&sasl_data)?,
};
println!("response: {:?}", response);
Into::<Element>::into(response)
.write(&mut self.writer)
.await?;
let server_response = Element::read(&mut self.reader).await?;
println!("server_response: {:?}", server_response);
match TryInto::<Challenge>::try_into(&server_response) {
Ok(challenge) => data = Some(challenge.sasl_data.to_owned()),
Err(_) => {
let success = TryInto::<Success>::try_into(&server_response)?;
if let Some(sasl_data) = success.sasl_data {
data = Some(sasl_data.to_owned())
}
}
}
}
}
self.start_stream().await?;
Ok(())
}
pub async fn bind(&mut self) -> Result<()> {
match &self.jabber.jid.resourcepart {
Some(resource) => {
println!("setting resource");
let bind = Bind {
resource: Some(resource.clone()),
jid: None,
};
let result: Bind = IQ::set(self, None, None, bind).await?.try_into()?;
if let Some(jid) = result.jid {
println!("{}", jid);
self.jabber.jid = jid;
return Ok(());
}
}
None => {
println!("not setting resource");
let bind = Bind {
resource: None,
jid: None,
};
let result: Bind = IQ::set(self, None, None, bind).await?.try_into()?;
if let Some(jid) = result.jid {
println!("{}", jid);
self.jabber.jid = jid;
return Ok(());
}
}
}
Err(JabberError::BindError)
}
}

View File

@ -1,38 +0,0 @@
// pub mod encrypted;
pub mod unencrypted;
// use async_trait::async_trait;
// use crate::stanza::stream::StreamFeature;
use crate::JabberError;
use crate::Result;
pub enum JabberClientType<'j> {
// Encrypted(encrypted::JabberClient<'j>),
Unencrypted(unencrypted::JabberClient<'j>),
}
impl<'j> JabberClientType<'j> {
/// ensures an encrypted jabber client
pub async fn ensure_tls(self) -> Result<encrypted::JabberClient<'j>> {
match self {
Self::Encrypted(c) => Ok(c),
Self::Unencrypted(mut c) => {
c.start_stream().await?;
let features = c.get_features().await?;
if features.contains(&StreamFeature::StartTls) {
Ok(c.starttls().await?)
} else {
Err(JabberError::StartTlsUnavailable)
}
}
}
}
}
// TODO: jabber client trait over both client types using macro
// #[async_trait]
// pub trait JabberTrait {
// async fn start_stream(&mut self) -> Result<()>;
// async fn get_features(&self) -> Result<Vec<StreamFeatures>>;
// }

View File

@ -1,180 +0,0 @@
use std::str;
use quick_xml::{
events::{BytesStart, Event},
name::QName,
se, NsReader, Writer,
};
use tokio::io::{BufReader, ReadHalf, WriteHalf};
use tokio::net::TcpStream;
use tokio_native_tls::native_tls::TlsConnector;
use try_map::FallibleMapExt;
use crate::error::JabberError;
use crate::stanza::stream::Stream;
use crate::stanza::DECLARATION;
use crate::Jabber;
use crate::Result;
pub struct JabberClient<'j> {
reader: NsReader<BufReader<ReadHalf<TcpStream>>>,
writer: Writer<WriteHalf<TcpStream>>,
jabber: &'j mut Jabber<'j>,
}
impl<'j> JabberClient<'j> {
pub fn new(
reader: NsReader<BufReader<ReadHalf<TcpStream>>>,
writer: Writer<WriteHalf<TcpStream>>,
jabber: &'j mut Jabber<'j>,
) -> Self {
Self {
reader,
writer,
jabber,
}
}
pub async fn start_stream(&mut self) -> Result<()> {
// client to server
// declaration
self.writer.write_event_async(DECLARATION).await?;
// opening stream element
let server = &self.jabber.server.to_owned().try_into()?;
let stream_element = Stream::new_client(None, server, None, "en");
se::to_writer_with_root(&mut self.writer, "stream:stream", &stream_element);
// server to client
// may or may not send a declaration
let buf = Vec::new();
let mut first_event = self.reader.read_resolved_event_into_async(&mut buf).await?;
match first_event {
(quick_xml::name::ResolveResult::Unbound, Event::Decl(e)) => {
if let Ok(version) = e.version() {
if version.as_ref() == b"1.0" {
first_event = self.reader.read_resolved_event_into_async(&mut buf).await?
} else {
// todo: error
todo!()
}
} else {
first_event = self.reader.read_resolved_event_into_async(&mut buf).await?
}
}
_ => (),
}
// receive stream element and validate
let stream_response: Stream;
match first_event {
(quick_xml::name::ResolveResult::Bound(ns), Event::Start(e)) => {
if ns.0 == crate::stanza::stream::XMLNS.as_bytes() {
// stream_response = Stream::new(
// e.try_get_attribute("from")?.try_map(|attribute| {
// str::from_utf8(attribute.value.as_ref())?
// .try_into()?
// .as_ref()
// })?,
// e.try_get_attribute("to")?.try_map(|attribute| {
// str::from_utf8(attribute.value.as_ref())?
// .try_into()?
// .as_ref()
// })?,
// e.try_get_attribute("id")?.try_map(|attribute| {
// str::from_utf8(attribute.value.as_ref())?
// .try_into()?
// .as_ref()
// })?,
// e.try_get_attribute("version")?.try_map(|attribute| {
// str::from_utf8(attribute.value.as_ref())?
// .try_into()?
// .as_ref()
// })?,
// e.try_get_attribute("lang")?.try_map(|attribute| {
// str::from_utf8(attribute.value.as_ref())?
// .try_into()?
// .as_ref()
// })?,
// );
return Ok(());
} else {
return Err(JabberError::BadStream);
}
}
// TODO: errors for incorrect namespace
(quick_xml::name::ResolveResult::Unbound, Event::Decl(_)) => todo!(),
(quick_xml::name::ResolveResult::Unknown(_), Event::Start(_)) => todo!(),
(quick_xml::name::ResolveResult::Unknown(_), Event::End(_)) => todo!(),
(quick_xml::name::ResolveResult::Unknown(_), Event::Empty(_)) => todo!(),
(quick_xml::name::ResolveResult::Unknown(_), Event::Text(_)) => todo!(),
(quick_xml::name::ResolveResult::Unknown(_), Event::CData(_)) => todo!(),
(quick_xml::name::ResolveResult::Unknown(_), Event::Comment(_)) => todo!(),
(quick_xml::name::ResolveResult::Unknown(_), Event::Decl(_)) => todo!(),
(quick_xml::name::ResolveResult::Unknown(_), Event::PI(_)) => todo!(),
(quick_xml::name::ResolveResult::Unknown(_), Event::DocType(_)) => todo!(),
(quick_xml::name::ResolveResult::Unknown(_), Event::Eof) => todo!(),
(quick_xml::name::ResolveResult::Unbound, Event::Start(_)) => todo!(),
(quick_xml::name::ResolveResult::Unbound, Event::End(_)) => todo!(),
(quick_xml::name::ResolveResult::Unbound, Event::Empty(_)) => todo!(),
(quick_xml::name::ResolveResult::Unbound, Event::Text(_)) => todo!(),
(quick_xml::name::ResolveResult::Unbound, Event::CData(_)) => todo!(),
(quick_xml::name::ResolveResult::Unbound, Event::Comment(_)) => todo!(),
(quick_xml::name::ResolveResult::Unbound, Event::PI(_)) => todo!(),
(quick_xml::name::ResolveResult::Unbound, Event::DocType(_)) => todo!(),
(quick_xml::name::ResolveResult::Unbound, Event::Eof) => todo!(),
(quick_xml::name::ResolveResult::Bound(_), Event::End(_)) => todo!(),
(quick_xml::name::ResolveResult::Bound(_), Event::Empty(_)) => todo!(),
(quick_xml::name::ResolveResult::Bound(_), Event::Text(_)) => todo!(),
(quick_xml::name::ResolveResult::Bound(_), Event::CData(_)) => todo!(),
(quick_xml::name::ResolveResult::Bound(_), Event::Comment(_)) => todo!(),
(quick_xml::name::ResolveResult::Bound(_), Event::Decl(_)) => todo!(),
(quick_xml::name::ResolveResult::Bound(_), Event::PI(_)) => todo!(),
(quick_xml::name::ResolveResult::Bound(_), Event::DocType(_)) => todo!(),
(quick_xml::name::ResolveResult::Bound(_), Event::Eof) => todo!(),
}
}
// pub async fn get_features(&mut self) -> Result<Vec<StreamFeature>> {
// Element::read(&mut self.reader).await?.try_into()
// }
// pub async fn starttls(mut self) -> Result<super::encrypted::JabberClient<'j>> {
// let mut starttls_element = BytesStart::new("starttls");
// starttls_element.push_attribute(("xmlns", "urn:ietf:params:xml:ns:xmpp-tls"));
// self.writer
// .write_event_async(Event::Empty(starttls_element))
// .await
// .unwrap();
// let mut buf = Vec::new();
// match self.reader.read_event_into_async(&mut buf).await.unwrap() {
// Event::Empty(e) => match e.name() {
// QName(b"proceed") => {
// let connector = TlsConnector::new().unwrap();
// let stream = self
// .reader
// .into_inner()
// .into_inner()
// .unsplit(self.writer.into_inner());
// if let Ok(tlsstream) = tokio_native_tls::TlsConnector::from(connector)
// .connect(&self.jabber.server, stream)
// .await
// {
// let (read, write) = tokio::io::split(tlsstream);
// let reader = Reader::from_reader(BufReader::new(read));
// let writer = Writer::new(write);
// let mut client =
// super::encrypted::JabberClient::new(reader, writer, self.jabber);
// client.start_stream().await?;
// return Ok(client);
// }
// }
// QName(_) => return Err(JabberError::TlsNegotiation),
// },
// _ => return Err(JabberError::TlsNegotiation),
// }
// Err(JabberError::TlsNegotiation)
// }
}

162
src/connection.rs Normal file
View File

@ -0,0 +1,162 @@
use std::net::{IpAddr, SocketAddr};
use std::str;
use std::str::FromStr;
use tokio::net::TcpStream;
use tokio_native_tls::native_tls::TlsConnector;
// TODO: use rustls
use tokio_native_tls::TlsStream;
use crate::Jabber;
use crate::JabberError;
use crate::Result;
pub type Tls = TlsStream<TcpStream>;
pub type Unencrypted = TcpStream;
pub enum Connection {
Encrypted(Jabber<Tls>),
Unencrypted(Jabber<Unencrypted>),
}
impl Connection {
pub async fn ensure_tls(self) -> Result<Jabber<Tls>> {
match self {
Connection::Encrypted(j) => Ok(j),
Connection::Unencrypted(j) => Ok(j.starttls().await?),
}
}
// pub async fn connect_user<J: TryInto<JID>>(jid: J, password: String) -> Result<Self> {
// let server = jid.domainpart.clone();
// let auth = SASLConfig::with_credentials(None, jid.localpart.clone().unwrap(), password)?;
// println!("auth: {:?}", auth);
// Self::connect(&server, jid.try_into()?, Some(auth)).await
// }
async fn connect(server: &str) -> Result<Self> {
let sockets = Self::get_sockets(&server).await;
for (socket_addr, tls) in sockets {
match tls {
true => {
if let Ok(connection) = Self::connect_tls(socket_addr, &server).await {
let (readhalf, writehalf) = tokio::io::split(connection);
return Ok(Self::Encrypted(Jabber::new(
readhalf,
writehalf,
None,
None,
server.to_owned(),
)));
}
}
false => {
if let Ok(connection) = Self::connect_unencrypted(socket_addr).await {
let (readhalf, writehalf) = tokio::io::split(connection);
return Ok(Self::Unencrypted(Jabber::new(
readhalf,
writehalf,
None,
None,
server.to_owned(),
)));
}
}
}
}
Err(JabberError::Connection)
}
async fn get_sockets(domain: &str) -> Vec<(SocketAddr, bool)> {
let mut socket_addrs = Vec::new();
// if it's a socket/ip then just return that
// socket
if let Ok(socket_addr) = SocketAddr::from_str(domain) {
match socket_addr.port() {
5223 => socket_addrs.push((socket_addr, true)),
_ => socket_addrs.push((socket_addr, false)),
}
return socket_addrs;
}
// ip
if let Ok(ip) = IpAddr::from_str(domain) {
socket_addrs.push((SocketAddr::new(ip, 5222), false));
socket_addrs.push((SocketAddr::new(ip, 5223), true));
return socket_addrs;
}
// otherwise resolve
if let Ok(resolver) = trust_dns_resolver::AsyncResolver::tokio_from_system_conf() {
if let Ok(lookup) = resolver
.srv_lookup(format!("_xmpp-client._tcp.{}", domain))
.await
{
for srv in lookup {
resolver
.lookup_ip(srv.target().to_owned())
.await
.map(|ips| {
for ip in ips {
socket_addrs.push((SocketAddr::new(ip, srv.port()), false))
}
});
}
}
if let Ok(lookup) = resolver
.srv_lookup(format!("_xmpps-client._tcp.{}", domain))
.await
{
for srv in lookup {
resolver
.lookup_ip(srv.target().to_owned())
.await
.map(|ips| {
for ip in ips {
socket_addrs.push((SocketAddr::new(ip, srv.port()), true))
}
});
}
}
// in case cannot connect through SRV records
resolver.lookup_ip(domain).await.map(|ips| {
for ip in ips {
socket_addrs.push((SocketAddr::new(ip, 5222), false));
socket_addrs.push((SocketAddr::new(ip, 5223), true));
}
});
}
socket_addrs
}
/// establishes a connection to the server
pub async fn connect_tls(socket_addr: SocketAddr, domain_name: &str) -> Result<Tls> {
let socket = TcpStream::connect(socket_addr)
.await
.map_err(|_| JabberError::Connection)?;
let connector = TlsConnector::new().map_err(|_| JabberError::Connection)?;
tokio_native_tls::TlsConnector::from(connector)
.connect(domain_name, socket)
.await
.map_err(|_| JabberError::Connection)
}
pub async fn connect_unencrypted(socket_addr: SocketAddr) -> Result<Unencrypted> {
TcpStream::connect(socket_addr)
.await
.map_err(|_| JabberError::Connection)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn connect() {
Connection::connect("blos.sm").await.unwrap();
}
}

View File

@ -3,10 +3,7 @@ use std::str::Utf8Error;
use quick_xml::events::attributes::AttrError; use quick_xml::events::attributes::AttrError;
use rsasl::mechname::MechanismNameError; use rsasl::mechname::MechanismNameError;
use crate::{ use crate::jid::ParseError;
jid::ParseError,
stanza::{self, ElementError, ElementParseError},
};
#[derive(Debug)] #[derive(Debug)]
pub enum JabberError { pub enum JabberError {
@ -22,14 +19,12 @@ pub enum JabberError {
NoType, NoType,
IDMismatch, IDMismatch,
BindError, BindError,
ElementParse(ElementParseError),
ParseError, ParseError,
UnexpectedEnd, UnexpectedEnd,
UnexpectedElement, UnexpectedElement,
UnexpectedText, UnexpectedText,
XML(quick_xml::Error), XML(quick_xml::Error),
SASL(SASLError), SASL(SASLError),
Element(ElementError<'static>),
JID(ParseError), JID(ParseError),
} }
@ -71,12 +66,6 @@ impl From<quick_xml::Error> for JabberError {
} }
} }
impl From<stanza::ElementError<'static>> for JabberError {
fn from(e: stanza::ElementError<'static>) -> Self {
Self::Element(e)
}
}
impl From<AttrError> for JabberError { impl From<AttrError> for JabberError {
fn from(e: AttrError) -> Self { fn from(e: AttrError) -> Self {
Self::XML(e.into()) Self::XML(e.into())
@ -88,9 +77,3 @@ impl From<ParseError> for JabberError {
Self::JID(e) Self::JID(e)
} }
} }
impl From<ElementParseError> for JabberError {
fn from(e: ElementParseError) -> Self {
Self::ElementParse(e)
}
}

View File

@ -1,141 +1,205 @@
use std::marker::PhantomData;
use std::net::{IpAddr, SocketAddr};
use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use quick_xml::{NsReader, Writer}; use quick_xml::{events::Event, se::Serializer, NsReader, Writer};
use rsasl::prelude::SASLConfig; use rsasl::prelude::SASLConfig;
use tokio::io::BufReader; use serde::Serialize;
use tokio::net::TcpStream; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, ReadHalf, WriteHalf};
use tokio_native_tls::native_tls::TlsConnector;
use crate::client::JabberClientType; use crate::connection::{Tls, Unencrypted};
use crate::jid::JID; use crate::error::JabberError;
use crate::{client, JabberClient}; use crate::stanza::stream::Stream;
use crate::{JabberError, Result}; use crate::stanza::DECLARATION;
use crate::Result;
use crate::JID;
pub struct Jabber<'j> { pub struct Jabber<S>
pub jid: JID, where
pub auth: Arc<SASLConfig>, S: AsyncRead + AsyncWrite + Unpin,
pub server: String, {
_marker: PhantomData<&'j ()>, reader: NsReader<BufReader<ReadHalf<S>>>,
writer: Writer<WriteHalf<S>>,
jid: Option<JID>,
auth: Option<Arc<SASLConfig>>,
server: String,
} }
impl<'j> Jabber<'j> { impl<S> Jabber<S>
pub fn user(jid: JID, password: String) -> Result<Self> { where
let server = jid.domainpart.clone(); S: AsyncRead + AsyncWrite + Unpin,
let auth = SASLConfig::with_credentials(None, jid.localpart.clone().unwrap(), password)?; {
println!("auth: {:?}", auth); pub fn new(
Ok(Self { reader: ReadHalf<S>,
writer: WriteHalf<S>,
jid: Option<JID>,
auth: Option<Arc<SASLConfig>>,
server: String,
) -> Self {
let reader = NsReader::from_reader(BufReader::new(reader));
let writer = Writer::new(writer);
Self {
reader,
writer,
jid, jid,
auth, auth,
server, server,
_marker: PhantomData, }
}) }
} }
pub async fn login(&'j mut self) -> Result<JabberClient<'j>> { impl<S> Jabber<S>
let mut client = self.connect().await?.ensure_tls().await?; where
client.start_stream().await?; S: AsyncRead + AsyncWrite + Unpin,
client.negotiate().await?; Writer<tokio::io::WriteHalf<S>>: AsyncWriteExt,
Ok(client) Writer<tokio::io::WriteHalf<S>>: AsyncWrite,
}
async fn get_sockets(&self) -> Vec<(SocketAddr, bool)> {
let mut socket_addrs = Vec::new();
// if it's a socket/ip then just return that
// socket
if let Ok(socket_addr) = SocketAddr::from_str(&self.jid.domainpart) {
match socket_addr.port() {
5223 => socket_addrs.push((socket_addr, true)),
_ => socket_addrs.push((socket_addr, false)),
}
return socket_addrs;
}
// ip
if let Ok(ip) = IpAddr::from_str(&self.jid.domainpart) {
socket_addrs.push((SocketAddr::new(ip, 5222), false));
socket_addrs.push((SocketAddr::new(ip, 5223), true));
return socket_addrs;
}
// otherwise resolve
if let Ok(resolver) = trust_dns_resolver::AsyncResolver::tokio_from_system_conf() {
if let Ok(lookup) = resolver
.srv_lookup(format!("_xmpp-client._tcp.{}", self.jid.domainpart))
.await
{ {
for srv in lookup { pub async fn start_stream(&mut self) -> Result<()> {
resolver // client to server
.lookup_ip(srv.target().to_owned())
.await // declaration
.map(|ips| { self.writer.write_event_async(DECLARATION.clone()).await?;
for ip in ips {
socket_addrs.push((SocketAddr::new(ip, srv.port()), false)) // opening stream element
let server = &self.server.to_owned().try_into()?;
let stream_element = Stream::new_client(None, server, None, "en");
// TODO: nicer function to serialize to xml writer
let mut buffer = String::new();
let ser = Serializer::new(&mut buffer);
stream_element.serialize(ser).unwrap();
self.writer.write_all(buffer.as_bytes());
// server to client
// may or may not send a declaration
let mut buf = Vec::new();
let mut first_event = self.reader.read_resolved_event_into_async(&mut buf).await?;
match first_event {
(quick_xml::name::ResolveResult::Unbound, Event::Decl(e)) => {
if let Ok(version) = e.version() {
if version.as_ref() == b"1.0" {
first_event = self.reader.read_resolved_event_into_async(&mut buf).await?
} else {
// todo: error
todo!()
} }
}); } else {
first_event = self.reader.read_resolved_event_into_async(&mut buf).await?
} }
} }
if let Ok(lookup) = resolver _ => (),
.srv_lookup(format!("_xmpps-client._tcp.{}", self.jid.domainpart)) }
.await
{ // receive stream element and validate
for srv in lookup { let stream_response: Stream;
resolver match first_event {
.lookup_ip(srv.target().to_owned()) (quick_xml::name::ResolveResult::Bound(ns), Event::Start(e)) => {
.await if ns.0 == crate::stanza::stream::XMLNS.as_bytes() {
.map(|ips| { // stream_response = Stream::new(
for ip in ips { // e.try_get_attribute("from")?.try_map(|attribute| {
socket_addrs.push((SocketAddr::new(ip, srv.port()), true)) // str::from_utf8(attribute.value.as_ref())?
// .try_into()?
// .as_ref()
// })?,
// e.try_get_attribute("to")?.try_map(|attribute| {
// str::from_utf8(attribute.value.as_ref())?
// .try_into()?
// .as_ref()
// })?,
// e.try_get_attribute("id")?.try_map(|attribute| {
// str::from_utf8(attribute.value.as_ref())?
// .try_into()?
// .as_ref()
// })?,
// e.try_get_attribute("version")?.try_map(|attribute| {
// str::from_utf8(attribute.value.as_ref())?
// .try_into()?
// .as_ref()
// })?,
// e.try_get_attribute("lang")?.try_map(|attribute| {
// str::from_utf8(attribute.value.as_ref())?
// .try_into()?
// .as_ref()
// })?,
// );
return Ok(());
} else {
return Err(JabberError::BadStream);
}
}
// TODO: errors for incorrect namespace
(quick_xml::name::ResolveResult::Unbound, Event::Decl(_)) => todo!(),
(quick_xml::name::ResolveResult::Unknown(_), Event::Start(_)) => todo!(),
(quick_xml::name::ResolveResult::Unknown(_), Event::End(_)) => todo!(),
(quick_xml::name::ResolveResult::Unknown(_), Event::Empty(_)) => todo!(),
(quick_xml::name::ResolveResult::Unknown(_), Event::Text(_)) => todo!(),
(quick_xml::name::ResolveResult::Unknown(_), Event::CData(_)) => todo!(),
(quick_xml::name::ResolveResult::Unknown(_), Event::Comment(_)) => todo!(),
(quick_xml::name::ResolveResult::Unknown(_), Event::Decl(_)) => todo!(),
(quick_xml::name::ResolveResult::Unknown(_), Event::PI(_)) => todo!(),
(quick_xml::name::ResolveResult::Unknown(_), Event::DocType(_)) => todo!(),
(quick_xml::name::ResolveResult::Unknown(_), Event::Eof) => todo!(),
(quick_xml::name::ResolveResult::Unbound, Event::Start(_)) => todo!(),
(quick_xml::name::ResolveResult::Unbound, Event::End(_)) => todo!(),
(quick_xml::name::ResolveResult::Unbound, Event::Empty(_)) => todo!(),
(quick_xml::name::ResolveResult::Unbound, Event::Text(_)) => todo!(),
(quick_xml::name::ResolveResult::Unbound, Event::CData(_)) => todo!(),
(quick_xml::name::ResolveResult::Unbound, Event::Comment(_)) => todo!(),
(quick_xml::name::ResolveResult::Unbound, Event::PI(_)) => todo!(),
(quick_xml::name::ResolveResult::Unbound, Event::DocType(_)) => todo!(),
(quick_xml::name::ResolveResult::Unbound, Event::Eof) => todo!(),
(quick_xml::name::ResolveResult::Bound(_), Event::End(_)) => todo!(),
(quick_xml::name::ResolveResult::Bound(_), Event::Empty(_)) => todo!(),
(quick_xml::name::ResolveResult::Bound(_), Event::Text(_)) => todo!(),
(quick_xml::name::ResolveResult::Bound(_), Event::CData(_)) => todo!(),
(quick_xml::name::ResolveResult::Bound(_), Event::Comment(_)) => todo!(),
(quick_xml::name::ResolveResult::Bound(_), Event::Decl(_)) => todo!(),
(quick_xml::name::ResolveResult::Bound(_), Event::PI(_)) => todo!(),
(quick_xml::name::ResolveResult::Bound(_), Event::DocType(_)) => todo!(),
(quick_xml::name::ResolveResult::Bound(_), Event::Eof) => todo!(),
} }
});
} }
} }
// in case cannot connect through SRV records // pub async fn get_features(&mut self) -> Result<Vec<StreamFeature>> {
resolver.lookup_ip(&self.jid.domainpart).await.map(|ips| { // Element::read(&mut self.reader).await?.try_into()
for ip in ips { // }
socket_addrs.push((SocketAddr::new(ip, 5222), false));
socket_addrs.push((SocketAddr::new(ip, 5223), true));
}
});
}
socket_addrs
}
/// establishes a connection to the server impl Jabber<Unencrypted> {
pub async fn connect(&'j mut self) -> Result<JabberClientType> { pub async fn starttls(mut self) -> Result<Jabber<Tls>> {
for (socket_addr, is_tls) in self.get_sockets().await { todo!()
println!("trying {}", socket_addr);
match is_tls {
true => {
let socket = TcpStream::connect(socket_addr).await.unwrap();
let connector = TlsConnector::new().unwrap();
if let Ok(stream) = tokio_native_tls::TlsConnector::from(connector)
.connect(&self.server, socket)
.await
{
let (read, write) = tokio::io::split(stream);
let reader = NsReader::from_reader(BufReader::new(read));
let writer = Writer::new(write);
let client = client::encrypted::JabberClient::new(reader, writer, self);
return Ok(JabberClientType::Encrypted(client));
}
}
false => {
if let Ok(stream) = TcpStream::connect(socket_addr).await {
let (read, write) = tokio::io::split(stream);
let reader = NsReader::from_reader(BufReader::new(read));
let writer = Writer::new(write);
let client = client::unencrypted::JabberClient::new(reader, writer, self);
return Ok(JabberClientType::Unencrypted(client));
}
}
}
}
Err(JabberError::Connection)
} }
// let mut starttls_element = BytesStart::new("starttls");
// starttls_element.push_attribute(("xmlns", "urn:ietf:params:xml:ns:xmpp-tls"));
// self.writer
// .write_event_async(Event::Empty(starttls_element))
// .await
// .unwrap();
// let mut buf = Vec::new();
// match self.reader.read_event_into_async(&mut buf).await.unwrap() {
// Event::Empty(e) => match e.name() {
// QName(b"proceed") => {
// let connector = TlsConnector::new().unwrap();
// let stream = self
// .reader
// .into_inner()
// .into_inner()
// .unsplit(self.writer.into_inner());
// if let Ok(tlsstream) = tokio_native_tls::TlsConnector::from(connector)
// .connect(&self.jabber.server, stream)
// .await
// {
// let (read, write) = tokio::io::split(tlsstream);
// let reader = Reader::from_reader(BufReader::new(read));
// let writer = Writer::new(write);
// let mut client =
// super::encrypted::JabberClient::new(reader, writer, self.jabber);
// client.start_stream().await?;
// return Ok(client);
// }
// }
// QName(_) => return Err(JabberError::TlsNegotiation),
// },
// _ => return Err(JabberError::TlsNegotiation),
// }
// Err(JabberError::TlsNegotiation)
// }
} }

View File

@ -2,38 +2,30 @@
#![feature(let_chains)] #![feature(let_chains)]
// TODO: logging (dropped errors) // TODO: logging (dropped errors)
pub mod client; pub mod connection;
pub mod error; pub mod error;
pub mod jabber; pub mod jabber;
pub mod jid; pub mod jid;
pub mod stanza; pub mod stanza;
// pub use client::encrypted::JabberClient; #[macro_use]
extern crate lazy_static;
pub use connection::Connection;
pub use error::JabberError; pub use error::JabberError;
pub use jabber::Jabber; pub use jabber::Jabber;
pub use jid::JID; pub use jid::JID;
pub type Result<T> = std::result::Result<T, JabberError>; pub type Result<T> = std::result::Result<T, JabberError>;
pub async fn login<J: TryInto<JID>, P: AsRef<str>>(jid: J, password: P) -> Result<Connection> {
todo!()
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::str::FromStr;
use crate::Jabber;
use crate::JID;
#[tokio::test] #[tokio::test]
async fn login() { async fn test_login() {
Jabber::user( crate::login("test@blos.sm/clown", "slayed").await.unwrap();
JID::from_str("test@blos.sm/clown").unwrap(),
"slayed".to_owned(),
)
.unwrap()
.login()
.await
.unwrap()
.watch()
.await
.unwrap();
} }
} }

View File

@ -8,4 +8,6 @@ pub mod stream;
use quick_xml::events::{BytesDecl, Event}; use quick_xml::events::{BytesDecl, Event};
pub static DECLARATION: Event = Event::Decl(BytesDecl::new("1.0", None, None)); lazy_static! {
pub static ref DECLARATION: Event<'static> = Event::Decl(BytesDecl::new("1.0", None, None));
}

View File

@ -25,13 +25,13 @@ pub struct Stream<'s> {
xmlns_stream: &'s str, xmlns_stream: &'s str,
} }
impl Stream { impl<'s> Stream<'s> {
pub fn new( pub fn new(
from: Option<&JID>, from: Option<&'s JID>,
to: Option<&JID>, to: Option<&'s JID>,
id: Option<&str>, id: Option<&'s str>,
version: Option<&str>, version: Option<&'s str>,
lang: Option<&str>, lang: Option<&'s str>,
) -> Self { ) -> Self {
Self { Self {
from, from,
@ -46,7 +46,12 @@ impl Stream {
/// For initial stream headers, the initiating entity SHOULD include the 'xml:lang' attribute. /// For initial stream headers, the initiating entity SHOULD include the 'xml:lang' attribute.
/// For privacy, it is better to not set `from` when sending a client stanza over an unencrypted connection. /// For privacy, it is better to not set `from` when sending a client stanza over an unencrypted connection.
pub fn new_client(from: Option<&JID>, to: &JID, id: Option<&str>, lang: &str) -> Self { pub fn new_client(
from: Option<&'s JID>,
to: &'s JID,
id: Option<&'s str>,
lang: &'s str,
) -> Self {
Self { Self {
from, from,
to: Some(to), to: Some(to),