Moved (re)connection handling to ConnectionManager

This commit is contained in:
RogueException
2017-02-23 16:38:57 -04:00
parent 8630185ac9
commit 3190d7e26d
15 changed files with 447 additions and 588 deletions

View File

@@ -5,6 +5,7 @@ using Discord.WebSocket;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Text;
using System.Threading;
@@ -12,6 +13,7 @@ using System.Threading.Tasks;
namespace Discord.Audio
{
//TODO: Add audio reconnecting
internal class AudioClient : IAudioClient, IDisposable
{
public event Func<Task> Connected
@@ -34,34 +36,37 @@ namespace Discord.Audio
private readonly AsyncEvent<Func<int, int, Task>> _latencyUpdatedEvent = new AsyncEvent<Func<int, int, Task>>();
private readonly Logger _audioLogger;
internal readonly SemaphoreSlim _connectionLock;
private readonly JsonSerializer _serializer;
private readonly ConnectionManager _connection;
private readonly SemaphoreSlim _stateLock;
private readonly ConcurrentQueue<long> _heartbeatTimes;
private TaskCompletionSource<bool> _connectTask;
private CancellationTokenSource _cancelTokenSource;
private Task _heartbeatTask;
private long _heartbeatTime;
private string _url;
private long _lastMessageTime;
private string _url, _sessionId, _token;
private ulong _userId;
private uint _ssrc;
private byte[] _secretKey;
private bool _isDisposed;
public SocketGuild Guild { get; }
public DiscordVoiceAPIClient ApiClient { get; private set; }
public ConnectionState ConnectionState { get; private set; }
public int Latency { get; private set; }
private DiscordSocketClient Discord => Guild.Discord;
public ConnectionState ConnectionState => _connection.State;
/// <summary> Creates a new REST/WebSocket discord client. </summary>
internal AudioClient(SocketGuild guild, int id)
{
Guild = guild;
_stateLock = new SemaphoreSlim(1, 1);
_connection = new ConnectionManager(_stateLock, _audioLogger, 30000,
OnConnectingAsync, OnDisconnectingAsync, x => ApiClient.Disconnected += x);
_heartbeatTimes = new ConcurrentQueue<long>();
_audioLogger = Discord.LogManager.CreateLogger($"Audio #{id}");
_connectionLock = new SemaphoreSlim(1, 1);
_serializer = new JsonSerializer { ContractResolver = new DiscordContractResolver() };
_serializer.Error += (s, e) =>
{
@@ -76,83 +81,28 @@ namespace Discord.Audio
//ApiClient.SentData += async bytes => await _audioLogger.DebugAsync($"Sent {bytes} Bytes").ConfigureAwait(false);
ApiClient.ReceivedEvent += ProcessMessageAsync;
ApiClient.ReceivedPacket += ProcessPacketAsync;
ApiClient.Disconnected += async ex =>
{
if (ex != null)
await _audioLogger.WarningAsync($"Connection Closed", ex).ConfigureAwait(false);
else
await _audioLogger.WarningAsync($"Connection Closed").ConfigureAwait(false);
};
LatencyUpdated += async (old, val) => await _audioLogger.VerboseAsync($"Latency = {val} ms").ConfigureAwait(false);
}
/// <inheritdoc />
internal async Task ConnectAsync(string url, ulong userId, string sessionId, string token)
internal async Task StartAsync(string url, ulong userId, string sessionId, string token)
{
await _connectionLock.WaitAsync().ConfigureAwait(false);
try
{
await ConnectInternalAsync(url, userId, sessionId, token).ConfigureAwait(false);
}
finally { _connectionLock.Release(); }
_url = url;
_userId = userId;
_sessionId = sessionId;
_token = token;
await _connection.StartAsync().ConfigureAwait(false);
}
private async Task ConnectInternalAsync(string url, ulong userId, string sessionId, string token)
public async Task StopAsync()
=> await _connection.StopAsync().ConfigureAwait(false);
private async Task OnConnectingAsync()
{
var state = ConnectionState;
if (state == ConnectionState.Connecting || state == ConnectionState.Connected)
await DisconnectInternalAsync(null).ConfigureAwait(false);
ConnectionState = ConnectionState.Connecting;
await _audioLogger.InfoAsync("Connecting").ConfigureAwait(false);
try
{
_url = url;
_connectTask = new TaskCompletionSource<bool>();
_cancelTokenSource = new CancellationTokenSource();
await ApiClient.ConnectAsync("wss://" + url).ConfigureAwait(false);
await ApiClient.SendIdentityAsync(userId, sessionId, token).ConfigureAwait(false);
await _connectTask.Task.ConfigureAwait(false);
await _connectedEvent.InvokeAsync().ConfigureAwait(false);
ConnectionState = ConnectionState.Connected;
await _audioLogger.InfoAsync("Connected").ConfigureAwait(false);
}
catch (Exception)
{
await DisconnectInternalAsync(null).ConfigureAwait(false);
throw;
}
await ApiClient.ConnectAsync("wss://" + _url).ConfigureAwait(false);
await ApiClient.SendIdentityAsync(_userId, _sessionId, _token).ConfigureAwait(false);
}
/// <inheritdoc />
public async Task DisconnectAsync()
private async Task OnDisconnectingAsync(Exception ex)
{
await _connectionLock.WaitAsync().ConfigureAwait(false);
try
{
await DisconnectInternalAsync(null).ConfigureAwait(false);
}
finally { _connectionLock.Release(); }
}
private async Task DisconnectAsync(Exception ex)
{
await _connectionLock.WaitAsync().ConfigureAwait(false);
try
{
await DisconnectInternalAsync(ex).ConfigureAwait(false);
}
finally { _connectionLock.Release(); }
}
private async Task DisconnectInternalAsync(Exception ex)
{
if (ConnectionState == ConnectionState.Disconnected) return;
ConnectionState = ConnectionState.Disconnecting;
await _audioLogger.InfoAsync("Disconnecting").ConfigureAwait(false);
//Signal tasks to complete
try { _cancelTokenSource.Cancel(); } catch { }
//Disconnect from server
await ApiClient.DisconnectAsync().ConfigureAwait(false);
@@ -162,17 +112,17 @@ namespace Discord.Audio
await heartbeatTask.ConfigureAwait(false);
_heartbeatTask = null;
ConnectionState = ConnectionState.Disconnected;
await _audioLogger.InfoAsync("Disconnected").ConfigureAwait(false);
await _disconnectedEvent.InvokeAsync(ex).ConfigureAwait(false);
await Discord.ApiClient.SendVoiceStateUpdateAsync(Guild.Id, null, false, false).ConfigureAwait(false);
long time;
while (_heartbeatTimes.TryDequeue(out time)) { }
_lastMessageTime = 0;
}
public AudioOutStream CreateOpusStream(int samplesPerFrame, int bufferMillis)
{
CheckSamplesPerFrame(samplesPerFrame);
var target = new BufferedAudioTarget(ApiClient, samplesPerFrame, bufferMillis, _cancelTokenSource.Token);
var target = new BufferedAudioTarget(ApiClient, samplesPerFrame, bufferMillis, _connection.CancelToken);
return new RTPWriteStream(target, _secretKey, samplesPerFrame, _ssrc);
}
public AudioOutStream CreateDirectOpusStream(int samplesPerFrame)
@@ -184,7 +134,7 @@ namespace Discord.Audio
public AudioOutStream CreatePCMStream(int samplesPerFrame, int channels, int? bitrate, int bufferMillis)
{
CheckSamplesPerFrame(samplesPerFrame);
var target = new BufferedAudioTarget(ApiClient, samplesPerFrame, bufferMillis, _cancelTokenSource.Token);
var target = new BufferedAudioTarget(ApiClient, samplesPerFrame, bufferMillis, _connection.CancelToken);
return new OpusEncodeStream(target, _secretKey, channels, samplesPerFrame, _ssrc, bitrate);
}
public AudioOutStream CreateDirectPCMStream(int samplesPerFrame, int channels, int? bitrate)
@@ -202,6 +152,8 @@ namespace Discord.Audio
private async Task ProcessMessageAsync(VoiceOpCode opCode, object payload)
{
_lastMessageTime = Environment.TickCount;
try
{
switch (opCode)
@@ -216,8 +168,7 @@ namespace Discord.Audio
if (!data.Modes.Contains(DiscordVoiceAPIClient.Mode))
throw new InvalidOperationException($"Discord does not support {DiscordVoiceAPIClient.Mode}");
_heartbeatTime = 0;
_heartbeatTask = RunHeartbeatAsync(data.HeartbeatInterval, _cancelTokenSource.Token);
_heartbeatTask = RunHeartbeatAsync(data.HeartbeatInterval, _connection.CancelToken);
ApiClient.SetUdpEndpoint(_url, data.Port);
await ApiClient.SendDiscoveryAsync(_ssrc).ConfigureAwait(false);
@@ -234,19 +185,17 @@ namespace Discord.Audio
_secretKey = data.SecretKey;
await ApiClient.SendSetSpeaking(true).ConfigureAwait(false);
var _ = _connectTask.TrySetResultAsync(true);
var _ = _connection.CompleteAsync();
}
break;
case VoiceOpCode.HeartbeatAck:
{
await _audioLogger.DebugAsync("Received HeartbeatAck").ConfigureAwait(false);
var heartbeatTime = _heartbeatTime;
if (heartbeatTime != 0)
long time;
if (_heartbeatTimes.TryDequeue(out time))
{
int latency = (int)(Environment.TickCount - _heartbeatTime);
_heartbeatTime = 0;
int latency = (int)(Environment.TickCount - time);
int before = Latency;
Latency = latency;
@@ -267,7 +216,7 @@ namespace Discord.Audio
}
private async Task ProcessPacketAsync(byte[] packet)
{
if (!_connectTask.Task.IsCompleted)
if (!_connection.IsCompleted)
{
if (packet.Length == 70)
{
@@ -291,33 +240,50 @@ namespace Discord.Audio
//Clean this up when Discord's session patch is live
try
{
await _audioLogger.DebugAsync("Heartbeat Started").ConfigureAwait(false);
while (!cancelToken.IsCancellationRequested)
{
var now = Environment.TickCount;
//Did server respond to our last heartbeat, or are we still receiving messages (long load?)
if (_heartbeatTimes.Count != 0 && (now - _lastMessageTime) > intervalMillis &&
ConnectionState == ConnectionState.Connected)
{
_connection.Error(new Exception("Server missed last heartbeat"));
return;
}
_heartbeatTimes.Enqueue(now);
await Task.Delay(intervalMillis, cancelToken).ConfigureAwait(false);
if (_heartbeatTime != 0) //Server never responded to our last heartbeat
try
{
if (ConnectionState == ConnectionState.Connected)
{
await _audioLogger.WarningAsync("Server missed last heartbeat").ConfigureAwait(false);
await DisconnectInternalAsync(new Exception("Server missed last heartbeat")).ConfigureAwait(false);
return;
}
await ApiClient.SendHeartbeatAsync().ConfigureAwait(false);
}
else
_heartbeatTime = Environment.TickCount;
await ApiClient.SendHeartbeatAsync().ConfigureAwait(false);
catch (Exception ex)
{
await _audioLogger.WarningAsync("Heartbeat Errored", ex).ConfigureAwait(false);
}
await Task.Delay(intervalMillis, cancelToken).ConfigureAwait(false);
}
await _audioLogger.DebugAsync("Heartbeat Stopped").ConfigureAwait(false);
}
catch (OperationCanceledException)
{
await _audioLogger.DebugAsync("Heartbeat Stopped").ConfigureAwait(false);
}
catch (Exception ex)
{
await _audioLogger.ErrorAsync("Heartbeat Errored", ex).ConfigureAwait(false);
}
catch (OperationCanceledException) { }
}
internal void Dispose(bool disposing)
{
if (disposing && !_isDisposed)
if (disposing)
{
_isDisposed = true;
DisconnectInternalAsync(null).GetAwaiter().GetResult();
StopAsync().GetAwaiter().GetResult();
ApiClient.Dispose();
}
}