Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 95 additions & 18 deletions socketio/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,30 +218,65 @@ impl Client {
// `Result::Ok`, the server receives a close frame so it's safe to
// terminate
for packet in self_clone.iter() {
let should_reconnect = match packet {
Err(Error::IncompleteResponseFromEngineIo(_)) => {
//TODO: 0.3.X handle errors
//TODO: logging error
true
}
Ok(Packet {
packet_type: PacketId::Disconnect,
..
}) => match self_clone.builder.lock() {
Ok(builder) => builder.reconnect_on_disconnect,
Err(_) => false,
},
_ => false,
let (reconnect, reconnect_on_disconnect) = match self_clone.builder.lock() {
Ok(builder) => (builder.reconnect, builder.reconnect_on_disconnect),
Err(_) => (false, false),
};
if should_reconnect {
let _ = self_clone.disconnect();
let _ = self_clone.reconnect();

match poll_action(&packet, reconnect, reconnect_on_disconnect) {
PollAction::Continue => {}
PollAction::Stop => {
if matches!(packet, Err(Error::StoppedEngineIoSocket)) {
if let Ok(client) = self_clone.client.read() {
let _ = client.notify_close();
}
}
break;
}
PollAction::Reconnect => {
let _ = self_clone.disconnect();
let _ = self_clone.reconnect();
}
}
}
});
}
}

#[derive(Debug, PartialEq, Eq)]
enum PollAction {
Continue,
Reconnect,
Stop,
}

fn poll_action(
packet: &Result<Packet>,
reconnect: bool,
reconnect_on_disconnect: bool,
) -> PollAction {
match packet {
Err(Error::IncompleteResponseFromEngineIo(_) | Error::StoppedEngineIoSocket) => {
if reconnect {
PollAction::Reconnect
} else {
PollAction::Stop
}
}
Ok(Packet {
packet_type: PacketId::Disconnect,
..
}) => {
if reconnect_on_disconnect {
PollAction::Reconnect
} else {
PollAction::Stop
}
}
_ => PollAction::Continue,
}
}

pub(crate) struct Iter {
socket: Arc<RwLock<RawClient>>,
}
Expand All @@ -266,7 +301,6 @@ impl Iterator for Iter {
}
}
}

#[cfg(test)]
mod test {
use std::{
Expand All @@ -282,6 +316,49 @@ mod test {
use std::time::{Duration, SystemTime};
use url::Url;

#[test]
fn stopped_engineio_socket_stops_polling_when_reconnect_is_disabled() {
let packet = Err(Error::StoppedEngineIoSocket);

assert_eq!(poll_action(&packet, false, false), PollAction::Stop);
}

#[test]
fn stopped_engineio_socket_reconnects_when_reconnect_is_enabled() {
let packet = Err(Error::StoppedEngineIoSocket);

assert_eq!(poll_action(&packet, true, false), PollAction::Reconnect);
}

#[test]
fn socketio_disconnect_obeys_reconnect_on_disconnect() {
let packet = Ok(Packet::new(
PacketId::Disconnect,
"/".to_owned(),
None,
None,
0,
None,
));

assert_eq!(poll_action(&packet, true, false), PollAction::Stop);
assert_eq!(poll_action(&packet, false, true), PollAction::Reconnect);
}

#[test]
fn ordinary_packets_keep_polling() {
let packet = Ok(Packet::new(
PacketId::Event,
"/".to_owned(),
Some("[\"message\"]".to_owned()),
None,
0,
None,
));

assert_eq!(poll_action(&packet, false, false), PollAction::Continue);
}

#[test]
#[serial(reconnect)]
fn socket_io_reconnect_integration() -> Result<()> {
Expand Down
4 changes: 4 additions & 0 deletions socketio/src/client/raw_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,10 @@ impl RawClient {
Ok(())
}

pub(crate) fn notify_close(&self) -> Result<()> {
self.callback(&Event::Close, "")
}

/// Handles the incoming acks and classifies what callbacks to call and how.
#[inline]
fn handle_ack(&self, socket_packet: &Packet) -> Result<()> {
Expand Down