Isolated API definitions to their own library

This commit is contained in:
RogueException
2016-12-23 15:10:45 -04:00
parent ca6eb6aff4
commit 8326d01f62
200 changed files with 183 additions and 73 deletions

View File

@@ -0,0 +1,50 @@
using System.Collections.Immutable;
namespace Discord.Net.Queue
{
public enum ClientBucketType
{
Unbucketed = 0,
SendEdit = 1
}
internal struct ClientBucket
{
private static readonly ImmutableDictionary<ClientBucketType, ClientBucket> _defsByType;
private static readonly ImmutableDictionary<string, ClientBucket> _defsById;
static ClientBucket()
{
var buckets = new[]
{
new ClientBucket(ClientBucketType.Unbucketed, "<unbucketed>", 10, 10),
new ClientBucket(ClientBucketType.SendEdit, "<send_edit>", 10, 10)
};
var builder = ImmutableDictionary.CreateBuilder<ClientBucketType, ClientBucket>();
foreach (var bucket in buckets)
builder.Add(bucket.Type, bucket);
_defsByType = builder.ToImmutable();
var builder2 = ImmutableDictionary.CreateBuilder<string, ClientBucket>();
foreach (var bucket in buckets)
builder2.Add(bucket.Id, bucket);
_defsById = builder2.ToImmutable();
}
public static ClientBucket Get(ClientBucketType type) => _defsByType[type];
public static ClientBucket Get(string id) => _defsById[id];
public ClientBucketType Type { get; }
public string Id { get; }
public int WindowCount { get; }
public int WindowSeconds { get; }
public ClientBucket(ClientBucketType type, string id, int count, int seconds)
{
Type = type;
Id = id;
WindowCount = count;
WindowSeconds = seconds;
}
}
}

View File

@@ -0,0 +1,133 @@
using System;
using System.Collections.Concurrent;
#if DEBUG_LIMITS
using System.Diagnostics;
#endif
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Discord.Net.Queue
{
internal class RequestQueue : IDisposable
{
public event Func<string, RateLimitInfo?, Task> RateLimitTriggered;
private readonly ConcurrentDictionary<string, RequestBucket> _buckets;
private readonly SemaphoreSlim _tokenLock;
private CancellationTokenSource _clearToken;
private CancellationToken _parentToken;
private CancellationToken _requestCancelToken; //Parent token + Clear token
private CancellationTokenSource _cancelToken; //Dispose token
private DateTimeOffset _waitUntil;
private Task _cleanupTask;
public RequestQueue()
{
_tokenLock = new SemaphoreSlim(1, 1);
_clearToken = new CancellationTokenSource();
_cancelToken = new CancellationTokenSource();
_requestCancelToken = CancellationToken.None;
_parentToken = CancellationToken.None;
_buckets = new ConcurrentDictionary<string, RequestBucket>();
_cleanupTask = RunCleanup();
}
public async Task SetCancelTokenAsync(CancellationToken cancelToken)
{
await _tokenLock.WaitAsync().ConfigureAwait(false);
try
{
_parentToken = cancelToken;
_requestCancelToken = CancellationTokenSource.CreateLinkedTokenSource(cancelToken, _clearToken.Token).Token;
}
finally { _tokenLock.Release(); }
}
public async Task ClearAsync()
{
await _tokenLock.WaitAsync().ConfigureAwait(false);
try
{
_clearToken?.Cancel();
_clearToken = new CancellationTokenSource();
if (_parentToken != null)
_requestCancelToken = CancellationTokenSource.CreateLinkedTokenSource(_clearToken.Token, _parentToken).Token;
else
_requestCancelToken = _clearToken.Token;
}
finally { _tokenLock.Release(); }
}
public async Task<Stream> SendAsync(RestRequest request)
{
if (request.Options.CancelToken.CanBeCanceled)
request.Options.CancelToken = CancellationTokenSource.CreateLinkedTokenSource(_requestCancelToken, request.Options.CancelToken).Token;
else
request.Options.CancelToken = _requestCancelToken;
var bucket = GetOrCreateBucket(request.Options.BucketId, request);
return await bucket.SendAsync(request).ConfigureAwait(false);
}
public async Task SendAsync(WebSocketRequest request)
{
//TODO: Re-impl websocket buckets
request.CancelToken = _requestCancelToken;
await request.SendAsync().ConfigureAwait(false);
}
internal async Task EnterGlobalAsync(int id, RestRequest request)
{
int millis = (int)Math.Ceiling((_waitUntil - DateTimeOffset.UtcNow).TotalMilliseconds);
if (millis > 0)
{
#if DEBUG_LIMITS
Debug.WriteLine($"[{id}] Sleeping {millis} ms (Pre-emptive) [Global]");
#endif
await Task.Delay(millis).ConfigureAwait(false);
}
}
internal void PauseGlobal(RateLimitInfo info, TimeSpan lag)
{
_waitUntil = DateTimeOffset.UtcNow.AddMilliseconds(info.RetryAfter.Value + lag.TotalMilliseconds);
}
private RequestBucket GetOrCreateBucket(string id, RestRequest request)
{
return _buckets.GetOrAdd(id, x => new RequestBucket(this, request, x));
}
internal async Task RaiseRateLimitTriggered(string bucketId, RateLimitInfo? info)
{
await RateLimitTriggered(bucketId, info).ConfigureAwait(false);
}
private async Task RunCleanup()
{
try
{
while (!_cancelToken.IsCancellationRequested)
{
var now = DateTimeOffset.UtcNow;
foreach (var bucket in _buckets.Select(x => x.Value))
{
RequestBucket ignored;
if ((now - bucket.LastAttemptAt).TotalMinutes > 1.0)
_buckets.TryRemove(bucket.Id, out ignored);
}
await Task.Delay(60000, _cancelToken.Token); //Runs each minute
}
}
catch (OperationCanceledException) { }
catch (ObjectDisposedException) { }
}
public void Dispose()
{
_cancelToken.Dispose();
}
}
}

View File

@@ -0,0 +1,312 @@
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
#if DEBUG_LIMITS
using System.Diagnostics;
#endif
using System.IO;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
namespace Discord.Net.Queue
{
internal class RequestBucket
{
private readonly object _lock;
private readonly RequestQueue _queue;
private int _semaphore;
private DateTimeOffset? _resetTick;
public string Id { get; private set; }
public int WindowCount { get; private set; }
public DateTimeOffset LastAttemptAt { get; private set; }
public RequestBucket(RequestQueue queue, RestRequest request, string id)
{
_queue = queue;
Id = id;
_lock = new object();
if (request.Options.IsClientBucket)
WindowCount = ClientBucket.Get(request.Options.BucketId).WindowCount;
else
WindowCount = 1; //Only allow one request until we get a header back
_semaphore = WindowCount;
_resetTick = null;
LastAttemptAt = DateTimeOffset.UtcNow;
}
static int nextId = 0;
public async Task<Stream> SendAsync(RestRequest request)
{
int id = Interlocked.Increment(ref nextId);
#if DEBUG_LIMITS
Debug.WriteLine($"[{id}] Start");
#endif
LastAttemptAt = DateTimeOffset.UtcNow;
while (true)
{
await _queue.EnterGlobalAsync(id, request).ConfigureAwait(false);
await EnterAsync(id, request).ConfigureAwait(false);
#if DEBUG_LIMITS
Debug.WriteLine($"[{id}] Sending...");
#endif
TimeSpan lag = default(TimeSpan);
RateLimitInfo info = default(RateLimitInfo);
try
{
var response = await request.SendAsync().ConfigureAwait(false);
lag = DateTimeOffset.UtcNow - DateTimeOffset.Parse(response.Headers["Date"]);
info = new RateLimitInfo(response.Headers);
if (response.StatusCode < (HttpStatusCode)200 || response.StatusCode >= (HttpStatusCode)300)
{
switch (response.StatusCode)
{
case (HttpStatusCode)429:
if (info.IsGlobal)
{
#if DEBUG_LIMITS
Debug.WriteLine($"[{id}] (!) 429 [Global]");
#endif
_queue.PauseGlobal(info, lag);
}
else
{
#if DEBUG_LIMITS
Debug.WriteLine($"[{id}] (!) 429");
#endif
UpdateRateLimit(id, request, info, lag, true);
}
await _queue.RaiseRateLimitTriggered(Id, info).ConfigureAwait(false);
continue; //Retry
case HttpStatusCode.BadGateway: //502
#if DEBUG_LIMITS
Debug.WriteLine($"[{id}] (!) 502");
#endif
if ((request.Options.RetryMode & RetryMode.Retry502) == 0)
throw new HttpException(HttpStatusCode.BadGateway, null);
continue; //Retry
default:
string reason = null;
if (response.Stream != null)
{
try
{
using (var reader = new StreamReader(response.Stream))
using (var jsonReader = new JsonTextReader(reader))
{
var json = JToken.Load(jsonReader);
reason = json.Value<string>("message");
}
}
catch { }
}
throw new HttpException(response.StatusCode, reason);
}
}
else
{
#if DEBUG_LIMITS
Debug.WriteLine($"[{id}] Success");
#endif
return response.Stream;
}
}
catch (TimeoutException)
{
#if DEBUG_LIMITS
Debug.WriteLine($"[{id}] Timeout");
#endif
if ((request.Options.RetryMode & RetryMode.RetryTimeouts) == 0)
throw;
await Task.Delay(500);
continue; //Retry
}
catch (Exception)
{
#if DEBUG_LIMITS
Debug.WriteLine($"[{id}] Error");
#endif
if ((request.Options.RetryMode & RetryMode.RetryErrors) == 0)
throw;
await Task.Delay(500);
continue; //Retry
}
finally
{
UpdateRateLimit(id, request, info, lag, false);
#if DEBUG_LIMITS
Debug.WriteLine($"[{id}] Stop");
#endif
}
}
}
private async Task EnterAsync(int id, RestRequest request)
{
int windowCount;
DateTimeOffset? resetAt;
bool isRateLimited = false;
while (true)
{
if (DateTimeOffset.UtcNow > request.TimeoutAt || request.Options.CancelToken.IsCancellationRequested)
{
if (!isRateLimited)
throw new TimeoutException();
else
throw new RateLimitedException();
}
lock (_lock)
{
windowCount = WindowCount;
resetAt = _resetTick;
}
DateTimeOffset? timeoutAt = request.TimeoutAt;
if (windowCount > 0 && Interlocked.Decrement(ref _semaphore) < 0)
{
if (!isRateLimited)
{
isRateLimited = true;
await _queue.RaiseRateLimitTriggered(Id, null).ConfigureAwait(false);
}
if ((request.Options.RetryMode & RetryMode.RetryRatelimit) == 0)
throw new RateLimitedException();
if (resetAt.HasValue)
{
if (resetAt > timeoutAt)
throw new RateLimitedException();
int millis = (int)Math.Ceiling((resetAt.Value - DateTimeOffset.UtcNow).TotalMilliseconds);
#if DEBUG_LIMITS
Debug.WriteLine($"[{id}] Sleeping {millis} ms (Pre-emptive)");
#endif
if (millis > 0)
await Task.Delay(millis, request.Options.CancelToken).ConfigureAwait(false);
}
else
{
if ((timeoutAt.Value - DateTimeOffset.UtcNow).TotalMilliseconds < 500.0)
throw new RateLimitedException();
#if DEBUG_LIMITS
Debug.WriteLine($"[{id}] Sleeping 500* ms (Pre-emptive)");
#endif
await Task.Delay(500, request.Options.CancelToken).ConfigureAwait(false);
}
continue;
}
#if DEBUG_LIMITS
else
Debug.WriteLine($"[{id}] Entered Semaphore ({_semaphore}/{WindowCount} remaining)");
#endif
break;
}
}
private void UpdateRateLimit(int id, RestRequest request, RateLimitInfo info, TimeSpan lag, bool is429)
{
if (WindowCount == 0)
return;
lock (_lock)
{
bool hasQueuedReset = _resetTick != null;
if (info.Limit.HasValue && WindowCount != info.Limit.Value)
{
WindowCount = info.Limit.Value;
_semaphore = info.Remaining.Value;
#if DEBUG_LIMITS
Debug.WriteLine($"[{id}] Upgraded Semaphore to {info.Remaining.Value}/{WindowCount}");
#endif
}
var now = DateTimeUtils.ToUnixSeconds(DateTimeOffset.UtcNow);
DateTimeOffset? resetTick = null;
//Using X-RateLimit-Remaining causes a race condition
/*if (info.Remaining.HasValue)
{
Debug.WriteLine($"[{id}] X-RateLimit-Remaining: " + info.Remaining.Value);
_semaphore = info.Remaining.Value;
}*/
if (info.RetryAfter.HasValue)
{
//RetryAfter is more accurate than Reset, where available
resetTick = DateTimeOffset.UtcNow.AddMilliseconds(info.RetryAfter.Value);
#if DEBUG_LIMITS
Debug.WriteLine($"[{id}] Retry-After: {info.RetryAfter.Value} ({info.RetryAfter.Value} ms)");
#endif
}
else if (info.Reset.HasValue)
{
resetTick = info.Reset.Value.AddSeconds(/*1.0 +*/ lag.TotalSeconds);
int diff = (int)(resetTick.Value - DateTimeOffset.UtcNow).TotalMilliseconds;
#if DEBUG_LIMITS
Debug.WriteLine($"[{id}] X-RateLimit-Reset: {info.Reset.Value.ToUnixTimeSeconds()} ({diff} ms, {lag.TotalMilliseconds} ms lag)");
#endif
}
else if (request.Options.IsClientBucket && request.Options.BucketId != null)
{
resetTick = DateTimeOffset.UtcNow.AddSeconds(ClientBucket.Get(request.Options.BucketId).WindowSeconds);
#if DEBUG_LIMITS
Debug.WriteLine($"[{id}] Client Bucket ({ClientBucket.Get(request.Options.BucketId).WindowSeconds * 1000} ms)");
#endif
}
if (resetTick == null)
{
WindowCount = 0; //No rate limit info, disable limits on this bucket (should only ever happen with a user token)
#if DEBUG_LIMITS
Debug.WriteLine($"[{id}] Disabled Semaphore");
#endif
return;
}
if (!hasQueuedReset || resetTick > _resetTick)
{
_resetTick = resetTick;
LastAttemptAt = resetTick.Value; //Make sure we dont destroy this until after its been reset
#if DEBUG_LIMITS
Debug.WriteLine($"[{id}] Reset in {(int)Math.Ceiling((resetTick - DateTimeOffset.UtcNow).Value.TotalMilliseconds)} ms");
#endif
if (!hasQueuedReset)
{
var _ = QueueReset(id, (int)Math.Ceiling((_resetTick.Value - DateTimeOffset.UtcNow).TotalMilliseconds));
}
}
}
}
private async Task QueueReset(int id, int millis)
{
while (true)
{
if (millis > 0)
await Task.Delay(millis).ConfigureAwait(false);
lock (_lock)
{
millis = (int)Math.Ceiling((_resetTick.Value - DateTimeOffset.UtcNow).TotalMilliseconds);
if (millis <= 0) //Make sure we havent gotten a more accurate reset time
{
#if DEBUG_LIMITS
Debug.WriteLine($"[{id}] * Reset *");
#endif
_semaphore = WindowCount;
_resetTick = null;
return;
}
}
}
}
}
}

View File

@@ -0,0 +1,21 @@
using Discord.Net.Rest;
using System.Threading.Tasks;
namespace Discord.Net.Queue
{
public class JsonRestRequest : RestRequest
{
public string Json { get; }
public JsonRestRequest(IRestClient client, string method, string endpoint, string json, RequestOptions options)
: base(client, method, endpoint, options)
{
Json = json;
}
public override async Task<RestResponse> SendAsync()
{
return await Client.SendAsync(Method, Endpoint, Json, Options.CancelToken, Options.HeaderOnly).ConfigureAwait(false);
}
}
}

View File

@@ -0,0 +1,22 @@
using Discord.Net.Rest;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Discord.Net.Queue
{
public class MultipartRestRequest : RestRequest
{
public IReadOnlyDictionary<string, object> MultipartParams { get; }
public MultipartRestRequest(IRestClient client, string method, string endpoint, IReadOnlyDictionary<string, object> multipartParams, RequestOptions options)
: base(client, method, endpoint, options)
{
MultipartParams = multipartParams;
}
public override async Task<RestResponse> SendAsync()
{
return await Client.SendAsync(Method, Endpoint, MultipartParams, Options.CancelToken, Options.HeaderOnly).ConfigureAwait(false);
}
}
}

View File

@@ -0,0 +1,34 @@
using Discord.Net.Rest;
using System;
using System.IO;
using System.Threading.Tasks;
namespace Discord.Net.Queue
{
public class RestRequest
{
public IRestClient Client { get; }
public string Method { get; }
public string Endpoint { get; }
public DateTimeOffset? TimeoutAt { get; }
public TaskCompletionSource<Stream> Promise { get; }
public RequestOptions Options { get; }
public RestRequest(IRestClient client, string method, string endpoint, RequestOptions options)
{
Preconditions.NotNull(options, nameof(options));
Client = client;
Method = method;
Endpoint = endpoint;
Options = options;
TimeoutAt = options.Timeout.HasValue ? DateTimeOffset.UtcNow.AddMilliseconds(options.Timeout.Value) : (DateTimeOffset?)null;
Promise = new TaskCompletionSource<Stream>();
}
public virtual async Task<RestResponse> SendAsync()
{
return await Client.SendAsync(Method, Endpoint, Options.CancelToken, Options.HeaderOnly).ConfigureAwait(false);
}
}
}

View File

@@ -0,0 +1,38 @@
using Discord.Net.WebSockets;
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace Discord.Net.Queue
{
public class WebSocketRequest
{
public IWebSocketClient Client { get; }
public string BucketId { get; }
public byte[] Data { get; }
public bool IsText { get; }
public DateTimeOffset? TimeoutAt { get; }
public TaskCompletionSource<Stream> Promise { get; }
public RequestOptions Options { get; }
public CancellationToken CancelToken { get; internal set; }
public WebSocketRequest(IWebSocketClient client, string bucketId, byte[] data, bool isText, RequestOptions options)
{
Preconditions.NotNull(options, nameof(options));
Client = client;
BucketId = bucketId;
Data = data;
IsText = isText;
Options = options;
TimeoutAt = options.Timeout.HasValue ? DateTimeOffset.UtcNow.AddMilliseconds(options.Timeout.Value) : (DateTimeOffset?)null;
Promise = new TaskCompletionSource<Stream>();
}
public async Task SendAsync()
{
await Client.SendAsync(Data, 0, Data.Length, IsText).ConfigureAwait(false);
}
}
}

View File

@@ -0,0 +1,24 @@
using System;
using System.Collections.Generic;
namespace Discord.Net
{
internal struct RateLimitInfo
{
public bool IsGlobal { get; }
public int? Limit { get; }
public int? Remaining { get; }
public int? RetryAfter { get; }
public DateTimeOffset? Reset { get; }
internal RateLimitInfo(Dictionary<string, string> headers)
{
string temp;
IsGlobal = headers.TryGetValue("X-RateLimit-Global", out temp) ? bool.Parse(temp) : false;
Limit = headers.TryGetValue("X-RateLimit-Limit", out temp) ? int.Parse(temp) : (int?)null;
Remaining = headers.TryGetValue("X-RateLimit-Remaining", out temp) ? int.Parse(temp) : (int?)null;
Reset = headers.TryGetValue("X-RateLimit-Reset", out temp) ? DateTimeUtils.FromUnixSeconds(int.Parse(temp)) : (DateTimeOffset?)null;
RetryAfter = headers.TryGetValue("Retry-After", out temp) ? int.Parse(temp) : (int?)null;
}
}
}