Fixed audio and a few ConnectionManager issues

This commit is contained in:
RogueException
2017-02-25 16:06:42 -04:00
parent 3190d7e26d
commit 06dcac6a9f
7 changed files with 43 additions and 23 deletions

View File

@@ -59,28 +59,28 @@ namespace Discord.Audio
internal AudioClient(SocketGuild guild, int id)
{
Guild = guild;
_audioLogger = Discord.LogManager.CreateLogger($"Audio #{id}");
ApiClient = new DiscordVoiceAPIClient(guild.Id, Discord.WebSocketProvider, Discord.UdpSocketProvider);
ApiClient.SentGatewayMessage += async opCode => await _audioLogger.DebugAsync($"Sent {opCode}").ConfigureAwait(false);
ApiClient.SentDiscovery += async () => await _audioLogger.DebugAsync($"Sent Discovery").ConfigureAwait(false);
//ApiClient.SentData += async bytes => await _audioLogger.DebugAsync($"Sent {bytes} Bytes").ConfigureAwait(false);
ApiClient.ReceivedEvent += ProcessMessageAsync;
ApiClient.ReceivedPacket += ProcessPacketAsync;
_stateLock = new SemaphoreSlim(1, 1);
_connection = new ConnectionManager(_stateLock, _audioLogger, 30000,
OnConnectingAsync, OnDisconnectingAsync, x => ApiClient.Disconnected += x);
_connection.Connected += () => _connectedEvent.InvokeAsync();
_connection.Disconnected += (ex, recon) => _disconnectedEvent.InvokeAsync(ex);
_heartbeatTimes = new ConcurrentQueue<long>();
_audioLogger = Discord.LogManager.CreateLogger($"Audio #{id}");
_serializer = new JsonSerializer { ContractResolver = new DiscordContractResolver() };
_serializer.Error += (s, e) =>
{
_audioLogger.WarningAsync(e.ErrorContext.Error).GetAwaiter().GetResult();
e.ErrorContext.Handled = true;
};
ApiClient = new DiscordVoiceAPIClient(guild.Id, Discord.WebSocketProvider, Discord.UdpSocketProvider);
ApiClient.SentGatewayMessage += async opCode => await _audioLogger.DebugAsync($"Sent {opCode}").ConfigureAwait(false);
ApiClient.SentDiscovery += async () => await _audioLogger.DebugAsync($"Sent Discovery").ConfigureAwait(false);
//ApiClient.SentData += async bytes => await _audioLogger.DebugAsync($"Sent {bytes} Bytes").ConfigureAwait(false);
ApiClient.ReceivedEvent += ProcessMessageAsync;
ApiClient.ReceivedPacket += ProcessPacketAsync;
};
LatencyUpdated += async (old, val) => await _audioLogger.VerboseAsync($"Latency = {val} ms").ConfigureAwait(false);
}
@@ -98,25 +98,32 @@ namespace Discord.Audio
private async Task OnConnectingAsync()
{
await _audioLogger.DebugAsync("Connecting ApiClient").ConfigureAwait(false);
await ApiClient.ConnectAsync("wss://" + _url).ConfigureAwait(false);
await _audioLogger.DebugAsync("Sending Identity").ConfigureAwait(false);
await ApiClient.SendIdentityAsync(_userId, _sessionId, _token).ConfigureAwait(false);
//Wait for READY
await _connection.WaitAsync().ConfigureAwait(false);
}
private async Task OnDisconnectingAsync(Exception ex)
{
//Disconnect from server
await _audioLogger.DebugAsync("Disconnecting ApiClient").ConfigureAwait(false);
await ApiClient.DisconnectAsync().ConfigureAwait(false);
//Wait for tasks to complete
await _audioLogger.DebugAsync("Waiting for heartbeater").ConfigureAwait(false);
var heartbeatTask = _heartbeatTask;
if (heartbeatTask != null)
await heartbeatTask.ConfigureAwait(false);
_heartbeatTask = null;
await Discord.ApiClient.SendVoiceStateUpdateAsync(Guild.Id, null, false, false).ConfigureAwait(false);
long time;
while (_heartbeatTimes.TryDequeue(out time)) { }
_lastMessageTime = 0;
await _audioLogger.DebugAsync("Sending Voice State").ConfigureAwait(false);
await Discord.ApiClient.SendVoiceStateUpdateAsync(Guild.Id, null, false, false).ConfigureAwait(false);
}
public AudioOutStream CreateOpusStream(int samplesPerFrame, int bufferMillis)

View File

@@ -0,0 +1,206 @@
using Discord.Logging;
using System;
using System.Threading;
using System.Threading.Tasks;
using Discord.Net;
namespace Discord
{
internal class ConnectionManager
{
public event Func<Task> Connected { add { _connectedEvent.Add(value); } remove { _connectedEvent.Remove(value); } }
private readonly AsyncEvent<Func<Task>> _connectedEvent = new AsyncEvent<Func<Task>>();
public event Func<Exception, bool, Task> Disconnected { add { _disconnectedEvent.Add(value); } remove { _disconnectedEvent.Remove(value); } }
private readonly AsyncEvent<Func<Exception, bool, Task>> _disconnectedEvent = new AsyncEvent<Func<Exception, bool, Task>>();
private readonly SemaphoreSlim _stateLock;
private readonly Logger _logger;
private readonly int _connectionTimeout;
private readonly Func<Task> _onConnecting;
private readonly Func<Exception, Task> _onDisconnecting;
private TaskCompletionSource<bool> _connectionPromise, _readyPromise;
private CancellationTokenSource _combinedCancelToken, _reconnectCancelToken, _connectionCancelToken;
private Task _task;
public ConnectionState State { get; private set; }
public CancellationToken CancelToken { get; private set; }
public bool IsCompleted => _readyPromise.Task.IsCompleted;
internal ConnectionManager(SemaphoreSlim stateLock, Logger logger, int connectionTimeout,
Func<Task> onConnecting, Func<Exception, Task> onDisconnecting, Action<Func<Exception, Task>> clientDisconnectHandler)
{
_stateLock = stateLock;
_logger = logger;
_connectionTimeout = connectionTimeout;
_onConnecting = onConnecting;
_onDisconnecting = onDisconnecting;
clientDisconnectHandler(ex =>
{
if (ex != null)
{
var ex2 = ex as WebSocketClosedException;
if (ex2?.CloseCode == 4006)
CriticalError(new Exception("WebSocket session expired", ex));
else
Error(new Exception("WebSocket connection was closed", ex));
}
else
Error(new Exception("WebSocket connection was closed"));
return Task.Delay(0);
});
}
public virtual async Task StartAsync()
{
await AcquireConnectionLock().ConfigureAwait(false);
var reconnectCancelToken = new CancellationTokenSource();
_reconnectCancelToken = reconnectCancelToken;
_task = Task.Run(async () =>
{
try
{
Random jitter = new Random();
int nextReconnectDelay = 1000;
while (!reconnectCancelToken.IsCancellationRequested)
{
try
{
await ConnectAsync(reconnectCancelToken).ConfigureAwait(false);
nextReconnectDelay = 1000; //Reset delay
await _connectionPromise.Task.ConfigureAwait(false);
}
catch (OperationCanceledException ex)
{
Cancel(); //In case this exception didn't come from another Error call
await DisconnectAsync(ex, !reconnectCancelToken.IsCancellationRequested).ConfigureAwait(false);
}
catch (Exception ex)
{
Error(ex); //In case this exception didn't come from another Error call
if (!reconnectCancelToken.IsCancellationRequested)
{
await _logger.WarningAsync(ex).ConfigureAwait(false);
await DisconnectAsync(ex, true).ConfigureAwait(false);
}
else
{
await _logger.ErrorAsync(ex).ConfigureAwait(false);
await DisconnectAsync(ex, false).ConfigureAwait(false);
}
}
if (!reconnectCancelToken.IsCancellationRequested)
{
//Wait before reconnecting
await Task.Delay(nextReconnectDelay, reconnectCancelToken.Token).ConfigureAwait(false);
nextReconnectDelay = (nextReconnectDelay * 2) + jitter.Next(-250, 250);
if (nextReconnectDelay > 60000)
nextReconnectDelay = 60000;
}
}
}
finally { _stateLock.Release(); }
});
}
public virtual async Task StopAsync()
{
Cancel();
var task = _task;
if (task != null)
await task.ConfigureAwait(false);
}
private async Task ConnectAsync(CancellationTokenSource reconnectCancelToken)
{
_connectionCancelToken = new CancellationTokenSource();
_combinedCancelToken = CancellationTokenSource.CreateLinkedTokenSource(_connectionCancelToken.Token, reconnectCancelToken.Token);
CancelToken = _combinedCancelToken.Token;
_connectionPromise = new TaskCompletionSource<bool>();
State = ConnectionState.Connecting;
await _logger.InfoAsync("Connecting").ConfigureAwait(false);
try
{
var readyPromise = new TaskCompletionSource<bool>();
_readyPromise = readyPromise;
//Abort connection on timeout
var cancelToken = CancelToken;
var _ = Task.Run(async () =>
{
try
{
await Task.Delay(_connectionTimeout, cancelToken).ConfigureAwait(false);
readyPromise.TrySetException(new TimeoutException());
}
catch (OperationCanceledException) { }
});
await _onConnecting().ConfigureAwait(false);
await _logger.InfoAsync("Connected").ConfigureAwait(false);
State = ConnectionState.Connected;
await _logger.DebugAsync("Raising Event").ConfigureAwait(false);
await _connectedEvent.InvokeAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
Error(ex);
throw;
}
}
private async Task DisconnectAsync(Exception ex, bool isReconnecting)
{
if (State == ConnectionState.Disconnected) return;
State = ConnectionState.Disconnecting;
await _logger.InfoAsync("Disconnecting").ConfigureAwait(false);
await _onDisconnecting(ex).ConfigureAwait(false);
await _logger.InfoAsync("Disconnected").ConfigureAwait(false);
State = ConnectionState.Disconnected;
await _disconnectedEvent.InvokeAsync(ex, isReconnecting).ConfigureAwait(false);
}
public async Task CompleteAsync()
{
await _readyPromise.TrySetResultAsync(true).ConfigureAwait(false);
}
public async Task WaitAsync()
{
await _readyPromise.Task.ConfigureAwait(false);
}
public void Cancel()
{
_readyPromise?.TrySetCanceled();
_connectionPromise?.TrySetCanceled();
_reconnectCancelToken?.Cancel();
_connectionCancelToken?.Cancel();
}
public void Error(Exception ex)
{
_readyPromise.TrySetException(ex);
_connectionPromise.TrySetException(ex);
_connectionCancelToken?.Cancel();
}
public void CriticalError(Exception ex)
{
_reconnectCancelToken?.Cancel();
Error(ex);
}
private async Task AcquireConnectionLock()
{
while (true)
{
await StopAsync().ConfigureAwait(false);
if (await _stateLock.WaitAsync(0).ConfigureAwait(false))
break;
}
}
}
}

View File

@@ -95,6 +95,8 @@ namespace Discord.WebSocket
_gatewayLogger = LogManager.CreateLogger(ShardId == 0 && TotalShards == 1 ? "Gateway" : $"Shard #{ShardId}");
_connection = new ConnectionManager(_stateLock, _gatewayLogger, config.ConnectionTimeout,
OnConnectingAsync, OnDisconnectingAsync, x => ApiClient.Disconnected += x);
_connection.Connected += () => _connectedEvent.InvokeAsync();
_connection.Disconnected += (ex, recon) => _disconnectedEvent.InvokeAsync(ex);
_nextAudioId = 1;
_connectionGroupLock = groupLock;
@@ -173,8 +175,6 @@ namespace Discord.WebSocket
{
await _gatewayLogger.DebugAsync("Connecting ApiClient").ConfigureAwait(false);
await ApiClient.ConnectAsync().ConfigureAwait(false);
await _gatewayLogger.DebugAsync("Raising Event").ConfigureAwait(false);
await _connectedEvent.InvokeAsync().ConfigureAwait(false);
if (_sessionId != null)
{
@@ -189,7 +189,7 @@ namespace Discord.WebSocket
//Wait for READY
await _connection.WaitAsync().ConfigureAwait(false);
await _gatewayLogger.DebugAsync("Sending Status").ConfigureAwait(false);
await SendStatusAsync().ConfigureAwait(false);

View File

@@ -506,15 +506,16 @@ namespace Discord.WebSocket
}
internal async Task FinishConnectAudio(int id, string url, string token)
{
//TODO: Mem Leak: Disconnected/Connected handlers arent cleaned up
var voiceState = GetVoiceState(Discord.CurrentUser.Id).Value;
await _audioLock.WaitAsync().ConfigureAwait(false);
try
{
var promise = _audioConnectPromise;
if (_audioClient == null)
{
var audioClient = new AudioClient(this, id);
var promise = _audioConnectPromise;
audioClient.Disconnected += async ex =>
{
if (!promise.Task.IsCompleted)
@@ -532,7 +533,7 @@ namespace Discord.WebSocket
}
_audioClient.Connected += () =>
{
var _ = _audioConnectPromise.TrySetResultAsync(_audioClient);
var _ = promise.TrySetResultAsync(_audioClient);
return Task.Delay(0);
};
await _audioClient.StartAsync(url, Discord.CurrentUser.Id, voiceState.VoiceSessionId, token).ConfigureAwait(false);