fix(UI; tauri-websocket): Enhance WebSocket connection handling with observers and improved error management

This commit is contained in:
Alexandr Stelnykovych
2025-06-03 14:24:29 +03:00
parent e5715a9550
commit cb39e11b32

View File

@@ -19,6 +19,9 @@ const LOG_PREFIX = '[tauri_ws]';
* url: 'ws://example.com', * url: 'ws://example.com',
* serializer: JSON.stringify, * serializer: JSON.stringify,
* deserializer: JSON.parse, * deserializer: JSON.parse,
* openObserver: { next: () => console.log('Connection opened') },
* closeObserver: { next: () => console.log('Connection closed') },
* closingObserver: { next: () => console.log('Connection closing') },
* }, ngZone); * }, ngZone);
*/ */
export function createTauriWsConnection<T>(opts: WebSocketSubjectConfig<T>, ngZone: NgZone): WebSocketSubject<T> { export function createTauriWsConnection<T>(opts: WebSocketSubjectConfig<T>, ngZone: NgZone): WebSocketSubject<T> {
@@ -35,81 +38,142 @@ export function createTauriWsConnection<T>(opts: WebSocketSubjectConfig<T>, ngZo
// A queue for messages that need to be sent before the connection is established // A queue for messages that need to be sent before the connection is established
const pendingMessages: T[] = []; const pendingMessages: T[] = [];
const notifySubjectError = (descriptionToLog: string, error: Error | any | null = null) => { // Function to establish a WebSocket connection
if (!descriptionToLog) return; const connect = (): void => {
if (!error) error = new Error(descriptionToLog); WebSocket.connect(opts.url)
console.error(`${LOG_PREFIX} ${descriptionToLog}:`, error); .then((ws) => {
wsConnection = ws;
// Run inside NgZone to ensure Angular detects this change console.log(`${LOG_PREFIX} Connection established`);
ngZone.run(() => {
// This completes the observable and prevents further messages from being processed. // Run inside NgZone to ensure Angular detects this change
messageSubject.error(error); ngZone.run(() => {
}); // Notify that connection is open
} opts.openObserver?.next(undefined as unknown as Event);
// Send any pending messages
while (pendingMessages.length > 0) {
const message = pendingMessages.shift();
if (message) webSocketSubject.next(message);
}
});
////////////////////////////////////////////////////////////// setupMessageListener(ws);
// RxJS WebSocketSubject-compatible implementation })
////////////////////////////////////////////////////////////// .catch((error: Error) => {
const webSocketSubject = { console.error(`${LOG_PREFIX} Connection failed:`, error);
// Standard Observer interface methods reconnect();
next: (message: T) => { });
if (!wsConnection) { };
if (pendingMessages.length >= 1000) {
console.error(`${LOG_PREFIX} Too many pending messages, skipping message`); // Function to reconnect
return; let reconnectTimeout: ReturnType<typeof setTimeout> | null = null;
} const reconnect = () => {
pendingMessages.push(message); if (reconnectTimeout) {
console.log(`${LOG_PREFIX} Connection not established yet, message queued`); clearTimeout(reconnectTimeout);
return;
}
let serializedMessage: any;
try {
serializedMessage = serializer(message);
// 'string' type is enough here, since default serializer for portmaster message returns string
if (typeof serializedMessage !== 'string')
throw new Error('Serialized message is not a string');
} catch (error) {
console.error(`${LOG_PREFIX} Error serializing message:`, error);
return;
} }
// Run outside NgZone for better performance during send operations // Notify close observer
ngZone.runOutsideAngular(() => { ngZone.run(() => {
try { opts.closeObserver?.next(undefined as unknown as CloseEvent);
wsConnection!.send(serializedMessage).catch((err: Error) => { })
notifySubjectError('Error sending text message', err);
}); // Remove the existing listener if it exists
} catch (error) { removeListener();
notifySubjectError('Error sending message', error);
} // Close the existing connection if it exists
}); wsConnection?.disconnect().catch(err => console.warn(`${LOG_PREFIX} Error closing connection during reconnect:`, err));
}, wsConnection = null;
// Connect again after a delay
console.log(`${LOG_PREFIX} Attempting to reconnect in 1 second...`);
reconnectTimeout = setTimeout(() => {
reconnectTimeout = null;
connect();
}, 1000);
};
complete: () => { // Function to remove the message listener
if (wsConnection) { let listenerRemovalFn: (() => void) | null = null; // Store the removal function for the ws listener
console.log(`${LOG_PREFIX} Closing connection`); const removeListener = () => {
if (listenerRemovalFn) {
// Run inside NgZone to ensure Angular detects this change try {
ngZone.run(() => { listenerRemovalFn();
if (opts.closingObserver?.next) { listenerRemovalFn = null; // Clear the reference
opts.closingObserver.next(undefined); } catch (err) {
console.error(`${LOG_PREFIX} Error removing listener:`, err);
} }
wsConnection!.disconnect().catch((err: Error) => console.error(`${LOG_PREFIX} Error closing connection:`, err));
wsConnection = null;
messageSubject.complete();
});
} else {
messageSubject.complete();
} }
}, }
// RxJS Observable methods required for compatibility // Function to set up the message listener
pipe: function(): Observable<any> { const setupMessageListener = (ws: WebSocket) => {
// @ts-ignore - Ignore the parameter type mismatch let pingTimeoutId: ReturnType<typeof setTimeout> | null = null;
return observable$.pipe(...arguments);
}, listenerRemovalFn = ws.addListener((message: Message) => {
// Process message inside ngZone to trigger change detection
try {
switch (message.type) {
case 'Text':
try {
const deserializedMessage = deserializer({ data: message.data as string } as any);
ngZone.run(() => { messageSubject.next(deserializedMessage); }); // inside ngZone to trigger change detection
} catch (err) {
console.error(`${LOG_PREFIX} Error deserializing text message:`, err);
}
break;
case 'Binary':
try {
const uint8Array = new Uint8Array(message.data as number[]);
const deserializedMessage = deserializer({ data: uint8Array.buffer } as any);
ngZone.run(() => { messageSubject.next(deserializedMessage); }); // inside ngZone to trigger change detection
} catch (err) {
console.error(`${LOG_PREFIX} Error deserializing binary message:`, err);
}
break;
case 'Close':
console.log(`${LOG_PREFIX} Connection closed by server`);
reconnect(); // Auto-reconnect on server-initiated close
break;
case 'Ping':
break;
case 'Pong':
console.log(`${LOG_PREFIX} Received pong response - connection is alive`);
if (pingTimeoutId) {
// Clear the timeout since we got a response
clearTimeout(pingTimeoutId);
pingTimeoutId = null;
}
break;
// All other message types are unexpected. Proceed with reconnect.
default:
console.warn(`${LOG_PREFIX} Received unexpected message: '${message}'`);
// Don't immediately reconnect - first verify if the connection is actually dead.
// If we don't receive a pong response within 2 seconds, we consider the connection dead.
if (!pingTimeoutId && wsConnection) {
console.log(`${LOG_PREFIX} Verifying connection status with ping...`);
wsConnection.send( {type: 'Ping', data: [1]} ).then(() => {
pingTimeoutId = setTimeout(() => {
console.error(`${LOG_PREFIX} No response to ping - connection appears dead`);
pingTimeoutId = null;
reconnect();
}, 2000);
}).catch(err => {
console.error(`${LOG_PREFIX} Failed to send ping - connection is dead:`, err);
reconnect();
});
}
break;
}
} catch (error) {
console.error(`${LOG_PREFIX} Error processing message: `, error);
}
});
}; };
////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////
@@ -119,103 +183,73 @@ export function createTauriWsConnection<T>(opts: WebSocketSubjectConfig<T>, ngZo
// Connect outside of Angular zone for better performance // Connect outside of Angular zone for better performance
ngZone.runOutsideAngular(() => { ngZone.runOutsideAngular(() => {
WebSocket.connect(opts.url) connect();
.then((ws) => {
wsConnection = ws;
console.log(`${LOG_PREFIX} Connection established`);
// Run inside NgZone to ensure Angular detects this connection event
ngZone.run(() => {
// Create a mock Event for the openObserver
if (opts.openObserver) {
const mockEvent = new Event('open') as Event;
opts.openObserver.next(mockEvent);
}
// Send any pending messages
while (pendingMessages.length > 0) {
const message = pendingMessages.shift();
if (message) webSocketSubject.next(message);
}
});
try {
// Add a single listener for ALL message types according to Tauri WebSocket API
ws.addListener((message: Message) => {
// Process message inside ngZone to trigger change detection
ngZone.run(() => {
try {
// Handle different message types from Tauri
switch (message.type) {
case 'Text':
const textData = message.data as string;
try {
const deserializedMessage = deserializer({ data: textData } as any);
messageSubject.next(deserializedMessage);
} catch (err) {
notifySubjectError('Error deserializing text message', err);
}
break;
case 'Binary':
const binaryData = message.data as number[];
try {
const uint8Array = new Uint8Array(binaryData);
const buffer = uint8Array.buffer;
const deserializedMessage = deserializer({ data: buffer } as any);
messageSubject.next(deserializedMessage);
} catch (err) {
notifySubjectError('Error deserializing binary message', err);
}
break;
case 'Close':
// Handle close message
const closeData = message.data as { code: number; reason: string } | null;
console.log(`${LOG_PREFIX} Connection closed by server`, closeData);
if (opts.closeObserver) {
const closeEvent = {
code: closeData?.code || 1000,
reason: closeData?.reason || '',
wasClean: true,
type: 'close',
target: null
} as unknown as CloseEvent;
opts.closeObserver.next(closeEvent);
}
messageSubject.complete();
wsConnection = null;
break;
case 'Ping':
console.log(`${LOG_PREFIX} Received ping`);
break;
case 'Pong':
console.log(`${LOG_PREFIX} Received pong`);
break;
}
} catch (error) {
console.error(`${LOG_PREFIX} Error processing message:`, error);
// Don't error the subject on message processing errors to keep connection alive
}
});
});
console.log(`${LOG_PREFIX} Listener added successfully`);
} catch (error) {
notifySubjectError('Error adding message listener', error);
}
})
.catch((error: Error) => {
notifySubjectError('Connection failed', error);
});
}); });
//////////////////////////////////////////////////////////////
// RxJS WebSocketSubject-compatible implementation
//////////////////////////////////////////////////////////////
const webSocketSubject = {
// Standard Observer interface methods
next: (message: T) => {
// Run outside NgZone for better performance during send operations
ngZone.runOutsideAngular(() => {
// If the connection is not established yet, queue the message
if (!wsConnection) {
if (pendingMessages.length >= 100) {
console.error(`${LOG_PREFIX} Too many pending messages, skipping message`);
return;
}
pendingMessages.push(message);
console.warn(`${LOG_PREFIX} Connection not established yet, message queued`);
return;
}
// Serialize the message using the provided serializer
let serializedMessage: any;
try {
serializedMessage = serializer(message);
// 'string' type is enough here, since default serializer for portmaster message returns string
if (typeof serializedMessage !== 'string')
throw new Error('Serialized message is not a string');
} catch (error) {
console.error(`${LOG_PREFIX} Error serializing message:`, error);
return;
}
// Send the serialized message through the WebSocket connection
wsConnection?.send(serializedMessage).catch((err: Error) => {
console.error(`${LOG_PREFIX} Error sending message:`, err);
});
});
},
complete: () => {
if (wsConnection) {
console.log(`${LOG_PREFIX} Closing connection`);
// Run inside NgZone to ensure Angular detects this change
ngZone.run(() => {
opts.closingObserver?.next();
wsConnection!.disconnect().catch((err: Error) => console.error(`${LOG_PREFIX} Error closing connection:`, err));
wsConnection = null;
opts.closeObserver?.next(undefined as unknown as CloseEvent);
});
// Remove the existing listener if it exists
removeListener();
}
messageSubject.complete();
},
// RxJS Observable methods required for compatibility
pipe: function(): Observable<any> {
// @ts-ignore - Ignore the parameter type mismatch
return observable$.pipe(...arguments);
},
};
// Cast to WebSocketSubject<T> // Cast to WebSocketSubject<T>
return webSocketSubject as unknown as WebSocketSubject<T>; return webSocketSubject as unknown as WebSocketSubject<T>;
} }