1
0
Fork 0
mirror of https://github.com/beefytech/Beef.git synced 2025-07-05 15:56:00 +02:00

Async reading improvements

This commit is contained in:
Brian Fiete 2025-04-05 12:01:38 -04:00
parent ef0bc6033b
commit 42ba7d3df5

View file

@ -553,18 +553,36 @@ struct BfpOverlappedFile : BfpOverlapped
} }
}; };
struct OverlappedReadResult : OVERLAPPED
{
BfpFile* mFile;
intptr mBytesRead;
DWORD mErrorCode;
Array<uint8> mData;
int mRefCount;
void* GetPtr()
{
return mData.mVals;
}
};
struct BfpAsyncData struct BfpAsyncData
{ {
Array<uint8> mQueuedData;
HANDLE mEvent; HANDLE mEvent;
int mOverlappedReadCount;
BfpAsyncData() BfpAsyncData()
{ {
mOverlappedReadCount = 0;
mEvent = ::CreateEvent(NULL, TRUE, FALSE, NULL); mEvent = ::CreateEvent(NULL, TRUE, FALSE, NULL);
} }
~BfpAsyncData() ~BfpAsyncData()
{ {
::CloseHandle(mEvent); ::CloseHandle(mEvent);
BF_ASSERT(mOverlappedReadCount == 0);
} }
void SetEvent() void SetEvent()
@ -587,6 +605,111 @@ struct BfpAsyncData
return false; return false;
} }
} }
void Release(OverlappedReadResult* readResult)
{
AutoCrit autoCrit(gBfpCritSect);
BF_ASSERT((readResult->mRefCount == 1) || (readResult->mRefCount == 2));
readResult->mRefCount--;
if (readResult->mRefCount == 0)
{
BF_ASSERT(readResult->mData.IsEmpty());
delete readResult;
mOverlappedReadCount--;
}
}
int ReadQueued(void* buffer, int size)
{
AutoCrit autoCrit(gBfpCritSect);
if (mQueuedData.mSize == 0)
return 0;
int readSize = BF_MIN(size, mQueuedData.mSize);
memcpy(buffer, mQueuedData.mVals, readSize);
mQueuedData.RemoveRange(0, readSize);
return readSize;
}
void HandleResult(OverlappedReadResult* readResult, uint32 errorCode, uint32 bytesRead)
{
AutoCrit autoCrit(gBfpCritSect);
if (readResult->mRefCount == 2) // Only if we are still waiting
{
readResult->mErrorCode = errorCode;
readResult->mBytesRead = bytesRead;
SetEvent();
}
else
{
mQueuedData.Insert(mQueuedData.mSize, (uint8*)readResult->GetPtr(), bytesRead);
readResult->mData.Clear();
}
Release(readResult);
}
int FinishRead(OverlappedReadResult* readResult, void* buffer, int size, DWORD& errorCode)
{
AutoCrit autoCrit(gBfpCritSect);
if (readResult->mRefCount == 2)
{
Release(readResult);
return -2; // Still executing
}
errorCode = readResult->mErrorCode;
if (errorCode != 0)
{
readResult->mData.Clear();
Release(readResult);
return -1;
}
BF_ASSERT(size >= readResult->mBytesRead);
memcpy(buffer, readResult->GetPtr(), readResult->mBytesRead);
int bytesRead = (int)readResult->mBytesRead;
readResult->mData.Clear();
Release(readResult);
return bytesRead;
}
OverlappedReadResult* AllocBuffer(int size)
{
AutoCrit autoCrit(gBfpCritSect);
OverlappedReadResult* readResult = new OverlappedReadResult();
memset(readResult, 0, sizeof(OverlappedReadResult));
readResult->mErrorCode = -1;
readResult->mData.Resize(size);
readResult->mRefCount = 2;
mOverlappedReadCount++;
return readResult;
}
};
struct BfOverlappedReleaser
{
BfpAsyncData* mAsyncData;
OverlappedReadResult* mOverlapped;
BfOverlappedReleaser()
{
mAsyncData = NULL;
mOverlapped = NULL;
}
BfOverlappedReleaser(BfpAsyncData* asyncData, OverlappedReadResult* overlapped)
{
mAsyncData = asyncData;
mOverlapped = overlapped;
}
~BfOverlappedReleaser()
{
if (mOverlapped != NULL)
mAsyncData->Release(mOverlapped);
}
}; };
struct BfpFile struct BfpFile
@ -618,6 +741,12 @@ struct BfpFile
} }
}; };
static void WINAPI OverlappedReadComplete(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped)
{
OverlappedReadResult* readResult = (OverlappedReadResult*)lpOverlapped;
readResult->mFile->mAsyncData->HandleResult(readResult, dwErrorCode, dwNumberOfBytesTransfered);
}
struct BfpFileWatcher : public BfpOverlapped struct BfpFileWatcher : public BfpOverlapped
{ {
String mPath; String mPath;
@ -3142,21 +3271,6 @@ BFP_EXPORT intptr BFP_CALLTYPE BfpFile_Write(BfpFile* file, const void* buffer,
return bytesWritten; return bytesWritten;
} }
struct OverlappedReadResult : OVERLAPPED
{
BfpFile* mFile;
intptr mBytesRead;
DWORD mErrorCode;
};
static void WINAPI OverlappedReadComplete(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped)
{
OverlappedReadResult* readResult = (OverlappedReadResult*)lpOverlapped;
readResult->mErrorCode = dwErrorCode;
readResult->mBytesRead = dwNumberOfBytesTransfered;
readResult->mFile->mAsyncData->SetEvent();
}
BFP_EXPORT intptr BFP_CALLTYPE BfpFile_Read(BfpFile* file, void* buffer, intptr size, int timeoutMS, BfpFileResult* outResult) BFP_EXPORT intptr BFP_CALLTYPE BfpFile_Read(BfpFile* file, void* buffer, intptr size, int timeoutMS, BfpFileResult* outResult)
{ {
bool forceNormalRead = false; bool forceNormalRead = false;
@ -3218,30 +3332,50 @@ BFP_EXPORT intptr BFP_CALLTYPE BfpFile_Read(BfpFile* file, void* buffer, intptr
while (true) while (true)
{ {
OverlappedReadResult overlapped; int readSize = file->mAsyncData->ReadQueued(buffer, (int)size);
memset(&overlapped, 0, sizeof(OverlappedReadResult)); if (readSize > 0)
overlapped.mFile = file; {
OUTRESULT(BfpFileResult_Ok);
return readSize;
}
OverlappedReadResult* overlapped = file->mAsyncData->AllocBuffer((int)size);
overlapped->mFile = file;
BfOverlappedReleaser overlappedReleaser(file->mAsyncData, overlapped);
//TODO: this doesn't set file stream location. It only works for streams like pipes, sockets, etc //TODO: this doesn't set file stream location. It only works for streams like pipes, sockets, etc
if (::ReadFileEx(file->mHandle, buffer, (uint32)size, &overlapped, OverlappedReadComplete)) if (::ReadFileEx(file->mHandle, overlapped->GetPtr(), (uint32)size, overlapped, OverlappedReadComplete))
{ {
if (!file->mAsyncData->WaitAndResetEvent(timeoutMS)) file->mAsyncData->WaitAndResetEvent(timeoutMS);
DWORD errorCode = 0;
int readResult = file->mAsyncData->FinishRead(overlapped, buffer, (int)size, errorCode);
overlappedReleaser.mOverlapped = NULL;
if (readResult != -2)
{ {
::CancelIoEx(file->mHandle, &overlapped); if (errorCode == 0)
// There's a chance we completed before we were cancelled -- check on that {
if (!file->mAsyncData->WaitAndResetEvent(0)) OUTRESULT(BfpFileResult_Ok);
return readResult;
}
else if (errorCode == ERROR_OPERATION_ABORTED)
{ {
OUTRESULT(BfpFileResult_Timeout); OUTRESULT(BfpFileResult_Timeout);
return 0; return 0;
} }
else
{
OUTRESULT(BfpFileResult_Ok);
return 0;
}
} }
else
if (overlapped.mErrorCode == 0)
{ {
::CancelIoEx(file->mHandle, overlapped);
// Clear event set by CancelIoEx
file->mAsyncData->WaitAndResetEvent(0);
}
else if (overlapped.mErrorCode == ERROR_OPERATION_ABORTED)
{
OUTRESULT(BfpFileResult_Timeout); OUTRESULT(BfpFileResult_Timeout);
return 0; return 0;
} }
@ -3251,15 +3385,15 @@ BFP_EXPORT intptr BFP_CALLTYPE BfpFile_Read(BfpFile* file, void* buffer, intptr
int lastError = ::GetLastError(); int lastError = ::GetLastError();
if (lastError == ERROR_PIPE_LISTENING) if (lastError == ERROR_PIPE_LISTENING)
{ {
overlapped.hEvent = file->mAsyncData->mEvent; overlapped->hEvent = file->mAsyncData->mEvent;
if (!::ConnectNamedPipe(file->mHandle, &overlapped)) if (!::ConnectNamedPipe(file->mHandle, overlapped))
{ {
int lastError = ::GetLastError(); int lastError = ::GetLastError();
if (lastError == ERROR_IO_PENDING) if (lastError == ERROR_IO_PENDING)
{ {
if (!file->mAsyncData->WaitAndResetEvent(timeoutMS)) if (!file->mAsyncData->WaitAndResetEvent(timeoutMS))
{ {
::CancelIoEx(file->mHandle, &overlapped); ::CancelIoEx(file->mHandle, overlapped);
// Clear event set by CancelIoEx // Clear event set by CancelIoEx
file->mAsyncData->WaitAndResetEvent(0); file->mAsyncData->WaitAndResetEvent(0);
@ -3278,9 +3412,6 @@ BFP_EXPORT intptr BFP_CALLTYPE BfpFile_Read(BfpFile* file, void* buffer, intptr
return 0; return 0;
} }
} }
OUTRESULT(BfpFileResult_Ok);
return overlapped.mBytesRead;
} }
} }