Removed old bucket system, cleaned up api calls. Fixed compile errors.

This commit is contained in:
RogueException
2016-09-29 05:10:40 -03:00
parent dd86f03306
commit e038475ab4
59 changed files with 464 additions and 570 deletions

View File

@@ -1,30 +0,0 @@
namespace Discord.Net.Queue
{
public sealed class Bucket
{
/// <summary> Gets the unique identifier for this bucket. </summary>
public string Id { get; }
/// <summary> Gets the name of this bucket. </summary>
public string Name { get; }
/// <summary> Gets the amount of requests that may be sent per window. </summary>
public int WindowCount { get; }
/// <summary> Gets the length of this bucket's window, in seconds. </summary>
public int WindowSeconds { get; }
/// <summary> Gets the type of account this bucket affects. </summary>
public BucketTarget Target { get; }
/// <summary> Gets this bucket's parent. </summary>
public GlobalBucket? Parent { get; }
internal Bucket(string id, int windowCount, int windowSeconds, BucketTarget target, GlobalBucket? parent = null)
: this(id, id, windowCount, windowSeconds, target, parent) { }
internal Bucket(string id, string name, int windowCount, int windowSeconds, BucketTarget target, GlobalBucket? parent = null)
{
Id = id;
Name = name;
WindowCount = windowCount;
WindowSeconds = windowSeconds;
Target = target;
Parent = parent;
}
}
}

View File

@@ -1,9 +0,0 @@
namespace Discord.Net.Queue
{
public enum BucketGroup
{
Global,
Guild,
Channel
}
}

View File

@@ -1,9 +0,0 @@
namespace Discord.Net.Queue
{
public enum BucketTarget
{
Client,
Bot,
Both
}
}

View File

@@ -1,7 +0,0 @@
namespace Discord.Net.Queue
{
public enum ChannelBucket
{
SendEditMessage,
}
}

View File

@@ -1,14 +0,0 @@
namespace Discord.Net.Queue
{
public enum GlobalBucket
{
GeneralRest,
DirectMessage,
SendEditMessage,
GeneralGateway,
UpdateStatus,
GeneralRpc
}
}

View File

@@ -1,11 +0,0 @@
namespace Discord.Net.Queue
{
public enum GuildBucket
{
SendEditMessage,
DeleteMessage,
DeleteMessages,
ModifyMember,
Nickname
}
}

View File

@@ -1,7 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
@@ -10,91 +8,23 @@ namespace Discord.Net.Queue
{
public class RequestQueue
{
public event Func<string, Bucket, int, Task> RateLimitTriggered;
private readonly static ImmutableDictionary<GlobalBucket, Bucket> _globalLimits;
private readonly static ImmutableDictionary<GuildBucket, Bucket> _guildLimits;
private readonly static ImmutableDictionary<ChannelBucket, Bucket> _channelLimits;
public event Func<string, RequestQueueBucket, int?, Task> RateLimitTriggered;
private readonly SemaphoreSlim _lock;
private readonly RequestQueueBucket[] _globalBuckets;
private readonly ConcurrentDictionary<ulong, RequestQueueBucket>[] _guildBuckets;
private readonly ConcurrentDictionary<ulong, RequestQueueBucket>[] _channelBuckets;
private readonly ConcurrentDictionary<string, RequestQueueBucket> _buckets;
private CancellationTokenSource _clearToken;
private CancellationToken _parentToken;
private CancellationToken _cancelToken;
static RequestQueue()
{
_globalLimits = new Dictionary<GlobalBucket, Bucket>
{
//REST
[GlobalBucket.GeneralRest] = new Bucket(null, "rest", 0, 0, BucketTarget.Both), //No Limit
//[GlobalBucket.Login] = new BucketDefinition(1, 1),
[GlobalBucket.DirectMessage] = new Bucket("bot:msg:dm", 5, 5, BucketTarget.Bot),
[GlobalBucket.SendEditMessage] = new Bucket("bot:msg:global", 50, 10, BucketTarget.Bot),
//[GlobalBucket.Username] = new Bucket("bot:msg:global", 2, 3600, BucketTarget.Both),
//Gateway
[GlobalBucket.GeneralGateway] = new Bucket(null, "gateway", 120, 60, BucketTarget.Both),
[GlobalBucket.UpdateStatus] = new Bucket(null, "status", 5, 1, BucketTarget.Both, GlobalBucket.GeneralGateway),
//Rpc
[GlobalBucket.GeneralRpc] = new Bucket(null, "rpc", 120, 60, BucketTarget.Both)
}.ToImmutableDictionary();
_guildLimits = new Dictionary<GuildBucket, Bucket>
{
//REST
[GuildBucket.SendEditMessage] = new Bucket("bot:msg:server", 5, 5, BucketTarget.Bot, GlobalBucket.SendEditMessage),
[GuildBucket.DeleteMessage] = new Bucket("dmsg", 5, 1, BucketTarget.Bot),
[GuildBucket.DeleteMessages] = new Bucket("bdmsg", 1, 1, BucketTarget.Bot),
[GuildBucket.ModifyMember] = new Bucket("guild_member", 10, 10, BucketTarget.Bot),
[GuildBucket.Nickname] = new Bucket("guild_member_nick", 1, 1, BucketTarget.Bot)
}.ToImmutableDictionary();
//Client-Only
_channelLimits = new Dictionary<ChannelBucket, Bucket>
{
//REST
[ChannelBucket.SendEditMessage] = new Bucket("msg", 10, 10, BucketTarget.Client, GlobalBucket.SendEditMessage),
}.ToImmutableDictionary();
}
public static Bucket GetBucketInfo(GlobalBucket bucket) => _globalLimits[bucket];
public static Bucket GetBucketInfo(GuildBucket bucket) => _guildLimits[bucket];
public static Bucket GetBucketInfo(ChannelBucket bucket) => _channelLimits[bucket];
public RequestQueue()
{
_lock = new SemaphoreSlim(1, 1);
_globalBuckets = new RequestQueueBucket[_globalLimits.Count];
foreach (var pair in _globalLimits)
{
//var target = _globalLimits[pair.Key].Target;
//if (target == BucketTarget.Both || (target == BucketTarget.Bot && isBot) || (target == BucketTarget.Client && !isBot))
_globalBuckets[(int)pair.Key] = CreateBucket(pair.Value);
}
_guildBuckets = new ConcurrentDictionary<ulong, RequestQueueBucket>[_guildLimits.Count];
for (int i = 0; i < _guildLimits.Count; i++)
{
//var target = _guildLimits[(GuildBucket)i].Target;
//if (target == BucketTarget.Both || (target == BucketTarget.Bot && isBot) || (target == BucketTarget.Client && !isBot))
_guildBuckets[i] = new ConcurrentDictionary<ulong, RequestQueueBucket>();
}
_channelBuckets = new ConcurrentDictionary<ulong, RequestQueueBucket>[_channelLimits.Count];
for (int i = 0; i < _channelLimits.Count; i++)
{
//var target = _channelLimits[(GuildBucket)i].Target;
//if (target == BucketTarget.Both || (target == BucketTarget.Bot && isBot) || (target == BucketTarget.Client && !isBot))
_channelBuckets[i] = new ConcurrentDictionary<ulong, RequestQueueBucket>();
}
_clearToken = new CancellationTokenSource();
_cancelToken = CancellationToken.None;
_parentToken = CancellationToken.None;
_buckets = new ConcurrentDictionary<string, RequestQueueBucket>();
}
public async Task SetCancelTokenAsync(CancellationToken cancelToken)
{
@@ -107,63 +37,29 @@ namespace Discord.Net.Queue
finally { _lock.Release(); }
}
public async Task<Stream> SendAsync(RestRequest request, BucketGroup group, int bucketId, ulong objId)
public async Task<Stream> SendAsync(RestRequest request)
{
request.CancelToken = _cancelToken;
var bucket = GetBucket(group, bucketId, objId);
var bucket = GetOrCreateBucket(request.Options.BucketId);
return await bucket.SendAsync(request).ConfigureAwait(false);
}
public async Task<Stream> SendAsync(WebSocketRequest request, BucketGroup group, int bucketId, ulong objId)
public async Task<Stream> SendAsync(WebSocketRequest request)
{
request.CancelToken = _cancelToken;
var bucket = GetBucket(group, bucketId, objId);
var bucket = GetOrCreateBucket(request.Options.BucketId);
return await bucket.SendAsync(request).ConfigureAwait(false);
}
private RequestQueueBucket CreateBucket(Bucket def)
private RequestQueueBucket GetOrCreateBucket(string id)
{
var parent = def.Parent != null ? GetGlobalBucket(def.Parent.Value) : null;
return new RequestQueueBucket(this, def, parent);
return new RequestQueueBucket(this, id, null);
}
public void DestroyGuildBucket(GuildBucket type, ulong guildId)
public void DestroyBucket(string id)
{
//Assume this object is locked
RequestQueueBucket bucket;
_guildBuckets[(int)type].TryRemove(guildId, out bucket);
}
public void DestroyChannelBucket(ChannelBucket type, ulong channelId)
{
//Assume this object is locked
RequestQueueBucket bucket;
_channelBuckets[(int)type].TryRemove(channelId, out bucket);
}
private RequestQueueBucket GetBucket(BucketGroup group, int bucketId, ulong objId)
{
switch (group)
{
case BucketGroup.Global:
return GetGlobalBucket((GlobalBucket)bucketId);
case BucketGroup.Guild:
return GetGuildBucket((GuildBucket)bucketId, objId);
case BucketGroup.Channel:
return GetChannelBucket((ChannelBucket)bucketId, objId);
default:
throw new ArgumentException($"Unknown bucket group: {group}", nameof(group));
}
}
private RequestQueueBucket GetGlobalBucket(GlobalBucket type)
{
return _globalBuckets[(int)type];
}
private RequestQueueBucket GetGuildBucket(GuildBucket type, ulong guildId)
{
return _guildBuckets[(int)type].GetOrAdd(guildId, _ => CreateBucket(_guildLimits[type]));
}
private RequestQueueBucket GetChannelBucket(ChannelBucket type, ulong channelId)
{
return _channelBuckets[(int)type].GetOrAdd(channelId, _ => CreateBucket(_channelLimits[type]));
_buckets.TryRemove(id, out bucket);
}
public async Task ClearAsync()
@@ -181,9 +77,9 @@ namespace Discord.Net.Queue
finally { _lock.Release(); }
}
internal async Task RaiseRateLimitTriggered(string id, Bucket bucket, int millis)
internal async Task RaiseRateLimitTriggered(string id, RequestQueueBucket bucket, int? millis)
{
await RateLimitTriggered.Invoke(id, bucket, millis).ConfigureAwait(false);
await RateLimitTriggered(id, bucket, millis).ConfigureAwait(false);
}
}
}

View File

@@ -7,7 +7,7 @@ using System.Threading.Tasks;
namespace Discord.Net.Queue
{
internal class RequestQueueBucket
public class RequestQueueBucket
{
private readonly RequestQueue _queue;
private readonly SemaphoreSlim _semaphore;
@@ -15,16 +15,15 @@ namespace Discord.Net.Queue
private int _pauseEndTick;
private TaskCompletionSource<byte> _resumeNotifier;
public Bucket Definition { get; }
public string Id { get; }
public RequestQueueBucket Parent { get; }
public Task _resetTask { get; }
public int WindowSeconds { get; }
public RequestQueueBucket(RequestQueue queue, Bucket definition, RequestQueueBucket parent = null)
public RequestQueueBucket(RequestQueue queue, string id, RequestQueueBucket parent = null)
{
_queue = queue;
Definition = definition;
if (definition.WindowCount != 0)
_semaphore = new SemaphoreSlim(definition.WindowCount, definition.WindowCount);
Id = id;
_semaphore = new SemaphoreSlim(5, 5);
Parent = parent;
_pauseLock = new object();
@@ -44,12 +43,8 @@ namespace Discord.Net.Queue
{
//When a 429 occurs, we drop all our locks.
//This is generally safe though since 429s actually occuring should be very rare.
RequestQueueBucket bucket;
bool success = FindBucket(ex.BucketId, out bucket);
await _queue.RaiseRateLimitTriggered(ex.BucketId, success ? bucket.Definition : null, ex.RetryAfterMilliseconds).ConfigureAwait(false);
bucket.Pause(ex.RetryAfterMilliseconds);
await _queue.RaiseRateLimitTriggered(Id, this, ex.RetryAfterMilliseconds).ConfigureAwait(false);
Pause(ex.RetryAfterMilliseconds);
}
}
}
@@ -107,24 +102,7 @@ namespace Discord.Net.Queue
QueueExitAsync();
}
}
private bool FindBucket(string id, out RequestQueueBucket bucket)
{
//Keep going up until we find a bucket with matching id or we're at the topmost bucket
if (Definition.Id == id)
{
bucket = this;
return true;
}
else if (Parent == null)
{
bucket = this;
return false;
}
else
return Parent.FindBucket(id, out bucket);
}
private void Pause(int milliseconds)
{
lock (_pauseLock)
@@ -151,13 +129,22 @@ namespace Discord.Net.Queue
int millis = unchecked(endTick.Value - Environment.TickCount);
if (millis <= 0 || !await _semaphore.WaitAsync(millis).ConfigureAwait(false))
throw new TimeoutException();
if (!await _semaphore.WaitAsync(0))
{
await _queue.RaiseRateLimitTriggered(Id, this, null).ConfigureAwait(false);
millis = unchecked(endTick.Value - Environment.TickCount);
if (millis <= 0 || !await _semaphore.WaitAsync(millis).ConfigureAwait(false))
throw new TimeoutException();
}
}
else
await _semaphore.WaitAsync().ConfigureAwait(false);
}
private async Task QueueExitAsync()
{
await Task.Delay(Definition.WindowSeconds * 1000).ConfigureAwait(false);
await Task.Delay(WindowSeconds * 1000).ConfigureAwait(false);
_semaphore.Release();
}
}

View File

@@ -0,0 +1,22 @@
using Discord.Net.Rest;
using System.IO;
using System.Threading.Tasks;
namespace Discord.Net.Queue
{
public class JsonRestRequest : RestRequest
{
public string Json { get; }
public JsonRestRequest(IRestClient client, string method, string endpoint, string json, RequestOptions options)
: base(client, method, endpoint, options)
{
Json = json;
}
public override async Task<Stream> SendAsync()
{
return await Client.SendAsync(Method, Endpoint, Json, Options).ConfigureAwait(false);
}
}
}

View File

@@ -0,0 +1,24 @@
using Discord.Net.Rest;
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
namespace Discord.Net.Queue
{
public class MultipartRestRequest : RestRequest
{
public IReadOnlyDictionary<string, object> MultipartParams { get; }
public MultipartRestRequest(IRestClient client, string method, string endpoint, IReadOnlyDictionary<string, object> multipartParams, RequestOptions options)
: base(client, method, endpoint, options)
{
MultipartParams = multipartParams;
}
public override async Task<Stream> SendAsync()
{
return await Client.SendAsync(Method, Endpoint, MultipartParams, Options).ConfigureAwait(false);
}
}
}

View File

@@ -0,0 +1,36 @@
using Discord.Net.Rest;
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace Discord.Net.Queue
{
public class RestRequest : IQueuedRequest
{
public IRestClient Client { get; }
public string Method { get; }
public string Endpoint { get; }
public int? TimeoutTick { get; }
public TaskCompletionSource<Stream> Promise { get; }
public RequestOptions Options { get; }
public CancellationToken CancelToken { get; internal set; }
public RestRequest(IRestClient client, string method, string endpoint, RequestOptions options)
{
Preconditions.NotNull(options, nameof(options));
Client = client;
Method = method;
Endpoint = endpoint;
Options = options;
TimeoutTick = options.Timeout.HasValue ? (int?)unchecked(Environment.TickCount + options.Timeout.Value) : null;
Promise = new TaskCompletionSource<Stream>();
}
public virtual async Task<Stream> SendAsync()
{
return await Client.SendAsync(Method, Endpoint, Options).ConfigureAwait(false);
}
}
}

View File

@@ -13,16 +13,17 @@ namespace Discord.Net.Queue
public bool IsText { get; }
public int? TimeoutTick { get; }
public TaskCompletionSource<Stream> Promise { get; }
public CancellationToken CancelToken { get; set; }
public RequestOptions Options { get; }
public CancellationToken CancelToken { get; internal set; }
public WebSocketRequest(IWebSocketClient client, byte[] data, bool isText, RequestOptions options)
{
if (options == null)
options = RequestOptions.Default;
Preconditions.NotNull(options, nameof(options));
Client = client;
Data = data;
IsText = isText;
Options = options;
TimeoutTick = options.Timeout.HasValue ? (int?)unchecked(Environment.TickCount + options.Timeout.Value) : null;
Promise = new TaskCompletionSource<Stream>();
}

View File

@@ -1,61 +0,0 @@
using Discord.Net.Rest;
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace Discord.Net.Queue
{
public class RestRequest : IQueuedRequest
{
public IRestClient Client { get; }
public string Method { get; }
public string Endpoint { get; }
public string Json { get; }
public bool HeaderOnly { get; }
public int? TimeoutTick { get; }
public IReadOnlyDictionary<string, object> MultipartParams { get; }
public TaskCompletionSource<Stream> Promise { get; }
public CancellationToken CancelToken { get; set; }
public bool IsMultipart => MultipartParams != null;
public RestRequest(IRestClient client, string method, string endpoint, string json, bool headerOnly, RequestOptions options)
: this(client, method, endpoint, headerOnly, options)
{
Json = json;
}
public RestRequest(IRestClient client, string method, string endpoint, IReadOnlyDictionary<string, object> multipartParams, bool headerOnly, RequestOptions options)
: this(client, method, endpoint, headerOnly, options)
{
MultipartParams = multipartParams;
}
private RestRequest(IRestClient client, string method, string endpoint, bool headerOnly, RequestOptions options)
{
if (options == null)
options = RequestOptions.Default;
Client = client;
Method = method;
Endpoint = endpoint;
Json = null;
MultipartParams = null;
HeaderOnly = headerOnly;
TimeoutTick = options.Timeout.HasValue ? (int?)unchecked(Environment.TickCount + options.Timeout.Value) : null;
Promise = new TaskCompletionSource<Stream>();
}
public async Task<Stream> SendAsync()
{
if (IsMultipart)
return await Client.SendAsync(Method, Endpoint, MultipartParams, HeaderOnly).ConfigureAwait(false);
else if (Json != null)
return await Client.SendAsync(Method, Endpoint, Json, HeaderOnly).ConfigureAwait(false);
else
return await Client.SendAsync(Method, Endpoint, HeaderOnly).ConfigureAwait(false);
}
}
}

View File

@@ -4,13 +4,13 @@ namespace Discord.Net
{
public class HttpRateLimitException : HttpException
{
public string BucketId { get; }
public string Id { get; }
public int RetryAfterMilliseconds { get; }
public HttpRateLimitException(string bucketId, int retryAfterMilliseconds, string reason)
: base((HttpStatusCode)429, reason)
{
BucketId = bucketId;
Id = bucketId;
RetryAfterMilliseconds = retryAfterMilliseconds;
}
}

View File

@@ -67,22 +67,22 @@ namespace Discord.Net.Rest
_cancelToken = CancellationTokenSource.CreateLinkedTokenSource(_parentToken, _cancelTokenSource.Token).Token;
}
public async Task<Stream> SendAsync(string method, string endpoint, bool headerOnly = false)
public async Task<Stream> SendAsync(string method, string endpoint, RequestOptions options)
{
string uri = Path.Combine(_baseUrl, endpoint);
using (var restRequest = new HttpRequestMessage(GetMethod(method), uri))
return await SendInternalAsync(restRequest, headerOnly).ConfigureAwait(false);
return await SendInternalAsync(restRequest, options).ConfigureAwait(false);
}
public async Task<Stream> SendAsync(string method, string endpoint, string json, bool headerOnly = false)
public async Task<Stream> SendAsync(string method, string endpoint, string json, RequestOptions options)
{
string uri = Path.Combine(_baseUrl, endpoint);
using (var restRequest = new HttpRequestMessage(GetMethod(method), uri))
{
restRequest.Content = new StringContent(json, Encoding.UTF8, "application/json");
return await SendInternalAsync(restRequest, headerOnly).ConfigureAwait(false);
return await SendInternalAsync(restRequest, options).ConfigureAwait(false);
}
}
public async Task<Stream> SendAsync(string method, string endpoint, IReadOnlyDictionary<string, object> multipartParams, bool headerOnly = false)
public async Task<Stream> SendAsync(string method, string endpoint, IReadOnlyDictionary<string, object> multipartParams, RequestOptions options)
{
string uri = Path.Combine(_baseUrl, endpoint);
using (var restRequest = new HttpRequestMessage(GetMethod(method), uri))
@@ -110,11 +110,11 @@ namespace Discord.Net.Rest
}
}
restRequest.Content = content;
return await SendInternalAsync(restRequest, headerOnly).ConfigureAwait(false);
return await SendInternalAsync(restRequest, options).ConfigureAwait(false);
}
}
private async Task<Stream> SendInternalAsync(HttpRequestMessage request, bool headerOnly)
private async Task<Stream> SendInternalAsync(HttpRequestMessage request, RequestOptions options)
{
while (true)
{
@@ -154,7 +154,7 @@ namespace Discord.Net.Rest
throw new HttpException(response.StatusCode, reason);
}
if (headerOnly)
if (options.HeaderOnly)
return null;
else
return await response.Content.ReadAsStreamAsync().ConfigureAwait(false);

View File

@@ -11,8 +11,8 @@ namespace Discord.Net.Rest
void SetHeader(string key, string value);
void SetCancelToken(CancellationToken cancelToken);
Task<Stream> SendAsync(string method, string endpoint, bool headerOnly = false);
Task<Stream> SendAsync(string method, string endpoint, string json, bool headerOnly = false);
Task<Stream> SendAsync(string method, string endpoint, IReadOnlyDictionary<string, object> multipartParams, bool headerOnly = false);
Task<Stream> SendAsync(string method, string endpoint, RequestOptions options);
Task<Stream> SendAsync(string method, string endpoint, string json, RequestOptions options);
Task<Stream> SendAsync(string method, string endpoint, IReadOnlyDictionary<string, object> multipartParams, RequestOptions options);
}
}