From ef7a025c3fd1a2beb78f795a792147549d2cc6b2 Mon Sep 17 00:00:00 2001 From: rfm Date: Sat, 12 May 2007 17:11:35 +0000 Subject: [PATCH] stream and runloop fixups git-svn-id: svn+ssh://svn.gna.org/svn/gnustep/libs/base/trunk@25147 72102866-910b-0410-8b05-ffd578937521 --- ChangeLog | 13 +++ Source/GSStream.h | 30 +++--- Source/GSStream.m | 189 ++++++++++++++++++++++------------- Source/unix/GSRunLoopCtxt.m | 66 +++++------- Source/unix/NSStream.m | 41 ++------ Source/win32/GSRunLoopCtxt.m | 6 +- Source/win32/NSStreamWin32.m | 48 ++------- 7 files changed, 191 insertions(+), 202 deletions(-) diff --git a/ChangeLog b/ChangeLog index a55ad849e..beecbb74f 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,16 @@ +2007-05-12 Richard Frith-Macdonald + + * Source/GSStream.h: New runloop management + * Source/GSStream.m: Support scheduling in multiple runloops to match + documented MacOS-X behavior. Trigger instant event on error or eof. + * Source/unix/NSStream.m: Don't send error and eof events asap, defer + until the runloop runs as this is how MacOS-X behaves. + * Source/win32/NSStreamWin32.m: ditto. + * Source/unix/GSRunLoopCtxt.m: Fix failure to trigger immediate events + when there are no non-immediate input sources in th loop. Fix busy + poll when there are no non-immediate sources in the loop. + * Source/win32/GSRunLoopCtxt.m: ditto + 2007-05-12 Richard Frith-Macdonald * Source/NSURLProtocol.m: Fix multiple release of headers. diff --git a/Source/GSStream.h b/Source/GSStream.h index 34792c99a..3a467298d 100644 --- a/Source/GSStream.h +++ b/Source/GSStream.h @@ -53,6 +53,7 @@ #include #include +#include /** * Convenience methods used to add streams to the run loop. @@ -71,8 +72,7 @@ BOOL _delegateValid;/* whether the delegate responds*/\ NSError *_lastError; /* last error occured */\ NSStreamStatus _currentStatus;/* current status */\ - NSMutableArray *_modes; /* currently scheduled modes. */\ - NSRunLoop *_runloop; /* currently scheduled loop. */\ + NSMapTable *_loops; /* Run loops and their modes. */\ void *_loopID; /* file descriptor etc. */\ int _events; /* Signalled events. */\ } @@ -103,6 +103,11 @@ IVARS */ - (void*) _loopID; +/** + * Place the stream in all the scheduled runloops. + */ +- (void) _schedule; + /** * send an event to delegate */ @@ -128,6 +133,12 @@ IVARS * say whether there is unhandled data for the stream. */ - (BOOL) _unhandledData; + +/** + * Remove the stream from all the scheduled runloops. + */ +- (void) _unschedule; + @end @interface GSInputStream : NSInputStream @@ -151,11 +162,6 @@ IVARS NSData *_data; unsigned long _pointer; } - -/** - * this is the bridge method for asynchronized operation. Do not call. - */ -- (void) _dispatch; @end /** @@ -168,11 +174,6 @@ IVARS unsigned _capacity; unsigned long _pointer; } - -/** - * this is the bridge method for asynchronized operation. Do not call. - */ -- (void) _dispatch; @end /** @@ -184,11 +185,6 @@ IVARS NSMutableData *_data; unsigned long _pointer; } - -/** - * this is the bridge method for asynchronized operation. Do not call. - */ -- (void) _dispatch; @end #endif diff --git a/Source/GSStream.m b/Source/GSStream.m index 2bc55b6d6..6e5b3c17f 100644 --- a/Source/GSStream.m +++ b/Source/GSStream.m @@ -150,15 +150,7 @@ static RunLoopEventType typeForStream(NSStream *aStream) { NSDebugMLog(@"Attempt to close already closed stream %@", self); } - if (_runloop) - { - unsigned i = [_modes count]; - - while (i-- > 0) - { - [_runloop removeStream: self mode: [_modes objectAtIndex: i]]; - } - } + [self _unschedule]; [self _setStatus: NSStreamStatusClosed]; /* We don't want to send any events the the delegate after the * stream has been closed. @@ -173,8 +165,11 @@ static RunLoopEventType typeForStream(NSStream *aStream) { [self close]; } - DESTROY(_runloop); - DESTROY(_modes); + if (_loops != 0) + { + NSFreeMapTable(_loops); + _loops = 0; + } DESTROY(_properties); DESTROY(_lastError); [super dealloc]; @@ -192,7 +187,8 @@ static RunLoopEventType typeForStream(NSStream *aStream) _delegate = self; _properties = nil; _lastError = nil; - _modes = [NSMutableArray new]; + _loops = NSCreateMapTable(NSObjectMapKeyCallBacks, + NSObjectMapValueCallBacks, 1); _currentStatus = NSStreamStatusNotOpen; _loopID = (void*)self; } @@ -206,15 +202,7 @@ static RunLoopEventType typeForStream(NSStream *aStream) NSDebugMLog(@"Attempt to re-open stream %@", self); } [self _setStatus: NSStreamStatusOpen]; - if (_runloop) - { - unsigned i = [_modes count]; - - while (i-- > 0) - { - [_runloop addStream: self mode: [_modes objectAtIndex: i]]; - } - } + [self _schedule]; [self _sendEvent: NSStreamEventOpenCompleted]; } @@ -228,20 +216,24 @@ static RunLoopEventType typeForStream(NSStream *aStream) extra: (void*)extra forMode: (NSString*)mode { +NSLog(@"Event on %p", self); [self _dispatch]; } - (void) removeFromRunLoop: (NSRunLoop *)aRunLoop forMode: (NSString *)mode { - if (_runloop == aRunLoop) + if (aRunLoop != nil && mode != nil) { - if ([_modes containsObject: mode]) + NSMutableArray *modes; + + modes = (NSMutableArray*)NSMapGet(_loops, (void*)aRunLoop); + if ([modes containsObject: mode]) { - [_runloop removeStream: self mode: mode]; - [_modes removeObject: mode]; - if ([_modes count] == 0) + [aRunLoop removeStream: self mode: mode]; + [modes removeObject: mode]; + if ([modes count] == 0) { - DESTROY(_runloop); + NSMapRemove(_loops, (void*)aRunLoop); } } } @@ -249,21 +241,30 @@ static RunLoopEventType typeForStream(NSStream *aStream) - (void) scheduleInRunLoop: (NSRunLoop *)aRunLoop forMode: (NSString *)mode { - NSAssert(!_runloop || _runloop == aRunLoop, - @"Attempt to schedule in more than one runloop."); - ASSIGN(_runloop, aRunLoop); - if ([_modes containsObject: mode] == NO) + if (aRunLoop != nil && mode != nil) { - mode = [mode copy]; - [_modes addObject: mode]; - RELEASE(mode); - /* We only add open streams to the runloop .. subclasses may add - * streams when they are in the process of opening if they need - * to do so. - */ - if ([self _isOpened]) + NSMutableArray *modes; + + modes = (NSMutableArray*)NSMapGet(_loops, (void*)aRunLoop); + if (modes == nil) { - [_runloop addStream: self mode: mode]; + modes = [[NSMutableArray alloc] initWithCapacity: 1]; + NSMapInsert(_loops, (void*)aRunLoop, (void*)modes); + RELEASE(modes); + } + if ([modes containsObject: mode] == NO) + { + mode = [mode copy]; + [modes addObject: mode]; + RELEASE(mode); + /* We only add open streams to the runloop .. subclasses may add + * streams when they are in the process of opening if they need + * to do so. + */ + if ([self _isOpened]) + { + [aRunLoop addStream: self mode: mode]; + } } } } @@ -332,6 +333,10 @@ static RunLoopEventType typeForStream(NSStream *aStream) { } +- (void) _schedule +{ +} + - (void) _sendEvent: (NSStreamEvent)event { } @@ -348,6 +353,11 @@ static RunLoopEventType typeForStream(NSStream *aStream) { return NO; } + +- (void) _unschedule +{ +} + @end @implementation GSStream (Private) @@ -379,6 +389,25 @@ static RunLoopEventType typeForStream(NSStream *aStream) _currentStatus = NSStreamStatusError; } +- (void) _schedule +{ + NSMapEnumerator enumerator; + NSRunLoop *k; + NSMutableArray *v; + + enumerator = NSEnumerateMapTable(_loops); + while (NSNextMapEnumeratorPair(&enumerator, (void **)(&k), (void**)&v)) + { + unsigned i = [v count]; + + while (i-- > 0) + { + [k addStream: self mode: [v objectAtIndex: i]]; + } + } + NSEndMapTableEnumeration(&enumerator); +} + - (void) _sendEvent: (NSStreamEvent)event { if (event == NSStreamEventNone) @@ -490,6 +519,25 @@ static RunLoopEventType typeForStream(NSStream *aStream) return NO; } +- (void) _unschedule +{ + NSMapEnumerator enumerator; + NSRunLoop *k; + NSMutableArray *v; + + enumerator = NSEnumerateMapTable(_loops); + while (NSNextMapEnumeratorPair(&enumerator, (void **)(&k), (void**)&v)) + { + unsigned i = [v count]; + + while (i-- > 0) + { + [k removeStream: self mode: [v objectAtIndex: i]]; + } + } + NSEndMapTableEnumeration(&enumerator); +} + - (BOOL) runLoopShouldBlock: (BOOL*)trigger { if (_events @@ -501,23 +549,43 @@ static RunLoopEventType typeForStream(NSStream *aStream) *trigger = NO; return NO; } - if (_currentStatus == NSStreamStatusError && - (_events & NSStreamEventErrorOccurred) == NSStreamEventErrorOccurred) + if (_currentStatus == NSStreamStatusError) { - /* If an error has occurred (and been handled), - * we should not watch for any events at all. - */ - *trigger = NO; - return NO; + if ((_events & NSStreamEventErrorOccurred) == 0) + { + /* An error has occurred but not been handled, + * so we should trigger an error event at once. + */ + *trigger = YES; + return NO; + } + else + { + /* An error has occurred (and been handled), + * so we should not watch for any events at all. + */ + *trigger = NO; + return NO; + } } - if (_currentStatus == NSStreamStatusAtEnd && - (_events & NSStreamEventEndEncountered) == NSStreamEventEndEncountered) + if (_currentStatus == NSStreamStatusAtEnd) { - /* If an error has occurred (and been handled), - * we should not watch for any events at all. - */ - *trigger = NO; - return NO; + if ((_events & NSStreamEventEndEncountered) == 0) + { + /* An end of stream has occurred but not been handled, + * so we should trigger an end of stream event at once. + */ + *trigger = YES; + return NO; + } + else + { + /* An end of stream has occurred (and been handled), + * so we should not watch for any events at all. + */ + *trigger = NO; + return NO; + } } if (_loopID == (void*)self) @@ -585,17 +653,6 @@ static RunLoopEventType typeForStream(NSStream *aStream) { return YES; } - if (_currentStatus == NSStreamStatusAtEnd) - { - if ((_events & NSStreamEventEndEncountered) == 0) - { - /* We have not sent the appropriate event yet, so the - * client must not have issued a write:maxLength: - * (which is the point at which we should send). - */ - return YES; - } - } return NO; } @@ -675,7 +732,6 @@ static RunLoopEventType typeForStream(NSStream *aStream) else { [self _setStatus: NSStreamStatusAtEnd]; - [self _sendEvent: NSStreamEventEndEncountered]; } return copySize; } @@ -757,7 +813,6 @@ static RunLoopEventType typeForStream(NSStream *aStream) if (len == 0) { [self _setStatus: NSStreamStatusAtEnd]; - [self _sendEvent: NSStreamEventEndEncountered]; return 0; } } diff --git a/Source/unix/GSRunLoopCtxt.m b/Source/unix/GSRunLoopCtxt.m index 1c35cdd3f..280ab89ce 100644 --- a/Source/unix/GSRunLoopCtxt.m +++ b/Source/unix/GSRunLoopCtxt.m @@ -382,14 +382,7 @@ static void setPollfd(int fd, int event, GSRunLoopCtxt *ctxt) fprintf(stderr, "\n"); } #endif - if (pollfds_count > 0) - { - poll_return = poll (pollfds, pollfds_count, milliseconds); - } - else - { - poll_return = 0; - } + poll_return = poll (pollfds, pollfds_count, milliseconds); #if 0 { unsigned int i; @@ -429,33 +422,33 @@ static void setPollfd(int fd, int event, GSRunLoopCtxt *ctxt) * Trigger any watchers which are set up to for every runloop wait. */ count = GSIArrayCount(_trigger); - while (completed == NO && count-- > 0) + while (count-- > 0) { GSRunLoopWatcher *watcher; watcher = (GSRunLoopWatcher*)GSIArrayItemAtIndex(_trigger, count).obj; - if (watcher->_invalidated == NO) - { - i = [contexts count]; - while (i-- > 0) - { - GSRunLoopCtxt *c = [contexts objectAtIndex: i]; + if (watcher->_invalidated == NO) + { + i = [contexts count]; + while (i-- > 0) + { + GSRunLoopCtxt *c = [contexts objectAtIndex: i]; - if (c != self) - { - [c endEvent: (void*)watcher for: watcher]; - } - } - /* - * The watcher is still valid - so call its - * receivers event handling method. - */ - [watcher->receiver receivedEvent: watcher->data - type: watcher->type - extra: watcher->data - forMode: mode]; - } - GSPrivateNotifyASAP(); + if (c != self) + { + [c endEvent: (void*)watcher for: watcher]; + } + } + /* + * The watcher is still valid - so call its + * receivers event handling method. + */ + [watcher->receiver receivedEvent: watcher->data + type: watcher->type + extra: watcher->data + forMode: mode]; + } + GSPrivateNotifyASAP(); } /* @@ -771,15 +764,8 @@ static void setPollfd(int fd, int event, GSRunLoopCtxt *ctxt) // NSDebugMLLog(@"NSRunLoop", @"select timeout %d,%d", timeout.tv_sec, timeout.tv_usec); - if (fdEnd >= 0) - { - select_return = select (fdEnd, &read_fds, &write_fds, - &exception_fds, select_timeout); - } - else - { - select_return = 0; - } + select_return = select (fdEnd, &read_fds, &write_fds, + &exception_fds, select_timeout); NSDebugMLLog(@"NSRunLoop", @"select returned %d", select_return); @@ -810,7 +796,7 @@ static void setPollfd(int fd, int event, GSRunLoopCtxt *ctxt) * Trigger any watchers which are set up to for every runloop wait. */ count = GSIArrayCount(_trigger); - while (completed == NO && count-- > 0) + while (count-- > 0) { GSRunLoopWatcher *watcher; diff --git a/Source/unix/NSStream.m b/Source/unix/NSStream.m index 70922f51a..f1b94ba11 100644 --- a/Source/unix/NSStream.m +++ b/Source/unix/NSStream.m @@ -424,7 +424,6 @@ static void setNonblocking(int fd) else if (readLen == 0) { [self _setStatus: NSStreamStatusAtEnd]; - [self _sendEvent: NSStreamEventEndEncountered]; } return readLen; } @@ -462,7 +461,6 @@ static void setNonblocking(int fd) if (fd < 0) { [self _recordError]; - [self _sendEvent: NSStreamEventErrorOccurred]; return; } _loopID = (void*)(intptr_t)fd; @@ -554,31 +552,25 @@ static void setNonblocking(int fd) { int result; - if (_runloop) + if (NSCountMapTable(_loops) > 0) { setNonblocking((intptr_t)_loopID); } result = connect((intptr_t)_loopID, [self peerAddr], [self sockLen]); if (result < 0) { - if (errno == EINPROGRESS && _runloop != nil) + if (errno == EINPROGRESS && NSCountMapTable(_loops) > 0) { - unsigned i = [_modes count]; - /* * Need to set the status first, so that the run loop can tell * it needs to add the stream as waiting on writable, as an * indication of opened */ [self _setStatus: NSStreamStatusOpening]; - while (i-- > 0) - { - [_runloop addStream: self mode: [_modes objectAtIndex: i]]; - } + [self _schedule]; return; } [self _recordError]; - [self _sendEvent: NSStreamEventErrorOccurred]; return; } } @@ -630,7 +622,6 @@ static void setNonblocking(int fd) else if (readLen == 0) { [self _setStatus: NSStreamStatusAtEnd]; - [self _sendEvent: NSStreamEventEndEncountered]; } return readLen; } @@ -656,13 +647,9 @@ static void setNonblocking(int fd) int error; int result; socklen_t len = sizeof(error); - unsigned i = [_modes count]; AUTORELEASE(RETAIN(self)); - while (i-- > 0) - { - [_runloop removeStream: self mode: [_modes objectAtIndex: i]]; - } + [self _unschedule]; result = getsockopt((intptr_t)_loopID, SOL_SOCKET, SO_ERROR, &error, &len); @@ -898,7 +885,6 @@ static void setNonblocking(int fd) if (fd < 0) { // make an error [self _recordError]; - [self _sendEvent: NSStreamEventErrorOccurred]; return; } _loopID = (void*)(intptr_t)fd; @@ -1036,31 +1022,25 @@ static void setNonblocking(int fd) { int result; - if (_runloop) + if (NSCountMapTable(_loops) > 0) { setNonblocking((intptr_t)_loopID); } result = connect((intptr_t)_loopID, [self peerAddr], [self sockLen]); if (result < 0) { - if (errno == EINPROGRESS && _runloop != nil) + if (errno == EINPROGRESS && NSCountMapTable(_loops) > 0) { - unsigned i = [_modes count]; - /* * Need to set the status first, so that the run loop can tell * it needs to add the stream as waiting on writable, as an * indication of opened */ [self _setStatus: NSStreamStatusOpening]; - while (i-- > 0) - { - [_runloop addStream: self mode: [_modes objectAtIndex: i]]; - } + [self _unschedule]; return; } [self _recordError]; - [self _sendEvent: NSStreamEventErrorOccurred]; return; } } @@ -1093,13 +1073,9 @@ static void setNonblocking(int fd) int error; socklen_t len = sizeof(error); int result; - unsigned i = [_modes count]; AUTORELEASE(RETAIN(self)); - while (i-- > 0) - { - [_runloop removeStream: self mode: [_modes objectAtIndex: i]]; - } + [self _schedule]; result = getsockopt((intptr_t)_loopID, SOL_SOCKET, SO_ERROR, &error, &len); if (result >= 0 && !error) @@ -1619,7 +1595,6 @@ static void setNonblocking(int fd) if (bindReturn < 0 || listenReturn < 0) { [self _recordError]; - [self _sendEvent: NSStreamEventErrorOccurred]; return; } setNonblocking((intptr_t)_loopID); diff --git a/Source/win32/GSRunLoopCtxt.m b/Source/win32/GSRunLoopCtxt.m index 0d268c3a7..053a04128 100644 --- a/Source/win32/GSRunLoopCtxt.m +++ b/Source/win32/GSRunLoopCtxt.m @@ -424,7 +424,7 @@ static const NSMapTableValueCallBacks WatcherMapValueCallBacks = } else { - SleepEx(wait_timeout, TRUE); + SleepEx(wait_timeout, TRUE); wait_return = WAIT_OBJECT_0; } NSDebugMLLog(@"NSRunLoop", @"wait returned %d", wait_return); @@ -467,7 +467,7 @@ static const NSMapTableValueCallBacks WatcherMapValueCallBacks = */ count = GSIArrayCount(_trigger); completed = NO; - while (completed == NO && count-- > 0) + while (count-- > 0) { GSRunLoopWatcher *watcher; @@ -503,7 +503,7 @@ static const NSMapTableValueCallBacks WatcherMapValueCallBacks = return NO; } - // if there arent events + // if there aren't events if (wait_return == WAIT_TIMEOUT) { completed = YES; diff --git a/Source/win32/NSStreamWin32.m b/Source/win32/NSStreamWin32.m index 00882a13c..712369fcf 100644 --- a/Source/win32/NSStreamWin32.m +++ b/Source/win32/NSStreamWin32.m @@ -368,7 +368,6 @@ static void setNonblocking(SOCKET fd) if (h == INVALID_HANDLE_VALUE) { [self _recordError]; - [self _sendEvent: NSStreamEventErrorOccurred]; return; } [self _setLoopID: (void*)h]; @@ -418,7 +417,6 @@ static void setNonblocking(SOCKET fd) else if (readLen == 0) { [self _setStatus: NSStreamStatusAtEnd]; - [self _sendEvent: NSStreamEventEndEncountered]; } return (int)readLen; } @@ -635,7 +633,6 @@ static void setNonblocking(SOCKET fd) { if (myStatus == NSStreamStatusError) { - [self _sendEvent: NSStreamEventErrorOccurred]; return -1; // Waiting for read. } if (myStatus == NSStreamStatusOpen) @@ -645,7 +642,6 @@ static void setNonblocking(SOCKET fd) * so we must be at EOF. */ [self _setStatus: NSStreamStatusAtEnd]; - [self _sendEvent: NSStreamEventEndEncountered]; } return 0; } @@ -808,19 +804,13 @@ static void setNonblocking(SOCKET fd) && WSAGetLastError() != WSAEWOULDBLOCK) {// make an error [self _recordError]; - [self _sendEvent: NSStreamEventErrorOccurred]; return; } // waiting on writable, as an indication of opened - if (_runloop) + if (NSCountMapTable(_loops) > 0) { - unsigned i = [_modes count]; - WSAEventSelect(_sock, _loopID, FD_ALL_EVENTS); - while (i-- > 0) - { - [_runloop addStream: self mode: [_modes objectAtIndex: i]]; - } + [self _schedule]; } [self _setStatus: NSStreamStatusOpening]; return; @@ -902,7 +892,6 @@ static void setNonblocking(SOCKET fd) else if (readLen == 0) { [self _setStatus: NSStreamStatusAtEnd]; - [self _sendEvent: NSStreamEventEndEncountered]; } else { @@ -950,12 +939,7 @@ static void setNonblocking(SOCKET fd) if ([self streamStatus] == NSStreamStatusOpening) { - unsigned i = [_modes count]; - - while (i-- > 0) - { - [_runloop removeStream: self mode: [_modes objectAtIndex: i]]; - } + [self _unschedule]; if (error == 0) { unsigned len = sizeof(error); @@ -1029,7 +1013,6 @@ static void setNonblocking(SOCKET fd) if ([self _isOpened]) { [self _setStatus: NSStreamStatusAtEnd]; - [self _sendEvent: NSStreamEventEndEncountered]; } } } @@ -1123,7 +1106,6 @@ static void setNonblocking(SOCKET fd) if (h == INVALID_HANDLE_VALUE) { [self _recordError]; - [self _sendEvent: NSStreamEventErrorOccurred]; return; } else if (_shouldAppend == NO) @@ -1131,7 +1113,6 @@ static void setNonblocking(SOCKET fd) if (SetEndOfFile(h) == 0) // Truncate to current file pointer (0) { [self _recordError]; - [self _sendEvent: NSStreamEventErrorOccurred]; CloseHandle(h); return; } @@ -1589,20 +1570,13 @@ static void setNonblocking(SOCKET fd) && WSAGetLastError() != WSAEWOULDBLOCK) {// make an error [self _recordError]; - [self _sendEvent: NSStreamEventErrorOccurred]; return; } // waiting on writable, as an indication of opened - if (_runloop) + if (NSCountMapTable(_loops) > 0) { - unsigned i = [_modes count]; - WSAEventSelect(_sock, _loopID, FD_ALL_EVENTS); - - while (i-- > 0) - { - [_runloop addStream: self mode: [_modes objectAtIndex: i]]; - } + [self _schedule]; } [self _setStatus: NSStreamStatusOpening]; return; @@ -1686,13 +1660,7 @@ static void setNonblocking(SOCKET fd) if ([self streamStatus] == NSStreamStatusOpening) { - unsigned i = [_modes count]; - - while (i-- > 0) - { - [_runloop removeStream: self mode: [_modes objectAtIndex: i]]; - } - + [self _unschedule]; if (error == 0) { unsigned len = sizeof(error); @@ -2294,7 +2262,6 @@ done: if (bindReturn < 0 || listenReturn < 0) { [self _recordError]; - [self _sendEvent: NSStreamEventErrorOccurred]; return; } setNonblocking(_sock); @@ -2508,7 +2475,6 @@ done: if (handle == INVALID_HANDLE_VALUE) { [self _recordError]; - [self _sendEvent: NSStreamEventErrorOccurred]; return; } @@ -2531,7 +2497,6 @@ done: else if (errno != ERROR_IO_PENDING) { [self _recordError]; - [self _sendEvent: NSStreamEventErrorOccurred]; return; } } @@ -2543,7 +2508,6 @@ done: if (alreadyConnected == YES) { [self _setStatus: NSStreamStatusOpen]; - [self _sendEvent: NSStreamEventHasBytesAvailable]; } }