Fixes for I/O operations which complete synchronously even though we wanted

them to be asynchronous.


git-svn-id: svn+ssh://svn.gna.org/svn/gnustep/libs/base/trunk@21988 72102866-910b-0410-8b05-ffd578937521
This commit is contained in:
CaS 2005-11-11 12:09:19 +00:00
parent 26f653b0d4
commit 77c9edcc24
2 changed files with 216 additions and 180 deletions

View file

@ -429,6 +429,7 @@ static const NSMapTableValueCallBacks WatcherMapValueCallBacks =
* The watcher is still valid - so call its receivers * The watcher is still valid - so call its receivers
* event handling method. * event handling method.
*/ */
NSDebugMLLog(@"NSRunLoop", @"Event callback found");
(*watcher->handleEvent)(watcher->receiver, (*watcher->handleEvent)(watcher->receiver,
eventSel, watcher->data, watcher->type, eventSel, watcher->data, watcher->type,
(void*)(gsaddr)handle, mode); (void*)(gsaddr)handle, mode);

View file

@ -96,6 +96,13 @@ typedef struct {
unsigned char port[16]; unsigned char port[16];
} GSPortMsgHeader; } GSPortMsgHeader;
typedef enum {
RS_NONE, // Not started yet
RS_MESG, // Waiting to be notified of a message arriving
RS_SIZE, // Need to determine message size
RS_DATA // Need to read message data
} ReadState;
typedef struct { typedef struct {
NSString *name; NSString *name;
NSRecursiveLock *lock; NSRecursiveLock *lock;
@ -103,6 +110,7 @@ typedef struct {
HANDLE wHandle; HANDLE wHandle;
HANDLE rEvent; HANDLE rEvent;
HANDLE wEvent; HANDLE wEvent;
ReadState rState;
OVERLAPPED rOv; OVERLAPPED rOv;
OVERLAPPED wOv; OVERLAPPED wOv;
DWORD rSize; DWORD rSize;
@ -309,6 +317,7 @@ static unsigned wordAlign;
this->wHandle = INVALID_HANDLE_VALUE; this->wHandle = INVALID_HANDLE_VALUE;
this->wEvent = INVALID_HANDLE_VALUE; this->wEvent = INVALID_HANDLE_VALUE;
this->rState = RS_NONE;
this->rEvent = CreateEvent(NULL, FALSE, FALSE, NULL); this->rEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
this->rData = [NSMutableData new]; this->rData = [NSMutableData new];
this->rMsgs = [NSMutableArray new]; this->rMsgs = [NSMutableArray new];
@ -367,6 +376,8 @@ static unsigned wordAlign;
this->lock = [GSLazyRecursiveLock new]; this->lock = [GSLazyRecursiveLock new];
this->rState = RS_NONE;
this->rHandle = INVALID_HANDLE_VALUE; this->rHandle = INVALID_HANDLE_VALUE;
this->rEvent = INVALID_HANDLE_VALUE; this->rEvent = INVALID_HANDLE_VALUE;
this->wHandle = INVALID_HANDLE_VALUE; this->wHandle = INVALID_HANDLE_VALUE;
@ -476,7 +487,9 @@ static unsigned wordAlign;
M_LOCK(this->lock); M_LOCK(this->lock);
if (this->rWant > 0) NSDebugMLLog(@"NSMessagePort", @"entered with rWant=%d", this->rWant);
if (this->rState == RS_MESG)
{ {
/* /*
* Have we read something? * Have we read something?
@ -488,6 +501,7 @@ static unsigned wordAlign;
TRUE) == 0) TRUE) == 0)
{ {
errno = GetLastError(); errno = GetLastError();
NSDebugMLLog(@"NSMessagePort", @"overlapped result=%d", errno);
/* /*
* Our overlapped read attempt should fail ... because mailslots * Our overlapped read attempt should fail ... because mailslots
* insist we read an entire message in one go, and we asked it * insist we read an entire message in one go, and we asked it
@ -498,210 +512,214 @@ static unsigned wordAlign;
*/ */
if (errno == ERROR_INSUFFICIENT_BUFFER) if (errno == ERROR_INSUFFICIENT_BUFFER)
{ {
if (GetMailslotInfo( this->rState = RS_SIZE;
this->rHandle,
0,
&this->rWant,
0,
0) == 0)
{
NSLog(@"unable to get info from mailslot '%@' - %s",
this->name, GSLastErrorStr(errno));
[self invalidate];
return;
}
else
{
[this->rData setLength: this->rWant];
if (ReadFile(this->rHandle,
[this->rData mutableBytes], // Store results here
this->rWant,
&this->rSize,
NULL) == 0)
{
NSLog(@"unable to read from mailslot '%@' - %s",
this->name, GSLastErrorStr(errno));
[self invalidate];
return;
}
if (this->rSize != this->rWant)
{
NSLog(@"only read %d of %d bytes from mailslot '%@' - %s",
this->rSize, this->rWant, this->name,
GSLastErrorStr(errno));
[self invalidate];
return;
}
else
{
NSDebugMLLog(@"NSMessagePort", @"Read complete on %@",
self);
}
this->rLength = this->rSize;
this->rSize = 0;
}
} }
else else
{ {
NSLog(@"GetOverlappedResult failed ...%s", GSLastErrorStr(errno)); NSLog(@"GetOverlappedResult failed ...%s", GSLastErrorStr(errno));
this->rState = RS_NONE;
this->rLength = 0; this->rLength = 0;
this->rWant = 1;
} }
} }
else else
{ {
NSLog(@"GetOverlappedResult succes ...%u", this->rSize); NSLog(@"GetOverlappedResult success ...%u", this->rSize);
this->rLength += this->rSize; this->rState = RS_NONE;
this->rSize = 0;
}
/*
* Do next part only if we have completed a read.
*/
if (this->rLength == this->rWant)
{
unsigned char *buf = [this->rData mutableBytes];
GSPortItemType rType;
GSPortItemHeader *pih;
unsigned off = 0;
unsigned len;
unsigned rId;
unsigned nItems;
NSMessagePort *rPort = nil;
NSMutableArray *rItems = nil;
while (off + sizeof(GSPortItemHeader) <= this->rLength)
{
pih = (GSPortItemHeader*)(buf + off);
off += sizeof(GSPortItemHeader);
rType = GSSwapBigI32ToHost(pih->type);
len = GSSwapBigI32ToHost(pih->length);
if (len + off > this->rLength)
{
NSLog(@"%@ - unreasonable length (%u) for data", self, len);
break;
}
if (rType != GSP_HEAD && rItems == nil)
{
NSLog(@"%@ - initial part of message had bad type");
break;
}
if (rType == GSP_HEAD)
{
GSPortMsgHeader *pmh;
NSString *n;
NSMutableData *d;
if (len < sizeof(GSPortMsgHeader))
{
NSLog(@"%@ - bad length for header", self);
break;
}
pmh = (GSPortMsgHeader*)(buf + off);
off += sizeof(GSPortMsgHeader);
len -= sizeof(GSPortMsgHeader);
rId = GSSwapBigI32ToHost(pmh->mId);
nItems = GSSwapBigI32ToHost(pmh->nItems);
if (nItems == 0)
{
NSLog(@"%@ - unable to decode item count", self);
break;
}
n = [[NSString alloc] initWithBytes: pmh->port
length: 16
encoding: NSASCIIStringEncoding];
NSDebugFLLog(@"NSMessagePort", @"Decoded port as '%@'", n);
rPort = [messagePortClass newWithName: n];
RELEASE(n);
if (rPort == nil)
{
NSLog(@"%@ - unable to decode remote port", self);
break;
}
rItems = [NSMutableArray alloc];
rItems = [rItems initWithCapacity: nItems];
d = [[NSMutableData alloc] initWithBytes: buf + off
length: len];
[rItems addObject: d];
RELEASE(d);
}
else if (rType == GSP_DATA)
{
NSMutableData *d;
d = [[NSMutableData alloc] initWithBytes: buf + off
length: len];
[rItems addObject: d];
RELEASE(d);
}
else if (rType == GSP_PORT)
{
NSMessagePort *p;
NSString *n;
if (len != 16)
{
NSLog(@"%@ - bad length for port item", self);
break;
}
n = [[NSString alloc] initWithBytes: buf + off
length: 16
encoding: NSASCIIStringEncoding];
NSDebugFLLog(@"NSMessagePort", @"Decoded port as '%@'", n);
p = [messagePortClass newWithName: n];
RELEASE(n);
if (p == nil)
{
NSLog(@"%@ - unable to decode remote port", self);
break;
}
[rItems addObject: p];
RELEASE(p);
}
off += len;
if (nItems == [rItems count])
{
NSPortMessage *pm;
pm = [NSPortMessage allocWithZone: NSDefaultMallocZone()];
pm = [pm initWithSendPort: rPort
receivePort: self
components: rItems];
DESTROY(rPort);
DESTROY(rItems);
[pm setMsgid: rId];
[this->rMsgs addObject: pm];
RELEASE(pm);
break;
}
}
DESTROY(rPort);
DESTROY(rItems);
this->rWant = 1; // Queue a read
this->rLength = 0; this->rLength = 0;
} }
}
if (this->rState == RS_SIZE)
{
if (GetMailslotInfo(
this->rHandle,
0,
&this->rWant,
0,
0) == 0)
{
NSLog(@"unable to get info from mailslot '%@' - %s",
this->name, GSLastErrorStr(errno));
[self invalidate];
return;
}
else else
{ {
NSLog(@"Unexpected STATE"); this->rState = RS_DATA;
NSDebugMLLog(@"NSMessagePort", @"mailslot size=%d",
this->rWant);
[this->rData setLength: this->rWant];
if (ReadFile(this->rHandle,
[this->rData mutableBytes], // Store results here
this->rWant,
&this->rSize,
NULL) == 0)
{
NSLog(@"unable to read from mailslot '%@' - %s",
this->name, GSLastErrorStr(errno));
[self invalidate];
return;
}
if (this->rSize != this->rWant)
{
NSLog(@"only read %d of %d bytes from mailslot '%@' - %s",
this->rSize, this->rWant, this->name,
GSLastErrorStr(errno));
[self invalidate];
return;
}
else
{
NSDebugMLLog(@"NSMessagePort", @"Read complete on %@",
self);
}
this->rLength = this->rSize;
this->rSize = 0;
this->rState = RS_NONE;
} }
} }
else
/*
* Do next part only if we have completed a read.
*/
if (this->rLength > 0 && this->rLength == this->rWant)
{ {
this->rWant = 1; // Queue a read unsigned char *buf = [this->rData mutableBytes];
this->rLength = 0; GSPortItemType rType;
GSPortItemHeader *pih;
unsigned off = 0;
unsigned len;
unsigned rId;
unsigned nItems;
NSMessagePort *rPort = nil;
NSMutableArray *rItems = nil;
while (off + sizeof(GSPortItemHeader) <= this->rLength)
{
pih = (GSPortItemHeader*)(buf + off);
off += sizeof(GSPortItemHeader);
rType = GSSwapBigI32ToHost(pih->type);
len = GSSwapBigI32ToHost(pih->length);
if (len + off > this->rLength)
{
NSLog(@"%@ - unreasonable length (%u) for data", self, len);
break;
}
if (rType != GSP_HEAD && rItems == nil)
{
NSLog(@"%@ - initial part of message had bad type");
break;
}
if (rType == GSP_HEAD)
{
GSPortMsgHeader *pmh;
NSString *n;
NSMutableData *d;
if (len < sizeof(GSPortMsgHeader))
{
NSLog(@"%@ - bad length for header", self);
break;
}
pmh = (GSPortMsgHeader*)(buf + off);
off += sizeof(GSPortMsgHeader);
len -= sizeof(GSPortMsgHeader);
rId = GSSwapBigI32ToHost(pmh->mId);
nItems = GSSwapBigI32ToHost(pmh->nItems);
if (nItems == 0)
{
NSLog(@"%@ - unable to decode item count", self);
break;
}
n = [[NSString alloc] initWithBytes: pmh->port
length: 16
encoding: NSASCIIStringEncoding];
NSDebugFLLog(@"NSMessagePort", @"Decoded port as '%@'", n);
rPort = [messagePortClass newWithName: n];
RELEASE(n);
if (rPort == nil)
{
NSLog(@"%@ - unable to decode remote port", self);
break;
}
rItems = [NSMutableArray alloc];
rItems = [rItems initWithCapacity: nItems];
d = [[NSMutableData alloc] initWithBytes: buf + off
length: len];
[rItems addObject: d];
RELEASE(d);
}
else if (rType == GSP_DATA)
{
NSMutableData *d;
d = [[NSMutableData alloc] initWithBytes: buf + off
length: len];
[rItems addObject: d];
RELEASE(d);
}
else if (rType == GSP_PORT)
{
NSMessagePort *p;
NSString *n;
if (len != 16)
{
NSLog(@"%@ - bad length for port item", self);
break;
}
n = [[NSString alloc] initWithBytes: buf + off
length: 16
encoding: NSASCIIStringEncoding];
NSDebugFLLog(@"NSMessagePort", @"Decoded port as '%@'", n);
p = [messagePortClass newWithName: n];
RELEASE(n);
if (p == nil)
{
NSLog(@"%@ - unable to decode remote port", self);
break;
}
[rItems addObject: p];
RELEASE(p);
}
off += len;
if (nItems == [rItems count])
{
NSPortMessage *pm;
pm = [NSPortMessage allocWithZone: NSDefaultMallocZone()];
pm = [pm initWithSendPort: rPort
receivePort: self
components: rItems];
DESTROY(rPort);
DESTROY(rItems);
[pm setMsgid: rId];
[this->rMsgs addObject: pm];
RELEASE(pm);
break;
}
}
DESTROY(rPort);
DESTROY(rItems);
} }
/* /*
* Got something ... is it all we want? If not, ask to read more. * Got something ... is it all we want? If not, ask to read more.
*/ */
if ([self isValid] == YES && this->rLength < this->rWant) if ([self isValid] == YES
&& (this->rState == RS_NONE || this->rLength < this->rWant))
{ {
int rc; int rc;
this->rOv.Offset = 0; this->rOv.Offset = 0;
this->rOv.OffsetHigh = 0; this->rOv.OffsetHigh = 0;
this->rOv.hEvent = this->rEvent; this->rOv.hEvent = this->rEvent;
if (this->rState == RS_NONE)
{
this->rLength = 0;
this->rWant = 1;
}
if ([this->rData length] < (this->rWant - this->rLength)) if ([this->rData length] < (this->rWant - this->rLength))
{ {
[this->rData setLength: this->rWant - this->rLength]; [this->rData setLength: this->rWant - this->rLength];
@ -715,15 +733,27 @@ static unsigned wordAlign;
if (rc > 0) if (rc > 0)
{ {
NSDebugMLLog(@"NSMessagePort", @"Read immediate on %@", self); NSDebugMLLog(@"NSMessagePort", @"Read immediate on %@", self);
if (this->rState == RS_NONE)
{
this->rState = RS_SIZE;
}
SetEvent(this->rEvent); SetEvent(this->rEvent);
} }
else if ((errno = GetLastError()) == ERROR_IO_PENDING) else if ((errno = GetLastError()) == ERROR_IO_PENDING)
{ {
NSDebugMLLog(@"NSMessagePort", @"Read queued on %@", self); NSDebugMLLog(@"NSMessagePort", @"Read queued on %@", self);
if (this->rState == RS_NONE)
{
this->rState = RS_MESG;
}
} }
else if (errno == ERROR_INSUFFICIENT_BUFFER) else if (errno == ERROR_INSUFFICIENT_BUFFER)
{ {
NSDebugMLLog(@"NSMessagePort", @"Read retry on %@", self); NSDebugMLLog(@"NSMessagePort", @"Read retry on %@", self);
if (this->rState == RS_NONE)
{
this->rState = RS_SIZE;
}
SetEvent(this->rEvent); SetEvent(this->rEvent);
} }
else else
@ -792,6 +822,8 @@ static unsigned wordAlign;
} }
} }
again:
/* /*
* Handle start of next data item if we have completed one, * Handle start of next data item if we have completed one,
* or if we are called without a write in progress. * or if we are called without a write in progress.
@ -828,7 +860,8 @@ static unsigned wordAlign;
{ {
NSDebugMLLog(@"NSMessagePort", @"Write of %d performs %d", NSDebugMLLog(@"NSMessagePort", @"Write of %d performs %d",
[this->wData length] - this->wLength, this->wSize); [this->wData length] - this->wLength, this->wSize);
SetEvent(this->wEvent); this->wLength += this->wSize;
goto again;
} }
else if ((errno = GetLastError()) != ERROR_IO_PENDING) else if ((errno = GetLastError()) != ERROR_IO_PENDING)
{ {
@ -855,6 +888,7 @@ static unsigned wordAlign;
{ {
internal *this = PORT(self); internal *this = PORT(self);
NSDebugMLLog(@"NSMessagePort", @"got event on %@ in mode %@", self, mode);
if (this->rEvent == (HANDLE)data) if (this->rEvent == (HANDLE)data)
{ {
[self receivedEventRead]; [self receivedEventRead];
@ -866,7 +900,8 @@ static unsigned wordAlign;
} }
else else
{ {
// Event on invalid port ... remove port from run loop NSDebugMLLog(@"NSMessagePort",
@"got event on invalidated port 0x%x in mode %@", self, mode);
[[NSRunLoop currentRunLoop] removeEvent: data [[NSRunLoop currentRunLoop] removeEvent: data
type: type type: type
forMode: mode forMode: mode