mirror of
https://github.com/beefytech/Beef.git
synced 2025-06-08 03:28:20 +02:00
192 lines
No EOL
3.4 KiB
C++
192 lines
No EOL
3.4 KiB
C++
#include "ThreadPool.h"
|
|
#include "BeefPerf.h"
|
|
|
|
USING_NS_BF;
|
|
|
|
static BF_TLS_DECLSPEC ThreadPool* gPoolParent;
|
|
|
|
ThreadPool::Thread::Thread()
|
|
{
|
|
mCurJobThreadId = -1;
|
|
mActiveJob = NULL;
|
|
}
|
|
|
|
ThreadPool::Thread::~Thread()
|
|
{
|
|
BfpThread_Release(mBfpThread);
|
|
}
|
|
|
|
void ThreadPool::Thread::Proc()
|
|
{
|
|
bool isWorking = true;
|
|
|
|
BpSetThreadName("ThreadPoolProc");
|
|
BfpThread_SetName(NULL, "ThreadPoolProc", NULL);
|
|
|
|
gPoolParent = mThreadPool;
|
|
|
|
while (true)
|
|
{
|
|
Job* job = NULL;
|
|
//
|
|
{
|
|
AutoCrit autoCrit(mThreadPool->mCritSect);
|
|
if (!mThreadPool->mJobs.IsEmpty())
|
|
{
|
|
job = mThreadPool->mJobs[0];
|
|
job->mProcessing = true;
|
|
mThreadPool->mJobs.RemoveAt(0);
|
|
}
|
|
|
|
mActiveJob = job;
|
|
if (job == NULL)
|
|
mCurJobThreadId = -1;
|
|
else
|
|
mCurJobThreadId = job->mFromThreadId;
|
|
|
|
bool hasWork = job != NULL;
|
|
if (hasWork != isWorking)
|
|
{
|
|
isWorking = hasWork;
|
|
if (isWorking)
|
|
mThreadPool->mFreeThreads--;
|
|
else
|
|
mThreadPool->mFreeThreads++;
|
|
}
|
|
}
|
|
|
|
if ((job == NULL) && (mThreadPool->mShuttingDown))
|
|
{
|
|
break;
|
|
}
|
|
|
|
bool didCancel = false;
|
|
if ((mThreadPool->mShuttingDown) && (job->Cancel()))
|
|
didCancel = true;
|
|
|
|
if (job == NULL)
|
|
{
|
|
mThreadPool->mEvent.WaitFor();
|
|
continue;
|
|
}
|
|
|
|
if (!didCancel)
|
|
{
|
|
BP_ZONE("ThreadProc:Job");
|
|
job->Perform();
|
|
}
|
|
|
|
// Run dtor synchronized
|
|
AutoCrit autoCrit(mThreadPool->mCritSect);
|
|
mActiveJob = NULL;
|
|
delete job;
|
|
}
|
|
}
|
|
|
|
ThreadPool::ThreadPool(int maxThreads, int stackSize)
|
|
{
|
|
mMaxThreads = maxThreads;
|
|
mShuttingDown = false;
|
|
mStackSize = stackSize;
|
|
mFreeThreads = 0;
|
|
mRunningThreads = 0;
|
|
}
|
|
|
|
ThreadPool::~ThreadPool()
|
|
{
|
|
Shutdown();
|
|
}
|
|
|
|
void ThreadPool::Shutdown()
|
|
{
|
|
mShuttingDown = true;
|
|
mEvent.Set(true);
|
|
|
|
while (true)
|
|
{
|
|
Thread* thread = NULL;
|
|
//
|
|
{
|
|
AutoCrit autoCrit(mCritSect);
|
|
if (mThreads.IsEmpty())
|
|
break;
|
|
thread = mThreads.back();
|
|
mThreads.pop_back();
|
|
}
|
|
|
|
if (!BfpThread_WaitFor(thread->mBfpThread, -1))
|
|
break;
|
|
|
|
AutoCrit autoCrit(mCritSect);
|
|
delete thread;
|
|
mRunningThreads--;
|
|
}
|
|
|
|
BF_ASSERT(mRunningThreads == 0);
|
|
|
|
for (auto job : mJobs)
|
|
delete job;
|
|
mJobs.Clear();
|
|
}
|
|
|
|
void ThreadPool::SetStackSize(int stackSize)
|
|
{
|
|
mStackSize = stackSize;
|
|
}
|
|
|
|
static void WorkerProc(void* param)
|
|
{
|
|
((ThreadPool::Thread*)param)->Proc();
|
|
}
|
|
|
|
void ThreadPool::AddJob(Job* job, int maxWorkersPerProviderThread)
|
|
{
|
|
AutoCrit autoCrit(mCritSect);
|
|
|
|
BfpThreadId curThreadId = BfpThread_GetCurrentId();
|
|
|
|
job->mFromThreadId = curThreadId;
|
|
mJobs.Add(job);
|
|
mEvent.Set();
|
|
|
|
if (((int)mThreads.size() < mMaxThreads) && (mFreeThreads == 0))
|
|
{
|
|
int workersForUs = 0;
|
|
for (auto thread : mThreads)
|
|
{
|
|
if (thread->mCurJobThreadId == curThreadId)
|
|
workersForUs++;
|
|
}
|
|
if (workersForUs >= maxWorkersPerProviderThread)
|
|
return;
|
|
|
|
mRunningThreads++;
|
|
Thread* thread = new Thread();
|
|
thread->mThreadPool = this;
|
|
thread->mBfpThread = BfpThread_Create(WorkerProc, (void*)thread, mStackSize, BfpThreadCreateFlag_StackSizeReserve);
|
|
mThreads.Add(thread);
|
|
}
|
|
}
|
|
|
|
void ThreadPool::AddJob(BfpThreadStartProc proc, void* param, int maxWorkersPerProviderThread)
|
|
{
|
|
ProcJob* job = new ProcJob();
|
|
job->mProc = proc;
|
|
job->mParam = param;
|
|
AddJob(job, maxWorkersPerProviderThread);
|
|
}
|
|
|
|
bool ThreadPool::IsInJob()
|
|
{
|
|
return gPoolParent == this;
|
|
}
|
|
|
|
void ThreadPool::CancelAll()
|
|
{
|
|
AutoCrit autoCrit(mCritSect);
|
|
for (auto job : mJobs)
|
|
job->Cancel();
|
|
for (auto thread : mThreads)
|
|
if (thread->mActiveJob != NULL)
|
|
thread->mActiveJob->Cancel();
|
|
} |