1
0
Fork 0
mirror of https://github.com/beefytech/Beef.git synced 2025-06-10 12:32:20 +02:00
Beef/BeefLibs/corlib/src/Diagnostics/AsyncStreamReader.bf

301 lines
7.6 KiB
Beef

using System.IO;
using System.Text;
using System.Threading;
using System.Collections.Generic;
namespace System.Diagnostics
{
internal delegate void UserCallBack(String data);
class AsyncStreamReader
{
internal const int32 DefaultBufferSize = 1024; // Byte buffer size
private const int32 MinBufferSize = 128;
private Stream stream;
private Encoding encoding;
private Decoder decoder;
private uint8[] byteBuffer;
private char8[] char8Buffer;
// Record the number of valid bytes in the byteBuffer, for a few checks.
// This is the maximum number of char8s we can get from one call to
// ReadBuffer. Used so ReadBuffer can tell when to copy data into
// a user's char8[] directly, instead of our internal char8[].
private int32 mMaxCharsPerBuffer;
// Store a backpointer to the process class, to check for user callbacks
private Process process;
// Delegate to call user function.
private UserCallBack userCallBack;
// Internal Cancel operation
private bool cancelOperation;
private WaitEvent eofEvent = new WaitEvent() ~ delete _;
private Queue<String> messageQueue ~ delete _;
private String sb;
private bool bLastCarriageReturn;
private Monitor mMonitor = new Monitor();
// Cache the last position scanned in sb when searching for lines.
private int currentLinePos;
internal this(Process process, Stream stream, UserCallBack callback, Encoding encoding)
: this(process, stream, callback, encoding, DefaultBufferSize)
{
}
internal ~this()
{
for (var msg in messageQueue)
delete msg;
Close();
}
// Creates a new AsyncStreamReader for the given stream. The
// character encoding is set by encoding and the buffer size,
// in number of 16-bit characters, is set by bufferSize.
//
internal this(Process process, Stream stream, UserCallBack callback, Encoding encoding, int32 bufferSize)
{
Debug.Assert(process != null && stream != null && encoding != null && callback != null, "Invalid arguments!");
Debug.Assert(stream.CanRead, "Stream must be readable!");
Debug.Assert(bufferSize > 0, "Invalid buffer size!");
Init(process, stream, callback, encoding, bufferSize);
messageQueue = new Queue<String>();
}
public virtual Encoding CurrentEncoding
{
get { return encoding; }
}
public virtual Stream BaseStream
{
get { return stream; }
}
private void Init(Process process, Stream stream, UserCallBack callback, Encoding encoding, int32 bufferSize)
{
this.process = process;
this.stream = stream;
this.encoding = encoding;
this.userCallBack = callback;
int32 useBufferSize = bufferSize;
if (useBufferSize < MinBufferSize) useBufferSize = MinBufferSize;
byteBuffer = new uint8[useBufferSize];
mMaxCharsPerBuffer = (int32)encoding.GetMaxCharCount(useBufferSize);
char8Buffer = new char8[mMaxCharsPerBuffer];
cancelOperation = false;
sb = null;
this.bLastCarriageReturn = false;
}
public virtual void Close()
{
Dispose(true);
}
void Dispose(bool disposing)
{
if (disposing)
{
if (stream != null)
stream.Close();
}
if (stream != null)
{
stream = null;
encoding = null;
decoder = null;
byteBuffer = null;
char8Buffer = null;
}
}
// User calls BeginRead to start the asynchronous read
internal void BeginReadLine()
{
if (cancelOperation)
{
cancelOperation = false;
}
if (sb == null)
{
sb = new String(DefaultBufferSize);
stream.BeginRead(byteBuffer, 0, byteBuffer.Count, new => ReadBuffer, null);
}
else
{
FlushMessageQueue();
}
}
internal void CancelOperation()
{
cancelOperation = true;
}
// This is the async callback function. Only one thread could/should call this.
private void ReadBuffer(IAsyncResult ar)
{
int byteLen = stream.EndRead(ar);
// We should ideally consume errors from operations getting cancelled
// so that we don't crash the unsuspecting parent with an unhandled exc.
// This seems to come in 2 forms of exceptions (depending on platform and scenario),
// namely OperationCanceledException and IOException (for errorcode that we don't
// map explicitly).
byteLen = 0; // Treat this as EOF
if (byteLen == 0)
{
// We're at EOF, we won't call this function again from here on.
using (mMonitor.Enter())
{
if (sb.Length != 0)
{
messageQueue.Enqueue(new String(sb));
sb.Clear();
}
messageQueue.Enqueue(null);
}
// UserCallback could throw, we should still set the eofEvent
FlushMessageQueue();
eofEvent.Set(true);
}
else
{
int char8Len = decoder.GetChars(byteBuffer, 0, byteLen, char8Buffer, 0);
sb.Append(char8Buffer, 0, char8Len);
GetLinesFromStringBuilder();
stream.BeginRead(byteBuffer, 0, byteBuffer.Count, new => ReadBuffer, null);
}
}
// Read lines stored in StringBuilder and the buffer we just read into.
// A line is defined as a sequence of characters followed by
// a carriage return ('\r'), a line feed ('\n'), or a carriage return
// immediately followed by a line feed. The resulting string does not
// contain the terminating carriage return and/or line feed. The returned
// value is null if the end of the input stream has been reached.
//
private void GetLinesFromStringBuilder()
{
int currentIndex = currentLinePos;
int lineStart = 0;
int len = sb.Length;
// skip a beginning '\n' char8acter of new block if last block ended
// with '\r'
if (bLastCarriageReturn && (len > 0) && sb[0] == '\n')
{
currentIndex = 1;
lineStart = 1;
bLastCarriageReturn = false;
}
while (currentIndex < len)
{
char8 ch = sb[currentIndex];
// Note the following common line feed char8s:
// \n - UNIX \r\n - DOS \r - Mac
if (ch == '\r' || ch == '\n')
{
String s = new String();
s.Append(sb, lineStart, currentIndex - lineStart);
lineStart = currentIndex + 1;
// skip the "\n" char8acter following "\r" char8acter
if ((ch == '\r') && (lineStart < len) && (sb[lineStart] == '\n'))
{
lineStart++;
currentIndex++;
}
using (mMonitor.Enter())
{
messageQueue.Enqueue(s);
}
}
currentIndex++;
}
if (sb[len - 1] == '\r')
{
bLastCarriageReturn = true;
}
// Keep the rest char8acaters which can't form a new line in string builder.
if (lineStart < len)
{
if (lineStart == 0)
{
// we found no breaklines, in this case we cache the position
// so next time we don't have to restart from the beginning
currentLinePos = currentIndex;
}
else
{
sb.Remove(0, lineStart);
currentLinePos = 0;
}
}
else
{
sb.Clear();
currentLinePos = 0;
}
FlushMessageQueue();
}
private void FlushMessageQueue()
{
while (true)
{
// When we call BeginReadLine, we also need to flush the queue
// So there could be a ---- between the ReadBuffer and BeginReadLine
// We need to take lock before DeQueue.
if (messageQueue.Count > 0)
{
using (mMonitor.Enter())
{
if (messageQueue.Count > 0)
{
String s = messageQueue.Dequeue();
// skip if the read is the read is cancelled
// this might happen inside UserCallBack
// However, continue to drain the queue
if (!cancelOperation)
{
userCallBack(s);
}
delete s;
}
}
}
else
{
break;
}
}
}
// Wait until we hit EOF. This is called from Process.WaitForExit
// We will lose some information if we don't do this.
internal void WaitUtilEOF()
{
if (eofEvent != null)
{
eofEvent.WaitFor();
}
}
}
}