Added new experimental MessageQueue

This commit is contained in:
RogueException
2016-02-10 17:19:41 -04:00
parent 9843298af8
commit cf9fcc9521
2 changed files with 154 additions and 80 deletions

View File

@@ -68,8 +68,6 @@ namespace Discord
/// <summary> Gets a collection of all servers this client is a member of. </summary> /// <summary> Gets a collection of all servers this client is a member of. </summary>
public IEnumerable<Server> Servers => _servers.Select(x => x.Value); public IEnumerable<Server> Servers => _servers.Select(x => x.Value);
// /// <summary> Gets a collection of all channels this client is a member of. </summary>
// public IEnumerable<Channel> Channels => _channels.Select(x => x.Value);
/// <summary> Gets a collection of all private channels this client is a member of. </summary> /// <summary> Gets a collection of all private channels this client is a member of. </summary>
public IEnumerable<Channel> PrivateChannels => _privateChannels.Select(x => x.Value); public IEnumerable<Channel> PrivateChannels => _privateChannels.Select(x => x.Value);
/// <summary> Gets a collection of all voice regions currently offered by Discord. </summary> /// <summary> Gets a collection of all voice regions currently offered by Discord. </summary>
@@ -198,11 +196,8 @@ namespace Discord
await Login(email, password, token).ConfigureAwait(false); await Login(email, password, token).ConfigureAwait(false);
await GatewaySocket.Connect(ClientAPI, CancelToken).ConfigureAwait(false); await GatewaySocket.Connect(ClientAPI, CancelToken).ConfigureAwait(false);
Task[] tasks = new[] var tasks = new[] { CancelToken.Wait() }
{ .Concat(MessageQueue.Run(CancelToken));
CancelToken.Wait(),
MessageQueue.Run(CancelToken)
};
await _taskManager.Start(tasks, cancelSource).ConfigureAwait(false); await _taskManager.Start(tasks, cancelSource).ConfigureAwait(false);
GatewaySocket.WaitForConnection(CancelToken); GatewaySocket.WaitForConnection(CancelToken);

View File

@@ -3,6 +3,7 @@ using Discord.Logging;
using Discord.Net.Rest; using Discord.Net.Rest;
using System; using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Net; using System.Net;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
@@ -12,42 +13,16 @@ namespace Discord.Net
/// <summary> Manages an outgoing message queue for DiscordClient. </summary> /// <summary> Manages an outgoing message queue for DiscordClient. </summary>
public class MessageQueue public class MessageQueue
{ {
private interface IQueuedAction private struct MessageEdit
{ {
Task Do(MessageQueue queue); public readonly Message Message;
} public readonly string NewText;
private struct SendAction : IQueuedAction public MessageEdit(Message message, string newText)
{
private readonly Message _msg;
public SendAction(Message msg)
{ {
_msg = msg; Message = message;
NewText = newText;
} }
Task IQueuedAction.Do(MessageQueue queue) => queue.Send(_msg);
}
private struct EditAction : IQueuedAction
{
private readonly Message _msg;
private readonly string _text;
public EditAction(Message msg, string text)
{
_msg = msg;
_text = text;
}
Task IQueuedAction.Do(MessageQueue queue) => queue.Edit(_msg, _text);
}
private struct DeleteAction : IQueuedAction
{
private readonly Message _msg;
public DeleteAction(Message msg)
{
_msg = msg;
}
Task IQueuedAction.Do(MessageQueue queue) => queue.Delete(_msg);
} }
private const int WarningStart = 30; private const int WarningStart = 30;
@@ -55,12 +30,16 @@ namespace Discord.Net
private readonly Random _nonceRand; private readonly Random _nonceRand;
private readonly RestClient _rest; private readonly RestClient _rest;
private readonly Logger _logger; private readonly Logger _logger;
private readonly ConcurrentQueue<IQueuedAction> _pendingActions; private readonly ConcurrentQueue<int> _pendingSends;
private readonly ConcurrentDictionary<int, Message> _pendingSends; private readonly ConcurrentQueue<MessageEdit> _pendingEdits;
private readonly ConcurrentQueue<Message> _pendingDeletes;
private readonly ConcurrentDictionary<int, Message> _pendingSendsByNonce;
private readonly ConcurrentQueue<Task> _pendingTasks;
private int _nextWarning; private int _nextWarning;
private int _count;
/// <summary> Gets the current number of queued actions. </summary> /// <summary> Gets the current number of queued actions. </summary>
public int Count { get; private set; } public int Count => _count;
internal MessageQueue(RestClient rest, Logger logger) internal MessageQueue(RestClient rest, Logger logger)
{ {
@@ -68,8 +47,11 @@ namespace Discord.Net
_logger = logger; _logger = logger;
_nonceRand = new Random(); _nonceRand = new Random();
_pendingActions = new ConcurrentQueue<IQueuedAction>(); _pendingSends = new ConcurrentQueue<int>();
_pendingSends = new ConcurrentDictionary<int, Message>(); _pendingEdits = new ConcurrentQueue<MessageEdit>();
_pendingDeletes = new ConcurrentQueue<Message>();
_pendingSendsByNonce = new ConcurrentDictionary<int, Message>();
_pendingTasks = new ConcurrentQueue<Task>();
} }
internal Message QueueSend(Channel channel, string text, bool isTTS) internal Message QueueSend(Channel channel, string text, bool isTTS)
@@ -78,28 +60,45 @@ namespace Discord.Net
msg.RawText = text; msg.RawText = text;
msg.Text = msg.Resolve(text); msg.Text = msg.Resolve(text);
msg.Nonce = GenerateNonce(); msg.Nonce = GenerateNonce();
msg.State = MessageState.Queued; if (_pendingSendsByNonce.TryAdd(msg.Nonce, msg))
_pendingSends.TryAdd(msg.Nonce, msg); {
_pendingActions.Enqueue(new SendAction(msg)); msg.State = MessageState.Queued;
IncrementCount();
_pendingSends.Enqueue(msg.Nonce);
}
else
msg.State = MessageState.Failed;
return msg; return msg;
} }
internal void QueueEdit(Message msg, string text) internal void QueueEdit(Message msg, string text)
{ {
_pendingActions.Enqueue(new EditAction(msg, text)); IncrementCount();
_pendingEdits.Enqueue(new MessageEdit(msg, text));
} }
internal void QueueDelete(Message msg) internal void QueueDelete(Message msg)
{ {
Message ignored; Message ignored;
if (msg.State == MessageState.Queued && _pendingSends.TryRemove(msg.Nonce, out ignored)) if (msg.State == MessageState.Queued && _pendingSendsByNonce.TryRemove(msg.Nonce, out ignored))
{ {
//Successfully stopped the message from being sent in the first place
msg.State = MessageState.Aborted; msg.State = MessageState.Aborted;
return; return;
} }
IncrementCount();
_pendingActions.Enqueue(new DeleteAction(msg)); _pendingDeletes.Enqueue(msg);
} }
internal Task Run(CancellationToken cancelToken) internal Task[] Run(CancellationToken cancelToken)
{
return new[]
{
RunSendQueue(cancelToken),
RunEditQueue(cancelToken),
RunDeleteQueue(cancelToken),
RunTaskQueue(cancelToken)
};
}
private Task RunSendQueue(CancellationToken cancelToken)
{ {
_nextWarning = WarningStart; _nextWarning = WarningStart;
return Task.Run((Func<Task>)(async () => return Task.Run((Func<Task>)(async () =>
@@ -108,18 +107,13 @@ namespace Discord.Net
{ {
while (!cancelToken.IsCancellationRequested) while (!cancelToken.IsCancellationRequested)
{ {
Count = _pendingActions.Count; int nonce;
if (Count >= _nextWarning) while (_pendingSends.TryDequeue(out nonce))
{ {
_nextWarning *= 2; Message msg;
_logger.Warning($"Queue is backed up, currently at {Count} actions."); if (_pendingSendsByNonce.TryRemove(nonce, out msg)) //If it was delete from queue, this will fail
await Send(msg).ConfigureAwait(false);
} }
else if (Count < WarningStart) //Reset once the problem is solved
_nextWarning = WarningStart;
IQueuedAction queuedAction;
while (_pendingActions.TryDequeue(out queuedAction))
await queuedAction.Do(this).ConfigureAwait(false);
await Task.Delay((int)Discord.DiscordConfig.MessageQueueInterval).ConfigureAwait(false); await Task.Delay((int)Discord.DiscordConfig.MessageQueueInterval).ConfigureAwait(false);
} }
@@ -127,29 +121,96 @@ namespace Discord.Net
catch (OperationCanceledException) { } catch (OperationCanceledException) { }
})); }));
} }
private Task RunEditQueue(CancellationToken cancelToken)
internal async Task Send(Message msg)
{ {
if (_pendingSends.TryRemove(msg.Nonce, out msg)) //Remove it from pending _nextWarning = WarningStart;
return Task.Run((Func<Task>)(async () =>
{ {
try try
{ {
var request = new SendMessageRequest(msg.Channel.Id) while (!cancelToken.IsCancellationRequested)
{ {
Content = msg.RawText, MessageEdit edit;
Nonce = msg.Nonce.ToString(), while (_pendingEdits.TryPeek(out edit) && edit.Message.State != MessageState.Queued)
IsTTS = msg.IsTTS {
}; _pendingEdits.TryDequeue(out edit);
var response = await _rest.Send(request).ConfigureAwait(false); if (edit.Message.State == MessageState.Normal)
msg.Id = response.Id; await Edit(edit.Message, edit.NewText);
msg.Update(response); }
msg.State = MessageState.Normal;
await Task.Delay((int)Discord.DiscordConfig.MessageQueueInterval).ConfigureAwait(false);
}
} }
catch (Exception ex) catch (OperationCanceledException) { }
}));
}
private Task RunDeleteQueue(CancellationToken cancelToken)
{
_nextWarning = WarningStart;
return Task.Run((Func<Task>)(async () =>
{
try
{ {
msg.State = MessageState.Failed; while (!cancelToken.IsCancellationRequested)
_logger.Error("Failed to send message", ex); {
Message msg;
while (_pendingDeletes.TryPeek(out msg) && msg.State != MessageState.Queued)
{
_pendingDeletes.TryDequeue(out msg);
if (msg.State == MessageState.Normal)
_pendingTasks.Enqueue(Delete(msg));
}
await Task.Delay((int)Discord.DiscordConfig.MessageQueueInterval).ConfigureAwait(false);
}
} }
catch (OperationCanceledException) { }
}));
}
private Task RunTaskQueue(CancellationToken cancelToken)
{
return Task.Run(async () =>
{
Task task;
while (!cancelToken.IsCancellationRequested)
{
while (_pendingTasks.TryPeek(out task) && task.IsCompleted)
{
_pendingTasks.TryDequeue(out task); //Should never fail
if (task.IsFaulted)
_logger.Warning("Error during Edit/Delete", task.Exception);
}
await Task.Delay((int)Discord.DiscordConfig.MessageQueueInterval).ConfigureAwait(false);
}
//Wait for remaining tasks to complete
while (_pendingTasks.TryDequeue(out task))
{
if (!task.IsCompleted)
await task.ConfigureAwait(false);
}
});
}
internal async Task Send(Message msg)
{
try
{
var request = new SendMessageRequest(msg.Channel.Id)
{
Content = msg.RawText,
Nonce = msg.Nonce.ToString(),
IsTTS = msg.IsTTS
};
var response = await _rest.Send(request).ConfigureAwait(false);
msg.State = MessageState.Normal;
msg.Id = response.Id;
msg.Update(response);
}
catch (Exception ex)
{
msg.State = MessageState.Failed;
_logger.Error("Failed to send message", ex);
} }
} }
internal async Task Edit(Message msg, string text) internal async Task Edit(Message msg, string text)
@@ -187,11 +248,29 @@ namespace Discord.Net
} }
} }
/// <summary> Clears all queued message sends/edits/deletes </summary> private void IncrementCount()
{
int count = Interlocked.Increment(ref _count);
if (count >= _nextWarning)
{
_nextWarning *= 2;
_logger.Warning($"Queue is backed up, currently at {count} actions.");
}
else if (count < WarningStart) //Reset once the problem is solved
_nextWarning = WarningStart;
}
/// <summary> Clears all queued message sends/edits/deletes. </summary>
public void Clear() public void Clear()
{ {
IQueuedAction ignored; int nonce;
while (_pendingActions.TryDequeue(out ignored)) { } while (_pendingSends.TryDequeue(out nonce)) { }
MessageEdit edit;
while (_pendingEdits.TryDequeue(out edit)) { }
Message msg;
while (_pendingDeletes.TryDequeue(out msg)) { }
} }
private int GenerateNonce() private int GenerateNonce()