From bd20714db6b1f7475ad066b9ec97a2f7e69b0f15 Mon Sep 17 00:00:00 2001 From: Alexandr Stelnykovych Date: Fri, 18 Apr 2025 12:13:10 +0300 Subject: [PATCH] [desktop] Tauri WebSocket connection --- .../safing/portmaster-api/src/lib/module.ts | 4 +- .../tauri/tauri-websocket-subject.ts | 196 ++++++++++++++++++ .../src/lib/websocket.service.ts | 13 +- 3 files changed, 208 insertions(+), 5 deletions(-) create mode 100644 desktop/angular/projects/safing/portmaster-api/src/lib/platform-specific/tauri/tauri-websocket-subject.ts diff --git a/desktop/angular/projects/safing/portmaster-api/src/lib/module.ts b/desktop/angular/projects/safing/portmaster-api/src/lib/module.ts index 467117ec..40e562a2 100644 --- a/desktop/angular/projects/safing/portmaster-api/src/lib/module.ts +++ b/desktop/angular/projects/safing/portmaster-api/src/lib/module.ts @@ -26,14 +26,14 @@ export interface ModuleConfig { export function HttpClientProviderFactory() { if (IsTauriEnvironment()) { - console.log("[app] running under tauri - using TauriHttpClient"); + console.log("[portmaster-api] Running under Tauri - using TauriHttpClient"); return provideHttpClient( withInterceptors([TauriHttpInterceptor]) ); } else { - console.log("[app] running in browser - using default HttpClient"); + console.log("[portmaster-api] Running in browser - using default HttpClient"); return provideHttpClient(); } } diff --git a/desktop/angular/projects/safing/portmaster-api/src/lib/platform-specific/tauri/tauri-websocket-subject.ts b/desktop/angular/projects/safing/portmaster-api/src/lib/platform-specific/tauri/tauri-websocket-subject.ts new file mode 100644 index 00000000..712c0ec4 --- /dev/null +++ b/desktop/angular/projects/safing/portmaster-api/src/lib/platform-specific/tauri/tauri-websocket-subject.ts @@ -0,0 +1,196 @@ +import WebSocket, { Message } from '@tauri-apps/plugin-websocket'; +import { Subject, Observable } from 'rxjs'; +import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket'; + +const LOG_PREFIX = '[tauri_ws]'; + +/** + * Creates a WebSocket connection using the Tauri WebSocket API and wraps it in an RxJS WebSocketSubject-compatible interface. + * + * @template T - The type of messages sent and received through the WebSocket. + * @param {WebSocketSubjectConfig} opts - Configuration options for the WebSocket connection. + * @returns {WebSocketSubject} - An RxJS WebSocketSubject-compatible object for interacting with the WebSocket. + * @throws {Error} If the `serializer` or `deserializer` functions are not provided. + * + * @example + * const wsSubject = createTauriWsConnection({ + * url: 'ws://example.com', + * serializer: JSON.stringify, + * deserializer: JSON.parse, + * }); + */ +export function createTauriWsConnection(opts: WebSocketSubjectConfig): WebSocketSubject { + if (!opts.serializer) throw new Error(`${LOG_PREFIX} Messages Serializer not provided!`); + if (!opts.deserializer) throw new Error(`${LOG_PREFIX} Messages Deserializer not provided!`); + + const serializer = opts.serializer; + const deserializer = opts.deserializer; + + let wsConnection: WebSocket | null = null; + const messageSubject = new Subject(); + const observable$ = messageSubject.asObservable(); + + // A queue for messages that need to be sent before the connection is established + const pendingMessages: T[] = []; + + const notifySubjectError = (descriptionToLog: string, error: Error | any | null = null) => { + if (!descriptionToLog) return; + if (!error) error = new Error(descriptionToLog); + console.error(`${LOG_PREFIX} ${descriptionToLog}:`, error); + // This completes the observable and prevents further messages from being processed. + messageSubject.error(error); + } + + ////////////////////////////////////////////////////////////// + // RxJS WebSocketSubject-compatible implementation + ////////////////////////////////////////////////////////////// + const webSocketSubject = { + // Standard Observer interface methods + next: (message: T) => { + if (!wsConnection) { + if (pendingMessages.length >= 1000) { + console.error(`${LOG_PREFIX} Too many pending messages, skipping message`); + return; + } + pendingMessages.push(message); + console.log(`${LOG_PREFIX} Connection not established yet, message queued`); + return; + } + + let serializedMessage: any; + try { + serializedMessage = serializer(message); + // 'string' type is enough here, since default serializer for portmaster message returns string + if (typeof serializedMessage !== 'string') + throw new Error('Serialized message is not a string'); + } catch (error) { + console.error(`${LOG_PREFIX} Error serializing message:`, error); + return; + } + + try { + wsConnection.send(serializedMessage).catch((err: Error) => { + notifySubjectError('Error sending text message', err); + }); + } catch (error) { + notifySubjectError('Error sending message', error); + } + }, + + complete: () => { + if (wsConnection) { + console.log(`${LOG_PREFIX} Closing connection`); + opts.closingObserver?.next(undefined); + wsConnection.disconnect().catch((err: Error) => console.error(`${LOG_PREFIX} Error closing connection:`, err)); + wsConnection = null; + } + messageSubject.complete(); + }, + + // RxJS Observable methods required for compatibility + pipe: function(): Observable { + // @ts-ignore - Ignore the parameter type mismatch + return observable$.pipe(...arguments); + }, + }; + + ////////////////////////////////////////////////////////////// + // Connect to WebSocket + ////////////////////////////////////////////////////////////// + + const connectOptions: Record = {}; + console.log(`${LOG_PREFIX} Connecting to WebSocket:`, opts.url, connectOptions); + WebSocket.connect(opts.url, connectOptions) + .then((ws) => { + wsConnection = ws; + console.log(`${LOG_PREFIX} Connection established`); + + // Create a mock Event for the openObserver + if (opts.openObserver) { + const mockEvent = new Event('open') as Event; + opts.openObserver.next(mockEvent); + } + + // Send any pending messages + while (pendingMessages.length > 0) { + const message = pendingMessages.shift(); + if (message) webSocketSubject.next(message); + } + + try { + // Add a single listener for ALL message types according to Tauri WebSocket API + // The addListener method takes a single callback function that receives messages + ws.addListener((message: Message) => { + try { + // Handle different message types from Tauri + switch (message.type) { + case 'Text': + const textData = message.data as string; + try { + const deserializedMessage = deserializer({ data: textData } as any) ; + messageSubject.next(deserializedMessage); + } catch (err) { + notifySubjectError('Error deserializing text message', err); + } + break; + + case 'Binary': + const binaryData = message.data as number[]; + try { + const uint8Array = new Uint8Array(binaryData); + const buffer = uint8Array.buffer; + const deserializedMessage = deserializer({ data: buffer } as any) ; + messageSubject.next(deserializedMessage); + } catch (err) { + notifySubjectError('Error deserializing binary message', err); + } + break; + + case 'Close': + // Handle close message + const closeData = message.data as { code: number; reason: string } | null; + console.log(`${LOG_PREFIX} Connection closed by server`, closeData); + + if (opts.closeObserver) { + const closeEvent = { + code: closeData?.code || 1000, + reason: closeData?.reason || '', + wasClean: true, + type: 'close', + target: null + } as unknown as CloseEvent; + + opts.closeObserver.next(closeEvent); + } + + messageSubject.complete(); + wsConnection = null; + break; + + case 'Ping': + console.log(`${LOG_PREFIX} Received ping`); + break; + + case 'Pong': + console.log(`${LOG_PREFIX} Received pong`); + break; + } + } catch (error) { + console.error(`${LOG_PREFIX} Error processing message:`, error); + // Don't error the subject on message processing errors to keep connection alive + } + }); + + console.log(`${LOG_PREFIX} Listener added successfully`); + + } catch (error) { + notifySubjectError('Error adding message listener', error); + } + }) + .catch((error: Error) => { + notifySubjectError('Connection failed', error); + }); + + // Cast to WebSocketSubject + return webSocketSubject as unknown as WebSocketSubject; +} \ No newline at end of file diff --git a/desktop/angular/projects/safing/portmaster-api/src/lib/websocket.service.ts b/desktop/angular/projects/safing/portmaster-api/src/lib/websocket.service.ts index c42efa8d..4724687e 100644 --- a/desktop/angular/projects/safing/portmaster-api/src/lib/websocket.service.ts +++ b/desktop/angular/projects/safing/portmaster-api/src/lib/websocket.service.ts @@ -1,5 +1,7 @@ import { Injectable } from '@angular/core'; import { webSocket, WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket'; +import { createTauriWsConnection } from './platform-specific/tauri/tauri-websocket-subject'; +import { IsTauriEnvironment } from './platform-specific/utils'; @Injectable() export class WebsocketService { @@ -11,7 +13,12 @@ export class WebsocketService { * @param opts Options for the websocket connection. */ createConnection(opts: WebSocketSubjectConfig): WebSocketSubject { - return webSocket(opts); - } -} + if (IsTauriEnvironment()) { + console.log('[portmaster-api] Running under Tauri - Using Tauri WebSocket'); + return createTauriWsConnection(opts); + } + console.log('[portmaster-api] Running in browser - Using RxJS WebSocket'); + return webSocket(opts); + } +} \ No newline at end of file