refactor(UI; tauri-websocket): Simplify Tauri WebSocket connection by removing NgZone dependency and adding inactivity timeout handling

This commit is contained in:
Alexandr Stelnykovych
2025-06-09 10:17:45 +03:00
parent 795d99cc12
commit 93f87f4dc4
3 changed files with 69 additions and 51 deletions

View File

@@ -1,7 +1,6 @@
import WebSocket, { ConnectionConfig, Message } from '@tauri-apps/plugin-websocket';
import WebSocket, { Message } from '@tauri-apps/plugin-websocket';
import { Subject, Observable, merge, mergeMap, throwError } from 'rxjs';
import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket';
import { NgZone } from '@angular/core';
const LOG_PREFIX = '[tauri_ws]';
const PING_INTERVAL_MS = 10000; // Send a ping every PING_INTERVAL_MS milliseconds
@@ -11,7 +10,6 @@ const PONG_TIMEOUT_MS = 5000; // Wait PONG_TIMEOUT_MS milliseconds for a pon
*
* @template T - The type of messages sent and received through the WebSocket.
* @param {WebSocketSubjectConfig<T>} opts - Configuration options for the WebSocket connection.
* @param {NgZone} ngZone - Angular's NgZone to ensure change detection runs properly.
* @returns {WebSocketSubject<T>} - An RxJS WebSocketSubject-compatible object for interacting with the WebSocket.
* @throws {Error} If the `serializer` or `deserializer` functions are not provided.
*
@@ -25,7 +23,7 @@ const PONG_TIMEOUT_MS = 5000; // Wait PONG_TIMEOUT_MS milliseconds for a pon
* closingObserver: { next: () => console.log('Connection closing') },
* }, ngZone);
*/
export function createTauriWsConnection<T>(opts: WebSocketSubjectConfig<T>, ngZone: NgZone): WebSocketSubject<T> {
export function createTauriWsConnection<T>(opts: WebSocketSubjectConfig<T>): WebSocketSubject<T> {
if (!opts.serializer) throw new Error(`${LOG_PREFIX} Messages Serializer not provided!`);
if (!opts.deserializer) throw new Error(`${LOG_PREFIX} Messages Deserializer not provided!`);
@@ -55,9 +53,7 @@ export function createTauriWsConnection<T>(opts: WebSocketSubjectConfig<T>, ngZo
// If this is the first subscription, connect to WebSocket
if (subscriptionCount === 1) {
ngZone.runOutsideAngular(() => {
connect();
});
}
const subscription = observable$.subscribe({
@@ -88,6 +84,7 @@ export function createTauriWsConnection<T>(opts: WebSocketSubjectConfig<T>, ngZo
WebSocket.connect(opts.url)
.then((ws) => {
wsConnection = ws;
lastMessageReceivedTime = 0;
console.log(`${LOG_PREFIX} Connection established`);
opts.openObserver?.next(undefined as unknown as Event);
listenerRemovalFn = ws.addListener(messagesListener);
@@ -101,7 +98,7 @@ export function createTauriWsConnection<T>(opts: WebSocketSubjectConfig<T>, ngZo
const disconnect = (): void => {
stopHealthChecks();
if (listenerRemovalFn) {
try {
listenerRemovalFn();
@@ -166,16 +163,32 @@ export function createTauriWsConnection<T>(opts: WebSocketSubjectConfig<T>, ngZo
}
};
//////////////////////////////////////////////////////////////
// Track last message received time to detect inactivity timeout
// If no message received for INACTIVITY_TIMEOUT_MS,
// assume connection was paused due to OS hibernation or similar.
//////////////////////////////////////////////////////////////
const INACTIVITY_TIMEOUT_MS = 1000 * 60 * 15; // 15 minutes inactivity timeout
let lastMessageReceivedTime: number = 0;
const checkReceivedMessageTime = () => {
if ((Date.now() - lastMessageReceivedTime) > INACTIVITY_TIMEOUT_MS && lastMessageReceivedTime>0) {
console.error(`${LOG_PREFIX} Inactivity timeout reached. Assuming connection was paused.`);
errorSubject.next(new Error('Inactivity timeout'));
}
lastMessageReceivedTime = Date.now();
}
//////////////////////////////////////////////////////////////
// Messages listener
//////////////////////////////////////////////////////////////
const messagesListener = (message: Message) => {
checkReceivedMessageTime(); // Update last message received time
try {
switch (message.type) {
case 'Text':
try {
const deserializedMessage = deserializer({ data: message.data as string } as any);
ngZone.run(() => { messageSubject.next(deserializedMessage); }); // inside ngZone to trigger change detection
messageSubject.next(deserializedMessage);
} catch (err) {
console.error(`${LOG_PREFIX} Error deserializing text message:`, err);
}
@@ -185,7 +198,7 @@ export function createTauriWsConnection<T>(opts: WebSocketSubjectConfig<T>, ngZo
try {
const uint8Array = new Uint8Array(message.data as number[]);
const deserializedMessage = deserializer({ data: uint8Array.buffer } as any);
ngZone.run(() => { messageSubject.next(deserializedMessage); }); // inside ngZone to trigger change detection
messageSubject.next(deserializedMessage);
} catch (err) {
console.error(`${LOG_PREFIX} Error deserializing binary message:`, err);
}
@@ -223,27 +236,24 @@ export function createTauriWsConnection<T>(opts: WebSocketSubjectConfig<T>, ngZo
const webSocketSubject = {
asObservable: () => trackedObservable$,
next: (message: T) => {
// Run outside NgZone for better performance during send operations
ngZone.runOutsideAngular(() => {
if (!wsConnection) {
errorSubject.next(new Error('Connection not established'));
return;
}
try {
const serializedMessage = serializer(message);
// 'string' type is enough here, since default serializer for portmaster message returns string
if (typeof serializedMessage !== 'string')
throw new Error('Serialized message is not a string');
if (!wsConnection) {
errorSubject.next(new Error('Connection not established'));
return;
}
try {
const serializedMessage = serializer(message);
// 'string' type is enough here, since default serializer for portmaster message returns string
if (typeof serializedMessage !== 'string')
throw new Error('Serialized message is not a string');
wsConnection?.send(serializedMessage).catch((err: Error) => {
console.error(`${LOG_PREFIX} Error sending message:`, err);
errorSubject.next(err);
});
} catch (error) {
console.error(`${LOG_PREFIX} Error serializing message:`, error);
return;
}
});
wsConnection?.send(serializedMessage).catch((err: Error) => {
console.error(`${LOG_PREFIX} Error sending message:`, err);
errorSubject.next(err);
});
} catch (error) {
console.error(`${LOG_PREFIX} Error serializing message:`, error);
return;
}
},
complete: () => {
if (wsConnection) {

View File

@@ -15,6 +15,7 @@ import {
retryWhen,
takeWhile,
tap,
bufferTime,
} from 'rxjs/operators';
import { WebSocketSubject } from 'rxjs/webSocket';
import {
@@ -111,27 +112,34 @@ export class PortapiService {
)
)
)
)
),
// Buffer all incoming messages for X ms. This creates batches (arrays of messages).
bufferTime(25),
// Don't process empty batches that can occur during idle periods.
filter(batch => batch.length > 0),
)
.subscribe(
(msg) => {
const observer = this._streams$.get(msg.id);
if (!observer) {
// it's expected that we receive done messages from time to time here
// as portmaster sends a "done" message after we "cancel" a subscription
// and we already remove the observer from _streams$ if the subscription
// is unsubscribed. So just hide that warning message for "done"
if (msg.type !== 'done') {
console.warn(
`Received message for unknown request id ${msg.id} (type=${msg.type})`,
msg
);
// The subscriber now receives an array of messages (a batch).
(batch) => {
// Re-enter the Angular Zone ONCE for the entire batch.
this.ngZone.run(() => {
for (const msg of batch) {
const observer = this._streams$.get(msg.id);
if (!observer) {
// it's expected that we receive done messages from time to time here
// as portmaster sends a "done" message after we "cancel" a subscription
// and we already remove the observer from _streams$ if the subscription
// is unsubscribed. So just hide that warning message for "done"
if (msg.type !== 'done') {
console.warn(`Received message for unknown request id ${msg.id} (type=${msg.type})`, msg);
}
continue;
}
// forward the message to the actual stream.
observer.next(msg as ReplyMessage);
}
return;
}
// forward the message to the actual stream.
observer.next(msg as ReplyMessage);
});
},
console.error,
() => {

View File

@@ -1,11 +1,11 @@
import { Injectable, NgZone } from '@angular/core';
import { Injectable } from '@angular/core';
import { webSocket, WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket';
import { createTauriWsConnection } from './platform-specific/tauri/tauri-websocket-subject';
import { IsTauriEnvironment } from './platform-specific/utils';
@Injectable()
export class WebsocketService {
constructor(private ngZone: NgZone) { }
constructor() { }
/**
* createConnection creates a new websocket connection using opts.
@@ -13,9 +13,9 @@ export class WebsocketService {
* @param opts Options for the websocket connection.
*/
createConnection<T>(opts: WebSocketSubjectConfig<T>): WebSocketSubject<T> {
if (IsTauriEnvironment()) {
if (IsTauriEnvironment()) {
console.log('[portmaster-api] Running under Tauri - Using Tauri WebSocket');
return createTauriWsConnection<T>(opts, this.ngZone);
return createTauriWsConnection<T>(opts);
}
console.log('[portmaster-api] Running in browser - Using RxJS WebSocket');