Split DiscordClient into DiscordBaseClient and DiscordClient. Several fixes.

This commit is contained in:
RogueException
2015-09-26 23:26:27 -03:00
parent a965ec4a23
commit a8ca994667
15 changed files with 887 additions and 786 deletions

View File

@@ -136,6 +136,12 @@
<Compile Include="..\Discord.Net\DiscordAPIClient.cs">
<Link>DiscordAPIClient.cs</Link>
</Compile>
<Compile Include="..\Discord.Net\DiscordBaseClient.cs">
<Link>DiscordBaseClient.cs</Link>
</Compile>
<Compile Include="..\Discord.Net\DiscordBaseClient.Events.cs">
<Link>DiscordBaseClient.Events.cs</Link>
</Compile>
<Compile Include="..\Discord.Net\DiscordClient.API.cs">
<Link>DiscordClient.API.cs</Link>
</Compile>

View File

@@ -0,0 +1,107 @@
using System;
namespace Discord
{
public enum LogMessageSeverity : byte
{
Error = 1,
Warning = 2,
Info = 3,
Verbose = 4,
Debug = 5
}
public enum LogMessageSource : byte
{
Unknown = 0,
Cache,
Client,
DataWebSocket,
MessageQueue,
Rest,
VoiceWebSocket,
}
public class DisconnectedEventArgs : EventArgs
{
public readonly bool WasUnexpected;
public readonly Exception Error;
internal DisconnectedEventArgs(bool wasUnexpected, Exception error)
{
WasUnexpected = wasUnexpected;
Error = error;
}
}
public sealed class LogMessageEventArgs : EventArgs
{
public LogMessageSeverity Severity { get; }
public LogMessageSource Source { get; }
public string Message { get; }
internal LogMessageEventArgs(LogMessageSeverity severity, LogMessageSource source, string msg)
{
Severity = severity;
Source = source;
Message = msg;
}
}
public sealed class VoicePacketEventArgs
{
public string UserId { get; }
public string ChannelId { get; }
public byte[] Buffer { get; }
public int Offset { get; }
public int Count { get; }
internal VoicePacketEventArgs(string userId, string channelId, byte[] buffer, int offset, int count)
{
UserId = userId;
Buffer = buffer;
Offset = offset;
Count = count;
}
}
public abstract partial class DiscordBaseClient
{
public event EventHandler Connected;
private void RaiseConnected()
{
if (Connected != null)
RaiseEvent(nameof(Connected), () => Connected(this, EventArgs.Empty));
}
public event EventHandler<DisconnectedEventArgs> Disconnected;
private void RaiseDisconnected(DisconnectedEventArgs e)
{
if (Disconnected != null)
RaiseEvent(nameof(Disconnected), () => Disconnected(this, e));
}
public event EventHandler<LogMessageEventArgs> LogMessage;
internal void RaiseOnLog(LogMessageSeverity severity, LogMessageSource source, string message)
{
if (LogMessage != null)
RaiseEvent(nameof(LogMessage), () => LogMessage(this, new LogMessageEventArgs(severity, source, message)));
}
public event EventHandler VoiceConnected;
private void RaiseVoiceConnected()
{
if (VoiceConnected != null)
RaiseEvent(nameof(VoiceConnected), () => VoiceConnected(this, EventArgs.Empty));
}
public event EventHandler<DisconnectedEventArgs> VoiceDisconnected;
private void RaiseVoiceDisconnected(DisconnectedEventArgs e)
{
if (VoiceDisconnected != null)
RaiseEvent(nameof(VoiceDisconnected), () => VoiceDisconnected(this, e));
}
public event EventHandler<VoicePacketEventArgs> OnVoicePacket;
internal void RaiseOnVoicePacket(VoicePacketEventArgs e)
{
if (OnVoicePacket != null)
OnVoicePacket(this, e);
}
}
}

View File

@@ -0,0 +1,272 @@
using Discord.API;
using Discord.Collections;
using Discord.Helpers;
using Discord.WebSockets.Data;
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using VoiceWebSocket = Discord.WebSockets.Voice.VoiceWebSocket;
namespace Discord
{
public enum DiscordClientState : byte
{
Disconnected,
Connecting,
Connected,
Disconnecting
}
/// <summary> Provides a barebones connection to the Discord service </summary>
public partial class DiscordBaseClient
{
internal readonly DataWebSocket _dataSocket;
internal readonly VoiceWebSocket _voiceSocket;
protected readonly ManualResetEvent _disconnectedEvent;
protected readonly ManualResetEventSlim _connectedEvent;
private Task _runTask;
private string _gateway, _token;
protected ExceptionDispatchInfo _disconnectReason;
private bool _wasDisconnectUnexpected;
/// <summary> Returns the id of the current logged-in user. </summary>
public string CurrentUserId => _currentUserId;
private string _currentUserId;
/*/// <summary> Returns the server this user is currently connected to for voice. </summary>
public string CurrentVoiceServerId => _voiceSocket.CurrentServerId;*/
/// <summary> Returns the current connection state of this client. </summary>
public DiscordClientState State => (DiscordClientState)_state;
private int _state;
/// <summary> Returns the configuration object used to make this client. Note that this object cannot be edited directly - to change the configuration of this client, use the DiscordClient(DiscordClientConfig config) constructor. </summary>
public DiscordClientConfig Config => _config;
protected readonly DiscordClientConfig _config;
public CancellationToken CancelToken => _cancelToken;
private CancellationTokenSource _cancelTokenSource;
private CancellationToken _cancelToken;
/// <summary> Initializes a new instance of the DiscordClient class. </summary>
public DiscordBaseClient(DiscordClientConfig config = null)
{
_config = config ?? new DiscordClientConfig();
_config.Lock();
_state = (int)DiscordClientState.Disconnected;
_cancelToken = new CancellationToken(true);
_disconnectedEvent = new ManualResetEvent(true);
_connectedEvent = new ManualResetEventSlim(false);
_dataSocket = new DataWebSocket(this);
_dataSocket.Connected += (s, e) => { if (_state == (int)DiscordClientState.Connecting) CompleteConnect(); };
_dataSocket.Disconnected += async (s, e) =>
{
RaiseDisconnected(e);
if (e.WasUnexpected)
await _dataSocket.Reconnect(_token);
};
if (Config.VoiceMode != DiscordVoiceMode.Disabled)
{
_voiceSocket = new VoiceWebSocket(this);
_voiceSocket.Connected += (s, e) => RaiseVoiceConnected();
_voiceSocket.Disconnected += async (s, e) =>
{
RaiseVoiceDisconnected(e);
if (e.WasUnexpected)
await _voiceSocket.Reconnect();
};
}
_dataSocket.LogMessage += (s, e) => RaiseOnLog(e.Severity, LogMessageSource.DataWebSocket, e.Message);
if (_config.VoiceMode != DiscordVoiceMode.Disabled)
_voiceSocket.LogMessage += (s, e) => RaiseOnLog(e.Severity, LogMessageSource.VoiceWebSocket, e.Message);
if (_config.LogLevel >= LogMessageSeverity.Info)
{
_dataSocket.Connected += (s, e) => RaiseOnLog(LogMessageSeverity.Info, LogMessageSource.DataWebSocket, "Connected");
_dataSocket.Disconnected += (s, e) => RaiseOnLog(LogMessageSeverity.Info, LogMessageSource.DataWebSocket, "Disconnected");
if (_config.VoiceMode != DiscordVoiceMode.Disabled)
{
_voiceSocket.Connected += (s, e) => RaiseOnLog(LogMessageSeverity.Info, LogMessageSource.VoiceWebSocket, "Connected");
_voiceSocket.Disconnected += (s, e) => RaiseOnLog(LogMessageSeverity.Info, LogMessageSource.VoiceWebSocket, "Disconnected");
}
}
_dataSocket.ReceivedEvent += (s, e) => OnReceivedEvent(e);
}
//Connection
protected async Task<string> Connect(string gateway, string token)
{
try
{
_state = (int)DiscordClientState.Connecting;
_disconnectedEvent.Reset();
_gateway = gateway;
_token = token;
_cancelTokenSource = new CancellationTokenSource();
_cancelToken = _cancelTokenSource.Token;
_dataSocket.Host = gateway;
_dataSocket.ParentCancelToken = _cancelToken;
await _dataSocket.Login(token).ConfigureAwait(false);
_runTask = RunTasks();
try
{
//Cancel if either Disconnect is called, data socket errors or timeout is reached
var cancelToken = CancellationTokenSource.CreateLinkedTokenSource(_cancelToken, _dataSocket.CancelToken).Token;
_connectedEvent.Wait(cancelToken);
}
catch (OperationCanceledException)
{
_dataSocket.ThrowError(); //Throws data socket's internal error if any occured
throw;
}
//_state = (int)DiscordClientState.Connected;
_token = token;
return token;
}
catch
{
await Disconnect().ConfigureAwait(false);
throw;
}
}
protected void CompleteConnect()
{
_state = (int)DiscordClientState.Connected;
_connectedEvent.Set();
RaiseConnected();
}
/// <summary> Disconnects from the Discord server, canceling any pending requests. </summary>
public Task Disconnect() => DisconnectInternal(new Exception("Disconnect was requested by user."), isUnexpected: false);
protected Task DisconnectInternal(Exception ex = null, bool isUnexpected = true, bool skipAwait = false)
{
int oldState;
bool hasWriterLock;
//If in either connecting or connected state, get a lock by being the first to switch to disconnecting
oldState = Interlocked.CompareExchange(ref _state, (int)DiscordClientState.Disconnecting, (int)DiscordClientState.Connecting);
if (oldState == (int)DiscordClientState.Disconnected) return TaskHelper.CompletedTask; //Already disconnected
hasWriterLock = oldState == (int)DiscordClientState.Connecting; //Caused state change
if (!hasWriterLock)
{
oldState = Interlocked.CompareExchange(ref _state, (int)DiscordClientState.Disconnecting, (int)DiscordClientState.Connected);
if (oldState == (int)DiscordClientState.Disconnected) return TaskHelper.CompletedTask; //Already disconnected
hasWriterLock = oldState == (int)DiscordClientState.Connected; //Caused state change
}
if (hasWriterLock)
{
_wasDisconnectUnexpected = isUnexpected;
_disconnectReason = ex != null ? ExceptionDispatchInfo.Capture(ex) : null;
_cancelTokenSource.Cancel();
/*if (_state == DiscordClientState.Connecting) //_runTask was never made
await Cleanup().ConfigureAwait(false);*/
}
if (!skipAwait)
return _runTask ?? TaskHelper.CompletedTask;
else
return TaskHelper.CompletedTask;
}
private async Task RunTasks()
{
Task[] tasks = Run();
Task firstTask = Task.WhenAny(tasks);
Task allTasks = Task.WhenAll(tasks);
//Wait until the first task ends/errors and capture the error
try { await firstTask.ConfigureAwait(false); }
catch (Exception ex) { await DisconnectInternal(ex: ex, skipAwait: true).ConfigureAwait(false); }
//Ensure all other tasks are signaled to end.
await DisconnectInternal(skipAwait: true);
//Wait for the remaining tasks to complete
try { await allTasks.ConfigureAwait(false); }
catch { }
//Start cleanup
var wasDisconnectUnexpected = _wasDisconnectUnexpected;
_wasDisconnectUnexpected = false;
await Cleanup().ConfigureAwait(false);
if (!wasDisconnectUnexpected)
{
_state = (int)DiscordClientState.Disconnected;
_disconnectedEvent.Set();
}
_connectedEvent.Reset();
_runTask = null;
}
protected virtual Task[] Run()
{
return new Task[] { _cancelToken.Wait() };
}
protected virtual async Task Cleanup()
{
await _dataSocket.Disconnect().ConfigureAwait(false);
if (_config.VoiceMode != DiscordVoiceMode.Disabled)
await _voiceSocket.Disconnect().ConfigureAwait(false);
_currentUserId = null;
_gateway = null;
_token = null;
}
//Helpers
/// <summary> Blocking call that will not return until client has been stopped. This is mainly intended for use in console applications. </summary>
public void Block()
{
_disconnectedEvent.WaitOne();
}
protected void CheckReady(bool checkVoice = false)
{
switch (_state)
{
case (int)DiscordClientState.Disconnecting:
throw new InvalidOperationException("The client is disconnecting.");
case (int)DiscordClientState.Disconnected:
throw new InvalidOperationException("The client is not connected to Discord");
case (int)DiscordClientState.Connecting:
throw new InvalidOperationException("The client is connecting.");
}
if (checkVoice && _config.VoiceMode == DiscordVoiceMode.Disabled)
throw new InvalidOperationException("Voice is not enabled for this client.");
}
protected void RaiseEvent(string name, Action action)
{
try { action(); }
catch (Exception ex)
{
RaiseOnLog(LogMessageSeverity.Error, LogMessageSource.Client,
$"{name} event handler raised an exception: ${ex.GetBaseException().Message}");
}
}
internal virtual Task OnReceivedEvent(WebSocketEventEventArgs e)
{
if (e.Type == "READY")
_currentUserId = e.Payload["user"].Value<string>("id");
return TaskHelper.CompletedTask;
}
}
}

View File

@@ -1,5 +1,4 @@
using Discord.API;
using Discord.Helpers;
using System;
using System.Collections.Generic;
using System.Linq;
@@ -98,7 +97,7 @@ namespace Discord
channel = user.PrivateChannel;
if (channel == null)
{
var response = await _api.CreatePMChannel(_currentUserId, userId).ConfigureAwait(false);
var response = await _api.CreatePMChannel(CurrentUserId, userId).ConfigureAwait(false);
channel = _channels.GetOrAdd(response.Id, response.GuildId, response.Recipient?.Id);
channel.Update(response);
}
@@ -266,13 +265,13 @@ namespace Discord
var nonce = GenerateNonce();
if (_config.UseMessageQueue)
{
var msg = _messages.GetOrAdd("nonce_" + nonce, channel.Id, _currentUserId);
var msg = _messages.GetOrAdd("nonce_" + nonce, channel.Id, CurrentUserId);
var currentMember = _members[msg.UserId, channel.ServerId];
msg.Update(new API.Message
{
Content = blockText,
Timestamp = DateTime.UtcNow,
Author = new UserReference { Avatar = currentMember.AvatarId, Discriminator = currentMember.Discriminator, Id = _currentUserId, Username = currentMember.Name },
Author = new UserReference { Avatar = currentMember.AvatarId, Discriminator = currentMember.Discriminator, Id = CurrentUserId, Username = currentMember.Name },
ChannelId = channel.Id,
IsTextToSpeech = isTextToSpeech
});
@@ -513,13 +512,13 @@ namespace Discord
}
//Profile
public Task<EditProfileResponse> EditProfile(string currentPassword,
public Task<EditProfileResponse> EditProfile(string currentPassword = "",
string username = null, string email = null, string password = null,
AvatarImageType avatarType = AvatarImageType.Png, byte[] avatar = null)
{
if (currentPassword == null) throw new ArgumentNullException(nameof(currentPassword));
return _api.EditProfile(currentPassword, username: username, email: email, password: password,
return _api.EditProfile(currentPassword: currentPassword, username: username, email: email, password: password,
avatarType: avatarType, avatar: avatar);
}

View File

@@ -2,50 +2,6 @@
namespace Discord
{
public enum LogMessageSeverity : byte
{
Error = 1,
Warning = 2,
Info = 3,
Verbose = 4,
Debug = 5
}
public enum LogMessageSource : byte
{
Unknown = 0,
Cache,
Client,
DataWebSocket,
MessageQueue,
Rest,
VoiceWebSocket,
}
public class DisconnectedEventArgs : EventArgs
{
public readonly bool WasUnexpected;
public readonly Exception Error;
internal DisconnectedEventArgs(bool wasUnexpected, Exception error)
{
WasUnexpected = wasUnexpected;
Error = error;
}
}
public sealed class LogMessageEventArgs : EventArgs
{
public LogMessageSeverity Severity { get; }
public LogMessageSource Source { get; }
public string Message { get; }
internal LogMessageEventArgs(LogMessageSeverity severity, LogMessageSource source, string msg)
{
Severity = severity;
Source = source;
Message = msg;
}
}
public sealed class ServerEventArgs : EventArgs
{
public Server Server { get; }
@@ -148,45 +104,9 @@ namespace Discord
IsSpeaking = isSpeaking;
}
}
public sealed class VoicePacketEventArgs
{
public string UserId { get; }
public string ChannelId { get; }
public byte[] Buffer { get; }
public int Offset { get; }
public int Count { get; }
internal VoicePacketEventArgs(string userId, string channelId, byte[] buffer, int offset, int count)
{
UserId = userId;
Buffer = buffer;
Offset = offset;
Count = count;
}
}
public partial class DiscordClient
{
//General
public event EventHandler Connected;
private void RaiseConnected()
{
if (Connected != null)
RaiseEvent(nameof(Connected), () => Connected(this, EventArgs.Empty));
}
public event EventHandler<DisconnectedEventArgs> Disconnected;
private void RaiseDisconnected(DisconnectedEventArgs e)
{
if (Disconnected != null)
RaiseEvent(nameof(Disconnected), () => Disconnected(this, e));
}
public event EventHandler<LogMessageEventArgs> LogMessage;
internal void RaiseOnLog(LogMessageSeverity severity, LogMessageSource source, string message)
{
if (LogMessage != null)
RaiseEvent(nameof(LogMessage), () => LogMessage(this, new LogMessageEventArgs(severity, source, message)));
}
//Server
public event EventHandler<ServerEventArgs> ServerCreated;
private void RaiseServerCreated(Server server)
@@ -342,26 +262,5 @@ namespace Discord
if (UserIsSpeaking != null)
RaiseEvent(nameof(UserIsSpeaking), () => UserIsSpeaking(this, new UserIsSpeakingEventArgs(member, isSpeaking)));
}
//Voice
public event EventHandler VoiceConnected;
private void RaiseVoiceConnected()
{
if (VoiceConnected != null)
RaiseEvent(nameof(UserIsSpeaking), () => VoiceConnected(this, EventArgs.Empty));
}
public event EventHandler<DisconnectedEventArgs> VoiceDisconnected;
private void RaiseVoiceDisconnected(DisconnectedEventArgs e)
{
if (VoiceDisconnected != null)
RaiseEvent(nameof(UserIsSpeaking), () => VoiceDisconnected(this, e));
}
public event EventHandler<VoicePacketEventArgs> OnVoicePacket;
internal void RaiseOnVoicePacket(VoicePacketEventArgs e)
{
if (OnVoicePacket != null)
OnVoicePacket(this, e);
}
}
}

View File

@@ -1,6 +1,7 @@
using Discord.Helpers;
using Discord.WebSockets;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Discord
@@ -8,30 +9,31 @@ namespace Discord
public partial class DiscordClient
{
public Task JoinVoiceServer(Channel channel)
=> JoinVoiceServer(channel?.Server, channel);
public Task JoinVoiceServer(string serverId, string channelId)
=> JoinVoiceServer(_servers[serverId], _channels[channelId]);
=> JoinVoiceServer(channel?.ServerId, channel?.Id);
public Task JoinVoiceServer(Server server, string channelId)
=> JoinVoiceServer(server, _channels[channelId]);
private async Task JoinVoiceServer(Server server, Channel channel)
=> JoinVoiceServer(server?.Id, channelId);
public async Task JoinVoiceServer(string serverId, string channelId)
{
CheckReady(checkVoice: true);
if (server == null) throw new ArgumentNullException(nameof(server));
if (channel == null) throw new ArgumentNullException(nameof(channel));
if (serverId == null) throw new ArgumentNullException(nameof(serverId));
if (channelId == null) throw new ArgumentNullException(nameof(channelId));
await LeaveVoiceServer().ConfigureAwait(false);
_voiceSocket.SetChannel(server, channel);
_dataSocket.SendJoinVoice(server.Id, channel.Id);
_voiceSocket.SetChannel(serverId, channelId);
_dataSocket.SendJoinVoice(serverId, channelId);
CancellationTokenSource tokenSource = new CancellationTokenSource();
try
{
await Task.Run(() => _voiceSocket.WaitForConnection())
.Timeout(_config.ConnectionTimeout)
await Task.Run(() => _voiceSocket.WaitForConnection(tokenSource.Token))
.Timeout(_config.ConnectionTimeout, tokenSource)
.ConfigureAwait(false);
}
catch (TaskCanceledException)
catch (TimeoutException)
{
tokenSource.Cancel();
await LeaveVoiceServer().ConfigureAwait(false);
throw;
}
}
public async Task LeaveVoiceServer()
@@ -40,11 +42,11 @@ namespace Discord
if (_voiceSocket.State != WebSocketState.Disconnected)
{
var server = _voiceSocket.CurrentVoiceServer;
if (server != null)
var serverId = _voiceSocket.CurrentServerId;
if (serverId != null)
{
await _voiceSocket.Disconnect().ConfigureAwait(false);
_dataSocket.SendLeaveVoice(server.Id);
_dataSocket.SendLeaveVoice(serverId);
}
}
}

View File

@@ -7,54 +7,22 @@ using Newtonsoft.Json;
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using VoiceWebSocket = Discord.WebSockets.Voice.VoiceWebSocket;
namespace Discord
{
public enum DiscordClientState : byte
{
Disconnected,
Connecting,
Connected,
Disconnecting
}
/// <summary> Provides a connection to the DiscordApp service. </summary>
public partial class DiscordClient
public partial class DiscordClient : DiscordBaseClient
{
protected readonly DiscordAPIClient _api;
private readonly Random _rand;
private readonly DiscordAPIClient _api;
private readonly DataWebSocket _dataSocket;
private readonly VoiceWebSocket _voiceSocket;
private readonly ConcurrentQueue<Message> _pendingMessages;
private readonly ManualResetEvent _disconnectedEvent;
private readonly ManualResetEventSlim _connectedEvent;
private readonly JsonSerializer _serializer;
private Task _runTask;
private string _token;
private readonly ConcurrentQueue<Message> _pendingMessages;
protected ExceptionDispatchInfo _disconnectReason;
private bool _wasDisconnectUnexpected;
/// <summary> Returns the id of the current logged-in user. </summary>
public string CurrentUserId => _currentUserId;
private string _currentUserId;
/// <summary> Returns the current logged-in user. </summary>
public User CurrentUser => _currentUser;
private User _currentUser;
/// <summary> Returns the server this user is currently connected to for voice. </summary>
public Server CurrentVoiceServer => _voiceSocket.CurrentVoiceServer;
/// <summary> Returns the current connection state of this client. </summary>
public DiscordClientState State => (DiscordClientState)_state;
private int _state;
/// <summary> Returns the configuration object used to make this client. Note that this object cannot be edited directly - to change the configuration of this client, use the DiscordClient(DiscordClientConfig config) constructor. </summary>
public DiscordClientConfig Config => _config;
private readonly DiscordClientConfig _config;
/// <summary> Returns a collection of all channels this client is a member of. </summary>
public Channels Channels => _channels;
@@ -76,65 +44,14 @@ namespace Discord
public Users Users => _users;
private readonly Users _users;
public CancellationToken CancelToken => _cancelToken;
private CancellationTokenSource _cancelTokenSource;
private CancellationToken _cancelToken;
/// <summary> Initializes a new instance of the DiscordClient class. </summary>
public DiscordClient(DiscordClientConfig config = null)
: base(config)
{
_config = config ?? new DiscordClientConfig();
_config.Lock();
_state = (int)DiscordClientState.Disconnected;
_cancelToken = new CancellationToken(true);
_disconnectedEvent = new ManualResetEvent(true);
_connectedEvent = new ManualResetEventSlim(false);
_rand = new Random();
_api = new DiscordAPIClient(_config.LogLevel, _config.APITimeout);
_dataSocket = new DataWebSocket(this);
_dataSocket.Connected += (s, e) => { if (_state == (int)DiscordClientState.Connecting) CompleteConnect(); };
_dataSocket.Disconnected += async (s, e) =>
{
RaiseDisconnected(e);
if (e.WasUnexpected)
await _dataSocket.Reconnect(_token);
};
if (_config.VoiceMode != DiscordVoiceMode.Disabled)
{
_voiceSocket = new VoiceWebSocket(this);
_voiceSocket.Connected += (s, e) => RaiseVoiceConnected();
_voiceSocket.Disconnected += async (s, e) =>
{
foreach (var member in _members)
{
if (member.IsSpeaking)
{
member.IsSpeaking = false;
RaiseUserIsSpeaking(member, false);
}
}
RaiseVoiceDisconnected(e);
if (e.WasUnexpected)
await _voiceSocket.Reconnect();
};
_voiceSocket.IsSpeaking += (s, e) =>
{
if (_voiceSocket.State == WebSocketState.Connected)
{
var member = _members[e.UserId, _voiceSocket.CurrentVoiceServer.Id];
bool value = e.IsSpeaking;
if (member.IsSpeaking != value)
{
member.IsSpeaking = value;
RaiseUserIsSpeaking(member, value);
if (_config.TrackActivity)
member.UpdateActivity();
}
}
};
}
if (_config.UseMessageQueue)
_pendingMessages = new ConcurrentQueue<Message>();
object cacheLock = new object();
_channels = new Channels(this, cacheLock);
@@ -144,20 +61,38 @@ namespace Discord
_servers = new Servers(this, cacheLock);
_users = new Users(this, cacheLock);
_dataSocket.LogMessage += (s, e) => RaiseOnLog(e.Severity, LogMessageSource.DataWebSocket, e.Message);
if (_config.VoiceMode != DiscordVoiceMode.Disabled)
_voiceSocket.LogMessage += (s, e) => RaiseOnLog(e.Severity, LogMessageSource.VoiceWebSocket, e.Message);
if (_config.LogLevel >= LogMessageSeverity.Info)
if (Config.VoiceMode != DiscordVoiceMode.Disabled)
{
_dataSocket.Connected += (s, e) => RaiseOnLog(LogMessageSeverity.Info, LogMessageSource.DataWebSocket, "Connected");
_dataSocket.Disconnected += (s, e) => RaiseOnLog(LogMessageSeverity.Info, LogMessageSource.DataWebSocket, "Disconnected");
//_dataSocket.ReceivedEvent += (s, e) => RaiseOnLog(LogMessageSeverity.Info, LogMessageSource.DataWebSocket, $"Received {e.Type}");
if (_config.VoiceMode != DiscordVoiceMode.Disabled)
this.VoiceDisconnected += (s, e) =>
{
_voiceSocket.Connected += (s, e) => RaiseOnLog(LogMessageSeverity.Info, LogMessageSource.VoiceWebSocket, "Connected");
_voiceSocket.Disconnected += (s, e) => RaiseOnLog(LogMessageSeverity.Info, LogMessageSource.VoiceWebSocket, "Disconnected");
foreach (var member in _members)
{
if (member.IsSpeaking)
{
member.IsSpeaking = false;
RaiseUserIsSpeaking(member, false);
}
}
};
_voiceSocket.IsSpeaking += (s, e) =>
{
if (_voiceSocket.State == WebSocketState.Connected)
{
var member = _members[e.UserId, _voiceSocket.CurrentServerId];
bool value = e.IsSpeaking;
if (member.IsSpeaking != value)
{
member.IsSpeaking = value;
RaiseUserIsSpeaking(member, value);
if (Config.TrackActivity)
member.UpdateActivity();
}
}
};
}
this.Connected += (s,e) => _api.CancelToken = CancelToken;
if (_config.LogLevel >= LogMessageSeverity.Verbose)
{
bool isDebug = _config.LogLevel >= LogMessageSeverity.Debug;
@@ -270,16 +205,118 @@ namespace Discord
_serializer.CheckAdditionalContent = true;
_serializer.MissingMemberHandling = MissingMemberHandling.Error;
#endif
}
_dataSocket.ReceivedEvent += async (s, e) =>
/// <summary> Connects to the Discord server with the provided email and password. </summary>
/// <returns> Returns a token for future connections. </returns>
public new async Task<string> Connect(string email, string password)
{
if (State != DiscordClientState.Disconnected)
await Disconnect().ConfigureAwait(false);
string token;
try
{
var response = await _api.Login(email, password)
.Timeout(5000);
token = response.Token;
if (_config.LogLevel >= LogMessageSeverity.Verbose)
RaiseOnLog(LogMessageSeverity.Verbose, LogMessageSource.Client, "Login successful, got token.");
}
catch (TaskCanceledException) { throw new TimeoutException(); }
await Connect(token);
return token;
}
/// <summary> Connects to the Discord server with the provided token. </summary>
public async Task Connect(string token)
{
if (State != (int)DiscordClientState.Disconnected)
await Disconnect().ConfigureAwait(false);
_api.Token = token;
string gateway = (await _api.Gateway().ConfigureAwait(false)).Url;
if (_config.LogLevel >= LogMessageSeverity.Verbose)
RaiseOnLog(LogMessageSeverity.Verbose, LogMessageSource.Client, $"Websocket endpoint: {gateway}");
await base.Connect(gateway, token)
.Timeout(_config.ConnectionTimeout)
.ConfigureAwait(false);
}
protected override async Task Cleanup()
{
await base.Cleanup().ConfigureAwait(false);
if (_config.UseMessageQueue)
{
Message ignored;
while (_pendingMessages.TryDequeue(out ignored)) { }
}
_channels.Clear();
_members.Clear();
_messages.Clear();
_roles.Clear();
_servers.Clear();
_users.Clear();
_currentUser = null;
}
//Experimental
private Task MessageQueueLoop()
{
var cancelToken = CancelToken;
int interval = _config.MessageQueueInterval;
return Task.Run(async () =>
{
Message msg;
while (!cancelToken.IsCancellationRequested)
{
while (_pendingMessages.TryDequeue(out msg))
{
bool hasFailed = false;
SendMessageResponse response = null;
try
{
response = await _api.SendMessage(msg.ChannelId, msg.RawText, msg.MentionIds, msg.Nonce, msg.IsTTS).ConfigureAwait(false);
}
catch (WebException) { break; }
catch (HttpException) { hasFailed = true; }
if (!hasFailed)
{
_messages.Remap(msg.Id, response.Id);
msg.Id = response.Id;
msg.Update(response);
}
msg.IsQueued = false;
msg.HasFailed = hasFailed;
RaiseMessageSent(msg);
}
await Task.Delay(interval).ConfigureAwait(false);
}
});
}
private string GenerateNonce()
{
lock (_rand)
return _rand.Next().ToString();
}
internal override async Task OnReceivedEvent(WebSocketEventEventArgs e)
{
await base.OnReceivedEvent(e);
switch (e.Type)
{
//Global
case "READY": //Resync
{
var data = e.Payload.ToObject<ReadyEvent>(_serializer);
_currentUserId = data.User.Id;
_currentUser = _users.GetOrAdd(data.User.Id);
_currentUser.Update(data.User);
foreach (var model in data.Guilds)
@@ -294,6 +331,9 @@ namespace Discord
var channel = _channels.GetOrAdd(model.Id, null, user.Id);
channel.Update(model);
}
/*foreach (var server in _servers)
_dataSocket.SendJoinVoice(server.Id, System.Linq.Enumerable.First(server.ChannelIds));*/
}
break;
case "RESUMED":
@@ -452,7 +492,7 @@ namespace Discord
var data = e.Payload.ToObject<MessageCreateEvent>(_serializer);
Message msg = null;
bool wasLocal = _config.UseMessageQueue && data.Author.Id == _currentUserId && data.Nonce != null;
bool wasLocal = _config.UseMessageQueue && data.Author.Id == CurrentUserId && data.Nonce != null;
if (wasLocal)
{
msg = _messages.Remap("nonce" + data.Nonce, data.Id);
@@ -587,13 +627,13 @@ namespace Discord
case "VOICE_SERVER_UPDATE":
{
var data = e.Payload.ToObject<VoiceServerUpdateEvent>(_serializer);
if (data.GuildId == _voiceSocket.CurrentVoiceServer.Id)
if (data.GuildId == _voiceSocket.CurrentServerId)
{
var server = _servers[data.GuildId];
if (_config.VoiceMode != DiscordVoiceMode.Disabled)
{
_voiceSocket.Host = "wss://" + data.Endpoint.Split(':')[0];
await _voiceSocket.Login(_currentUserId, _dataSocket.SessionId, data.Token, _cancelToken).ConfigureAwait(false);
await _voiceSocket.Login(CurrentUserId, _dataSocket.SessionId, data.Token, CancelToken).ConfigureAwait(false);
}
}
}
@@ -622,252 +662,6 @@ namespace Discord
RaiseOnLog(LogMessageSeverity.Warning, LogMessageSource.DataWebSocket, $"Unknown message type: {e.Type}");
break;
}
};
}
//Connection
/// <summary> Connects to the Discord server with the provided token. </summary>
public async Task Connect(string token)
{
if (_state != (int)DiscordClientState.Disconnected)
await Disconnect().ConfigureAwait(false);
_cancelTokenSource = new CancellationTokenSource();
_cancelToken = _cancelTokenSource.Token;
_api.CancelToken = _cancelToken;
await ConnectInternal(token)
.Timeout(_config.ConnectionTimeout)
.ConfigureAwait(false);
}
/// <summary> Connects to the Discord server with the provided email and password. </summary>
/// <returns> Returns a token for future connections. </returns>
public async Task<string> Connect(string email, string password)
{
if (_state != (int)DiscordClientState.Disconnected)
await Disconnect().ConfigureAwait(false);
_cancelTokenSource = new CancellationTokenSource();
_cancelToken = _cancelTokenSource.Token;
_api.CancelToken = _cancelToken;
string token;
try
{
var response = await _api.Login(email, password).ConfigureAwait(false);
token = response.Token;
if (_config.LogLevel >= LogMessageSeverity.Verbose)
RaiseOnLog(LogMessageSeverity.Verbose, LogMessageSource.Client, "Login successful, got token.");
}
catch (TaskCanceledException) { throw new TimeoutException(); }
return await ConnectInternal(token)
.Timeout(_config.ConnectionTimeout)
.ConfigureAwait(false);
}
private async Task<string> ConnectInternal(string token)
{
try
{
_disconnectedEvent.Reset();
_api.Token = token;
_token = token;
_state = (int)DiscordClientState.Connecting;
string url = (await _api.Gateway().ConfigureAwait(false)).Url;
if (_config.LogLevel >= LogMessageSeverity.Verbose)
RaiseOnLog(LogMessageSeverity.Verbose, LogMessageSource.Client, $"Websocket endpoint: {url}");
_dataSocket.Host = url;
_dataSocket.ParentCancelToken = _cancelToken;
await _dataSocket.Login(token).ConfigureAwait(false);
_runTask = RunTasks();
try
{
//Cancel if either Disconnect is called, data socket errors or timeout is reached
var cancelToken = CancellationTokenSource.CreateLinkedTokenSource(_cancelToken, _dataSocket.CancelToken).Token;
_connectedEvent.Wait(cancelToken);
}
catch (OperationCanceledException)
{
_dataSocket.ThrowError(); //Throws data socket's internal error if any occured
throw;
}
//_state = (int)DiscordClientState.Connected;
_token = token;
return token;
}
catch
{
await Disconnect().ConfigureAwait(false);
throw;
}
}
protected void CompleteConnect()
{
_state = (int)DiscordClientState.Connected;
_connectedEvent.Set();
RaiseConnected();
}
/// <summary> Disconnects from the Discord server, canceling any pending requests. </summary>
public Task Disconnect() => DisconnectInternal(new Exception("Disconnect was requested by user."), isUnexpected: false);
protected Task DisconnectInternal(Exception ex = null, bool isUnexpected = true, bool skipAwait = false)
{
int oldState;
bool hasWriterLock;
//If in either connecting or connected state, get a lock by being the first to switch to disconnecting
oldState = Interlocked.CompareExchange(ref _state, (int)DiscordClientState.Disconnecting, (int)DiscordClientState.Connecting);
if (oldState == (int)DiscordClientState.Disconnected) return TaskHelper.CompletedTask; //Already disconnected
hasWriterLock = oldState == (int)DiscordClientState.Connecting; //Caused state change
if (!hasWriterLock)
{
oldState = Interlocked.CompareExchange(ref _state, (int)DiscordClientState.Disconnecting, (int)DiscordClientState.Connected);
if (oldState == (int)DiscordClientState.Disconnected) return TaskHelper.CompletedTask; //Already disconnected
hasWriterLock = oldState == (int)DiscordClientState.Connected; //Caused state change
}
if (hasWriterLock)
{
_wasDisconnectUnexpected = isUnexpected;
_disconnectReason = ex != null ? ExceptionDispatchInfo.Capture(ex) : null;
_cancelTokenSource.Cancel();
}
if (!skipAwait)
return _runTask ?? TaskHelper.CompletedTask;
else
return TaskHelper.CompletedTask;
}
private async Task RunTasks()
{
Task task;
if (_config.UseMessageQueue)
task = MessageQueueLoop();
else
task = _cancelToken.Wait();
try { await task.ConfigureAwait(false); }
catch (Exception ex) { await DisconnectInternal(ex, skipAwait: true).ConfigureAwait(false); }
//When the first task ends, make sure the rest do too
await DisconnectInternal(skipAwait: true);
await Cleanup().ConfigureAwait(false);
_runTask = null;
}
private async Task Cleanup()
{
var wasDisconnectUnexpected = _wasDisconnectUnexpected;
_wasDisconnectUnexpected = false;
await _dataSocket.Disconnect().ConfigureAwait(false);
if (_config.VoiceMode != DiscordVoiceMode.Disabled)
await _voiceSocket.Disconnect().ConfigureAwait(false);
if (_config.UseMessageQueue)
{
Message ignored;
while (_pendingMessages.TryDequeue(out ignored)) { }
}
_channels.Clear();
_members.Clear();
_messages.Clear();
_roles.Clear();
_servers.Clear();
_users.Clear();
_currentUser = null;
_currentUserId = null;
_token = null;
if (!wasDisconnectUnexpected)
{
_state = (int)DiscordClientState.Disconnected;
_disconnectedEvent.Set();
}
_connectedEvent.Reset();
}
//Helpers
/// <summary> Blocking call that will not return until client has been stopped. This is mainly intended for use in console applications. </summary>
public void Block()
{
_disconnectedEvent.WaitOne();
}
private void CheckReady(bool checkVoice = false)
{
switch (_state)
{
case (int)DiscordClientState.Disconnecting:
throw new InvalidOperationException("The client is disconnecting.");
case (int)DiscordClientState.Disconnected:
throw new InvalidOperationException("The client is not connected to Discord");
case (int)DiscordClientState.Connecting:
throw new InvalidOperationException("The client is connecting.");
}
if (checkVoice && _config.VoiceMode == DiscordVoiceMode.Disabled)
throw new InvalidOperationException("Voice is not enabled for this client.");
}
private void RaiseEvent(string name, Action action)
{
try { action(); }
catch (Exception ex)
{
RaiseOnLog(LogMessageSeverity.Error, LogMessageSource.Client,
$"{name} event handler raised an exception: ${ex.GetBaseException().Message}");
}
}
//Experimental
private Task MessageQueueLoop()
{
var cancelToken = _cancelToken;
int interval = _config.MessageQueueInterval;
return Task.Run(async () =>
{
Message msg;
while (!cancelToken.IsCancellationRequested)
{
while (_pendingMessages.TryDequeue(out msg))
{
bool hasFailed = false;
SendMessageResponse response = null;
try
{
response = await _api.SendMessage(msg.ChannelId, msg.RawText, msg.MentionIds, msg.Nonce, msg.IsTTS).ConfigureAwait(false);
}
catch (WebException) { break; }
catch (HttpException) { hasFailed = true; }
if (!hasFailed)
{
_messages.Remap(msg.Id, response.Id);
msg.Id = response.Id;
msg.Update(response);
}
msg.IsQueued = false;
msg.HasFailed = hasFailed;
RaiseMessageSent(msg);
}
await Task.Delay(interval).ConfigureAwait(false);
}
});
}
private string GenerateNonce()
{
lock (_rand)
return _rand.Next().ToString();
}
}
}

View File

@@ -1,4 +1,6 @@
using System.Threading.Tasks;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Discord.Helpers
{
@@ -32,5 +34,29 @@ namespace Discord.Helpers
else
return await self.ConfigureAwait(false);
}
public static async Task Timeout(this Task self, int milliseconds, CancellationTokenSource cancelToken)
{
try
{
cancelToken.CancelAfter(milliseconds);
await self;
}
catch (OperationCanceledException)
{
throw new TimeoutException();
}
}
public static async Task<T> Timeout<T>(this Task<T> self, int milliseconds, CancellationTokenSource cancelToken)
{
try
{
cancelToken.CancelAfter(milliseconds);
return await self;
}
catch (OperationCanceledException)
{
throw new TimeoutException();
}
}
}
}

View File

@@ -17,7 +17,6 @@ namespace Discord
private readonly DiscordClient _client;
private ConcurrentDictionary<string, bool> _messages;
private ConcurrentDictionary<uint, string> _ssrcMapping;
/// <summary> Returns the unique identifier for this channel. </summary>
public string Id { get; }
@@ -70,8 +69,6 @@ namespace Discord
{
Name = model.Name;
Type = model.Type;
if (Type == ChannelTypes.Voice && _ssrcMapping == null)
_ssrcMapping = new ConcurrentDictionary<uint, string>();
}
internal void Update(API.ChannelInfo model)
{
@@ -104,12 +101,5 @@ namespace Discord
bool ignored;
return _messages.TryRemove(messageId, out ignored);
}
internal string GetUserId(uint ssrc)
{
string userId = null;
_ssrcMapping.TryGetValue(ssrc, out userId);
return userId;
}
}
}

View File

@@ -12,7 +12,7 @@ namespace Discord.WebSockets.Data
public string SessionId => _sessionId;
private string _sessionId;
public DataWebSocket(DiscordClient client)
public DataWebSocket(DiscordBaseClient client)
: base(client)
{
}

View File

@@ -16,7 +16,7 @@ namespace Discord.WebSockets.Data
internal partial class DataWebSocket
{
public event EventHandler<WebSocketEventEventArgs> ReceivedEvent;
internal event EventHandler<WebSocketEventEventArgs> ReceivedEvent;
private void RaiseReceivedEvent(string type, JToken payload)
{
if (ReceivedEvent != null)

View File

@@ -2,7 +2,7 @@
namespace Discord.WebSockets.Voice
{
public sealed class IsTalkingEventArgs : EventArgs
internal sealed class IsTalkingEventArgs : EventArgs
{
public readonly string UserId;
public readonly bool IsSpeaking;

View File

@@ -27,6 +27,7 @@ namespace Discord.WebSockets.Voice
private readonly ConcurrentDictionary<uint, OpusDecoder> _decoders;
private ManualResetEventSlim _connectWaitOnLogin;
private uint _ssrc;
private ConcurrentDictionary<uint, string> _ssrcMapping;
private ConcurrentQueue<byte[]> _sendQueue;
private ManualResetEventSlim _sendQueueWait, _sendQueueEmptyWait;
@@ -35,17 +36,16 @@ namespace Discord.WebSockets.Voice
private bool _isClearing, _isEncrypted;
private byte[] _secretKey, _encodingBuffer;
private ushort _sequence;
private string _userId, _sessionId, _token, _encryptionMode;
private Server _server;
private Channel _channel;
private string _serverId, _channelId, _userId, _sessionId, _token, _encryptionMode;
#if USE_THREAD
private Thread _sendThread;
private Thread _sendThread, _receiveThread;
#endif
public Server CurrentVoiceServer => _server;
public string CurrentServerId => _serverId;
public string CurrentChannelId => _channelId;
public VoiceWebSocket(DiscordClient client)
public VoiceWebSocket(DiscordBaseClient client)
: base(client)
{
_rand = new Random();
@@ -56,12 +56,14 @@ namespace Discord.WebSockets.Voice
_sendQueueEmptyWait = new ManualResetEventSlim(true);
_targetAudioBufferLength = client.Config.VoiceBufferLength / 20; //20 ms frames
_encodingBuffer = new byte[MaxOpusSize];
_ssrcMapping = new ConcurrentDictionary<uint, string>();
_encoder = new OpusEncoder(48000, 1, 20, Opus.Application.Audio);
}
public void SetChannel(Server server, Channel channel)
public void SetChannel(string serverId, string channelId)
{
_server = server;
_channel = channel;
_serverId = serverId;
_channelId = channelId;
}
public async Task Login(string userId, string sessionId, string token, CancellationToken cancelToken)
{
@@ -113,7 +115,7 @@ namespace Discord.WebSockets.Voice
#endif
LoginCommand msg = new LoginCommand();
msg.Payload.ServerId = _server.Id;
msg.Payload.ServerId = _serverId;
msg.Payload.SessionId = _sessionId;
msg.Payload.Token = _token;
msg.Payload.UserId = _userId;
@@ -122,6 +124,8 @@ namespace Discord.WebSockets.Voice
#if USE_THREAD
_sendThread = new Thread(new ThreadStart(() => SendVoiceAsync(_cancelToken)));
_sendThread.Start();
_receiveThread = new Thread(new ThreadStart(() => ReceiveVoiceAsync(_cancelToken)));
_receiveThread.Start();
#if !DNXCORE50
return new Task[] { WatcherAsync() }.Concat(base.Run()).ToArray();
#else
@@ -141,7 +145,9 @@ namespace Discord.WebSockets.Voice
{
#if USE_THREAD
_sendThread.Join();
_receiveThread.Join();
_sendThread = null;
_receiveThread = null;
#endif
OpusDecoder decoder;
@@ -274,9 +280,9 @@ namespace Discord.WebSockets.Voice
/*if (_logLevel >= LogMessageSeverity.Debug)
RaiseOnLog(LogMessageSeverity.Debug, $"Received {buffer.Length - 12} bytes.");*/
string userId = _channel.GetUserId(ssrc);
if (userId != null)
RaiseOnPacket(userId, _channel.Id, result, resultOffset, resultLength);
string userId;
if (_ssrcMapping.TryGetValue(ssrc, out userId))
RaiseOnPacket(userId, _channelId, result, resultOffset, resultLength);
}
}
#if USE_THREAD || DNXCORE50
@@ -568,9 +574,9 @@ namespace Discord.WebSockets.Voice
{
_sendQueueEmptyWait.Wait(_cancelToken);
}
public void WaitForConnection()
public void WaitForConnection(CancellationToken cancelToken)
{
_connectedEvent.Wait();
_connectedEvent.Wait(cancelToken);
}
}
}

View File

@@ -2,7 +2,7 @@
namespace Discord.WebSockets
{
internal partial class WebSocket
internal abstract partial class WebSocket
{
public event EventHandler Connected;
private void RaiseConnected()

View File

@@ -35,7 +35,7 @@ namespace Discord.WebSockets
internal abstract partial class WebSocket
{
protected readonly IWebSocketEngine _engine;
protected readonly DiscordClient _client;
protected readonly DiscordBaseClient _client;
protected readonly LogMessageSeverity _logLevel;
protected readonly ManualResetEventSlim _connectedEvent;
@@ -57,7 +57,7 @@ namespace Discord.WebSockets
public WebSocketState State => (WebSocketState)_state;
protected int _state;
public WebSocket(DiscordClient client)
public WebSocket(DiscordBaseClient client)
{
_client = client;
_logLevel = client.Config.LogLevel;
@@ -131,9 +131,9 @@ namespace Discord.WebSockets
_disconnectState = (WebSocketState)oldState;
_disconnectReason = ex != null ? ExceptionDispatchInfo.Capture(ex) : null;
if (_disconnectState == WebSocketState.Connecting) //_runTask was never made
await Cleanup();
_cancelTokenSource.Cancel();
if (_disconnectState == WebSocketState.Connecting) //_runTask was never made
await Cleanup().ConfigureAwait(false);
}
if (!skipAwait)
@@ -162,7 +162,7 @@ namespace Discord.WebSockets
try { await allTasks.ConfigureAwait(false); }
catch { }
//Clean up state variables and raise disconnect event
//Start cleanup
await Cleanup().ConfigureAwait(false);
}
protected virtual Task[] Run()