stream and runloop fixups

git-svn-id: svn+ssh://svn.gna.org/svn/gnustep/libs/base/trunk@25147 72102866-910b-0410-8b05-ffd578937521
This commit is contained in:
Richard Frith-MacDonald 2007-05-12 17:11:35 +00:00
parent 262a167f83
commit fc815c4e5a
7 changed files with 191 additions and 202 deletions

View file

@ -1,3 +1,16 @@
2007-05-12 Richard Frith-Macdonald <rfm@gnu.org>
* Source/GSStream.h: New runloop management
* Source/GSStream.m: Support scheduling in multiple runloops to match
documented MacOS-X behavior. Trigger instant event on error or eof.
* Source/unix/NSStream.m: Don't send error and eof events asap, defer
until the runloop runs as this is how MacOS-X behaves.
* Source/win32/NSStreamWin32.m: ditto.
* Source/unix/GSRunLoopCtxt.m: Fix failure to trigger immediate events
when there are no non-immediate input sources in th loop. Fix busy
poll when there are no non-immediate sources in the loop.
* Source/win32/GSRunLoopCtxt.m: ditto
2007-05-12 Richard Frith-Macdonald <rfm@gnu.org>
* Source/NSURLProtocol.m: Fix multiple release of headers.

View file

@ -53,6 +53,7 @@
#include <Foundation/NSStream.h>
#include <Foundation/NSRunLoop.h>
#include <Foundation/NSMapTable.h>
/**
* Convenience methods used to add streams to the run loop.
@ -71,8 +72,7 @@
BOOL _delegateValid;/* whether the delegate responds*/\
NSError *_lastError; /* last error occured */\
NSStreamStatus _currentStatus;/* current status */\
NSMutableArray *_modes; /* currently scheduled modes. */\
NSRunLoop *_runloop; /* currently scheduled loop. */\
NSMapTable *_loops; /* Run loops and their modes. */\
void *_loopID; /* file descriptor etc. */\
int _events; /* Signalled events. */\
}
@ -103,6 +103,11 @@ IVARS
*/
- (void*) _loopID;
/**
* Place the stream in all the scheduled runloops.
*/
- (void) _schedule;
/**
* send an event to delegate
*/
@ -128,6 +133,12 @@ IVARS
* say whether there is unhandled data for the stream.
*/
- (BOOL) _unhandledData;
/**
* Remove the stream from all the scheduled runloops.
*/
- (void) _unschedule;
@end
@interface GSInputStream : NSInputStream
@ -151,11 +162,6 @@ IVARS
NSData *_data;
unsigned long _pointer;
}
/**
* this is the bridge method for asynchronized operation. Do not call.
*/
- (void) _dispatch;
@end
/**
@ -168,11 +174,6 @@ IVARS
unsigned _capacity;
unsigned long _pointer;
}
/**
* this is the bridge method for asynchronized operation. Do not call.
*/
- (void) _dispatch;
@end
/**
@ -184,11 +185,6 @@ IVARS
NSMutableData *_data;
unsigned long _pointer;
}
/**
* this is the bridge method for asynchronized operation. Do not call.
*/
- (void) _dispatch;
@end
#endif

View file

@ -150,15 +150,7 @@ static RunLoopEventType typeForStream(NSStream *aStream)
{
NSDebugMLog(@"Attempt to close already closed stream %@", self);
}
if (_runloop)
{
unsigned i = [_modes count];
while (i-- > 0)
{
[_runloop removeStream: self mode: [_modes objectAtIndex: i]];
}
}
[self _unschedule];
[self _setStatus: NSStreamStatusClosed];
/* We don't want to send any events the the delegate after the
* stream has been closed.
@ -173,8 +165,11 @@ static RunLoopEventType typeForStream(NSStream *aStream)
{
[self close];
}
DESTROY(_runloop);
DESTROY(_modes);
if (_loops != 0)
{
NSFreeMapTable(_loops);
_loops = 0;
}
DESTROY(_properties);
DESTROY(_lastError);
[super dealloc];
@ -192,7 +187,8 @@ static RunLoopEventType typeForStream(NSStream *aStream)
_delegate = self;
_properties = nil;
_lastError = nil;
_modes = [NSMutableArray new];
_loops = NSCreateMapTable(NSObjectMapKeyCallBacks,
NSObjectMapValueCallBacks, 1);
_currentStatus = NSStreamStatusNotOpen;
_loopID = (void*)self;
}
@ -206,15 +202,7 @@ static RunLoopEventType typeForStream(NSStream *aStream)
NSDebugMLog(@"Attempt to re-open stream %@", self);
}
[self _setStatus: NSStreamStatusOpen];
if (_runloop)
{
unsigned i = [_modes count];
while (i-- > 0)
{
[_runloop addStream: self mode: [_modes objectAtIndex: i]];
}
}
[self _schedule];
[self _sendEvent: NSStreamEventOpenCompleted];
}
@ -228,20 +216,24 @@ static RunLoopEventType typeForStream(NSStream *aStream)
extra: (void*)extra
forMode: (NSString*)mode
{
NSLog(@"Event on %p", self);
[self _dispatch];
}
- (void) removeFromRunLoop: (NSRunLoop *)aRunLoop forMode: (NSString *)mode
{
if (_runloop == aRunLoop)
if (aRunLoop != nil && mode != nil)
{
if ([_modes containsObject: mode])
NSMutableArray *modes;
modes = (NSMutableArray*)NSMapGet(_loops, (void*)aRunLoop);
if ([modes containsObject: mode])
{
[_runloop removeStream: self mode: mode];
[_modes removeObject: mode];
if ([_modes count] == 0)
[aRunLoop removeStream: self mode: mode];
[modes removeObject: mode];
if ([modes count] == 0)
{
DESTROY(_runloop);
NSMapRemove(_loops, (void*)aRunLoop);
}
}
}
@ -249,21 +241,30 @@ static RunLoopEventType typeForStream(NSStream *aStream)
- (void) scheduleInRunLoop: (NSRunLoop *)aRunLoop forMode: (NSString *)mode
{
NSAssert(!_runloop || _runloop == aRunLoop,
@"Attempt to schedule in more than one runloop.");
ASSIGN(_runloop, aRunLoop);
if ([_modes containsObject: mode] == NO)
if (aRunLoop != nil && mode != nil)
{
mode = [mode copy];
[_modes addObject: mode];
RELEASE(mode);
/* We only add open streams to the runloop .. subclasses may add
* streams when they are in the process of opening if they need
* to do so.
*/
if ([self _isOpened])
NSMutableArray *modes;
modes = (NSMutableArray*)NSMapGet(_loops, (void*)aRunLoop);
if (modes == nil)
{
[_runloop addStream: self mode: mode];
modes = [[NSMutableArray alloc] initWithCapacity: 1];
NSMapInsert(_loops, (void*)aRunLoop, (void*)modes);
RELEASE(modes);
}
if ([modes containsObject: mode] == NO)
{
mode = [mode copy];
[modes addObject: mode];
RELEASE(mode);
/* We only add open streams to the runloop .. subclasses may add
* streams when they are in the process of opening if they need
* to do so.
*/
if ([self _isOpened])
{
[aRunLoop addStream: self mode: mode];
}
}
}
}
@ -332,6 +333,10 @@ static RunLoopEventType typeForStream(NSStream *aStream)
{
}
- (void) _schedule
{
}
- (void) _sendEvent: (NSStreamEvent)event
{
}
@ -348,6 +353,11 @@ static RunLoopEventType typeForStream(NSStream *aStream)
{
return NO;
}
- (void) _unschedule
{
}
@end
@implementation GSStream (Private)
@ -379,6 +389,25 @@ static RunLoopEventType typeForStream(NSStream *aStream)
_currentStatus = NSStreamStatusError;
}
- (void) _schedule
{
NSMapEnumerator enumerator;
NSRunLoop *k;
NSMutableArray *v;
enumerator = NSEnumerateMapTable(_loops);
while (NSNextMapEnumeratorPair(&enumerator, (void **)(&k), (void**)&v))
{
unsigned i = [v count];
while (i-- > 0)
{
[k addStream: self mode: [v objectAtIndex: i]];
}
}
NSEndMapTableEnumeration(&enumerator);
}
- (void) _sendEvent: (NSStreamEvent)event
{
if (event == NSStreamEventNone)
@ -490,6 +519,25 @@ static RunLoopEventType typeForStream(NSStream *aStream)
return NO;
}
- (void) _unschedule
{
NSMapEnumerator enumerator;
NSRunLoop *k;
NSMutableArray *v;
enumerator = NSEnumerateMapTable(_loops);
while (NSNextMapEnumeratorPair(&enumerator, (void **)(&k), (void**)&v))
{
unsigned i = [v count];
while (i-- > 0)
{
[k removeStream: self mode: [v objectAtIndex: i]];
}
}
NSEndMapTableEnumeration(&enumerator);
}
- (BOOL) runLoopShouldBlock: (BOOL*)trigger
{
if (_events
@ -501,23 +549,43 @@ static RunLoopEventType typeForStream(NSStream *aStream)
*trigger = NO;
return NO;
}
if (_currentStatus == NSStreamStatusError &&
(_events & NSStreamEventErrorOccurred) == NSStreamEventErrorOccurred)
if (_currentStatus == NSStreamStatusError)
{
/* If an error has occurred (and been handled),
* we should not watch for any events at all.
*/
*trigger = NO;
return NO;
if ((_events & NSStreamEventErrorOccurred) == 0)
{
/* An error has occurred but not been handled,
* so we should trigger an error event at once.
*/
*trigger = YES;
return NO;
}
else
{
/* An error has occurred (and been handled),
* so we should not watch for any events at all.
*/
*trigger = NO;
return NO;
}
}
if (_currentStatus == NSStreamStatusAtEnd &&
(_events & NSStreamEventEndEncountered) == NSStreamEventEndEncountered)
if (_currentStatus == NSStreamStatusAtEnd)
{
/* If an error has occurred (and been handled),
* we should not watch for any events at all.
*/
*trigger = NO;
return NO;
if ((_events & NSStreamEventEndEncountered) == 0)
{
/* An end of stream has occurred but not been handled,
* so we should trigger an end of stream event at once.
*/
*trigger = YES;
return NO;
}
else
{
/* An end of stream has occurred (and been handled),
* so we should not watch for any events at all.
*/
*trigger = NO;
return NO;
}
}
if (_loopID == (void*)self)
@ -585,17 +653,6 @@ static RunLoopEventType typeForStream(NSStream *aStream)
{
return YES;
}
if (_currentStatus == NSStreamStatusAtEnd)
{
if ((_events & NSStreamEventEndEncountered) == 0)
{
/* We have not sent the appropriate event yet, so the
* client must not have issued a write:maxLength:
* (which is the point at which we should send).
*/
return YES;
}
}
return NO;
}
@ -675,7 +732,6 @@ static RunLoopEventType typeForStream(NSStream *aStream)
else
{
[self _setStatus: NSStreamStatusAtEnd];
[self _sendEvent: NSStreamEventEndEncountered];
}
return copySize;
}
@ -757,7 +813,6 @@ static RunLoopEventType typeForStream(NSStream *aStream)
if (len == 0)
{
[self _setStatus: NSStreamStatusAtEnd];
[self _sendEvent: NSStreamEventEndEncountered];
return 0;
}
}

View file

@ -382,14 +382,7 @@ static void setPollfd(int fd, int event, GSRunLoopCtxt *ctxt)
fprintf(stderr, "\n");
}
#endif
if (pollfds_count > 0)
{
poll_return = poll (pollfds, pollfds_count, milliseconds);
}
else
{
poll_return = 0;
}
poll_return = poll (pollfds, pollfds_count, milliseconds);
#if 0
{
unsigned int i;
@ -429,33 +422,33 @@ static void setPollfd(int fd, int event, GSRunLoopCtxt *ctxt)
* Trigger any watchers which are set up to for every runloop wait.
*/
count = GSIArrayCount(_trigger);
while (completed == NO && count-- > 0)
while (count-- > 0)
{
GSRunLoopWatcher *watcher;
watcher = (GSRunLoopWatcher*)GSIArrayItemAtIndex(_trigger, count).obj;
if (watcher->_invalidated == NO)
{
i = [contexts count];
while (i-- > 0)
{
GSRunLoopCtxt *c = [contexts objectAtIndex: i];
if (watcher->_invalidated == NO)
{
i = [contexts count];
while (i-- > 0)
{
GSRunLoopCtxt *c = [contexts objectAtIndex: i];
if (c != self)
{
[c endEvent: (void*)watcher for: watcher];
}
}
/*
* The watcher is still valid - so call its
* receivers event handling method.
*/
[watcher->receiver receivedEvent: watcher->data
type: watcher->type
extra: watcher->data
forMode: mode];
}
GSPrivateNotifyASAP();
if (c != self)
{
[c endEvent: (void*)watcher for: watcher];
}
}
/*
* The watcher is still valid - so call its
* receivers event handling method.
*/
[watcher->receiver receivedEvent: watcher->data
type: watcher->type
extra: watcher->data
forMode: mode];
}
GSPrivateNotifyASAP();
}
/*
@ -771,15 +764,8 @@ static void setPollfd(int fd, int event, GSRunLoopCtxt *ctxt)
// NSDebugMLLog(@"NSRunLoop", @"select timeout %d,%d", timeout.tv_sec, timeout.tv_usec);
if (fdEnd >= 0)
{
select_return = select (fdEnd, &read_fds, &write_fds,
&exception_fds, select_timeout);
}
else
{
select_return = 0;
}
select_return = select (fdEnd, &read_fds, &write_fds,
&exception_fds, select_timeout);
NSDebugMLLog(@"NSRunLoop", @"select returned %d", select_return);
@ -810,7 +796,7 @@ static void setPollfd(int fd, int event, GSRunLoopCtxt *ctxt)
* Trigger any watchers which are set up to for every runloop wait.
*/
count = GSIArrayCount(_trigger);
while (completed == NO && count-- > 0)
while (count-- > 0)
{
GSRunLoopWatcher *watcher;

View file

@ -424,7 +424,6 @@ static void setNonblocking(int fd)
else if (readLen == 0)
{
[self _setStatus: NSStreamStatusAtEnd];
[self _sendEvent: NSStreamEventEndEncountered];
}
return readLen;
}
@ -462,7 +461,6 @@ static void setNonblocking(int fd)
if (fd < 0)
{
[self _recordError];
[self _sendEvent: NSStreamEventErrorOccurred];
return;
}
_loopID = (void*)(intptr_t)fd;
@ -554,31 +552,25 @@ static void setNonblocking(int fd)
{
int result;
if (_runloop)
if (NSCountMapTable(_loops) > 0)
{
setNonblocking((intptr_t)_loopID);
}
result = connect((intptr_t)_loopID, [self peerAddr], [self sockLen]);
if (result < 0)
{
if (errno == EINPROGRESS && _runloop != nil)
if (errno == EINPROGRESS && NSCountMapTable(_loops) > 0)
{
unsigned i = [_modes count];
/*
* Need to set the status first, so that the run loop can tell
* it needs to add the stream as waiting on writable, as an
* indication of opened
*/
[self _setStatus: NSStreamStatusOpening];
while (i-- > 0)
{
[_runloop addStream: self mode: [_modes objectAtIndex: i]];
}
[self _schedule];
return;
}
[self _recordError];
[self _sendEvent: NSStreamEventErrorOccurred];
return;
}
}
@ -630,7 +622,6 @@ static void setNonblocking(int fd)
else if (readLen == 0)
{
[self _setStatus: NSStreamStatusAtEnd];
[self _sendEvent: NSStreamEventEndEncountered];
}
return readLen;
}
@ -656,13 +647,9 @@ static void setNonblocking(int fd)
int error;
int result;
socklen_t len = sizeof(error);
unsigned i = [_modes count];
AUTORELEASE(RETAIN(self));
while (i-- > 0)
{
[_runloop removeStream: self mode: [_modes objectAtIndex: i]];
}
[self _unschedule];
result
= getsockopt((intptr_t)_loopID, SOL_SOCKET, SO_ERROR, &error, &len);
@ -898,7 +885,6 @@ static void setNonblocking(int fd)
if (fd < 0)
{ // make an error
[self _recordError];
[self _sendEvent: NSStreamEventErrorOccurred];
return;
}
_loopID = (void*)(intptr_t)fd;
@ -1036,31 +1022,25 @@ static void setNonblocking(int fd)
{
int result;
if (_runloop)
if (NSCountMapTable(_loops) > 0)
{
setNonblocking((intptr_t)_loopID);
}
result = connect((intptr_t)_loopID, [self peerAddr], [self sockLen]);
if (result < 0)
{
if (errno == EINPROGRESS && _runloop != nil)
if (errno == EINPROGRESS && NSCountMapTable(_loops) > 0)
{
unsigned i = [_modes count];
/*
* Need to set the status first, so that the run loop can tell
* it needs to add the stream as waiting on writable, as an
* indication of opened
*/
[self _setStatus: NSStreamStatusOpening];
while (i-- > 0)
{
[_runloop addStream: self mode: [_modes objectAtIndex: i]];
}
[self _unschedule];
return;
}
[self _recordError];
[self _sendEvent: NSStreamEventErrorOccurred];
return;
}
}
@ -1093,13 +1073,9 @@ static void setNonblocking(int fd)
int error;
socklen_t len = sizeof(error);
int result;
unsigned i = [_modes count];
AUTORELEASE(RETAIN(self));
while (i-- > 0)
{
[_runloop removeStream: self mode: [_modes objectAtIndex: i]];
}
[self _schedule];
result
= getsockopt((intptr_t)_loopID, SOL_SOCKET, SO_ERROR, &error, &len);
if (result >= 0 && !error)
@ -1619,7 +1595,6 @@ static void setNonblocking(int fd)
if (bindReturn < 0 || listenReturn < 0)
{
[self _recordError];
[self _sendEvent: NSStreamEventErrorOccurred];
return;
}
setNonblocking((intptr_t)_loopID);

View file

@ -424,7 +424,7 @@ static const NSMapTableValueCallBacks WatcherMapValueCallBacks =
}
else
{
SleepEx(wait_timeout, TRUE);
SleepEx(wait_timeout, TRUE);
wait_return = WAIT_OBJECT_0;
}
NSDebugMLLog(@"NSRunLoop", @"wait returned %d", wait_return);
@ -467,7 +467,7 @@ static const NSMapTableValueCallBacks WatcherMapValueCallBacks =
*/
count = GSIArrayCount(_trigger);
completed = NO;
while (completed == NO && count-- > 0)
while (count-- > 0)
{
GSRunLoopWatcher *watcher;
@ -503,7 +503,7 @@ static const NSMapTableValueCallBacks WatcherMapValueCallBacks =
return NO;
}
// if there arent events
// if there aren't events
if (wait_return == WAIT_TIMEOUT)
{
completed = YES;

View file

@ -368,7 +368,6 @@ static void setNonblocking(SOCKET fd)
if (h == INVALID_HANDLE_VALUE)
{
[self _recordError];
[self _sendEvent: NSStreamEventErrorOccurred];
return;
}
[self _setLoopID: (void*)h];
@ -418,7 +417,6 @@ static void setNonblocking(SOCKET fd)
else if (readLen == 0)
{
[self _setStatus: NSStreamStatusAtEnd];
[self _sendEvent: NSStreamEventEndEncountered];
}
return (int)readLen;
}
@ -635,7 +633,6 @@ static void setNonblocking(SOCKET fd)
{
if (myStatus == NSStreamStatusError)
{
[self _sendEvent: NSStreamEventErrorOccurred];
return -1; // Waiting for read.
}
if (myStatus == NSStreamStatusOpen)
@ -645,7 +642,6 @@ static void setNonblocking(SOCKET fd)
* so we must be at EOF.
*/
[self _setStatus: NSStreamStatusAtEnd];
[self _sendEvent: NSStreamEventEndEncountered];
}
return 0;
}
@ -808,19 +804,13 @@ static void setNonblocking(SOCKET fd)
&& WSAGetLastError() != WSAEWOULDBLOCK)
{// make an error
[self _recordError];
[self _sendEvent: NSStreamEventErrorOccurred];
return;
}
// waiting on writable, as an indication of opened
if (_runloop)
if (NSCountMapTable(_loops) > 0)
{
unsigned i = [_modes count];
WSAEventSelect(_sock, _loopID, FD_ALL_EVENTS);
while (i-- > 0)
{
[_runloop addStream: self mode: [_modes objectAtIndex: i]];
}
[self _schedule];
}
[self _setStatus: NSStreamStatusOpening];
return;
@ -902,7 +892,6 @@ static void setNonblocking(SOCKET fd)
else if (readLen == 0)
{
[self _setStatus: NSStreamStatusAtEnd];
[self _sendEvent: NSStreamEventEndEncountered];
}
else
{
@ -950,12 +939,7 @@ static void setNonblocking(SOCKET fd)
if ([self streamStatus] == NSStreamStatusOpening)
{
unsigned i = [_modes count];
while (i-- > 0)
{
[_runloop removeStream: self mode: [_modes objectAtIndex: i]];
}
[self _unschedule];
if (error == 0)
{
unsigned len = sizeof(error);
@ -1029,7 +1013,6 @@ static void setNonblocking(SOCKET fd)
if ([self _isOpened])
{
[self _setStatus: NSStreamStatusAtEnd];
[self _sendEvent: NSStreamEventEndEncountered];
}
}
}
@ -1123,7 +1106,6 @@ static void setNonblocking(SOCKET fd)
if (h == INVALID_HANDLE_VALUE)
{
[self _recordError];
[self _sendEvent: NSStreamEventErrorOccurred];
return;
}
else if (_shouldAppend == NO)
@ -1131,7 +1113,6 @@ static void setNonblocking(SOCKET fd)
if (SetEndOfFile(h) == 0) // Truncate to current file pointer (0)
{
[self _recordError];
[self _sendEvent: NSStreamEventErrorOccurred];
CloseHandle(h);
return;
}
@ -1589,20 +1570,13 @@ static void setNonblocking(SOCKET fd)
&& WSAGetLastError() != WSAEWOULDBLOCK)
{// make an error
[self _recordError];
[self _sendEvent: NSStreamEventErrorOccurred];
return;
}
// waiting on writable, as an indication of opened
if (_runloop)
if (NSCountMapTable(_loops) > 0)
{
unsigned i = [_modes count];
WSAEventSelect(_sock, _loopID, FD_ALL_EVENTS);
while (i-- > 0)
{
[_runloop addStream: self mode: [_modes objectAtIndex: i]];
}
[self _schedule];
}
[self _setStatus: NSStreamStatusOpening];
return;
@ -1686,13 +1660,7 @@ static void setNonblocking(SOCKET fd)
if ([self streamStatus] == NSStreamStatusOpening)
{
unsigned i = [_modes count];
while (i-- > 0)
{
[_runloop removeStream: self mode: [_modes objectAtIndex: i]];
}
[self _unschedule];
if (error == 0)
{
unsigned len = sizeof(error);
@ -2294,7 +2262,6 @@ done:
if (bindReturn < 0 || listenReturn < 0)
{
[self _recordError];
[self _sendEvent: NSStreamEventErrorOccurred];
return;
}
setNonblocking(_sock);
@ -2508,7 +2475,6 @@ done:
if (handle == INVALID_HANDLE_VALUE)
{
[self _recordError];
[self _sendEvent: NSStreamEventErrorOccurred];
return;
}
@ -2531,7 +2497,6 @@ done:
else if (errno != ERROR_IO_PENDING)
{
[self _recordError];
[self _sendEvent: NSStreamEventErrorOccurred];
return;
}
}
@ -2543,7 +2508,6 @@ done:
if (alreadyConnected == YES)
{
[self _setStatus: NSStreamStatusOpen];
[self _sendEvent: NSStreamEventHasBytesAvailable];
}
}