Reworked internal task engine for DiscordClient and WebSocket. Several other minor async fixes.

This commit is contained in:
RogueException
2015-12-11 19:07:55 -04:00
parent 2b8184f1ae
commit 5760e94d81
23 changed files with 523 additions and 489 deletions

View File

@@ -5,7 +5,7 @@
using Discord.API.Converters;
using Newtonsoft.Json;
namespace Discord.Audio.API
namespace Discord.API
{
internal sealed class VoiceServerUpdateEvent
{

View File

@@ -95,8 +95,7 @@ namespace Discord.Audio
else
{
var logger = Client.Log().CreateLogger("Voice");
var voiceSocket = new VoiceWebSocket(Client.Config, _config, logger);
_defaultClient = new DiscordAudioClient(this, 0, logger, _client.WebSocket, voiceSocket);
_defaultClient = new DiscordAudioClient(this, 0, logger, _client.WebSocket);
}
_talkingUsers = new ConcurrentDictionary<User, bool>();
@@ -145,27 +144,26 @@ namespace Discord.Audio
return Task.FromResult(_defaultClient);
}
var client = _voiceClients.GetOrAdd(server.Id, (Func<long, DiscordAudioClient>)(_ =>
var client = _voiceClients.GetOrAdd(server.Id, _ =>
{
int id = unchecked(++_nextClientId);
var logger = Client.Log().CreateLogger($"Voice #{id}");
Net.WebSockets.GatewaySocket gatewaySocket = null;
var voiceSocket = new VoiceWebSocket(Client.Config, _config, logger);
var voiceClient = new DiscordAudioClient((AudioService)(this), (int)id, (Logger)logger, (Net.WebSockets.GatewaySocket)gatewaySocket, (VoiceWebSocket)voiceSocket);
GatewaySocket gatewaySocket = null;
var voiceClient = new DiscordAudioClient(this, id, logger, gatewaySocket);
voiceClient.SetServerId(server.Id);
voiceSocket.OnPacket += (s, e) =>
voiceClient.VoiceSocket.OnPacket += (s, e) =>
{
RaiseOnPacket(e);
};
voiceSocket.IsSpeaking += (s, e) =>
voiceClient.VoiceSocket.IsSpeaking += (s, e) =>
{
var user = Client.GetUser(server, e.UserId);
RaiseUserIsSpeakingUpdated(user, e.IsSpeaking);
};
return voiceClient;
}));
});
//await client.Connect(gatewaySocket.Host, _client.Token).ConfigureAwait(false);
return Task.FromResult(client);
}

View File

@@ -1,4 +1,5 @@
using Discord.Net.WebSockets;
using Discord.API;
using Discord.Net.WebSockets;
using System;
using System.Threading.Tasks;
@@ -10,22 +11,29 @@ namespace Discord.Audio
public int Id => _id;
private readonly AudioService _service;
private readonly GatewaySocket _gatewaySocket;
private readonly VoiceWebSocket _voiceSocket;
private readonly Logger _logger;
public long? ServerId => _voiceSocket.ServerId;
public long? ChannelId => _voiceSocket.ChannelId;
public GatewaySocket GatewaySocket => _gatewaySocket;
private readonly GatewaySocket _gatewaySocket;
public DiscordAudioClient(AudioService service, int id, Logger logger, GatewaySocket gatewaySocket, VoiceWebSocket voiceSocket)
public VoiceWebSocket VoiceSocket => _voiceSocket;
private readonly VoiceWebSocket _voiceSocket;
public string Token => _token;
private string _token;
public long? ServerId => _voiceSocket.ServerId;
public long? ChannelId => _voiceSocket.ChannelId;
public DiscordAudioClient(AudioService service, int id, Logger logger, GatewaySocket gatewaySocket)
{
_service = service;
_id = id;
_logger = logger;
_gatewaySocket = gatewaySocket;
_voiceSocket = voiceSocket;
_voiceSocket = new VoiceWebSocket(service.Client, this, logger);
/*_voiceSocket.Connected += (s, e) => RaiseVoiceConnected();
/*_voiceSocket.Connected += (s, e) => RaiseVoiceConnected();
_voiceSocket.Disconnected += async (s, e) =>
{
_voiceSocket.CurrentServerId;
@@ -37,7 +45,7 @@ namespace Discord.Audio
await socket.Reconnect().ConfigureAwait(false);
};*/
/*_voiceSocket.IsSpeaking += (s, e) =>
/*_voiceSocket.IsSpeaking += (s, e) =>
{
if (_voiceSocket.State == WebSocketState.Connected)
{
@@ -54,27 +62,28 @@ namespace Discord.Audio
}
};*/
/*this.Connected += (s, e) =>
/*this.Connected += (s, e) =>
{
_voiceSocket.ParentCancelToken = _cancelToken;
};*/
_gatewaySocket.ReceivedDispatch += async (s, e) =>
_gatewaySocket.ReceivedDispatch += async (s, e) =>
{
try
{
switch (e.Type)
{
case "VOICE_SERVER_UPDATE":
{
long serverId = IdConvert.ToLong(e.Payload.Value<string>("guild_id"));
{
var data = e.Payload.ToObject<VoiceServerUpdateEvent>(_gatewaySocket.Serializer);
long serverId = data.ServerId;
if (serverId == ServerId)
{
var client = _service.Client;
string token = e.Payload.Value<string>("token");
_token = data.Token;
_voiceSocket.Host = "wss://" + e.Payload.Value<string>("endpoint").Split(':')[0];
await _voiceSocket.Connect(client.CurrentUser.Id, _gatewaySocket.SessionId, token/*, client.CancelToken*/).ConfigureAwait(false);
await _voiceSocket.Connect().ConfigureAwait(false);
}
}
break;

View File

@@ -26,7 +26,8 @@ namespace Discord.Net.WebSockets
//private readonly Random _rand;
private readonly int _targetAudioBufferLength;
private readonly ConcurrentDictionary<uint, OpusDecoder> _decoders;
private readonly AudioServiceConfig _audioConfig;
private readonly DiscordAudioClient _audioClient;
private readonly AudioServiceConfig _config;
private OpusEncoder _encoder;
private uint _ssrc;
private ConcurrentDictionary<uint, long> _ssrcMapping;
@@ -37,8 +38,8 @@ namespace Discord.Net.WebSockets
private bool _isEncrypted;
private byte[] _secretKey, _encodingBuffer;
private ushort _sequence;
private long? _serverId, _channelId, _userId;
private string _sessionId, _token, _encryptionMode;
private long? _serverId, _channelId;
private string _encryptionMode;
private int _ping;
private Thread _sendThread, _receiveThread;
@@ -48,24 +49,21 @@ namespace Discord.Net.WebSockets
public int Ping => _ping;
internal VoiceBuffer OutputBuffer => _sendBuffer;
public VoiceWebSocket(DiscordConfig config, AudioServiceConfig audioConfig, Logger logger)
: base(config, logger)
public VoiceWebSocket(DiscordClient client, DiscordAudioClient audioClient, Logger logger)
: base(client, logger)
{
_audioConfig = audioConfig;
_decoders = new ConcurrentDictionary<uint, OpusDecoder>();
_targetAudioBufferLength = _audioConfig.BufferLength / 20; //20 ms frames
_audioClient = audioClient;
_config = client.Audio().Config;
_decoders = new ConcurrentDictionary<uint, OpusDecoder>();
_targetAudioBufferLength = _config.BufferLength / 20; //20 ms frames
_encodingBuffer = new byte[MaxOpusSize];
_ssrcMapping = new ConcurrentDictionary<uint, long>();
_encoder = new OpusEncoder(48000, _audioConfig.Channels, 20, _audioConfig.Bitrate, OpusApplication.Audio);
_sendBuffer = new VoiceBuffer((int)Math.Ceiling(_audioConfig.BufferLength / (double)_encoder.FrameLength), _encoder.FrameSize);
_encoder = new OpusEncoder(48000, _config.Channels, 20, _config.Bitrate, OpusApplication.Audio);
_sendBuffer = new VoiceBuffer((int)Math.Ceiling(_config.BufferLength / (double)_encoder.FrameLength), _encoder.FrameSize);
}
public async Task Connect(long userId, string sessionId, string token)
{
_userId = userId;
_sessionId = sessionId;
_token = token;
public async Task Connect()
{
await BeginConnect().ConfigureAwait(false);
}
public async Task Reconnect()
@@ -73,12 +71,12 @@ namespace Discord.Net.WebSockets
try
{
var cancelToken = ParentCancelToken.Value;
await Task.Delay(_config.ReconnectDelay, cancelToken).ConfigureAwait(false);
await Task.Delay(_client.Config.ReconnectDelay, cancelToken).ConfigureAwait(false);
while (!cancelToken.IsCancellationRequested)
{
try
{
await Connect(_userId.Value, _sessionId, _token).ConfigureAwait(false);
await Connect().ConfigureAwait(false);
break;
}
catch (OperationCanceledException) { throw; }
@@ -86,29 +84,26 @@ namespace Discord.Net.WebSockets
{
_logger.Error("Reconnect failed", ex);
//Net is down? We can keep trying to reconnect until the user runs Disconnect()
await Task.Delay(_config.FailedReconnectDelay, cancelToken).ConfigureAwait(false);
await Task.Delay(_client.Config.FailedReconnectDelay, cancelToken).ConfigureAwait(false);
}
}
}
catch (OperationCanceledException) { }
}
public Task Disconnect()
{
return SignalDisconnect(wait: true);
}
public Task Disconnect() => _taskManager.Stop();
protected override async Task Run()
{
_udp = new UdpClient(new IPEndPoint(IPAddress.Any, 0));
List<Task> tasks = new List<Task>();
if ((_audioConfig.Mode & AudioMode.Outgoing) != 0)
if ((_config.Mode & AudioMode.Outgoing) != 0)
{
_sendThread = new Thread(new ThreadStart(() => SendVoiceAsync(_cancelToken)));
_sendThread.IsBackground = true;
_sendThread.Start();
}
if ((_audioConfig.Mode & AudioMode.Incoming) != 0)
if ((_config.Mode & AudioMode.Incoming) != 0)
{
_receiveThread = new Thread(new ThreadStart(() => ReceiveVoiceAsync(_cancelToken)));
_receiveThread.IsBackground = true;
@@ -120,9 +115,9 @@ namespace Discord.Net.WebSockets
#if !DOTNET5_4
tasks.Add(WatcherAsync());
#endif
await RunTasks(tasks.ToArray());
await Cleanup();
tasks.AddRange(_engine.GetTasks(_cancelToken));
tasks.Add(HeartbeatAsync(_cancelToken));
await _taskManager.Start(tasks, _cancelTokenSource).ConfigureAwait(false);
}
protected override Task Cleanup()
{
@@ -141,12 +136,6 @@ namespace Discord.Net.WebSockets
}
ClearPCMFrames();
if (!_wasDisconnectUnexpected)
{
_userId = null;
_sessionId = null;
_token = null;
}
_udp = null;
return base.Cleanup();
@@ -161,7 +150,7 @@ namespace Discord.Net.WebSockets
int packetLength, resultOffset, resultLength;
IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
if ((_audioConfig.Mode & AudioMode.Incoming) != 0)
if ((_config.Mode & AudioMode.Incoming) != 0)
{
decodingBuffer = new byte[MaxOpusSize];
nonce = new byte[24];
@@ -188,7 +177,7 @@ namespace Discord.Net.WebSockets
if (packetLength > 0 && endpoint.Equals(_endpoint))
{
if (_state != (int)WebSocketState.Connected)
if (_state != (int)ConnectionState.Connected)
{
if (packetLength != 70)
return;
@@ -197,8 +186,8 @@ namespace Discord.Net.WebSockets
int port = packet[68] | packet[69] << 8;
SendSelectProtocol(ip, port);
if ((_audioConfig.Mode & AudioMode.Incoming) == 0)
return;
if ((_config.Mode & AudioMode.Incoming) == 0)
return; //We dont need this thread anymore
}
else
{
@@ -258,7 +247,7 @@ namespace Discord.Net.WebSockets
{
try
{
while (!cancelToken.IsCancellationRequested && _state != (int)WebSocketState.Connected)
while (!cancelToken.IsCancellationRequested && _state != (int)ConnectionState.Connected)
Thread.Sleep(1);
if (cancelToken.IsCancellationRequested)
@@ -410,14 +399,15 @@ namespace Discord.Net.WebSockets
{
case VoiceOpCodes.Ready:
{
if (_state != (int)WebSocketState.Connected)
if (_state != (int)ConnectionState.Connected)
{
var payload = (msg.Payload as JToken).ToObject<VoiceReadyEvent>(_serializer);
_heartbeatInterval = payload.HeartbeatInterval;
_ssrc = payload.SSRC;
_endpoint = new IPEndPoint((await Dns.GetHostAddressesAsync(Host.Replace("wss://", "")).ConfigureAwait(false)).FirstOrDefault(), payload.Port);
var address = (await Dns.GetHostAddressesAsync(Host.Replace("wss://", "")).ConfigureAwait(false)).FirstOrDefault();
_endpoint = new IPEndPoint(address, payload.Port);
if (_audioConfig.EnableEncryption)
if (_config.EnableEncryption)
{
if (payload.Modes.Contains(EncryptedMode))
{
@@ -458,7 +448,7 @@ namespace Discord.Net.WebSockets
var payload = (msg.Payload as JToken).ToObject<JoinServerEvent>(_serializer);
_secretKey = payload.SecretKey;
SendIsTalking(true);
await EndConnect();
EndConnect();
}
break;
case VoiceOpCodes.Speaking:
@@ -507,9 +497,9 @@ namespace Discord.Net.WebSockets
{
var msg = new IdentifyCommand();
msg.Payload.ServerId = _serverId.Value;
msg.Payload.SessionId = _sessionId;
msg.Payload.Token = _token;
msg.Payload.UserId = _userId.Value;
msg.Payload.SessionId = _client.SessionId;
msg.Payload.Token = _audioClient.Token;
msg.Payload.UserId = _client.UserId.Value;
QueueMessage(msg);
}