Fix OperationCancelledException and add IAsyncEnumerable to wait without thread blocking (#1562)

* Fixed Task Scheduler operation cancelled error caused by Orphaned RunCleanup task on RequestQueue not being awaited on dispose

* Added async disposable interface to various components

* Added incorrect early marking of disposed for DiscordSocketApi client

* Added general task canceled exception catch to cleanup task

* Fix merge errors

Co-authored-by: Quin Lynch <49576606+quinchs@users.noreply.github.com>
Co-authored-by: quin lynch <lynchquin@gmail.com>
This commit is contained in:
James Grant
2022-01-11 17:24:30 +10:00
committed by GitHub
parent 4ed4718e18
commit cd36bb802b
8 changed files with 136 additions and 6 deletions

View File

@@ -8,7 +8,7 @@ namespace Discord
/// <summary> /// <summary>
/// Represents a generic Discord client. /// Represents a generic Discord client.
/// </summary> /// </summary>
public interface IDiscordClient : IDisposable public interface IDiscordClient : IDisposable, IAsyncDisposable
{ {
/// <summary> /// <summary>
/// Gets the current state of connection. /// Gets the current state of connection.

View File

@@ -149,9 +149,24 @@ namespace Discord.Rest
_isDisposed = true; _isDisposed = true;
} }
} }
internal virtual async ValueTask DisposeAsync(bool disposing)
{
if (!_isDisposed)
{
#pragma warning disable IDISP007
await ApiClient.DisposeAsync().ConfigureAwait(false);
#pragma warning restore IDISP007
_stateLock?.Dispose();
_isDisposed = true;
}
}
/// <inheritdoc /> /// <inheritdoc />
public void Dispose() => Dispose(true); public void Dispose() => Dispose(true);
public ValueTask DisposeAsync() => DisposeAsync(true);
/// <inheritdoc /> /// <inheritdoc />
public Task<int> GetRecommendedShardCountAsync(RequestOptions options = null) public Task<int> GetRecommendedShardCountAsync(RequestOptions options = null)
=> ClientHelper.GetRecommendShardCountAsync(this, options); => ClientHelper.GetRecommendShardCountAsync(this, options);

View File

@@ -21,7 +21,7 @@ using System.Threading.Tasks;
namespace Discord.API namespace Discord.API
{ {
internal class DiscordRestApiClient : IDisposable internal class DiscordRestApiClient : IDisposable, IAsyncDisposable
{ {
#region DiscordRestApiClient #region DiscordRestApiClient
private static readonly ConcurrentDictionary<string, Func<BucketIds, BucketId>> _bucketIdGenerators = new ConcurrentDictionary<string, Func<BucketIds, BucketId>>(); private static readonly ConcurrentDictionary<string, Func<BucketIds, BucketId>> _bucketIdGenerators = new ConcurrentDictionary<string, Func<BucketIds, BucketId>>();
@@ -97,8 +97,29 @@ namespace Discord.API
_isDisposed = true; _isDisposed = true;
} }
} }
internal virtual async ValueTask DisposeAsync(bool disposing)
{
if (!_isDisposed)
{
if (disposing)
{
_loginCancelToken?.Dispose();
RestClient?.Dispose();
if (!(RequestQueue is null))
await RequestQueue.DisposeAsync().ConfigureAwait(false);
_stateLock?.Dispose();
}
_isDisposed = true;
}
}
public void Dispose() => Dispose(true); public void Dispose() => Dispose(true);
public ValueTask DisposeAsync() => DisposeAsync(true);
public async Task LoginAsync(TokenType tokenType, string token, RequestOptions options = null) public async Task LoginAsync(TokenType tokenType, string token, RequestOptions options = null)
{ {
await _stateLock.WaitAsync().ConfigureAwait(false); await _stateLock.WaitAsync().ConfigureAwait(false);

View File

@@ -47,6 +47,14 @@ namespace Discord.Rest
base.Dispose(disposing); base.Dispose(disposing);
} }
internal override async ValueTask DisposeAsync(bool disposing)
{
if (disposing)
await ApiClient.DisposeAsync().ConfigureAwait(false);
base.Dispose(disposing);
}
/// <inheritdoc /> /// <inheritdoc />
internal override async Task OnLoginAsync(TokenType tokenType, string token) internal override async Task OnLoginAsync(TokenType tokenType, string token)
{ {

View File

@@ -1,3 +1,4 @@
using Newtonsoft.Json.Bson;
using System; using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
#if DEBUG_LIMITS #if DEBUG_LIMITS
@@ -10,7 +11,7 @@ using System.Threading.Tasks;
namespace Discord.Net.Queue namespace Discord.Net.Queue
{ {
internal class RequestQueue : IDisposable internal class RequestQueue : IDisposable, IAsyncDisposable
{ {
public event Func<BucketId, RateLimitInfo?, string, Task> RateLimitTriggered; public event Func<BucketId, RateLimitInfo?, string, Task> RateLimitTriggered;
@@ -187,13 +188,31 @@ namespace Discord.Net.Queue
await Task.Delay(60000, _cancelTokenSource.Token).ConfigureAwait(false); //Runs each minute await Task.Delay(60000, _cancelTokenSource.Token).ConfigureAwait(false); //Runs each minute
} }
} }
catch (OperationCanceledException) { } catch (TaskCanceledException) { }
catch (ObjectDisposedException) { } catch (ObjectDisposedException) { }
} }
public void Dispose() public void Dispose()
{ {
_cancelTokenSource?.Dispose(); if (!(_cancelTokenSource is null))
{
_cancelTokenSource.Cancel();
_cancelTokenSource.Dispose();
_cleanupTask.GetAwaiter().GetResult();
}
_tokenLock?.Dispose();
_clearToken?.Dispose();
_requestCancelTokenSource?.Dispose();
}
public async ValueTask DisposeAsync()
{
if (!(_cancelTokenSource is null))
{
_cancelTokenSource.Cancel();
_cancelTokenSource.Dispose();
await _cleanupTask.ConfigureAwait(false);
}
_tokenLock?.Dispose(); _tokenLock?.Dispose();
_clearToken?.Dispose(); _clearToken?.Dispose();
_requestCancelTokenSource?.Dispose(); _requestCancelTokenSource?.Dispose();

View File

@@ -568,6 +568,25 @@ namespace Discord.WebSocket
base.Dispose(disposing); base.Dispose(disposing);
} }
internal override ValueTask DisposeAsync(bool disposing)
{
if (!_isDisposed)
{
if (disposing)
{
if (_shards != null)
{
foreach (var client in _shards)
client?.Dispose();
}
}
_isDisposed = true;
}
return base.DisposeAsync(disposing);
}
#endregion #endregion
} }
} }

View File

@@ -124,12 +124,39 @@ namespace Discord.API
_decompressor?.Dispose(); _decompressor?.Dispose();
_compressed?.Dispose(); _compressed?.Dispose();
} }
_isDisposed = true;
} }
base.Dispose(disposing); base.Dispose(disposing);
} }
#if NETSTANDARD2_1
internal override async ValueTask DisposeAsync(bool disposing)
#else
internal override ValueTask DisposeAsync(bool disposing)
#endif
{
if (!_isDisposed)
{
if (disposing)
{
_connectCancelToken?.Dispose();
(WebSocketClient as IDisposable)?.Dispose();
#if NETSTANDARD2_1
if (!(_decompressor is null))
await _decompressor.DisposeAsync().ConfigureAwait(false);
#else
_decompressor?.Dispose();
#endif
}
}
#if NETSTANDARD2_1
await base.DisposeAsync(disposing).ConfigureAwait(false);
#else
return base.DisposeAsync(disposing);
#endif
}
public async Task ConnectAsync() public async Task ConnectAsync()
{ {
await _stateLock.WaitAsync().ConfigureAwait(false); await _stateLock.WaitAsync().ConfigureAwait(false);

View File

@@ -213,6 +213,27 @@ namespace Discord.WebSocket
base.Dispose(disposing); base.Dispose(disposing);
} }
internal override async ValueTask DisposeAsync(bool disposing)
{
if (!_isDisposed)
{
if (disposing)
{
await StopAsync().ConfigureAwait(false);
if (!(ApiClient is null))
await ApiClient.DisposeAsync().ConfigureAwait(false);
_stateLock?.Dispose();
}
_isDisposed = true;
}
await base.DisposeAsync(disposing).ConfigureAwait(false);
}
/// <inheritdoc />
internal override async Task OnLoginAsync(TokenType tokenType, string token) internal override async Task OnLoginAsync(TokenType tokenType, string token)
{ {
if (_shardedClient == null && _defaultStickers.Length == 0 && AlwaysDownloadDefaultStickers) if (_shardedClient == null && _defaultStickers.Length == 0 && AlwaysDownloadDefaultStickers)