mirror of
https://github.com/gnustep/libs-base.git
synced 2025-05-31 00:30:53 +00:00
Remove old-style methods for getting incoming ConnectedCoders. Add
new methods that use RunLoop. All callers changed. ([Connection -addToRunLoop:forMode:]): New method. ([Connection -removeFromRunLoop:forMode:]): New method. ([Connection -runConnectionUntilDate:]): New method. ([Connection -runConnection]): Method changed to call above method. ([Connection -_handleRmc:]): New method. ([Connection -_handleQueuedRmcRequests]): New method. ([Connection -_getReceivedReplyRmcFromQueueWithSequenceNumber:]): New method. ([Connection -_getReceivedReplyRmcWithSequenceNumber:]): New method. ([Connection -invokeWithObject:]): New method. ([Connection +newForInPort:outPort:ancestorConnection:]): Set reply_depth ivar. ([Connection -_encoderCreateReferenceForConstPtr:]): Return the xref! git-svn-id: svn+ssh://svn.gna.org/svn/gnustep/libs/base/trunk@1117 72102866-910b-0410-8b05-ffd578937521
This commit is contained in:
parent
17093e1111
commit
cce7a64779
1 changed files with 163 additions and 215 deletions
|
@ -42,12 +42,15 @@
|
|||
#include <objects/Queue.h>
|
||||
#include <objects/mframe.h>
|
||||
#include <objects/Notification.h>
|
||||
#include <objects/RunLoop.h>
|
||||
#include <Foundation/NSString.h>
|
||||
#include <Foundation/NSDate.h>
|
||||
#include <assert.h>
|
||||
|
||||
@interface Connection (GettingCoderInterface)
|
||||
- _serviceReceivedRequestsWithTimeout: (int)to;
|
||||
- newReceivedReplyRmcWithSequenceNumber: (int)n;
|
||||
- (void) _handleRmc: rmc;
|
||||
- (void) _handleQueuedRmcRequests;
|
||||
- _getReceivedReplyRmcWithSequenceNumber: (int)n;
|
||||
- newSendingRequestRmc;
|
||||
- newSendingReplyRmcWithSequenceNumber: (int)n;
|
||||
- (int) _newMsgNumber;
|
||||
|
@ -238,7 +241,7 @@ static int messages_received_count;
|
|||
return [connection_array count];
|
||||
}
|
||||
|
||||
+ (unsigned) connectionsCountWithInPort: (Port*)aPort
|
||||
+ (unsigned) connectionsCountWithInPort: (InPort*)aPort
|
||||
{
|
||||
unsigned count = 0;
|
||||
id o;
|
||||
|
@ -315,13 +318,13 @@ static int messages_received_count;
|
|||
return [self rootProxyAtPort: p];
|
||||
}
|
||||
|
||||
+ (Proxy*) rootProxyAtPort: (Port*)anOutPort
|
||||
+ (Proxy*) rootProxyAtPort: (OutPort*)anOutPort
|
||||
{
|
||||
id newInPort = [default_in_port_class newForReceiving];
|
||||
return [self rootProxyAtPort: anOutPort withInPort: newInPort];
|
||||
}
|
||||
|
||||
+ (Proxy*) rootProxyAtPort: (Port*)anOutPort withInPort: (Port*)anInPort
|
||||
+ (Proxy*) rootProxyAtPort: (OutPort*)anOutPort withInPort: (InPort*)anInPort
|
||||
{
|
||||
Connection *newConn = [self newForInPort:anInPort
|
||||
outPort:anOutPort
|
||||
|
@ -336,7 +339,7 @@ static int messages_received_count;
|
|||
|
||||
/* This is the designated initializer for Connection */
|
||||
|
||||
+ (Connection*) newForInPort: (Port*)ip outPort: (Port*)op
|
||||
+ (Connection*) newForInPort: (InPort*)ip outPort: (OutPort*)op
|
||||
ancestorConnection: (Connection*)ancestor;
|
||||
{
|
||||
Connection *newConn;
|
||||
|
@ -409,6 +412,7 @@ static int messages_received_count;
|
|||
newConn->out_port_class = default_out_port_class;
|
||||
}
|
||||
newConn->delay_dialog_interruptions = YES;
|
||||
newConn->reply_depth = 0;
|
||||
newConn->delegate = nil;
|
||||
|
||||
/* Here ask the delegate for permission. */
|
||||
|
@ -556,7 +560,7 @@ static int messages_received_count;
|
|||
{
|
||||
assert(ip != (id)-1);
|
||||
if (!ip)
|
||||
ip = [self newReceivedReplyRmcWithSequenceNumber:seq_num];
|
||||
ip = [self _getReceivedReplyRmcWithSequenceNumber:seq_num];
|
||||
[ip decodeValueOfObjCType:type at:datum withName:NULL];
|
||||
if (argnum == last_argnum)
|
||||
{
|
||||
|
@ -646,7 +650,7 @@ static int messages_received_count;
|
|||
sequenceNumber: seq_num
|
||||
identifier: ROOTPROXY_REQUEST];
|
||||
[op dismiss];
|
||||
ip = [self newReceivedReplyRmcWithSequenceNumber: seq_num];
|
||||
ip = [self _getReceivedReplyRmcWithSequenceNumber: seq_num];
|
||||
[ip decodeObjectAt: &newProxy withName: NULL];
|
||||
assert (class_is_kind_of (newProxy->isa, objc_get_class ("Proxy")));
|
||||
[ip dismiss];
|
||||
|
@ -711,7 +715,7 @@ static int messages_received_count;
|
|||
at:&target
|
||||
withName:NULL];
|
||||
[op dismiss];
|
||||
ip = [self newReceivedReplyRmcWithSequenceNumber:seq_num];
|
||||
ip = [self _getReceivedReplyRmcWithSequenceNumber:seq_num];
|
||||
[ip decodeValueOfCType:@encode(char*)
|
||||
at:&type
|
||||
withName:NULL];
|
||||
|
@ -760,15 +764,157 @@ static int messages_received_count;
|
|||
|
||||
/* Running the connection, getting/sending requests/replies. */
|
||||
|
||||
- (void) runConnection
|
||||
- (void) addToRunLoop: run_loop forMode: (id <String>)mode
|
||||
{
|
||||
[self runConnectionWithTimeout: -1];
|
||||
[in_port addToRunLoop: run_loop forMode: mode];
|
||||
[in_port setReceivedPacketInvocation: (id)self];
|
||||
}
|
||||
|
||||
/* to < 0 will never time out */
|
||||
- (void) runConnectionWithTimeout: (int)to
|
||||
- (void) removeFromRunLoop: run_loop forMode: (id <String>)mode
|
||||
{
|
||||
[self _serviceReceivedRequestsWithTimeout: to];
|
||||
[in_port removeFromRunLoop: run_loop forMode: mode];
|
||||
}
|
||||
|
||||
- (void) runConnectionUntilDate: date
|
||||
{
|
||||
[self addToRunLoop: [RunLoop currentInstance] forMode: nil];
|
||||
[RunLoop runUntilDate: date];
|
||||
}
|
||||
|
||||
- (void) runConnection
|
||||
{
|
||||
[self runConnectionUntilDate: [NSDate distantFuture]];
|
||||
}
|
||||
|
||||
- (void) _handleRmc: rmc
|
||||
{
|
||||
switch ([rmc identifier])
|
||||
{
|
||||
case ROOTPROXY_REQUEST:
|
||||
/* It won't take much time to handle this, so go ahead and service
|
||||
it, even if we are waiting for a reply. */
|
||||
[[rmc connection] _service_rootObject: rmc];
|
||||
[rmc dismiss];
|
||||
break;
|
||||
case METHODTYPE_REQUEST:
|
||||
/* It won't take much time to handle this, so go ahead and service
|
||||
it, even if we are waiting for a reply. */
|
||||
[[rmc connection] _service_typeForSelector: rmc];
|
||||
[rmc dismiss];
|
||||
break;
|
||||
case METHOD_REQUEST:
|
||||
if (([rmc connection] == self)
|
||||
&& (reply_depth == 0
|
||||
|| !delay_dialog_interruptions))
|
||||
{
|
||||
[[rmc connection] _service_forwardForProxy: rmc];
|
||||
/* Service any requests that were queued while we
|
||||
were waiting for replies.
|
||||
xxx Is this the right place for this check? */
|
||||
if (reply_depth == 0)
|
||||
[self _handleQueuedRmcRequests];
|
||||
}
|
||||
else
|
||||
{
|
||||
[received_request_rmc_queue_gate lock];
|
||||
[received_request_rmc_queue enqueueObject: rmc];
|
||||
[received_request_rmc_queue_gate unlock];
|
||||
}
|
||||
break;
|
||||
case ROOTPROXY_REPLY:
|
||||
case METHOD_REPLY:
|
||||
case METHODTYPE_REPLY:
|
||||
/* Remember multi-threaded callbacks will have to be handled specially */
|
||||
[received_reply_rmc_queue_gate lock];
|
||||
[received_reply_rmc_queue enqueueObject: rmc];
|
||||
[received_reply_rmc_queue_gate unlock];
|
||||
break;
|
||||
case CONNECTION_SHUTDOWN:
|
||||
{
|
||||
[[rmc connection] _service_shutdown: rmc forConnection: self];
|
||||
break;
|
||||
}
|
||||
default:
|
||||
[self error:"unrecognized ConnectedDecoder identifier"];
|
||||
}
|
||||
}
|
||||
|
||||
- (void) _handleQueuedRmcRequests
|
||||
{
|
||||
id rmc;
|
||||
|
||||
[received_request_rmc_queue_gate lock];
|
||||
while ((rmc = [received_request_rmc_queue dequeueObject]))
|
||||
{
|
||||
[received_request_rmc_queue_gate unlock];
|
||||
[self _handleRmc: rmc];
|
||||
[received_request_rmc_queue_gate lock];
|
||||
}
|
||||
[received_request_rmc_queue_gate unlock];
|
||||
}
|
||||
|
||||
/* Deal with an RMC, either by queuing it for later service, or
|
||||
by servicing it right away. This method is called by the
|
||||
in_port's received-packet-invocation. */
|
||||
|
||||
/* Look for it on the queue, if it is not there, return nil. */
|
||||
- _getReceivedReplyRmcFromQueueWithSequenceNumber: (int)sn
|
||||
{
|
||||
id the_rmc = nil;
|
||||
unsigned count, i;
|
||||
|
||||
[received_reply_rmc_queue_gate lock];
|
||||
|
||||
count = [received_reply_rmc_queue count];
|
||||
/* xxx There should be a per-thread queue of rmcs so we can do
|
||||
callbacks when multi-threaded. */
|
||||
for (i = 0; i < count; i++)
|
||||
{
|
||||
id a_rmc = [received_reply_rmc_queue objectAtIndex: i];
|
||||
if ([a_rmc connection] == self
|
||||
&& [a_rmc sequenceNumber] == sn)
|
||||
{
|
||||
if (debug_connection)
|
||||
printf("Getting received reply from queue\n");
|
||||
[received_reply_rmc_queue removeObjectAtIndex: i];
|
||||
the_rmc = a_rmc;
|
||||
break;
|
||||
}
|
||||
/* xxx Make sure that there isn't a higher sequenceNumber, meaning
|
||||
that we somehow dropped a packet. */
|
||||
}
|
||||
[received_reply_rmc_queue_gate unlock];
|
||||
return the_rmc;
|
||||
}
|
||||
|
||||
/* Check the queue, then try to get it from the network by waiting
|
||||
while we run the RunLoop. Return nil if we don't get anything
|
||||
before timing out. */
|
||||
- _getReceivedReplyRmcWithSequenceNumber: (int)sn
|
||||
{
|
||||
id rmc;
|
||||
id timeout_date = nil;
|
||||
|
||||
reply_depth++;
|
||||
while (!(rmc = [self _getReceivedReplyRmcFromQueueWithSequenceNumber: sn]))
|
||||
{
|
||||
if (!timeout_date)
|
||||
timeout_date = [[NSDate alloc]
|
||||
initWithTimeIntervalSinceNow: in_timeout];
|
||||
[[RunLoop currentInstance] runOnceBeforeDate: timeout_date
|
||||
forMode: nil];
|
||||
}
|
||||
reply_depth--;
|
||||
return rmc;
|
||||
}
|
||||
|
||||
/* Sneaky, sneaky. See "sneaky" comment in TcpPort.m.
|
||||
This method will be called by InPort when I receives a new packet. */
|
||||
- (void) invokeWithObject: packet
|
||||
{
|
||||
[self _handleRmc:
|
||||
[ConnectedDecoder newDecodingWithPacket: packet
|
||||
connection: self]];
|
||||
}
|
||||
|
||||
- (int) _newMsgNumber
|
||||
|
@ -782,205 +928,6 @@ static int messages_received_count;
|
|||
return n;
|
||||
}
|
||||
|
||||
/* We not going to get one from a queue; actually go to the network and
|
||||
wait for it. */
|
||||
- _newReceivedRmcWithTimeout: (int)to
|
||||
{
|
||||
id rmc;
|
||||
|
||||
assert (is_valid);
|
||||
rmc = [[self decodingClass] newDecodingWithConnection: self
|
||||
timeout: to];
|
||||
/* If this times out, rmc will be nil. */
|
||||
return rmc;
|
||||
}
|
||||
|
||||
/* Waiting for incoming requests. */
|
||||
- _serviceReceivedRequestsWithTimeout: (int)to
|
||||
{
|
||||
id rmc;
|
||||
unsigned count;
|
||||
|
||||
assert (is_valid);
|
||||
for (;;)
|
||||
{
|
||||
#if 0
|
||||
if (debug_connection)
|
||||
printf("%s\n", sel_get_name(_cmd));
|
||||
#endif
|
||||
|
||||
/* Get a rmc, either from off the queue, or from the network. */
|
||||
[received_request_rmc_queue_gate lock];
|
||||
count = [received_request_rmc_queue count];
|
||||
if (count)
|
||||
{
|
||||
if (debug_connection)
|
||||
printf ("Getting received request from queue\n");
|
||||
rmc = [received_request_rmc_queue dequeueObject];
|
||||
[received_request_rmc_queue_gate unlock];
|
||||
}
|
||||
else
|
||||
{
|
||||
[received_request_rmc_queue_gate unlock];
|
||||
rmc = [self _newReceivedRmcWithTimeout:to];
|
||||
}
|
||||
|
||||
/* If we timed out, just return. */
|
||||
if (!rmc) return self; /* timed out */
|
||||
assert([rmc isKindOf: [Decoder class]]);
|
||||
|
||||
/* We got a rmc; process it */
|
||||
switch ([rmc identifier])
|
||||
{
|
||||
case ROOTPROXY_REQUEST:
|
||||
[[rmc connection] _service_rootObject: rmc];
|
||||
[rmc dismiss];
|
||||
break;
|
||||
case ROOTPROXY_REPLY:
|
||||
[self error: "Got ROOTPROXY reply when looking for request"];
|
||||
break;
|
||||
case METHOD_REQUEST:
|
||||
[[rmc connection] _service_forwardForProxy: rmc];
|
||||
break;
|
||||
case METHOD_REPLY:
|
||||
/* Will this ever happen?
|
||||
Yes, with multi-threaded callbacks */
|
||||
[received_reply_rmc_queue_gate lock];
|
||||
[received_reply_rmc_queue enqueueObject: rmc];
|
||||
[received_reply_rmc_queue_gate unlock];
|
||||
break;
|
||||
case METHODTYPE_REQUEST:
|
||||
[[rmc connection] _service_typeForSelector: rmc];
|
||||
[rmc dismiss];
|
||||
break;
|
||||
case METHODTYPE_REPLY:
|
||||
/* Will this ever happen?
|
||||
Yes, with multi-threaded callbacks */
|
||||
[received_reply_rmc_queue_gate lock];
|
||||
[received_reply_rmc_queue enqueueObject: rmc];
|
||||
[received_reply_rmc_queue_gate unlock];
|
||||
break;
|
||||
case CONNECTION_SHUTDOWN:
|
||||
{
|
||||
[[rmc connection] _service_shutdown: rmc forConnection: self];
|
||||
break;
|
||||
}
|
||||
default:
|
||||
[self error:"unrecognized ConnectedDecoder identifier"];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* waiting for a reply to a request. */
|
||||
- newReceivedReplyRmcWithSequenceNumber: (int)n
|
||||
{
|
||||
id rmc, aRmc;
|
||||
unsigned count, i;
|
||||
|
||||
assert (is_valid);
|
||||
again:
|
||||
|
||||
/* Get a rmc */
|
||||
rmc = nil;
|
||||
[received_reply_rmc_queue_gate lock];
|
||||
/* Check to see if what we are looking for is on the queue. */
|
||||
count = [received_reply_rmc_queue count];
|
||||
/* xxx There should be a per-thread queue of rmcs so we can do
|
||||
callbacks when multi-threaded. */
|
||||
for (i = 0; i < count; i++)
|
||||
{
|
||||
aRmc = [received_reply_rmc_queue objectAtIndex: i];
|
||||
if ([aRmc connection] == self
|
||||
&& [aRmc sequenceNumber] == n)
|
||||
{
|
||||
if (debug_connection)
|
||||
printf("Getting received reply from queue\n");
|
||||
[received_reply_rmc_queue removeObjectAtIndex:i];
|
||||
rmc = aRmc;
|
||||
break;
|
||||
}
|
||||
}
|
||||
[received_reply_rmc_queue_gate unlock];
|
||||
|
||||
/* What we needed was not on the queue, get it from the network. */
|
||||
if (rmc == nil)
|
||||
rmc = [self _newReceivedRmcWithTimeout:in_timeout];
|
||||
|
||||
/* We timed out on the network. */
|
||||
if (rmc == nil)
|
||||
{
|
||||
/* We timed out */
|
||||
[self error:"connection timed out after waiting %d milliseconds "
|
||||
"for a reply",
|
||||
in_timeout];
|
||||
/* Eventually we need to change this from crashing to
|
||||
connection invalidating? I want to use gcc exceptions for this. */
|
||||
}
|
||||
|
||||
/* Process the rmc we got */
|
||||
switch ([rmc identifier])
|
||||
{
|
||||
case ROOTPROXY_REQUEST:
|
||||
[self _service_rootObject: rmc];
|
||||
[rmc dismiss];
|
||||
break;
|
||||
case METHODTYPE_REQUEST:
|
||||
[self _service_typeForSelector: rmc];
|
||||
[rmc dismiss];
|
||||
break;
|
||||
case ROOTPROXY_REPLY:
|
||||
case METHOD_REPLY:
|
||||
case METHODTYPE_REPLY:
|
||||
/* We got a reply... */
|
||||
if ([rmc connection] != self)
|
||||
{
|
||||
/* ... but it wasn't for us; enqueue it. */
|
||||
[received_reply_rmc_queue_gate lock];
|
||||
[received_reply_rmc_queue enqueueObject:rmc];
|
||||
[received_reply_rmc_queue_gate unlock];
|
||||
}
|
||||
else
|
||||
{
|
||||
/* ... and it's for us; make sure the sequence is right. */
|
||||
if ([rmc sequenceNumber] != n)
|
||||
[self error:"sequence number mismatch %d != %d\n",
|
||||
[rmc sequenceNumber], n];
|
||||
if (debug_connection)
|
||||
printf("received reply number %d\n", n);
|
||||
/* ... and all checks out; return it. */
|
||||
return rmc;
|
||||
}
|
||||
break;
|
||||
case METHOD_REQUEST:
|
||||
/* We got a new request while waiting for a reply.
|
||||
We can either
|
||||
(1) honor new requests from other connections immediately, or
|
||||
(2) just queue them. */
|
||||
if (delay_dialog_interruptions && [rmc connection] != self)
|
||||
{
|
||||
/* Here we queue them */
|
||||
[received_request_rmc_queue_gate lock];
|
||||
[received_request_rmc_queue enqueueObject:rmc];
|
||||
[received_request_rmc_queue_gate unlock];
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Here we honor them right away */
|
||||
[self _service_forwardForProxy: rmc];
|
||||
}
|
||||
break;
|
||||
case CONNECTION_SHUTDOWN:
|
||||
{
|
||||
[[rmc connection] _service_shutdown: rmc forConnection: self];
|
||||
break;
|
||||
}
|
||||
default:
|
||||
[self error:"unrecognized ConnectedDecoder identifier"];
|
||||
}
|
||||
goto again;
|
||||
|
||||
return rmc;
|
||||
}
|
||||
|
||||
|
||||
/* Managing objects and proxies. */
|
||||
|
@ -1123,7 +1070,7 @@ static int messages_received_count;
|
|||
|
||||
|
||||
/* Pass nil to remove any reference keyed by aPort. */
|
||||
+ (void) setRootObject: anObj forInPort: (Port*)aPort
|
||||
+ (void) setRootObject: anObj forInPort: (InPort*)aPort
|
||||
{
|
||||
id oldRootObject = [self rootObjectForInPort: aPort];
|
||||
|
||||
|
@ -1146,7 +1093,7 @@ static int messages_received_count;
|
|||
}
|
||||
}
|
||||
|
||||
+ rootObjectForInPort: (Port*)aPort
|
||||
+ rootObjectForInPort: (InPort*)aPort
|
||||
{
|
||||
id ro;
|
||||
|
||||
|
@ -1259,6 +1206,7 @@ static int messages_received_count;
|
|||
xref = NSCountMapTable (outgoing_const_ptr_2_xref) + 1;
|
||||
assert (! NSMapGet (outgoing_const_ptr_2_xref, (void*)xref));
|
||||
NSMapInsert (outgoing_const_ptr_2_xref, ptr, (void*)xref);
|
||||
return xref;
|
||||
}
|
||||
|
||||
- (unsigned) _encoderReferenceForConstPtr: (const void*)ptr
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue