From bd12a784c2203c6babd251bae4967ac922fe2b4a Mon Sep 17 00:00:00 2001 From: Alexandr Stelnykovych Date: Fri, 6 Jun 2025 18:45:13 +0300 Subject: [PATCH] (UI; tauri-websocket): Enhance error handling and connection management in WebSocket implementation --- .../tauri/tauri-websocket-subject.ts | 372 +++++++++--------- .../portmaster-api/src/lib/portapi.service.ts | 2 +- 2 files changed, 189 insertions(+), 185 deletions(-) diff --git a/desktop/angular/projects/safing/portmaster-api/src/lib/platform-specific/tauri/tauri-websocket-subject.ts b/desktop/angular/projects/safing/portmaster-api/src/lib/platform-specific/tauri/tauri-websocket-subject.ts index f6b3e8b2..a3b40fb4 100644 --- a/desktop/angular/projects/safing/portmaster-api/src/lib/platform-specific/tauri/tauri-websocket-subject.ts +++ b/desktop/angular/projects/safing/portmaster-api/src/lib/platform-specific/tauri/tauri-websocket-subject.ts @@ -1,10 +1,11 @@ import WebSocket, { ConnectionConfig, Message } from '@tauri-apps/plugin-websocket'; -import { Subject, Observable } from 'rxjs'; +import { Subject, Observable, merge, mergeMap, throwError } from 'rxjs'; import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket'; import { NgZone } from '@angular/core'; -const LOG_PREFIX = '[tauri_ws]'; - +const LOG_PREFIX = '[tauri_ws]'; +const PING_INTERVAL_MS = 10000; // Send a ping every PING_INTERVAL_MS milliseconds +const PONG_TIMEOUT_MS = 5000; // Wait PONG_TIMEOUT_MS milliseconds for a pong response /** * Creates a WebSocket connection using the Tauri WebSocket API and wraps it in an RxJS WebSocketSubject-compatible interface. * @@ -33,224 +34,227 @@ export function createTauriWsConnection(opts: WebSocketSubjectConfig, ngZo let wsConnection: WebSocket | null = null; const messageSubject = new Subject(); - const observable$ = messageSubject.asObservable(); - - // A queue for messages that need to be sent before the connection is established - const pendingMessages: T[] = []; + const errorSubject = new Subject(); // Added for error propagation - // Function to establish a WebSocket connection - const connect = (): void => { - WebSocket.connect(opts.url) - .then((ws) => { - wsConnection = ws; - console.log(`${LOG_PREFIX} Connection established`); - - // Run inside NgZone to ensure Angular detects this change - ngZone.run(() => { - // Notify that connection is open - opts.openObserver?.next(undefined as unknown as Event); - // Send any pending messages - while (pendingMessages.length > 0) { - const message = pendingMessages.shift(); - if (message) webSocketSubject.next(message); - } - }); + // Combined stream with both messages and errors + const observable$ = merge( + messageSubject.asObservable(), + errorSubject.pipe( + mergeMap(err => throwError(() => err)) + ) + ); - setupMessageListener(ws); - }) - .catch((error: Error) => { - console.error(`${LOG_PREFIX} Connection failed:`, error); - reconnect(); - }); - }; - - // Function to reconnect - let reconnectTimeout: ReturnType | null = null; - const reconnect = () => { - if (reconnectTimeout) { - clearTimeout(reconnectTimeout); - } - - // Notify close observer - ngZone.run(() => { - opts.closeObserver?.next(undefined as unknown as CloseEvent); - }) - - // Remove the existing listener if it exists - removeListener(); - - // Close the existing connection if it exists - wsConnection?.disconnect().catch(err => console.warn(`${LOG_PREFIX} Error closing connection during reconnect:`, err)); - wsConnection = null; - - // Connect again after a delay - console.log(`${LOG_PREFIX} Attempting to reconnect in 2 seconds...`); - reconnectTimeout = setTimeout(() => { - reconnectTimeout = null; + ////////////////////////////////////////////////////////////// + // Track subscriptions + ////////////////////////////////////////////////////////////// + let subscriptionCount = 0; + + // Wrapper with subscription tracking + const trackedObservable$ = new Observable(subscriber => { + subscriptionCount++; + + // If this is the first subscription, connect to WebSocket + if (subscriptionCount === 1) { + ngZone.runOutsideAngular(() => { connect(); - }, 2000); - }; - - // Function to check if connection alive, and reconnect, if necessary - let pingTimeoutId: ReturnType | null = null; - const checkConnectionAliveOrReconnect = () => { - if (!pingTimeoutId && wsConnection) { - console.log(`${LOG_PREFIX} Verifying connection status with ping...`); - wsConnection.send( {type: 'Ping', data: [1]} ).then(() => { - pingTimeoutId = setTimeout(() => { // The timeout will be stopped if we receive a Pong response - console.error(`${LOG_PREFIX} No response to ping - connection appears dead`); - pingTimeoutId = null; - reconnect(); - }, 5000); - }).catch(err => { - console.error(`${LOG_PREFIX} Failed to send ping - connection is dead:`, err); - reconnect(); }); } - } - // Function to remove the message listener + const subscription = observable$.subscribe({ + next: value => subscriber.next(value), + error: err => subscriber.error(err), + complete: () => subscriber.complete() + }); + + // Cleanup function - called when unsubscribed + return () => { + subscriptionCount--; + subscription.unsubscribe(); + + // If this was the last subscription, close the WebSocket connection + if (subscriptionCount === 0) { + disconnect(); + } + }; + }); + + ////////////////////////////////////////////////////////////// + // Function to establish a WebSocket connection + ////////////////////////////////////////////////////////////// let listenerRemovalFn: (() => void) | null = null; // Store the removal function for the ws listener - const removeListener = () => { - if (listenerRemovalFn) { - try { - listenerRemovalFn(); - listenerRemovalFn = null; // Clear the reference - } catch (err) { - console.error(`${LOG_PREFIX} Error removing listener:`, err); - } + + const connect = (): void => { + console.log(`${LOG_PREFIX} Connecting to WebSocket: ${opts.url}`); + WebSocket.connect(opts.url) + .then((ws) => { + wsConnection = ws; + console.log(`${LOG_PREFIX} Connection established`); + opts.openObserver?.next(undefined as unknown as Event); + listenerRemovalFn = ws.addListener(messagesListener); + startHealthChecks(); + }) + .catch((error: Error) => { + console.error(`${LOG_PREFIX} Connection failed:`, error); + errorSubject.next(error); + }); + }; + + const disconnect = (): void => { + stopHealthChecks(); + + if (listenerRemovalFn) { + try { + listenerRemovalFn(); + } catch (err) { + console.error(`${LOG_PREFIX} Error removing listener:`, err); } + listenerRemovalFn = null; // Clear the reference + } + + const currentWs = wsConnection; + wsConnection = null; + + if (!currentWs) return; + + console.log(`${LOG_PREFIX} Closing WebSocket connection.`); + opts.closeObserver?.next(undefined as unknown as CloseEvent); + currentWs.disconnect().catch(err => console.warn(`${LOG_PREFIX} Error closing connection:`, err)); } - // Function to set up the message listener - const setupMessageListener = (ws: WebSocket) => { - listenerRemovalFn = ws.addListener((message: Message) => { - // Process message inside ngZone to trigger change detection - try { - switch (message.type) { - case 'Text': - try { - const deserializedMessage = deserializer({ data: message.data as string } as any); - ngZone.run(() => { messageSubject.next(deserializedMessage); }); // inside ngZone to trigger change detection - } catch (err) { - console.error(`${LOG_PREFIX} Error deserializing text message:`, err); - } - break; - - case 'Binary': - try { - const uint8Array = new Uint8Array(message.data as number[]); - const deserializedMessage = deserializer({ data: uint8Array.buffer } as any); - ngZone.run(() => { messageSubject.next(deserializedMessage); }); // inside ngZone to trigger change detection - } catch (err) { - console.error(`${LOG_PREFIX} Error deserializing binary message:`, err); - } - break; - - case 'Close': - console.log(`${LOG_PREFIX} Connection closed by server`); - reconnect(); // Auto-reconnect on server-initiated close - break; - - case 'Ping': - break; + ////////////////////////////////////////////////////////////// + // Function to check if connection alive + ////////////////////////////////////////////////////////////// + let healthCheckIntervalId: ReturnType | null = null; + let pongTimeoutId: ReturnType | null = null; - case 'Pong': - console.log(`${LOG_PREFIX} Received pong response - connection is alive`); - if (pingTimeoutId) { - // Clear the timeout since we got a response - clearTimeout(pingTimeoutId); - pingTimeoutId = null; - } - break; - - // All other message types are unexpected. Proceed with reconnect. - default: - console.warn(`${LOG_PREFIX} Received unexpected message: '${message}'`); - checkConnectionAliveOrReconnect(); - break; - } - } catch (error) { - console.error(`${LOG_PREFIX} Error processing message: `, error); + const startHealthChecks = () => { + stopHealthChecks(); // Ensure no multiple intervals are running + healthCheckIntervalId = setInterval(() => { + if (!wsConnection) { + stopHealthChecks(); + return; } - }); + if (pongTimeoutId) { + // Ping already in flight, waiting for pong. + return; + } + + wsConnection.send({ type: 'Ping', data: [] }) + .then(() => { + pongTimeoutId = setTimeout(() => { + console.error(`${LOG_PREFIX} No Pong received. Connection is likely dead.`); + errorSubject.next(new Error('Connection timed out')); + stopHealthChecks(); + }, PONG_TIMEOUT_MS); + }) + .catch(err => { + console.error(`${LOG_PREFIX} Ping send failed:`, err); + errorSubject.next(new Error(`Ping send failed: ${err}`)); + stopHealthChecks(); + }); + }, PING_INTERVAL_MS); + }; + + const stopHealthChecks = () => { + if (healthCheckIntervalId) { + clearInterval(healthCheckIntervalId); + healthCheckIntervalId = null; + } + if (pongTimeoutId) { + clearTimeout(pongTimeoutId); + pongTimeoutId = null; + } }; ////////////////////////////////////////////////////////////// - // Connect to WebSocket + // Messages listener ////////////////////////////////////////////////////////////// - console.log(`${LOG_PREFIX} Connecting to WebSocket:`, opts.url); - - // Connect outside of Angular zone for better performance - ngZone.runOutsideAngular(() => { - connect(); - }); + const messagesListener = (message: Message) => { + try { + switch (message.type) { + case 'Text': + try { + const deserializedMessage = deserializer({ data: message.data as string } as any); + ngZone.run(() => { messageSubject.next(deserializedMessage); }); // inside ngZone to trigger change detection + } catch (err) { + console.error(`${LOG_PREFIX} Error deserializing text message:`, err); + } + break; + + case 'Binary': + try { + const uint8Array = new Uint8Array(message.data as number[]); + const deserializedMessage = deserializer({ data: uint8Array.buffer } as any); + ngZone.run(() => { messageSubject.next(deserializedMessage); }); // inside ngZone to trigger change detection + } catch (err) { + console.error(`${LOG_PREFIX} Error deserializing binary message:`, err); + } + break; + + case 'Close': + console.warn(`${LOG_PREFIX} Connection closed by server: ${message}`); + errorSubject.next(new Error(`Connection closed by server: ${message}`)); + break; + + case 'Ping': + break; + case 'Pong': + // Pong received, clear the timeout. + if (pongTimeoutId) { + clearTimeout(pongTimeoutId); + pongTimeoutId = null; + } + break; + + // All other message types are unexpected. Proceed with reconnect. + default: + console.warn(`${LOG_PREFIX} Received unexpected message: '${message}'`); + break; + } + } catch (error) { + console.error(`${LOG_PREFIX} Error processing message: `, error); + } + } + ////////////////////////////////////////////////////////////// - // RxJS WebSocketSubject-compatible implementation + // RxJS WebSocketSubject-compatible interface ////////////////////////////////////////////////////////////// const webSocketSubject = { - // Standard Observer interface methods + asObservable: () => trackedObservable$, next: (message: T) => { // Run outside NgZone for better performance during send operations ngZone.runOutsideAngular(() => { - // If the connection is not established yet, queue the message if (!wsConnection) { - if (pendingMessages.length >= 100) { - console.error(`${LOG_PREFIX} Too many pending messages, skipping message`); - return; - } - pendingMessages.push(message); - console.warn(`${LOG_PREFIX} Connection not established yet, message queued`); - return; - } - - // Serialize the message using the provided serializer - let serializedMessage: any; + errorSubject.next(new Error('Connection not established')); + return; + } try { - serializedMessage = serializer(message); + const serializedMessage = serializer(message); // 'string' type is enough here, since default serializer for portmaster message returns string if (typeof serializedMessage !== 'string') - throw new Error('Serialized message is not a string'); + throw new Error('Serialized message is not a string'); + + wsConnection?.send(serializedMessage).catch((err: Error) => { + console.error(`${LOG_PREFIX} Error sending message:`, err); + errorSubject.next(err); + }); } catch (error) { console.error(`${LOG_PREFIX} Error serializing message:`, error); return; - } - - // Send the serialized message through the WebSocket connection - wsConnection?.send(serializedMessage).catch((err: Error) => { - console.error(`${LOG_PREFIX} Error sending message:`, err); - checkConnectionAliveOrReconnect(); - }); + } }); }, - complete: () => { if (wsConnection) { - console.log(`${LOG_PREFIX} Closing connection`); - - // Run inside NgZone to ensure Angular detects this change - ngZone.run(() => { - opts.closingObserver?.next(); - wsConnection!.disconnect().catch((err: Error) => console.error(`${LOG_PREFIX} Error closing connection:`, err)); - wsConnection = null; - opts.closeObserver?.next(undefined as unknown as CloseEvent); - }); - - // Remove the existing listener if it exists - removeListener(); - } - + opts.closingObserver?.next(); + disconnect(); + } messageSubject.complete(); }, - - // RxJS Observable methods required for compatibility - pipe: function(): Observable { - // @ts-ignore - Ignore the parameter type mismatch - return observable$.pipe(...arguments); - }, + subscribe: trackedObservable$.subscribe.bind(trackedObservable$), + pipe: trackedObservable$.pipe.bind(trackedObservable$), }; - // Cast to WebSocketSubject return webSocketSubject as unknown as WebSocketSubject; } \ No newline at end of file diff --git a/desktop/angular/projects/safing/portmaster-api/src/lib/portapi.service.ts b/desktop/angular/projects/safing/portmaster-api/src/lib/portapi.service.ts index 4afed993..32953b18 100644 --- a/desktop/angular/projects/safing/portmaster-api/src/lib/portapi.service.ts +++ b/desktop/angular/projects/safing/portmaster-api/src/lib/portapi.service.ts @@ -801,7 +801,7 @@ export class PortapiService { } }, error: (err) => { - console.error(err, attrs); + console.error("[portapi] request error:", err.message || err, attrs); observer.error(err); }, complete: () => {