From c2b217d5535da1b453c3bc759b60a27198b53e95 Mon Sep 17 00:00:00 2001 From: Jimmy Stridh Date: Mon, 22 Jun 2026 12:14:23 +0200 Subject: [PATCH] fix: stop sync poller on stopped engineio sockets --- socketio/src/client/client.rs | 113 +++++++++++++++++++++++++----- socketio/src/client/raw_client.rs | 4 ++ 2 files changed, 99 insertions(+), 18 deletions(-) diff --git a/socketio/src/client/client.rs b/socketio/src/client/client.rs index fe924307..b4dbdd55 100644 --- a/socketio/src/client/client.rs +++ b/socketio/src/client/client.rs @@ -218,30 +218,65 @@ impl Client { // `Result::Ok`, the server receives a close frame so it's safe to // terminate for packet in self_clone.iter() { - let should_reconnect = match packet { - Err(Error::IncompleteResponseFromEngineIo(_)) => { - //TODO: 0.3.X handle errors - //TODO: logging error - true - } - Ok(Packet { - packet_type: PacketId::Disconnect, - .. - }) => match self_clone.builder.lock() { - Ok(builder) => builder.reconnect_on_disconnect, - Err(_) => false, - }, - _ => false, + let (reconnect, reconnect_on_disconnect) = match self_clone.builder.lock() { + Ok(builder) => (builder.reconnect, builder.reconnect_on_disconnect), + Err(_) => (false, false), }; - if should_reconnect { - let _ = self_clone.disconnect(); - let _ = self_clone.reconnect(); + + match poll_action(&packet, reconnect, reconnect_on_disconnect) { + PollAction::Continue => {} + PollAction::Stop => { + if matches!(packet, Err(Error::StoppedEngineIoSocket)) { + if let Ok(client) = self_clone.client.read() { + let _ = client.notify_close(); + } + } + break; + } + PollAction::Reconnect => { + let _ = self_clone.disconnect(); + let _ = self_clone.reconnect(); + } } } }); } } +#[derive(Debug, PartialEq, Eq)] +enum PollAction { + Continue, + Reconnect, + Stop, +} + +fn poll_action( + packet: &Result, + reconnect: bool, + reconnect_on_disconnect: bool, +) -> PollAction { + match packet { + Err(Error::IncompleteResponseFromEngineIo(_) | Error::StoppedEngineIoSocket) => { + if reconnect { + PollAction::Reconnect + } else { + PollAction::Stop + } + } + Ok(Packet { + packet_type: PacketId::Disconnect, + .. + }) => { + if reconnect_on_disconnect { + PollAction::Reconnect + } else { + PollAction::Stop + } + } + _ => PollAction::Continue, + } +} + pub(crate) struct Iter { socket: Arc>, } @@ -266,7 +301,6 @@ impl Iterator for Iter { } } } - #[cfg(test)] mod test { use std::{ @@ -282,6 +316,49 @@ mod test { use std::time::{Duration, SystemTime}; use url::Url; + #[test] + fn stopped_engineio_socket_stops_polling_when_reconnect_is_disabled() { + let packet = Err(Error::StoppedEngineIoSocket); + + assert_eq!(poll_action(&packet, false, false), PollAction::Stop); + } + + #[test] + fn stopped_engineio_socket_reconnects_when_reconnect_is_enabled() { + let packet = Err(Error::StoppedEngineIoSocket); + + assert_eq!(poll_action(&packet, true, false), PollAction::Reconnect); + } + + #[test] + fn socketio_disconnect_obeys_reconnect_on_disconnect() { + let packet = Ok(Packet::new( + PacketId::Disconnect, + "/".to_owned(), + None, + None, + 0, + None, + )); + + assert_eq!(poll_action(&packet, true, false), PollAction::Stop); + assert_eq!(poll_action(&packet, false, true), PollAction::Reconnect); + } + + #[test] + fn ordinary_packets_keep_polling() { + let packet = Ok(Packet::new( + PacketId::Event, + "/".to_owned(), + Some("[\"message\"]".to_owned()), + None, + 0, + None, + )); + + assert_eq!(poll_action(&packet, false, false), PollAction::Continue); + } + #[test] #[serial(reconnect)] fn socket_io_reconnect_integration() -> Result<()> { diff --git a/socketio/src/client/raw_client.rs b/socketio/src/client/raw_client.rs index 9c8ecef2..f8482fae 100644 --- a/socketio/src/client/raw_client.rs +++ b/socketio/src/client/raw_client.rs @@ -270,6 +270,10 @@ impl RawClient { Ok(()) } + pub(crate) fn notify_close(&self) -> Result<()> { + self.callback(&Event::Close, "") + } + /// Handles the incoming acks and classifies what callbacks to call and how. #[inline] fn handle_ack(&self, socket_packet: &Packet) -> Result<()> {