Merge pull request #2078 from safing/fix/UI-websocket-shutdown-lifecycle

Fix/UI websocket shutdown lifecycle
This commit is contained in:
Alexandr Stelnykovych
2025-11-21 12:54:47 +02:00
committed by GitHub
6 changed files with 148 additions and 36 deletions

View File

@@ -3817,6 +3817,7 @@ name = "portmaster"
version = "2.0.25"
dependencies = [
"assert_matches",
"bytes",
"cached",
"chrono",
"clap_lex",

View File

@@ -42,6 +42,7 @@ tokio = { version = "1.44.2", features = ["macros"] }
cached = "0.46.1"
notify-rust = "4.10.0"
assert_matches = "1.5.0"
bytes = "1.5"
tokio-websockets = { version = "0.5.0", features = ["client", "ring", "rand"] }
sha = "1.0.3"
http = "1.0.0"

View File

@@ -87,10 +87,21 @@ impl portmaster::Handler for WsHandler {
error!("failed to close splash window: {}", err.to_string());
}
let handle = self.handle.clone();
tauri::async_runtime::spawn(async move {
traymenu::tray_handler(cli, handle).await;
});
// Cancel the previous tray handler task if it exists
let portmaster = self.handle.portmaster();
if let Ok(mut task_guard) = portmaster.tray_handler_task.lock() {
if let Some(old_task) = task_guard.take() {
debug!("Aborting previous tray handler task");
old_task.abort();
}
// Start new tray handler and store the task handle
let handle = self.handle.clone();
let task = tauri::async_runtime::spawn(async move {
traymenu::tray_handler(cli, handle).await;
});
*task_guard = Some(task);
}
}
fn on_disconnect(&mut self) {
@@ -252,40 +263,47 @@ fn main() {
.expect("error while running tauri application");
app.run(|handle, e| {
if let RunEvent::WindowEvent { label, event, .. } = e {
if label != "main" {
// We only have one window at most so any other label is unexpected
return;
}
match e {
RunEvent::WindowEvent { label, event, .. } => {
if label != "main" {
// We only have one window at most so any other label is unexpected
return;
}
// Do not let the user close the window, instead send an event to the main
// window so we can show the "will not stop portmaster" dialog and let the window
// close itself using
//
// window.__TAURI__.window.getCurrent().close()
//
// Note: the above javascript does NOT trigger the CloseRequested event so
// there's no need to handle that case here.
if let WindowEvent::CloseRequested { api, .. } = event {
debug!(
"window (label={}) close request received, forwarding to user-interface.",
label
);
// Do not let the user close the window, instead send an event to the main
// window so we can show the "will not stop portmaster" dialog and let the window
// close itself using
//
// window.__TAURI__.window.getCurrent().close()
//
// Note: the above javascript does NOT trigger the CloseRequested event so
// there's no need to handle that case here.
if let WindowEvent::CloseRequested { api, .. } = event {
debug!(
"window (label={}) close request received, forwarding to user-interface.",
label
);
// Manually save the window state on close attempt.
// This ensures the state is saved since we prevent the close event.
let _ = handle.save_window_state(WINDOW_STATE_FLAGS_TO_SAVE);
// Manually save the window state on close attempt.
// This ensures the state is saved since we prevent the close event.
let _ = handle.save_window_state(WINDOW_STATE_FLAGS_TO_SAVE);
api.prevent_close();
if let Some(window) = handle.get_webview_window(label.as_str()) {
let result = window.emit("exit-requested", "");
if let Err(err) = result {
error!("failed to emit event: {}", err.to_string());
api.prevent_close();
if let Some(window) = handle.get_webview_window(label.as_str()) {
let result = window.emit("exit-requested", "");
if let Err(err) = result {
error!("failed to emit event: {}", err.to_string());
}
} else {
error!("window was None");
}
} else {
error!("window was None");
}
}
RunEvent::ExitRequested { .. } => {
debug!("Application exit requested, shutting down websocket");
portmaster::websocket::shutdown_websocket();
}
_ => {}
}
});
}

View File

@@ -5,7 +5,9 @@ use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::RwLock;
use tokio::time::{interval, Duration, Instant};
use tokio_websockets::{ClientBuilder, Error};
use bytes::Bytes;
use super::message::*;
use super::types::*;
@@ -51,9 +53,36 @@ pub async fn connect(uri: &str) -> Result<PortAPI, Error> {
tauri::async_runtime::spawn(async move {
let subscribers: SubscriberMap = RwLock::new(HashMap::new());
let next_id = AtomicUsize::new(0);
// Ping/pong keep-alive mechanism
let mut ping_interval = interval(Duration::from_secs(10)); // Send ping every 10 seconds
let mut timeout_check = interval(Duration::from_secs(1)); // Check for timeout every 1 second
let mut last_ping = Instant::now();
let mut last_pong = Instant::now();
const PONG_TIMEOUT: Duration = Duration::from_secs(5); // Declare connection dead if no pong within 5 seconds after ping
loop {
tokio::select! {
_ = ping_interval.tick() => {
// Send ping frame
if let Err(err) = client.send(tokio_websockets::Message::ping(Bytes::new())).await {
error!("failed to send ping: {}", err);
dispatch.close();
return;
}
last_ping = Instant::now();
// debug!("sent websocket ping");
},
_ = timeout_check.tick() => {
// Check if pong timeout expired after last ping
if last_ping > last_pong && last_ping.elapsed() > PONG_TIMEOUT {
warn!("no pong received for {:?} after ping, connection appears dead", PONG_TIMEOUT);
dispatch.close();
return;
}
},
msg = client.next() => {
let msg = match msg {
Some(msg) => msg,
@@ -73,6 +102,18 @@ pub async fn connect(uri: &str) -> Result<PortAPI, Error> {
return;
},
Ok(msg) => {
// Handle pong frames
if msg.is_pong() {
last_pong = Instant::now();
// debug!("received websocket pong");
continue;
}
if msg.is_ping() {
// debug!("received websocket ping");
continue;
}
let text = unsafe {
std::str::from_utf8_unchecked(msg.as_payload())
};

View File

@@ -18,7 +18,7 @@ pub mod commands;
// The websocket module spawns an async function on tauri's runtime that manages
// a persistent connection to the Portmaster websocket API and updates the tauri Portmaster
// Plugin instance.
mod websocket;
pub mod websocket;
// The notification module manages system notifications from portmaster.
mod notifications;
@@ -72,6 +72,9 @@ pub struct PortmasterInterface<R: Runtime> {
// whether or not the angular application should call window.show after it
// finished bootstrapping.
should_show_after_bootstrap: AtomicBool,
// handle to the tray handler task so we can abort it when reconnecting
pub tray_handler_task: Mutex<Option<tauri::async_runtime::JoinHandle<()>>>,
}
impl<R: Runtime> PortmasterInterface<R> {
@@ -300,6 +303,14 @@ impl<R: Runtime> PortmasterInterface<R> {
fn on_disconnect(&self) {
self.is_reachable.store(false, Ordering::Relaxed);
// Abort the tray handler task if it's running
if let Ok(mut task_guard) = self.tray_handler_task.lock() {
if let Some(task) = task_guard.take() {
debug!("Aborting tray handler task");
task.abort();
}
}
// clear the current api client reference.
{
let mut guard = self.api.lock().unwrap();
@@ -337,6 +348,7 @@ pub fn setup(app: AppHandle) {
handle_notifications: AtomicBool::new(false),
handle_prompts: AtomicBool::new(false),
should_show_after_bootstrap: AtomicBool::new(true),
tray_handler_task: Mutex::new(None),
};
app.manage(interface);

View File

@@ -1,9 +1,17 @@
use super::PortmasterExt;
use crate::portapi::client::connect;
use log::{debug, error, info, warn};
use std::sync::atomic::{AtomicBool, Ordering};
use tauri::{AppHandle, Runtime};
use tokio::time::{sleep, Duration};
static WEBSOCKET_SHUTDOWN: AtomicBool = AtomicBool::new(false);
/// Signals the websocket thread to stop reconnecting and shut down gracefully.
pub fn shutdown_websocket() {
WEBSOCKET_SHUTDOWN.store(true, Ordering::Release);
}
/// Starts a backround thread (via tauri::async_runtime) that connects to the Portmaster
/// Websocket database API.
pub fn start_websocket_thread<R: Runtime>(app: AppHandle<R>) {
@@ -11,6 +19,12 @@ pub fn start_websocket_thread<R: Runtime>(app: AppHandle<R>) {
tauri::async_runtime::spawn(async move {
loop {
// Check if we should shutdown before attempting to connect
if WEBSOCKET_SHUTDOWN.load(Ordering::Acquire) {
debug!("WebSocket thread shutting down gracefully");
break;
}
debug!("Trying to connect to websocket endpoint");
let api = connect("ws://127.0.0.1:817/api/database/v1").await;
@@ -23,12 +37,29 @@ pub fn start_websocket_thread<R: Runtime>(app: AppHandle<R>) {
portmaster.on_connect(cli.clone());
while !cli.is_closed() {
let _ = sleep(Duration::from_secs(1)).await;
// Monitor connection status
loop {
if WEBSOCKET_SHUTDOWN.load(Ordering::Acquire) {
debug!("Shutdown signal received, closing connection");
break;
}
if cli.is_closed() {
warn!("Connection to portmaster lost");
break;
}
sleep(Duration::from_secs(1)).await;
}
portmaster.on_disconnect();
// If shutdown was requested, exit the loop
if WEBSOCKET_SHUTDOWN.load(Ordering::Acquire) {
debug!("Exiting websocket thread after disconnect");
break;
}
warn!("lost connection to portmaster, retrying ....")
}
Err(err) => {
@@ -36,10 +67,18 @@ pub fn start_websocket_thread<R: Runtime>(app: AppHandle<R>) {
app.portmaster().on_disconnect();
// sleep and retry
// Check shutdown flag before sleeping
if WEBSOCKET_SHUTDOWN.load(Ordering::Acquire) {
debug!("Shutdown requested, not retrying connection");
break;
}
// Sleep and retry with constant 2 second delay
sleep(Duration::from_secs(2)).await;
}
}
}
info!("WebSocket thread terminated");
});
}