Merge branch 'dev' into feature/reactions
This commit is contained in:
@@ -2,25 +2,47 @@
|
||||
|
||||
namespace Discord.Net.Queue
|
||||
{
|
||||
public struct ClientBucket
|
||||
public enum ClientBucketType
|
||||
{
|
||||
public const string SendEditId = "<send_edit>";
|
||||
Unbucketed = 0,
|
||||
SendEdit = 1
|
||||
}
|
||||
internal struct ClientBucket
|
||||
{
|
||||
private static readonly ImmutableDictionary<ClientBucketType, ClientBucket> _defsByType;
|
||||
private static readonly ImmutableDictionary<string, ClientBucket> _defsById;
|
||||
|
||||
private static readonly ImmutableDictionary<string, ClientBucket> _defs;
|
||||
static ClientBucket()
|
||||
{
|
||||
var builder = ImmutableDictionary.CreateBuilder<string, ClientBucket>();
|
||||
builder.Add(SendEditId, new ClientBucket(10, 10));
|
||||
_defs = builder.ToImmutable();
|
||||
var buckets = new[]
|
||||
{
|
||||
new ClientBucket(ClientBucketType.Unbucketed, "<unbucketed>", 10, 10),
|
||||
new ClientBucket(ClientBucketType.SendEdit, "<send_edit>", 10, 10)
|
||||
};
|
||||
|
||||
var builder = ImmutableDictionary.CreateBuilder<ClientBucketType, ClientBucket>();
|
||||
foreach (var bucket in buckets)
|
||||
builder.Add(bucket.Type, bucket);
|
||||
_defsByType = builder.ToImmutable();
|
||||
|
||||
var builder2 = ImmutableDictionary.CreateBuilder<string, ClientBucket>();
|
||||
foreach (var bucket in buckets)
|
||||
builder2.Add(bucket.Id, bucket);
|
||||
_defsById = builder2.ToImmutable();
|
||||
}
|
||||
|
||||
public static ClientBucket Get(string id) =>_defs[id];
|
||||
|
||||
public static ClientBucket Get(ClientBucketType type) => _defsByType[type];
|
||||
public static ClientBucket Get(string id) => _defsById[id];
|
||||
|
||||
public ClientBucketType Type { get; }
|
||||
public string Id { get; }
|
||||
public int WindowCount { get; }
|
||||
public int WindowSeconds { get; }
|
||||
|
||||
public ClientBucket(int count, int seconds)
|
||||
public ClientBucket(ClientBucketType type, string id, int count, int seconds)
|
||||
{
|
||||
Type = type;
|
||||
Id = id;
|
||||
WindowCount = count;
|
||||
WindowSeconds = seconds;
|
||||
}
|
||||
|
||||
@@ -79,7 +79,9 @@ namespace Discord.Net.Queue
|
||||
int millis = (int)Math.Ceiling((_waitUntil - DateTimeOffset.UtcNow).TotalMilliseconds);
|
||||
if (millis > 0)
|
||||
{
|
||||
#if DEBUG_LIMITS
|
||||
Debug.WriteLine($"[{id}] Sleeping {millis} ms (Pre-emptive) [Global]");
|
||||
#endif
|
||||
await Task.Delay(millis).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
using Newtonsoft.Json;
|
||||
using Discord.Net.Rest;
|
||||
using Newtonsoft.Json;
|
||||
using Newtonsoft.Json.Linq;
|
||||
using System;
|
||||
#if DEBUG_LIMITS
|
||||
using System.Diagnostics;
|
||||
#endif
|
||||
using System.IO;
|
||||
using System.Net;
|
||||
using System.Threading;
|
||||
@@ -27,8 +30,8 @@ namespace Discord.Net.Queue
|
||||
|
||||
_lock = new object();
|
||||
|
||||
if (request.Options.ClientBucketId != null)
|
||||
WindowCount = ClientBucket.Get(request.Options.ClientBucketId).WindowCount;
|
||||
if (request.Options.IsClientBucket)
|
||||
WindowCount = ClientBucket.Get(request.Options.BucketId).WindowCount;
|
||||
else
|
||||
WindowCount = 1; //Only allow one request until we get a header back
|
||||
_semaphore = WindowCount;
|
||||
@@ -40,62 +43,91 @@ namespace Discord.Net.Queue
|
||||
public async Task<Stream> SendAsync(RestRequest request)
|
||||
{
|
||||
int id = Interlocked.Increment(ref nextId);
|
||||
#if DEBUG_LIMITS
|
||||
Debug.WriteLine($"[{id}] Start");
|
||||
#endif
|
||||
LastAttemptAt = DateTimeOffset.UtcNow;
|
||||
while (true)
|
||||
{
|
||||
await _queue.EnterGlobalAsync(id, request).ConfigureAwait(false);
|
||||
await EnterAsync(id, request).ConfigureAwait(false);
|
||||
|
||||
#if DEBUG_LIMITS
|
||||
Debug.WriteLine($"[{id}] Sending...");
|
||||
var response = await request.SendAsync().ConfigureAwait(false);
|
||||
TimeSpan lag = DateTimeOffset.UtcNow - DateTimeOffset.Parse(response.Headers["Date"]);
|
||||
var info = new RateLimitInfo(response.Headers);
|
||||
|
||||
if (response.StatusCode < (HttpStatusCode)200 || response.StatusCode >= (HttpStatusCode)300)
|
||||
#endif
|
||||
TimeSpan lag = default(TimeSpan);
|
||||
RateLimitInfo info = default(RateLimitInfo);
|
||||
try
|
||||
{
|
||||
switch (response.StatusCode)
|
||||
var response = await request.SendAsync().ConfigureAwait(false);
|
||||
lag = DateTimeOffset.UtcNow - DateTimeOffset.Parse(response.Headers["Date"]);
|
||||
info = new RateLimitInfo(response.Headers);
|
||||
|
||||
if (response.StatusCode < (HttpStatusCode)200 || response.StatusCode >= (HttpStatusCode)300)
|
||||
{
|
||||
case (HttpStatusCode)429:
|
||||
if (info.IsGlobal)
|
||||
{
|
||||
Debug.WriteLine($"[{id}] (!) 429 [Global]");
|
||||
_queue.PauseGlobal(info, lag);
|
||||
}
|
||||
else
|
||||
{
|
||||
Debug.WriteLine($"[{id}] (!) 429");
|
||||
UpdateRateLimit(id, request, info, lag, true);
|
||||
}
|
||||
await _queue.RaiseRateLimitTriggered(Id, info).ConfigureAwait(false);
|
||||
continue; //Retry
|
||||
case HttpStatusCode.BadGateway: //502
|
||||
Debug.WriteLine($"[{id}] (!) 502");
|
||||
continue; //Continue
|
||||
default:
|
||||
string reason = null;
|
||||
if (response.Stream != null)
|
||||
{
|
||||
try
|
||||
switch (response.StatusCode)
|
||||
{
|
||||
case (HttpStatusCode)429:
|
||||
if (info.IsGlobal)
|
||||
{
|
||||
using (var reader = new StreamReader(response.Stream))
|
||||
using (var jsonReader = new JsonTextReader(reader))
|
||||
{
|
||||
var json = JToken.Load(jsonReader);
|
||||
reason = json.Value<string>("message");
|
||||
}
|
||||
#if DEBUG_LIMITS
|
||||
Debug.WriteLine($"[{id}] (!) 429 [Global]");
|
||||
#endif
|
||||
_queue.PauseGlobal(info, lag);
|
||||
}
|
||||
catch { }
|
||||
}
|
||||
throw new HttpException(response.StatusCode, reason);
|
||||
else
|
||||
{
|
||||
#if DEBUG_LIMITS
|
||||
Debug.WriteLine($"[{id}] (!) 429");
|
||||
#endif
|
||||
UpdateRateLimit(id, request, info, lag, true);
|
||||
}
|
||||
await _queue.RaiseRateLimitTriggered(Id, info).ConfigureAwait(false);
|
||||
continue; //Retry
|
||||
case HttpStatusCode.BadGateway: //502
|
||||
#if DEBUG_LIMITS
|
||||
Debug.WriteLine($"[{id}] (!) 502");
|
||||
#endif
|
||||
continue; //Continue
|
||||
default:
|
||||
string reason = null;
|
||||
if (response.Stream != null)
|
||||
{
|
||||
try
|
||||
{
|
||||
using (var reader = new StreamReader(response.Stream))
|
||||
using (var jsonReader = new JsonTextReader(reader))
|
||||
{
|
||||
var json = JToken.Load(jsonReader);
|
||||
reason = json.Value<string>("message");
|
||||
}
|
||||
}
|
||||
catch { }
|
||||
}
|
||||
throw new HttpException(response.StatusCode, reason);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
#if DEBUG_LIMITS
|
||||
Debug.WriteLine($"[{id}] Success");
|
||||
#endif
|
||||
return response.Stream;
|
||||
}
|
||||
}
|
||||
else
|
||||
#if DEBUG_LIMITS
|
||||
catch
|
||||
{
|
||||
Debug.WriteLine($"[{id}] Error");
|
||||
throw;
|
||||
}
|
||||
#endif
|
||||
finally
|
||||
{
|
||||
Debug.WriteLine($"[{id}] Success");
|
||||
UpdateRateLimit(id, request, info, lag, false);
|
||||
#if DEBUG_LIMITS
|
||||
Debug.WriteLine($"[{id}] Stop");
|
||||
return response.Stream;
|
||||
#endif
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -135,7 +167,9 @@ namespace Discord.Net.Queue
|
||||
if (resetAt > timeoutAt)
|
||||
throw new RateLimitedException();
|
||||
int millis = (int)Math.Ceiling((resetAt.Value - DateTimeOffset.UtcNow).TotalMilliseconds);
|
||||
#if DEBUG_LIMITS
|
||||
Debug.WriteLine($"[{id}] Sleeping {millis} ms (Pre-emptive)");
|
||||
#endif
|
||||
if (millis > 0)
|
||||
await Task.Delay(millis, request.CancelToken).ConfigureAwait(false);
|
||||
}
|
||||
@@ -143,13 +177,17 @@ namespace Discord.Net.Queue
|
||||
{
|
||||
if ((timeoutAt.Value - DateTimeOffset.UtcNow).TotalMilliseconds < 500.0)
|
||||
throw new RateLimitedException();
|
||||
#if DEBUG_LIMITS
|
||||
Debug.WriteLine($"[{id}] Sleeping 500* ms (Pre-emptive)");
|
||||
#endif
|
||||
await Task.Delay(500, request.CancelToken).ConfigureAwait(false);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
#if DEBUG_LIMITS
|
||||
else
|
||||
Debug.WriteLine($"[{id}] Entered Semaphore ({_semaphore}/{WindowCount} remaining)");
|
||||
#endif
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -166,7 +204,9 @@ namespace Discord.Net.Queue
|
||||
{
|
||||
WindowCount = info.Limit.Value;
|
||||
_semaphore = info.Remaining.Value;
|
||||
#if DEBUG_LIMITS
|
||||
Debug.WriteLine($"[{id}] Upgraded Semaphore to {info.Remaining.Value}/{WindowCount}");
|
||||
#endif
|
||||
}
|
||||
|
||||
var now = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
|
||||
@@ -182,24 +222,32 @@ namespace Discord.Net.Queue
|
||||
{
|
||||
//RetryAfter is more accurate than Reset, where available
|
||||
resetTick = DateTimeOffset.UtcNow.AddMilliseconds(info.RetryAfter.Value);
|
||||
#if DEBUG_LIMITS
|
||||
Debug.WriteLine($"[{id}] Retry-After: {info.RetryAfter.Value} ({info.RetryAfter.Value} ms)");
|
||||
#endif
|
||||
}
|
||||
else if (info.Reset.HasValue)
|
||||
{
|
||||
resetTick = info.Reset.Value.AddSeconds(/*1.0 +*/ lag.TotalSeconds);
|
||||
int diff = (int)(resetTick.Value - DateTimeOffset.UtcNow).TotalMilliseconds;
|
||||
#if DEBUG_LIMITS
|
||||
Debug.WriteLine($"[{id}] X-RateLimit-Reset: {info.Reset.Value.ToUnixTimeSeconds()} ({diff} ms, {lag.TotalMilliseconds} ms lag)");
|
||||
#endif
|
||||
}
|
||||
else if (request.Options.ClientBucketId != null)
|
||||
else if (request.Options.IsClientBucket && request.Options.BucketId != null)
|
||||
{
|
||||
resetTick = DateTimeOffset.UtcNow.AddSeconds(ClientBucket.Get(request.Options.ClientBucketId).WindowSeconds);
|
||||
Debug.WriteLine($"[{id}] Client Bucket ({ClientBucket.Get(request.Options.ClientBucketId).WindowSeconds * 1000} ms)");
|
||||
resetTick = DateTimeOffset.UtcNow.AddSeconds(ClientBucket.Get(request.Options.BucketId).WindowSeconds);
|
||||
#if DEBUG_LIMITS
|
||||
Debug.WriteLine($"[{id}] Client Bucket ({ClientBucket.Get(request.Options.BucketId).WindowSeconds * 1000} ms)");
|
||||
#endif
|
||||
}
|
||||
|
||||
if (resetTick == null)
|
||||
{
|
||||
WindowCount = 0; //No rate limit info, disable limits on this bucket (should only ever happen with a user token)
|
||||
#if DEBUG_LIMITS
|
||||
Debug.WriteLine($"[{id}] Disabled Semaphore");
|
||||
#endif
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -207,7 +255,9 @@ namespace Discord.Net.Queue
|
||||
{
|
||||
_resetTick = resetTick;
|
||||
LastAttemptAt = resetTick.Value; //Make sure we dont destroy this until after its been reset
|
||||
#if DEBUG_LIMITS
|
||||
Debug.WriteLine($"[{id}] Reset in {(int)Math.Ceiling((resetTick - DateTimeOffset.UtcNow).Value.TotalMilliseconds)} ms");
|
||||
#endif
|
||||
|
||||
if (!hasQueuedReset)
|
||||
{
|
||||
@@ -227,7 +277,9 @@ namespace Discord.Net.Queue
|
||||
millis = (int)Math.Ceiling((_resetTick.Value - DateTimeOffset.UtcNow).TotalMilliseconds);
|
||||
if (millis <= 0) //Make sure we havent gotten a more accurate reset time
|
||||
{
|
||||
#if DEBUG_LIMITS
|
||||
Debug.WriteLine($"[{id}] * Reset *");
|
||||
#endif
|
||||
_semaphore = WindowCount;
|
||||
_resetTick = null;
|
||||
return;
|
||||
@@ -236,4 +288,4 @@ namespace Discord.Net.Queue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -120,7 +120,7 @@ namespace Discord.Net.Rest
|
||||
cancelToken = CancellationTokenSource.CreateLinkedTokenSource(_cancelToken, cancelToken).Token;
|
||||
HttpResponseMessage response = await _client.SendAsync(request, cancelToken).ConfigureAwait(false);
|
||||
|
||||
var headers = response.Headers.ToDictionary(x => x.Key, x => x.Value.FirstOrDefault());
|
||||
var headers = response.Headers.ToDictionary(x => x.Key, x => x.Value.FirstOrDefault(), StringComparer.OrdinalIgnoreCase);
|
||||
var stream = !headerOnly ? await response.Content.ReadAsStreamAsync().ConfigureAwait(false) : null;
|
||||
|
||||
return new RestResponse(response.StatusCode, headers, stream);
|
||||
|
||||
@@ -101,6 +101,8 @@ namespace Discord.Net.WebSockets
|
||||
|
||||
if (_client != null && _client.State == WebSocketState.Open)
|
||||
{
|
||||
var token = new CancellationToken();
|
||||
await _client.CloseAsync(WebSocketCloseStatus.NormalClosure, "", token);
|
||||
_client.Dispose();
|
||||
_client = null;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user