Added SendFile(User), reworked the message queue, adding edit message queuing

This commit is contained in:
RogueException
2015-12-09 20:34:46 -04:00
parent fb190b45ac
commit 946820d08a
3 changed files with 157 additions and 106 deletions

View File

@@ -2,8 +2,8 @@ using Discord.API;
using Discord.Net; using Discord.Net;
using Newtonsoft.Json; using Newtonsoft.Json;
using Newtonsoft.Json.Linq; using Newtonsoft.Json.Linq;
using Newtonsoft.Json.Serialization;
using System; using System;
using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO; using System.IO;
using System.Linq; using System.Linq;
@@ -37,7 +37,20 @@ namespace Discord
=> base.Import(messages); => base.Import(messages);
} }
public class MessageEventArgs : EventArgs internal class MessageQueueItem
{
public readonly Message Message;
public readonly string Text;
public readonly long[] MentionedUsers;
public MessageQueueItem(Message msg, string text, long[] userIds)
{
Message = msg;
Text = text;
MentionedUsers = userIds;
}
}
public class MessageEventArgs : EventArgs
{ {
public Message Message { get; } public Message Message { get; }
public User User => Message.User; public User User => Message.User;
@@ -83,10 +96,13 @@ namespace Discord
} }
internal Messages Messages => _messages; internal Messages Messages => _messages;
private readonly Messages _messages; private readonly Random _nonceRand;
private readonly Messages _messages;
private readonly JsonSerializer _messageImporter;
private readonly ConcurrentQueue<MessageQueueItem> _pendingMessages;
/// <summary> Returns the message with the specified id, or null if none was found. </summary> /// <summary> Returns the message with the specified id, or null if none was found. </summary>
public Message GetMessage(long id) public Message GetMessage(long id)
{ {
if (id <= 0) throw new ArgumentOutOfRangeException(nameof(id)); if (id <= 0) throw new ArgumentOutOfRangeException(nameof(id));
CheckReady(); CheckReady();
@@ -101,93 +117,112 @@ namespace Discord
if (text == null) throw new ArgumentNullException(nameof(text)); if (text == null) throw new ArgumentNullException(nameof(text));
CheckReady(); CheckReady();
return SendMessage(channel, text, false); return SendMessageInternal(channel, text, false);
} }
/// <summary> Sends a text-to-speech message to the provided channel. To include a mention, see the Mention static helper class. </summary> /// <summary> Sends a private message to the provided user. </summary>
public Task<Message> SendTTSMessage(Channel channel, string text) public async Task<Message> SendMessage(User user, string text)
{
if (user == null) throw new ArgumentNullException(nameof(user));
if (text == null) throw new ArgumentNullException(nameof(text));
CheckReady();
var channel = await CreatePMChannel(user).ConfigureAwait(false);
return await SendMessageInternal(channel, text, false).ConfigureAwait(false);
}
/// <summary> Sends a text-to-speech message to the provided channel. To include a mention, see the Mention static helper class. </summary>
public Task<Message> SendTTSMessage(Channel channel, string text)
{ {
if (channel == null) throw new ArgumentNullException(nameof(channel)); if (channel == null) throw new ArgumentNullException(nameof(channel));
if (text == null) throw new ArgumentNullException(nameof(text)); if (text == null) throw new ArgumentNullException(nameof(text));
CheckReady(); CheckReady();
return SendMessage(channel, text, true); return SendMessageInternal(channel, text, true);
} }
/// <summary> Sends a private message to the provided user. </summary> /// <summary> Sends a file to the provided channel. </summary>
public async Task<Message> SendPrivateMessage(User user, string text) public Task<Message> SendFile(Channel channel, string filePath)
{ {
if (user == null) throw new ArgumentNullException(nameof(user)); if (channel == null) throw new ArgumentNullException(nameof(channel));
if (text == null) throw new ArgumentNullException(nameof(text)); if (filePath == null) throw new ArgumentNullException(nameof(filePath));
CheckReady(); CheckReady();
var channel = await CreatePMChannel(user).ConfigureAwait(false); return SendFile(channel, Path.GetFileName(filePath), File.OpenRead(filePath));
return await SendMessage(channel, text).ConfigureAwait(false); }
} /// <summary> Sends a file to the provided channel. </summary>
private async Task<Message> SendMessage(Channel channel, string text, bool isTextToSpeech) public async Task<Message> SendFile(Channel channel, string filename, Stream stream)
{
if (channel == null) throw new ArgumentNullException(nameof(channel));
if (filename == null) throw new ArgumentNullException(nameof(filename));
if (stream == null) throw new ArgumentNullException(nameof(stream));
CheckReady();
var model = await _api.SendFile(channel.Id, filename, stream).ConfigureAwait(false);
var msg = _messages.GetOrAdd(model.Id, channel.Id, model.Author.Id);
msg.Update(model);
RaiseMessageSent(msg);
return msg;
}
/// <summary> Sends a file to the provided channel. </summary>
public async Task<Message> SendFile(User user, string filePath)
{
if (user == null) throw new ArgumentNullException(nameof(user));
if (filePath == null) throw new ArgumentNullException(nameof(filePath));
CheckReady();
var channel = await CreatePMChannel(user).ConfigureAwait(false);
return await SendFile(channel, Path.GetFileName(filePath), File.OpenRead(filePath)).ConfigureAwait(false);
}
/// <summary> Sends a file to the provided channel. </summary>
public async Task<Message> SendFile(User user, string filename, Stream stream)
{
if (user == null) throw new ArgumentNullException(nameof(user));
if (filename == null) throw new ArgumentNullException(nameof(filename));
if (stream == null) throw new ArgumentNullException(nameof(stream));
CheckReady();
var channel = await CreatePMChannel(user).ConfigureAwait(false);
return await SendFile(channel, filename, stream).ConfigureAwait(false);
}
private async Task<Message> SendMessageInternal(Channel channel, string text, bool isTextToSpeech)
{ {
Message msg; Message msg;
var server = channel.Server; var server = channel.Server;
if (Config.UseMessageQueue) var mentionedUsers = new List<User>();
text = Mention.CleanUserMentions(this, server, text, mentionedUsers);
if (text.Length > MaxMessageSize)
throw new ArgumentOutOfRangeException(nameof(text), $"Message must be {MaxMessageSize} characters or less.");
if (Config.UseMessageQueue)
{ {
var nonce = GenerateNonce(); var nonce = GenerateNonce();
msg = _messages.GetOrAdd(nonce, channel.Id, _userId.Value); msg = _messages.GetOrAdd(nonce, channel.Id, _userId.Value);
var currentUser = msg.User; var currentUser = msg.User;
msg.Update(new MessageInfo msg.Update(new MessageInfo
{ {
Content = text, Content = text,
Timestamp = DateTime.UtcNow, Timestamp = DateTime.UtcNow,
Author = new UserReference { Avatar = currentUser.AvatarId, Discriminator = currentUser.Discriminator, Id = _userId.Value, Username = currentUser.Name }, Author = new UserReference { Avatar = currentUser.AvatarId, Discriminator = currentUser.Discriminator, Id = _userId.Value, Username = currentUser.Name },
ChannelId = channel.Id, ChannelId = channel.Id,
Nonce = IdConvert.ToString(nonce),
IsTextToSpeech = isTextToSpeech IsTextToSpeech = isTextToSpeech
}); });
msg.State = MessageState.Queued; msg.State = MessageState.Queued;
if (text.Length > MaxMessageSize) _pendingMessages.Enqueue(new MessageQueueItem(msg, text, mentionedUsers.Select(x => x.Id).ToArray()));
throw new ArgumentOutOfRangeException(nameof(text), $"Message must be {MaxMessageSize} characters or less."); }
_pendingMessages.Enqueue(msg);
}
else else
{ {
var mentionedUsers = new List<User>(); var model = await _api.SendMessage(channel.Id, text, mentionedUsers.Select(x => x.Id), null, isTextToSpeech).ConfigureAwait(false);
if (!channel.IsPrivate) msg = _messages.GetOrAdd(model.Id, channel.Id, model.Author.Id);
text = Mention.CleanUserMentions(this, server, text, mentionedUsers); msg.Update(model);
RaiseMessageSent(msg);
if (text.Length > MaxMessageSize) }
throw new ArgumentOutOfRangeException(nameof(text), $"Message must be {MaxMessageSize} characters or less.");
var model = await _api.SendMessage(channel.Id, text, mentionedUsers.Select(x => x.Id), null, isTextToSpeech).ConfigureAwait(false);
msg = _messages.GetOrAdd(model.Id, channel.Id, model.Author.Id);
msg.Update(model);
RaiseMessageSent(msg);
}
return msg; return msg;
} }
/// <summary> Edits the provided message, changing only non-null attributes. </summary>
/// <summary> Sends a file to the provided channel. </summary> /// <remarks> While not required, it is recommended to include a mention reference in the text (see Mention.User). </remarks>
public Task SendFile(Channel channel, string filePath) public async Task EditMessage(Message message, string text)
{
if (channel == null) throw new ArgumentNullException(nameof(channel));
if (filePath == null) throw new ArgumentNullException(nameof(filePath));
CheckReady();
return _api.SendFile(channel.Id, Path.GetFileName(filePath), File.OpenRead(filePath));
}
/// <summary> Sends a file to the provided channel. </summary>
public Task SendFile(Channel channel, string filename, Stream stream)
{
if (channel == null) throw new ArgumentNullException(nameof(channel));
if (filename == null) throw new ArgumentNullException(nameof(filename));
if (stream == null) throw new ArgumentNullException(nameof(stream));
CheckReady();
return _api.SendFile(channel.Id, filename, stream);
}
/// <summary> Edits the provided message, changing only non-null attributes. </summary>
/// <remarks> While not required, it is recommended to include a mention reference in the text (see Mention.User). </remarks>
public Task EditMessage(Message message, string text)
{ {
if (message == null) throw new ArgumentNullException(nameof(message)); if (message == null) throw new ArgumentNullException(nameof(message));
if (text == null) throw new ArgumentNullException(nameof(text)); if (text == null) throw new ArgumentNullException(nameof(text));
@@ -200,8 +235,11 @@ namespace Discord
if (text.Length > MaxMessageSize) if (text.Length > MaxMessageSize)
throw new ArgumentOutOfRangeException(nameof(text), $"Message must be {MaxMessageSize} characters or less."); throw new ArgumentOutOfRangeException(nameof(text), $"Message must be {MaxMessageSize} characters or less.");
return _api.EditMessage(message.Id, message.Channel.Id, text, mentionedUsers.Select(x => x.Id)); if (Config.UseMessageQueue)
_pendingMessages.Enqueue(new MessageQueueItem(message, text, mentionedUsers.Select(x => x.Id).ToArray()));
else
await _api.EditMessage(message.Id, message.Channel.Id, text, mentionedUsers.Select(x => x.Id)).ConfigureAwait(false);
} }
/// <summary> Deletes the provided message. </summary> /// <summary> Deletes the provided message. </summary>
@@ -302,46 +340,44 @@ namespace Discord
return JsonConvert.SerializeObject(channel.Messages); return JsonConvert.SerializeObject(channel.Messages);
} }
private Task MessageQueueLoop() private Task MessageQueueAsync()
{ {
var cancelToken = _cancelToken; var cancelToken = _cancelToken;
int interval = Config.MessageQueueInterval; int interval = Config.MessageQueueInterval;
return Task.Run(async () => return Task.Run(async () =>
{ {
Message msg; MessageQueueItem queuedMessage;
while (!cancelToken.IsCancellationRequested) while (!cancelToken.IsCancellationRequested)
{ {
while (_pendingMessages.TryDequeue(out msg)) while (_pendingMessages.TryDequeue(out queuedMessage))
{ {
bool hasFailed = false;
SendMessageResponse response = null; SendMessageResponse response = null;
try var msg = queuedMessage.Message;
{ try
response = await _api.SendMessage(msg.Channel.Id, msg.RawText, msg.MentionedUsers.Select(x => x.Id), IdConvert.ToString(msg.Id), msg.IsTTS).ConfigureAwait(false); {
} response = await _api.SendMessage(
catch (WebException) { break; } msg.Channel.Id,
catch (HttpException) { hasFailed = true; } queuedMessage.Text,
queuedMessage.MentionedUsers,
if (!hasFailed) IdConvert.ToString(msg.Id), //Nonce
{ msg.IsTTS)
_messages.Remap(msg.Id, response.Id); .ConfigureAwait(false);
msg.Id = response.Id;
msg.Update(response);
msg.State = MessageState.Normal;
} }
else catch (WebException) { break; }
msg.State = MessageState.Failed; catch (HttpException) { msg.State = MessageState.Failed; }
RaiseMessageSent(msg); RaiseMessageSent(msg);
} }
await Task.Delay(interval).ConfigureAwait(false); await Task.Delay(interval).ConfigureAwait(false);
} }
}); });
} }
private long GenerateNonce() private long GenerateNonce()
{ {
lock (_rand) lock (_nonceRand)
return -_rand.Next(1, int.MaxValue - 1); return -_nonceRand.Next(1, int.MaxValue - 1);
} }
} }
} }

View File

@@ -53,9 +53,6 @@ namespace Discord
private readonly ManualResetEvent _disconnectedEvent; private readonly ManualResetEvent _disconnectedEvent;
private readonly ManualResetEventSlim _connectedEvent; private readonly ManualResetEventSlim _connectedEvent;
private readonly Random _rand;
private readonly JsonSerializer _messageImporter;
private readonly ConcurrentQueue<Message> _pendingMessages;
private readonly Dictionary<Type, object> _singletons; private readonly Dictionary<Type, object> _singletons;
private readonly LogService _log; private readonly LogService _log;
private readonly object _cacheLock; private readonly object _cacheLock;
@@ -114,7 +111,7 @@ namespace Discord
_config = config ?? new DiscordConfig(); _config = config ?? new DiscordConfig();
_config.Lock(); _config.Lock();
_rand = new Random(); _nonceRand = new Random();
_state = (int)DiscordClientState.Disconnected; _state = (int)DiscordClientState.Disconnected;
_status = UserStatus.Online; _status = UserStatus.Online;
@@ -155,7 +152,7 @@ namespace Discord
_api = new DiscordAPIClient(_config); _api = new DiscordAPIClient(_config);
if (Config.UseMessageQueue) if (Config.UseMessageQueue)
_pendingMessages = new ConcurrentQueue<Message>(); _pendingMessages = new ConcurrentQueue<MessageQueueItem>();
Connected += async (s, e) => Connected += async (s, e) =>
{ {
_api.CancelToken = _cancelToken; _api.CancelToken = _cancelToken;
@@ -393,7 +390,7 @@ namespace Discord
List<Task> tasks = new List<Task>(); List<Task> tasks = new List<Task>();
tasks.Add(_cancelToken.Wait()); tasks.Add(_cancelToken.Wait());
if (_config.UseMessageQueue) if (_config.UseMessageQueue)
tasks.Add(MessageQueueLoop()); tasks.Add(MessageQueueAsync());
Task[] tasksArray = tasks.ToArray(); Task[] tasksArray = tasks.ToArray();
Task firstTask = Task.WhenAny(tasksArray); Task firstTask = Task.WhenAny(tasksArray);
@@ -432,7 +429,7 @@ namespace Discord
{ {
if (Config.UseMessageQueue) if (Config.UseMessageQueue)
{ {
Message ignored; MessageQueueItem ignored;
while (_pendingMessages.TryDequeue(out ignored)) { } while (_pendingMessages.TryDequeue(out ignored)) { }
} }
@@ -668,9 +665,19 @@ namespace Discord
Message msg = null; Message msg = null;
bool isAuthor = data.Author.Id == _userId; bool isAuthor = data.Author.Id == _userId;
int nonce = 0;
if (data.Author.Id == _privateUser.Id && Config.UseMessageQueue)
{
if (data.Nonce != null && int.TryParse(data.Nonce, out nonce))
msg = _messages[nonce];
}
if (msg == null)
{
msg = _messages.GetOrAdd(data.Id, data.ChannelId, data.Author.Id);
nonce = 0;
}
if (msg == null)
msg = _messages.GetOrAdd(data.Id, data.ChannelId, data.Author.Id);
msg.Update(data); msg.Update(data);
if (Config.TrackActivity) if (Config.TrackActivity)
{ {
@@ -683,7 +690,15 @@ namespace Discord
} }
} }
RaiseMessageReceived(msg); //Remapped queued message
if (nonce != 0)
{
msg = _messages.Remap(nonce, data.Id);
msg.Id = data.Id;
}
msg.State = MessageState.Normal;
RaiseMessageReceived(msg);
if (Config.AckMessages && !isAuthor) if (Config.AckMessages && !isAuthor)
await _api.AckMessage(data.Id, data.ChannelId).ConfigureAwait(false); await _api.AckMessage(data.Id, data.ChannelId).ConfigureAwait(false);
@@ -694,9 +709,10 @@ namespace Discord
var data = e.Payload.ToObject<MessageUpdateEvent>(_webSocket.Serializer); var data = e.Payload.ToObject<MessageUpdateEvent>(_webSocket.Serializer);
var msg = _messages[data.Id]; var msg = _messages[data.Id];
if (msg != null) if (msg != null)
{ {
msg.Update(data); msg.Update(data);
RaiseMessageUpdated(msg); msg.State = MessageState.Normal;
RaiseMessageUpdated(msg);
} }
} }
break; break;

View File

@@ -75,7 +75,6 @@ namespace Discord.Net.Rest
{ {
var retryAfter = response.Headers var retryAfter = response.Headers
.FirstOrDefault(x => x.Name.Equals("Retry-After", StringComparison.OrdinalIgnoreCase)); .FirstOrDefault(x => x.Name.Equals("Retry-After", StringComparison.OrdinalIgnoreCase));
int milliseconds;
if (retryAfter != null) if (retryAfter != null)
{ {
await Task.Delay((int)retryAfter.Value); await Task.Delay((int)retryAfter.Value);