Fixed the voice things!

This commit is contained in:
Brandon Smith
2015-08-24 18:20:45 -03:00
parent 74517d0ef3
commit 7e5813dd94
2 changed files with 61 additions and 78 deletions

View File

@@ -2,7 +2,7 @@
using Newtonsoft.Json; using Newtonsoft.Json;
using Newtonsoft.Json.Linq; using Newtonsoft.Json.Linq;
using System; using System;
using System.Collections.Generic; using System.Collections.Concurrent;
using System.Diagnostics; using System.Diagnostics;
using System.Linq; using System.Linq;
using System.Net; using System.Net;
@@ -36,7 +36,7 @@ namespace Discord
#if !DNXCORE50 #if !DNXCORE50
private OpusEncoder _encoder; private OpusEncoder _encoder;
private Queue<Packet> _sendQueue; private ConcurrentQueue<Packet> _sendQueue;
private UdpClient _udp; private UdpClient _udp;
private IPEndPoint _endpoint; private IPEndPoint _endpoint;
private bool _isReady; private bool _isReady;
@@ -51,7 +51,7 @@ namespace Discord
{ {
_connectWaitOnLogin = new ManualResetEventSlim(false); _connectWaitOnLogin = new ManualResetEventSlim(false);
#if !DNXCORE50 #if !DNXCORE50
_sendQueue = new Queue<Packet>(); _sendQueue = new ConcurrentQueue<Packet>();
_encoder = new OpusEncoder(48000, 1, 20, Application.Audio); _encoder = new OpusEncoder(48000, 1, 20, Application.Audio);
#endif #endif
} }
@@ -61,6 +61,7 @@ namespace Discord
{ {
_udp = new UdpClient(new IPEndPoint(IPAddress.Any, 0)); _udp = new UdpClient(new IPEndPoint(IPAddress.Any, 0));
_udp.AllowNatTraversal(true); _udp.AllowNatTraversal(true);
_isReady = false;
} }
protected override void OnDisconnect() protected override void OnDisconnect()
{ {
@@ -68,16 +69,16 @@ namespace Discord
} }
#endif #endif
protected override Task[] CreateTasks(CancellationToken cancelToken) protected override Task[] CreateTasks()
{ {
return new Task[] return new Task[]
{ {
#if !DNXCORE50 #if !DNXCORE50
Task.Factory.StartNew(ReceiveAsync, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).Result, ReceiveAsync(),
Task.Factory.StartNew(SendAsync, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).Result, SendAsync(),
#endif #endif
Task.Factory.StartNew(WatcherAsync, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).Result WatcherAsync()
}.Concat(base.CreateTasks(cancelToken)).ToArray(); }.Concat(base.CreateTasks()).ToArray();
} }
public async Task Login(string serverId, string userId, string sessionId, string token) public async Task Login(string serverId, string userId, string sessionId, string token)
@@ -124,24 +125,9 @@ namespace Discord
private async Task SendAsync() private async Task SendAsync()
{ {
var cancelToken = _disconnectToken.Token; var cancelToken = _disconnectToken.Token;
Packet packet;
try try
{ {
while (!cancelToken.IsCancellationRequested && !_isReady)
{
lock (_sendQueue)
{
while (_sendQueue.Count > 0)
{
var packet = _sendQueue.Dequeue();
_udp.Send(packet.Data, packet.Count);
}
}
await Task.Delay(_sendInterval);
}
if (cancelToken.IsCancellationRequested)
return;
uint timestamp = 0; uint timestamp = 0;
double nextTicks = 0.0; double nextTicks = 0.0;
double ticksPerFrame = Stopwatch.Frequency / 1000.0 * _encoder.FrameLength; double ticksPerFrame = Stopwatch.Frequency / 1000.0 * _encoder.FrameLength;
@@ -157,15 +143,12 @@ namespace Discord
rtpPacket[10] = (byte)((_ssrc >> 8) & 0xFF); rtpPacket[10] = (byte)((_ssrc >> 8) & 0xFF);
rtpPacket[11] = (byte)((_ssrc >> 0) & 0xFF); rtpPacket[11] = (byte)((_ssrc >> 0) & 0xFF);
if (sw.ElapsedTicks > nextTicks) if (_isReady && sw.ElapsedTicks > nextTicks)
{
lock (_sendQueue)
{ {
while (sw.ElapsedTicks > nextTicks) while (sw.ElapsedTicks > nextTicks)
{ {
if (_sendQueue.Count > 0) if (_sendQueue.TryDequeue(out packet))
{ {
var packet = _sendQueue.Dequeue();
ushort sequence = unchecked(_sequence++); ushort sequence = unchecked(_sequence++);
rtpPacket[2] = (byte)((sequence >> 8) & 0xFF); rtpPacket[2] = (byte)((sequence >> 8) & 0xFF);
rtpPacket[3] = (byte)((sequence >> 0) & 0xFF); rtpPacket[3] = (byte)((sequence >> 0) & 0xFF);
@@ -174,21 +157,21 @@ namespace Discord
rtpPacket[6] = (byte)((timestamp >> 8) & 0xFF); rtpPacket[6] = (byte)((timestamp >> 8) & 0xFF);
rtpPacket[7] = (byte)((timestamp >> 0) & 0xFF); rtpPacket[7] = (byte)((timestamp >> 0) & 0xFF);
Buffer.BlockCopy(packet.Data, 0, rtpPacket, 12, packet.Count); Buffer.BlockCopy(packet.Data, 0, rtpPacket, 12, packet.Count);
_udp.Send(rtpPacket, packet.Count + 12); await _udp.SendAsync(rtpPacket, packet.Count + 12);
} }
timestamp = unchecked(timestamp + samplesPerFrame); timestamp = unchecked(timestamp + samplesPerFrame);
nextTicks += ticksPerFrame; nextTicks += ticksPerFrame;
} }
} }
} else
/*else await Task.Delay(1);
await Task.Delay(1);*/
} }
} }
catch { } catch { }
finally { _disconnectToken.Cancel(); } finally { _disconnectToken.Cancel(); }
} }
#endif #endif
//Closes the UDP socket when _disconnectToken is triggered, since UDPClient doesn't allow passing a canceltoken
private async Task WatcherAsync() private async Task WatcherAsync()
{ {
try try
@@ -212,6 +195,10 @@ namespace Discord
{ {
case 2: //READY case 2: //READY
{ {
#if !DNXCORE50
if (!_isReady)
{
#endif
var payload = (msg.Payload as JToken).ToObject<VoiceWebSocketEvents.Ready>(); var payload = (msg.Payload as JToken).ToObject<VoiceWebSocketEvents.Ready>();
_heartbeatInterval = payload.HeartbeatInterval; _heartbeatInterval = payload.HeartbeatInterval;
_ssrc = payload.SSRC; _ssrc = payload.SSRC;
@@ -221,10 +208,9 @@ namespace Discord
_mode = "plain"; _mode = "plain";
_udp.Connect(_endpoint); _udp.Connect(_endpoint);
lock (_rand)
_sequence = (ushort)_rand.Next(0, ushort.MaxValue); _sequence = (ushort)_rand.Next(0, ushort.MaxValue);
_isReady = false; //No thread issue here because SendAsync doesn't start until _isReady is true
_sendQueue.Enqueue(new Packet(new byte[70] { _udp.Send(new byte[70] {
(byte)((_ssrc >> 24) & 0xFF), (byte)((_ssrc >> 24) & 0xFF),
(byte)((_ssrc >> 16) & 0xFF), (byte)((_ssrc >> 16) & 0xFF),
(byte)((_ssrc >> 8) & 0xFF), (byte)((_ssrc >> 8) & 0xFF),
@@ -233,8 +219,8 @@ namespace Discord
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0 0x0, 0x0, 0x0, 0x0, 0x0, 0x0 }, 70);
}, 70)); }
#else #else
_connectWaitOnLogin.Set(); _connectWaitOnLogin.Set();
#endif #endif
@@ -275,6 +261,7 @@ namespace Discord
_myIp = Encoding.ASCII.GetString(buffer, 4, 70 - 6).TrimEnd('\0'); _myIp = Encoding.ASCII.GetString(buffer, 4, 70 - 6).TrimEnd('\0');
_isReady = true;
var login2 = new VoiceWebSocketCommands.Login2(); var login2 = new VoiceWebSocketCommands.Login2();
login2.Payload.Protocol = "udp"; login2.Payload.Protocol = "udp";
login2.Payload.SocketData.Address = _myIp; login2.Payload.SocketData.Address = _myIp;

View File

@@ -53,8 +53,7 @@ namespace Discord
OnConnect(); OnConnect();
_lastHeartbeat = DateTime.UtcNow; _lastHeartbeat = DateTime.UtcNow;
_tasks = Task.WhenAll(CreateTasks(cancelToken)) _tasks = Task.Factory.ContinueWhenAll(CreateTasks(), x =>
.ContinueWith(x =>
{ {
//Do not clean up until both tasks have ended //Do not clean up until both tasks have ended
_heartbeatInterval = 0; _heartbeatInterval = 0;
@@ -96,12 +95,12 @@ namespace Discord
RaiseConnected(); RaiseConnected();
} }
protected virtual Task[] CreateTasks(CancellationToken cancelToken) protected virtual Task[] CreateTasks()
{ {
return new Task[] return new Task[]
{ {
Task.Factory.StartNew(ReceiveAsync, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).Result, ReceiveAsync(),
Task.Factory.StartNew(SendAsync, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Default).Result SendAsync()
}; };
} }
@@ -132,7 +131,6 @@ namespace Discord
while (!result.EndOfMessage); while (!result.EndOfMessage);
//TODO: Remove this //TODO: Remove this
if (this is DiscordVoiceSocket)
System.Diagnostics.Debug.WriteLine(">>> " + builder.ToString()); System.Diagnostics.Debug.WriteLine(">>> " + builder.ToString());
await ProcessMessage(builder.ToString()); await ProcessMessage(builder.ToString());
@@ -161,7 +159,7 @@ namespace Discord
} }
while (_sendQueue.TryDequeue(out bytes)) while (_sendQueue.TryDequeue(out bytes))
await SendMessage(bytes, cancelToken); await SendMessage(bytes, cancelToken);
await Task.Delay(_sendInterval); await Task.Delay(_sendInterval, cancelToken);
} }
} }
catch { } catch { }
@@ -174,7 +172,6 @@ namespace Discord
protected void QueueMessage(object message) protected void QueueMessage(object message)
{ {
//TODO: Remove this //TODO: Remove this
if (this is DiscordVoiceSocket)
System.Diagnostics.Debug.WriteLine("<<< " + JsonConvert.SerializeObject(message)); System.Diagnostics.Debug.WriteLine("<<< " + JsonConvert.SerializeObject(message));
var bytes = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)); var bytes = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
_sendQueue.Enqueue(bytes); _sendQueue.Enqueue(bytes);
@@ -182,7 +179,6 @@ namespace Discord
protected Task SendMessage(object message, CancellationToken cancelToken) protected Task SendMessage(object message, CancellationToken cancelToken)
{ {
//TODO: Remove this //TODO: Remove this
if (this is DiscordVoiceSocket)
System.Diagnostics.Debug.WriteLine("<<< " + JsonConvert.SerializeObject(message)); System.Diagnostics.Debug.WriteLine("<<< " + JsonConvert.SerializeObject(message));
return SendMessage(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)), cancelToken); return SendMessage(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)), cancelToken);
} }