diff --git a/desktop/angular/projects/safing/portmaster-api/src/lib/platform-specific/tauri/tauri-websocket-subject.ts b/desktop/angular/projects/safing/portmaster-api/src/lib/platform-specific/tauri/tauri-websocket-subject.ts index a3b40fb4..dfbc5d2a 100644 --- a/desktop/angular/projects/safing/portmaster-api/src/lib/platform-specific/tauri/tauri-websocket-subject.ts +++ b/desktop/angular/projects/safing/portmaster-api/src/lib/platform-specific/tauri/tauri-websocket-subject.ts @@ -1,7 +1,6 @@ -import WebSocket, { ConnectionConfig, Message } from '@tauri-apps/plugin-websocket'; +import WebSocket, { Message } from '@tauri-apps/plugin-websocket'; import { Subject, Observable, merge, mergeMap, throwError } from 'rxjs'; import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket'; -import { NgZone } from '@angular/core'; const LOG_PREFIX = '[tauri_ws]'; const PING_INTERVAL_MS = 10000; // Send a ping every PING_INTERVAL_MS milliseconds @@ -11,7 +10,6 @@ const PONG_TIMEOUT_MS = 5000; // Wait PONG_TIMEOUT_MS milliseconds for a pon * * @template T - The type of messages sent and received through the WebSocket. * @param {WebSocketSubjectConfig} opts - Configuration options for the WebSocket connection. - * @param {NgZone} ngZone - Angular's NgZone to ensure change detection runs properly. * @returns {WebSocketSubject} - An RxJS WebSocketSubject-compatible object for interacting with the WebSocket. * @throws {Error} If the `serializer` or `deserializer` functions are not provided. * @@ -25,7 +23,7 @@ const PONG_TIMEOUT_MS = 5000; // Wait PONG_TIMEOUT_MS milliseconds for a pon * closingObserver: { next: () => console.log('Connection closing') }, * }, ngZone); */ -export function createTauriWsConnection(opts: WebSocketSubjectConfig, ngZone: NgZone): WebSocketSubject { +export function createTauriWsConnection(opts: WebSocketSubjectConfig): WebSocketSubject { if (!opts.serializer) throw new Error(`${LOG_PREFIX} Messages Serializer not provided!`); if (!opts.deserializer) throw new Error(`${LOG_PREFIX} Messages Deserializer not provided!`); @@ -55,9 +53,7 @@ export function createTauriWsConnection(opts: WebSocketSubjectConfig, ngZo // If this is the first subscription, connect to WebSocket if (subscriptionCount === 1) { - ngZone.runOutsideAngular(() => { connect(); - }); } const subscription = observable$.subscribe({ @@ -88,6 +84,7 @@ export function createTauriWsConnection(opts: WebSocketSubjectConfig, ngZo WebSocket.connect(opts.url) .then((ws) => { wsConnection = ws; + lastMessageReceivedTime = 0; console.log(`${LOG_PREFIX} Connection established`); opts.openObserver?.next(undefined as unknown as Event); listenerRemovalFn = ws.addListener(messagesListener); @@ -101,7 +98,7 @@ export function createTauriWsConnection(opts: WebSocketSubjectConfig, ngZo const disconnect = (): void => { stopHealthChecks(); - + if (listenerRemovalFn) { try { listenerRemovalFn(); @@ -166,16 +163,32 @@ export function createTauriWsConnection(opts: WebSocketSubjectConfig, ngZo } }; + ////////////////////////////////////////////////////////////// + // Track last message received time to detect inactivity timeout + // If no message received for INACTIVITY_TIMEOUT_MS, + // assume connection was paused due to OS hibernation or similar. + ////////////////////////////////////////////////////////////// + const INACTIVITY_TIMEOUT_MS = 1000 * 60 * 15; // 15 minutes inactivity timeout + let lastMessageReceivedTime: number = 0; + const checkReceivedMessageTime = () => { + if ((Date.now() - lastMessageReceivedTime) > INACTIVITY_TIMEOUT_MS && lastMessageReceivedTime>0) { + console.error(`${LOG_PREFIX} Inactivity timeout reached. Assuming connection was paused.`); + errorSubject.next(new Error('Inactivity timeout')); + } + lastMessageReceivedTime = Date.now(); + } + ////////////////////////////////////////////////////////////// // Messages listener ////////////////////////////////////////////////////////////// const messagesListener = (message: Message) => { + checkReceivedMessageTime(); // Update last message received time try { switch (message.type) { case 'Text': try { const deserializedMessage = deserializer({ data: message.data as string } as any); - ngZone.run(() => { messageSubject.next(deserializedMessage); }); // inside ngZone to trigger change detection + messageSubject.next(deserializedMessage); } catch (err) { console.error(`${LOG_PREFIX} Error deserializing text message:`, err); } @@ -185,7 +198,7 @@ export function createTauriWsConnection(opts: WebSocketSubjectConfig, ngZo try { const uint8Array = new Uint8Array(message.data as number[]); const deserializedMessage = deserializer({ data: uint8Array.buffer } as any); - ngZone.run(() => { messageSubject.next(deserializedMessage); }); // inside ngZone to trigger change detection + messageSubject.next(deserializedMessage); } catch (err) { console.error(`${LOG_PREFIX} Error deserializing binary message:`, err); } @@ -223,27 +236,24 @@ export function createTauriWsConnection(opts: WebSocketSubjectConfig, ngZo const webSocketSubject = { asObservable: () => trackedObservable$, next: (message: T) => { - // Run outside NgZone for better performance during send operations - ngZone.runOutsideAngular(() => { - if (!wsConnection) { - errorSubject.next(new Error('Connection not established')); - return; - } - try { - const serializedMessage = serializer(message); - // 'string' type is enough here, since default serializer for portmaster message returns string - if (typeof serializedMessage !== 'string') - throw new Error('Serialized message is not a string'); + if (!wsConnection) { + errorSubject.next(new Error('Connection not established')); + return; + } + try { + const serializedMessage = serializer(message); + // 'string' type is enough here, since default serializer for portmaster message returns string + if (typeof serializedMessage !== 'string') + throw new Error('Serialized message is not a string'); - wsConnection?.send(serializedMessage).catch((err: Error) => { - console.error(`${LOG_PREFIX} Error sending message:`, err); - errorSubject.next(err); - }); - } catch (error) { - console.error(`${LOG_PREFIX} Error serializing message:`, error); - return; - } - }); + wsConnection?.send(serializedMessage).catch((err: Error) => { + console.error(`${LOG_PREFIX} Error sending message:`, err); + errorSubject.next(err); + }); + } catch (error) { + console.error(`${LOG_PREFIX} Error serializing message:`, error); + return; + } }, complete: () => { if (wsConnection) { diff --git a/desktop/angular/projects/safing/portmaster-api/src/lib/portapi.service.ts b/desktop/angular/projects/safing/portmaster-api/src/lib/portapi.service.ts index 32953b18..a5676dd7 100644 --- a/desktop/angular/projects/safing/portmaster-api/src/lib/portapi.service.ts +++ b/desktop/angular/projects/safing/portmaster-api/src/lib/portapi.service.ts @@ -15,6 +15,7 @@ import { retryWhen, takeWhile, tap, + bufferTime, } from 'rxjs/operators'; import { WebSocketSubject } from 'rxjs/webSocket'; import { @@ -111,27 +112,34 @@ export class PortapiService { ) ) ) - ) + ), + // Buffer all incoming messages for X ms. This creates batches (arrays of messages). + bufferTime(25), + // Don't process empty batches that can occur during idle periods. + filter(batch => batch.length > 0), ) .subscribe( - (msg) => { - const observer = this._streams$.get(msg.id); - if (!observer) { - // it's expected that we receive done messages from time to time here - // as portmaster sends a "done" message after we "cancel" a subscription - // and we already remove the observer from _streams$ if the subscription - // is unsubscribed. So just hide that warning message for "done" - if (msg.type !== 'done') { - console.warn( - `Received message for unknown request id ${msg.id} (type=${msg.type})`, - msg - ); + // The subscriber now receives an array of messages (a batch). + (batch) => { + // Re-enter the Angular Zone ONCE for the entire batch. + this.ngZone.run(() => { + for (const msg of batch) { + const observer = this._streams$.get(msg.id); + if (!observer) { + // it's expected that we receive done messages from time to time here + // as portmaster sends a "done" message after we "cancel" a subscription + // and we already remove the observer from _streams$ if the subscription + // is unsubscribed. So just hide that warning message for "done" + if (msg.type !== 'done') { + console.warn(`Received message for unknown request id ${msg.id} (type=${msg.type})`, msg); + } + continue; + } + + // forward the message to the actual stream. + observer.next(msg as ReplyMessage); } - return; - } - - // forward the message to the actual stream. - observer.next(msg as ReplyMessage); + }); }, console.error, () => { diff --git a/desktop/angular/projects/safing/portmaster-api/src/lib/websocket.service.ts b/desktop/angular/projects/safing/portmaster-api/src/lib/websocket.service.ts index eaf7ce95..4724687e 100644 --- a/desktop/angular/projects/safing/portmaster-api/src/lib/websocket.service.ts +++ b/desktop/angular/projects/safing/portmaster-api/src/lib/websocket.service.ts @@ -1,11 +1,11 @@ -import { Injectable, NgZone } from '@angular/core'; +import { Injectable } from '@angular/core'; import { webSocket, WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket'; import { createTauriWsConnection } from './platform-specific/tauri/tauri-websocket-subject'; import { IsTauriEnvironment } from './platform-specific/utils'; @Injectable() export class WebsocketService { - constructor(private ngZone: NgZone) { } + constructor() { } /** * createConnection creates a new websocket connection using opts. @@ -13,9 +13,9 @@ export class WebsocketService { * @param opts Options for the websocket connection. */ createConnection(opts: WebSocketSubjectConfig): WebSocketSubject { - if (IsTauriEnvironment()) { + if (IsTauriEnvironment()) { console.log('[portmaster-api] Running under Tauri - Using Tauri WebSocket'); - return createTauriWsConnection(opts, this.ngZone); + return createTauriWsConnection(opts); } console.log('[portmaster-api] Running in browser - Using RxJS WebSocket');