diff --git a/desktop/angular/projects/safing/portmaster-api/src/lib/platform-specific/tauri/tauri-websocket-subject.ts b/desktop/angular/projects/safing/portmaster-api/src/lib/platform-specific/tauri/tauri-websocket-subject.ts index 97965a79..6c44a5ac 100644 --- a/desktop/angular/projects/safing/portmaster-api/src/lib/platform-specific/tauri/tauri-websocket-subject.ts +++ b/desktop/angular/projects/safing/portmaster-api/src/lib/platform-specific/tauri/tauri-websocket-subject.ts @@ -19,6 +19,9 @@ const LOG_PREFIX = '[tauri_ws]'; * url: 'ws://example.com', * serializer: JSON.stringify, * deserializer: JSON.parse, + * openObserver: { next: () => console.log('Connection opened') }, + * closeObserver: { next: () => console.log('Connection closed') }, + * closingObserver: { next: () => console.log('Connection closing') }, * }, ngZone); */ export function createTauriWsConnection(opts: WebSocketSubjectConfig, ngZone: NgZone): WebSocketSubject { @@ -35,81 +38,142 @@ export function createTauriWsConnection(opts: WebSocketSubjectConfig, ngZo // A queue for messages that need to be sent before the connection is established const pendingMessages: T[] = []; - const notifySubjectError = (descriptionToLog: string, error: Error | any | null = null) => { - if (!descriptionToLog) return; - if (!error) error = new Error(descriptionToLog); - console.error(`${LOG_PREFIX} ${descriptionToLog}:`, error); - - // Run inside NgZone to ensure Angular detects this change - ngZone.run(() => { - // This completes the observable and prevents further messages from being processed. - messageSubject.error(error); - }); - } + // Function to establish a WebSocket connection + const connect = (): void => { + WebSocket.connect(opts.url) + .then((ws) => { + wsConnection = ws; + console.log(`${LOG_PREFIX} Connection established`); + + // Run inside NgZone to ensure Angular detects this change + ngZone.run(() => { + // Notify that connection is open + opts.openObserver?.next(undefined as unknown as Event); + // Send any pending messages + while (pendingMessages.length > 0) { + const message = pendingMessages.shift(); + if (message) webSocketSubject.next(message); + } + }); - ////////////////////////////////////////////////////////////// - // RxJS WebSocketSubject-compatible implementation - ////////////////////////////////////////////////////////////// - const webSocketSubject = { - // Standard Observer interface methods - next: (message: T) => { - if (!wsConnection) { - if (pendingMessages.length >= 1000) { - console.error(`${LOG_PREFIX} Too many pending messages, skipping message`); - return; - } - pendingMessages.push(message); - console.log(`${LOG_PREFIX} Connection not established yet, message queued`); - return; - } - - let serializedMessage: any; - try { - serializedMessage = serializer(message); - // 'string' type is enough here, since default serializer for portmaster message returns string - if (typeof serializedMessage !== 'string') - throw new Error('Serialized message is not a string'); - } catch (error) { - console.error(`${LOG_PREFIX} Error serializing message:`, error); - return; + setupMessageListener(ws); + }) + .catch((error: Error) => { + console.error(`${LOG_PREFIX} Connection failed:`, error); + reconnect(); + }); + }; + + // Function to reconnect + let reconnectTimeout: ReturnType | null = null; + const reconnect = () => { + if (reconnectTimeout) { + clearTimeout(reconnectTimeout); } - // Run outside NgZone for better performance during send operations - ngZone.runOutsideAngular(() => { - try { - wsConnection!.send(serializedMessage).catch((err: Error) => { - notifySubjectError('Error sending text message', err); - }); - } catch (error) { - notifySubjectError('Error sending message', error); - } - }); - }, + // Notify close observer + ngZone.run(() => { + opts.closeObserver?.next(undefined as unknown as CloseEvent); + }) + + // Remove the existing listener if it exists + removeListener(); + + // Close the existing connection if it exists + wsConnection?.disconnect().catch(err => console.warn(`${LOG_PREFIX} Error closing connection during reconnect:`, err)); + wsConnection = null; + + // Connect again after a delay + console.log(`${LOG_PREFIX} Attempting to reconnect in 1 second...`); + reconnectTimeout = setTimeout(() => { + reconnectTimeout = null; + connect(); + }, 1000); + }; - complete: () => { - if (wsConnection) { - console.log(`${LOG_PREFIX} Closing connection`); - - // Run inside NgZone to ensure Angular detects this change - ngZone.run(() => { - if (opts.closingObserver?.next) { - opts.closingObserver.next(undefined); + // Function to remove the message listener + let listenerRemovalFn: (() => void) | null = null; // Store the removal function for the ws listener + const removeListener = () => { + if (listenerRemovalFn) { + try { + listenerRemovalFn(); + listenerRemovalFn = null; // Clear the reference + } catch (err) { + console.error(`${LOG_PREFIX} Error removing listener:`, err); } - - wsConnection!.disconnect().catch((err: Error) => console.error(`${LOG_PREFIX} Error closing connection:`, err)); - wsConnection = null; - messageSubject.complete(); - }); - } else { - messageSubject.complete(); } - }, + } - // RxJS Observable methods required for compatibility - pipe: function(): Observable { - // @ts-ignore - Ignore the parameter type mismatch - return observable$.pipe(...arguments); - }, + // Function to set up the message listener + const setupMessageListener = (ws: WebSocket) => { + let pingTimeoutId: ReturnType | null = null; + + listenerRemovalFn = ws.addListener((message: Message) => { + // Process message inside ngZone to trigger change detection + try { + switch (message.type) { + case 'Text': + try { + const deserializedMessage = deserializer({ data: message.data as string } as any); + ngZone.run(() => { messageSubject.next(deserializedMessage); }); // inside ngZone to trigger change detection + } catch (err) { + console.error(`${LOG_PREFIX} Error deserializing text message:`, err); + } + break; + + case 'Binary': + try { + const uint8Array = new Uint8Array(message.data as number[]); + const deserializedMessage = deserializer({ data: uint8Array.buffer } as any); + ngZone.run(() => { messageSubject.next(deserializedMessage); }); // inside ngZone to trigger change detection + } catch (err) { + console.error(`${LOG_PREFIX} Error deserializing binary message:`, err); + } + break; + + case 'Close': + console.log(`${LOG_PREFIX} Connection closed by server`); + reconnect(); // Auto-reconnect on server-initiated close + break; + + case 'Ping': + break; + + case 'Pong': + console.log(`${LOG_PREFIX} Received pong response - connection is alive`); + if (pingTimeoutId) { + // Clear the timeout since we got a response + clearTimeout(pingTimeoutId); + pingTimeoutId = null; + } + break; + + // All other message types are unexpected. Proceed with reconnect. + default: + console.warn(`${LOG_PREFIX} Received unexpected message: '${message}'`); + + // Don't immediately reconnect - first verify if the connection is actually dead. + // If we don't receive a pong response within 2 seconds, we consider the connection dead. + if (!pingTimeoutId && wsConnection) { + console.log(`${LOG_PREFIX} Verifying connection status with ping...`); + wsConnection.send( {type: 'Ping', data: [1]} ).then(() => { + pingTimeoutId = setTimeout(() => { + console.error(`${LOG_PREFIX} No response to ping - connection appears dead`); + pingTimeoutId = null; + reconnect(); + }, 2000); + }).catch(err => { + console.error(`${LOG_PREFIX} Failed to send ping - connection is dead:`, err); + reconnect(); + }); + } + + break; + } + } catch (error) { + console.error(`${LOG_PREFIX} Error processing message: `, error); + } + }); }; ////////////////////////////////////////////////////////////// @@ -119,103 +183,73 @@ export function createTauriWsConnection(opts: WebSocketSubjectConfig, ngZo // Connect outside of Angular zone for better performance ngZone.runOutsideAngular(() => { - WebSocket.connect(opts.url) - .then((ws) => { - wsConnection = ws; - console.log(`${LOG_PREFIX} Connection established`); - - // Run inside NgZone to ensure Angular detects this connection event - ngZone.run(() => { - // Create a mock Event for the openObserver - if (opts.openObserver) { - const mockEvent = new Event('open') as Event; - opts.openObserver.next(mockEvent); - } - - // Send any pending messages - while (pendingMessages.length > 0) { - const message = pendingMessages.shift(); - if (message) webSocketSubject.next(message); - } - }); - - try { - // Add a single listener for ALL message types according to Tauri WebSocket API - ws.addListener((message: Message) => { - // Process message inside ngZone to trigger change detection - ngZone.run(() => { - try { - // Handle different message types from Tauri - switch (message.type) { - case 'Text': - const textData = message.data as string; - try { - const deserializedMessage = deserializer({ data: textData } as any); - messageSubject.next(deserializedMessage); - } catch (err) { - notifySubjectError('Error deserializing text message', err); - } - break; - - case 'Binary': - const binaryData = message.data as number[]; - try { - const uint8Array = new Uint8Array(binaryData); - const buffer = uint8Array.buffer; - const deserializedMessage = deserializer({ data: buffer } as any); - messageSubject.next(deserializedMessage); - } catch (err) { - notifySubjectError('Error deserializing binary message', err); - } - break; - - case 'Close': - // Handle close message - const closeData = message.data as { code: number; reason: string } | null; - console.log(`${LOG_PREFIX} Connection closed by server`, closeData); - - if (opts.closeObserver) { - const closeEvent = { - code: closeData?.code || 1000, - reason: closeData?.reason || '', - wasClean: true, - type: 'close', - target: null - } as unknown as CloseEvent; - - opts.closeObserver.next(closeEvent); - } - - messageSubject.complete(); - wsConnection = null; - break; - - case 'Ping': - console.log(`${LOG_PREFIX} Received ping`); - break; - - case 'Pong': - console.log(`${LOG_PREFIX} Received pong`); - break; - } - } catch (error) { - console.error(`${LOG_PREFIX} Error processing message:`, error); - // Don't error the subject on message processing errors to keep connection alive - } - }); - }); - - console.log(`${LOG_PREFIX} Listener added successfully`); - - } catch (error) { - notifySubjectError('Error adding message listener', error); - } - }) - .catch((error: Error) => { - notifySubjectError('Connection failed', error); - }); + connect(); }); + ////////////////////////////////////////////////////////////// + // RxJS WebSocketSubject-compatible implementation + ////////////////////////////////////////////////////////////// + const webSocketSubject = { + // Standard Observer interface methods + next: (message: T) => { + // Run outside NgZone for better performance during send operations + ngZone.runOutsideAngular(() => { + // If the connection is not established yet, queue the message + if (!wsConnection) { + if (pendingMessages.length >= 100) { + console.error(`${LOG_PREFIX} Too many pending messages, skipping message`); + return; + } + pendingMessages.push(message); + console.warn(`${LOG_PREFIX} Connection not established yet, message queued`); + return; + } + + // Serialize the message using the provided serializer + let serializedMessage: any; + try { + serializedMessage = serializer(message); + // 'string' type is enough here, since default serializer for portmaster message returns string + if (typeof serializedMessage !== 'string') + throw new Error('Serialized message is not a string'); + } catch (error) { + console.error(`${LOG_PREFIX} Error serializing message:`, error); + return; + } + + // Send the serialized message through the WebSocket connection + wsConnection?.send(serializedMessage).catch((err: Error) => { + console.error(`${LOG_PREFIX} Error sending message:`, err); + }); + }); + }, + + complete: () => { + if (wsConnection) { + console.log(`${LOG_PREFIX} Closing connection`); + + // Run inside NgZone to ensure Angular detects this change + ngZone.run(() => { + opts.closingObserver?.next(); + wsConnection!.disconnect().catch((err: Error) => console.error(`${LOG_PREFIX} Error closing connection:`, err)); + wsConnection = null; + opts.closeObserver?.next(undefined as unknown as CloseEvent); + }); + + // Remove the existing listener if it exists + removeListener(); + } + + messageSubject.complete(); + }, + + // RxJS Observable methods required for compatibility + pipe: function(): Observable { + // @ts-ignore - Ignore the parameter type mismatch + return observable$.pipe(...arguments); + }, + }; + // Cast to WebSocketSubject return webSocketSubject as unknown as WebSocketSubject; } \ No newline at end of file