Coimplete operation queue implementation

git-svn-id: svn+ssh://svn.gna.org/svn/gnustep/libs/base/trunk@29508 72102866-910b-0410-8b05-ffd578937521
This commit is contained in:
rfm 2010-02-08 10:34:27 +00:00
parent caa3898ca2
commit 1412e70c06
4 changed files with 292 additions and 56 deletions

View file

@ -1,3 +1,10 @@
2010-02-08 Richard Frith-Macdonald <rfm@gnu.org>
* Headers/Foundation/NSOperation.h:
* Source/NSOperation.m:
Complete implementation of NSOperationQueue (OSX 10.6 apart from the
methods using objc2 blocks).
2010-02-06 Richard Frith-Macdonald <rfm@gnu.org> 2010-02-06 Richard Frith-Macdonald <rfm@gnu.org>
* Headers/Foundation/NSOperation.h: * Headers/Foundation/NSOperation.h:

View file

@ -191,11 +191,30 @@ enum {
@private @private
id _internal; id _internal;
} }
#if OS_API_VERSION(100600, GS_API_LATEST)
/** If called from within the -main method of an operation which is
* currently being executed by a queue, this returns the queue instance
* in use.
*/
+ (id) currentQueue;
/** Returns the default queue on the main thread.
*/
+ (id) mainQueue;
#endif
/** Adds an operation to the receiver. /** Adds an operation to the receiver.
*/ */
- (void) addOperation: (NSOperation *)op; - (void) addOperation: (NSOperation *)op;
#if OS_API_VERSION(100600, GS_API_LATEST)
/** Adds multiple operations to the receiver and (optionally) waits for
* all the operations in the queue to finish.
*/
- (void) addOperations: (NSArray *)ops
waitUntilFinished: (BOOL)shouldWait;
#endif
/** Cancels all outstanding operations in the queue. /** Cancels all outstanding operations in the queue.
*/ */
- (void) cancelAllOperations; - (void) cancelAllOperations;

View file

@ -1532,7 +1532,7 @@ objc_create_block_classes_as_subclasses_of(Class super) __attribute__((weak));
- (NSString*) description - (NSString*) description
{ {
return [NSString stringWithFormat: @"<%s: %p>", return [NSString stringWithFormat: @"<%s: %p>",
GSClassNameFromObject(self), self]; GSNameFromClass([self class]), self];
} }
/** /**

View file

@ -35,6 +35,7 @@
#import "Foundation/NSLock.h" #import "Foundation/NSLock.h"
#import "Foundation/NSKeyValueObserving.h" #import "Foundation/NSKeyValueObserving.h"
#import "Foundation/NSThread.h" #import "Foundation/NSThread.h"
#import "GSPrivate.h"
#define GSInternal NSOperationInternal #define GSInternal NSOperationInternal
#include "GSInternal.h" #include "GSInternal.h"
@ -314,18 +315,16 @@ static NSArray *empty = nil;
- (void) setQueuePriority: (NSOperationQueuePriority)pri - (void) setQueuePriority: (NSOperationQueuePriority)pri
{ {
if (pri < NSOperationQueuePriorityVeryLow) if (pri <= NSOperationQueuePriorityVeryLow)
pri = NSOperationQueuePriorityVeryLow; pri = NSOperationQueuePriorityVeryLow;
else if (pri < NSOperationQueuePriorityLow) else if (pri <= NSOperationQueuePriorityLow)
pri = NSOperationQueuePriorityLow; pri = NSOperationQueuePriorityLow;
else if (pri < NSOperationQueuePriorityNormal) else if (pri < NSOperationQueuePriorityHigh)
pri = NSOperationQueuePriorityNormal; pri = NSOperationQueuePriorityNormal;
else if (pri > NSOperationQueuePriorityVeryHigh) else if (pri < NSOperationQueuePriorityVeryHigh)
pri = NSOperationQueuePriorityVeryHigh;
else if (pri > NSOperationQueuePriorityHigh)
pri = NSOperationQueuePriorityHigh; pri = NSOperationQueuePriorityHigh;
else if (pri > NSOperationQueuePriorityNormal) else
pri = NSOperationQueuePriorityNormal; pri = NSOperationQueuePriorityVeryHigh;
if (pri != internal->priority) if (pri != internal->priority)
{ {
@ -471,12 +470,13 @@ GS_END_INTERNAL(NSOperationQueue)
@interface NSOperationQueue (Private) @interface NSOperationQueue (Private)
+ (void) _mainQueue;
- (void) observeValueForKeyPath: (NSString *)keyPath - (void) observeValueForKeyPath: (NSString *)keyPath
ofObject: (id)object ofObject: (id)object
change: (NSDictionary *)change change: (NSDictionary *)change
context: (void *)context; context: (void *)context;
- (void) _thread; - (void) _thread;
- (void) _update; - (void) _updateThreads;
@end @end
static NSInteger maxThreads = 200; // FIXME ... how many really? static NSInteger maxThreads = 200; // FIXME ... how many really?
@ -492,8 +492,31 @@ sortFunc(id o1, id o2, void *ctxt)
return NSOrderedSame; return NSOrderedSame;
} }
static NSString *threadKey = @"NSOperationQueue";
static NSOperationQueue *mainQueue = nil;
@implementation NSOperationQueue @implementation NSOperationQueue
+ (id) currentQueue
{
return [[[NSThread currentThread] threadDictionary] objectForKey: threadKey];
}
+ (void) initialize
{
if (mainQueue == nil)
{
[self performSelectorOnMainThread: @selector(_mainQueue)
withObject: nil
waitUntilDone: YES];
}
}
+ (id) mainQueue
{
return mainQueue;
}
- (void) addOperation: (NSOperation *)op - (void) addOperation: (NSOperation *)op
{ {
if (op == nil || NO == [op isKindOfClass: [NSOperation class]]) if (op == nil || NO == [op isKindOfClass: [NSOperation class]])
@ -504,8 +527,6 @@ sortFunc(id o1, id o2, void *ctxt)
} }
[internal->lock lock]; [internal->lock lock];
if (NSNotFound == [internal->operations indexOfObjectIdenticalTo: op] if (NSNotFound == [internal->operations indexOfObjectIdenticalTo: op]
&& NO == [op isCancelled]
&& NO == [op isExecuting]
&& NO == [op isFinished]) && NO == [op isFinished])
{ {
[op addObserver: self [op addObserver: self
@ -528,6 +549,103 @@ sortFunc(id o1, id o2, void *ctxt)
[internal->lock unlock]; [internal->lock unlock];
} }
- (void) addOperations: (NSArray *)ops
waitUntilFinished: (BOOL)shouldWait
{
NSUInteger count;
if (ops == nil || NO == [ops isKindOfClass: [NSArray class]])
{
[NSException raise: NSInvalidArgumentException
format: @"[%@-%@] object is not an NSArray",
NSStringFromClass([self class]), NSStringFromSelector(_cmd)];
}
count = [ops count];
if (count > 0)
{
BOOL invalidArg = NO;
GS_BEGINITEMBUF(buf, count, id)
NSUInteger index;
NSUInteger toAdd = count;
[ops getObjects: buf];
for (index = 0; index < count; index++)
{
NSOperation *op = buf[index];
if (NO == [op isKindOfClass: [NSOperation class]])
{
invalidArg = YES;
toAdd = 0;
break;
}
if (YES == [op isFinished])
{
buf[index] = nil;
toAdd--;
}
}
if (toAdd > 0)
{
[internal->lock lock];
[self willChangeValueForKey: @"operationCount"];
[self willChangeValueForKey: @"operations"];
for (index = 0; index < count; index++)
{
NSOperation *op = buf[index];
if (op == nil)
{
continue; // Not added
}
if (NSNotFound
!= [internal->operations indexOfObjectIdenticalTo: op])
{
buf[index] = nil; // Not added
toAdd--;
continue;
}
[op addObserver: self
forKeyPath: @"isReady"
options: NSKeyValueObservingOptionNew
context: NULL];
[internal->operations addObject: op];
if (NO == [op isReady])
{
buf[index] = nil; // Not yet ready
}
}
[self didChangeValueForKey: @"operationCount"];
[self didChangeValueForKey: @"operations"];
for (index = 0; index < count; index++)
{
NSOperation *op = buf[index];
if (op != nil)
{
[self observeValueForKeyPath: @"isReady"
ofObject: op
change: nil
context: nil];
}
}
[internal->lock unlock];
}
GS_ENDITEMBUF()
if (YES == invalidArg)
{
[NSException raise: NSInvalidArgumentException
format: @"[%@-%@] object at index %u is not an NSOperation",
NSStringFromClass([self class]), NSStringFromSelector(_cmd),
index];
}
}
if (YES == shouldWait)
{
[self waitUntilAllOperationsAreFinished];
}
}
- (void) cancelAllOperations - (void) cancelAllOperations
{ {
NSUInteger index; NSUInteger index;
@ -636,6 +754,8 @@ sortFunc(id o1, id o2, void *ctxt)
- (void) setMaxConcurrentOperationCount: (NSInteger)cnt - (void) setMaxConcurrentOperationCount: (NSInteger)cnt
{ {
BOOL unSuspend = NO;
if (cnt < 0 if (cnt < 0
&& cnt != NSOperationQueueDefaultMaxConcurrentOperationCount) && cnt != NSOperationQueueDefaultMaxConcurrentOperationCount)
{ {
@ -646,12 +766,28 @@ sortFunc(id o1, id o2, void *ctxt)
[internal->lock lock]; [internal->lock lock];
if (cnt != internal->count) if (cnt != internal->count)
{ {
if (cnt > internal->count)
{
unSuspend = YES; // May need to add more threads.
}
[self willChangeValueForKey: @"maxConcurrentOperationCount"]; [self willChangeValueForKey: @"maxConcurrentOperationCount"];
internal->count = cnt; internal->count = cnt;
[self didChangeValueForKey: @"maxConcurrentOperationCount"]; [self didChangeValueForKey: @"maxConcurrentOperationCount"];
[self _update];
} }
[internal->lock unlock]; [internal->lock unlock];
if (YES == unSuspend)
{
[internal->cond lock];
if ([internal->waiting count] > 0)
{
[internal->cond unlockWithCondition: 1];
[self _updateThreads];
}
else
{
[internal->cond unlockWithCondition: 0];
}
}
} }
- (void) setName: (NSString*)s - (void) setName: (NSString*)s
@ -670,15 +806,33 @@ sortFunc(id o1, id o2, void *ctxt)
- (void) setSuspended: (BOOL)flag - (void) setSuspended: (BOOL)flag
{ {
BOOL unSuspend = NO;
[internal->lock lock]; [internal->lock lock];
if (flag != internal->suspended) if (flag != internal->suspended)
{ {
if (YES == flag)
{
unSuspend = YES;
}
[self willChangeValueForKey: @"suspended"]; [self willChangeValueForKey: @"suspended"];
internal->suspended = flag; internal->suspended = flag;
[self didChangeValueForKey: @"suspended"]; [self didChangeValueForKey: @"suspended"];
[self _update];
} }
[internal->lock unlock]; [internal->lock unlock];
if (YES == unSuspend)
{
[internal->cond lock];
if ([internal->waiting count] > 0)
{
[internal->cond unlockWithCondition: 1];
[self _updateThreads];
}
else
{
[internal->cond unlockWithCondition: 0];
}
}
} }
- (void) waitUntilAllOperationsAreFinished - (void) waitUntilAllOperationsAreFinished
@ -700,6 +854,16 @@ sortFunc(id o1, id o2, void *ctxt)
@implementation NSOperationQueue (Private) @implementation NSOperationQueue (Private)
+ (void) _mainQueue
{
if (mainQueue == nil)
{
mainQueue = [self new];
[[[NSThread currentThread] threadDictionary] setObject: mainQueue
forKey: threadKey];
}
}
- (void) observeValueForKeyPath: (NSString *)keyPath - (void) observeValueForKeyPath: (NSString *)keyPath
ofObject: (id)object ofObject: (id)object
change: (NSDictionary *)change change: (NSDictionary *)change
@ -726,9 +890,7 @@ sortFunc(id o1, id o2, void *ctxt)
if ([internal->waiting count] > 0) if ([internal->waiting count] > 0)
{ {
[internal->cond unlockWithCondition: 1]; [internal->cond unlockWithCondition: 1];
[internal->lock lock]; [self _updateThreads];
[self _update];
[internal->lock unlock];
} }
else else
{ {
@ -738,11 +900,19 @@ sortFunc(id o1, id o2, void *ctxt)
- (void) _thread - (void) _thread
{ {
BOOL terminate = NO;
/* Record which operation queue the current thread is running.
*/
[[[NSThread currentThread] threadDictionary] setObject: self
forKey: threadKey];
[internal->lock lock]; [internal->lock lock];
while ([internal->operations count] > 0) while (NO == terminate)
{ {
NSOperation *op; NSOperation *op;
NSUInteger index; NSUInteger index;
NSDate *timeout;
/* Unlock the queue while we are waiting for another operation /* Unlock the queue while we are waiting for another operation
* to perform. * to perform.
@ -750,48 +920,83 @@ sortFunc(id o1, id o2, void *ctxt)
[internal->lock unlock]; [internal->lock unlock];
/* Wait for an operation to become available. /* Wait for an operation to become available.
* If there's nothing to do within 5 seconds, we might terminate,
* but only if there are no operations still queued.
* The figure 5 was chosen because it seems to be what OSX 10.6 uses.
*/ */
[internal->cond lockWhenCondition: 1]; timeout = [[NSDate alloc] initWithTimeIntervalSinceNow: 5.0];
[internal->waiting sortUsingFunction: sortFunc context: 0]; if (NO == [internal->cond lockWhenCondition: 1 beforeDate: timeout])
op = [[internal->waiting lastObject] retain];
[internal->waiting removeLastObject];
if ([internal->waiting count] == 0)
{ {
[internal->cond unlockWithCondition: 0]; terminate = YES;
}
[timeout release];
if (NO == terminate)
{
op = nil;
if ((unsigned)internal->threads > (unsigned)internal->count)
{
/* We know that the 'unlimited' thread count is -1 so an
* unsigned comparison is safe to tell is we need to end
* a thread.
*/
terminate = YES; // This thread is not needed
}
else if (internal->count != 0 && NO == internal->suspended
&& [internal->waiting count] > 0)
{
/* If concurrent operations are permitted an the queue
* is not suspended, and there are operations waiting,
* then we can get one.
*/
[internal->waiting sortUsingFunction: sortFunc context: 0];
op = [[internal->waiting objectAtIndex: 0] retain];
[internal->waiting removeObjectAtIndex: 0];
}
if ([internal->waiting count] == 0)
{
[internal->cond unlockWithCondition: 0];
}
else
{
[internal->cond unlockWithCondition: 1];
}
/* Restore the queue lock
*/
[internal->lock lock];
if (YES == [op isReady])
{
if (NO == [op isCancelled])
{
internal->idle--;
/* Unlock the queue while the operation is executing.
*/
[internal->lock unlock];
[op start];
[op waitUntilFinished];
/* Lock the queue again to perform cleanup etc.
*/
[internal->lock lock];
internal->idle++;
}
[self willChangeValueForKey: @"operations"];
[self willChangeValueForKey: @"operationCount"];
[internal->operations removeObjectIdenticalTo: op];
[self didChangeValueForKey: @"operationCount"];
[self didChangeValueForKey: @"operations"];
}
[op release];
} }
else else
{ {
[internal->cond unlockWithCondition: 1]; /* Restore the queue lock so we can clean up operations in queue.
*/
[internal->lock lock];
} }
/* Restore the queue lock so we can track the idle count.
*/
[internal->lock lock];
if (YES == [op isReady])
{
if (NO == [op isCancelled])
{
internal->idle--;
/* Unlock the queue while the operation is executing.
*/
[internal->lock unlock];
[op start];
[op waitUntilFinished];
/* Lock the queue again to perform cleanup etc.
*/
[internal->lock lock];
internal->idle++;
}
[self willChangeValueForKey: @"operations"];
[self willChangeValueForKey: @"operationCount"];
[internal->operations removeObjectIdenticalTo: op];
[self didChangeValueForKey: @"operationCount"];
[self didChangeValueForKey: @"operations"];
}
[op release];
/* And now make sure we clean up any finished operations. /* And now make sure we clean up any finished operations.
*/ */
index = [internal->operations count]; index = [internal->operations count];
@ -807,17 +1012,21 @@ sortFunc(id o1, id o2, void *ctxt)
[self didChangeValueForKey: @"operations"]; [self didChangeValueForKey: @"operations"];
} }
} }
if ([internal->operations count] > 0)
{
terminate = NO; // Still stuff to do
}
} }
internal->idle--; internal->idle--;
internal->threads--; internal->threads--;
[internal->lock unlock]; [internal->lock unlock];
} }
/* NB. This must only be called from a locked section of code! /* Just check to see if a new thread needs to be started.
* It's just to check to see if a new thread needs to be started.
*/ */
- (void) _update - (void) _updateThreads
{ {
[internal->lock lock];
if (0 == internal->idle if (0 == internal->idle
&& NO == [self isSuspended] && NO == [self isSuspended]
&& [self maxConcurrentOperationCount] != 0 && [self maxConcurrentOperationCount] != 0
@ -842,6 +1051,7 @@ sortFunc(id o1, id o2, void *ctxt)
withObject: nil]; withObject: nil];
} }
} }
[internal->lock unlock];
} }
@end @end