doom3-bfg/neo/idlib/ParallelJobList.cpp
2012-11-26 12:58:24 -06:00

1297 lines
No EOL
37 KiB
C++

/*
===========================================================================
Doom 3 BFG Edition GPL Source Code
Copyright (C) 1993-2012 id Software LLC, a ZeniMax Media company.
This file is part of the Doom 3 BFG Edition GPL Source Code ("Doom 3 BFG Edition Source Code").
Doom 3 BFG Edition Source Code is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
Doom 3 BFG Edition Source Code is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with Doom 3 BFG Edition Source Code. If not, see <http://www.gnu.org/licenses/>.
In addition, the Doom 3 BFG Edition Source Code is also subject to certain additional terms. You should have received a copy of these additional terms immediately following the terms and conditions of the GNU General Public License which accompanied the Doom 3 BFG Edition Source Code. If not, please request a copy in writing from id Software at the address below.
If you have questions concerning this license or the applicable additional terms, you may contact in writing id Software LLC, c/o ZeniMax Media Inc., Suite 120, Rockville, Maryland 20850 USA.
===========================================================================
*/
#pragma hdrstop
#include "precompiled.h"
#include "ParallelJobList.h"
/*
================================================================================================
Job and Job-List names
================================================================================================
*/
// Printable names for the job-list ids.
// ASSERT_ENUM_STRING presumably pairs each enum name with its expected numeric value
// (TODO confirm against the macro's definition). The array slots are NOT the enum values:
// the JOBLIST_UTILITY entry is tagged with 9 but occupies slot 2 of this 3-entry table,
// so the table must not be indexed directly with an arbitrary jobListId_t.
const char * jobNames[] = {
ASSERT_ENUM_STRING( JOBLIST_RENDERER_FRONTEND, 0 ),
ASSERT_ENUM_STRING( JOBLIST_RENDERER_BACKEND, 1 ),
ASSERT_ENUM_STRING( JOBLIST_UTILITY, 9 ),
};
// Fixed-capacity registry mapping job functions to printable names (filled by RegisterJob()).
static const int MAX_REGISTERED_JOBS = 128;

struct registeredJob {
	jobRun_t		function;	// job entry point
	const char *	name;		// name reported by GetJobName()
};
registeredJob registeredJobs[MAX_REGISTERED_JOBS];

// number of valid entries at the front of registeredJobs
static int numRegisteredJobs = 0;
/*
========================
GetJobListName

Returns the printable name for a job list id. jobNames[] holds only three entries and
JOBLIST_UTILITY lives in slot 2 rather than at its enum value, so the id is mapped
explicitly instead of being used as an array index. Using it as an index would read past
the end of the table for every id other than the first two. Ids without a name yield
"unknown", the same fallback GetJobName() uses.
========================
*/
const char * GetJobListName( jobListId_t id ) {
	switch ( id ) {
		case JOBLIST_RENDERER_FRONTEND:	return jobNames[0];
		case JOBLIST_RENDERER_BACKEND:	return jobNames[1];
		case JOBLIST_UTILITY:			return jobNames[2];
		default:						return "unknown";
	}
}
/*
========================
IsRegisteredJob
========================
*/
static bool IsRegisteredJob( jobRun_t function ) {
	// Linear scan over the filled part of the registry; returns true on the first match.
	const registeredJob * const end = registeredJobs + numRegisteredJobs;
	for ( const registeredJob * entry = registeredJobs; entry != end; ++entry ) {
		if ( entry->function == function ) {
			return true;
		}
	}
	return false;
}
/*
========================
RegisterJob
========================
*/
/*
Adds a job function and its printable name to the registry. Registering the same function
twice is a no-op. The registry is a fixed array of MAX_REGISTERED_JOBS entries. Once it is
full, further registrations are dropped (with an assert in debug builds) instead of writing
past the end of registeredJobs. A dropped function reports "unknown" from GetJobName().
*/
void RegisterJob( jobRun_t function, const char * name ) {
	if ( IsRegisteredJob( function ) ) {
		return;
	}
	assert( numRegisteredJobs < MAX_REGISTERED_JOBS );
	if ( numRegisteredJobs >= MAX_REGISTERED_JOBS ) {
		return;	// table full: never overflow the fixed-size array
	}
	registeredJobs[numRegisteredJobs].function = function;
	registeredJobs[numRegisteredJobs].name = name;
	numRegisteredJobs++;
}
/*
========================
GetJobName
========================
*/
const char * GetJobName( jobRun_t function ) {
	// Find the registry slot holding 'function'; fall back to "unknown" when it is absent.
	int slot = 0;
	while ( slot < numRegisteredJobs && registeredJobs[slot].function != function ) {
		slot++;
	}
	return ( slot < numRegisteredJobs ) ? registeredJobs[slot].name : "unknown";
}
/*
========================
idParallelJobRegistration::idParallelJobRegistration
========================
*/
idParallelJobRegistration::idParallelJobRegistration( jobRun_t function, const char * name ) {
	// constructing a registration object simply forwards to the file-level registry
	::RegisterJob( function, name );
}
// SPU local-store bookkeeping. Nothing in this file reads these; presumably they are used
// elsewhere (TODO confirm). The explicit zero initializers match the default zero-init of globals.
int			globalSpuLocalStoreActive = 0;
void *		globalSpuLocalStoreStart = NULL;
void *		globalSpuLocalStoreEnd = NULL;
idSysMutex	globalSpuLocalStoreMutex;
/*
================================================================================================
PS3
================================================================================================
*/
/*
================================================================================================
idParallelJobList_Threads
================================================================================================
*/
// Threshold checked by RunJobsInternal() in non-_DEBUG builds to report slow jobs.
static idCVar jobs_longJobMicroSec( "jobs_longJobMicroSec", "10000", CVAR_INTEGER, "print a warning for jobs that take more than this number of microseconds" );

// Size of the per-thread timing arrays in threadStats_t; thread numbers must stay below it.
static const int MAX_THREADS = 32;
// A worker thread's private cursor into one job list.
struct threadJobListState_t {
	// With no argument the version is 0xFFFFFFFF converted to int, exactly as before.
	threadJobListState_t( int _version = 0xFFFFFFFF ) :
		jobList( NULL ),
		version( _version ),
		signalIndex( 0 ),
		lastJobIndex( 0 ),
		nextJobIndex( -1 ) {
	}

	idParallelJobList_Threads *	jobList;
	int							version;		// list version this cursor belongs to
	int							signalIndex;	// index into the list's signalJobCount
	int							lastJobIndex;	// first entry not yet scanned for markers
	int							nextJobIndex;	// most recently fetched entry (-1 before any fetch)
};
// Counters and timings for one submission of a job list. The whole struct is cleared with
// memset and copied with memcpy, so it must stay a plain aggregate.
struct threadStats_t {
unsigned int numExecutedJobs; // entries in the list minus the signal/sync marker entries
unsigned int numExecutedSyncs; // number of sync points in the list
uint64 submitTime; // Sys_Microseconds() at Submit()
uint64 startTime; // first time any thread started running jobs from the list
uint64 endTime; // set when the last job of the list finished
uint64 waitTime; // time Wait() spent blocking (0 if it did not block)
uint64 threadExecTime[MAX_THREADS]; // per thread: time spent inside job functions
uint64 threadTotalTime[MAX_THREADS]; // per thread: time spent inside RunJobs()
};
// Threaded implementation behind idParallelJobList. The owning thread fills jobList with real
// jobs and with marker entries: Nop jobs whose data points at JOB_SIGNAL, JOB_SYNCHRONIZE or
// JOB_LIST_DONE. Submit() then hands the list to worker threads, which execute it via RunJobs().
class idParallelJobList_Threads {
public:
idParallelJobList_Threads( jobListId_t id, jobListPriority_t priority, unsigned int maxJobs, unsigned int maxSyncs );
~idParallelJobList_Threads();
//------------------------
// These are called from the one thread that manages this list.
//------------------------
ID_INLINE void AddJob( jobRun_t function, void * data );
ID_INLINE void InsertSyncPoint( jobSyncType_t syncType );
void Submit( idParallelJobList_Threads * waitForJobList_, int parallelism );
void Wait();
bool TryWait();
bool IsSubmitted() const;
// The statistics below come from threadStats, which Wait() copies from deferredThreadStats.
unsigned int GetNumExecutedJobs() const { return threadStats.numExecutedJobs; }
unsigned int GetNumSyncs() const { return threadStats.numExecutedSyncs; }
uint64 GetSubmitTimeMicroSec() const { return threadStats.submitTime; }
uint64 GetStartTimeMicroSec() const { return threadStats.startTime; }
uint64 GetFinishTimeMicroSec() const { return threadStats.endTime; }
uint64 GetWaitTimeMicroSec() const { return threadStats.waitTime; }
uint64 GetTotalProcessingTimeMicroSec() const;
uint64 GetTotalWastedTimeMicroSec() const;
uint64 GetUnitProcessingTimeMicroSec( int unit ) const;
uint64 GetUnitWastedTimeMicroSec( int unit ) const;
jobListId_t GetId() const { return listId; }
jobListPriority_t GetPriority() const { return listPriority; }
// Incremented by Wait(); workers holding an older version get RUN_DONE from RunJobsInternal().
int GetVersion() { return version.GetValue(); }
// True while the done guard of the list passed to Submit() as waitForJobList is still raised.
bool WaitForOtherJobList();
//------------------------
// This is thread safe and called from the job threads.
//------------------------
enum runResult_t {
RUN_OK = 0,
RUN_PROGRESS = BIT( 0 ),
RUN_DONE = BIT( 1 ),
RUN_STALLED = BIT( 2 )
};
int RunJobs( unsigned int threadNum, threadJobListState_t & state, bool singleJob );
private:
// Submit() advances currentDoneGuard with "& ( NUM_DONE_GUARDS - 1 )", so this must stay a power of two.
static const int NUM_DONE_GUARDS = 4; // cycle through 4 guards so we can cyclicly chain job lists
bool threaded; // when false, Submit() runs every job on the calling thread
bool done; // true while the list is not submitted (cleared by Submit(), set by Wait())
bool hasSignal; // a SYNC_SIGNAL was inserted and no SYNC_SYNCHRONIZE has consumed it yet
jobListId_t listId;
jobListPriority_t listPriority;
unsigned int maxJobs; // capacity hint passed to the constructor
unsigned int maxSyncs; // capacity hint passed to the constructor
unsigned int numSyncs; // number of JOB_SYNCHRONIZE markers inserted
int lastSignalJob; // jobList index where the current signal segment starts
idSysInterlockedInteger * waitForGuard; // done guard of the list this one was submitted behind (or NULL)
idSysInterlockedInteger doneGuards[NUM_DONE_GUARDS]; // set to 1 on Submit(), decremented when JOB_LIST_DONE is fetched
int currentDoneGuard;
idSysInterlockedInteger version;
struct job_t {
jobRun_t function;
void * data;
int executed; // set to 1 after the job function has returned
};
idList< job_t, TAG_JOBLIST > jobList;
idList< idSysInterlockedInteger, TAG_JOBLIST > signalJobCount; // entries still to run in each signal segment; the last one covers the tail of the list
idSysInterlockedInteger currentJob; // next jobList index to hand out
idSysInterlockedInteger fetchLock; // try-lock: only the thread that raises it to 1 may fetch
idSysInterlockedInteger numThreadsExecuting; // threads currently inside RunJobs()
threadStats_t deferredThreadStats; // statistics being gathered for the in-flight submission
threadStats_t threadStats; // statistics of the last submission completed by Wait()
int RunJobsInternal( unsigned int threadNum, threadJobListState_t & state, bool singleJob );
static void Nop( void * data ) {}
// Only the addresses of these are used, as marker tags in job_t::data.
static int JOB_SIGNAL;
static int JOB_SYNCHRONIZE;
static int JOB_LIST_DONE;
};
int idParallelJobList_Threads::JOB_SIGNAL;
int idParallelJobList_Threads::JOB_SYNCHRONIZE;
int idParallelJobList_Threads::JOB_LIST_DONE;
/*
========================
idParallelJobList_Threads::idParallelJobList_Threads
========================
*/
idParallelJobList_Threads::idParallelJobList_Threads( jobListId_t id, jobListPriority_t priority, unsigned int maxJobs, unsigned int maxSyncs ) :
	threaded( true ),
	done( true ),
	hasSignal( false ),
	listId( id ),
	listPriority( priority ),
	maxJobs( maxJobs ),			// member initialized from the same-named parameter
	maxSyncs( maxSyncs ),
	numSyncs( 0 ),
	lastSignalJob( 0 ),
	waitForGuard( NULL ),
	currentDoneGuard( 0 ),
	jobList() {
	assert( listPriority != JOBLIST_PRIORITY_NONE );

	// Reserve storage up front. Each sync point adds two marker entries (signal + synchronize),
	// and Submit() appends one JOB_LIST_DONE marker.
	jobList.AssureSize( maxJobs + maxSyncs * 2 + 1 );
	jobList.SetNum( 0 );

	// one counter per signal plus the one Submit() allocates for the tail segment
	signalJobCount.AssureSize( maxSyncs + 1 );
	signalJobCount.SetNum( 0 );

	memset( &deferredThreadStats, 0, sizeof( deferredThreadStats ) );
	memset( &threadStats, 0, sizeof( threadStats ) );
}
/*
========================
idParallelJobList_Threads::~idParallelJobList_Threads
========================
*/
idParallelJobList_Threads::~idParallelJobList_Threads() {
	// drain any submitted work (Wait() blocks until no thread is inside RunJobs) before teardown
	this->Wait();
}
/*
========================
idParallelJobList_Threads::AddJob
========================
*/
/*
Appends a job to the list. May only be called while the list is not submitted.

Capacity is not checked here. The original overflow check (!jobList.IsFull()) was disabled
("JDC: this never worked in tech5!") behind an always-true "if ( 1 )", so its
per-job-type dump-and-Error branch could never run and has been removed. Growth is left to
jobList.Alloc(); presumably idList grows on demand (TODO confirm).
*/
ID_INLINE void idParallelJobList_Threads::AddJob( jobRun_t function, void * data ) {
	assert( done );
#if defined( _DEBUG )
	// make sure there isn't already a job with the same function and data in the list
	if ( jobList.Num() < 1000 ) { // don't do this N^2 slow check on big lists
		for ( int i = 0; i < jobList.Num(); i++ ) {
			assert( jobList[i].function != function || jobList[i].data != data );
		}
	}
#endif
	job_t & job = jobList.Alloc();
	job.function = function;
	job.data = data;
	job.executed = 0;
}
/*
========================
idParallelJobList_Threads::InsertSyncPoint
========================
*/
/*
Inserts a marker entry into the (not yet submitted) list.
SYNC_SIGNAL records how many entries precede the signal (since the previous signal) and
appends a JOB_SIGNAL marker. It is ignored on an empty list.
SYNC_SYNCHRONIZE appends a JOB_SYNCHRONIZE marker that makes threads wait for the pending
signal's segment. It is ignored when no signal is pending.
Marker entries now get executed = 0 like real jobs (see AddJob) instead of being left
uninitialized, and the duplicated !hasSignal assert has been collapsed into one.
*/
ID_INLINE void idParallelJobList_Threads::InsertSyncPoint( jobSyncType_t syncType ) {
	assert( done );
	switch( syncType ) {
		case SYNC_SIGNAL: {
			// two signals in a row without a synchronize in between are not supported
			assert( !hasSignal );
			if ( jobList.Num() ) {
				signalJobCount.Alloc().SetValue( jobList.Num() - lastSignalJob );
				lastSignalJob = jobList.Num();
				job_t & job = jobList.Alloc();
				job.function = Nop;
				job.data = & JOB_SIGNAL;
				job.executed = 0;
				hasSignal = true;
			}
			break;
		}
		case SYNC_SYNCHRONIZE: {
			if ( hasSignal ) {
				job_t & job = jobList.Alloc();
				job.function = Nop;
				job.data = & JOB_SYNCHRONIZE;
				job.executed = 0;
				hasSignal = false;
				numSyncs++;
			}
			break;
		}
	}
}
/*
========================
idParallelJobList_Threads::Submit
========================
*/
/*
Starts execution of the list.
waitForJobList: optional list whose current done guard this list should wait for (used as
a scheduling hint by the worker threads).
parallelism: forwarded to the job manager.

The number of marker entries is counted exactly. Each SYNC_SIGNAL allocated one
signalJobCount entry, and each SYNC_SYNCHRONIZE incremented numSyncs. The previous
"numSyncs * 2" formula missed a trailing signal that had no synchronize, which tripped the
capacity assert and counted that marker as an executed job.
*/
void idParallelJobList_Threads::Submit( idParallelJobList_Threads * waitForJobList, int parallelism ) {
	assert( done );
	assert( numSyncs <= maxSyncs );

	// marker entries currently in jobList (before the JOB_LIST_DONE appended below)
	const unsigned int numMarkers = (unsigned int) signalJobCount.Num() + numSyncs;
	assert( (unsigned int) jobList.Num() <= maxJobs + numMarkers );
	assert( fetchLock.GetValue() == 0 );

	done = false;
	currentJob.SetValue( 0 );

	// the memset also zeroes startTime, endTime and waitTime
	memset( &deferredThreadStats, 0, sizeof( deferredThreadStats ) );
	deferredThreadStats.numExecutedJobs = jobList.Num() - numMarkers;
	deferredThreadStats.numExecutedSyncs = numSyncs;
	deferredThreadStats.submitTime = Sys_Microseconds();

	if ( jobList.Num() == 0 ) {
		return;
	}

	if ( waitForJobList != NULL ) {
		waitForGuard = & waitForJobList->doneGuards[waitForJobList->currentDoneGuard];
	} else {
		waitForGuard = NULL;
	}

	// move to the next done guard so a list chained on our previous guard is not disturbed
	currentDoneGuard = ( currentDoneGuard + 1 ) & ( NUM_DONE_GUARDS - 1 );
	doneGuards[currentDoneGuard].SetValue( 1 );

	// counter for the tail segment: everything after the last signal marker
	signalJobCount.Alloc().SetValue( jobList.Num() - lastSignalJob );

	job_t & job = jobList.Alloc();
	job.function = Nop;
	job.data = & JOB_LIST_DONE;
	job.executed = 0;

	if ( threaded ) {
		// hand over to the manager
		void SubmitJobList( idParallelJobList_Threads * jobList, int parallelism );
		SubmitJobList( this, parallelism );
	} else {
		// run all the jobs right here
		threadJobListState_t state( GetVersion() );
		RunJobs( 0, state, false );
	}
}
/*
========================
idParallelJobList_Threads::Wait
========================
*/
void idParallelJobList_Threads::Wait() {
if ( jobList.Num() > 0 ) {
// don't lock up but return if the job list was never properly submitted
if ( !verify( !done && signalJobCount.Num() > 0 ) ) {
return;
}
bool waited = false;
uint64 waitStart = Sys_Microseconds();
while ( signalJobCount[signalJobCount.Num() - 1].GetValue() > 0 ) {
Sys_Yield();
waited = true;
}
version.Increment();
while ( numThreadsExecuting.GetValue() > 0 ) {
Sys_Yield();
waited = true;
}
jobList.SetNum( 0 );
signalJobCount.SetNum( 0 );
numSyncs = 0;
lastSignalJob = 0;
uint64 waitEnd = Sys_Microseconds();
deferredThreadStats.waitTime = waited ? ( waitEnd - waitStart ) : 0;
}
memcpy( & threadStats, & deferredThreadStats, sizeof( threadStats ) );
done = true;
}
/*
========================
idParallelJobList_Threads::TryWait
========================
*/
/*
Non-blocking variant of Wait(). Calls Wait() and returns true when there is nothing left to
wait for. Returns false while the final signal counter is still positive.
signalJobCount is empty when jobs were added but neither signalled nor submitted. The old
code indexed signalJobCount[-1] in that case. Wait() is now called instead, and its verify
handles the unsubmitted list without locking up.
*/
bool idParallelJobList_Threads::TryWait() {
	if ( jobList.Num() == 0
		|| signalJobCount.Num() == 0
		|| signalJobCount[signalJobCount.Num() - 1].GetValue() <= 0 ) {
		Wait();
		return true;
	}
	return false;
}
/*
========================
idParallelJobList_Threads::IsSubmitted
========================
*/
bool idParallelJobList_Threads::IsSubmitted() const {
	// 'done' is cleared by Submit() and set again by Wait()
	const bool submitted = !done;
	return submitted;
}
/*
========================
idParallelJobList_Threads::GetTotalProcessingTimeMicroSec
========================
*/
uint64 idParallelJobList_Threads::GetTotalProcessingTimeMicroSec() const {
	// sum of time spent inside job functions across all thread slots
	uint64 sum = 0;
	const uint64 * const end = threadStats.threadExecTime + MAX_THREADS;
	for ( const uint64 * t = threadStats.threadExecTime; t != end; ++t ) {
		sum += *t;
	}
	return sum;
}
/*
========================
idParallelJobList_Threads::GetTotalWastedTimeMicroSec
========================
*/
uint64 idParallelJobList_Threads::GetTotalWastedTimeMicroSec() const {
	// time inside RunJobs() that was not spent executing jobs, summed over all thread slots
	// (unsigned arithmetic: summing first then subtracting equals summing the differences)
	uint64 totalTime = 0;
	uint64 execTime = 0;
	for ( int slot = 0; slot < MAX_THREADS; slot++ ) {
		totalTime += threadStats.threadTotalTime[slot];
		execTime += threadStats.threadExecTime[slot];
	}
	return totalTime - execTime;
}
/*
========================
idParallelJobList_Threads::GetUnitProcessingTimeMicroSec
========================
*/
uint64 idParallelJobList_Threads::GetUnitProcessingTimeMicroSec( int unit ) const {
	// out-of-range thread numbers report zero
	const bool valid = ( unit >= 0 && unit < MAX_THREADS );
	return valid ? threadStats.threadExecTime[unit] : 0;
}
/*
========================
idParallelJobList_Threads::GetUnitWastedTimeMicroSec
========================
*/
uint64 idParallelJobList_Threads::GetUnitWastedTimeMicroSec( int unit ) const {
	// out-of-range thread numbers report zero
	const bool valid = ( unit >= 0 && unit < MAX_THREADS );
	return valid ? threadStats.threadTotalTime[unit] - threadStats.threadExecTime[unit] : 0;
}
#ifndef _DEBUG
// Details of the most recent over-budget job, written by RunJobsInternal(); presumably kept
// for inspection in a debugger or crash dump (TODO confirm). The variables themselves are
// volatile so the stores are not optimized away. longJobData was declared "volatile void *",
// which makes the pointee volatile rather than the stored pointer (unlike longJobFunc).
volatile float longJobTime;
volatile jobRun_t longJobFunc;
void * volatile longJobData;
#endif
/*
========================
idParallelJobList_Threads::RunJobsInternal
========================
*/
/*
Scheduling loop run concurrently by every worker thread that picked up this list.
Returns a mask of runResult_t bits:
RUN_DONE     - the list is finished for this thread, or 'state' refers to an older version
RUN_STALLED  - blocked on a sync / list-done marker, or another thread holds fetchLock
RUN_PROGRESS - at least one job was executed during this call
When singleJob is true, at most one job is executed before returning.
*/
int idParallelJobList_Threads::RunJobsInternal( unsigned int threadNum, threadJobListState_t & state, bool singleJob ) {
if ( state.version != version.GetValue() ) {
// trying to run an old version of this list that is already done
return RUN_DONE;
}
assert( threadNum < MAX_THREADS );
// NOTE(review): unsynchronized check-then-write; concurrent threads may both store a start time
if ( deferredThreadStats.startTime == 0 ) {
deferredThreadStats.startTime = Sys_Microseconds(); // first time any thread is running jobs from this list
}
int result = RUN_OK;
do {
// run through all signals and syncs before the last job that has been or is being executed
// this loop is really an optimization to minimize the time spent in the fetchLock section below
for ( ; state.lastJobIndex < (int) currentJob.GetValue() && state.lastJobIndex < jobList.Num(); state.lastJobIndex++ ) {
if ( jobList[state.lastJobIndex].data == & JOB_SIGNAL ) {
// passing a signal marker moves this thread into the next signal segment
state.signalIndex++;
assert( state.signalIndex < signalJobCount.Num() );
} else if ( jobList[state.lastJobIndex].data == & JOB_SYNCHRONIZE ) {
assert( state.signalIndex > 0 );
// the segment that ended at the preceding signal must be fully executed first
if ( signalJobCount[state.signalIndex - 1].GetValue() > 0 ) {
// stalled on a synchronization point
return ( result | RUN_STALLED );
}
} else if ( jobList[state.lastJobIndex].data == & JOB_LIST_DONE ) {
if ( signalJobCount[signalJobCount.Num() - 1].GetValue() > 0 ) {
// stalled on a synchronization point
return ( result | RUN_STALLED );
}
}
}
// try to lock to fetch a new job
// (only the thread that raises fetchLock from 0 to 1 may fetch; every other thread backs off)
if ( fetchLock.Increment() == 1 ) {
// grab a new job
state.nextJobIndex = currentJob.Increment() - 1;
// run through any remaining signals and syncs (this should rarely iterate more than once)
for ( ; state.lastJobIndex <= state.nextJobIndex && state.lastJobIndex < jobList.Num(); state.lastJobIndex++ ) {
if ( jobList[state.lastJobIndex].data == & JOB_SIGNAL ) {
state.signalIndex++;
assert( state.signalIndex < signalJobCount.Num() );
} else if ( jobList[state.lastJobIndex].data == & JOB_SYNCHRONIZE ) {
assert( state.signalIndex > 0 );
if ( signalJobCount[state.signalIndex - 1].GetValue() > 0 ) {
// return this job to the list
currentJob.Decrement();
// release the fetch lock
fetchLock.Decrement();
// stalled on a synchronization point
return ( result | RUN_STALLED );
}
} else if ( jobList[state.lastJobIndex].data == & JOB_LIST_DONE ) {
if ( signalJobCount[signalJobCount.Num() - 1].GetValue() > 0 ) {
// return this job to the list
currentJob.Decrement();
// release the fetch lock
fetchLock.Decrement();
// stalled on a synchronization point
return ( result | RUN_STALLED );
}
// decrement the done count
// (a list submitted behind this one watches this guard through its waitForGuard)
doneGuards[currentDoneGuard].Decrement();
}
}
// release the fetch lock
fetchLock.Decrement();
} else {
// release the fetch lock
fetchLock.Decrement();
// another thread is fetching right now so consider stalled
return ( result | RUN_STALLED );
}
// if at the end of the job list we're done
if ( state.nextJobIndex >= jobList.Num() ) {
return ( result | RUN_DONE );
}
// execute the next job
{
uint64 jobStart = Sys_Microseconds();
jobList[state.nextJobIndex].function( jobList[state.nextJobIndex].data );
jobList[state.nextJobIndex].executed = 1;
uint64 jobEnd = Sys_Microseconds();
deferredThreadStats.threadExecTime[threadNum] += jobEnd - jobStart;
#ifndef _DEBUG
// report slow jobs (JOBLIST_UTILITY lists are exempt)
if ( jobs_longJobMicroSec.GetInteger() > 0 ) {
if ( jobEnd - jobStart > jobs_longJobMicroSec.GetInteger()
&& GetId() != JOBLIST_UTILITY ) {
longJobTime = ( jobEnd - jobStart ) * ( 1.0f / 1000.0f );
longJobFunc = jobList[state.nextJobIndex].function;
longJobData = jobList[state.nextJobIndex].data;
const char * jobName = GetJobName( jobList[state.nextJobIndex].function );
const char * jobListName = GetJobListName( GetId() );
idLib::Printf( "%1.1f milliseconds for a single '%s' job from job list %s on thread %d\n", longJobTime, jobName, jobListName, threadNum );
}
}
#endif
}
result |= RUN_PROGRESS;
// decrease the job count for the current signal
// (marker entries are Nop jobs and are counted here like real jobs)
if ( signalJobCount[state.signalIndex].Decrement() == 0 ) {
// if this was the very last job of the job list
if ( state.signalIndex == signalJobCount.Num() - 1 ) {
deferredThreadStats.endTime = Sys_Microseconds();
return ( result | RUN_DONE );
}
}
} while( ! singleJob );
return result;
}
/*
========================
idParallelJobList_Threads::RunJobs
========================
*/
int idParallelJobList_Threads::RunJobs( unsigned int threadNum, threadJobListState_t & state, bool singleJob ) {
uint64 start = Sys_Microseconds();
numThreadsExecuting.Increment();
int result = RunJobsInternal( threadNum, state, singleJob );
numThreadsExecuting.Decrement();
deferredThreadStats.threadTotalTime[threadNum] += Sys_Microseconds() - start;
return result;
}
/*
========================
idParallelJobList_Threads::WaitForOtherJobList
========================
*/
bool idParallelJobList_Threads::WaitForOtherJobList() {
	// true while the guard of the list we were submitted behind has not yet dropped to zero
	return ( waitForGuard != NULL ) && ( waitForGuard->GetValue() > 0 );
}
/*
================================================================================================
idParallelJobList
================================================================================================
*/
/*
========================
idParallelJobList::idParallelJobList
========================
*/
idParallelJobList::idParallelJobList( jobListId_t id, jobListPriority_t priority, unsigned int maxJobs, unsigned int maxSyncs, const idColor * color ) {
	assert( priority > JOBLIST_PRIORITY_NONE );
	// the public list forwards all work to its threaded implementation
	jobListThreads = new (TAG_JOBLIST) idParallelJobList_Threads( id, priority, maxJobs, maxSyncs );
	this->color = color;	// 'this->' needed: the parameter shadows the member
}
/*
========================
idParallelJobList::~idParallelJobList
========================
*/
idParallelJobList::~idParallelJobList() {
	// the implementation's destructor calls Wait() before its storage is released
	delete this->jobListThreads;
}
/*
========================
idParallelJobList::AddJob
========================
*/
void idParallelJobList::AddJob( jobRun_t function, void * data ) {
	// debug builds insist that the function was registered (e.g. via idParallelJobRegistration)
	assert( IsRegisteredJob( function ) );
	this->jobListThreads->AddJob( function, data );
}
/*
========================
idParallelJobList::AddJobSPURS
========================
*/
CellSpursJob128 * idParallelJobList::AddJobSPURS() {
	// SPURS jobs are not provided by this implementation
	CellSpursJob128 * const noJob = NULL;
	return noJob;
}
/*
========================
idParallelJobList::InsertSyncPoint
========================
*/
void idParallelJobList::InsertSyncPoint( jobSyncType_t syncType ) {
	// forwarded unchanged to the threaded implementation
	this->jobListThreads->InsertSyncPoint( syncType );
}
/*
========================
idParallelJobList::Wait
========================
*/
void idParallelJobList::Wait() {
	// nothing to wait for without an implementation object
	if ( jobListThreads == NULL ) {
		return;
	}
	jobListThreads->Wait();
}
/*
========================
idParallelJobList::TryWait
========================
*/
bool idParallelJobList::TryWait() {
	// a missing implementation counts as finished; otherwise report the implementation's answer
	return ( jobListThreads == NULL ) || jobListThreads->TryWait();
}
/*
========================
idParallelJobList::Submit
========================
*/
void idParallelJobList::Submit( idParallelJobList * waitForJobList, int parallelism ) {
	assert( waitForJobList != this );
	// translate the public "wait for" list into its threaded implementation (if any)
	idParallelJobList_Threads * waitForThreads = NULL;
	if ( waitForJobList != NULL ) {
		waitForThreads = waitForJobList->jobListThreads;
	}
	jobListThreads->Submit( waitForThreads, parallelism );
}
/*
========================
idParallelJobList::IsSubmitted
========================
*/
bool idParallelJobList::IsSubmitted() const {
return jobListThreads->IsSubmitted();
}
/*
========================
idParallelJobList::GetNumExecutedJobs
========================
*/
unsigned int idParallelJobList::GetNumExecutedJobs() const {
return jobListThreads->GetNumExecutedJobs();
}
/*
========================
idParallelJobList::GetNumSyncs
========================
*/
unsigned int idParallelJobList::GetNumSyncs() const {
return jobListThreads->GetNumSyncs();
}
/*
========================
idParallelJobList::GetSubmitTimeMicroSec
========================
*/
uint64 idParallelJobList::GetSubmitTimeMicroSec() const {
	// statistic of the last submission finished by Wait()
	return this->jobListThreads->GetSubmitTimeMicroSec();
}
/*
========================
idParallelJobList::GetStartTimeMicroSec
========================
*/
uint64 idParallelJobList::GetStartTimeMicroSec() const {
	// statistic of the last submission finished by Wait()
	return this->jobListThreads->GetStartTimeMicroSec();
}
/*
========================
idParallelJobList::GetFinishTimeMicroSec
========================
*/
uint64 idParallelJobList::GetFinishTimeMicroSec() const {
	// statistic of the last submission finished by Wait()
	return this->jobListThreads->GetFinishTimeMicroSec();
}
/*
========================
idParallelJobList::GetWaitTimeMicroSec
========================
*/
uint64 idParallelJobList::GetWaitTimeMicroSec() const {
	// statistic of the last submission finished by Wait()
	return this->jobListThreads->GetWaitTimeMicroSec();
}
/*
========================
idParallelJobList::GetTotalProcessingTimeMicroSec
========================
*/
uint64 idParallelJobList::GetTotalProcessingTimeMicroSec() const {
	// summed over all worker thread slots
	return this->jobListThreads->GetTotalProcessingTimeMicroSec();
}
/*
========================
idParallelJobList::GetTotalWastedTimeMicroSec
========================
*/
uint64 idParallelJobList::GetTotalWastedTimeMicroSec() const {
	// summed over all worker thread slots
	return this->jobListThreads->GetTotalWastedTimeMicroSec();
}
/*
========================
idParallelJobList::GetUnitProcessingTimeMicroSec
========================
*/
uint64 idParallelJobList::GetUnitProcessingTimeMicroSec( int unit ) const {
	// per-thread value; out-of-range units yield 0
	return this->jobListThreads->GetUnitProcessingTimeMicroSec( unit );
}
/*
========================
idParallelJobList::GetUnitWastedTimeMicroSec
========================
*/
uint64 idParallelJobList::GetUnitWastedTimeMicroSec( int unit ) const {
	// per-thread value; out-of-range units yield 0
	return this->jobListThreads->GetUnitWastedTimeMicroSec( unit );
}
/*
========================
idParallelJobList::GetId
========================
*/
jobListId_t idParallelJobList::GetId() const {
	// id given at construction time
	return this->jobListThreads->GetId();
}
/*
================================================================================================
idJobThread
================================================================================================
*/
// Stack size for each job worker thread (chosen to equal the SPU local store size).
static const int JOB_THREAD_STACK_SIZE = 256 * 1024;

// One slot of idJobThread's cyclic queue: a submitted list plus its version at submit time.
struct threadJobList_t {
	idParallelJobList_Threads *	jobList;
	int							version;
};

// When set, lower-priority lists are run one job at a time so priorities are re-evaluated often.
static idCVar jobs_prioritize( "jobs_prioritize", "1", CVAR_BOOL | CVAR_NOCHEAT, "prioritize job lists" );
// Worker thread that receives submitted job lists through a small ring buffer and executes
// their jobs in Run(), choosing between queued lists by priority.
class idJobThread : public idSysThread {
public:
idJobThread();
~idJobThread();
void Start( core_t core, unsigned int threadNum );
void AddJobList( idParallelJobList_Threads * jobList );
private:
// Slots are addressed with "index & ( MAX_JOBLISTS - 1 )", so MAX_JOBLISTS must be a power of two.
threadJobList_t jobLists[MAX_JOBLISTS]; // cyclic buffer with job lists
// NOTE(review): the two counters below are written by different threads (AddJobList under
// addJobMutex, Run without it) and are not atomic; confirm this is safe on all targets.
unsigned int firstJobList; // index of the next queued job list Run() will take (consumer side)
unsigned int lastJobList; // index where the next job list to work on will be added
idSysMutex addJobMutex; // serializes AddJobList() callers
unsigned int threadNum;
virtual int Run();
};
/*
========================
idJobThread::idJobThread
========================
*/
idJobThread::idJobThread() : firstJobList( 0 ), lastJobList( 0 ), threadNum( 0 ) {
	// jobLists slots are only read between firstJobList and lastJobList, so they need no init
}
/*
========================
idJobThread::~idJobThread
========================
*/
idJobThread::~idJobThread() {
	// nothing to release here; the manager stops worker threads in Shutdown()
}
/*
========================
idJobThread::Start
========================
*/
void idJobThread::Start( core_t core, unsigned int threadNum ) {
	this->threadNum = threadNum;
	// each worker is named after its index, e.g. "JobListProcessor_0"
	const char * const threadName = va( "JobListProcessor_%d", threadNum );
	StartWorkerThread( threadName, core, THREAD_NORMAL, JOB_THREAD_STACK_SIZE );
}
/*
========================
idJobThread::AddJobList
========================
*/
void idJobThread::AddJobList( idParallelJobList_Threads * jobList ) {
	// several submitting threads may target the same worker, so producers are serialized
	addJobMutex.Lock();

	// the ring is rarely full (e.g. several queued versions of one list); spin until Run() drains a slot
	while ( lastJobList - firstJobList >= MAX_JOBLISTS ) {
		Sys_Yield();
	}
	assert( lastJobList - firstJobList < MAX_JOBLISTS );

	threadJobList_t & slot = jobLists[lastJobList & ( MAX_JOBLISTS - 1 )];
	slot.jobList = jobList;
	slot.version = jobList->GetVersion();
	lastJobList++;

	addJobMutex.Unlock();
}
/*
========================
idJobThread::Run
========================
*/
/*
Worker loop. Moves queued job lists into a local array (one per iteration), picks a list by
priority and runs jobs from it. It returns 0 once no lists remain or the thread is terminating.
*/
int idJobThread::Run() {
threadJobListState_t threadJobListState[MAX_JOBLISTS];
int numJobLists = 0;
int lastStalledJobList = -1; // local index of the list that stalled last iteration, or -1
while ( !IsTerminating() ) {
// fetch any new job lists and add them to the local list
if ( numJobLists < MAX_JOBLISTS && firstJobList < lastJobList ) {
threadJobListState[numJobLists].jobList = jobLists[firstJobList & ( MAX_JOBLISTS - 1 )].jobList;
threadJobListState[numJobLists].version = jobLists[firstJobList & ( MAX_JOBLISTS - 1 )].version;
threadJobListState[numJobLists].signalIndex = 0;
threadJobListState[numJobLists].lastJobIndex = 0;
threadJobListState[numJobLists].nextJobIndex = -1;
numJobLists++;
firstJobList++;
}
// nothing left to work on: leave Run()
if ( numJobLists == 0 ) {
break;
}
int currentJobList = 0;
jobListPriority_t priority = JOBLIST_PRIORITY_NONE;
if ( lastStalledJobList < 0 ) {
// find the job list with the highest priority
// (lists still waiting on another list are skipped; if all are waiting, list 0 is used)
for ( int i = 0; i < numJobLists; i++ ) {
if ( threadJobListState[i].jobList->GetPriority() > priority && !threadJobListState[i].jobList->WaitForOtherJobList() ) {
priority = threadJobListState[i].jobList->GetPriority();
currentJobList = i;
}
}
} else {
// try to hide the stall with a job from a list that has equal or higher priority
currentJobList = lastStalledJobList;
priority = threadJobListState[lastStalledJobList].jobList->GetPriority();
for ( int i = 0; i < numJobLists; i++ ) {
if ( i != lastStalledJobList && threadJobListState[i].jobList->GetPriority() >= priority && !threadJobListState[i].jobList->WaitForOtherJobList() ) {
priority = threadJobListState[i].jobList->GetPriority();
currentJobList = i;
}
}
}
// if the priority is high then try to run through the whole list to reduce the overhead
// otherwise run a single job and re-evaluate priorities for the next job
bool singleJob = ( priority == JOBLIST_PRIORITY_HIGH ) ? false : jobs_prioritize.GetBool();
// try running one or more jobs from the current job list
int result = threadJobListState[currentJobList].jobList->RunJobs( threadNum, threadJobListState[currentJobList], singleJob );
if ( ( result & idParallelJobList_Threads::RUN_DONE ) != 0 ) {
// done with this job list so remove it from the local list
for ( int i = currentJobList; i < numJobLists - 1; i++ ) {
threadJobListState[i] = threadJobListState[i + 1];
}
numJobLists--;
lastStalledJobList = -1;
} else if ( ( result & idParallelJobList_Threads::RUN_STALLED ) != 0 ) {
// yield when stalled on the same job list again without making any progress
if ( currentJobList == lastStalledJobList ) {
if ( ( result & idParallelJobList_Threads::RUN_PROGRESS ) == 0 ) {
Sys_Yield();
}
}
lastStalledJobList = currentJobList;
} else {
lastStalledJobList = -1;
}
}
return 0;
}
/*
================================================================================================
idParallelJobManagerLocal
================================================================================================
*/
// Note the parameter order: the logical processor count comes first, then cores, then packages.
extern void Sys_CPUCount( int & logicalNum, int & coreNum, int & packageNum );
// WINDOWS LOGICAL PROCESSOR LIMITS:
//
// http://download.microsoft.com/download/5/7/7/577a5684-8a83-43ae-9272-ff260a9c20e2/Hyper-thread_Windows.doc
//
// Physical Logical (Cores + HT)
// Windows XP Home Edition 1 2
// Windows XP Professional 2 4
// Windows Server 2003, Standard Edition 4 8
// Windows Server 2003, Enterprise Edition 8 16
// Windows Server 2003, Datacenter Edition 32 32
//
// Windows Vista ? ?
//
// Windows 7 Starter 1 32/64
// Windows 7 Home Basic 1 32/64
// Windows 7 Professional 2 32/64
//
//
// Hyperthreading is not dead yet. Intel's Core i7 Processor is quad-core with HT for 8 logicals.
// DOOM3: We don't have that many jobs, so just set this fairly low so we don't spin up a ton of idle threads
// MAX_JOB_THREADS sizes idParallelJobManagerLocal::threads[] and bounds jobs_numThreads;
// NUM_JOB_THREADS is the cvar's default, as a string.
#define MAX_JOB_THREADS 2
#define NUM_JOB_THREADS "2"
// Core affinity per worker thread (32 entries; Init() asserts there are at least MAX_JOB_THREADS).
#define JOB_THREAD_CORES { CORE_ANY, CORE_ANY, CORE_ANY, CORE_ANY, \
CORE_ANY, CORE_ANY, CORE_ANY, CORE_ANY, \
CORE_ANY, CORE_ANY, CORE_ANY, CORE_ANY, \
CORE_ANY, CORE_ANY, CORE_ANY, CORE_ANY, \
CORE_ANY, CORE_ANY, CORE_ANY, CORE_ANY, \
CORE_ANY, CORE_ANY, CORE_ANY, CORE_ANY, \
CORE_ANY, CORE_ANY, CORE_ANY, CORE_ANY, \
CORE_ANY, CORE_ANY, CORE_ANY, CORE_ANY }
idCVar jobs_numThreads( "jobs_numThreads", NUM_JOB_THREADS, CVAR_INTEGER | CVAR_NOCHEAT, "number of threads used to crunch through jobs", 0, MAX_JOB_THREADS );
// Concrete job manager: owns the worker threads and keeps track of every allocated job list.
class idParallelJobManagerLocal : public idParallelJobManager {
public:
virtual ~idParallelJobManagerLocal() {}
virtual void Init();
virtual void Shutdown();
virtual idParallelJobList * AllocJobList( jobListId_t id, jobListPriority_t priority, unsigned int maxJobs, unsigned int maxSyncs, const idColor * color );
virtual void FreeJobList( idParallelJobList * jobList );
virtual int GetNumJobLists() const;
virtual int GetNumFreeJobLists() const;
virtual idParallelJobList * GetJobList( int index );
virtual int GetNumProcessingUnits();
virtual void WaitForAllJobLists();
// Called (through SubmitJobList) by idParallelJobList_Threads::Submit() to distribute a list.
void Submit( idParallelJobList_Threads * jobList, int parallelism );
private:
idJobThread threads[MAX_JOB_THREADS]; // all started by Init(), stopped by Shutdown()
unsigned int maxThreads; // default number of workers per list (from jobs_numThreads)
int numPhysicalCpuCores;
int numLogicalCpuCores;
int numCpuPackages;
idStaticList< idParallelJobList *, MAX_JOBLISTS > jobLists; // every list returned by AllocJobList()
};
// The single manager instance, also exposed through the abstract parallelJobManager pointer.
idParallelJobManagerLocal parallelJobManagerLocal;
idParallelJobManager * parallelJobManager = &parallelJobManagerLocal;
/*
========================
SubmitJobList
========================
*/
void SubmitJobList( idParallelJobList_Threads * jobList, int parallelism ) {
	// free-function bridge: idParallelJobList_Threads::Submit() only has a block-scope
	// declaration of this function, not of the manager class
	idParallelJobManagerLocal & manager = parallelJobManagerLocal;
	manager.Submit( jobList, parallelism );
}
/*
========================
idParallelJobManagerLocal::Init
========================
*/
void idParallelJobManagerLocal::Init() {
// on consoles this will have specific cores for the threads, but on PC they will all be CORE_ANY
core_t cores[] = JOB_THREAD_CORES;
assert( sizeof( cores ) / sizeof( cores[0] ) >= MAX_JOB_THREADS );
for ( int i = 0; i < MAX_JOB_THREADS; i++ ) {
threads[i].Start( cores[i], i );
}
maxThreads = jobs_numThreads.GetInteger();
Sys_CPUCount( numPhysicalCpuCores, numLogicalCpuCores, numCpuPackages );
}
/*
========================
idParallelJobManagerLocal::Shutdown
========================
*/
void idParallelJobManagerLocal::Shutdown() {
for ( int i = 0; i < MAX_JOB_THREADS; i++ ) {
threads[i].StopThread();
}
}
/*
========================
idParallelJobManagerLocal::AllocJobList
========================
*/
/*
Creates a new job list and tracks it so FreeJobList() and WaitForAllJobLists() can find it.
Ids are deliberately not required to be unique: idStudio may cause job lists to be allocated
multiple times. The empty id-scan loop that documented this has been removed.
jobLists is a fixed-capacity idStaticList. Previously a list created when it was full was
never tracked, and a later FreeJobList() would index with -1. Running out is now a fatal error.
*/
idParallelJobList * idParallelJobManagerLocal::AllocJobList( jobListId_t id, jobListPriority_t priority, unsigned int maxJobs, unsigned int maxSyncs, const idColor * color ) {
	if ( jobLists.Num() >= MAX_JOBLISTS ) {
		idLib::Error( "AllocJobList: cannot allocate more than %d job lists", (int) MAX_JOBLISTS );
		return NULL;
	}
	idParallelJobList * jobList = new (TAG_JOBLIST) idParallelJobList( id, priority, maxJobs, maxSyncs, color );
	jobLists.Append( jobList );
	return jobList;
}
/*
========================
idParallelJobManagerLocal::FreeJobList
========================
*/
/*
Waits for the list to finish and destroys it. NULL is ignored.
- All MAX_JOB_THREADS workers are waited on, not just the first maxThreads. Submit() can give
  a list to up to MAX_JOB_THREADS workers (JOBLIST_PARALLELISM_MAX_THREADS), and
  jobs_numThreads may have been lowered since, so a higher-numbered worker could still hold a
  pointer to this list.
- A list this manager does not track is left alone instead of indexing jobLists with -1.
*/
void idParallelJobManagerLocal::FreeJobList( idParallelJobList * jobList ) {
	if ( jobList == NULL ) {
		return;
	}
	// wait for all job threads to finish because job list deletion is not thread safe
	for ( int i = 0; i < MAX_JOB_THREADS; i++ ) {
		threads[i].WaitForThread();
	}
	const int index = jobLists.FindIndex( jobList );
	assert( index >= 0 && jobLists[index] == jobList );
	if ( index < 0 ) {
		return;
	}
	jobLists[index]->Wait();
	delete jobLists[index];
	jobLists.RemoveIndexFast( index );
}
/*
========================
idParallelJobManagerLocal::GetNumJobLists
========================
*/
int idParallelJobManagerLocal::GetNumJobLists() const {
	// lists currently allocated through AllocJobList()
	const int allocated = jobLists.Num();
	return allocated;
}
/*
========================
idParallelJobManagerLocal::GetNumFreeJobLists
========================
*/
int idParallelJobManagerLocal::GetNumFreeJobLists() const {
	// remaining capacity of the fixed-size jobLists table
	const int used = jobLists.Num();
	return MAX_JOBLISTS - used;
}
/*
========================
idParallelJobManagerLocal::GetJobList
========================
*/
idParallelJobList * idParallelJobManagerLocal::GetJobList( int index ) {
	// no range check here; callers are expected to stay below GetNumJobLists()
	idParallelJobList * const list = jobLists[index];
	return list;
}
/*
========================
idParallelJobManagerLocal::GetNumProcessingUnits
========================
*/
int idParallelJobManagerLocal::GetNumProcessingUnits() {
	// the default worker count used for JOBLIST_PARALLELISM_DEFAULT submissions
	const int units = maxThreads;
	return units;
}
/*
========================
idParallelJobManagerLocal::WaitForAllJobLists
========================
*/
void idParallelJobManagerLocal::WaitForAllJobLists() {
	// block on each tracked list in turn until all have completed
	int listIndex = 0;
	while ( listIndex < jobLists.Num() ) {
		jobLists[listIndex]->Wait();
		listIndex++;
	}
}
/*
========================
idParallelJobManagerLocal::Submit
========================
*/
/*
Hands a submitted list to worker threads.
parallelism: a JOBLIST_PARALLELISM_* selector or an explicit thread count. With zero (or a
negative count) the list runs entirely on the calling thread.
The chosen count is always clamped to MAX_JOB_THREADS. JOBLIST_PARALLELISM_MAX_CORES used the
raw CPU core count, which routinely exceeds the two entries of threads[] and indexed past it.
*/
void idParallelJobManagerLocal::Submit( idParallelJobList_Threads * jobList, int parallelism ) {
	if ( jobs_numThreads.IsModified() ) {
		maxThreads = idMath::ClampInt( 0, MAX_JOB_THREADS, jobs_numThreads.GetInteger() );
		jobs_numThreads.ClearModified();
	}

	// determine the number of threads to use
	int numThreads;
	if ( parallelism == JOBLIST_PARALLELISM_DEFAULT ) {
		numThreads = maxThreads;
	} else if ( parallelism == JOBLIST_PARALLELISM_MAX_CORES ) {
		numThreads = numLogicalCpuCores;
	} else if ( parallelism == JOBLIST_PARALLELISM_MAX_THREADS ) {
		numThreads = MAX_JOB_THREADS;
	} else {
		numThreads = parallelism;
	}
	// only MAX_JOB_THREADS workers exist
	if ( numThreads > MAX_JOB_THREADS ) {
		numThreads = MAX_JOB_THREADS;
	}

	if ( numThreads <= 0 ) {
		// no workers requested: run the whole list on the calling thread
		threadJobListState_t state( jobList->GetVersion() );
		jobList->RunJobs( 0, state, false );
		return;
	}

	for ( int i = 0; i < numThreads; i++ ) {
		threads[i].AddJobList( jobList );
		threads[i].SignalWork();
	}
}