From 6c84f23ae1626e49660b823f5cbbb8b3e816a913 Mon Sep 17 00:00:00 2001 From: Richard Frith-MacDonald Date: Mon, 8 Feb 2010 10:34:27 +0000 Subject: [PATCH] Coimplete operation queue implementation git-svn-id: svn+ssh://svn.gna.org/svn/gnustep/libs/base/trunk@29508 72102866-910b-0410-8b05-ffd578937521 --- ChangeLog | 7 + Headers/Foundation/NSOperation.h | 19 ++ Source/NSObject.m | 2 +- Source/NSOperation.m | 320 +++++++++++++++++++++++++------ 4 files changed, 292 insertions(+), 56 deletions(-) diff --git a/ChangeLog b/ChangeLog index 2dca2cd89..5b07fc379 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,10 @@ +2010-02-08 Richard Frith-Macdonald + + * Headers/Foundation/NSOperation.h: + * Source/NSOperation.m: + Complete implementation of NSOperationQueue (OSX 10.6 apart from the + methods using objc2 blocks). + 2010-02-06 Richard Frith-Macdonald * Headers/Foundation/NSOperation.h: diff --git a/Headers/Foundation/NSOperation.h b/Headers/Foundation/NSOperation.h index 1118192a0..0b0e5246b 100644 --- a/Headers/Foundation/NSOperation.h +++ b/Headers/Foundation/NSOperation.h @@ -191,11 +191,30 @@ enum { @private id _internal; } +#if OS_API_VERSION(100600, GS_API_LATEST) +/** If called from within the -main method of an operation which is + * currently being executed by a queue, this returns the queue instance + * in use. + */ ++ (id) currentQueue; + +/** Returns the default queue on the main thread. + */ ++ (id) mainQueue; +#endif /** Adds an operation to the receiver. */ - (void) addOperation: (NSOperation *)op; +#if OS_API_VERSION(100600, GS_API_LATEST) +/** Adds multiple operations to the receiver and (optionally) waits for + * all the operations in the queue to finish. + */ +- (void) addOperations: (NSArray *)ops + waitUntilFinished: (BOOL)shouldWait; +#endif + /** Cancels all outstanding operations in the queue. */ - (void) cancelAllOperations; diff --git a/Source/NSObject.m b/Source/NSObject.m index e9d176693..5208ff161 100644 --- a/Source/NSObject.m +++ b/Source/NSObject.m @@ -1532,7 +1532,7 @@ objc_create_block_classes_as_subclasses_of(Class super) __attribute__((weak)); - (NSString*) description { return [NSString stringWithFormat: @"<%s: %p>", - GSClassNameFromObject(self), self]; + GSNameFromClass([self class]), self]; } /** diff --git a/Source/NSOperation.m b/Source/NSOperation.m index d1c44d0c8..6be4b0673 100644 --- a/Source/NSOperation.m +++ b/Source/NSOperation.m @@ -35,6 +35,7 @@ #import "Foundation/NSLock.h" #import "Foundation/NSKeyValueObserving.h" #import "Foundation/NSThread.h" +#import "GSPrivate.h" #define GSInternal NSOperationInternal #include "GSInternal.h" @@ -314,18 +315,16 @@ static NSArray *empty = nil; - (void) setQueuePriority: (NSOperationQueuePriority)pri { - if (pri < NSOperationQueuePriorityVeryLow) + if (pri <= NSOperationQueuePriorityVeryLow) pri = NSOperationQueuePriorityVeryLow; - else if (pri < NSOperationQueuePriorityLow) + else if (pri <= NSOperationQueuePriorityLow) pri = NSOperationQueuePriorityLow; - else if (pri < NSOperationQueuePriorityNormal) + else if (pri < NSOperationQueuePriorityHigh) pri = NSOperationQueuePriorityNormal; - else if (pri > NSOperationQueuePriorityVeryHigh) - pri = NSOperationQueuePriorityVeryHigh; - else if (pri > NSOperationQueuePriorityHigh) + else if (pri < NSOperationQueuePriorityVeryHigh) pri = NSOperationQueuePriorityHigh; - else if (pri > NSOperationQueuePriorityNormal) - pri = NSOperationQueuePriorityNormal; + else + pri = NSOperationQueuePriorityVeryHigh; if (pri != internal->priority) { @@ -471,12 +470,13 @@ GS_END_INTERNAL(NSOperationQueue) @interface NSOperationQueue (Private) ++ (void) _mainQueue; - (void) observeValueForKeyPath: (NSString *)keyPath ofObject: (id)object change: (NSDictionary *)change context: (void *)context; - (void) _thread; -- (void) _update; +- (void) _updateThreads; @end static NSInteger maxThreads = 200; // FIXME ... how many really? @@ -492,8 +492,31 @@ sortFunc(id o1, id o2, void *ctxt) return NSOrderedSame; } +static NSString *threadKey = @"NSOperationQueue"; +static NSOperationQueue *mainQueue = nil; + @implementation NSOperationQueue ++ (id) currentQueue +{ + return [[[NSThread currentThread] threadDictionary] objectForKey: threadKey]; +} + ++ (void) initialize +{ + if (mainQueue == nil) + { + [self performSelectorOnMainThread: @selector(_mainQueue) + withObject: nil + waitUntilDone: YES]; + } +} + ++ (id) mainQueue +{ + return mainQueue; +} + - (void) addOperation: (NSOperation *)op { if (op == nil || NO == [op isKindOfClass: [NSOperation class]]) @@ -504,8 +527,6 @@ sortFunc(id o1, id o2, void *ctxt) } [internal->lock lock]; if (NSNotFound == [internal->operations indexOfObjectIdenticalTo: op] - && NO == [op isCancelled] - && NO == [op isExecuting] && NO == [op isFinished]) { [op addObserver: self @@ -528,6 +549,103 @@ sortFunc(id o1, id o2, void *ctxt) [internal->lock unlock]; } +- (void) addOperations: (NSArray *)ops + waitUntilFinished: (BOOL)shouldWait +{ + NSUInteger count; + + if (ops == nil || NO == [ops isKindOfClass: [NSArray class]]) + { + [NSException raise: NSInvalidArgumentException + format: @"[%@-%@] object is not an NSArray", + NSStringFromClass([self class]), NSStringFromSelector(_cmd)]; + } + count = [ops count]; + if (count > 0) + { + BOOL invalidArg = NO; + GS_BEGINITEMBUF(buf, count, id) + NSUInteger index; + NSUInteger toAdd = count; + + [ops getObjects: buf]; + for (index = 0; index < count; index++) + { + NSOperation *op = buf[index]; + + if (NO == [op isKindOfClass: [NSOperation class]]) + { + invalidArg = YES; + toAdd = 0; + break; + } + if (YES == [op isFinished]) + { + buf[index] = nil; + toAdd--; + } + } + if (toAdd > 0) + { + [internal->lock lock]; + [self willChangeValueForKey: @"operationCount"]; + [self willChangeValueForKey: @"operations"]; + for (index = 0; index < count; index++) + { + NSOperation *op = buf[index]; + + if (op == nil) + { + continue; // Not added + } + if (NSNotFound + != [internal->operations indexOfObjectIdenticalTo: op]) + { + buf[index] = nil; // Not added + toAdd--; + continue; + } + [op addObserver: self + forKeyPath: @"isReady" + options: NSKeyValueObservingOptionNew + context: NULL]; + [internal->operations addObject: op]; + if (NO == [op isReady]) + { + buf[index] = nil; // Not yet ready + } + } + [self didChangeValueForKey: @"operationCount"]; + [self didChangeValueForKey: @"operations"]; + for (index = 0; index < count; index++) + { + NSOperation *op = buf[index]; + + if (op != nil) + { + [self observeValueForKeyPath: @"isReady" + ofObject: op + change: nil + context: nil]; + } + } + [internal->lock unlock]; + } + GS_ENDITEMBUF() + if (YES == invalidArg) + { + [NSException raise: NSInvalidArgumentException + format: @"[%@-%@] object at index %u is not an NSOperation", + NSStringFromClass([self class]), NSStringFromSelector(_cmd), + index]; + } + } + if (YES == shouldWait) + { + [self waitUntilAllOperationsAreFinished]; + } +} + - (void) cancelAllOperations { NSUInteger index; @@ -636,6 +754,8 @@ sortFunc(id o1, id o2, void *ctxt) - (void) setMaxConcurrentOperationCount: (NSInteger)cnt { + BOOL unSuspend = NO; + if (cnt < 0 && cnt != NSOperationQueueDefaultMaxConcurrentOperationCount) { @@ -646,12 +766,28 @@ sortFunc(id o1, id o2, void *ctxt) [internal->lock lock]; if (cnt != internal->count) { + if (cnt > internal->count) + { + unSuspend = YES; // May need to add more threads. + } [self willChangeValueForKey: @"maxConcurrentOperationCount"]; internal->count = cnt; [self didChangeValueForKey: @"maxConcurrentOperationCount"]; - [self _update]; } [internal->lock unlock]; + if (YES == unSuspend) + { + [internal->cond lock]; + if ([internal->waiting count] > 0) + { + [internal->cond unlockWithCondition: 1]; + [self _updateThreads]; + } + else + { + [internal->cond unlockWithCondition: 0]; + } + } } - (void) setName: (NSString*)s @@ -670,15 +806,33 @@ sortFunc(id o1, id o2, void *ctxt) - (void) setSuspended: (BOOL)flag { + BOOL unSuspend = NO; + [internal->lock lock]; if (flag != internal->suspended) { + if (YES == flag) + { + unSuspend = YES; + } [self willChangeValueForKey: @"suspended"]; internal->suspended = flag; [self didChangeValueForKey: @"suspended"]; - [self _update]; } [internal->lock unlock]; + if (YES == unSuspend) + { + [internal->cond lock]; + if ([internal->waiting count] > 0) + { + [internal->cond unlockWithCondition: 1]; + [self _updateThreads]; + } + else + { + [internal->cond unlockWithCondition: 0]; + } + } } - (void) waitUntilAllOperationsAreFinished @@ -700,6 +854,16 @@ sortFunc(id o1, id o2, void *ctxt) @implementation NSOperationQueue (Private) ++ (void) _mainQueue +{ + if (mainQueue == nil) + { + mainQueue = [self new]; + [[[NSThread currentThread] threadDictionary] setObject: mainQueue + forKey: threadKey]; + } +} + - (void) observeValueForKeyPath: (NSString *)keyPath ofObject: (id)object change: (NSDictionary *)change @@ -726,9 +890,7 @@ sortFunc(id o1, id o2, void *ctxt) if ([internal->waiting count] > 0) { [internal->cond unlockWithCondition: 1]; - [internal->lock lock]; - [self _update]; - [internal->lock unlock]; + [self _updateThreads]; } else { @@ -738,11 +900,19 @@ sortFunc(id o1, id o2, void *ctxt) - (void) _thread { + BOOL terminate = NO; + + /* Record which operation queue the current thread is running. + */ + [[[NSThread currentThread] threadDictionary] setObject: self + forKey: threadKey]; + [internal->lock lock]; - while ([internal->operations count] > 0) + while (NO == terminate) { NSOperation *op; NSUInteger index; + NSDate *timeout; /* Unlock the queue while we are waiting for another operation * to perform. @@ -750,48 +920,83 @@ sortFunc(id o1, id o2, void *ctxt) [internal->lock unlock]; /* Wait for an operation to become available. + * If there's nothing to do within 5 seconds, we might terminate, + * but only if there are no operations still queued. + * The figure 5 was chosen because it seems to be what OSX 10.6 uses. */ - [internal->cond lockWhenCondition: 1]; - [internal->waiting sortUsingFunction: sortFunc context: 0]; - op = [[internal->waiting lastObject] retain]; - [internal->waiting removeLastObject]; - if ([internal->waiting count] == 0) + timeout = [[NSDate alloc] initWithTimeIntervalSinceNow: 5.0]; + if (NO == [internal->cond lockWhenCondition: 1 beforeDate: timeout]) { - [internal->cond unlockWithCondition: 0]; + terminate = YES; + } + [timeout release]; + if (NO == terminate) + { + op = nil; + + if ((unsigned)internal->threads > (unsigned)internal->count) + { + /* We know that the 'unlimited' thread count is -1 so an + * unsigned comparison is safe to tell is we need to end + * a thread. + */ + terminate = YES; // This thread is not needed + } + else if (internal->count != 0 && NO == internal->suspended + && [internal->waiting count] > 0) + { + /* If concurrent operations are permitted an the queue + * is not suspended, and there are operations waiting, + * then we can get one. + */ + [internal->waiting sortUsingFunction: sortFunc context: 0]; + op = [[internal->waiting objectAtIndex: 0] retain]; + [internal->waiting removeObjectAtIndex: 0]; + } + if ([internal->waiting count] == 0) + { + [internal->cond unlockWithCondition: 0]; + } + else + { + [internal->cond unlockWithCondition: 1]; + } + + /* Restore the queue lock + */ + [internal->lock lock]; + if (YES == [op isReady]) + { + if (NO == [op isCancelled]) + { + internal->idle--; + + /* Unlock the queue while the operation is executing. + */ + [internal->lock unlock]; + [op start]; + [op waitUntilFinished]; + + /* Lock the queue again to perform cleanup etc. + */ + [internal->lock lock]; + internal->idle++; + } + [self willChangeValueForKey: @"operations"]; + [self willChangeValueForKey: @"operationCount"]; + [internal->operations removeObjectIdenticalTo: op]; + [self didChangeValueForKey: @"operationCount"]; + [self didChangeValueForKey: @"operations"]; + } + [op release]; } else { - [internal->cond unlockWithCondition: 1]; + /* Restore the queue lock so we can clean up operations in queue. + */ + [internal->lock lock]; } - /* Restore the queue lock so we can track the idle count. - */ - [internal->lock lock]; - if (YES == [op isReady]) - { - if (NO == [op isCancelled]) - { - internal->idle--; - - /* Unlock the queue while the operation is executing. - */ - [internal->lock unlock]; - [op start]; - [op waitUntilFinished]; - - /* Lock the queue again to perform cleanup etc. - */ - [internal->lock lock]; - internal->idle++; - } - [self willChangeValueForKey: @"operations"]; - [self willChangeValueForKey: @"operationCount"]; - [internal->operations removeObjectIdenticalTo: op]; - [self didChangeValueForKey: @"operationCount"]; - [self didChangeValueForKey: @"operations"]; - } - [op release]; - /* And now make sure we clean up any finished operations. */ index = [internal->operations count]; @@ -807,17 +1012,21 @@ sortFunc(id o1, id o2, void *ctxt) [self didChangeValueForKey: @"operations"]; } } + if ([internal->operations count] > 0) + { + terminate = NO; // Still stuff to do + } } internal->idle--; internal->threads--; [internal->lock unlock]; } -/* NB. This must only be called from a locked section of code! - * It's just to check to see if a new thread needs to be started. +/* Just check to see if a new thread needs to be started. */ -- (void) _update +- (void) _updateThreads { + [internal->lock lock]; if (0 == internal->idle && NO == [self isSuspended] && [self maxConcurrentOperationCount] != 0 @@ -842,6 +1051,7 @@ sortFunc(id o1, id o2, void *ctxt) withObject: nil]; } } + [internal->lock unlock]; } @end