[Feature] Voice reconnection and resuming (#2873)
* Voice receive fix (use system-selected port) * Update SocketGuild.cs * Reconnect voice after moved, resume voice connection, don't invoke Disconnected event when is going to reconnect * no more collection primitives * Disconnected event rallback & dispose audio client after finished * Update src/Discord.Net.WebSocket/Audio/AudioClient.cs * Update src/Discord.Net.WebSocket/Audio/AudioClient.cs ---------
This commit is contained in:
14
src/Discord.Net.WebSocket/API/Voice/ResumeParams.cs
Normal file
14
src/Discord.Net.WebSocket/API/Voice/ResumeParams.cs
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
using Newtonsoft.Json;
|
||||||
|
|
||||||
|
namespace Discord.API.Voice
|
||||||
|
{
|
||||||
|
public class ResumeParams
|
||||||
|
{
|
||||||
|
[JsonProperty("server_id")]
|
||||||
|
public ulong ServerId { get; set; }
|
||||||
|
[JsonProperty("session_id")]
|
||||||
|
public string SessionId { get; set; }
|
||||||
|
[JsonProperty("token")]
|
||||||
|
public string Token { get; set; }
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,6 +1,7 @@
|
|||||||
using Discord.API.Voice;
|
using Discord.API.Voice;
|
||||||
using Discord.Audio.Streams;
|
using Discord.Audio.Streams;
|
||||||
using Discord.Logging;
|
using Discord.Logging;
|
||||||
|
using Discord.Net;
|
||||||
using Discord.Net.Converters;
|
using Discord.Net.Converters;
|
||||||
using Discord.WebSocket;
|
using Discord.WebSocket;
|
||||||
using Newtonsoft.Json;
|
using Newtonsoft.Json;
|
||||||
@@ -9,18 +10,23 @@ using System;
|
|||||||
using System.Collections.Concurrent;
|
using System.Collections.Concurrent;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
|
using System.Net.WebSockets;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
namespace Discord.Audio
|
namespace Discord.Audio
|
||||||
{
|
{
|
||||||
//TODO: Add audio reconnecting
|
|
||||||
internal partial class AudioClient : IAudioClient
|
internal partial class AudioClient : IAudioClient
|
||||||
{
|
{
|
||||||
private static readonly int ConnectionTimeoutMs = 30000; // 30 seconds
|
private static readonly int ConnectionTimeoutMs = 30000; // 30 seconds
|
||||||
private static readonly int KeepAliveIntervalMs = 5000; // 5 seconds
|
private static readonly int KeepAliveIntervalMs = 5000; // 5 seconds
|
||||||
|
|
||||||
|
private static readonly int[] BlacklistedResumeCodes = new int[]
|
||||||
|
{
|
||||||
|
4001, 4002, 4003, 4004, 4005, 4006, 4009, 4012, 1014, 4016
|
||||||
|
};
|
||||||
|
|
||||||
private struct StreamPair
|
private struct StreamPair
|
||||||
{
|
{
|
||||||
public AudioInStream Reader;
|
public AudioInStream Reader;
|
||||||
@@ -49,6 +55,8 @@ namespace Discord.Audio
|
|||||||
private ulong _userId;
|
private ulong _userId;
|
||||||
private uint _ssrc;
|
private uint _ssrc;
|
||||||
private bool _isSpeaking;
|
private bool _isSpeaking;
|
||||||
|
private StopReason _stopReason;
|
||||||
|
private bool _resuming;
|
||||||
|
|
||||||
public SocketGuild Guild { get; }
|
public SocketGuild Guild { get; }
|
||||||
public DiscordVoiceAPIClient ApiClient { get; private set; }
|
public DiscordVoiceAPIClient ApiClient { get; private set; }
|
||||||
@@ -56,6 +64,7 @@ namespace Discord.Audio
|
|||||||
public int UdpLatency { get; private set; }
|
public int UdpLatency { get; private set; }
|
||||||
public ulong ChannelId { get; internal set; }
|
public ulong ChannelId { get; internal set; }
|
||||||
internal byte[] SecretKey { get; private set; }
|
internal byte[] SecretKey { get; private set; }
|
||||||
|
internal bool IsFinished { get; private set; }
|
||||||
|
|
||||||
private DiscordSocketClient Discord => Guild.Discord;
|
private DiscordSocketClient Discord => Guild.Discord;
|
||||||
public ConnectionState ConnectionState => _connection.State;
|
public ConnectionState ConnectionState => _connection.State;
|
||||||
@@ -78,7 +87,7 @@ namespace Discord.Audio
|
|||||||
_connection = new ConnectionManager(_stateLock, _audioLogger, ConnectionTimeoutMs,
|
_connection = new ConnectionManager(_stateLock, _audioLogger, ConnectionTimeoutMs,
|
||||||
OnConnectingAsync, OnDisconnectingAsync, x => ApiClient.Disconnected += x);
|
OnConnectingAsync, OnDisconnectingAsync, x => ApiClient.Disconnected += x);
|
||||||
_connection.Connected += () => _connectedEvent.InvokeAsync();
|
_connection.Connected += () => _connectedEvent.InvokeAsync();
|
||||||
_connection.Disconnected += (ex, recon) => _disconnectedEvent.InvokeAsync(ex);
|
_connection.Disconnected += (exception, _) => _disconnectedEvent.InvokeAsync(exception);
|
||||||
_heartbeatTimes = new ConcurrentQueue<long>();
|
_heartbeatTimes = new ConcurrentQueue<long>();
|
||||||
_keepaliveTimes = new ConcurrentQueue<KeyValuePair<ulong, int>>();
|
_keepaliveTimes = new ConcurrentQueue<KeyValuePair<ulong, int>>();
|
||||||
_ssrcMap = new ConcurrentDictionary<uint, ulong>();
|
_ssrcMap = new ConcurrentDictionary<uint, ulong>();
|
||||||
@@ -110,15 +119,30 @@ namespace Discord.Audio
|
|||||||
}
|
}
|
||||||
|
|
||||||
public Task StopAsync()
|
public Task StopAsync()
|
||||||
=> _connection.StopAsync();
|
=> StopAsync(StopReason.Normal);
|
||||||
|
|
||||||
|
internal Task StopAsync(StopReason stopReason)
|
||||||
|
{
|
||||||
|
_stopReason = stopReason;
|
||||||
|
return _connection.StopAsync();
|
||||||
|
}
|
||||||
|
|
||||||
private async Task OnConnectingAsync()
|
private async Task OnConnectingAsync()
|
||||||
{
|
{
|
||||||
await _audioLogger.DebugAsync("Connecting ApiClient").ConfigureAwait(false);
|
await _audioLogger.DebugAsync($"Connecting ApiClient. Voice server: wss://{_url}").ConfigureAwait(false);
|
||||||
await ApiClient.ConnectAsync($"wss://{_url}?v={DiscordConfig.VoiceAPIVersion}").ConfigureAwait(false);
|
await ApiClient.ConnectAsync($"wss://{_url}?v={DiscordConfig.VoiceAPIVersion}").ConfigureAwait(false);
|
||||||
await _audioLogger.DebugAsync($"Listening on port {ApiClient.UdpPort}").ConfigureAwait(false);
|
await _audioLogger.DebugAsync($"Listening on port {ApiClient.UdpPort}").ConfigureAwait(false);
|
||||||
await _audioLogger.DebugAsync("Sending Identity").ConfigureAwait(false);
|
|
||||||
await ApiClient.SendIdentityAsync(_userId, _sessionId, _token).ConfigureAwait(false);
|
if (!_resuming)
|
||||||
|
{
|
||||||
|
await _audioLogger.DebugAsync("Sending Identity").ConfigureAwait(false);
|
||||||
|
await ApiClient.SendIdentityAsync(_userId, _sessionId, _token).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
await _audioLogger.DebugAsync("Sending Resume").ConfigureAwait(false);
|
||||||
|
await ApiClient.SendResume(_token, _sessionId).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
|
||||||
//Wait for READY
|
//Wait for READY
|
||||||
await _connection.WaitAsync().ConfigureAwait(false);
|
await _connection.WaitAsync().ConfigureAwait(false);
|
||||||
@@ -128,6 +152,63 @@ namespace Discord.Audio
|
|||||||
await _audioLogger.DebugAsync("Disconnecting ApiClient").ConfigureAwait(false);
|
await _audioLogger.DebugAsync("Disconnecting ApiClient").ConfigureAwait(false);
|
||||||
await ApiClient.DisconnectAsync().ConfigureAwait(false);
|
await ApiClient.DisconnectAsync().ConfigureAwait(false);
|
||||||
|
|
||||||
|
if (_stopReason == StopReason.Unknown && ex.InnerException is WebSocketException exception)
|
||||||
|
{
|
||||||
|
await _audioLogger.WarningAsync(
|
||||||
|
$"Audio connection terminated with unknown reason. Code: {exception.ErrorCode} - {exception.Message}",
|
||||||
|
exception);
|
||||||
|
|
||||||
|
if (_resuming)
|
||||||
|
{
|
||||||
|
await _audioLogger.WarningAsync("Resume failed");
|
||||||
|
|
||||||
|
_resuming = false;
|
||||||
|
|
||||||
|
await FinishDisconnect(ex, true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (BlacklistedResumeCodes.Contains(exception.ErrorCode))
|
||||||
|
{
|
||||||
|
await FinishDisconnect(ex, true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
await ClearHeartBeaters();
|
||||||
|
|
||||||
|
_resuming = true;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
await FinishDisconnect(ex, _stopReason != StopReason.Moved);
|
||||||
|
|
||||||
|
if (_stopReason == StopReason.Normal)
|
||||||
|
{
|
||||||
|
await _audioLogger.DebugAsync("Sending Voice State").ConfigureAwait(false);
|
||||||
|
await Discord.ApiClient.SendVoiceStateUpdateAsync(Guild.Id, null, false, false).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
_stopReason = StopReason.Unknown;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task FinishDisconnect(Exception ex, bool wontTryReconnect)
|
||||||
|
{
|
||||||
|
await _audioLogger.DebugAsync("Finishing audio connection").ConfigureAwait(false);
|
||||||
|
|
||||||
|
await ClearHeartBeaters().ConfigureAwait(false);
|
||||||
|
|
||||||
|
if (wontTryReconnect)
|
||||||
|
{
|
||||||
|
await _connection.StopAsync().ConfigureAwait(false);
|
||||||
|
|
||||||
|
await ClearInputStreamsAsync().ConfigureAwait(false);
|
||||||
|
|
||||||
|
IsFinished = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task ClearHeartBeaters()
|
||||||
|
{
|
||||||
//Wait for tasks to complete
|
//Wait for tasks to complete
|
||||||
await _audioLogger.DebugAsync("Waiting for heartbeater").ConfigureAwait(false);
|
await _audioLogger.DebugAsync("Waiting for heartbeater").ConfigureAwait(false);
|
||||||
|
|
||||||
@@ -143,12 +224,11 @@ namespace Discord.Audio
|
|||||||
{ }
|
{ }
|
||||||
_lastMessageTime = 0;
|
_lastMessageTime = 0;
|
||||||
|
|
||||||
await ClearInputStreamsAsync().ConfigureAwait(false);
|
while (_keepaliveTimes.TryDequeue(out _))
|
||||||
|
{ }
|
||||||
await _audioLogger.DebugAsync("Sending Voice State").ConfigureAwait(false);
|
|
||||||
await Discord.ApiClient.SendVoiceStateUpdateAsync(Guild.Id, null, false, false).ConfigureAwait(false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#region Streams
|
||||||
public AudioOutStream CreateOpusStream(int bufferMillis)
|
public AudioOutStream CreateOpusStream(int bufferMillis)
|
||||||
{
|
{
|
||||||
var outputStream = new OutputStream(ApiClient); //Ignores header
|
var outputStream = new OutputStream(ApiClient); //Ignores header
|
||||||
@@ -217,6 +297,7 @@ namespace Discord.Audio
|
|||||||
_ssrcMap.Clear();
|
_ssrcMap.Clear();
|
||||||
_streams.Clear();
|
_streams.Clear();
|
||||||
}
|
}
|
||||||
|
#endregion
|
||||||
|
|
||||||
private async Task ProcessMessageAsync(VoiceOpCode opCode, object payload)
|
private async Task ProcessMessageAsync(VoiceOpCode opCode, object payload)
|
||||||
{
|
{
|
||||||
@@ -285,7 +366,7 @@ namespace Discord.Audio
|
|||||||
await _audioLogger.DebugAsync("Received Speaking").ConfigureAwait(false);
|
await _audioLogger.DebugAsync("Received Speaking").ConfigureAwait(false);
|
||||||
|
|
||||||
var data = (payload as JToken).ToObject<SpeakingEvent>(_serializer);
|
var data = (payload as JToken).ToObject<SpeakingEvent>(_serializer);
|
||||||
_ssrcMap[data.Ssrc] = data.UserId; //TODO: Memory Leak: SSRCs are never cleaned up
|
_ssrcMap[data.Ssrc] = data.UserId;
|
||||||
|
|
||||||
await _speakingUpdatedEvent.InvokeAsync(data.UserId, data.Speaking);
|
await _speakingUpdatedEvent.InvokeAsync(data.UserId, data.Speaking);
|
||||||
}
|
}
|
||||||
@@ -299,6 +380,17 @@ namespace Discord.Audio
|
|||||||
await _clientDisconnectedEvent.InvokeAsync(data.UserId);
|
await _clientDisconnectedEvent.InvokeAsync(data.UserId);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
case VoiceOpCode.Resumed:
|
||||||
|
{
|
||||||
|
await _audioLogger.DebugAsync($"Voice connection resumed: wss://{_url}");
|
||||||
|
_resuming = false;
|
||||||
|
|
||||||
|
_heartbeatTask = RunHeartbeatAsync(_heartbeatInterval, _connection.CancelToken);
|
||||||
|
_keepaliveTask = RunKeepaliveAsync(_connection.CancelToken);
|
||||||
|
|
||||||
|
_ = _connection.CompleteAsync();
|
||||||
|
}
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
await _audioLogger.WarningAsync($"Unknown OpCode ({opCode})").ConfigureAwait(false);
|
await _audioLogger.WarningAsync($"Unknown OpCode ({opCode})").ConfigureAwait(false);
|
||||||
break;
|
break;
|
||||||
@@ -485,6 +577,49 @@ namespace Discord.Audio
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Waits until all post-disconnect actions are done.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="timeout">Maximum time to wait.</param>
|
||||||
|
/// <returns>
|
||||||
|
/// A <see cref="Task"/> that represents an asynchronous process of waiting.
|
||||||
|
/// </returns>
|
||||||
|
internal async Task WaitForDisconnectAsync(TimeSpan timeout)
|
||||||
|
{
|
||||||
|
if (ConnectionState == ConnectionState.Disconnected)
|
||||||
|
return;
|
||||||
|
|
||||||
|
var completion = new TaskCompletionSource<Exception>();
|
||||||
|
|
||||||
|
var cts = new CancellationTokenSource();
|
||||||
|
|
||||||
|
var _ = Task.Delay(timeout, cts.Token).ContinueWith(_ =>
|
||||||
|
{
|
||||||
|
completion.TrySetException(new TimeoutException("Exceeded maximum time to wait"));
|
||||||
|
cts.Dispose();
|
||||||
|
}, cts.Token);
|
||||||
|
|
||||||
|
_connection.Disconnected += HandleDisconnectSubscription;
|
||||||
|
|
||||||
|
await completion.Task.ConfigureAwait(false);
|
||||||
|
|
||||||
|
Task HandleDisconnectSubscription(Exception exception, bool reconnect)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
cts.Cancel();
|
||||||
|
completion.TrySetResult(exception);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_connection.Disconnected -= HandleDisconnectSubscription;
|
||||||
|
cts.Dispose();
|
||||||
|
}
|
||||||
|
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
internal void Dispose(bool disposing)
|
internal void Dispose(bool disposing)
|
||||||
{
|
{
|
||||||
if (disposing)
|
if (disposing)
|
||||||
@@ -496,5 +631,13 @@ namespace Discord.Audio
|
|||||||
}
|
}
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public void Dispose() => Dispose(true);
|
public void Dispose() => Dispose(true);
|
||||||
|
|
||||||
|
internal enum StopReason
|
||||||
|
{
|
||||||
|
Unknown = 0,
|
||||||
|
Normal,
|
||||||
|
Disconnected,
|
||||||
|
Moved
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -172,8 +172,8 @@ namespace Discord
|
|||||||
|
|
||||||
await _onDisconnecting(ex).ConfigureAwait(false);
|
await _onDisconnecting(ex).ConfigureAwait(false);
|
||||||
|
|
||||||
await _disconnectedEvent.InvokeAsync(ex, isReconnecting).ConfigureAwait(false);
|
|
||||||
State = ConnectionState.Disconnected;
|
State = ConnectionState.Disconnected;
|
||||||
|
await _disconnectedEvent.InvokeAsync(ex, isReconnecting).ConfigureAwait(false);
|
||||||
await _logger.InfoAsync("Disconnected").ConfigureAwait(false);
|
await _logger.InfoAsync("Disconnected").ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -166,6 +166,16 @@ namespace Discord.Audio
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Task SendResume(string token, string sessionId)
|
||||||
|
{
|
||||||
|
return SendAsync(VoiceOpCode.Resume, new ResumeParams
|
||||||
|
{
|
||||||
|
ServerId = GuildId,
|
||||||
|
SessionId = sessionId,
|
||||||
|
Token = token
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
public async Task ConnectAsync(string url)
|
public async Task ConnectAsync(string url)
|
||||||
{
|
{
|
||||||
await _connectionLock.WaitAsync().ConfigureAwait(false);
|
await _connectionLock.WaitAsync().ConfigureAwait(false);
|
||||||
|
|||||||
@@ -1687,7 +1687,7 @@ namespace Discord.WebSocket
|
|||||||
if (after.VoiceChannel != null && _audioClient.ChannelId != after.VoiceChannel?.Id)
|
if (after.VoiceChannel != null && _audioClient.ChannelId != after.VoiceChannel?.Id)
|
||||||
{
|
{
|
||||||
_audioClient.ChannelId = after.VoiceChannel.Id;
|
_audioClient.ChannelId = after.VoiceChannel.Id;
|
||||||
await RepopulateAudioStreamsAsync().ConfigureAwait(false);
|
await _audioClient.StopAsync(Audio.AudioClient.StopReason.Moved);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@@ -1711,7 +1711,13 @@ namespace Discord.WebSocket
|
|||||||
if (_voiceStates.TryRemove(id, out SocketVoiceState voiceState))
|
if (_voiceStates.TryRemove(id, out SocketVoiceState voiceState))
|
||||||
{
|
{
|
||||||
if (_audioClient != null)
|
if (_audioClient != null)
|
||||||
|
{
|
||||||
await _audioClient.RemoveInputStreamAsync(id).ConfigureAwait(false); //User changed channels, end their stream
|
await _audioClient.RemoveInputStreamAsync(id).ConfigureAwait(false); //User changed channels, end their stream
|
||||||
|
|
||||||
|
if (id == CurrentUser.Id)
|
||||||
|
await _audioClient.StopAsync(Audio.AudioClient.StopReason.Disconnected);
|
||||||
|
}
|
||||||
|
|
||||||
return voiceState;
|
return voiceState;
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
@@ -1755,7 +1761,7 @@ namespace Discord.WebSocket
|
|||||||
var audioClient = new AudioClient(this, Discord.GetAudioId(), channelId);
|
var audioClient = new AudioClient(this, Discord.GetAudioId(), channelId);
|
||||||
audioClient.Disconnected += async ex =>
|
audioClient.Disconnected += async ex =>
|
||||||
{
|
{
|
||||||
if (!promise.Task.IsCompleted)
|
if (promise.Task.IsCompleted && audioClient.IsFinished)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{ audioClient.Dispose(); }
|
{ audioClient.Dispose(); }
|
||||||
@@ -1866,6 +1872,21 @@ namespace Discord.WebSocket
|
|||||||
if (_audioClient != null)
|
if (_audioClient != null)
|
||||||
{
|
{
|
||||||
await RepopulateAudioStreamsAsync().ConfigureAwait(false);
|
await RepopulateAudioStreamsAsync().ConfigureAwait(false);
|
||||||
|
|
||||||
|
if (_audioClient.ConnectionState != ConnectionState.Disconnected)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await _audioClient.WaitForDisconnectAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
catch (TimeoutException)
|
||||||
|
{
|
||||||
|
await Discord.LogManager.WarningAsync("Failed to wait for disconnect audio client in time", null).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
await Task.Delay(TimeSpan.FromMilliseconds(5)).ConfigureAwait(false);
|
||||||
|
|
||||||
await _audioClient.StartAsync(url, Discord.CurrentUser.Id, voiceState.VoiceSessionId, token).ConfigureAwait(false);
|
await _audioClient.StartAsync(url, Discord.CurrentUser.Id, voiceState.VoiceSessionId, token).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user