Rewrote AudioClient, fixed several async issues, removed most sealed keywords.
This commit is contained in:
@@ -1,9 +1,12 @@
|
||||
using Discord.API.Client.GatewaySocket;
|
||||
using Discord.API.Client.Rest;
|
||||
using Discord.Logging;
|
||||
using Discord.Net.Rest;
|
||||
using Discord.Net.WebSockets;
|
||||
using Newtonsoft.Json;
|
||||
using Nito.AsyncEx;
|
||||
using System;
|
||||
using System.Diagnostics;
|
||||
using System.IO;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
@@ -41,124 +44,190 @@ namespace Discord.Audio
|
||||
}
|
||||
}
|
||||
|
||||
private readonly DiscordConfig _config;
|
||||
private readonly AsyncLock _connectionLock;
|
||||
private readonly JsonSerializer _serializer;
|
||||
private CancellationTokenSource _cancelTokenSource;
|
||||
private readonly TaskManager _taskManager;
|
||||
private ConnectionState _gatewayState;
|
||||
|
||||
internal AudioService Service { get; }
|
||||
internal Logger Logger { get; }
|
||||
|
||||
/// <summary> Gets the unique identifier for this client. </summary>
|
||||
public int Id { get; }
|
||||
/// <summary> Gets the service managing this client. </summary>
|
||||
public AudioService Service { get; }
|
||||
/// <summary> Gets the configuration object used to make this client. </summary>
|
||||
public AudioServiceConfig Config { get; }
|
||||
/// <summary> Gets the internal RestClient for the Client API endpoint. </summary>
|
||||
public RestClient ClientAPI { get; }
|
||||
/// <summary> Gets the internal WebSocket for the Gateway event stream. </summary>
|
||||
public GatewaySocket GatewaySocket { get; }
|
||||
public VoiceWebSocket VoiceSocket { get; }
|
||||
/// <summary> Gets the internal WebSocket for the Voice control stream. </summary>
|
||||
public VoiceSocket VoiceSocket { get; }
|
||||
/// <summary> Gets the JSON serializer used by this client. </summary>
|
||||
public JsonSerializer Serializer { get; }
|
||||
/// <summary> </summary>
|
||||
public Stream OutputStream { get; }
|
||||
|
||||
/// <summary> Gets a cancellation token that triggers when the client is manually disconnected. </summary>
|
||||
public CancellationToken CancelToken { get; private set; }
|
||||
/// <summary> Gets the session id for the current connection. </summary>
|
||||
public string SessionId { get; private set; }
|
||||
|
||||
/// <summary> Gets the current state of this client. </summary>
|
||||
public ConnectionState State => VoiceSocket.State;
|
||||
/// <summary> Gets the server this client is bound to. </summary>
|
||||
public Server Server => VoiceSocket.Server;
|
||||
/// <summary> Gets the channel </summary>
|
||||
public Channel Channel => VoiceSocket.Channel;
|
||||
|
||||
public AudioClient(AudioService service, int clientId, Server server, GatewaySocket gatewaySocket, Logger logger)
|
||||
public AudioClient(DiscordClient client, Server server, int id)
|
||||
{
|
||||
Service = service;
|
||||
_serializer = service.Client.Serializer;
|
||||
Id = clientId;
|
||||
GatewaySocket = gatewaySocket;
|
||||
Logger = logger;
|
||||
OutputStream = new OutStream(this);
|
||||
Id = id;
|
||||
_config = client.Config;
|
||||
Service = client.Audio();
|
||||
Config = Service.Config;
|
||||
Serializer = client.Serializer;
|
||||
_gatewayState = (int)ConnectionState.Disconnected;
|
||||
|
||||
//Logging
|
||||
Logger = client.Log.CreateLogger($"AudioClient #{id}");
|
||||
|
||||
//Async
|
||||
_taskManager = new TaskManager(Cleanup, false);
|
||||
_connectionLock = new AsyncLock();
|
||||
CancelToken = new CancellationToken(true);
|
||||
|
||||
GatewaySocket.ReceivedDispatch += OnReceivedDispatch;
|
||||
|
||||
VoiceSocket = new VoiceWebSocket(service.Client, this, logger);
|
||||
//Networking
|
||||
if (Config.EnableMultiserver)
|
||||
{
|
||||
ClientAPI = new RestClient(_config, DiscordConfig.ClientAPIUrl, client.Log.CreateLogger($"ClientAPI #{id}"));
|
||||
GatewaySocket = new GatewaySocket(_config, client.Serializer, client.Log.CreateLogger($"Gateway #{id}"));
|
||||
GatewaySocket.Connected += (s, e) =>
|
||||
{
|
||||
if (_gatewayState == ConnectionState.Connecting)
|
||||
EndGatewayConnect();
|
||||
};
|
||||
}
|
||||
else
|
||||
GatewaySocket = client.GatewaySocket;
|
||||
GatewaySocket.ReceivedDispatch += (s, e) => OnReceivedEvent(e);
|
||||
VoiceSocket = new VoiceSocket(_config, Config, client.Serializer, client.Log.CreateLogger($"Voice #{id}"));
|
||||
VoiceSocket.Server = server;
|
||||
OutputStream = new OutStream(this);
|
||||
}
|
||||
|
||||
/*_voiceSocket.Connected += (s, e) => RaiseVoiceConnected();
|
||||
_voiceSocket.Disconnected += async (s, e) =>
|
||||
{
|
||||
_voiceSocket.CurrentServerId;
|
||||
if (voiceServerId != null)
|
||||
_gatewaySocket.SendLeaveVoice(voiceServerId.Value);
|
||||
await _voiceSocket.Disconnect().ConfigureAwait(false);
|
||||
RaiseVoiceDisconnected(socket.CurrentServerId.Value, e);
|
||||
if (e.WasUnexpected)
|
||||
await socket.Reconnect().ConfigureAwait(false);
|
||||
};*/
|
||||
/// <summary> Connects to the Discord server with the provided token. </summary>
|
||||
public async Task Connect()
|
||||
{
|
||||
if (Config.EnableMultiserver)
|
||||
await BeginGatewayConnect().ConfigureAwait(false);
|
||||
else
|
||||
{
|
||||
var cancelSource = new CancellationTokenSource();
|
||||
CancelToken = cancelSource.Token;
|
||||
await _taskManager.Start(new Task[0], cancelSource).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
private async Task BeginGatewayConnect()
|
||||
{
|
||||
try
|
||||
{
|
||||
using (await _connectionLock.LockAsync().ConfigureAwait(false))
|
||||
{
|
||||
await Disconnect().ConfigureAwait(false);
|
||||
_taskManager.ClearException();
|
||||
|
||||
/*_voiceSocket.IsSpeaking += (s, e) =>
|
||||
{
|
||||
if (_voiceSocket.State == WebSocketState.Connected)
|
||||
{
|
||||
var user = _users[e.UserId, socket.CurrentServerId];
|
||||
bool value = e.IsSpeaking;
|
||||
if (user.IsSpeaking != value)
|
||||
{
|
||||
user.IsSpeaking = value;
|
||||
var channel = _channels[_voiceSocket.CurrentChannelId];
|
||||
RaiseUserIsSpeaking(user, channel, value);
|
||||
if (Config.TrackActivity)
|
||||
user.UpdateActivity();
|
||||
}
|
||||
}
|
||||
};*/
|
||||
ClientAPI.Token = Service.Client.ClientAPI.Token;
|
||||
|
||||
/*this.Connected += (s, e) =>
|
||||
{
|
||||
_voiceSocket.ParentCancelToken = _cancelToken;
|
||||
};*/
|
||||
Stopwatch stopwatch = null;
|
||||
if (_config.LogLevel >= LogSeverity.Verbose)
|
||||
stopwatch = Stopwatch.StartNew();
|
||||
_gatewayState = ConnectionState.Connecting;
|
||||
|
||||
var cancelSource = new CancellationTokenSource();
|
||||
CancelToken = cancelSource.Token;
|
||||
ClientAPI.CancelToken = CancelToken;
|
||||
|
||||
await GatewaySocket.Connect(ClientAPI, CancelToken).ConfigureAwait(false);
|
||||
|
||||
await _taskManager.Start(new Task[0], cancelSource).ConfigureAwait(false);
|
||||
GatewaySocket.WaitForConnection(CancelToken);
|
||||
|
||||
if (_config.LogLevel >= LogSeverity.Verbose)
|
||||
{
|
||||
stopwatch.Stop();
|
||||
double seconds = Math.Round(stopwatch.ElapsedTicks / (double)TimeSpan.TicksPerSecond, 2);
|
||||
Logger.Verbose($"Connection took {seconds} sec");
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
await _taskManager.SignalError(ex).ConfigureAwait(false);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
private void EndGatewayConnect()
|
||||
{
|
||||
_gatewayState = ConnectionState.Connected;
|
||||
}
|
||||
|
||||
/// <summary> Disconnects from the Discord server, canceling any pending requests. </summary>
|
||||
public async Task Disconnect()
|
||||
{
|
||||
await _taskManager.Stop(true).ConfigureAwait(false);
|
||||
if (Config.EnableMultiserver)
|
||||
ClientAPI.Token = null;
|
||||
}
|
||||
private async Task Cleanup()
|
||||
{
|
||||
var oldState = _gatewayState;
|
||||
_gatewayState = ConnectionState.Disconnecting;
|
||||
|
||||
if (Config.EnableMultiserver)
|
||||
{
|
||||
if (oldState == ConnectionState.Connected)
|
||||
{
|
||||
try { await ClientAPI.Send(new LogoutRequest()).ConfigureAwait(false); }
|
||||
catch (OperationCanceledException) { }
|
||||
}
|
||||
|
||||
await GatewaySocket.Disconnect().ConfigureAwait(false);
|
||||
ClientAPI.Token = null;
|
||||
}
|
||||
|
||||
var server = VoiceSocket.Server;
|
||||
VoiceSocket.Server = null;
|
||||
VoiceSocket.Channel = null;
|
||||
if (Config.EnableMultiserver)
|
||||
await Service.RemoveClient(server, this).ConfigureAwait(false);
|
||||
SendVoiceUpdate(server.Id, null);
|
||||
|
||||
await VoiceSocket.Disconnect().ConfigureAwait(false);
|
||||
if (Config.EnableMultiserver)
|
||||
await GatewaySocket.Disconnect().ConfigureAwait(false);
|
||||
|
||||
_gatewayState = (int)ConnectionState.Disconnected;
|
||||
}
|
||||
|
||||
public async Task Join(Channel channel)
|
||||
{
|
||||
if (channel == null) throw new ArgumentNullException(nameof(channel));
|
||||
{
|
||||
if (channel == null) throw new ArgumentNullException(nameof(channel));
|
||||
if (channel.Type != ChannelType.Voice)
|
||||
throw new ArgumentException("Channel must be a voice channel.", nameof(channel));
|
||||
if (channel.Server != VoiceSocket.Server)
|
||||
throw new ArgumentException("This is channel is not part of the current server.", nameof(channel));
|
||||
throw new ArgumentException("Channel must be a voice channel.", nameof(channel));
|
||||
if (channel == VoiceSocket.Channel) return;
|
||||
var server = channel.Server;
|
||||
if (server != VoiceSocket.Server)
|
||||
throw new ArgumentException("This is channel is not part of the current server.", nameof(channel));
|
||||
if (VoiceSocket.Server == null)
|
||||
throw new InvalidOperationException("This client has been closed.");
|
||||
|
||||
using (await _connectionLock.LockAsync().ConfigureAwait(false))
|
||||
{
|
||||
VoiceSocket.Channel = channel;
|
||||
|
||||
await Task.Run(() =>
|
||||
{
|
||||
SendVoiceUpdate();
|
||||
VoiceSocket.WaitForConnection(_cancelTokenSource.Token);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public async Task Connect(bool connectGateway)
|
||||
{
|
||||
SendVoiceUpdate(channel.Server.Id, channel.Id);
|
||||
using (await _connectionLock.LockAsync().ConfigureAwait(false))
|
||||
{
|
||||
_cancelTokenSource = new CancellationTokenSource();
|
||||
var cancelToken = _cancelTokenSource.Token;
|
||||
VoiceSocket.ParentCancelToken = cancelToken;
|
||||
|
||||
if (connectGateway)
|
||||
{
|
||||
GatewaySocket.ParentCancelToken = cancelToken;
|
||||
await GatewaySocket.Connect().ConfigureAwait(false);
|
||||
GatewaySocket.WaitForConnection(cancelToken);
|
||||
}
|
||||
}
|
||||
await Task.Run(() => VoiceSocket.WaitForConnection(CancelToken));
|
||||
}
|
||||
|
||||
public async Task Disconnect()
|
||||
{
|
||||
using (await _connectionLock.LockAsync().ConfigureAwait(false))
|
||||
{
|
||||
await Service.RemoveClient(VoiceSocket.Server, this).ConfigureAwait(false);
|
||||
VoiceSocket.Channel = null;
|
||||
SendVoiceUpdate();
|
||||
await VoiceSocket.Disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
private async void OnReceivedDispatch(object sender, WebSocketEventEventArgs e)
|
||||
private async void OnReceivedEvent(WebSocketEventEventArgs e)
|
||||
{
|
||||
try
|
||||
{
|
||||
@@ -166,11 +235,11 @@ namespace Discord.Audio
|
||||
{
|
||||
case "VOICE_STATE_UPDATE":
|
||||
{
|
||||
var data = e.Payload.ToObject<VoiceStateUpdateEvent>(_serializer);
|
||||
var data = e.Payload.ToObject<VoiceStateUpdateEvent>(Serializer);
|
||||
if (data.GuildId == VoiceSocket.Server?.Id && data.UserId == Service.Client.CurrentUser?.Id)
|
||||
{
|
||||
if (data.ChannelId == null)
|
||||
await Disconnect();
|
||||
await Disconnect().ConfigureAwait(false);
|
||||
else
|
||||
{
|
||||
var channel = Service.Client.GetChannel(data.ChannelId.Value);
|
||||
@@ -179,7 +248,7 @@ namespace Discord.Audio
|
||||
else
|
||||
{
|
||||
Logger.Warning("VOICE_STATE_UPDATE referenced an unknown channel, disconnecting.");
|
||||
await Disconnect();
|
||||
await Disconnect().ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -187,13 +256,16 @@ namespace Discord.Audio
|
||||
break;
|
||||
case "VOICE_SERVER_UPDATE":
|
||||
{
|
||||
var data = e.Payload.ToObject<VoiceServerUpdateEvent>(_serializer);
|
||||
var data = e.Payload.ToObject<VoiceServerUpdateEvent>(Serializer);
|
||||
if (data.GuildId == VoiceSocket.Server?.Id)
|
||||
{
|
||||
var client = Service.Client;
|
||||
VoiceSocket.Token = data.Token;
|
||||
VoiceSocket.Host = "wss://" + e.Payload.Value<string>("endpoint").Split(':')[0];
|
||||
await VoiceSocket.Connect().ConfigureAwait(false);
|
||||
var id = client.CurrentUser?.Id;
|
||||
if (id != null)
|
||||
{
|
||||
var host = "wss://" + e.Payload.Value<string>("endpoint").Split(':')[0];
|
||||
await VoiceSocket.Connect(host, data.Token, id.Value, GatewaySocket.SessionId, CancelToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
@@ -233,15 +305,11 @@ namespace Discord.Audio
|
||||
VoiceSocket.WaitForQueue();
|
||||
}
|
||||
|
||||
private void SendVoiceUpdate()
|
||||
public void SendVoiceUpdate(ulong? serverId, ulong? channelId)
|
||||
{
|
||||
var serverId = VoiceSocket.Server?.Id;
|
||||
if (serverId != null)
|
||||
{
|
||||
GatewaySocket.SendUpdateVoice(serverId, VoiceSocket.Channel?.Id,
|
||||
(Service.Config.Mode | AudioMode.Outgoing) == 0,
|
||||
(Service.Config.Mode | AudioMode.Incoming) == 0);
|
||||
}
|
||||
GatewaySocket.SendUpdateVoice(serverId, channelId,
|
||||
(Service.Config.Mode | AudioMode.Outgoing) == 0,
|
||||
(Service.Config.Mode | AudioMode.Incoming) == 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
242
src/Discord.Net.Audio/AudioClient.cs.old
Normal file
242
src/Discord.Net.Audio/AudioClient.cs.old
Normal file
@@ -0,0 +1,242 @@
|
||||
using Discord.API.Client.GatewaySocket;
|
||||
using Discord.Logging;
|
||||
using Discord.Net.Rest;
|
||||
using Discord.Net.WebSockets;
|
||||
using Newtonsoft.Json;
|
||||
using Nito.AsyncEx;
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Discord.Audio
|
||||
{
|
||||
internal class AudioClient : IAudioClient
|
||||
{
|
||||
private class OutStream : Stream
|
||||
{
|
||||
public override bool CanRead => false;
|
||||
public override bool CanSeek => false;
|
||||
public override bool CanWrite => true;
|
||||
|
||||
private readonly AudioClient _client;
|
||||
|
||||
internal OutStream(AudioClient client)
|
||||
{
|
||||
_client = client;
|
||||
}
|
||||
|
||||
public override long Length { get { throw new InvalidOperationException(); } }
|
||||
public override long Position
|
||||
{
|
||||
get { throw new InvalidOperationException(); }
|
||||
set { throw new InvalidOperationException(); }
|
||||
}
|
||||
public override void Flush() { throw new InvalidOperationException(); }
|
||||
public override long Seek(long offset, SeekOrigin origin) { throw new InvalidOperationException(); }
|
||||
public override void SetLength(long value) { throw new InvalidOperationException(); }
|
||||
public override int Read(byte[] buffer, int offset, int count) { throw new InvalidOperationException(); }
|
||||
public override void Write(byte[] buffer, int offset, int count)
|
||||
{
|
||||
_client.Send(buffer, offset, count);
|
||||
}
|
||||
}
|
||||
|
||||
private readonly JsonSerializer _serializer;
|
||||
private readonly bool _ownsGateway;
|
||||
private TaskManager _taskManager;
|
||||
private CancellationToken _cancelToken;
|
||||
|
||||
internal AudioService Service { get; }
|
||||
internal Logger Logger { get; }
|
||||
public int Id { get; }
|
||||
public GatewaySocket GatewaySocket { get; }
|
||||
public VoiceSocket VoiceSocket { get; }
|
||||
public Stream OutputStream { get; }
|
||||
|
||||
public ConnectionState State => VoiceSocket.State;
|
||||
public Server Server => VoiceSocket.Server;
|
||||
public Channel Channel => VoiceSocket.Channel;
|
||||
|
||||
public AudioClient(AudioService service, int clientId, Server server, GatewaySocket gatewaySocket, bool ownsGateway, Logger logger)
|
||||
{
|
||||
Service = service;
|
||||
_serializer = service.Client.Serializer;
|
||||
Id = clientId;
|
||||
GatewaySocket = gatewaySocket;
|
||||
_ownsGateway = ownsGateway;
|
||||
Logger = logger;
|
||||
OutputStream = new OutStream(this);
|
||||
_taskManager = new TaskManager(Cleanup, true);
|
||||
|
||||
GatewaySocket.ReceivedDispatch += OnReceivedDispatch;
|
||||
|
||||
VoiceSocket = new VoiceSocket(service.Client.Config, service.Config, service.Client.Serializer, logger);
|
||||
VoiceSocket.Server = server;
|
||||
|
||||
/*_voiceSocket.Connected += (s, e) => RaiseVoiceConnected();
|
||||
_voiceSocket.Disconnected += async (s, e) =>
|
||||
{
|
||||
_voiceSocket.CurrentServerId;
|
||||
if (voiceServerId != null)
|
||||
_gatewaySocket.SendLeaveVoice(voiceServerId.Value);
|
||||
await _voiceSocket.Disconnect().ConfigureAwait(false);
|
||||
RaiseVoiceDisconnected(socket.CurrentServerId.Value, e);
|
||||
if (e.WasUnexpected)
|
||||
await socket.Reconnect().ConfigureAwait(false);
|
||||
};*/
|
||||
|
||||
/*_voiceSocket.IsSpeaking += (s, e) =>
|
||||
{
|
||||
if (_voiceSocket.State == WebSocketState.Connected)
|
||||
{
|
||||
var user = _users[e.UserId, socket.CurrentServerId];
|
||||
bool value = e.IsSpeaking;
|
||||
if (user.IsSpeaking != value)
|
||||
{
|
||||
user.IsSpeaking = value;
|
||||
var channel = _channels[_voiceSocket.CurrentChannelId];
|
||||
RaiseUserIsSpeaking(user, channel, value);
|
||||
if (Config.TrackActivity)
|
||||
user.UpdateActivity();
|
||||
}
|
||||
}
|
||||
};*/
|
||||
|
||||
/*this.Connected += (s, e) =>
|
||||
{
|
||||
_voiceSocket.ParentCancelToken = _cancelToken;
|
||||
};*/
|
||||
}
|
||||
|
||||
public async Task Join(Channel channel)
|
||||
{
|
||||
if (channel == null) throw new ArgumentNullException(nameof(channel));
|
||||
if (channel.Type != ChannelType.Voice)
|
||||
throw new ArgumentException("Channel must be a voice channel.", nameof(channel));
|
||||
if (channel.Server != VoiceSocket.Server)
|
||||
throw new ArgumentException("This is channel is not part of the current server.", nameof(channel));
|
||||
if (channel == VoiceSocket.Channel) return;
|
||||
if (VoiceSocket.Server == null)
|
||||
throw new InvalidOperationException("This client has been closed.");
|
||||
|
||||
SendVoiceUpdate(channel.Server.Id, channel.Id);
|
||||
await Task.Run(() => VoiceSocket.WaitForConnection(_cancelToken));
|
||||
}
|
||||
|
||||
public async Task Connect(RestClient rest = null)
|
||||
{
|
||||
var cancelSource = new CancellationTokenSource();
|
||||
_cancelToken = cancelSource.Token;
|
||||
|
||||
Task[] tasks;
|
||||
if (rest != null)
|
||||
tasks = new Task[] { GatewaySocket.Connect(rest, _cancelToken) };
|
||||
else
|
||||
tasks = new Task[0];
|
||||
|
||||
await _taskManager.Start(tasks, cancelSource);
|
||||
}
|
||||
|
||||
public Task Disconnect() => _taskManager.Stop(true);
|
||||
|
||||
private async Task Cleanup()
|
||||
{
|
||||
var server = VoiceSocket.Server;
|
||||
VoiceSocket.Server = null;
|
||||
VoiceSocket.Channel = null;
|
||||
|
||||
await Service.RemoveClient(server, this).ConfigureAwait(false);
|
||||
SendVoiceUpdate(server.Id, null);
|
||||
|
||||
await VoiceSocket.Disconnect().ConfigureAwait(false);
|
||||
if (_ownsGateway)
|
||||
await GatewaySocket.Disconnect().ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async void OnReceivedDispatch(object sender, WebSocketEventEventArgs e)
|
||||
{
|
||||
try
|
||||
{
|
||||
switch (e.Type)
|
||||
{
|
||||
case "VOICE_STATE_UPDATE":
|
||||
{
|
||||
var data = e.Payload.ToObject<VoiceStateUpdateEvent>(_serializer);
|
||||
if (data.GuildId == VoiceSocket.Server?.Id && data.UserId == Service.Client.CurrentUser?.Id)
|
||||
{
|
||||
if (data.ChannelId == null)
|
||||
await Disconnect().ConfigureAwait(false);
|
||||
else
|
||||
{
|
||||
var channel = Service.Client.GetChannel(data.ChannelId.Value);
|
||||
if (channel != null)
|
||||
VoiceSocket.Channel = channel;
|
||||
else
|
||||
{
|
||||
Logger.Warning("VOICE_STATE_UPDATE referenced an unknown channel, disconnecting.");
|
||||
await Disconnect().ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
case "VOICE_SERVER_UPDATE":
|
||||
{
|
||||
var data = e.Payload.ToObject<VoiceServerUpdateEvent>(_serializer);
|
||||
if (data.GuildId == VoiceSocket.Server?.Id)
|
||||
{
|
||||
var client = Service.Client;
|
||||
var id = client.CurrentUser?.Id;
|
||||
if (id != null)
|
||||
{
|
||||
var host = "wss://" + e.Payload.Value<string>("endpoint").Split(':')[0];
|
||||
await VoiceSocket.Connect(host, data.Token, id.Value, GatewaySocket.SessionId, _cancelToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Logger.Error($"Error handling {e.Type} event", ex);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary> Sends a PCM frame to the voice server. Will block until space frees up in the outgoing buffer. </summary>
|
||||
/// <param name="data">PCM frame to send. This must be a single or collection of uncompressed 48Kz monochannel 20ms PCM frames. </param>
|
||||
/// <param name="count">Number of bytes in this frame. </param>
|
||||
public void Send(byte[] data, int offset, int count)
|
||||
{
|
||||
if (data == null) throw new ArgumentException(nameof(data));
|
||||
if (count < 0) throw new ArgumentOutOfRangeException(nameof(count));
|
||||
if (offset < 0) throw new ArgumentOutOfRangeException(nameof(count));
|
||||
if (VoiceSocket.Server == null) return; //Has been closed
|
||||
if (count == 0) return;
|
||||
|
||||
VoiceSocket.SendPCMFrames(data, offset, count);
|
||||
}
|
||||
|
||||
/// <summary> Clears the PCM buffer. </summary>
|
||||
public void Clear()
|
||||
{
|
||||
if (VoiceSocket.Server == null) return; //Has been closed
|
||||
VoiceSocket.ClearPCMFrames();
|
||||
}
|
||||
|
||||
/// <summary> Returns a task that completes once the voice output buffer is empty. </summary>
|
||||
public void Wait()
|
||||
{
|
||||
if (VoiceSocket.Server == null) return; //Has been closed
|
||||
VoiceSocket.WaitForQueue();
|
||||
}
|
||||
|
||||
public void SendVoiceUpdate(ulong? serverId, ulong? channelId)
|
||||
{
|
||||
GatewaySocket.SendUpdateVoice(serverId, channelId,
|
||||
(Service.Config.Mode | AudioMode.Outgoing) == 0,
|
||||
(Service.Config.Mode | AudioMode.Incoming) == 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
using Discord.Net.WebSockets;
|
||||
using Nito.AsyncEx;
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Linq;
|
||||
@@ -8,8 +8,10 @@ namespace Discord.Audio
|
||||
{
|
||||
public class AudioService : IService
|
||||
{
|
||||
private AudioClient _defaultClient;
|
||||
private ConcurrentDictionary<ulong, IAudioClient> _voiceClients;
|
||||
private readonly AsyncLock _asyncLock;
|
||||
private AudioClient _defaultClient; //Only used for single server
|
||||
private VirtualClient _currentClient; //Only used for single server
|
||||
private ConcurrentDictionary<ulong, AudioClient> _voiceClients;
|
||||
private ConcurrentDictionary<User, bool> _talkingUsers;
|
||||
private int _nextClientId;
|
||||
|
||||
@@ -30,18 +32,20 @@ namespace Discord.Audio
|
||||
public AudioService(AudioServiceConfig config)
|
||||
{
|
||||
Config = config;
|
||||
}
|
||||
_asyncLock = new AsyncLock();
|
||||
|
||||
}
|
||||
void IService.Install(DiscordClient client)
|
||||
{
|
||||
Client = client;
|
||||
Config.Lock();
|
||||
|
||||
if (Config.EnableMultiserver)
|
||||
_voiceClients = new ConcurrentDictionary<ulong, IAudioClient>();
|
||||
_voiceClients = new ConcurrentDictionary<ulong, AudioClient>();
|
||||
else
|
||||
{
|
||||
var logger = Client.Log.CreateLogger("Voice");
|
||||
_defaultClient = new SimpleAudioClient(this, 0, logger);
|
||||
_defaultClient = new AudioClient(Client, null, 0);
|
||||
}
|
||||
_talkingUsers = new ConcurrentDictionary<User, bool>();
|
||||
|
||||
@@ -75,68 +79,30 @@ namespace Discord.Audio
|
||||
{
|
||||
if (server == null) throw new ArgumentNullException(nameof(server));
|
||||
|
||||
if (!Config.EnableMultiserver)
|
||||
if (Config.EnableMultiserver)
|
||||
{
|
||||
if (server == _defaultClient.Server)
|
||||
return (_defaultClient as SimpleAudioClient).CurrentClient;
|
||||
else
|
||||
return null;
|
||||
}
|
||||
else
|
||||
{
|
||||
IAudioClient client;
|
||||
AudioClient client;
|
||||
if (_voiceClients.TryGetValue(server.Id, out client))
|
||||
return client;
|
||||
else
|
||||
return null;
|
||||
}
|
||||
}
|
||||
private async Task<IAudioClient> CreateClient(Server server)
|
||||
{
|
||||
var client = _voiceClients.GetOrAdd(server.Id, _ => null); //Placeholder, so we can't have two clients connecting at once
|
||||
|
||||
if (client == null)
|
||||
else
|
||||
{
|
||||
int id = unchecked(++_nextClientId);
|
||||
|
||||
var gatewayLogger = Client.Log.CreateLogger($"Gateway #{id}");
|
||||
var voiceLogger = Client.Log.CreateLogger($"Voice #{id}");
|
||||
var gatewaySocket = new GatewaySocket(Client, gatewayLogger);
|
||||
var voiceClient = new AudioClient(this, id, server, Client.GatewaySocket, voiceLogger);
|
||||
|
||||
await voiceClient.Connect(true).ConfigureAwait(false);
|
||||
|
||||
/*voiceClient.VoiceSocket.FrameReceived += (s, e) =>
|
||||
{
|
||||
OnFrameReceieved(e);
|
||||
};
|
||||
voiceClient.VoiceSocket.UserIsSpeaking += (s, e) =>
|
||||
{
|
||||
var user = server.GetUser(e.UserId);
|
||||
OnUserIsSpeakingUpdated(user, e.IsSpeaking);
|
||||
};*/
|
||||
|
||||
//Update the placeholder only it still exists (RemoveClient wasnt called)
|
||||
if (!_voiceClients.TryUpdate(server.Id, voiceClient, null))
|
||||
{
|
||||
//If it was, cleanup
|
||||
await voiceClient.Disconnect().ConfigureAwait(false); ;
|
||||
await gatewaySocket.Disconnect().ConfigureAwait(false); ;
|
||||
}
|
||||
if (server == _currentClient.Server)
|
||||
return _currentClient;
|
||||
else
|
||||
return null;
|
||||
}
|
||||
return client;
|
||||
}
|
||||
|
||||
//TODO: This isn't threadsafe
|
||||
internal async Task RemoveClient(Server server, IAudioClient client)
|
||||
|
||||
//Called from AudioClient.Disconnect
|
||||
internal async Task RemoveClient(Server server, AudioClient client)
|
||||
{
|
||||
if (Config.EnableMultiserver && server != null)
|
||||
using (await _asyncLock.LockAsync().ConfigureAwait(false))
|
||||
{
|
||||
if (_voiceClients.TryRemove(server.Id, out client))
|
||||
{
|
||||
await client.Disconnect();
|
||||
await (client as AudioClient).GatewaySocket.Disconnect();
|
||||
}
|
||||
if (_voiceClients.TryUpdate(server.Id, null, client))
|
||||
_voiceClients.TryRemove(server.Id, out client);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -144,16 +110,48 @@ namespace Discord.Audio
|
||||
{
|
||||
if (channel == null) throw new ArgumentNullException(nameof(channel));
|
||||
|
||||
if (!Config.EnableMultiserver)
|
||||
var server = channel.Server;
|
||||
using (await _asyncLock.LockAsync().ConfigureAwait(false))
|
||||
{
|
||||
await (_defaultClient as SimpleAudioClient).Connect(channel, false).ConfigureAwait(false);
|
||||
return _defaultClient;
|
||||
}
|
||||
else
|
||||
{
|
||||
var client = await CreateClient(channel.Server).ConfigureAwait(false);
|
||||
await client.Join(channel).ConfigureAwait(false);
|
||||
return client;
|
||||
if (Config.EnableMultiserver)
|
||||
{
|
||||
AudioClient client;
|
||||
if (!_voiceClients.TryGetValue(server.Id, out client))
|
||||
{
|
||||
client = new AudioClient(Client, server, unchecked(++_nextClientId));
|
||||
_voiceClients[server.Id] = client;
|
||||
|
||||
await client.Connect().ConfigureAwait(false);
|
||||
|
||||
/*voiceClient.VoiceSocket.FrameReceived += (s, e) =>
|
||||
{
|
||||
OnFrameReceieved(e);
|
||||
};
|
||||
voiceClient.VoiceSocket.UserIsSpeaking += (s, e) =>
|
||||
{
|
||||
var user = server.GetUser(e.UserId);
|
||||
OnUserIsSpeakingUpdated(user, e.IsSpeaking);
|
||||
};*/
|
||||
}
|
||||
|
||||
await client.Join(channel).ConfigureAwait(false);
|
||||
return client;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (_defaultClient.Server != server)
|
||||
{
|
||||
await _defaultClient.Disconnect();
|
||||
_defaultClient.VoiceSocket.Server = server;
|
||||
await _defaultClient.Connect().ConfigureAwait(false);
|
||||
}
|
||||
var client = new VirtualClient(_defaultClient, server);
|
||||
_currentClient = client;
|
||||
|
||||
await client.Join(channel).ConfigureAwait(false);
|
||||
return client;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -163,15 +161,18 @@ namespace Discord.Audio
|
||||
|
||||
if (Config.EnableMultiserver)
|
||||
{
|
||||
IAudioClient client;
|
||||
AudioClient client;
|
||||
if (_voiceClients.TryRemove(server.Id, out client))
|
||||
await client.Disconnect().ConfigureAwait(false);
|
||||
}
|
||||
else
|
||||
{
|
||||
IAudioClient client = GetClient(server);
|
||||
if (client != null)
|
||||
await (_defaultClient as SimpleAudioClient).Leave(client as SimpleAudioClient.VirtualClient).ConfigureAwait(false);
|
||||
using (await _asyncLock.LockAsync().ConfigureAwait(false))
|
||||
{
|
||||
var client = GetClient(server) as VirtualClient;
|
||||
if (client != null)
|
||||
await _defaultClient.Disconnect().ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ using System.Threading.Tasks;
|
||||
|
||||
namespace Discord.Net.WebSockets
|
||||
{
|
||||
public partial class VoiceWebSocket : WebSocket
|
||||
public partial class VoiceSocket : WebSocket
|
||||
{
|
||||
private const int MaxOpusSize = 4000;
|
||||
private const string EncryptedMode = "xsalsa20_poly1305";
|
||||
@@ -27,8 +27,7 @@ namespace Discord.Net.WebSockets
|
||||
|
||||
private readonly int _targetAudioBufferLength;
|
||||
private readonly ConcurrentDictionary<uint, OpusDecoder> _decoders;
|
||||
private readonly AudioClient _audioClient;
|
||||
private readonly AudioServiceConfig _config;
|
||||
private readonly AudioServiceConfig _audioConfig;
|
||||
private Task _sendTask, _receiveTask;
|
||||
private VoiceBuffer _sendBuffer;
|
||||
private OpusEncoder _encoder;
|
||||
@@ -41,6 +40,8 @@ namespace Discord.Net.WebSockets
|
||||
private ushort _sequence;
|
||||
private string _encryptionMode;
|
||||
private int _ping;
|
||||
private ulong? _userId;
|
||||
private string _sessionId;
|
||||
|
||||
public string Token { get; internal set; }
|
||||
public Server Server { get; internal set; }
|
||||
@@ -57,32 +58,37 @@ namespace Discord.Net.WebSockets
|
||||
internal void OnFrameReceived(ulong userId, ulong channelId, byte[] buffer, int offset, int count)
|
||||
=> FrameReceived(this, new InternalFrameEventArgs(userId, channelId, buffer, offset, count));
|
||||
|
||||
internal VoiceWebSocket(DiscordClient client, AudioClient audioClient, Logger logger)
|
||||
: base(client, logger)
|
||||
internal VoiceSocket(DiscordConfig config, AudioServiceConfig audioConfig, JsonSerializer serializer, Logger logger)
|
||||
: base(config, serializer, logger)
|
||||
{
|
||||
_audioClient = audioClient;
|
||||
_config = client.Audio().Config;
|
||||
_audioConfig = audioConfig;
|
||||
_decoders = new ConcurrentDictionary<uint, OpusDecoder>();
|
||||
_targetAudioBufferLength = _config.BufferLength / 20; //20 ms frames
|
||||
_targetAudioBufferLength = _audioConfig.BufferLength / 20; //20 ms frames
|
||||
_encodingBuffer = new byte[MaxOpusSize];
|
||||
_ssrcMapping = new ConcurrentDictionary<uint, ulong>();
|
||||
_encoder = new OpusEncoder(48000, _config.Channels, 20, _config.Bitrate, OpusApplication.MusicOrMixed);
|
||||
_sendBuffer = new VoiceBuffer((int)Math.Ceiling(_config.BufferLength / (double)_encoder.FrameLength), _encoder.FrameSize);
|
||||
_encoder = new OpusEncoder(48000, _audioConfig.Channels, 20, _audioConfig.Bitrate, OpusApplication.MusicOrMixed);
|
||||
_sendBuffer = new VoiceBuffer((int)Math.Ceiling(_audioConfig.BufferLength / (double)_encoder.FrameLength), _encoder.FrameSize);
|
||||
}
|
||||
|
||||
public Task Connect()
|
||||
=> BeginConnect();
|
||||
public Task Connect(string host, string token, ulong userId, string sessionId, CancellationToken parentCancelToken)
|
||||
{
|
||||
Host = host;
|
||||
Token = token;
|
||||
_userId = userId;
|
||||
_sessionId = sessionId;
|
||||
return BeginConnect(parentCancelToken);
|
||||
}
|
||||
private async Task Reconnect()
|
||||
{
|
||||
try
|
||||
{
|
||||
var cancelToken = ParentCancelToken.Value;
|
||||
await Task.Delay(_client.Config.ReconnectDelay, cancelToken).ConfigureAwait(false);
|
||||
var cancelToken = _parentCancelToken;
|
||||
await Task.Delay(_config.ReconnectDelay, cancelToken).ConfigureAwait(false);
|
||||
while (!cancelToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
await Connect().ConfigureAwait(false);
|
||||
await BeginConnect(_parentCancelToken).ConfigureAwait(false);
|
||||
break;
|
||||
}
|
||||
catch (OperationCanceledException) { throw; }
|
||||
@@ -90,31 +96,35 @@ namespace Discord.Net.WebSockets
|
||||
{
|
||||
Logger.Error("Reconnect failed", ex);
|
||||
//Net is down? We can keep trying to reconnect until the user runs Disconnect()
|
||||
await Task.Delay(_client.Config.FailedReconnectDelay, cancelToken).ConfigureAwait(false);
|
||||
await Task.Delay(_config.FailedReconnectDelay, cancelToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException) { }
|
||||
}
|
||||
public Task Disconnect() => _taskManager.Stop(true);
|
||||
public async Task Disconnect()
|
||||
{
|
||||
await _taskManager.Stop(true).ConfigureAwait(false);
|
||||
_userId = null;
|
||||
}
|
||||
|
||||
protected override async Task Run()
|
||||
{
|
||||
_udp = new UdpClient(new IPEndPoint(IPAddress.Any, 0));
|
||||
|
||||
List<Task> tasks = new List<Task>();
|
||||
if (_config.Mode.HasFlag(AudioMode.Outgoing))
|
||||
if (_audioConfig.Mode.HasFlag(AudioMode.Outgoing))
|
||||
_sendTask = Task.Run(() => SendVoiceAsync(CancelToken));
|
||||
_receiveTask = Task.Run(() => ReceiveVoiceAsync(CancelToken));
|
||||
|
||||
SendIdentify();
|
||||
SendIdentify(_userId.Value, _sessionId);
|
||||
|
||||
#if !DOTNET5_4
|
||||
tasks.Add(WatcherAsync());
|
||||
#endif
|
||||
tasks.AddRange(_engine.GetTasks(CancelToken));
|
||||
tasks.Add(HeartbeatAsync(CancelToken));
|
||||
await _taskManager.Start(tasks, _cancelTokenSource).ConfigureAwait(false);
|
||||
await _taskManager.Start(tasks, _cancelSource).ConfigureAwait(false);
|
||||
}
|
||||
protected override async Task Cleanup()
|
||||
{
|
||||
@@ -148,7 +158,7 @@ namespace Discord.Net.WebSockets
|
||||
int packetLength, resultOffset, resultLength;
|
||||
IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0);
|
||||
|
||||
if ((_config.Mode & AudioMode.Incoming) != 0)
|
||||
if ((_audioConfig.Mode & AudioMode.Incoming) != 0)
|
||||
{
|
||||
decodingBuffer = new byte[MaxOpusSize];
|
||||
nonce = new byte[24];
|
||||
@@ -184,7 +194,7 @@ namespace Discord.Net.WebSockets
|
||||
int port = packet[68] | packet[69] << 8;
|
||||
|
||||
SendSelectProtocol(ip, port);
|
||||
if ((_config.Mode & AudioMode.Incoming) == 0)
|
||||
if ((_audioConfig.Mode & AudioMode.Incoming) == 0)
|
||||
return; //We dont need this thread anymore
|
||||
}
|
||||
else
|
||||
@@ -395,7 +405,7 @@ namespace Discord.Net.WebSockets
|
||||
var address = (await Dns.GetHostAddressesAsync(Host.Replace("wss://", "")).ConfigureAwait(false)).FirstOrDefault();
|
||||
_endpoint = new IPEndPoint(address, payload.Port);
|
||||
|
||||
if (_config.EnableEncryption)
|
||||
if (_audioConfig.EnableEncryption)
|
||||
{
|
||||
if (payload.Modes.Contains(EncryptedMode))
|
||||
{
|
||||
@@ -467,12 +477,12 @@ namespace Discord.Net.WebSockets
|
||||
|
||||
public override void SendHeartbeat()
|
||||
=> QueueMessage(new HeartbeatCommand());
|
||||
public void SendIdentify()
|
||||
public void SendIdentify(ulong id, string sessionId)
|
||||
=> QueueMessage(new IdentifyCommand
|
||||
{
|
||||
GuildId = Server.Id,
|
||||
UserId = _client.CurrentUser.Id,
|
||||
SessionId = _client.SessionId,
|
||||
UserId = id,
|
||||
SessionId = sessionId,
|
||||
Token = Token
|
||||
});
|
||||
public void SendSelectProtocol(string externalAddress, int externalPort)
|
||||
@@ -1,72 +0,0 @@
|
||||
using Discord.Logging;
|
||||
using Nito.AsyncEx;
|
||||
using System.IO;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Discord.Audio
|
||||
{
|
||||
internal class SimpleAudioClient : AudioClient
|
||||
{
|
||||
internal class VirtualClient : IAudioClient
|
||||
{
|
||||
private readonly SimpleAudioClient _client;
|
||||
|
||||
ConnectionState IAudioClient.State => _client.VoiceSocket.State;
|
||||
Server IAudioClient.Server => _client.VoiceSocket.Server;
|
||||
Channel IAudioClient.Channel => _client.VoiceSocket.Channel;
|
||||
Stream IAudioClient.OutputStream => _client.OutputStream;
|
||||
|
||||
public VirtualClient(SimpleAudioClient client)
|
||||
{
|
||||
_client = client;
|
||||
}
|
||||
|
||||
Task IAudioClient.Disconnect() => _client.Leave(this);
|
||||
Task IAudioClient.Join(Channel channel) => _client.Join(channel);
|
||||
|
||||
void IAudioClient.Send(byte[] data, int offset, int count) => _client.Send(data, offset, count);
|
||||
void IAudioClient.Clear() => _client.Clear();
|
||||
void IAudioClient.Wait() => _client.Wait();
|
||||
}
|
||||
|
||||
private readonly AsyncLock _connectionLock;
|
||||
|
||||
internal VirtualClient CurrentClient { get; private set; }
|
||||
|
||||
public SimpleAudioClient(AudioService service, int id, Logger logger)
|
||||
: base(service, id, null, service.Client.GatewaySocket, logger)
|
||||
{
|
||||
_connectionLock = new AsyncLock();
|
||||
}
|
||||
|
||||
//Only disconnects if is current a member of this server
|
||||
public async Task Leave(VirtualClient client)
|
||||
{
|
||||
using (await _connectionLock.LockAsync().ConfigureAwait(false))
|
||||
{
|
||||
if (CurrentClient == client)
|
||||
{
|
||||
CurrentClient = null;
|
||||
await Disconnect().ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
internal async Task<IAudioClient> Connect(Channel channel, bool connectGateway)
|
||||
{
|
||||
using (await _connectionLock.LockAsync().ConfigureAwait(false))
|
||||
{
|
||||
bool changeServer = channel.Server != VoiceSocket.Server;
|
||||
if (changeServer || CurrentClient == null)
|
||||
{
|
||||
await Disconnect().ConfigureAwait(false);
|
||||
CurrentClient = new VirtualClient(this);
|
||||
VoiceSocket.Server = channel.Server;
|
||||
await Connect(connectGateway).ConfigureAwait(false);
|
||||
}
|
||||
await Join(channel).ConfigureAwait(false);
|
||||
return CurrentClient;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
29
src/Discord.Net.Audio/VirtualClient.cs
Normal file
29
src/Discord.Net.Audio/VirtualClient.cs
Normal file
@@ -0,0 +1,29 @@
|
||||
using System.IO;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Discord.Audio
|
||||
{
|
||||
internal class VirtualClient : IAudioClient
|
||||
{
|
||||
private readonly AudioClient _client;
|
||||
|
||||
public Server Server { get; }
|
||||
|
||||
public ConnectionState State => _client.VoiceSocket.Server == Server ? _client.VoiceSocket.State : ConnectionState.Disconnected;
|
||||
public Channel Channel => _client.VoiceSocket.Server == Server ? _client.VoiceSocket.Channel : null;
|
||||
public Stream OutputStream => _client.VoiceSocket.Server == Server ? _client.OutputStream : null;
|
||||
|
||||
public VirtualClient(AudioClient client, Server server)
|
||||
{
|
||||
_client = client;
|
||||
Server = server;
|
||||
}
|
||||
|
||||
public Task Disconnect() => _client.Service.Leave(Server);
|
||||
public Task Join(Channel channel) => _client.Join(channel);
|
||||
|
||||
public void Send(byte[] data, int offset, int count) => _client.Send(data, offset, count);
|
||||
public void Clear() => _client.Clear();
|
||||
public void Wait() => _client.Wait();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user