Use Discord heartbeat interval in audio. (#2765)
Reduce heartbeat interval to ensure Discord receives it within range. Refactor some AudioClient code. Co-authored-by: Quin Lynch <49576606+quinchs@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
8d5022acb8
commit
9cedfbcdd9
@@ -238,6 +238,15 @@ namespace Discord
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public const int MaxApplicationTagCount = 5;
|
public const int MaxApplicationTagCount = 5;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Returns the factor to reduce the heartbeat interval.
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// If a heartbeat takes longer than the interval estimated by Discord, the connection will be closed.
|
||||||
|
/// This factor is used to reduce the interval and ensure that Discord will get the heartbeat within the estimated interval.
|
||||||
|
/// </remarks>
|
||||||
|
internal const double HeartbeatIntervalFactor = 0.9;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Returns the maximum length of a voice channel status.
|
/// Returns the maximum length of a voice channel status.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
|||||||
@@ -18,7 +18,10 @@ namespace Discord.Audio
|
|||||||
//TODO: Add audio reconnecting
|
//TODO: Add audio reconnecting
|
||||||
internal partial class AudioClient : IAudioClient
|
internal partial class AudioClient : IAudioClient
|
||||||
{
|
{
|
||||||
internal struct StreamPair
|
private static readonly int ConnectionTimeoutMs = 30000; // 30 seconds
|
||||||
|
private static readonly int KeepAliveIntervalMs = 5000; // 5 seconds
|
||||||
|
|
||||||
|
private struct StreamPair
|
||||||
{
|
{
|
||||||
public AudioInStream Reader;
|
public AudioInStream Reader;
|
||||||
public AudioOutStream Writer;
|
public AudioOutStream Writer;
|
||||||
@@ -40,6 +43,7 @@ namespace Discord.Audio
|
|||||||
private readonly ConcurrentDictionary<ulong, StreamPair> _streams;
|
private readonly ConcurrentDictionary<ulong, StreamPair> _streams;
|
||||||
|
|
||||||
private Task _heartbeatTask, _keepaliveTask;
|
private Task _heartbeatTask, _keepaliveTask;
|
||||||
|
private int _heartbeatInterval;
|
||||||
private long _lastMessageTime;
|
private long _lastMessageTime;
|
||||||
private string _url, _sessionId, _token;
|
private string _url, _sessionId, _token;
|
||||||
private ulong _userId;
|
private ulong _userId;
|
||||||
@@ -71,7 +75,7 @@ namespace Discord.Audio
|
|||||||
ApiClient.ReceivedPacket += ProcessPacketAsync;
|
ApiClient.ReceivedPacket += ProcessPacketAsync;
|
||||||
|
|
||||||
_stateLock = new SemaphoreSlim(1, 1);
|
_stateLock = new SemaphoreSlim(1, 1);
|
||||||
_connection = new ConnectionManager(_stateLock, _audioLogger, 30000,
|
_connection = new ConnectionManager(_stateLock, _audioLogger, ConnectionTimeoutMs,
|
||||||
OnConnectingAsync, OnDisconnectingAsync, x => ApiClient.Disconnected += x);
|
OnConnectingAsync, OnDisconnectingAsync, x => ApiClient.Disconnected += x);
|
||||||
_connection.Connected += () => _connectedEvent.InvokeAsync();
|
_connection.Connected += () => _connectedEvent.InvokeAsync();
|
||||||
_connection.Disconnected += (ex, recon) => _disconnectedEvent.InvokeAsync(ex);
|
_connection.Disconnected += (ex, recon) => _disconnectedEvent.InvokeAsync(ex);
|
||||||
@@ -113,8 +117,8 @@ namespace Discord.Audio
|
|||||||
private async Task OnConnectingAsync()
|
private async Task OnConnectingAsync()
|
||||||
{
|
{
|
||||||
await _audioLogger.DebugAsync("Connecting ApiClient").ConfigureAwait(false);
|
await _audioLogger.DebugAsync("Connecting ApiClient").ConfigureAwait(false);
|
||||||
await ApiClient.ConnectAsync("wss://" + _url + "?v=" + DiscordConfig.VoiceAPIVersion).ConfigureAwait(false);
|
await ApiClient.ConnectAsync($"wss://{_url}?v={DiscordConfig.VoiceAPIVersion}").ConfigureAwait(false);
|
||||||
await _audioLogger.DebugAsync("Listening on port " + ApiClient.UdpPort).ConfigureAwait(false);
|
await _audioLogger.DebugAsync($"Listening on port {ApiClient.UdpPort}").ConfigureAwait(false);
|
||||||
await _audioLogger.DebugAsync("Sending Identity").ConfigureAwait(false);
|
await _audioLogger.DebugAsync("Sending Identity").ConfigureAwait(false);
|
||||||
await ApiClient.SendIdentityAsync(_userId, _sessionId, _token).ConfigureAwait(false);
|
await ApiClient.SendIdentityAsync(_userId, _sessionId, _token).ConfigureAwait(false);
|
||||||
|
|
||||||
@@ -128,13 +132,13 @@ namespace Discord.Audio
|
|||||||
|
|
||||||
//Wait for tasks to complete
|
//Wait for tasks to complete
|
||||||
await _audioLogger.DebugAsync("Waiting for heartbeater").ConfigureAwait(false);
|
await _audioLogger.DebugAsync("Waiting for heartbeater").ConfigureAwait(false);
|
||||||
var heartbeatTask = _heartbeatTask;
|
|
||||||
if (heartbeatTask != null)
|
if (_heartbeatTask != null)
|
||||||
await heartbeatTask.ConfigureAwait(false);
|
await _heartbeatTask.ConfigureAwait(false);
|
||||||
_heartbeatTask = null;
|
_heartbeatTask = null;
|
||||||
var keepaliveTask = _keepaliveTask;
|
|
||||||
if (keepaliveTask != null)
|
if (_keepaliveTask != null)
|
||||||
await keepaliveTask.ConfigureAwait(false);
|
await _keepaliveTask.ConfigureAwait(false);
|
||||||
_keepaliveTask = null;
|
_keepaliveTask = null;
|
||||||
|
|
||||||
while (_heartbeatTimes.TryDequeue(out _))
|
while (_heartbeatTimes.TryDequeue(out _))
|
||||||
@@ -194,11 +198,12 @@ namespace Discord.Audio
|
|||||||
{
|
{
|
||||||
if (_streams.TryGetValue(id, out StreamPair streamPair))
|
if (_streams.TryGetValue(id, out StreamPair streamPair))
|
||||||
return streamPair.Reader;
|
return streamPair.Reader;
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
internal async Task RemoveInputStreamAsync(ulong userId)
|
internal async Task RemoveInputStreamAsync(ulong userId)
|
||||||
{
|
{
|
||||||
if (_streams.TryRemove(userId, out var pair))
|
if (_streams.TryRemove(userId, out StreamPair pair))
|
||||||
{
|
{
|
||||||
await _streamDestroyedEvent.InvokeAsync(userId).ConfigureAwait(false);
|
await _streamDestroyedEvent.InvokeAsync(userId).ConfigureAwait(false);
|
||||||
pair.Reader.Dispose();
|
pair.Reader.Dispose();
|
||||||
@@ -236,8 +241,7 @@ namespace Discord.Audio
|
|||||||
ApiClient.SetUdpEndpoint(data.Ip, data.Port);
|
ApiClient.SetUdpEndpoint(data.Ip, data.Port);
|
||||||
await ApiClient.SendDiscoveryAsync(_ssrc).ConfigureAwait(false);
|
await ApiClient.SendDiscoveryAsync(_ssrc).ConfigureAwait(false);
|
||||||
|
|
||||||
|
_heartbeatTask = RunHeartbeatAsync(_heartbeatInterval, _connection.CancelToken);
|
||||||
_heartbeatTask = RunHeartbeatAsync(41250, _connection.CancelToken);
|
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case VoiceOpCode.SessionDescription:
|
case VoiceOpCode.SessionDescription:
|
||||||
@@ -250,10 +254,10 @@ namespace Discord.Audio
|
|||||||
|
|
||||||
SecretKey = data.SecretKey;
|
SecretKey = data.SecretKey;
|
||||||
_isSpeaking = false;
|
_isSpeaking = false;
|
||||||
await ApiClient.SendSetSpeaking(false).ConfigureAwait(false);
|
await ApiClient.SendSetSpeaking(_isSpeaking).ConfigureAwait(false);
|
||||||
_keepaliveTask = RunKeepaliveAsync(5000, _connection.CancelToken);
|
_keepaliveTask = RunKeepaliveAsync(_connection.CancelToken);
|
||||||
|
|
||||||
var _ = _connection.CompleteAsync();
|
_ = _connection.CompleteAsync();
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case VoiceOpCode.HeartbeatAck:
|
case VoiceOpCode.HeartbeatAck:
|
||||||
@@ -270,6 +274,14 @@ namespace Discord.Audio
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
case VoiceOpCode.Hello:
|
||||||
|
{
|
||||||
|
await _audioLogger.DebugAsync("Received Hello").ConfigureAwait(false);
|
||||||
|
var data = (payload as JToken).ToObject<HelloEvent>(_serializer);
|
||||||
|
|
||||||
|
_heartbeatInterval = data.HeartbeatInterval;
|
||||||
|
}
|
||||||
|
break;
|
||||||
case VoiceOpCode.Speaking:
|
case VoiceOpCode.Speaking:
|
||||||
{
|
{
|
||||||
await _audioLogger.DebugAsync("Received Speaking").ConfigureAwait(false);
|
await _audioLogger.DebugAsync("Received Speaking").ConfigureAwait(false);
|
||||||
@@ -291,13 +303,12 @@ namespace Discord.Audio
|
|||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
await _audioLogger.WarningAsync($"Unknown OpCode ({opCode})").ConfigureAwait(false);
|
await _audioLogger.WarningAsync($"Unknown OpCode ({opCode})").ConfigureAwait(false);
|
||||||
return;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
await _audioLogger.ErrorAsync($"Error handling {opCode}", ex).ConfigureAwait(false);
|
await _audioLogger.ErrorAsync($"Error handling {opCode}", ex).ConfigureAwait(false);
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
private async Task ProcessPacketAsync(byte[] packet)
|
private async Task ProcessPacketAsync(byte[] packet)
|
||||||
@@ -358,21 +369,20 @@ namespace Discord.Audio
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if (!RTPReadStream.TryReadSsrc(packet, 0, out var ssrc))
|
if (!RTPReadStream.TryReadSsrc(packet, 0, out uint ssrc))
|
||||||
{
|
{
|
||||||
await _audioLogger.DebugAsync("Malformed Frame").ConfigureAwait(false);
|
await _audioLogger.DebugAsync("Malformed Frame").ConfigureAwait(false);
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
if (!_ssrcMap.TryGetValue(ssrc, out var userId))
|
else if (!_ssrcMap.TryGetValue(ssrc, out ulong userId))
|
||||||
{
|
{
|
||||||
await _audioLogger.DebugAsync($"Unknown SSRC {ssrc}").ConfigureAwait(false);
|
await _audioLogger.DebugAsync($"Unknown SSRC {ssrc}").ConfigureAwait(false);
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
if (!_streams.TryGetValue(userId, out var pair))
|
else if (!_streams.TryGetValue(userId, out StreamPair pair))
|
||||||
{
|
{
|
||||||
await _audioLogger.DebugAsync($"Unknown User {userId}").ConfigureAwait(false);
|
await _audioLogger.DebugAsync($"Unknown User {userId}").ConfigureAwait(false);
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
await pair.Writer.WriteAsync(packet, 0, packet.Length).ConfigureAwait(false);
|
await pair.Writer.WriteAsync(packet, 0, packet.Length).ConfigureAwait(false);
|
||||||
@@ -380,7 +390,7 @@ namespace Discord.Audio
|
|||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
await _audioLogger.DebugAsync("Malformed Frame", ex).ConfigureAwait(false);
|
await _audioLogger.DebugAsync("Malformed Frame", ex).ConfigureAwait(false);
|
||||||
return;
|
}
|
||||||
}
|
}
|
||||||
//await _audioLogger.DebugAsync($"Received {packet.Length} bytes from user {userId}").ConfigureAwait(false);
|
//await _audioLogger.DebugAsync($"Received {packet.Length} bytes from user {userId}").ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
@@ -389,19 +399,20 @@ namespace Discord.Audio
|
|||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
await _audioLogger.WarningAsync("Failed to process UDP packet", ex).ConfigureAwait(false);
|
await _audioLogger.WarningAsync("Failed to process UDP packet", ex).ConfigureAwait(false);
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task RunHeartbeatAsync(int intervalMillis, CancellationToken cancelToken)
|
private async Task RunHeartbeatAsync(int intervalMillis, CancellationToken cancelToken)
|
||||||
{
|
{
|
||||||
|
int delayInterval = (int)(intervalMillis * DiscordConfig.HeartbeatIntervalFactor);
|
||||||
|
|
||||||
// TODO: Clean this up when Discord's session patch is live
|
// TODO: Clean this up when Discord's session patch is live
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
await _audioLogger.DebugAsync("Heartbeat Started").ConfigureAwait(false);
|
await _audioLogger.DebugAsync("Heartbeat Started").ConfigureAwait(false);
|
||||||
while (!cancelToken.IsCancellationRequested)
|
while (!cancelToken.IsCancellationRequested)
|
||||||
{
|
{
|
||||||
var now = Environment.TickCount;
|
int now = Environment.TickCount;
|
||||||
|
|
||||||
//Did server respond to our last heartbeat?
|
//Did server respond to our last heartbeat?
|
||||||
if (_heartbeatTimes.Count != 0 && (now - _lastMessageTime) > intervalMillis &&
|
if (_heartbeatTimes.Count != 0 && (now - _lastMessageTime) > intervalMillis &&
|
||||||
@@ -421,7 +432,8 @@ namespace Discord.Audio
|
|||||||
await _audioLogger.WarningAsync("Failed to send heartbeat", ex).ConfigureAwait(false);
|
await _audioLogger.WarningAsync("Failed to send heartbeat", ex).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
await Task.Delay(intervalMillis, cancelToken).ConfigureAwait(false);
|
int delay = Math.Max(0, delayInterval - Latency);
|
||||||
|
await Task.Delay(delay, cancelToken).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
await _audioLogger.DebugAsync("Heartbeat Stopped").ConfigureAwait(false);
|
await _audioLogger.DebugAsync("Heartbeat Stopped").ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
@@ -434,14 +446,14 @@ namespace Discord.Audio
|
|||||||
await _audioLogger.ErrorAsync("Heartbeat Errored", ex).ConfigureAwait(false);
|
await _audioLogger.ErrorAsync("Heartbeat Errored", ex).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
private async Task RunKeepaliveAsync(int intervalMillis, CancellationToken cancelToken)
|
private async Task RunKeepaliveAsync(CancellationToken cancelToken)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
await _audioLogger.DebugAsync("Keepalive Started").ConfigureAwait(false);
|
await _audioLogger.DebugAsync("Keepalive Started").ConfigureAwait(false);
|
||||||
while (!cancelToken.IsCancellationRequested)
|
while (!cancelToken.IsCancellationRequested)
|
||||||
{
|
{
|
||||||
var now = Environment.TickCount;
|
int now = Environment.TickCount;
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@@ -454,7 +466,7 @@ namespace Discord.Audio
|
|||||||
await _audioLogger.WarningAsync("Failed to send keepalive", ex).ConfigureAwait(false);
|
await _audioLogger.WarningAsync("Failed to send keepalive", ex).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
await Task.Delay(intervalMillis, cancelToken).ConfigureAwait(false);
|
await Task.Delay(KeepAliveIntervalMs, cancelToken).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
await _audioLogger.DebugAsync("Keepalive Stopped").ConfigureAwait(false);
|
await _audioLogger.DebugAsync("Keepalive Stopped").ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3200,6 +3200,8 @@ namespace Discord.WebSocket
|
|||||||
|
|
||||||
private async Task RunHeartbeatAsync(int intervalMillis, CancellationToken cancelToken)
|
private async Task RunHeartbeatAsync(int intervalMillis, CancellationToken cancelToken)
|
||||||
{
|
{
|
||||||
|
int delayInterval = (int)(intervalMillis * DiscordConfig.HeartbeatIntervalFactor);
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
await _gatewayLogger.DebugAsync("Heartbeat Started").ConfigureAwait(false);
|
await _gatewayLogger.DebugAsync("Heartbeat Started").ConfigureAwait(false);
|
||||||
@@ -3227,7 +3229,8 @@ namespace Discord.WebSocket
|
|||||||
await _gatewayLogger.WarningAsync("Heartbeat Errored", ex).ConfigureAwait(false);
|
await _gatewayLogger.WarningAsync("Heartbeat Errored", ex).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
await Task.Delay(intervalMillis, cancelToken).ConfigureAwait(false);
|
int delay = Math.Max(0, delayInterval - Latency);
|
||||||
|
await Task.Delay(delay, cancelToken).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
await _gatewayLogger.DebugAsync("Heartbeat Stopped").ConfigureAwait(false);
|
await _gatewayLogger.DebugAsync("Heartbeat Stopped").ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user