#include #include "threadtools.h" #include "refcount.h" #include "utllinkedlist.h" #include "utlvector.h" #include "functors.h" #include "vstdlib.h" #ifndef JOBTHREAD_H #define JOBTHREAD_H #ifdef AddJob #undef AddJob #undef GetJob #endif #ifdef VSTDLIB_DLL_EXPORT #define JOB_INTERFACE DLL_EXPORT #define JOB_OVERLOAD DLL_GLOBAL_EXPORT #define JOB_CLASS DLL_CLASS_EXPORT #else #define JOB_INTERFACE DLL_IMPORT #define JOB_OVERLOAD DLL_GLOBAL_IMPORT #define JOB_CLASS DLL_CLASS_IMPORT #endif #if defined( _WIN32 ) #pragma once #endif class CJob; enum JobStatusEnum_t { JOB_OK, JOB_STATUS_PENDING, JOB_STATUS_INPROGRESS, JOB_STATUS_ABORTED, JOB_STATUS_UNSERVICED, }; typedef int JobStatus_t; enum JobFlags_t { JF_IO = (1 << 0), JF_BOOST_THREAD = (1 << 1), JF_SERIAL = (1 << 2), JF_QUEUE = (1 << 3), }; enum JobPriority_t { JP_LOW, JP_NORMAL, JP_HIGH, JP_IMMEDIATE, JP_NUM_PRIORITIES, JP_FRAME = JP_NORMAL, JP_FRAME_SEGMENT = JP_HIGH, }; #define TP_MAX_POOL_THREADS 64 struct ThreadPoolStartParams_t { ThreadPoolStartParams_t(bool bIOThreads = false, unsigned nThreads = (unsigned)-1, int* pAffinities = NULL, ThreeState_t fDistribute = TRS_NONE, unsigned nStackSize = (unsigned)-1, int iThreadPriority = SHRT_MIN) : bIOThreads(bIOThreads), nThreads(nThreads), nThreadsMax(-1), fDistribute(fDistribute), nStackSize(nStackSize), iThreadPriority(iThreadPriority) { bExecOnThreadPoolThreadsOnly = false; #if defined( DEDICATED ) && IsPlatformLinux() bEnableOnLinuxDedicatedServer = false; #endif bUseAffinityTable = (pAffinities != NULL) && (fDistribute == TRS_TRUE) && (nThreads != (unsigned)-1); if (bUseAffinityTable) { nThreads = MIN(TP_MAX_POOL_THREADS, nThreads); for (unsigned int i = 0; i < nThreads; i++) { iAffinityTable[i] = pAffinities[i]; } } } int nThreads; int nThreadsMax; ThreeState_t fDistribute; int nStackSize; int iThreadPriority; int iAffinityTable[TP_MAX_POOL_THREADS]; bool bIOThreads : 1; bool bUseAffinityTable : 1; bool bExecOnThreadPoolThreadsOnly : 1; #if defined( DEDICATED ) && IsPlatformLinux() bool bEnableOnLinuxDedicatedServer : 1; #endif }; typedef bool (*JobFilter_t)(CJob*); enum ThreadPoolMessages_t { TPM_EXIT, TPM_SUSPEND, }; #ifdef Yield #undef Yield #endif abstract_class IThreadPool : public IRefCounted { public: virtual ~IThreadPool() {}; virtual bool Start(const ThreadPoolStartParams_t& startParams = ThreadPoolStartParams_t()) = 0; virtual bool Stop(int timeout = TT_INFINITE) = 0; virtual unsigned GetJobCount() = 0; virtual int NumThreads() = 0; virtual int NumIdleThreads() = 0; virtual int SuspendExecution() = 0; virtual int ResumeExecution() = 0; virtual int YieldWait(CThreadEvent** pEvents, int nEvents, bool bWaitAll = true, unsigned timeout = TT_INFINITE) = 0; virtual int YieldWait(CJob**, int nJobs, bool bWaitAll = true, unsigned timeout = TT_INFINITE) = 0; virtual void Yield(unsigned timeout) = 0; bool YieldWait(CThreadEvent& event, unsigned timeout = TT_INFINITE); bool YieldWait(CJob*, unsigned timeout = TT_INFINITE); virtual void AddJob(CJob*) = 0; virtual void AddFunctor(CFunctor* pFunctor, CJob** ppJob = NULL, const char* pszDescription = NULL, unsigned flags = 0) { AddFunctorInternal(RetAddRef(pFunctor), ppJob, pszDescription, flags); } virtual void ChangePriority(CJob* p, JobPriority_t priority) = 0; int ExecuteAll(JobFilter_t pfnFilter = NULL) { return ExecuteToPriority(JP_LOW, pfnFilter); } virtual int ExecuteToPriority(JobPriority_t toPriority, JobFilter_t pfnFilter = NULL) = 0; virtual int AbortAll() = 0; virtual void AddPerFrameJob(CJob*) = 0; #define DEFINE_NONMEMBER_ADD_CALL(N) \ template \ CJob *AddCall(FUNCTION_RETTYPE (*pfnProxied)( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) FUNC_ARG_FORMAL_PARAMS_##N ) \ { \ CJob *pJob; \ if ( !NumIdleThreads() ) \ { \ pJob = GetDummyJob(); \ FunctorDirectCall( pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ); \ } \ else \ { \ AddFunctorInternal( CreateFunctor( pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob ); \ } \ \ return pJob; \ } #define DEFINE_MEMBER_ADD_CALL(N) \ template \ CJob *AddCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) FUNC_ARG_FORMAL_PARAMS_##N ) \ { \ CJob *pJob; \ if ( !NumIdleThreads() ) \ { \ pJob = GetDummyJob(); \ FunctorDirectCall( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ); \ } \ else \ { \ AddFunctorInternal( CreateFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob ); \ } \ \ return pJob; \ } #define DEFINE_CONST_MEMBER_ADD_CALL(N) \ template \ CJob *AddCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) const FUNC_ARG_FORMAL_PARAMS_##N ) \ { \ CJob *pJob; \ if ( !NumIdleThreads() ) \ { \ pJob = GetDummyJob(); \ FunctorDirectCall( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ); \ } \ else \ { \ AddFunctorInternal( CreateFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob ); \ } \ \ return pJob; \ } #define DEFINE_REF_COUNTING_MEMBER_ADD_CALL(N) \ template \ CJob *AddRefCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) FUNC_ARG_FORMAL_PARAMS_##N ) \ { \ CJob *pJob; \ if ( !NumIdleThreads() ) \ { \ pJob = GetDummyJob(); \ FunctorDirectCall( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ); \ } \ else \ { \ AddFunctorInternal( CreateRefCountingFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob ); \ } \ \ return pJob; \ } #define DEFINE_REF_COUNTING_CONST_MEMBER_ADD_CALL(N) \ template \ CJob *AddRefCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) const FUNC_ARG_FORMAL_PARAMS_##N ) \ { \ CJob *pJob; \ if ( !NumIdleThreads() ) \ { \ pJob = GetDummyJob(); \ FunctorDirectCall( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ); \ } \ else \ { \ AddFunctorInternal( CreateRefCountingFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob ); \ } \ \ return pJob; \ } #define DEFINE_NONMEMBER_QUEUE_CALL(N) \ template \ CJob *QueueCall(FUNCTION_RETTYPE (*pfnProxied)( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) FUNC_ARG_FORMAL_PARAMS_##N ) \ { \ CJob *pJob; \ AddFunctorInternal( CreateFunctor( pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob, NULL, JF_QUEUE ); \ return pJob; \ } #define DEFINE_MEMBER_QUEUE_CALL(N) \ template \ CJob *QueueCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) FUNC_ARG_FORMAL_PARAMS_##N ) \ { \ CJob *pJob; \ AddFunctorInternal( CreateFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob, NULL, JF_QUEUE ); \ return pJob; \ } #define DEFINE_CONST_MEMBER_QUEUE_CALL(N) \ template \ CJob *QueueCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) const FUNC_ARG_FORMAL_PARAMS_##N ) \ { \ CJob *pJob; \ AddFunctorInternal( CreateFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob, NULL, JF_QUEUE ); \ return pJob; \ } #define DEFINE_REF_COUNTING_MEMBER_QUEUE_CALL(N) \ template \ CJob *QueueRefCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) FUNC_ARG_FORMAL_PARAMS_##N ) \ { \ CJob *pJob; \ AddFunctorInternal( CreateRefCountingFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob, NULL, JF_QUEUE ); \ return pJob; \ } #define DEFINE_REF_COUNTING_CONST_MEMBER_QUEUE_CALL(N) \ template \ CJob *QueueRefCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) const FUNC_ARG_FORMAL_PARAMS_##N ) \ { \ CJob *pJob; \ AddFunctorInternal( CreateRefCountingFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob, NULL, JF_QUEUE ); \ \ return pJob; \ } FUNC_GENERATE_ALL(DEFINE_NONMEMBER_ADD_CALL); FUNC_GENERATE_ALL(DEFINE_MEMBER_ADD_CALL); FUNC_GENERATE_ALL(DEFINE_CONST_MEMBER_ADD_CALL); FUNC_GENERATE_ALL(DEFINE_REF_COUNTING_MEMBER_ADD_CALL); FUNC_GENERATE_ALL(DEFINE_REF_COUNTING_CONST_MEMBER_ADD_CALL); FUNC_GENERATE_ALL(DEFINE_NONMEMBER_QUEUE_CALL); FUNC_GENERATE_ALL(DEFINE_MEMBER_QUEUE_CALL); FUNC_GENERATE_ALL(DEFINE_CONST_MEMBER_QUEUE_CALL); FUNC_GENERATE_ALL(DEFINE_REF_COUNTING_MEMBER_QUEUE_CALL); FUNC_GENERATE_ALL(DEFINE_REF_COUNTING_CONST_MEMBER_QUEUE_CALL); #undef DEFINE_NONMEMBER_ADD_CALL #undef DEFINE_MEMBER_ADD_CALL #undef DEFINE_CONST_MEMBER_ADD_CALL #undef DEFINE_REF_COUNTING_MEMBER_ADD_CALL #undef DEFINE_REF_COUNTING_CONST_MEMBER_ADD_CALL #undef DEFINE_NONMEMBER_QUEUE_CALL #undef DEFINE_MEMBER_QUEUE_CALL #undef DEFINE_CONST_MEMBER_QUEUE_CALL #undef DEFINE_REF_COUNTING_MEMBER_QUEUE_CALL #undef DEFINE_REF_COUNTING_CONST_MEMBER_QUEUE_CALL private: virtual void AddFunctorInternal(CFunctor*, CJob** = NULL, const char* pszDescription = NULL, unsigned flags = 0) = 0; friend class CJob; virtual CJob* GetDummyJob() = 0; public: virtual void Distribute(bool bDistribute = true, int* pAffinityTable = NULL) = 0; virtual bool Start(const ThreadPoolStartParams_t& startParams, const char* pszNameOverride) = 0; virtual int YieldWaitPerFrameJobs() = 0; }; JOB_INTERFACE IThreadPool* CreateNewThreadPool(); JOB_INTERFACE void DestroyThreadPool(IThreadPool* pPool); JOB_INTERFACE void RunThreadPoolTests(); JOB_INTERFACE IThreadPool* g_pThreadPool; #ifdef _X360 JOB_INTERFACE IThreadPool* g_pAlternateThreadPool; #endif DECLARE_POINTER_HANDLE(ThreadPoolData_t); #define JOB_NO_DATA ((ThreadPoolData_t)-1) class CJob : public CRefCounted1 { public: CJob(JobPriority_t priority = JP_NORMAL) : m_status(JOB_STATUS_UNSERVICED), m_ThreadPoolData(JOB_NO_DATA), m_priority(priority), m_flags(0), m_pThreadPool(NULL), m_CompleteEvent(true), m_iServicingThread(-1) { } void SetPriority(JobPriority_t priority) { m_priority = priority; } JobPriority_t GetPriority() const { return m_priority; } void SetFlags(unsigned flags) { m_flags = flags; } unsigned GetFlags() const { return m_flags; } void SetServiceThread(int iServicingThread) { m_iServicingThread = (char)iServicingThread; } int GetServiceThread() const { return m_iServicingThread; } void ClearServiceThread() { m_iServicingThread = -1; } bool Executed() const { return (m_status == JOB_OK); } bool CanExecute() const { return (m_status == JOB_STATUS_PENDING || m_status == JOB_STATUS_UNSERVICED); } bool IsFinished() const { return (m_status != JOB_STATUS_PENDING && m_status != JOB_STATUS_INPROGRESS && m_status != JOB_STATUS_UNSERVICED); } JobStatus_t GetStatus() const { return m_status; } bool TryLock() { return m_mutex.TryLock(); } void Lock() { m_mutex.Lock(); } void Unlock() { m_mutex.Unlock(); } bool WaitForFinish(uint32 dwTimeout = TT_INFINITE) { if (!this) return true; return (!IsFinished()) ? g_pThreadPool->YieldWait(this, dwTimeout) : true; } bool WaitForFinishAndRelease(uint32 dwTimeout = TT_INFINITE) { if (!this) return true; bool bResult = WaitForFinish(dwTimeout); Release(); return bResult; } CThreadEvent* AccessEvent() { return &m_CompleteEvent; } JobStatus_t Execute(); JobStatus_t TryExecute(); JobStatus_t ExecuteAndRelease() { JobStatus_t status = Execute(); Release(); return status; } JobStatus_t TryExecuteAndRelease() { JobStatus_t status = TryExecute(); Release(); return status; } JobStatus_t Abort(bool bDiscard = true); virtual char const* Describe() { return "Job"; } private: friend class CThreadPool; JobStatus_t m_status; JobPriority_t m_priority; CThreadMutex m_mutex; unsigned char m_flags; char m_iServicingThread; short m_reserved; ThreadPoolData_t m_ThreadPoolData; IThreadPool* m_pThreadPool; CThreadEvent m_CompleteEvent; #if defined( THREAD_PARENT_STACK_TRACE_ENABLED ) void* m_ParentStackTrace[THREAD_PARENT_STACK_TRACE_LENGTH]; #endif private: CJob(const CJob& fromRequest); void operator=(const CJob& fromRequest); virtual JobStatus_t DoExecute() = 0; virtual JobStatus_t DoAbort(bool bDiscard) { return JOB_STATUS_ABORTED; } virtual void DoCleanup() {} }; class CFunctorJob : public CJob { public: CFunctorJob(CFunctor* pFunctor, const char* pszDescription = NULL) : m_pFunctor(pFunctor) { if (pszDescription) { Q_strncpy(m_szDescription, pszDescription, sizeof(m_szDescription)); } else { m_szDescription[0] = 0; } } virtual JobStatus_t DoExecute() { (*m_pFunctor)(); return JOB_OK; } const char* Describe() { return m_szDescription; } private: CRefPtr m_pFunctor; char m_szDescription[16]; }; class CJobSet { public: CJobSet(CJob* pJob = NULL) { if (pJob) { m_jobs.AddToTail(pJob); } } CJobSet(CJob** ppJobs, int nJobs) { if (ppJobs) { m_jobs.AddMultipleToTail(nJobs, ppJobs); } } ~CJobSet() { for (int i = 0; i < m_jobs.Count(); i++) { m_jobs[i]->Release(); } } void operator+=(CJob* pJob) { m_jobs.AddToTail(pJob); } void operator-=(CJob* pJob) { m_jobs.FindAndRemove(pJob); } void Execute(bool bRelease = true) { for (int i = 0; i < m_jobs.Count(); i++) { m_jobs[i]->Execute(); if (bRelease) { m_jobs[i]->Release(); } } if (bRelease) { m_jobs.RemoveAll(); } } void Abort(bool bRelease = true) { for (int i = 0; i < m_jobs.Count(); i++) { m_jobs[i]->Abort(); if (bRelease) { m_jobs[i]->Release(); } } if (bRelease) { m_jobs.RemoveAll(); } } void WaitForFinish(bool bRelease = true) { for (int i = 0; i < m_jobs.Count(); i++) { m_jobs[i]->WaitForFinish(); if (bRelease) { m_jobs[i]->Release(); } } if (bRelease) { m_jobs.RemoveAll(); } } void WaitForFinish(IThreadPool* pPool, bool bRelease = true) { pPool->YieldWait(m_jobs.Base(), m_jobs.Count()); if (bRelease) { for (int i = 0; i < m_jobs.Count(); i++) { m_jobs[i]->Release(); } m_jobs.RemoveAll(); } } private: CUtlVectorFixed m_jobs; }; #define ThreadExecute g_pThreadPool->QueueCall #define ThreadExecuteRef g_pThreadPool->QueueRefCall #define BeginExecuteParallel() do { CJobSet jobSet #define EndExecuteParallel() jobSet.WaitForFinish( g_pThreadPool ); } while (0) #define ExecuteParallel jobSet += g_pThreadPool->QueueCall #define ExecuteRefParallel jobSet += g_pThreadPool->QueueCallRef #pragma warning(push) #pragma warning(disable:4389) #pragma warning(disable:4018) #pragma warning(disable:4701) #define DEFINE_NON_MEMBER_ITER_RANGE_PARALLEL(N) \ template \ void IterRangeParallel(FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( ITERTYPE1, ITERTYPE2 FUNC_SEPARATOR_##N FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ), ITERTYPE1 from, ITERTYPE2 to FUNC_ARG_FORMAL_PARAMS_##N ) \ { \ const int MAX_THREADS = 16; \ int nIdle = g_pThreadPool->NumIdleThreads(); \ ITERTYPE1 range = to - from; \ int nThreads = min( nIdle + 1, range ); \ if ( nThreads > MAX_THREADS ) \ { \ nThreads = MAX_THREADS; \ } \ if ( nThreads < 2 ) \ { \ FunctorDirectCall( pfnProxied, from, to FUNC_FUNCTOR_CALL_ARGS_##N ); \ } \ else \ { \ ITERTYPE1 nIncrement = range / nThreads; \ \ CJobSet jobSet; \ while ( --nThreads ) \ { \ ITERTYPE2 thisTo = from + nIncrement; \ jobSet += g_pThreadPool->AddCall( pfnProxied, from, thisTo FUNC_FUNCTOR_CALL_ARGS_##N ); \ from = thisTo; \ } \ FunctorDirectCall( pfnProxied, from, to FUNC_FUNCTOR_CALL_ARGS_##N ); \ jobSet.WaitForFinish( g_pThreadPool ); \ } \ \ } FUNC_GENERATE_ALL(DEFINE_NON_MEMBER_ITER_RANGE_PARALLEL); #define DEFINE_MEMBER_ITER_RANGE_PARALLEL(N) \ template \ void IterRangeParallel(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( ITERTYPE1, ITERTYPE2 FUNC_SEPARATOR_##N FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ), ITERTYPE1 from, ITERTYPE2 to FUNC_ARG_FORMAL_PARAMS_##N ) \ { \ const int MAX_THREADS = 16; \ int nIdle = g_pThreadPool->NumIdleThreads(); \ ITERTYPE1 range = to - from; \ int nThreads = min( nIdle + 1, range ); \ if ( nThreads > MAX_THREADS ) \ { \ nThreads = MAX_THREADS; \ } \ if ( nThreads < 2 ) \ { \ FunctorDirectCall( pObject, pfnProxied, from, to FUNC_FUNCTOR_CALL_ARGS_##N ); \ } \ else \ { \ ITERTYPE1 nIncrement = range / nThreads; \ \ CJobSet jobSet; \ while ( --nThreads ) \ { \ ITERTYPE2 thisTo = from + nIncrement; \ jobSet += g_pThreadPool->AddCall( pObject, pfnProxied, from, thisTo FUNC_FUNCTOR_CALL_ARGS_##N ); \ from = thisTo; \ } \ FunctorDirectCall( pObject, pfnProxied, from, to FUNC_FUNCTOR_CALL_ARGS_##N ); \ jobSet.WaitForFinish( g_pThreadPool ); \ } \ \ } FUNC_GENERATE_ALL(DEFINE_MEMBER_ITER_RANGE_PARALLEL); template class CJobItemProcessor { public: typedef T ItemType_t; void Begin() {} void End() {} }; template class CFuncJobItemProcessor : public CJobItemProcessor { public: void Init(void (*pfnProcess)(T&), void (*pfnBegin)() = NULL, void (*pfnEnd)() = NULL) { m_pfnProcess = pfnProcess; m_pfnBegin = pfnBegin; m_pfnEnd = pfnEnd; } void Begin() { if (m_pfnBegin) (*m_pfnBegin)(); } void Process(T& item) { (*m_pfnProcess)(item); } void End() { if (m_pfnEnd) (*m_pfnEnd)(); } protected: void (*m_pfnProcess)(T&); void (*m_pfnBegin)(); void (*m_pfnEnd)(); }; template class CMemberFuncJobItemProcessor : public CJobItemProcessor { public: void Init(OBJECT_TYPE* pObject, void (FUNCTION_CLASS::* pfnProcess)(T&), void (FUNCTION_CLASS::* pfnBegin)() = NULL, void (FUNCTION_CLASS::* pfnEnd)() = NULL) { m_pObject = pObject; m_pfnProcess = pfnProcess; m_pfnBegin = pfnBegin; m_pfnEnd = pfnEnd; } void Begin() { if (m_pfnBegin) ((*m_pObject).*m_pfnBegin)(); } void Process(T& item) { ((*m_pObject).*m_pfnProcess)(item); } void End() { if (m_pfnEnd) ((*m_pObject).*m_pfnEnd)(); } protected: OBJECT_TYPE* m_pObject; void (FUNCTION_CLASS::* m_pfnProcess)(T&); void (FUNCTION_CLASS::* m_pfnBegin)(); void (FUNCTION_CLASS::* m_pfnEnd)(); }; template class CLoopFuncJobItemProcessor : public CJobItemProcessor { public: void Init(void (*pfnProcess)(T*, int, int), void (*pfnBegin)() = NULL, void (*pfnEnd)() = NULL) { m_pfnProcess = pfnProcess; m_pfnBegin = pfnBegin; m_pfnEnd = pfnEnd; } void Begin() { if (m_pfnBegin) (*m_pfnBegin)(); } void Process(T* pContext, int nFirst, int nCount) { (*m_pfnProcess)(pContext, nFirst, nCount); } void End() { if (m_pfnEnd) (*m_pfnEnd)(); } protected: void (*m_pfnProcess)(T*, int, int); void (*m_pfnBegin)(); void (*m_pfnEnd)(); }; template class CLoopMemberFuncJobItemProcessor : public CJobItemProcessor { public: void Init(OBJECT_TYPE* pObject, void (FUNCTION_CLASS::* pfnProcess)(T*, int, int), void (FUNCTION_CLASS::* pfnBegin)() = NULL, void (FUNCTION_CLASS::* pfnEnd)() = NULL) { m_pObject = pObject; m_pfnProcess = pfnProcess; m_pfnBegin = pfnBegin; m_pfnEnd = pfnEnd; } void Begin() { if (m_pfnBegin) ((*m_pObject).*m_pfnBegin)(); } void Process(T* item, int nFirst, int nCount) { ((*m_pObject).*m_pfnProcess)(item, nFirst, nCount); } void End() { if (m_pfnEnd) ((*m_pObject).*m_pfnEnd)(); } protected: OBJECT_TYPE* m_pObject; void (FUNCTION_CLASS::* m_pfnProcess)(T*, int, int); void (FUNCTION_CLASS::* m_pfnBegin)(); void (FUNCTION_CLASS::* m_pfnEnd)(); }; #pragma warning(push) #pragma warning(disable:4189) template class CParallelProcessor { public: CParallelProcessor() { m_pItems = m_pLimit = 0; } void Run(ITEM_TYPE* pItems, unsigned nItems, int nChunkSize = 1, int nMaxParallel = INT_MAX, IThreadPool* pThreadPool = NULL) { if (nItems == 0) return; #if defined(_X360) volatile int ignored = ID_TO_PREVENT_COMDATS_IN_PROFILES; #endif m_nChunkSize = nChunkSize; if (!pThreadPool) { pThreadPool = g_pThreadPool; } m_pItems = pItems; m_pLimit = pItems + nItems; int nJobs = nItems - 1; if (nJobs > nMaxParallel) { nJobs = nMaxParallel; } if (!pThreadPool) { DoExecute(); return; } int nThreads = pThreadPool->NumThreads(); if (nJobs > nThreads) { nJobs = nThreads; } if (nJobs > 0) { CJob** jobs = (CJob**)stackalloc(nJobs * sizeof(CJob**)); int i = nJobs; while (i--) { jobs[i] = pThreadPool->QueueCall(this, &CParallelProcessor::DoExecute); } DoExecute(); for (i = 0; i < nJobs; i++) { jobs[i]->Abort(); jobs[i]->Release(); } } else { DoExecute(); } } ITEM_PROCESSOR_TYPE m_ItemProcessor; private: void DoExecute() { if (m_pItems < m_pLimit) { #if defined(_X360) volatile int ignored = ID_TO_PREVENT_COMDATS_IN_PROFILES; #endif m_ItemProcessor.Begin(); ITEM_TYPE* pLimit = m_pLimit; int nChunkSize = m_nChunkSize; for (;;) { ITEM_TYPE* pCurrent = m_pItems.AtomicAdd(nChunkSize); ITEM_TYPE* pLast = MIN(pLimit, pCurrent + nChunkSize); while (pCurrent < pLast) { m_ItemProcessor.Process(*pCurrent); pCurrent++; } if (pCurrent >= pLimit) { break; } } m_ItemProcessor.End(); } } CInterlockedPtr m_pItems; ITEM_TYPE* m_pLimit; int m_nChunkSize; }; #pragma warning(pop) template inline void ParallelProcess(ITEM_TYPE* pItems, unsigned nItems, void (*pfnProcess)(ITEM_TYPE&), void (*pfnBegin)() = NULL, void (*pfnEnd)() = NULL, int nMaxParallel = INT_MAX) { CParallelProcessor > processor; processor.m_ItemProcessor.Init(pfnProcess, pfnBegin, pfnEnd); processor.Run(pItems, nItems, 1, nMaxParallel); } template inline void ParallelProcess(ITEM_TYPE* pItems, unsigned nItems, OBJECT_TYPE* pObject, void (FUNCTION_CLASS::* pfnProcess)(ITEM_TYPE&), void (FUNCTION_CLASS::* pfnBegin)() = NULL, void (FUNCTION_CLASS::* pfnEnd)() = NULL, int nMaxParallel = INT_MAX) { CParallelProcessor > processor; processor.m_ItemProcessor.Init(pObject, pfnProcess, pfnBegin, pfnEnd); processor.Run(pItems, nItems, 1, nMaxParallel); } template inline void ParallelProcess(IThreadPool* pPool, ITEM_TYPE* pItems, unsigned nItems, void (*pfnProcess)(ITEM_TYPE&), void (*pfnBegin)() = NULL, void (*pfnEnd)() = NULL, int nMaxParallel = INT_MAX) { CParallelProcessor > processor; processor.m_ItemProcessor.Init(pfnProcess, pfnBegin, pfnEnd); processor.Run(pItems, nItems, 1, nMaxParallel, pPool); } template inline void ParallelProcess(IThreadPool* pPool, ITEM_TYPE* pItems, unsigned nItems, OBJECT_TYPE* pObject, void (FUNCTION_CLASS::* pfnProcess)(ITEM_TYPE&), void (FUNCTION_CLASS::* pfnBegin)() = NULL, void (FUNCTION_CLASS::* pfnEnd)() = NULL, int nMaxParallel = INT_MAX) { CParallelProcessor > processor; processor.m_ItemProcessor.Init(pObject, pfnProcess, pfnBegin, pfnEnd); processor.Run(pItems, nItems, 1, nMaxParallel, pPool); } template inline void ParallelProcessChunks(ITEM_TYPE* pItems, unsigned nItems, void (*pfnProcess)(ITEM_TYPE&), int nChunkSize, int nMaxParallel = INT_MAX) { CParallelProcessor > processor; processor.m_ItemProcessor.Init(pfnProcess, NULL, NULL); processor.Run(pItems, nItems, nChunkSize, nMaxParallel); } template inline void ParallelProcessChunks(ITEM_TYPE* pItems, unsigned nItems, OBJECT_TYPE* pObject, void (FUNCTION_CLASS::* pfnProcess)(ITEM_TYPE&), int nChunkSize, int nMaxParallel = INT_MAX) { CParallelProcessor > processor; processor.m_ItemProcessor.Init(pObject, pfnProcess, NULL, NULL); processor.Run(pItems, nItems, nChunkSize, nMaxParallel); } template inline void ParallelProcessChunks(IThreadPool* pPool, ITEM_TYPE* pItems, unsigned nItems, OBJECT_TYPE* pObject, void (FUNCTION_CLASS::* pfnProcess)(ITEM_TYPE&), int nChunkSize, int nMaxParallel = INT_MAX) { CParallelProcessor > processor; processor.m_ItemProcessor.Init(pObject, pfnProcess, NULL, NULL); processor.Run(pItems, nItems, nChunkSize, nMaxParallel, pPool); } template class CParallelLoopProcessor { public: CParallelLoopProcessor() { m_nIndex = m_nLimit = 0; m_nChunkCount = 0; m_nActive = 0; } void Run(CONTEXT_TYPE* pContext, int nBegin, int nItems, int nChunkCount, int nMaxParallel = INT_MAX, IThreadPool* pThreadPool = NULL) { if (!nItems) return; if (!pThreadPool) { pThreadPool = g_pThreadPool; } m_pContext = pContext; m_nIndex = nBegin; m_nLimit = nBegin + nItems; nChunkCount = MAX(MIN(nItems, nChunkCount), 1); m_nChunkCount = (nItems + nChunkCount - 1) / nChunkCount; int nJobs = (nItems + m_nChunkCount - 1) / m_nChunkCount; if (nJobs > nMaxParallel) { nJobs = nMaxParallel; } if (!pThreadPool) { DoExecute(); return; } int nThreads = pThreadPool->NumThreads(); if (nJobs > nThreads) { nJobs = nThreads; } if (nJobs > 0) { CJob** jobs = (CJob**)stackalloc(nJobs * sizeof(CJob**)); int i = nJobs; while (i--) { jobs[i] = pThreadPool->QueueCall(this, &CParallelLoopProcessor::DoExecute); } DoExecute(); for (i = 0; i < nJobs; i++) { jobs[i]->Abort(); jobs[i]->Release(); } } else { DoExecute(); } } ITEM_PROCESSOR_TYPE m_ItemProcessor; private: void DoExecute() { m_ItemProcessor.Begin(); for (;;) { int nIndex = m_nIndex.AtomicAdd(m_nChunkCount); if (nIndex < m_nLimit) { int nCount = MIN(m_nChunkCount, m_nLimit - nIndex); m_ItemProcessor.Process(m_pContext, nIndex, nCount); } else { break; } } m_ItemProcessor.End(); --m_nActive; } CONTEXT_TYPE* m_pContext; CInterlockedInt m_nIndex; int m_nLimit; int m_nChunkCount; CInterlockedInt m_nActive; }; template < typename CONTEXT_TYPE > inline void ParallelLoopProcess(IThreadPool* pPool, CONTEXT_TYPE* pContext, int nStart, int nCount, void (*pfnProcess)(CONTEXT_TYPE*, int, int), void (*pfnBegin)() = NULL, void (*pfnEnd)() = NULL, int nMaxParallel = INT_MAX) { CParallelLoopProcessor< CONTEXT_TYPE, CLoopFuncJobItemProcessor< CONTEXT_TYPE > > processor; processor.m_ItemProcessor.Init(pfnProcess, pfnBegin, pfnEnd); processor.Run(pContext, nStart, nCount, 1, nMaxParallel, pPool); } template < typename CONTEXT_TYPE, typename OBJECT_TYPE, typename FUNCTION_CLASS > inline void ParallelLoopProcess(IThreadPool* pPool, CONTEXT_TYPE* pContext, int nStart, int nCount, OBJECT_TYPE* pObject, void (FUNCTION_CLASS::* pfnProcess)(CONTEXT_TYPE*, int, int), void (FUNCTION_CLASS::* pfnBegin)() = NULL, void (FUNCTION_CLASS::* pfnEnd)() = NULL, int nMaxParallel = INT_MAX) { CParallelLoopProcessor< CONTEXT_TYPE, CLoopMemberFuncJobItemProcessor > processor; processor.m_ItemProcessor.Init(pObject, pfnProcess, pfnBegin, pfnEnd); processor.Run(pContext, nStart, nCount, 1, nMaxParallel, pPool); } template < typename CONTEXT_TYPE > inline void ParallelLoopProcessChunks(IThreadPool* pPool, CONTEXT_TYPE* pContext, int nStart, int nCount, int nChunkSize, void (*pfnProcess)(CONTEXT_TYPE*, int, int), void (*pfnBegin)() = NULL, void (*pfnEnd)() = NULL, int nMaxParallel = INT_MAX) { CParallelLoopProcessor< CONTEXT_TYPE, CLoopFuncJobItemProcessor< CONTEXT_TYPE > > processor; processor.m_ItemProcessor.Init(pfnProcess, pfnBegin, pfnEnd); processor.Run(pContext, nStart, nCount, nChunkSize, nMaxParallel, pPool); } template < typename CONTEXT_TYPE, typename OBJECT_TYPE, typename FUNCTION_CLASS > inline void ParallelLoopProcessChunks(IThreadPool* pPool, CONTEXT_TYPE* pContext, int nStart, int nCount, int nChunkSize, OBJECT_TYPE* pObject, void (FUNCTION_CLASS::* pfnProcess)(CONTEXT_TYPE*, int, int), void (FUNCTION_CLASS::* pfnBegin)() = NULL, void (FUNCTION_CLASS::* pfnEnd)() = NULL, int nMaxParallel = INT_MAX) { CParallelLoopProcessor< CONTEXT_TYPE, CLoopMemberFuncJobItemProcessor > processor; processor.m_ItemProcessor.Init(pObject, pfnProcess, pfnBegin, pfnEnd); processor.Run(pContext, nStart, nCount, nChunkSize, nMaxParallel, pPool); } template class CParallelProcessorBase { protected: typedef CParallelProcessorBase ThisParallelProcessorBase_t; typedef Derived ThisParallelProcessorDerived_t; public: CParallelProcessorBase() { m_nActive = 0; } protected: void Run(int nMaxParallel = INT_MAX, int threadOverride = -1) { int i = g_pThreadPool->NumIdleThreads(); if (nMaxParallel < i) { i = nMaxParallel; } while (i-- > 0) { if (threadOverride == -1 || i == threadOverride - 1) { ++m_nActive; ThreadExecute(this, &ThisParallelProcessorBase_t::DoExecute)->Release(); } } if (threadOverride == -1 || threadOverride == 0) { ++m_nActive; DoExecute(); } while (m_nActive) { ThreadPause(); } } protected: void OnBegin() {} bool OnProcess() { return false; } void OnEnd() {} private: void DoExecute() { static_cast(this)->OnBegin(); while (static_cast(this)->OnProcess()) continue; static_cast(this)->OnEnd(); --m_nActive; } CInterlockedInt m_nActive; }; inline uintp FunctorExecuteThread(void* pParam) { CFunctor* pFunctor = (CFunctor*)pParam; (*pFunctor)(); pFunctor->Release(); return 0; } inline ThreadHandle_t ThreadExecuteSoloImpl(CFunctor* pFunctor, const char* pszName = NULL); inline ThreadHandle_t ThreadExecuteSolo(CJob* pJob) { return ThreadExecuteSoloImpl(CreateFunctor(pJob, &CJob::Execute), pJob->Describe()); } template inline ThreadHandle_t ThreadExecuteSolo(const char* pszName, T1 a1) { return ThreadExecuteSoloImpl(CreateFunctor(a1), pszName); } template inline ThreadHandle_t ThreadExecuteSolo(const char* pszName, T1 a1, T2 a2) { return ThreadExecuteSoloImpl(CreateFunctor(a1, a2), pszName); } template inline ThreadHandle_t ThreadExecuteSolo(const char* pszName, T1 a1, T2 a2, T3 a3) { return ThreadExecuteSoloImpl(CreateFunctor(a1, a2, a3), pszName); } template inline ThreadHandle_t ThreadExecuteSolo(const char* pszName, T1 a1, T2 a2, T3 a3, T4 a4) { return ThreadExecuteSoloImpl(CreateFunctor(a1, a2, a3, a4), pszName); } template inline ThreadHandle_t ThreadExecuteSolo(const char* pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5) { return ThreadExecuteSoloImpl(CreateFunctor(a1, a2, a3, a4, a5), pszName); } template inline ThreadHandle_t ThreadExecuteSolo(const char* pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5, T6 a6) { return ThreadExecuteSoloImpl(CreateFunctor(a1, a2, a3, a4, a5, a6), pszName); } template inline ThreadHandle_t ThreadExecuteSolo(const char* pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5, T6 a6, T7 a7) { return ThreadExecuteSoloImpl(CreateFunctor(a1, a2, a3, a4, a5, a6, a7), pszName); } template inline ThreadHandle_t ThreadExecuteSolo(const char* pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5, T6 a6, T7 a7, T8 a8) { return ThreadExecuteSoloImpl(CreateFunctor(a1, a2, a3, a4, a5, a6, a7, a8), pszName); } template inline ThreadHandle_t ThreadExecuteSoloRef(const char* pszName, T1 a1, T2 a2) { return ThreadExecuteSoloImpl(CreateRefCountingFunctor(a1, a2), pszName); } template inline ThreadHandle_t ThreadExecuteSoloRef(const char* pszName, T1 a1, T2 a2, T3 a3) { return ThreadExecuteSoloImpl(CreateRefCountingFunctor(a1, a2, a3), pszName); } template inline ThreadHandle_t ThreadExecuteSoloRef(const char* pszName, T1 a1, T2 a2, T3 a3, T4 a4) { return ThreadExecuteSoloImpl(CreateRefCountingFunctor(a1, a2, a3, a4), pszName); } template inline ThreadHandle_t ThreadExecuteSoloRef(const char* pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5) { return ThreadExecuteSoloImpl(CreateRefCountingFunctor(a1, a2, a3, a4, a5), pszName); } template inline ThreadHandle_t ThreadExecuteSoloRef(const char* pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5, T6 a6) { return ThreadExecuteSoloImpl(CreateRefCountingFunctor(a1, a2, a3, a4, a5, a6), pszName); } template inline ThreadHandle_t ThreadExecuteSoloRef(const char* pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5, T6 a6, T7 a7) { return ThreadExecuteSoloImpl(CreateRefCountingFunctor(a1, a2, a3, a4, a5, a6, a7), pszName); } template inline ThreadHandle_t ThreadExecuteSoloRef(const char* pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5, T6 a6, T7 a7, T8 a8) { return ThreadExecuteSoloImpl(CreateRefCountingFunctor(a1, a2, a3, a4, a5, a6, a7, a8), pszName); } inline bool IThreadPool::YieldWait(CThreadEvent& theEvent, unsigned timeout) { CThreadEvent* pEvent = &theEvent; return (YieldWait(&pEvent, 1, true, timeout) != TW_TIMEOUT); } inline bool IThreadPool::YieldWait(CJob* pJob, unsigned timeout) { return (YieldWait(&pJob, 1, true, timeout) != TW_TIMEOUT); } inline JobStatus_t CJob::Execute() { if (IsFinished()) { return m_status; } AUTO_LOCK(m_mutex); AddRef(); JobStatus_t result; switch (m_status) { case JOB_STATUS_UNSERVICED: case JOB_STATUS_PENDING: { m_status = JOB_STATUS_INPROGRESS; #if defined( THREAD_PARENT_STACK_TRACE_ENABLED ) { CStackTop_ReferenceParentStack stackTop(m_ParentStackTrace, ARRAYSIZE(m_ParentStackTrace)); result = m_status = DoExecute(); } #else result = m_status = DoExecute(); #endif DoCleanup(); m_CompleteEvent.Set(); break; } case JOB_STATUS_INPROGRESS: AssertMsg(0, "Mutex Should have protected use while processing"); case JOB_OK: case JOB_STATUS_ABORTED: result = m_status; break; default: AssertMsg(m_status < JOB_OK, "Unknown job state"); result = m_status; } Release(); return result; } inline JobStatus_t CJob::TryExecute() { if (!IsFinished() && TryLock()) { Execute(); Unlock(); } return m_status; } inline JobStatus_t CJob::Abort(bool bDiscard) { if (IsFinished()) { return m_status; } AUTO_LOCK(m_mutex); AddRef(); JobStatus_t result; switch (m_status) { case JOB_STATUS_UNSERVICED: case JOB_STATUS_PENDING: { result = m_status = DoAbort(bDiscard); if (bDiscard) DoCleanup(); m_CompleteEvent.Set(); } break; case JOB_STATUS_ABORTED: case JOB_STATUS_INPROGRESS: case JOB_OK: result = m_status; break; default: AssertMsg(m_status < JOB_OK, "Unknown job state"); result = m_status; } Release(); return result; } #endif