Added a limit to BufferedAudioTarget's internal buffer.

This commit is contained in:
RogueException
2016-12-31 20:07:20 -04:00
parent c34841c53d
commit 58cb8cfb20
4 changed files with 82 additions and 47 deletions

View File

@@ -13,7 +13,7 @@ using System.Threading.Tasks;
namespace Discord.Audio
{
public class AudioClient : IAudioClient, IDisposable
internal class AudioClient : IAudioClient, IDisposable
{
public event Func<Task> Connected
{
@@ -74,7 +74,7 @@ namespace Discord.Audio
ApiClient.SentGatewayMessage += async opCode => await _audioLogger.DebugAsync($"Sent {opCode}").ConfigureAwait(false);
ApiClient.SentDiscovery += async () => await _audioLogger.DebugAsync($"Sent Discovery").ConfigureAwait(false);
ApiClient.SentData += async bytes => await _audioLogger.DebugAsync($"Sent {bytes} Bytes").ConfigureAwait(false);
//ApiClient.SentData += async bytes => await _audioLogger.DebugAsync($"Sent {bytes} Bytes").ConfigureAwait(false);
ApiClient.ReceivedEvent += ProcessMessageAsync;
ApiClient.ReceivedPacket += ProcessPacketAsync;
ApiClient.Disconnected += async ex =>
@@ -170,10 +170,10 @@ namespace Discord.Audio
await Discord.ApiClient.SendVoiceStateUpdateAsync(Guild.Id, null, false, false).ConfigureAwait(false);
}
public Stream CreateOpusStream(int samplesPerFrame)
public Stream CreateOpusStream(int samplesPerFrame, int bufferMillis)
{
CheckSamplesPerFrame(samplesPerFrame);
var target = new BufferedAudioTarget(ApiClient, samplesPerFrame, _cancelTokenSource.Token);
var target = new BufferedAudioTarget(ApiClient, samplesPerFrame, bufferMillis, _cancelTokenSource.Token);
return new RTPWriteStream(target, _secretKey, samplesPerFrame, _ssrc);
}
public Stream CreateDirectOpusStream(int samplesPerFrame)
@@ -182,13 +182,13 @@ namespace Discord.Audio
var target = new DirectAudioTarget(ApiClient);
return new RTPWriteStream(target, _secretKey, samplesPerFrame, _ssrc);
}
public Stream CreatePCMStream(int samplesPerFrame, int channels = 2, int? bitrate = null)
public Stream CreatePCMStream(int samplesPerFrame, int channels, int? bitrate, int bufferMillis)
{
CheckSamplesPerFrame(samplesPerFrame);
var target = new BufferedAudioTarget(ApiClient, samplesPerFrame, _cancelTokenSource.Token);
var target = new BufferedAudioTarget(ApiClient, samplesPerFrame, bufferMillis, _cancelTokenSource.Token);
return new OpusEncodeStream(target, _secretKey, channels, samplesPerFrame, _ssrc, bitrate);
}
public Stream CreateDirectPCMStream(int samplesPerFrame, int channels = 2, int? bitrate = null)
public Stream CreateDirectPCMStream(int samplesPerFrame, int channels, int? bitrate)
{
CheckSamplesPerFrame(samplesPerFrame);
var target = new DirectAudioTarget(ApiClient);

View File

@@ -1,6 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
@@ -8,62 +7,98 @@ namespace Discord.Audio
{
internal class BufferedAudioTarget : IAudioTarget, IDisposable
{
private static readonly byte[] _silencePacket = new byte[] { 0xF8, 0xFF, 0xFE };
private struct Frame
{
public Frame(byte[] buffer, int bytes)
{
Buffer = buffer;
Bytes = bytes;
}
public readonly byte[] Buffer;
public readonly int Bytes;
}
private static readonly byte[] _silenceFrame = new byte[] { 0xF8, 0xFF, 0xFE };
private Task _task;
private DiscordVoiceAPIClient _client;
private CancellationTokenSource _cancelTokenSource;
private ConcurrentQueue<byte[]> _queue;
private CancellationToken _cancelToken;
private ConcurrentQueue<Frame> _queuedFrames;
private ConcurrentQueue<byte[]> _bufferPool;
private SemaphoreSlim _queueLock;
private int _ticksPerFrame;
internal BufferedAudioTarget(DiscordVoiceAPIClient client, int samplesPerFrame, CancellationToken cancelToken)
internal BufferedAudioTarget(DiscordVoiceAPIClient client, int samplesPerFrame, int bufferMillis, CancellationToken cancelToken)
{
_client = client;
long ticksPerFrame = samplesPerFrame / 48;
_ticksPerFrame = samplesPerFrame / 48;
int queueLength = (bufferMillis + (_ticksPerFrame - 1)) / _ticksPerFrame; //Round up
_cancelTokenSource = new CancellationTokenSource();
cancelToken = CancellationTokenSource.CreateLinkedTokenSource(_cancelTokenSource.Token, cancelToken).Token;
_queue = new ConcurrentQueue<byte[]>(); //TODO: We need a better queue
_cancelToken = CancellationTokenSource.CreateLinkedTokenSource(_cancelTokenSource.Token, cancelToken).Token;
_queuedFrames = new ConcurrentQueue<Frame>();
_bufferPool = new ConcurrentQueue<byte[]>();
for (int i = 0; i < queueLength; i++)
_bufferPool.Enqueue(new byte[1275]);
_queueLock = new SemaphoreSlim(queueLength, queueLength);
_task = Run(ticksPerFrame, cancelToken);
_task = Run();
}
private Task Run(long ticksPerFrame, CancellationToken cancelToken)
private Task Run()
{
return Task.Run(async () =>
{
long nextTick = Environment.TickCount;
while (!cancelToken.IsCancellationRequested)
try
{
long tick = Environment.TickCount;
long dist = nextTick - tick;
if (dist <= 0)
long nextTick = Environment.TickCount;
while (!_cancelToken.IsCancellationRequested)
{
byte[] buffer;
if (_queue.TryDequeue(out buffer))
await _client.SendAsync(buffer, buffer.Length).ConfigureAwait(false);
else
await _client.SendAsync(_silencePacket, _silencePacket.Length).ConfigureAwait(false);
nextTick += ticksPerFrame;
long tick = Environment.TickCount;
long dist = nextTick - tick;
if (dist <= 0)
{
Frame frame;
if (_queuedFrames.TryDequeue(out frame))
{
#if NETSTANDARD1_3
Console.WriteLine("Pop");
#endif
await _client.SendAsync(frame.Buffer, frame.Bytes).ConfigureAwait(false);
_bufferPool.Enqueue(frame.Buffer);
_queueLock.Release();
}
else
await _client.SendAsync(_silenceFrame, _silenceFrame.Length).ConfigureAwait(false);
nextTick += _ticksPerFrame;
}
else if (dist > 1)
await Task.Delay((int)dist).ConfigureAwait(false);
}
else if (dist > 1)
await Task.Delay((int)dist).ConfigureAwait(false);
}
catch (OperationCanceledException) { }
});
}
public Task SendAsync(byte[] buffer, int count)
public async Task SendAsync(byte[] data, int count)
{
byte[] newBuffer = new byte[count];
Buffer.BlockCopy(buffer, 0, newBuffer, 0, count);
_queue.Enqueue(newBuffer);
return Task.Delay(0);
await _queueLock.WaitAsync(-1, _cancelToken).ConfigureAwait(false);
#if NETSTANDARD1_3
Console.WriteLine("Push");
#endif
byte[] buffer;
_bufferPool.TryDequeue(out buffer);
Buffer.BlockCopy(data, 0, buffer, 0, count);
_queuedFrames.Enqueue(new Frame(buffer, count));
}
public async Task FlushAsync()
{
while (true)
{
if (_queue.Count == 0)
if (_queuedFrames.Count == 0)
return;
await Task.Delay(250).ConfigureAwait(false);
}