diff --git a/src/Discord.Net.WebSocket/API/Voice/SpeakingParams.cs b/src/Discord.Net.WebSocket/API/Voice/SpeakingParams.cs index e03bfc75..5d4c782d 100644 --- a/src/Discord.Net.WebSocket/API/Voice/SpeakingParams.cs +++ b/src/Discord.Net.WebSocket/API/Voice/SpeakingParams.cs @@ -5,8 +5,10 @@ namespace Discord.API.Voice internal class SpeakingParams { [JsonProperty("speaking")] - public bool IsSpeaking { get; set; } + public int Speaking { get; set; } [JsonProperty("delay")] public int Delay { get; set; } + [JsonProperty("ssrc")] + public uint Ssrc { get; set; } } } diff --git a/src/Discord.Net.WebSocket/Audio/AudioClient.cs b/src/Discord.Net.WebSocket/Audio/AudioClient.cs index 0dc20115..1682d735 100644 --- a/src/Discord.Net.WebSocket/Audio/AudioClient.cs +++ b/src/Discord.Net.WebSocket/Audio/AudioClient.cs @@ -1,7 +1,6 @@ using Discord.API.Voice; using Discord.Audio.Streams; using Discord.Logging; -using Discord.Net; using Discord.Net.Converters; using Discord.WebSocket; using Newtonsoft.Json; @@ -45,7 +44,7 @@ namespace Discord.Audio private readonly SemaphoreSlim _stateLock; private readonly ConcurrentQueue _heartbeatTimes; private readonly ConcurrentQueue> _keepaliveTimes; - private readonly ConcurrentDictionary _ssrcMap; + private readonly SsrcMap _ssrcMap; private readonly ConcurrentDictionary _streams; private Task _heartbeatTask, _keepaliveTask; @@ -54,7 +53,7 @@ namespace Discord.Audio private string _url, _sessionId, _token; private ulong _userId; private uint _ssrc; - private bool _isSpeaking; + private bool? _isSpeaking; private StopReason _stopReason; private bool _resuming; @@ -90,7 +89,7 @@ namespace Discord.Audio _connection.Disconnected += (exception, _) => _disconnectedEvent.InvokeAsync(exception); _heartbeatTimes = new ConcurrentQueue(); _keepaliveTimes = new ConcurrentQueue>(); - _ssrcMap = new ConcurrentDictionary(); + _ssrcMap = new SsrcMap(); _streams = new ConcurrentDictionary(); _serializer = new JsonSerializer { ContractResolver = new DiscordContractResolver() }; @@ -146,12 +145,15 @@ namespace Discord.Audio //Wait for READY await _connection.WaitAsync().ConfigureAwait(false); + _ssrcMap.UserSpeakingChanged += OnUserSpeakingChanged; } private async Task OnDisconnectingAsync(Exception ex) { await _audioLogger.DebugAsync("Disconnecting ApiClient").ConfigureAwait(false); await ApiClient.DisconnectAsync().ConfigureAwait(false); + _ssrcMap.UserSpeakingChanged -= OnUserSpeakingChanged; + if (_stopReason == StopReason.Unknown && ex.InnerException is WebSocketException exception) { await _audioLogger.WarningAsync( @@ -207,6 +209,11 @@ namespace Discord.Audio } } + private async void OnUserSpeakingChanged(ulong userId, bool isSpeaking) + { + await _speakingUpdatedEvent.InvokeAsync(userId, isSpeaking); + } + private async Task ClearHeartBeaters() { //Wait for tasks to complete @@ -332,8 +339,7 @@ namespace Discord.Audio throw new InvalidOperationException($"Discord selected an unexpected mode: {data.Mode}"); SecretKey = data.SecretKey; - _isSpeaking = false; - await ApiClient.SendSetSpeaking(_isSpeaking).ConfigureAwait(false); + await SetSpeakingAsync(false); _keepaliveTask = RunKeepaliveAsync(_connection.CancelToken); _ = _connection.CompleteAsync(); @@ -366,7 +372,7 @@ namespace Discord.Audio await _audioLogger.DebugAsync("Received Speaking").ConfigureAwait(false); var data = (payload as JToken).ToObject(_serializer); - _ssrcMap[data.Ssrc] = data.UserId; + _ssrcMap.AddClient(data.Ssrc, data.UserId, data.Speaking); await _speakingUpdatedEvent.InvokeAsync(data.UserId, data.Speaking); } @@ -468,7 +474,7 @@ namespace Discord.Audio { await _audioLogger.DebugAsync("Malformed Frame").ConfigureAwait(false); } - else if (!_ssrcMap.TryGetValue(ssrc, out ulong userId)) + else if (!_ssrcMap.TryUpdateUser(ssrc, out ulong userId)) { await _audioLogger.DebugAsync($"Unknown SSRC {ssrc}").ConfigureAwait(false); } @@ -582,7 +588,7 @@ namespace Discord.Audio if (_isSpeaking != value) { _isSpeaking = value; - await ApiClient.SendSetSpeaking(value).ConfigureAwait(false); + await ApiClient.SendSetSpeaking(value, _ssrc).ConfigureAwait(false); } } diff --git a/src/Discord.Net.WebSocket/Audio/SsrcMap.cs b/src/Discord.Net.WebSocket/Audio/SsrcMap.cs new file mode 100644 index 00000000..e6718976 --- /dev/null +++ b/src/Discord.Net.WebSocket/Audio/SsrcMap.cs @@ -0,0 +1,100 @@ +using System; +using System.Collections.Concurrent; +using System.Timers; + +namespace Discord.Audio +{ + internal class SsrcMap + { + // The delay after a packet is received from a user until he is marked as not speaking anymore. + public static readonly TimeSpan Delay = TimeSpan.FromMilliseconds(100); + + private readonly ConcurrentDictionary _clients; + + public event Action UserSpeakingChanged; + + public SsrcMap() + { + _clients = new ConcurrentDictionary(); + } + + public void AddClient(uint ssrc, ulong userId, bool isSpeaking) + { + if (_clients.TryGetValue(ssrc, out ClientData client)) + { + client.SpeakingChanged -= OnUserSpeakingChanged; + } + + client = new ClientData(userId, isSpeaking); + client.SpeakingChanged += OnUserSpeakingChanged; + _clients[ssrc] = client; + } + + public bool TryUpdateUser(uint ssrc, out ulong userId) + { + bool exists = false; + userId = 0; + + if (_clients.TryGetValue(ssrc, out ClientData client)) + { + exists = true; + userId = client.UserId; + client.ActivateSpeaking(); + } + + return exists; + } + + private void OnUserSpeakingChanged(ClientData client) + { + UserSpeakingChanged?.Invoke(client.UserId, client.IsSpeaking); + } + + public void Clear() + { + _clients.Clear(); + } + + private class ClientData + { + public ulong UserId { get; } + public Timer Timer { get; } + public bool IsSpeaking { get; private set; } + + public event Action SpeakingChanged; + + public ClientData(ulong userId, bool isSpeaking) + { + UserId = userId; + Timer = new Timer(Delay); + Timer.AutoReset = false; + Timer.Elapsed += OnTimerElapsed; + IsSpeaking = isSpeaking; + + if (IsSpeaking) Timer.Start(); + } + + private void OnTimerElapsed(object sender, ElapsedEventArgs e) + { + if (IsSpeaking) + { + IsSpeaking = false; + SpeakingChanged?.Invoke(this); + } + } + + public void ActivateSpeaking() + { + if (!IsSpeaking) + { + IsSpeaking = true; + SpeakingChanged?.Invoke(this); + } + + // Restart timer + Timer.Stop(); + Timer.Start(); + } + } + } +} diff --git a/src/Discord.Net.WebSocket/Audio/Streams/RTPReadStream.cs b/src/Discord.Net.WebSocket/Audio/Streams/RTPReadStream.cs index 23317467..7a4c3e0c 100644 --- a/src/Discord.Net.WebSocket/Audio/Streams/RTPReadStream.cs +++ b/src/Discord.Net.WebSocket/Audio/Streams/RTPReadStream.cs @@ -8,7 +8,6 @@ namespace Discord.Audio.Streams public class RTPReadStream : AudioOutStream { private readonly AudioStream _next; - private readonly byte[] _buffer, _nonce; public override bool CanRead => true; public override bool CanSeek => false; @@ -17,8 +16,6 @@ namespace Discord.Audio.Streams public RTPReadStream(AudioStream next, int bufferSize = 4000) { _next = next; - _buffer = new byte[bufferSize]; - _nonce = new byte[24]; } /// The token has had cancellation requested. diff --git a/src/Discord.Net.WebSocket/DiscordVoiceApiClient.cs b/src/Discord.Net.WebSocket/DiscordVoiceApiClient.cs index e0529134..2383c2c7 100644 --- a/src/Discord.Net.WebSocket/DiscordVoiceApiClient.cs +++ b/src/Discord.Net.WebSocket/DiscordVoiceApiClient.cs @@ -164,12 +164,13 @@ namespace Discord.Audio }); } - public Task SendSetSpeaking(bool value) + public Task SendSetSpeaking(bool value, uint ssrc) { return SendAsync(VoiceOpCode.Speaking, new SpeakingParams { - IsSpeaking = value, - Delay = 0 + Speaking = value ? 1 : 0, + Delay = 0, + Ssrc = ssrc }); }