From 76214bd9860983fada04f9d053b41871eaa30a36 Mon Sep 17 00:00:00 2001 From: Alexandr Stelnykovych Date: Fri, 21 Nov 2025 12:28:57 +0200 Subject: [PATCH] Add WebSocket ping/pong keep-alive mechanism Implements RFC 6455 compliant ping/pong health checking to detect dead connections: - Send ping frames every 10 seconds - Monitor pong responses with 5-second timeout after each ping --- desktop/tauri/src-tauri/Cargo.lock | 1 + desktop/tauri/src-tauri/Cargo.toml | 1 + desktop/tauri/src-tauri/src/portapi/client.rs | 41 +++++++++++++++++++ 3 files changed, 43 insertions(+) diff --git a/desktop/tauri/src-tauri/Cargo.lock b/desktop/tauri/src-tauri/Cargo.lock index 7ad157a2..7f6330f4 100644 --- a/desktop/tauri/src-tauri/Cargo.lock +++ b/desktop/tauri/src-tauri/Cargo.lock @@ -3817,6 +3817,7 @@ name = "portmaster" version = "2.0.25" dependencies = [ "assert_matches", + "bytes", "cached", "chrono", "clap_lex", diff --git a/desktop/tauri/src-tauri/Cargo.toml b/desktop/tauri/src-tauri/Cargo.toml index fa03a7d7..c728d3aa 100644 --- a/desktop/tauri/src-tauri/Cargo.toml +++ b/desktop/tauri/src-tauri/Cargo.toml @@ -42,6 +42,7 @@ tokio = { version = "1.44.2", features = ["macros"] } cached = "0.46.1" notify-rust = "4.10.0" assert_matches = "1.5.0" +bytes = "1.5" tokio-websockets = { version = "0.5.0", features = ["client", "ring", "rand"] } sha = "1.0.3" http = "1.0.0" diff --git a/desktop/tauri/src-tauri/src/portapi/client.rs b/desktop/tauri/src-tauri/src/portapi/client.rs index b3b00d09..92688be0 100644 --- a/desktop/tauri/src-tauri/src/portapi/client.rs +++ b/desktop/tauri/src-tauri/src/portapi/client.rs @@ -5,7 +5,9 @@ use std::collections::HashMap; use std::sync::atomic::{AtomicUsize, Ordering}; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::RwLock; +use tokio::time::{interval, Duration, Instant}; use tokio_websockets::{ClientBuilder, Error}; +use bytes::Bytes; use super::message::*; use super::types::*; @@ -51,9 +53,36 @@ pub async fn connect(uri: &str) -> Result { tauri::async_runtime::spawn(async move { let subscribers: SubscriberMap = RwLock::new(HashMap::new()); let next_id = AtomicUsize::new(0); + + // Ping/pong keep-alive mechanism + let mut ping_interval = interval(Duration::from_secs(10)); // Send ping every 10 seconds + let mut timeout_check = interval(Duration::from_secs(1)); // Check for timeout every 1 second + let mut last_ping = Instant::now(); + let mut last_pong = Instant::now(); + const PONG_TIMEOUT: Duration = Duration::from_secs(5); // Declare connection dead if no pong within 5 seconds after ping loop { tokio::select! { + _ = ping_interval.tick() => { + // Send ping frame + if let Err(err) = client.send(tokio_websockets::Message::ping(Bytes::new())).await { + error!("failed to send ping: {}", err); + dispatch.close(); + return; + } + last_ping = Instant::now(); + // debug!("sent websocket ping"); + }, + + _ = timeout_check.tick() => { + // Check if pong timeout expired after last ping + if last_ping > last_pong && last_ping.elapsed() > PONG_TIMEOUT { + warn!("no pong received for {:?} after ping, connection appears dead", PONG_TIMEOUT); + dispatch.close(); + return; + } + }, + msg = client.next() => { let msg = match msg { Some(msg) => msg, @@ -73,6 +102,18 @@ pub async fn connect(uri: &str) -> Result { return; }, Ok(msg) => { + // Handle pong frames + if msg.is_pong() { + last_pong = Instant::now(); + // debug!("received websocket pong"); + continue; + } + + if msg.is_ping() { + // debug!("received websocket ping"); + continue; + } + let text = unsafe { std::str::from_utf8_unchecked(msg.as_payload()) };