Add WebSocket ping/pong keep-alive mechanism
Implements RFC 6455 compliant ping/pong health checking to detect dead connections: - Send ping frames every 10 seconds - Monitor pong responses with 5-second timeout after each ping
This commit is contained in:
1
desktop/tauri/src-tauri/Cargo.lock
generated
1
desktop/tauri/src-tauri/Cargo.lock
generated
@@ -3817,6 +3817,7 @@ name = "portmaster"
|
|||||||
version = "2.0.25"
|
version = "2.0.25"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"assert_matches",
|
"assert_matches",
|
||||||
|
"bytes",
|
||||||
"cached",
|
"cached",
|
||||||
"chrono",
|
"chrono",
|
||||||
"clap_lex",
|
"clap_lex",
|
||||||
|
|||||||
@@ -42,6 +42,7 @@ tokio = { version = "1.44.2", features = ["macros"] }
|
|||||||
cached = "0.46.1"
|
cached = "0.46.1"
|
||||||
notify-rust = "4.10.0"
|
notify-rust = "4.10.0"
|
||||||
assert_matches = "1.5.0"
|
assert_matches = "1.5.0"
|
||||||
|
bytes = "1.5"
|
||||||
tokio-websockets = { version = "0.5.0", features = ["client", "ring", "rand"] }
|
tokio-websockets = { version = "0.5.0", features = ["client", "ring", "rand"] }
|
||||||
sha = "1.0.3"
|
sha = "1.0.3"
|
||||||
http = "1.0.0"
|
http = "1.0.0"
|
||||||
|
|||||||
@@ -5,7 +5,9 @@ use std::collections::HashMap;
|
|||||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
|
use tokio::time::{interval, Duration, Instant};
|
||||||
use tokio_websockets::{ClientBuilder, Error};
|
use tokio_websockets::{ClientBuilder, Error};
|
||||||
|
use bytes::Bytes;
|
||||||
|
|
||||||
use super::message::*;
|
use super::message::*;
|
||||||
use super::types::*;
|
use super::types::*;
|
||||||
@@ -51,9 +53,36 @@ pub async fn connect(uri: &str) -> Result<PortAPI, Error> {
|
|||||||
tauri::async_runtime::spawn(async move {
|
tauri::async_runtime::spawn(async move {
|
||||||
let subscribers: SubscriberMap = RwLock::new(HashMap::new());
|
let subscribers: SubscriberMap = RwLock::new(HashMap::new());
|
||||||
let next_id = AtomicUsize::new(0);
|
let next_id = AtomicUsize::new(0);
|
||||||
|
|
||||||
|
// Ping/pong keep-alive mechanism
|
||||||
|
let mut ping_interval = interval(Duration::from_secs(10)); // Send ping every 10 seconds
|
||||||
|
let mut timeout_check = interval(Duration::from_secs(1)); // Check for timeout every 1 second
|
||||||
|
let mut last_ping = Instant::now();
|
||||||
|
let mut last_pong = Instant::now();
|
||||||
|
const PONG_TIMEOUT: Duration = Duration::from_secs(5); // Declare connection dead if no pong within 5 seconds after ping
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
|
_ = ping_interval.tick() => {
|
||||||
|
// Send ping frame
|
||||||
|
if let Err(err) = client.send(tokio_websockets::Message::ping(Bytes::new())).await {
|
||||||
|
error!("failed to send ping: {}", err);
|
||||||
|
dispatch.close();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
last_ping = Instant::now();
|
||||||
|
// debug!("sent websocket ping");
|
||||||
|
},
|
||||||
|
|
||||||
|
_ = timeout_check.tick() => {
|
||||||
|
// Check if pong timeout expired after last ping
|
||||||
|
if last_ping > last_pong && last_ping.elapsed() > PONG_TIMEOUT {
|
||||||
|
warn!("no pong received for {:?} after ping, connection appears dead", PONG_TIMEOUT);
|
||||||
|
dispatch.close();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
msg = client.next() => {
|
msg = client.next() => {
|
||||||
let msg = match msg {
|
let msg = match msg {
|
||||||
Some(msg) => msg,
|
Some(msg) => msg,
|
||||||
@@ -73,6 +102,18 @@ pub async fn connect(uri: &str) -> Result<PortAPI, Error> {
|
|||||||
return;
|
return;
|
||||||
},
|
},
|
||||||
Ok(msg) => {
|
Ok(msg) => {
|
||||||
|
// Handle pong frames
|
||||||
|
if msg.is_pong() {
|
||||||
|
last_pong = Instant::now();
|
||||||
|
// debug!("received websocket pong");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if msg.is_ping() {
|
||||||
|
// debug!("received websocket ping");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
let text = unsafe {
|
let text = unsafe {
|
||||||
std::str::from_utf8_unchecked(msg.as_payload())
|
std::str::from_utf8_unchecked(msg.as_payload())
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user