Fix user status update when speaking (#3204)

This commit is contained in:
José Santos Garrido
2025-11-28 20:26:27 +01:00
committed by GitHub
parent e8c5436c40
commit 11a56bc96d
5 changed files with 122 additions and 16 deletions

View File

@@ -5,8 +5,10 @@ namespace Discord.API.Voice
internal class SpeakingParams internal class SpeakingParams
{ {
[JsonProperty("speaking")] [JsonProperty("speaking")]
public bool IsSpeaking { get; set; } public int Speaking { get; set; }
[JsonProperty("delay")] [JsonProperty("delay")]
public int Delay { get; set; } public int Delay { get; set; }
[JsonProperty("ssrc")]
public uint Ssrc { get; set; }
} }
} }

View File

@@ -1,7 +1,6 @@
using Discord.API.Voice; using Discord.API.Voice;
using Discord.Audio.Streams; using Discord.Audio.Streams;
using Discord.Logging; using Discord.Logging;
using Discord.Net;
using Discord.Net.Converters; using Discord.Net.Converters;
using Discord.WebSocket; using Discord.WebSocket;
using Newtonsoft.Json; using Newtonsoft.Json;
@@ -45,7 +44,7 @@ namespace Discord.Audio
private readonly SemaphoreSlim _stateLock; private readonly SemaphoreSlim _stateLock;
private readonly ConcurrentQueue<long> _heartbeatTimes; private readonly ConcurrentQueue<long> _heartbeatTimes;
private readonly ConcurrentQueue<KeyValuePair<ulong, int>> _keepaliveTimes; private readonly ConcurrentQueue<KeyValuePair<ulong, int>> _keepaliveTimes;
private readonly ConcurrentDictionary<uint, ulong> _ssrcMap; private readonly SsrcMap _ssrcMap;
private readonly ConcurrentDictionary<ulong, StreamPair> _streams; private readonly ConcurrentDictionary<ulong, StreamPair> _streams;
private Task _heartbeatTask, _keepaliveTask; private Task _heartbeatTask, _keepaliveTask;
@@ -54,7 +53,7 @@ namespace Discord.Audio
private string _url, _sessionId, _token; private string _url, _sessionId, _token;
private ulong _userId; private ulong _userId;
private uint _ssrc; private uint _ssrc;
private bool _isSpeaking; private bool? _isSpeaking;
private StopReason _stopReason; private StopReason _stopReason;
private bool _resuming; private bool _resuming;
@@ -90,7 +89,7 @@ namespace Discord.Audio
_connection.Disconnected += (exception, _) => _disconnectedEvent.InvokeAsync(exception); _connection.Disconnected += (exception, _) => _disconnectedEvent.InvokeAsync(exception);
_heartbeatTimes = new ConcurrentQueue<long>(); _heartbeatTimes = new ConcurrentQueue<long>();
_keepaliveTimes = new ConcurrentQueue<KeyValuePair<ulong, int>>(); _keepaliveTimes = new ConcurrentQueue<KeyValuePair<ulong, int>>();
_ssrcMap = new ConcurrentDictionary<uint, ulong>(); _ssrcMap = new SsrcMap();
_streams = new ConcurrentDictionary<ulong, StreamPair>(); _streams = new ConcurrentDictionary<ulong, StreamPair>();
_serializer = new JsonSerializer { ContractResolver = new DiscordContractResolver() }; _serializer = new JsonSerializer { ContractResolver = new DiscordContractResolver() };
@@ -146,12 +145,15 @@ namespace Discord.Audio
//Wait for READY //Wait for READY
await _connection.WaitAsync().ConfigureAwait(false); await _connection.WaitAsync().ConfigureAwait(false);
_ssrcMap.UserSpeakingChanged += OnUserSpeakingChanged;
} }
private async Task OnDisconnectingAsync(Exception ex) private async Task OnDisconnectingAsync(Exception ex)
{ {
await _audioLogger.DebugAsync("Disconnecting ApiClient").ConfigureAwait(false); await _audioLogger.DebugAsync("Disconnecting ApiClient").ConfigureAwait(false);
await ApiClient.DisconnectAsync().ConfigureAwait(false); await ApiClient.DisconnectAsync().ConfigureAwait(false);
_ssrcMap.UserSpeakingChanged -= OnUserSpeakingChanged;
if (_stopReason == StopReason.Unknown && ex.InnerException is WebSocketException exception) if (_stopReason == StopReason.Unknown && ex.InnerException is WebSocketException exception)
{ {
await _audioLogger.WarningAsync( await _audioLogger.WarningAsync(
@@ -207,6 +209,11 @@ namespace Discord.Audio
} }
} }
private async void OnUserSpeakingChanged(ulong userId, bool isSpeaking)
{
await _speakingUpdatedEvent.InvokeAsync(userId, isSpeaking);
}
private async Task ClearHeartBeaters() private async Task ClearHeartBeaters()
{ {
//Wait for tasks to complete //Wait for tasks to complete
@@ -332,8 +339,7 @@ namespace Discord.Audio
throw new InvalidOperationException($"Discord selected an unexpected mode: {data.Mode}"); throw new InvalidOperationException($"Discord selected an unexpected mode: {data.Mode}");
SecretKey = data.SecretKey; SecretKey = data.SecretKey;
_isSpeaking = false; await SetSpeakingAsync(false);
await ApiClient.SendSetSpeaking(_isSpeaking).ConfigureAwait(false);
_keepaliveTask = RunKeepaliveAsync(_connection.CancelToken); _keepaliveTask = RunKeepaliveAsync(_connection.CancelToken);
_ = _connection.CompleteAsync(); _ = _connection.CompleteAsync();
@@ -366,7 +372,7 @@ namespace Discord.Audio
await _audioLogger.DebugAsync("Received Speaking").ConfigureAwait(false); await _audioLogger.DebugAsync("Received Speaking").ConfigureAwait(false);
var data = (payload as JToken).ToObject<SpeakingEvent>(_serializer); var data = (payload as JToken).ToObject<SpeakingEvent>(_serializer);
_ssrcMap[data.Ssrc] = data.UserId; _ssrcMap.AddClient(data.Ssrc, data.UserId, data.Speaking);
await _speakingUpdatedEvent.InvokeAsync(data.UserId, data.Speaking); await _speakingUpdatedEvent.InvokeAsync(data.UserId, data.Speaking);
} }
@@ -468,7 +474,7 @@ namespace Discord.Audio
{ {
await _audioLogger.DebugAsync("Malformed Frame").ConfigureAwait(false); await _audioLogger.DebugAsync("Malformed Frame").ConfigureAwait(false);
} }
else if (!_ssrcMap.TryGetValue(ssrc, out ulong userId)) else if (!_ssrcMap.TryUpdateUser(ssrc, out ulong userId))
{ {
await _audioLogger.DebugAsync($"Unknown SSRC {ssrc}").ConfigureAwait(false); await _audioLogger.DebugAsync($"Unknown SSRC {ssrc}").ConfigureAwait(false);
} }
@@ -582,7 +588,7 @@ namespace Discord.Audio
if (_isSpeaking != value) if (_isSpeaking != value)
{ {
_isSpeaking = value; _isSpeaking = value;
await ApiClient.SendSetSpeaking(value).ConfigureAwait(false); await ApiClient.SendSetSpeaking(value, _ssrc).ConfigureAwait(false);
} }
} }

View File

@@ -0,0 +1,100 @@
using System;
using System.Collections.Concurrent;
using System.Timers;
namespace Discord.Audio
{
internal class SsrcMap
{
// The delay after a packet is received from a user until he is marked as not speaking anymore.
public static readonly TimeSpan Delay = TimeSpan.FromMilliseconds(100);
private readonly ConcurrentDictionary<uint, ClientData> _clients;
public event Action<ulong, bool> UserSpeakingChanged;
public SsrcMap()
{
_clients = new ConcurrentDictionary<uint, ClientData>();
}
public void AddClient(uint ssrc, ulong userId, bool isSpeaking)
{
if (_clients.TryGetValue(ssrc, out ClientData client))
{
client.SpeakingChanged -= OnUserSpeakingChanged;
}
client = new ClientData(userId, isSpeaking);
client.SpeakingChanged += OnUserSpeakingChanged;
_clients[ssrc] = client;
}
public bool TryUpdateUser(uint ssrc, out ulong userId)
{
bool exists = false;
userId = 0;
if (_clients.TryGetValue(ssrc, out ClientData client))
{
exists = true;
userId = client.UserId;
client.ActivateSpeaking();
}
return exists;
}
private void OnUserSpeakingChanged(ClientData client)
{
UserSpeakingChanged?.Invoke(client.UserId, client.IsSpeaking);
}
public void Clear()
{
_clients.Clear();
}
private class ClientData
{
public ulong UserId { get; }
public Timer Timer { get; }
public bool IsSpeaking { get; private set; }
public event Action<ClientData> SpeakingChanged;
public ClientData(ulong userId, bool isSpeaking)
{
UserId = userId;
Timer = new Timer(Delay);
Timer.AutoReset = false;
Timer.Elapsed += OnTimerElapsed;
IsSpeaking = isSpeaking;
if (IsSpeaking) Timer.Start();
}
private void OnTimerElapsed(object sender, ElapsedEventArgs e)
{
if (IsSpeaking)
{
IsSpeaking = false;
SpeakingChanged?.Invoke(this);
}
}
public void ActivateSpeaking()
{
if (!IsSpeaking)
{
IsSpeaking = true;
SpeakingChanged?.Invoke(this);
}
// Restart timer
Timer.Stop();
Timer.Start();
}
}
}
}

View File

@@ -8,7 +8,6 @@ namespace Discord.Audio.Streams
public class RTPReadStream : AudioOutStream public class RTPReadStream : AudioOutStream
{ {
private readonly AudioStream _next; private readonly AudioStream _next;
private readonly byte[] _buffer, _nonce;
public override bool CanRead => true; public override bool CanRead => true;
public override bool CanSeek => false; public override bool CanSeek => false;
@@ -17,8 +16,6 @@ namespace Discord.Audio.Streams
public RTPReadStream(AudioStream next, int bufferSize = 4000) public RTPReadStream(AudioStream next, int bufferSize = 4000)
{ {
_next = next; _next = next;
_buffer = new byte[bufferSize];
_nonce = new byte[24];
} }
/// <exception cref="OperationCanceledException">The token has had cancellation requested.</exception> /// <exception cref="OperationCanceledException">The token has had cancellation requested.</exception>

View File

@@ -164,12 +164,13 @@ namespace Discord.Audio
}); });
} }
public Task SendSetSpeaking(bool value) public Task SendSetSpeaking(bool value, uint ssrc)
{ {
return SendAsync(VoiceOpCode.Speaking, new SpeakingParams return SendAsync(VoiceOpCode.Speaking, new SpeakingParams
{ {
IsSpeaking = value, Speaking = value ? 1 : 0,
Delay = 0 Delay = 0,
Ssrc = ssrc
}); });
} }