Fixes for I/O operations which complete synchronously even though we wanted

them to be asynchronous.


git-svn-id: svn+ssh://svn.gna.org/svn/gnustep/libs/base/trunk@21988 72102866-910b-0410-8b05-ffd578937521
This commit is contained in:
Richard Frith-Macdonald 2005-11-11 12:09:19 +00:00
parent ed4c30c3ac
commit ef91b661a7
2 changed files with 216 additions and 180 deletions

View file

@ -429,6 +429,7 @@ static const NSMapTableValueCallBacks WatcherMapValueCallBacks =
* The watcher is still valid - so call its receivers
* event handling method.
*/
NSDebugMLLog(@"NSRunLoop", @"Event callback found");
(*watcher->handleEvent)(watcher->receiver,
eventSel, watcher->data, watcher->type,
(void*)(gsaddr)handle, mode);

View file

@ -96,6 +96,13 @@ typedef struct {
unsigned char port[16];
} GSPortMsgHeader;
typedef enum {
RS_NONE, // Not started yet
RS_MESG, // Waiting to be notified of a message arriving
RS_SIZE, // Need to determine message size
RS_DATA // Need to read message data
} ReadState;
typedef struct {
NSString *name;
NSRecursiveLock *lock;
@ -103,6 +110,7 @@ typedef struct {
HANDLE wHandle;
HANDLE rEvent;
HANDLE wEvent;
ReadState rState;
OVERLAPPED rOv;
OVERLAPPED wOv;
DWORD rSize;
@ -309,6 +317,7 @@ static unsigned wordAlign;
this->wHandle = INVALID_HANDLE_VALUE;
this->wEvent = INVALID_HANDLE_VALUE;
this->rState = RS_NONE;
this->rEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
this->rData = [NSMutableData new];
this->rMsgs = [NSMutableArray new];
@ -367,6 +376,8 @@ static unsigned wordAlign;
this->lock = [GSLazyRecursiveLock new];
this->rState = RS_NONE;
this->rHandle = INVALID_HANDLE_VALUE;
this->rEvent = INVALID_HANDLE_VALUE;
this->wHandle = INVALID_HANDLE_VALUE;
@ -476,7 +487,9 @@ static unsigned wordAlign;
M_LOCK(this->lock);
if (this->rWant > 0)
NSDebugMLLog(@"NSMessagePort", @"entered with rWant=%d", this->rWant);
if (this->rState == RS_MESG)
{
/*
* Have we read something?
@ -488,6 +501,7 @@ static unsigned wordAlign;
TRUE) == 0)
{
errno = GetLastError();
NSDebugMLLog(@"NSMessagePort", @"overlapped result=%d", errno);
/*
* Our overlapped read attempt should fail ... because mailslots
* insist we read an entire message in one go, and we asked it
@ -498,210 +512,214 @@ static unsigned wordAlign;
*/
if (errno == ERROR_INSUFFICIENT_BUFFER)
{
if (GetMailslotInfo(
this->rHandle,
0,
&this->rWant,
0,
0) == 0)
{
NSLog(@"unable to get info from mailslot '%@' - %s",
this->name, GSLastErrorStr(errno));
[self invalidate];
return;
}
else
{
[this->rData setLength: this->rWant];
if (ReadFile(this->rHandle,
[this->rData mutableBytes], // Store results here
this->rWant,
&this->rSize,
NULL) == 0)
{
NSLog(@"unable to read from mailslot '%@' - %s",
this->name, GSLastErrorStr(errno));
[self invalidate];
return;
}
if (this->rSize != this->rWant)
{
NSLog(@"only read %d of %d bytes from mailslot '%@' - %s",
this->rSize, this->rWant, this->name,
GSLastErrorStr(errno));
[self invalidate];
return;
}
else
{
NSDebugMLLog(@"NSMessagePort", @"Read complete on %@",
self);
}
this->rLength = this->rSize;
this->rSize = 0;
}
this->rState = RS_SIZE;
}
else
{
NSLog(@"GetOverlappedResult failed ...%s", GSLastErrorStr(errno));
this->rState = RS_NONE;
this->rLength = 0;
this->rWant = 1;
}
}
else
{
NSLog(@"GetOverlappedResult succes ...%u", this->rSize);
this->rLength += this->rSize;
this->rSize = 0;
}
/*
* Do next part only if we have completed a read.
*/
if (this->rLength == this->rWant)
{
unsigned char *buf = [this->rData mutableBytes];
GSPortItemType rType;
GSPortItemHeader *pih;
unsigned off = 0;
unsigned len;
unsigned rId;
unsigned nItems;
NSMessagePort *rPort = nil;
NSMutableArray *rItems = nil;
while (off + sizeof(GSPortItemHeader) <= this->rLength)
{
pih = (GSPortItemHeader*)(buf + off);
off += sizeof(GSPortItemHeader);
rType = GSSwapBigI32ToHost(pih->type);
len = GSSwapBigI32ToHost(pih->length);
if (len + off > this->rLength)
{
NSLog(@"%@ - unreasonable length (%u) for data", self, len);
break;
}
if (rType != GSP_HEAD && rItems == nil)
{
NSLog(@"%@ - initial part of message had bad type");
break;
}
if (rType == GSP_HEAD)
{
GSPortMsgHeader *pmh;
NSString *n;
NSMutableData *d;
if (len < sizeof(GSPortMsgHeader))
{
NSLog(@"%@ - bad length for header", self);
break;
}
pmh = (GSPortMsgHeader*)(buf + off);
off += sizeof(GSPortMsgHeader);
len -= sizeof(GSPortMsgHeader);
rId = GSSwapBigI32ToHost(pmh->mId);
nItems = GSSwapBigI32ToHost(pmh->nItems);
if (nItems == 0)
{
NSLog(@"%@ - unable to decode item count", self);
break;
}
n = [[NSString alloc] initWithBytes: pmh->port
length: 16
encoding: NSASCIIStringEncoding];
NSDebugFLLog(@"NSMessagePort", @"Decoded port as '%@'", n);
rPort = [messagePortClass newWithName: n];
RELEASE(n);
if (rPort == nil)
{
NSLog(@"%@ - unable to decode remote port", self);
break;
}
rItems = [NSMutableArray alloc];
rItems = [rItems initWithCapacity: nItems];
d = [[NSMutableData alloc] initWithBytes: buf + off
length: len];
[rItems addObject: d];
RELEASE(d);
}
else if (rType == GSP_DATA)
{
NSMutableData *d;
d = [[NSMutableData alloc] initWithBytes: buf + off
length: len];
[rItems addObject: d];
RELEASE(d);
}
else if (rType == GSP_PORT)
{
NSMessagePort *p;
NSString *n;
if (len != 16)
{
NSLog(@"%@ - bad length for port item", self);
break;
}
n = [[NSString alloc] initWithBytes: buf + off
length: 16
encoding: NSASCIIStringEncoding];
NSDebugFLLog(@"NSMessagePort", @"Decoded port as '%@'", n);
p = [messagePortClass newWithName: n];
RELEASE(n);
if (p == nil)
{
NSLog(@"%@ - unable to decode remote port", self);
break;
}
[rItems addObject: p];
RELEASE(p);
}
off += len;
if (nItems == [rItems count])
{
NSPortMessage *pm;
pm = [NSPortMessage allocWithZone: NSDefaultMallocZone()];
pm = [pm initWithSendPort: rPort
receivePort: self
components: rItems];
DESTROY(rPort);
DESTROY(rItems);
[pm setMsgid: rId];
[this->rMsgs addObject: pm];
RELEASE(pm);
break;
}
}
DESTROY(rPort);
DESTROY(rItems);
this->rWant = 1; // Queue a read
NSLog(@"GetOverlappedResult success ...%u", this->rSize);
this->rState = RS_NONE;
this->rLength = 0;
}
}
if (this->rState == RS_SIZE)
{
if (GetMailslotInfo(
this->rHandle,
0,
&this->rWant,
0,
0) == 0)
{
NSLog(@"unable to get info from mailslot '%@' - %s",
this->name, GSLastErrorStr(errno));
[self invalidate];
return;
}
else
{
NSLog(@"Unexpected STATE");
this->rState = RS_DATA;
NSDebugMLLog(@"NSMessagePort", @"mailslot size=%d",
this->rWant);
[this->rData setLength: this->rWant];
if (ReadFile(this->rHandle,
[this->rData mutableBytes], // Store results here
this->rWant,
&this->rSize,
NULL) == 0)
{
NSLog(@"unable to read from mailslot '%@' - %s",
this->name, GSLastErrorStr(errno));
[self invalidate];
return;
}
if (this->rSize != this->rWant)
{
NSLog(@"only read %d of %d bytes from mailslot '%@' - %s",
this->rSize, this->rWant, this->name,
GSLastErrorStr(errno));
[self invalidate];
return;
}
else
{
NSDebugMLLog(@"NSMessagePort", @"Read complete on %@",
self);
}
this->rLength = this->rSize;
this->rSize = 0;
this->rState = RS_NONE;
}
}
else
/*
* Do next part only if we have completed a read.
*/
if (this->rLength > 0 && this->rLength == this->rWant)
{
this->rWant = 1; // Queue a read
this->rLength = 0;
unsigned char *buf = [this->rData mutableBytes];
GSPortItemType rType;
GSPortItemHeader *pih;
unsigned off = 0;
unsigned len;
unsigned rId;
unsigned nItems;
NSMessagePort *rPort = nil;
NSMutableArray *rItems = nil;
while (off + sizeof(GSPortItemHeader) <= this->rLength)
{
pih = (GSPortItemHeader*)(buf + off);
off += sizeof(GSPortItemHeader);
rType = GSSwapBigI32ToHost(pih->type);
len = GSSwapBigI32ToHost(pih->length);
if (len + off > this->rLength)
{
NSLog(@"%@ - unreasonable length (%u) for data", self, len);
break;
}
if (rType != GSP_HEAD && rItems == nil)
{
NSLog(@"%@ - initial part of message had bad type");
break;
}
if (rType == GSP_HEAD)
{
GSPortMsgHeader *pmh;
NSString *n;
NSMutableData *d;
if (len < sizeof(GSPortMsgHeader))
{
NSLog(@"%@ - bad length for header", self);
break;
}
pmh = (GSPortMsgHeader*)(buf + off);
off += sizeof(GSPortMsgHeader);
len -= sizeof(GSPortMsgHeader);
rId = GSSwapBigI32ToHost(pmh->mId);
nItems = GSSwapBigI32ToHost(pmh->nItems);
if (nItems == 0)
{
NSLog(@"%@ - unable to decode item count", self);
break;
}
n = [[NSString alloc] initWithBytes: pmh->port
length: 16
encoding: NSASCIIStringEncoding];
NSDebugFLLog(@"NSMessagePort", @"Decoded port as '%@'", n);
rPort = [messagePortClass newWithName: n];
RELEASE(n);
if (rPort == nil)
{
NSLog(@"%@ - unable to decode remote port", self);
break;
}
rItems = [NSMutableArray alloc];
rItems = [rItems initWithCapacity: nItems];
d = [[NSMutableData alloc] initWithBytes: buf + off
length: len];
[rItems addObject: d];
RELEASE(d);
}
else if (rType == GSP_DATA)
{
NSMutableData *d;
d = [[NSMutableData alloc] initWithBytes: buf + off
length: len];
[rItems addObject: d];
RELEASE(d);
}
else if (rType == GSP_PORT)
{
NSMessagePort *p;
NSString *n;
if (len != 16)
{
NSLog(@"%@ - bad length for port item", self);
break;
}
n = [[NSString alloc] initWithBytes: buf + off
length: 16
encoding: NSASCIIStringEncoding];
NSDebugFLLog(@"NSMessagePort", @"Decoded port as '%@'", n);
p = [messagePortClass newWithName: n];
RELEASE(n);
if (p == nil)
{
NSLog(@"%@ - unable to decode remote port", self);
break;
}
[rItems addObject: p];
RELEASE(p);
}
off += len;
if (nItems == [rItems count])
{
NSPortMessage *pm;
pm = [NSPortMessage allocWithZone: NSDefaultMallocZone()];
pm = [pm initWithSendPort: rPort
receivePort: self
components: rItems];
DESTROY(rPort);
DESTROY(rItems);
[pm setMsgid: rId];
[this->rMsgs addObject: pm];
RELEASE(pm);
break;
}
}
DESTROY(rPort);
DESTROY(rItems);
}
/*
* Got something ... is it all we want? If not, ask to read more.
*/
if ([self isValid] == YES && this->rLength < this->rWant)
if ([self isValid] == YES
&& (this->rState == RS_NONE || this->rLength < this->rWant))
{
int rc;
this->rOv.Offset = 0;
this->rOv.OffsetHigh = 0;
this->rOv.hEvent = this->rEvent;
if (this->rState == RS_NONE)
{
this->rLength = 0;
this->rWant = 1;
}
if ([this->rData length] < (this->rWant - this->rLength))
{
[this->rData setLength: this->rWant - this->rLength];
@ -715,15 +733,27 @@ static unsigned wordAlign;
if (rc > 0)
{
NSDebugMLLog(@"NSMessagePort", @"Read immediate on %@", self);
if (this->rState == RS_NONE)
{
this->rState = RS_SIZE;
}
SetEvent(this->rEvent);
}
else if ((errno = GetLastError()) == ERROR_IO_PENDING)
{
NSDebugMLLog(@"NSMessagePort", @"Read queued on %@", self);
if (this->rState == RS_NONE)
{
this->rState = RS_MESG;
}
}
else if (errno == ERROR_INSUFFICIENT_BUFFER)
{
NSDebugMLLog(@"NSMessagePort", @"Read retry on %@", self);
if (this->rState == RS_NONE)
{
this->rState = RS_SIZE;
}
SetEvent(this->rEvent);
}
else
@ -792,6 +822,8 @@ static unsigned wordAlign;
}
}
again:
/*
* Handle start of next data item if we have completed one,
* or if we are called without a write in progress.
@ -828,7 +860,8 @@ static unsigned wordAlign;
{
NSDebugMLLog(@"NSMessagePort", @"Write of %d performs %d",
[this->wData length] - this->wLength, this->wSize);
SetEvent(this->wEvent);
this->wLength += this->wSize;
goto again;
}
else if ((errno = GetLastError()) != ERROR_IO_PENDING)
{
@ -855,6 +888,7 @@ static unsigned wordAlign;
{
internal *this = PORT(self);
NSDebugMLLog(@"NSMessagePort", @"got event on %@ in mode %@", self, mode);
if (this->rEvent == (HANDLE)data)
{
[self receivedEventRead];
@ -866,7 +900,8 @@ static unsigned wordAlign;
}
else
{
// Event on invalid port ... remove port from run loop
NSDebugMLLog(@"NSMessagePort",
@"got event on invalidated port 0x%x in mode %@", self, mode);
[[NSRunLoop currentRunLoop] removeEvent: data
type: type
forMode: mode