Cleaned up audio code

This commit is contained in:
RogueException
2017-02-26 10:57:28 -04:00
parent 2db60749ca
commit 8e0c65498b
21 changed files with 495 additions and 276 deletions

View File

@@ -0,0 +1,156 @@
using Discord.Logging;
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace Discord.Audio.Streams
{
///<summary> Wraps another stream with a timed buffer. </summary>
public class BufferedWriteStream : AudioOutStream
{
private struct Frame
{
public Frame(byte[] buffer, int bytes)
{
Buffer = buffer;
Bytes = bytes;
}
public readonly byte[] Buffer;
public readonly int Bytes;
}
private static readonly byte[] _silenceFrame = new byte[0];
private readonly AudioOutStream _next;
private readonly CancellationTokenSource _cancelTokenSource;
private readonly CancellationToken _cancelToken;
private readonly Task _task;
private readonly ConcurrentQueue<Frame> _queuedFrames;
private readonly ConcurrentQueue<byte[]> _bufferPool;
private readonly SemaphoreSlim _queueLock;
private readonly Logger _logger;
private readonly int _ticksPerFrame, _queueLength;
private bool _isPreloaded;
internal BufferedWriteStream(AudioOutStream next, int samplesPerFrame, int bufferMillis, CancellationToken cancelToken, Logger logger, int maxFrameSize = 1500)
{
//maxFrameSize = 1275 was too limiting at 128*1024
_next = next;
_ticksPerFrame = samplesPerFrame / 48;
_logger = logger;
_queueLength = (bufferMillis + (_ticksPerFrame - 1)) / _ticksPerFrame; //Round up
_cancelTokenSource = new CancellationTokenSource();
_cancelToken = CancellationTokenSource.CreateLinkedTokenSource(_cancelTokenSource.Token, cancelToken).Token;
_queuedFrames = new ConcurrentQueue<Frame>();
_bufferPool = new ConcurrentQueue<byte[]>();
for (int i = 0; i < _queueLength; i++)
_bufferPool.Enqueue(new byte[maxFrameSize]);
_queueLock = new SemaphoreSlim(_queueLength, _queueLength);
_task = Run();
}
private Task Run()
{
uint num = 0;
return Task.Run(async () =>
{
try
{
while (!_isPreloaded && !_cancelToken.IsCancellationRequested)
await Task.Delay(1).ConfigureAwait(false);
long nextTick = Environment.TickCount;
while (!_cancelToken.IsCancellationRequested)
{
const int limit = 1;
long tick = Environment.TickCount;
long dist = nextTick - tick;
if (dist <= limit)
{
Frame frame;
if (_queuedFrames.TryDequeue(out frame))
{
await _next.WriteAsync(frame.Buffer, 0, frame.Bytes).ConfigureAwait(false);
_bufferPool.Enqueue(frame.Buffer);
_queueLock.Release();
nextTick += _ticksPerFrame;
#if DEBUG
var _ = _logger.DebugAsync($"{num++}: Sent {frame.Bytes} bytes ({_queuedFrames.Count} frames buffered)");
#endif
}
else if (dist == 0)
{
await _next.WriteAsync(_silenceFrame, 0, _silenceFrame.Length).ConfigureAwait(false);
nextTick += _ticksPerFrame;
#if DEBUG
var _ = _logger.DebugAsync($"{num++}: Buffer underrun");
#endif
}
}
else
await Task.Delay((int)(dist - (limit - 1))).ConfigureAwait(false);
}
}
catch (OperationCanceledException) { }
});
}
public override async Task WriteAsync(byte[] data, int offset, int count, CancellationToken cancelToken)
{
if (cancelToken.CanBeCanceled)
cancelToken = CancellationTokenSource.CreateLinkedTokenSource(cancelToken, _cancelToken).Token;
else
cancelToken = _cancelToken;
await _queueLock.WaitAsync(-1, cancelToken).ConfigureAwait(false);
byte[] buffer;
if (!_bufferPool.TryDequeue(out buffer))
{
#if DEBUG
var _ = _logger.DebugAsync($"Buffer overflow"); //Should never happen because of the queueLock
#endif
return;
}
Buffer.BlockCopy(data, offset, buffer, 0, count);
_queuedFrames.Enqueue(new Frame(buffer, count));
#if DEBUG
//var _ await _logger.DebugAsync($"Queued {count} bytes ({_queuedFrames.Count} frames buffered)");
#endif
if (!_isPreloaded && _queuedFrames.Count == _queueLength)
{
#if DEBUG
var _ = _logger.DebugAsync($"Preloaded");
#endif
_isPreloaded = true;
}
}
public override async Task FlushAsync(CancellationToken cancelToken)
{
while (true)
{
cancelToken.ThrowIfCancellationRequested();
if (_queuedFrames.Count == 0)
return;
await Task.Delay(250, cancelToken).ConfigureAwait(false);
}
}
public override Task ClearAsync(CancellationToken cancelToken)
{
Frame ignored;
do
cancelToken.ThrowIfCancellationRequested();
while (_queuedFrames.TryDequeue(out ignored));
return Task.Delay(0);
}
protected override void Dispose(bool disposing)
{
if (disposing)
_cancelTokenSource.Cancel();
}
}
}

View File

@@ -1,22 +1,34 @@
namespace Discord.Audio
using System;
using System.Collections.Concurrent;
namespace Discord.Audio.Streams
{
internal class OpusDecodeStream : RTPReadStream
///<summary> Converts Opus to PCM </summary>
public class OpusDecodeStream : AudioInStream
{
private readonly BlockingCollection<byte[]> _queuedData; //TODO: Replace with max-length ring buffer
private readonly byte[] _buffer;
private readonly OpusDecoder _decoder;
internal OpusDecodeStream(AudioClient audioClient, byte[] secretKey, int samplingRate,
int channels = OpusConverter.MaxChannels, int bufferSize = 4000)
: base(audioClient, secretKey)
internal OpusDecodeStream(AudioClient audioClient, int samplingRate, int channels = OpusConverter.MaxChannels, int bufferSize = 4000)
{
_buffer = new byte[bufferSize];
_decoder = new OpusDecoder(samplingRate, channels);
_queuedData = new BlockingCollection<byte[]>(100);
}
public override int Read(byte[] buffer, int offset, int count)
{
var queuedData = _queuedData.Take();
Buffer.BlockCopy(queuedData, 0, buffer, offset, Math.Min(queuedData.Length, count));
return queuedData.Length;
}
public override void Write(byte[] buffer, int offset, int count)
{
count = _decoder.DecodeFrame(buffer, offset, count, _buffer, 0);
return base.Read(_buffer, 0, count);
var newBuffer = new byte[count];
Buffer.BlockCopy(_buffer, 0, newBuffer, 0, count);
_queuedData.Add(newBuffer);
}
protected override void Dispose(bool disposing)

View File

@@ -2,27 +2,28 @@
using System.Threading;
using System.Threading.Tasks;
namespace Discord.Audio
namespace Discord.Audio.Streams
{
internal class OpusEncodeStream : RTPWriteStream
///<summary> Converts PCM to Opus </summary>
public class OpusEncodeStream : AudioOutStream
{
public const int SampleRate = 48000;
private readonly AudioOutStream _next;
private readonly OpusEncoder _encoder;
private readonly byte[] _buffer;
private int _frameSize;
private byte[] _partialFrameBuffer;
private int _partialFramePos;
private readonly OpusEncoder _encoder;
internal OpusEncodeStream(IAudioTarget target, byte[] secretKey, int channels, int samplesPerFrame, uint ssrc, int? bitrate = null)
: base(target, secretKey, samplesPerFrame, ssrc)
internal OpusEncodeStream(AudioOutStream next, int channels, int samplesPerFrame, int bitrate, AudioApplication application, int bufferSize = 4000)
{
_encoder = new OpusEncoder(SampleRate, channels);
_next = next;
_encoder = new OpusEncoder(SampleRate, channels, bitrate, application);
_frameSize = samplesPerFrame * channels * 2;
_buffer = new byte[bufferSize];
_partialFrameBuffer = new byte[_frameSize];
_encoder.SetForwardErrorCorrection(true);
if (bitrate != null)
_encoder.SetBitrate(bitrate.Value);
}
public override void Write(byte[] buffer, int offset, int count)
@@ -43,7 +44,7 @@ namespace Discord.Audio
_partialFramePos = 0;
int encFrameSize = _encoder.EncodeFrame(_partialFrameBuffer, 0, _frameSize, _buffer, 0);
await base.WriteAsync(_buffer, 0, encFrameSize, cancellationToken).ConfigureAwait(false);
await _next.WriteAsync(_buffer, 0, encFrameSize, cancellationToken).ConfigureAwait(false);
}
else
{
@@ -54,10 +55,7 @@ namespace Discord.Audio
}
}
/*public override void Flush()
{
FlushAsync(CancellationToken.None).GetAwaiter().GetResult();
}
/*
public override async Task FlushAsync(CancellationToken cancellationToken)
{
try
@@ -70,6 +68,15 @@ namespace Discord.Audio
await base.FlushAsync(cancellationToken).ConfigureAwait(false);
}*/
public override async Task FlushAsync(CancellationToken cancelToken)
{
await _next.FlushAsync(cancelToken).ConfigureAwait(false);
}
public override async Task ClearAsync(CancellationToken cancelToken)
{
await _next.ClearAsync(cancelToken).ConfigureAwait(false);
}
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);

View File

@@ -0,0 +1,23 @@
using System.Threading;
using System.Threading.Tasks;
namespace Discord.Audio.Streams
{
///<summary> Wraps an IAudioClient, sending voice data on write. </summary>
public class OutputStream : AudioOutStream
{
private readonly DiscordVoiceAPIClient _client;
public OutputStream(IAudioClient client)
: this((client as AudioClient).ApiClient) { }
internal OutputStream(DiscordVoiceAPIClient client)
{
_client = client;
}
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken)
{
cancelToken.ThrowIfCancellationRequested();
await _client.SendAsync(buffer, offset, count).ConfigureAwait(false);
}
}
}

View File

@@ -2,11 +2,13 @@
using System.Collections.Concurrent;
using System.IO;
namespace Discord.Audio
namespace Discord.Audio.Streams
{
internal class RTPReadStream : Stream
///<summary> Reads the payload from an RTP frame </summary>
public class RTPReadStream : AudioInStream
{
private readonly BlockingCollection<byte[]> _queuedData; //TODO: Replace with max-length ring buffer
//private readonly BlockingCollection<RTPFrame> _queuedData; //TODO: Replace with max-length ring buffer
private readonly AudioClient _audioClient;
private readonly byte[] _buffer, _nonce, _secretKey;
@@ -23,6 +25,12 @@ namespace Discord.Audio
_nonce = new byte[24];
}
/*public RTPFrame ReadFrame()
{
var queuedData = _queuedData.Take();
Buffer.BlockCopy(queuedData, 0, buffer, offset, Math.Min(queuedData.Length, count));
return queuedData.Length;
}*/
public override int Read(byte[] buffer, int offset, int count)
{
var queuedData = _queuedData.Take();
@@ -31,10 +39,8 @@ namespace Discord.Audio
}
public override void Write(byte[] buffer, int offset, int count)
{
Buffer.BlockCopy(buffer, 0, _nonce, 0, 12);
count = SecretBox.Decrypt(buffer, offset, count, _buffer, 0, _nonce, _secretKey);
var newBuffer = new byte[count];
Buffer.BlockCopy(_buffer, 0, newBuffer, 0, count);
Buffer.BlockCopy(buffer, 0, newBuffer, 0, count);
_queuedData.Add(newBuffer);
}

View File

@@ -1,33 +1,32 @@
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace Discord.Audio
namespace Discord.Audio.Streams
{
internal class RTPWriteStream : AudioOutStream
///<summary> Wraps data in an RTP frame </summary>
public class RTPWriteStream : AudioOutStream
{
private readonly IAudioTarget _target;
private readonly byte[] _nonce, _secretKey;
private readonly AudioOutStream _next;
private readonly byte[] _header;
private int _samplesPerFrame;
private uint _ssrc, _timestamp = 0;
protected readonly byte[] _buffer;
internal RTPWriteStream(IAudioTarget target, byte[] secretKey, int samplesPerFrame, uint ssrc)
internal RTPWriteStream(AudioOutStream next, int samplesPerFrame, uint ssrc, int bufferSize = 4000)
{
_target = target;
_secretKey = secretKey;
_next = next;
_samplesPerFrame = samplesPerFrame;
_ssrc = ssrc;
_buffer = new byte[4000];
_nonce = new byte[24];
_nonce[0] = 0x80;
_nonce[1] = 0x78;
_nonce[8] = (byte)(_ssrc >> 24);
_nonce[9] = (byte)(_ssrc >> 16);
_nonce[10] = (byte)(_ssrc >> 8);
_nonce[11] = (byte)(_ssrc >> 0);
_buffer = new byte[bufferSize];
_header = new byte[24];
_header[0] = 0x80;
_header[1] = 0x78;
_header[8] = (byte)(_ssrc >> 24);
_header[9] = (byte)(_ssrc >> 16);
_header[10] = (byte)(_ssrc >> 8);
_header[11] = (byte)(_ssrc >> 0);
}
public override void Write(byte[] buffer, int offset, int count)
@@ -39,48 +38,28 @@ namespace Discord.Audio
cancellationToken.ThrowIfCancellationRequested();
unchecked
{
if (_nonce[3]++ == byte.MaxValue)
_nonce[2]++;
if (_header[3]++ == byte.MaxValue)
_header[2]++;
_timestamp += (uint)_samplesPerFrame;
_nonce[4] = (byte)(_timestamp >> 24);
_nonce[5] = (byte)(_timestamp >> 16);
_nonce[6] = (byte)(_timestamp >> 8);
_nonce[7] = (byte)(_timestamp >> 0);
_header[4] = (byte)(_timestamp >> 24);
_header[5] = (byte)(_timestamp >> 16);
_header[6] = (byte)(_timestamp >> 8);
_header[7] = (byte)(_timestamp >> 0);
}
Buffer.BlockCopy(_header, 0, _buffer, 0, 12); //Copy RTP header from to the buffer
Buffer.BlockCopy(buffer, offset, _buffer, 12, count);
count = SecretBox.Encrypt(buffer, offset, count, _buffer, 12, _nonce, _secretKey);
Buffer.BlockCopy(_nonce, 0, _buffer, 0, 12); //Copy the RTP header from nonce to buffer
await _target.SendAsync(_buffer, count + 12).ConfigureAwait(false);
await _next.WriteAsync(_buffer, 0, count + 12).ConfigureAwait(false);
}
public override void Flush()
public override async Task FlushAsync(CancellationToken cancelToken)
{
FlushAsync(CancellationToken.None).GetAwaiter().GetResult();
}
public override async Task FlushAsync(CancellationToken cancellationToken)
{
await _target.FlushAsync(cancellationToken).ConfigureAwait(false);
}
public override void Clear()
{
ClearAsync(CancellationToken.None).GetAwaiter().GetResult();
await _next.FlushAsync(cancelToken).ConfigureAwait(false);
}
public override async Task ClearAsync(CancellationToken cancelToken)
{
await _target.ClearAsync(cancelToken).ConfigureAwait(false);
await _next.ClearAsync(cancelToken).ConfigureAwait(false);
}
public override long Length { get { throw new NotSupportedException(); } }
public override long Position
{
get { throw new NotSupportedException(); }
set { throw new NotSupportedException(); }
}
public override int Read(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }
public override void SetLength(long value) { throw new NotSupportedException(); }
public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
}
}

View File

@@ -0,0 +1,42 @@
using System;
using System.Collections.Concurrent;
namespace Discord.Audio.Streams
{
///<summary> Decrypts an RTP frame using libsodium </summary>
public class SodiumDecryptStream : AudioInStream
{
private readonly BlockingCollection<byte[]> _queuedData; //TODO: Replace with max-length ring buffer
private readonly AudioClient _audioClient;
private readonly byte[] _buffer, _nonce, _secretKey;
public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => true;
internal SodiumDecryptStream(AudioClient audioClient, byte[] secretKey, int bufferSize = 4000)
{
_audioClient = audioClient;
_secretKey = secretKey;
_buffer = new byte[bufferSize];
_queuedData = new BlockingCollection<byte[]>(100);
_nonce = new byte[24];
}
public override int Read(byte[] buffer, int offset, int count)
{
var queuedData = _queuedData.Take();
Buffer.BlockCopy(queuedData, 0, buffer, offset, Math.Min(queuedData.Length, count));
return queuedData.Length;
}
public override void Write(byte[] buffer, int offset, int count)
{
Buffer.BlockCopy(buffer, 0, _nonce, 0, 12); //Copy RTP header to nonce
count = SecretBox.Decrypt(buffer, offset, count, _buffer, 0, _nonce, _secretKey);
var newBuffer = new byte[count];
Buffer.BlockCopy(_buffer, 0, newBuffer, 0, count);
_queuedData.Add(newBuffer);
}
}
}

View File

@@ -0,0 +1,45 @@
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Discord.Audio.Streams
{
///<summary> Encrypts an RTP frame using libsodium </summary>
public class SodiumEncryptStream : AudioOutStream
{
private readonly AudioOutStream _next;
private readonly byte[] _nonce, _secretKey;
//protected readonly byte[] _buffer;
internal SodiumEncryptStream(AudioOutStream next, byte[] secretKey/*, int bufferSize = 4000*/)
{
_next = next;
_secretKey = secretKey;
//_buffer = new byte[bufferSize]; //TODO: Can Sodium do an in-place encrypt?
_nonce = new byte[24];
}
public override void Write(byte[] buffer, int offset, int count)
{
WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
}
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
Buffer.BlockCopy(buffer, offset, _nonce, 0, 12); //Copy nonce from RTP header
count = SecretBox.Encrypt(buffer, offset + 12, count - 12, buffer, 12, _nonce, _secretKey);
await _next.WriteAsync(buffer, 0, count + 12, cancellationToken).ConfigureAwait(false);
}
public override async Task FlushAsync(CancellationToken cancelToken)
{
await _next.FlushAsync(cancelToken).ConfigureAwait(false);
}
public override async Task ClearAsync(CancellationToken cancelToken)
{
await _next.ClearAsync(cancelToken).ConfigureAwait(false);
}
}
}