1
0
Fork 0
mirror of https://github.com/beefytech/Beef.git synced 2025-06-09 03:52:19 +02:00

Overlapped io tweaks

This commit is contained in:
Brian Fiete 2025-04-05 15:32:57 -04:00
parent d542f9a34b
commit 0286570c1e

View file

@ -559,7 +559,17 @@ struct OverlappedReadResult : OVERLAPPED
intptr mBytesRead; intptr mBytesRead;
DWORD mErrorCode; DWORD mErrorCode;
Array<uint8> mData; Array<uint8> mData;
int mRefCount; bool mPending;
bool mAttemptedRead;
OverlappedReadResult()
{
mFile = NULL;
mBytesRead = 0;
mErrorCode = 0;
mPending = false;
mAttemptedRead = false;
}
void* GetPtr() void* GetPtr()
{ {
@ -569,20 +579,20 @@ struct OverlappedReadResult : OVERLAPPED
struct BfpAsyncData struct BfpAsyncData
{ {
BfpFile* mFile;
Array<uint8> mQueuedData; Array<uint8> mQueuedData;
HANDLE mEvent; HANDLE mEvent;
int mOverlappedReadCount; OverlappedReadResult mOverlappedResult;
BfpAsyncData() BfpAsyncData(BfpFile* file)
{ {
mOverlappedReadCount = 0; mFile = file;
mEvent = ::CreateEvent(NULL, TRUE, FALSE, NULL); mEvent = ::CreateEvent(NULL, TRUE, FALSE, NULL);
} }
~BfpAsyncData() ~BfpAsyncData()
{ {
::CloseHandle(mEvent); ::CloseHandle(mEvent);
BF_ASSERT(mOverlappedReadCount == 0);
} }
void SetEvent() void SetEvent()
@ -606,120 +616,94 @@ struct BfpAsyncData
} }
} }
void Release(OverlappedReadResult* readResult) int ReadQueued(void* buffer, int size, int timeoutMS, bool& didWait)
{ {
AutoCrit autoCrit(gBfpCritSect); gBfpCritSect.Lock();
BF_ASSERT((readResult->mRefCount == 1) || (readResult->mRefCount == 2));
readResult->mRefCount--;
if (readResult->mRefCount == 0)
{
BF_ASSERT(readResult->mData.IsEmpty());
delete readResult;
mOverlappedReadCount--;
}
}
int ReadQueued(void* buffer, int size)
{
AutoCrit autoCrit(gBfpCritSect);
if (mQueuedData.mSize == 0) if (mQueuedData.mSize == 0)
{
if (mOverlappedResult.mPending)
{
gBfpCritSect.Unlock();
WaitAndResetEvent(timeoutMS);
didWait = true;
gBfpCritSect.Lock();
}
if (mQueuedData.mSize == 0)
{
gBfpCritSect.Unlock();
return 0; return 0;
}
}
int readSize = BF_MIN(size, mQueuedData.mSize); int readSize = BF_MIN(size, mQueuedData.mSize);
memcpy(buffer, mQueuedData.mVals, readSize); memcpy(buffer, mQueuedData.mVals, readSize);
mQueuedData.RemoveRange(0, readSize); mQueuedData.RemoveRange(0, readSize);
gBfpCritSect.Unlock();
return readSize; return readSize;
} }
void HandleResult(OverlappedReadResult* readResult, uint32 errorCode, uint32 bytesRead) void HandleResult(uint32 errorCode, uint32 bytesRead)
{ {
AutoCrit autoCrit(gBfpCritSect); AutoCrit autoCrit(gBfpCritSect);
if (readResult->mRefCount == 2) // Only if we are still waiting BF_ASSERT(mOverlappedResult.mPending);
mOverlappedResult.mPending = false;
mOverlappedResult.mErrorCode = errorCode;
mOverlappedResult.mBytesRead = bytesRead;
if (mOverlappedResult.mAttemptedRead) // Already tried to read and failed
{ {
readResult->mErrorCode = errorCode; mQueuedData.Insert(mQueuedData.mSize, (uint8*)mOverlappedResult.GetPtr(), bytesRead);
readResult->mBytesRead = bytesRead; mOverlappedResult.mData.Clear();
}
SetEvent(); SetEvent();
} }
else
{
mQueuedData.Insert(mQueuedData.mSize, (uint8*)readResult->GetPtr(), bytesRead);
readResult->mData.Clear();
}
Release(readResult);
}
int FinishRead(OverlappedReadResult* readResult, void* buffer, int size, DWORD& errorCode) void AbortOverlapped()
{ {
AutoCrit autoCrit(gBfpCritSect); AutoCrit autoCrit(gBfpCritSect);
if (readResult->mRefCount == 2) BF_ASSERT(mOverlappedResult.mPending);
mOverlappedResult.mPending = false;
mOverlappedResult.mData.Clear();
}
int FinishRead(void* buffer, int size, DWORD& errorCode)
{
AutoCrit autoCrit(gBfpCritSect);
BF_ASSERT(!mOverlappedResult.mAttemptedRead);
mOverlappedResult.mAttemptedRead = true;
if (mOverlappedResult.mPending)
{ {
Release(readResult);
return -2; // Still executing return -2; // Still executing
} }
errorCode = readResult->mErrorCode; if (mOverlappedResult.mErrorCode != 0)
if (errorCode != 0)
{ {
readResult->mData.Clear();
Release(readResult);
return -1; return -1;
} }
BF_ASSERT(size >= readResult->mBytesRead); BF_ASSERT(size >= mOverlappedResult.mBytesRead);
memcpy(buffer, readResult->GetPtr(), readResult->mBytesRead); memcpy(buffer, mOverlappedResult.GetPtr(), mOverlappedResult.mBytesRead);
int bytesRead = (int)readResult->mBytesRead; int bytesRead = (int)mOverlappedResult.mBytesRead;
readResult->mData.Clear(); mOverlappedResult.mData.Clear();
Release(readResult);
return bytesRead; return bytesRead;
} }
OverlappedReadResult* AllocBuffer(int size) OverlappedReadResult* StartOverlapped(int size)
{ {
AutoCrit autoCrit(gBfpCritSect); AutoCrit autoCrit(gBfpCritSect);
OverlappedReadResult* readResult = new OverlappedReadResult(); BF_ASSERT(!mOverlappedResult.mPending);
memset(readResult, 0, sizeof(OverlappedReadResult)); BF_ASSERT(mOverlappedResult.mData.IsEmpty());
readResult->mErrorCode = -1;
readResult->mData.Resize(size);
readResult->mRefCount = 2;
mOverlappedReadCount++;
return readResult;
}
};
struct BfOverlappedReleaser memset(&mOverlappedResult, 0, sizeof(OVERLAPPED));
{ mOverlappedResult.mFile = mFile;
BfpAsyncData* mAsyncData; mOverlappedResult.mBytesRead = 0;
OverlappedReadResult* mOverlapped; mOverlappedResult.mPending = true;
bool mProducerFree; mOverlappedResult.mAttemptedRead = false;
bool mConsumerFree; mOverlappedResult.mErrorCode = -1;
mOverlappedResult.mData.Resize(size);
BfOverlappedReleaser() return &mOverlappedResult;
{
mAsyncData = NULL;
mOverlapped = NULL;
mProducerFree = true;
mConsumerFree = true;
}
BfOverlappedReleaser(BfpAsyncData* asyncData, OverlappedReadResult* overlapped)
{
mAsyncData = asyncData;
mOverlapped = overlapped;
mProducerFree = true;
mConsumerFree = true;
}
~BfOverlappedReleaser()
{
if (mOverlapped != NULL)
{
if (mProducerFree)
mAsyncData->Release(mOverlapped);
if (mConsumerFree)
mAsyncData->Release(mOverlapped);
}
} }
}; };
@ -755,7 +739,7 @@ struct BfpFile
static void WINAPI OverlappedReadComplete(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped) static void WINAPI OverlappedReadComplete(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped)
{ {
OverlappedReadResult* readResult = (OverlappedReadResult*)lpOverlapped; OverlappedReadResult* readResult = (OverlappedReadResult*)lpOverlapped;
readResult->mFile->mAsyncData->HandleResult(readResult, dwErrorCode, dwNumberOfBytesTransfered); readResult->mFile->mAsyncData->HandleResult(dwErrorCode, dwNumberOfBytesTransfered);
} }
struct BfpFileWatcher : public BfpOverlapped struct BfpFileWatcher : public BfpOverlapped
@ -3092,7 +3076,7 @@ BFP_EXPORT BfpFile* BFP_CALLTYPE BfpFile_Create(const char* path, BfpFileCreateK
if (isOverlapped) if (isOverlapped)
{ {
bfpFile->mAsyncData = new BfpAsyncData(); bfpFile->mAsyncData = new BfpAsyncData(bfpFile);
} }
return bfpFile; return bfpFile;
@ -3188,7 +3172,7 @@ BFP_EXPORT BfpFile* BFP_CALLTYPE BfpFile_Create(const char* path, BfpFileCreateK
bfpFile->mHandle = handle; bfpFile->mHandle = handle;
if ((createFlags & BfpFileCreateFlag_AllowTimeouts) != 0) if ((createFlags & BfpFileCreateFlag_AllowTimeouts) != 0)
bfpFile->mAsyncData = new BfpAsyncData(); bfpFile->mAsyncData = new BfpAsyncData(bfpFile);
if ((createFlags & BfpFileCreateFlag_Pipe) != 0) if ((createFlags & BfpFileCreateFlag_Pipe) != 0)
bfpFile->mIsPipe = true; bfpFile->mIsPipe = true;
@ -3335,12 +3319,19 @@ BFP_EXPORT intptr BFP_CALLTYPE BfpFile_Read(BfpFile* file, void* buffer, intptr
if (file->mAsyncData != NULL) if (file->mAsyncData != NULL)
{ {
int readSize = file->mAsyncData->ReadQueued(buffer, (int)size); bool didWait = false;
int readSize = file->mAsyncData->ReadQueued(buffer, (int)size, timeoutMS, didWait);
if (readSize > 0) if (readSize > 0)
{ {
OUTRESULT(BfpFileResult_Ok); OUTRESULT(BfpFileResult_Ok);
return readSize; return readSize;
} }
if (didWait)
{
OUTRESULT(BfpFileResult_Timeout);
return 0;
}
} }
if ((timeoutMS != -1) && (!forceNormalRead)) if ((timeoutMS != -1) && (!forceNormalRead))
@ -3353,21 +3344,16 @@ BFP_EXPORT intptr BFP_CALLTYPE BfpFile_Read(BfpFile* file, void* buffer, intptr
while (true) while (true)
{ {
OverlappedReadResult* overlapped = file->mAsyncData->AllocBuffer((int)size); OverlappedReadResult* overlapped = file->mAsyncData->StartOverlapped((int)size);
overlapped->mFile = file;
BfOverlappedReleaser overlappedReleaser(file->mAsyncData, overlapped);
//TODO: this doesn't set file stream location. It only works for streams like pipes, sockets, etc //TODO: this doesn't set file stream location. It only works for streams like pipes, sockets, etc
if (::ReadFileEx(file->mHandle, overlapped->GetPtr(), (uint32)size, overlapped, OverlappedReadComplete)) if (::ReadFileEx(file->mHandle, overlapped->GetPtr(), (uint32)size, overlapped, OverlappedReadComplete))
{ {
overlappedReleaser.mProducerFree = false;
file->mAsyncData->WaitAndResetEvent(timeoutMS); file->mAsyncData->WaitAndResetEvent(timeoutMS);
DWORD errorCode = 0; DWORD errorCode = 0;
int readResult = file->mAsyncData->FinishRead(overlapped, buffer, (int)size, errorCode); int readResult = file->mAsyncData->FinishRead(buffer, (int)size, errorCode);
overlappedReleaser.mConsumerFree = false; if (readResult != -2) // Still executing
if (readResult != -2)
{ {
if (errorCode == 0) if (errorCode == 0)
{ {
@ -3387,17 +3373,13 @@ BFP_EXPORT intptr BFP_CALLTYPE BfpFile_Read(BfpFile* file, void* buffer, intptr
} }
else else
{ {
::CancelIoEx(file->mHandle, overlapped);
// Clear event set by CancelIoEx
file->mAsyncData->WaitAndResetEvent(0);
OUTRESULT(BfpFileResult_Timeout); OUTRESULT(BfpFileResult_Timeout);
return 0; return 0;
} }
} }
else else
{ {
overlapped->mData.Clear(); file->mAsyncData->AbortOverlapped();
int lastError = ::GetLastError(); int lastError = ::GetLastError();
if (lastError == ERROR_PIPE_LISTENING) if (lastError == ERROR_PIPE_LISTENING)
@ -3410,10 +3392,6 @@ BFP_EXPORT intptr BFP_CALLTYPE BfpFile_Read(BfpFile* file, void* buffer, intptr
{ {
if (!file->mAsyncData->WaitAndResetEvent(timeoutMS)) if (!file->mAsyncData->WaitAndResetEvent(timeoutMS))
{ {
::CancelIoEx(file->mHandle, overlapped);
// Clear event set by CancelIoEx
file->mAsyncData->WaitAndResetEvent(0);
OUTRESULT(BfpFileResult_Timeout); OUTRESULT(BfpFileResult_Timeout);
return 0; return 0;
} }