(UI; tauri-websocket): Enhance error handling and connection management in WebSocket implementation

This commit is contained in:
Alexandr Stelnykovych
2025-06-06 18:45:13 +03:00
parent 2e5076b3bc
commit bd12a784c2
2 changed files with 189 additions and 185 deletions

View File

@@ -1,10 +1,11 @@
import WebSocket, { ConnectionConfig, Message } from '@tauri-apps/plugin-websocket';
import { Subject, Observable } from 'rxjs';
import { Subject, Observable, merge, mergeMap, throwError } from 'rxjs';
import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket';
import { NgZone } from '@angular/core';
const LOG_PREFIX = '[tauri_ws]';
const LOG_PREFIX = '[tauri_ws]';
const PING_INTERVAL_MS = 10000; // Send a ping every PING_INTERVAL_MS milliseconds
const PONG_TIMEOUT_MS = 5000; // Wait PONG_TIMEOUT_MS milliseconds for a pong response
/**
* Creates a WebSocket connection using the Tauri WebSocket API and wraps it in an RxJS WebSocketSubject-compatible interface.
*
@@ -33,224 +34,227 @@ export function createTauriWsConnection<T>(opts: WebSocketSubjectConfig<T>, ngZo
let wsConnection: WebSocket | null = null;
const messageSubject = new Subject<T>();
const observable$ = messageSubject.asObservable();
// A queue for messages that need to be sent before the connection is established
const pendingMessages: T[] = [];
const errorSubject = new Subject<any>(); // Added for error propagation
// Function to establish a WebSocket connection
const connect = (): void => {
WebSocket.connect(opts.url)
.then((ws) => {
wsConnection = ws;
console.log(`${LOG_PREFIX} Connection established`);
// Run inside NgZone to ensure Angular detects this change
ngZone.run(() => {
// Notify that connection is open
opts.openObserver?.next(undefined as unknown as Event);
// Send any pending messages
while (pendingMessages.length > 0) {
const message = pendingMessages.shift();
if (message) webSocketSubject.next(message);
}
});
// Combined stream with both messages and errors
const observable$ = merge(
messageSubject.asObservable(),
errorSubject.pipe(
mergeMap(err => throwError(() => err))
)
);
setupMessageListener(ws);
})
.catch((error: Error) => {
console.error(`${LOG_PREFIX} Connection failed:`, error);
reconnect();
});
};
// Function to reconnect
let reconnectTimeout: ReturnType<typeof setTimeout> | null = null;
const reconnect = () => {
if (reconnectTimeout) {
clearTimeout(reconnectTimeout);
}
// Notify close observer
ngZone.run(() => {
opts.closeObserver?.next(undefined as unknown as CloseEvent);
})
// Remove the existing listener if it exists
removeListener();
// Close the existing connection if it exists
wsConnection?.disconnect().catch(err => console.warn(`${LOG_PREFIX} Error closing connection during reconnect:`, err));
wsConnection = null;
// Connect again after a delay
console.log(`${LOG_PREFIX} Attempting to reconnect in 2 seconds...`);
reconnectTimeout = setTimeout(() => {
reconnectTimeout = null;
//////////////////////////////////////////////////////////////
// Track subscriptions
//////////////////////////////////////////////////////////////
let subscriptionCount = 0;
// Wrapper with subscription tracking
const trackedObservable$ = new Observable<T>(subscriber => {
subscriptionCount++;
// If this is the first subscription, connect to WebSocket
if (subscriptionCount === 1) {
ngZone.runOutsideAngular(() => {
connect();
}, 2000);
};
// Function to check if connection alive, and reconnect, if necessary
let pingTimeoutId: ReturnType<typeof setTimeout> | null = null;
const checkConnectionAliveOrReconnect = () => {
if (!pingTimeoutId && wsConnection) {
console.log(`${LOG_PREFIX} Verifying connection status with ping...`);
wsConnection.send( {type: 'Ping', data: [1]} ).then(() => {
pingTimeoutId = setTimeout(() => { // The timeout will be stopped if we receive a Pong response
console.error(`${LOG_PREFIX} No response to ping - connection appears dead`);
pingTimeoutId = null;
reconnect();
}, 5000);
}).catch(err => {
console.error(`${LOG_PREFIX} Failed to send ping - connection is dead:`, err);
reconnect();
});
}
}
// Function to remove the message listener
const subscription = observable$.subscribe({
next: value => subscriber.next(value),
error: err => subscriber.error(err),
complete: () => subscriber.complete()
});
// Cleanup function - called when unsubscribed
return () => {
subscriptionCount--;
subscription.unsubscribe();
// If this was the last subscription, close the WebSocket connection
if (subscriptionCount === 0) {
disconnect();
}
};
});
//////////////////////////////////////////////////////////////
// Function to establish a WebSocket connection
//////////////////////////////////////////////////////////////
let listenerRemovalFn: (() => void) | null = null; // Store the removal function for the ws listener
const removeListener = () => {
if (listenerRemovalFn) {
try {
listenerRemovalFn();
listenerRemovalFn = null; // Clear the reference
} catch (err) {
console.error(`${LOG_PREFIX} Error removing listener:`, err);
}
const connect = (): void => {
console.log(`${LOG_PREFIX} Connecting to WebSocket: ${opts.url}`);
WebSocket.connect(opts.url)
.then((ws) => {
wsConnection = ws;
console.log(`${LOG_PREFIX} Connection established`);
opts.openObserver?.next(undefined as unknown as Event);
listenerRemovalFn = ws.addListener(messagesListener);
startHealthChecks();
})
.catch((error: Error) => {
console.error(`${LOG_PREFIX} Connection failed:`, error);
errorSubject.next(error);
});
};
const disconnect = (): void => {
stopHealthChecks();
if (listenerRemovalFn) {
try {
listenerRemovalFn();
} catch (err) {
console.error(`${LOG_PREFIX} Error removing listener:`, err);
}
listenerRemovalFn = null; // Clear the reference
}
const currentWs = wsConnection;
wsConnection = null;
if (!currentWs) return;
console.log(`${LOG_PREFIX} Closing WebSocket connection.`);
opts.closeObserver?.next(undefined as unknown as CloseEvent);
currentWs.disconnect().catch(err => console.warn(`${LOG_PREFIX} Error closing connection:`, err));
}
// Function to set up the message listener
const setupMessageListener = (ws: WebSocket) => {
listenerRemovalFn = ws.addListener((message: Message) => {
// Process message inside ngZone to trigger change detection
try {
switch (message.type) {
case 'Text':
try {
const deserializedMessage = deserializer({ data: message.data as string } as any);
ngZone.run(() => { messageSubject.next(deserializedMessage); }); // inside ngZone to trigger change detection
} catch (err) {
console.error(`${LOG_PREFIX} Error deserializing text message:`, err);
}
break;
case 'Binary':
try {
const uint8Array = new Uint8Array(message.data as number[]);
const deserializedMessage = deserializer({ data: uint8Array.buffer } as any);
ngZone.run(() => { messageSubject.next(deserializedMessage); }); // inside ngZone to trigger change detection
} catch (err) {
console.error(`${LOG_PREFIX} Error deserializing binary message:`, err);
}
break;
case 'Close':
console.log(`${LOG_PREFIX} Connection closed by server`);
reconnect(); // Auto-reconnect on server-initiated close
break;
case 'Ping':
break;
//////////////////////////////////////////////////////////////
// Function to check if connection alive
//////////////////////////////////////////////////////////////
let healthCheckIntervalId: ReturnType<typeof setInterval> | null = null;
let pongTimeoutId: ReturnType<typeof setTimeout> | null = null;
case 'Pong':
console.log(`${LOG_PREFIX} Received pong response - connection is alive`);
if (pingTimeoutId) {
// Clear the timeout since we got a response
clearTimeout(pingTimeoutId);
pingTimeoutId = null;
}
break;
// All other message types are unexpected. Proceed with reconnect.
default:
console.warn(`${LOG_PREFIX} Received unexpected message: '${message}'`);
checkConnectionAliveOrReconnect();
break;
}
} catch (error) {
console.error(`${LOG_PREFIX} Error processing message: `, error);
const startHealthChecks = () => {
stopHealthChecks(); // Ensure no multiple intervals are running
healthCheckIntervalId = setInterval(() => {
if (!wsConnection) {
stopHealthChecks();
return;
}
});
if (pongTimeoutId) {
// Ping already in flight, waiting for pong.
return;
}
wsConnection.send({ type: 'Ping', data: [] })
.then(() => {
pongTimeoutId = setTimeout(() => {
console.error(`${LOG_PREFIX} No Pong received. Connection is likely dead.`);
errorSubject.next(new Error('Connection timed out'));
stopHealthChecks();
}, PONG_TIMEOUT_MS);
})
.catch(err => {
console.error(`${LOG_PREFIX} Ping send failed:`, err);
errorSubject.next(new Error(`Ping send failed: ${err}`));
stopHealthChecks();
});
}, PING_INTERVAL_MS);
};
const stopHealthChecks = () => {
if (healthCheckIntervalId) {
clearInterval(healthCheckIntervalId);
healthCheckIntervalId = null;
}
if (pongTimeoutId) {
clearTimeout(pongTimeoutId);
pongTimeoutId = null;
}
};
//////////////////////////////////////////////////////////////
// Connect to WebSocket
// Messages listener
//////////////////////////////////////////////////////////////
console.log(`${LOG_PREFIX} Connecting to WebSocket:`, opts.url);
// Connect outside of Angular zone for better performance
ngZone.runOutsideAngular(() => {
connect();
});
const messagesListener = (message: Message) => {
try {
switch (message.type) {
case 'Text':
try {
const deserializedMessage = deserializer({ data: message.data as string } as any);
ngZone.run(() => { messageSubject.next(deserializedMessage); }); // inside ngZone to trigger change detection
} catch (err) {
console.error(`${LOG_PREFIX} Error deserializing text message:`, err);
}
break;
case 'Binary':
try {
const uint8Array = new Uint8Array(message.data as number[]);
const deserializedMessage = deserializer({ data: uint8Array.buffer } as any);
ngZone.run(() => { messageSubject.next(deserializedMessage); }); // inside ngZone to trigger change detection
} catch (err) {
console.error(`${LOG_PREFIX} Error deserializing binary message:`, err);
}
break;
case 'Close':
console.warn(`${LOG_PREFIX} Connection closed by server: ${message}`);
errorSubject.next(new Error(`Connection closed by server: ${message}`));
break;
case 'Ping':
break;
case 'Pong':
// Pong received, clear the timeout.
if (pongTimeoutId) {
clearTimeout(pongTimeoutId);
pongTimeoutId = null;
}
break;
// All other message types are unexpected. Proceed with reconnect.
default:
console.warn(`${LOG_PREFIX} Received unexpected message: '${message}'`);
break;
}
} catch (error) {
console.error(`${LOG_PREFIX} Error processing message: `, error);
}
}
//////////////////////////////////////////////////////////////
// RxJS WebSocketSubject-compatible implementation
// RxJS WebSocketSubject-compatible interface
//////////////////////////////////////////////////////////////
const webSocketSubject = {
// Standard Observer interface methods
asObservable: () => trackedObservable$,
next: (message: T) => {
// Run outside NgZone for better performance during send operations
ngZone.runOutsideAngular(() => {
// If the connection is not established yet, queue the message
if (!wsConnection) {
if (pendingMessages.length >= 100) {
console.error(`${LOG_PREFIX} Too many pending messages, skipping message`);
return;
}
pendingMessages.push(message);
console.warn(`${LOG_PREFIX} Connection not established yet, message queued`);
return;
}
// Serialize the message using the provided serializer
let serializedMessage: any;
errorSubject.next(new Error('Connection not established'));
return;
}
try {
serializedMessage = serializer(message);
const serializedMessage = serializer(message);
// 'string' type is enough here, since default serializer for portmaster message returns string
if (typeof serializedMessage !== 'string')
throw new Error('Serialized message is not a string');
throw new Error('Serialized message is not a string');
wsConnection?.send(serializedMessage).catch((err: Error) => {
console.error(`${LOG_PREFIX} Error sending message:`, err);
errorSubject.next(err);
});
} catch (error) {
console.error(`${LOG_PREFIX} Error serializing message:`, error);
return;
}
// Send the serialized message through the WebSocket connection
wsConnection?.send(serializedMessage).catch((err: Error) => {
console.error(`${LOG_PREFIX} Error sending message:`, err);
checkConnectionAliveOrReconnect();
});
}
});
},
complete: () => {
if (wsConnection) {
console.log(`${LOG_PREFIX} Closing connection`);
// Run inside NgZone to ensure Angular detects this change
ngZone.run(() => {
opts.closingObserver?.next();
wsConnection!.disconnect().catch((err: Error) => console.error(`${LOG_PREFIX} Error closing connection:`, err));
wsConnection = null;
opts.closeObserver?.next(undefined as unknown as CloseEvent);
});
// Remove the existing listener if it exists
removeListener();
}
opts.closingObserver?.next();
disconnect();
}
messageSubject.complete();
},
// RxJS Observable methods required for compatibility
pipe: function(): Observable<any> {
// @ts-ignore - Ignore the parameter type mismatch
return observable$.pipe(...arguments);
},
subscribe: trackedObservable$.subscribe.bind(trackedObservable$),
pipe: trackedObservable$.pipe.bind(trackedObservable$),
};
// Cast to WebSocketSubject<T>
return webSocketSubject as unknown as WebSocketSubject<T>;
}

View File

@@ -801,7 +801,7 @@ export class PortapiService {
}
},
error: (err) => {
console.error(err, attrs);
console.error("[portapi] request error:", err.message || err, attrs);
observer.error(err);
},
complete: () => {