feature: Implement Dispose for types which have disposable data (#1171)
* Initial set of dispose implementations Not handled yet: - Discord.Net.Websocket/Entities/SocketGuild - Discord.Net.Tests * Refactor DiscordSocketClient init into ctor This way we remove an IDisposableAnalyzer warning for not disposing the client when we set the client variable. * Dispose of clients when disposing sharded client * Finish implementing IDisposable where appropriate I opted to use NoWarn in the Tests project as it wasn't really necessary considering that our tests only run once * Tweak samples after feedback
This commit is contained in:
@@ -71,7 +71,7 @@ namespace Discord.Audio
|
||||
ApiClient.ReceivedPacket += ProcessPacketAsync;
|
||||
|
||||
_stateLock = new SemaphoreSlim(1, 1);
|
||||
_connection = new ConnectionManager(_stateLock, _audioLogger, 30000,
|
||||
_connection = new ConnectionManager(_stateLock, _audioLogger, 30000,
|
||||
OnConnectingAsync, OnDisconnectingAsync, x => ApiClient.Disconnected += x);
|
||||
_connection.Connected += () => _connectedEvent.InvokeAsync();
|
||||
_connection.Disconnected += (ex, recon) => _disconnectedEvent.InvokeAsync(ex);
|
||||
@@ -79,7 +79,7 @@ namespace Discord.Audio
|
||||
_keepaliveTimes = new ConcurrentQueue<KeyValuePair<ulong, int>>();
|
||||
_ssrcMap = new ConcurrentDictionary<uint, ulong>();
|
||||
_streams = new ConcurrentDictionary<ulong, StreamPair>();
|
||||
|
||||
|
||||
_serializer = new JsonSerializer { ContractResolver = new DiscordContractResolver() };
|
||||
_serializer.Error += (s, e) =>
|
||||
{
|
||||
@@ -91,7 +91,7 @@ namespace Discord.Audio
|
||||
UdpLatencyUpdated += async (old, val) => await _audioLogger.DebugAsync($"UDP Latency = {val} ms").ConfigureAwait(false);
|
||||
}
|
||||
|
||||
internal async Task StartAsync(string url, ulong userId, string sessionId, string token)
|
||||
internal async Task StartAsync(string url, ulong userId, string sessionId, string token)
|
||||
{
|
||||
_url = url;
|
||||
_userId = userId;
|
||||
@@ -100,7 +100,7 @@ namespace Discord.Audio
|
||||
await _connection.StartAsync().ConfigureAwait(false);
|
||||
}
|
||||
public async Task StopAsync()
|
||||
{
|
||||
{
|
||||
await _connection.StopAsync().ConfigureAwait(false);
|
||||
}
|
||||
|
||||
@@ -225,11 +225,11 @@ namespace Discord.Audio
|
||||
|
||||
if (!data.Modes.Contains(DiscordVoiceAPIClient.Mode))
|
||||
throw new InvalidOperationException($"Discord does not support {DiscordVoiceAPIClient.Mode}");
|
||||
|
||||
|
||||
ApiClient.SetUdpEndpoint(data.Ip, data.Port);
|
||||
await ApiClient.SendDiscoveryAsync(_ssrc).ConfigureAwait(false);
|
||||
|
||||
|
||||
|
||||
_heartbeatTask = RunHeartbeatAsync(41250, _connection.CancelToken);
|
||||
}
|
||||
break;
|
||||
@@ -305,9 +305,9 @@ namespace Discord.Audio
|
||||
catch (Exception ex)
|
||||
{
|
||||
await _audioLogger.DebugAsync("Malformed Packet", ex).ConfigureAwait(false);
|
||||
return;
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
await _audioLogger.DebugAsync("Received Discovery").ConfigureAwait(false);
|
||||
await ApiClient.SendSelectProtocol(ip, port).ConfigureAwait(false);
|
||||
}
|
||||
@@ -317,7 +317,7 @@ namespace Discord.Audio
|
||||
{
|
||||
await _audioLogger.DebugAsync("Received Keepalive").ConfigureAwait(false);
|
||||
|
||||
ulong value =
|
||||
ulong value =
|
||||
((ulong)packet[0] >> 0) |
|
||||
((ulong)packet[1] >> 8) |
|
||||
((ulong)packet[2] >> 16) |
|
||||
@@ -341,7 +341,7 @@ namespace Discord.Audio
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
{
|
||||
if (!RTPReadStream.TryReadSsrc(packet, 0, out var ssrc))
|
||||
{
|
||||
await _audioLogger.DebugAsync("Malformed Frame").ConfigureAwait(false);
|
||||
@@ -388,7 +388,7 @@ namespace Discord.Audio
|
||||
var now = Environment.TickCount;
|
||||
|
||||
//Did server respond to our last heartbeat?
|
||||
if (_heartbeatTimes.Count != 0 && (now - _lastMessageTime) > intervalMillis &&
|
||||
if (_heartbeatTimes.Count != 0 && (now - _lastMessageTime) > intervalMillis &&
|
||||
ConnectionState == ConnectionState.Connected)
|
||||
{
|
||||
_connection.Error(new Exception("Server missed last heartbeat"));
|
||||
@@ -437,7 +437,7 @@ namespace Discord.Audio
|
||||
{
|
||||
await _audioLogger.WarningAsync("Failed to send keepalive", ex).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
|
||||
await Task.Delay(intervalMillis, cancelToken).ConfigureAwait(false);
|
||||
}
|
||||
await _audioLogger.DebugAsync("Keepalive Stopped").ConfigureAwait(false);
|
||||
@@ -467,6 +467,7 @@ namespace Discord.Audio
|
||||
{
|
||||
StopAsync().GetAwaiter().GetResult();
|
||||
ApiClient.Dispose();
|
||||
_stateLock?.Dispose();
|
||||
}
|
||||
}
|
||||
/// <inheritdoc />
|
||||
|
||||
@@ -27,7 +27,7 @@ namespace Discord.Audio.Streams
|
||||
|
||||
private readonly AudioClient _client;
|
||||
private readonly AudioStream _next;
|
||||
private readonly CancellationTokenSource _cancelTokenSource;
|
||||
private readonly CancellationTokenSource _disposeTokenSource, _cancelTokenSource;
|
||||
private readonly CancellationToken _cancelToken;
|
||||
private readonly Task _task;
|
||||
private readonly ConcurrentQueue<Frame> _queuedFrames;
|
||||
@@ -49,12 +49,13 @@ namespace Discord.Audio.Streams
|
||||
_logger = logger;
|
||||
_queueLength = (bufferMillis + (_ticksPerFrame - 1)) / _ticksPerFrame; //Round up
|
||||
|
||||
_cancelTokenSource = new CancellationTokenSource();
|
||||
_cancelToken = CancellationTokenSource.CreateLinkedTokenSource(_cancelTokenSource.Token, cancelToken).Token;
|
||||
_disposeTokenSource = new CancellationTokenSource();
|
||||
_cancelTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_disposeTokenSource.Token, cancelToken);
|
||||
_cancelToken = _cancelTokenSource.Token;
|
||||
_queuedFrames = new ConcurrentQueue<Frame>();
|
||||
_bufferPool = new ConcurrentQueue<byte[]>();
|
||||
for (int i = 0; i < _queueLength; i++)
|
||||
_bufferPool.Enqueue(new byte[maxFrameSize]);
|
||||
_bufferPool.Enqueue(new byte[maxFrameSize]);
|
||||
_queueLock = new SemaphoreSlim(_queueLength, _queueLength);
|
||||
_silenceFrames = MaxSilenceFrames;
|
||||
|
||||
@@ -63,7 +64,12 @@ namespace Discord.Audio.Streams
|
||||
protected override void Dispose(bool disposing)
|
||||
{
|
||||
if (disposing)
|
||||
_cancelTokenSource.Cancel();
|
||||
{
|
||||
_disposeTokenSource?.Cancel();
|
||||
_disposeTokenSource?.Dispose();
|
||||
_cancelTokenSource?.Dispose();
|
||||
_queueLock?.Dispose();
|
||||
}
|
||||
base.Dispose(disposing);
|
||||
}
|
||||
|
||||
@@ -131,8 +137,12 @@ namespace Discord.Audio.Streams
|
||||
public override void WriteHeader(ushort seq, uint timestamp, bool missed) { } //Ignore, we use our own timing
|
||||
public override async Task WriteAsync(byte[] data, int offset, int count, CancellationToken cancelToken)
|
||||
{
|
||||
CancellationTokenSource writeCancelToken = null;
|
||||
if (cancelToken.CanBeCanceled)
|
||||
cancelToken = CancellationTokenSource.CreateLinkedTokenSource(cancelToken, _cancelToken).Token;
|
||||
{
|
||||
writeCancelToken = CancellationTokenSource.CreateLinkedTokenSource(cancelToken, _cancelToken);
|
||||
cancelToken = writeCancelToken.Token;
|
||||
}
|
||||
else
|
||||
cancelToken = _cancelToken;
|
||||
|
||||
@@ -142,6 +152,9 @@ namespace Discord.Audio.Streams
|
||||
#if DEBUG
|
||||
var _ = _logger?.DebugAsync("Buffer overflow"); //Should never happen because of the queueLock
|
||||
#endif
|
||||
#pragma warning disable IDISP016
|
||||
writeCancelToken?.Dispose();
|
||||
#pragma warning restore IDISP016
|
||||
return;
|
||||
}
|
||||
Buffer.BlockCopy(data, offset, buffer, 0, count);
|
||||
@@ -153,6 +166,7 @@ namespace Discord.Audio.Streams
|
||||
#endif
|
||||
_isPreloaded = true;
|
||||
}
|
||||
writeCancelToken?.Dispose();
|
||||
}
|
||||
|
||||
public override async Task FlushAsync(CancellationToken cancelToken)
|
||||
|
||||
@@ -96,7 +96,17 @@ namespace Discord.Audio.Streams
|
||||
|
||||
protected override void Dispose(bool isDisposing)
|
||||
{
|
||||
_isDisposed = true;
|
||||
if (!_isDisposed)
|
||||
{
|
||||
if (isDisposing)
|
||||
{
|
||||
_signal?.Dispose();
|
||||
}
|
||||
|
||||
_isDisposed = true;
|
||||
}
|
||||
|
||||
base.Dispose(isDisposing);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user