fix connection channel getting lagged

This commit is contained in:
emilis 2025-10-04 09:26:37 +01:00
parent 5ad2831688
commit 48828cac8a
No known key found for this signature in database
13 changed files with 159 additions and 100 deletions

View File

@ -0,0 +1,60 @@
use core::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::Mutex;
use werewolves_proto::player::PlayerId;
#[derive(Debug, Clone)]
pub struct ConnectUpdate {
updated: Arc<AtomicBool>,
connected: Arc<Mutex<Vec<PlayerId>>>,
}
impl ConnectUpdate {
pub fn new() -> Self {
Self {
updated: Arc::new(AtomicBool::new(false)),
connected: Arc::new(Mutex::new(Vec::new())),
}
}
pub async fn connect(&self, pid: PlayerId) {
let mut connected = self.connected.lock().await;
if connected.iter().any(|c| c == &pid) {
return;
}
connected.push(pid);
self.updated.store(true, Ordering::SeqCst);
}
pub async fn disconnect(&self, pid: PlayerId) {
let mut connected = self.connected.lock().await;
if let Some(idx) = connected
.iter()
.enumerate()
.find_map(|(idx, c)| (c == &pid).then_some(idx))
{
connected.swap_remove(idx);
self.updated.store(true, Ordering::SeqCst);
}
}
}
impl Future for ConnectUpdate {
type Output = Arc<[PlayerId]>;
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
if self.updated.load(Ordering::SeqCst)
&& let Ok(connected) = self.connected.try_lock()
{
self.updated.store(false, Ordering::SeqCst);
std::task::Poll::Ready(connected.clone().into())
} else {
cx.waker().wake_by_ref();
std::task::Poll::Pending
}
}
}

View File

@ -1,25 +1,28 @@
use tokio::sync::broadcast::Receiver;
use werewolves_proto::{error::GameError, player::PlayerId};
use crate::{communication::Comms, runner::Message};
use crate::{
communication::{Comms, connect::ConnectUpdate},
runner::Message,
};
use super::{HostComms, player::PlayerIdComms};
pub struct LobbyComms {
comms: Comms,
// TODO: move this to not use a receiver
connect_recv: Receiver<(PlayerId, bool)>,
connect_recv: ConnectUpdate,
}
impl LobbyComms {
pub fn new(comms: Comms, connect_recv: Receiver<(PlayerId, bool)>) -> Self {
pub fn new(comms: Comms, connect_recv: ConnectUpdate) -> Self {
Self {
comms,
connect_recv,
}
}
pub fn into_inner(self) -> (Comms, Receiver<(PlayerId, bool)>) {
pub fn into_inner(self) -> (Comms, ConnectUpdate) {
(self.comms, self.connect_recv)
}
@ -41,12 +44,8 @@ impl LobbyComms {
Err(err) => Err(err),
}
}
r = self.connect_recv.recv() => {
match r {
Ok((player_id, true)) => Ok(Message::Connect(player_id)),
Ok((player_id, false)) => Ok(Message::Disconnect(player_id)),
Err(err) => Err(GameError::GenericError(format!("connect recv: {err}"))),
}
r = self.connect_recv.clone() => {
Ok(Message::ConnectedList(r))
}
}
}

View File

@ -5,6 +5,7 @@ use crate::{
runner::Message,
};
pub mod connect;
pub mod host;
pub mod lobby;
pub mod player;

View File

@ -13,7 +13,7 @@ use werewolves_proto::{
player::PlayerId,
};
use crate::LogError;
use crate::{LogError, communication::connect::ConnectUpdate};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ConnectionId(PlayerId, Instant);
@ -66,13 +66,13 @@ impl JoinedPlayer {
#[derive(Debug, Clone)]
pub struct JoinedPlayers {
players: Arc<Mutex<HashMap<PlayerId, JoinedPlayer>>>,
connect_state_sender: Sender<(PlayerId, bool)>,
connect_state: ConnectUpdate,
}
impl JoinedPlayers {
pub fn new(connect_state_sender: Sender<(PlayerId, bool)>) -> Self {
pub fn new(connect_state: ConnectUpdate) -> Self {
Self {
connect_state_sender,
connect_state,
players: Arc::new(Mutex::new(HashMap::new())),
}
}
@ -126,9 +126,7 @@ impl JoinedPlayers {
pub async fn disconnect(&self, connection: &ConnectionId) -> Option<JoinedPlayer> {
let mut map = self.players.lock().await;
self.connect_state_sender
.send((connection.0.clone(), false))
.log_warn();
self.connect_state.disconnect(connection.0.clone()).await;
if map
.get(connection.player_id())
@ -162,9 +160,8 @@ impl JoinedPlayers {
old.receiver
} else {
self.connect_state_sender
.send((player_id.clone(), true))
.log_warn();
self.connect_state.connect(player_id.clone()).await;
unsafe { map.get(&player_id).unwrap_unchecked() }
.receiver
.resubscribe()

View File

@ -1,8 +1,9 @@
use core::ops::Not;
use std::sync::Arc;
use crate::{
LogError,
communication::{Comms, lobby::LobbyComms},
communication::{Comms, connect::ConnectUpdate, lobby::LobbyComms},
connection::JoinedPlayers,
lobby::{Lobby, LobbyPlayers},
runner::{IdentifiedClientMessage, Message},
@ -23,7 +24,7 @@ type Result<T> = core::result::Result<T, GameError>;
pub struct GameRunner {
game: Game,
comms: Comms,
connect_recv: Receiver<(PlayerId, bool)>,
connect_recv: ConnectUpdate,
player_sender: LobbyPlayers,
roles_revealed: bool,
joined_players: JoinedPlayers,
@ -34,7 +35,7 @@ impl GameRunner {
game: Game,
comms: Comms,
player_sender: LobbyPlayers,
connect_recv: Receiver<(PlayerId, bool)>,
connect_recv: ConnectUpdate,
joined_players: JoinedPlayers,
) -> Self {
Self {
@ -117,6 +118,7 @@ impl GameRunner {
};
let mut last_err_log = tokio::time::Instant::now() - tokio::time::Duration::from_secs(60);
let mut connect_list: Arc<[PlayerId]> = Arc::new([]);
while acks.iter().any(|(_, ackd)| !*ackd) {
let msg = match self.comms.message().await {
Ok(msg) => msg,
@ -204,11 +206,14 @@ impl GameRunner {
public: _,
},
message: _,
})
| Message::Connect(player_id) => {
(notify_of_role)(&player_id, self.game.village(), &self.player_sender)
}) => (notify_of_role)(&player_id, self.game.village(), &self.player_sender),
Message::ConnectedList(c) => {
let newly_connected = c.iter().filter(|c| connect_list.contains(*c));
for connected in newly_connected {
(notify_of_role)(connected, self.game.village(), &self.player_sender)
}
connect_list = c;
}
Message::Disconnect(_) => {}
}
}
@ -330,7 +335,7 @@ impl GameEnd {
.send_if_present(&identity.player_id, ServerMessage::GameOver(result))
.log_debug();
}
Message::Connect(_) | Message::Disconnect(_) => {}
Message::ConnectedList(_) => {}
}
None
}

View File

@ -142,7 +142,7 @@ impl Lobby {
.players_in_lobby
.send_if_present(&player_id, ServerMessage::Reset);
}
Err((Message::Connect(_), _)) | Err((Message::Disconnect(_), _)) => {}
Err((Message::ConnectedList(_), _)) => {}
}
None
}
@ -299,7 +299,7 @@ impl Lobby {
self.send_lobby_info_to_clients().await;
self.send_lobby_info_to_host().await.log_debug();
}
Message::Connect(_) | Message::Disconnect(_) => self.send_lobby_info_to_host().await?,
Message::ConnectedList(_) => self.send_lobby_info_to_host().await?,
Message::Host(HostMessage::Echo(msg)) => {
self.comms()?.host().send(msg).log_warn();
}

View File

@ -22,7 +22,7 @@ use std::{env, io::Write, path::Path};
use tokio::sync::{broadcast, mpsc};
use crate::{
communication::{Comms, host::HostComms, player::PlayerIdComms},
communication::{Comms, connect::ConnectUpdate, host::HostComms, player::PlayerIdComms},
saver::FileSaver,
};
@ -92,18 +92,14 @@ async fn main() {
let (send, recv) = broadcast::channel(100);
let (server_send, host_recv) = broadcast::channel(100);
let (host_send, server_recv) = mpsc::channel(100);
let (connect_send, connect_recv) = broadcast::channel(100);
let joined_players = JoinedPlayers::new(connect_send);
let conn_update = ConnectUpdate::new();
let joined_players = JoinedPlayers::new(conn_update.clone());
let lobby_comms = LobbyComms::new(
Comms::new(
HostComms::new(server_send, server_recv),
PlayerIdComms::new(
//joined_players.clone(),
recv,
// connect_recv.resubscribe()
PlayerIdComms::new(recv),
),
),
connect_recv,
conn_update,
);
let jp_clone = joined_players.clone();

View File

@ -1,4 +1,5 @@
use core::time::Duration;
use std::sync::Arc;
use werewolves_proto::{
message::{ClientMessage, Identification, host::HostMessage},
@ -79,8 +80,7 @@ pub async fn run_game(joined_players: JoinedPlayers, comms: LobbyComms, mut save
pub enum Message {
Host(HostMessage),
Client(IdentifiedClientMessage),
Connect(PlayerId),
Disconnect(PlayerId),
ConnectedList(Arc<[PlayerId]>),
}
pub enum RunningState {

View File

@ -105,12 +105,24 @@ nav.debug-nav {
}
}
.player-list {
padding-bottom: 80px;
display: flex;
flex-direction: row;
flex-wrap: wrap;
gap: 10px;
justify-items: center;
}
.player {
flex-grow: 0;
display: flex;
justify-content: stretch;
margin: 0px;
min-width: 10rem;
max-width: 10vw;
max-height: 4rem;
height: 4rem;
text-align: center;
justify-content: center;
@ -120,12 +132,34 @@ nav.debug-nav {
filter: hue-rotate(90deg);
}
&.connected {}
block-size: max-content;
&.disconnected {
// background-color: $disconnected_color;
// border: 3px solid darken($disconnected_color, 20%);
&>button {
width: 100%;
height: 100%;
border: 1px solid $disconnected_color;
background-color: color.change($disconnected_color, $alpha: 0.15);
color: $disconnected_color;
&:hover {
filter: brightness(150%);
background-color: color.change($disconnected_color, $alpha: 0.15);
color: $disconnected_color;
}
}
&.connected {
&>button {
background-color: color.change($connected_color, $alpha: 0.15);
border: 1px solid $connected_color;
color: $connected_color;
&:hover {
filter: brightness(150%);
}
}
}
&.dead {
filter: grayscale(100%);
@ -228,7 +262,7 @@ button {
content: attr(reason);
position: absolute;
margin-top: 10px;
// top: 90%;
top: 90%;
// left: 0;
font: 'Cute Font';
// color: #000;
@ -464,18 +498,6 @@ bool_role {
}
player {
background-color: rgba(255, 107, 255, 0.7);
width: fit-content;
padding-left: 10px;
padding-right: 10px;
padding-top: 5px;
padding-bottom: 5px;
margin: 10px;
color: rgba(255, 255, 255, 0.9);
}
client {
list-style: none;
@ -738,16 +760,6 @@ error {
}
}
.player-list {
padding-bottom: 80px;
display: flex;
flex-direction: row;
flex-wrap: wrap;
gap: 10px;
// align-items: center;
justify-content: space-evenly;
}
.binary {
.button-container {
background-color: $village_color;
@ -911,3 +923,11 @@ input {
}
}
}
.big-screen {
align-content: center;
align-items: center;
justify-content: center;
height: 100vh;
position: fixed;
}

View File

@ -580,10 +580,9 @@ impl Component for Host {
HostEvent::SetBigScreenState(state) => {
self.big_screen = state;
if self.big_screen
&& let Some(root) = gloo::utils::document().document_element()
&& let Err(err) = root.set_attribute("style", "font-size: 3rem;")
&& let Ok(Some(root)) = gloo::utils::document().query_selector(".content")
{
log::error!("setting zoom: {err:?}");
root.set_class_name("content big-screen")
}
if state {

View File

@ -1,4 +1,3 @@
use core::num::NonZeroU8;
use std::rc::Rc;
use werewolves_proto::{message::PlayerState, player::PlayerId};
@ -21,7 +20,7 @@ pub fn Lobby(LobbyProps { players, on_action }: &LobbyProps) -> Html {
<div class="player-list">
{
players
.into_iter()
.iter()
.map(|p| html! {<LobbyPlayer on_action={on_action} player={p.clone()} />})
.collect::<Html>()
}

View File

@ -21,22 +21,24 @@ pub enum LobbyPlayerAction {
#[function_component]
pub fn LobbyPlayer(LobbyPlayerProps { player, on_action }: &LobbyPlayerProps) -> Html {
let class = if player.connected {
"connected"
} else {
"disconnected"
};
let open = use_state(|| false);
let class = player.connected.then_some("connected");
let pid = player.identification.player_id.clone();
let action_open = open.clone();
let action = |action: LobbyPlayerAction| {
let pid = pid.clone();
on_action
.as_ref()
.cloned()
.map(|on_action| Callback::from(move |_| on_action.emit((pid.clone(), action))))
.map(|on_action| {
Callback::from(move |_| {
on_action.emit((pid.clone(), action));
action_open.set(false);
})
})
.unwrap_or_default()
};
let number = use_state(String::new);
let open = use_state(|| false);
let number_open = use_state(|| false);
let submenu_open = open.clone();
let submenu = on_action.clone().map(|on_action| {

View File

@ -1,19 +0,0 @@
use yew::prelude::*;
// #[derive(Debug, PartialEq, Properties)]
// pub struct NotificationProps {
// pub text: String,
// pub callback: Callback<()>,
// }
// #[function_component]
// pub fn Notification(props: &NotificationProps) -> Html {
// let cb = props.callback.clone();
// let on_click = Callback::from(move |_| cb.clone().emit(()));
// html! {
// <stack>
// <h2>{props.text.clone()}</h2>
// <button class="confirm" onclick={on_click}>{"Ok"}</button>
// </stack>
// }
// }