Change notification code to de-duplicate notifications if/when postgres sends many copies of the same notification at the same time. Also change the code to queue notifications in the thread that received them (as documented) and only queue them in the main thread if the receiving thread does not have an active run loop.

This commit is contained in:
Richard Frith-Macdonald 2020-04-21 10:51:54 +01:00
parent 535914366a
commit dc7955472e
3 changed files with 123 additions and 36 deletions

View file

@ -1,3 +1,11 @@
2020-04-21 Richard Frith-Macdonald <rfm@gnu.org>
* Postgres.m: Change notification code to de-duplicate notifications
if/when postgres sends many copies of the same notification at the
same time. Also change the code to queue notifications in the thread
that received them (as documented) and only queue them in the main
thread if the receiving thread does not have an active run loop.
2020-04-06 Wolfgang Lux <wolfgang.lux@gmail.com>
* SQLClient.m(initWithConfiguration:name:pool:): Eventually

View file

@ -581,6 +581,10 @@ connectQuote(NSString *str)
/* Post asynchronously
*/
if ([self debugging] > 0)
{
[self debug: @"Notified (database): %@", n];
}
nq = [NSNotificationQueue defaultQueue];
[nq enqueueNotification: n
postingStyle: NSPostASAP
@ -588,20 +592,60 @@ connectQuote(NSString *str)
forModes: nil];
}
- (void) _postNotifications: (NSArray*)notifications
{
NSUInteger count = [notifications count];
NSUInteger index;
for (index = 0; index < count; index++)
{
NSNotification *n = [notifications objectAtIndex: index];
NS_DURING
{
[self _postNotification: n];
}
NS_HANDLER
{
NSLog(@"Problem posting notification: %@ %@",
n, localException);
}
NS_ENDHANDLER
}
}
/* This method must only be called when the receiver is locked.
*/
- (void) _checkNotifications: (BOOL)async
{
PGnotify *notify;
NSMutableArray *notifications = nil;
PGnotify *notify;
/* While postgres sometimes de-duplicates notifications it is not guaranteed
* that it will do so, and it is therefore possible for the database server
* to send many duplicate notifications.
* So we read the notifications and add them to an array only if they are not
* already present.
*/
while ((notify = PQnotifies(connection)) != 0)
{
NS_DURING
{
static NSNumber *nY = nil;
static NSNumber *nN = nil;
NSNotification *n;
NSMutableDictionary *userInfo;
NSString *name;
if (nil == nN)
{
ASSIGN(nN, [NSNumber numberWithBool: NO]);
}
if (nil == nY)
{
ASSIGN(nY, [NSNumber numberWithBool: YES]);
}
name = [[NSString alloc] initWithUTF8String: notify->relname];
userInfo = [[NSMutableDictionary alloc] initWithCapacity: 2];
if (0 != notify->extra)
@ -612,58 +656,92 @@ connectQuote(NSString *str)
if (nil != payload)
{
[userInfo setObject: payload forKey: @"Payload"];
[payload release];
RELEASE(payload);
}
}
if (notify->be_pid == backendPID)
{
static NSNumber *nY = nil;
if (nil == nY)
{
nY = [[NSNumber numberWithBool: YES] retain];
}
[userInfo setObject: nY forKey: @"Local"];
}
else
{
static NSNumber *nN = nil;
if (nil == nN)
{
nN = [[NSNumber numberWithBool: NO] retain];
}
[userInfo setObject: nN forKey: @"Local"];
}
if (YES == async)
{
[userInfo setObject: nY forKey: @"Async"];
}
else
{
[userInfo setObject: nN forKey: @"Async"];
}
n = [NSNotification notificationWithName: name
object: self
userInfo: (NSDictionary*)userInfo];
[name release];
[userInfo release];
if ([self debugging] > 0)
{
if (YES == async)
{
[self debug: @"Notified (asynchronously): %@", n];
}
else
{
[self debug: @"Notified (query/execute): %@", n];
}
if (nil == notifications)
{
notifications = [[NSMutableArray alloc] initWithCapacity: 10];
}
[self performSelectorOnMainThread: @selector(_postNotification:)
withObject: n
waitUntilDone: NO];
if (NO == [notifications containsObject: n])
{
[notifications addObject: n];
}
RELEASE(name);
RELEASE(userInfo);
}
NS_HANDLER
{
NSLog(@"Problem handling asynchronous notification: %@",
localException);
NSLog(@"Problem handling %@ notification: %@",
(async ? @"asynchronous" : @"query/execute"), localException);
}
NS_ENDHANDLER
PQfreemem(notify);
}
/* Now that we have read all the available notifications from the database,
* we post them locally in the current thread (if its run loop is active)
* or the main thread.
*/
if (notifications != nil)
{
if ([[NSRunLoop currentRunLoop] currentMode] == nil)
{
if ([self debugging] > 0)
{
[self debug: @"Notifying (main thread): %@", notifications];
}
NS_DURING
{
[self performSelectorOnMainThread: @selector(_postNotifications:)
withObject: notifications
waitUntilDone: NO];
}
NS_HANDLER
{
NSLog(@"Problem posting to main thread: %@ %@",
notifications, localException);
}
NS_ENDHANDLER
}
else
{
if ([self debugging] > 0)
{
[self debug: @"Notifying (receiving thread): %@", notifications];
}
NS_DURING
{
[self _postNotifications: notifications];
}
NS_HANDLER
{
NSLog(@"Problem posting in receiving thread: %@ %@",
notifications, localException);
}
NS_ENDHANDLER
}
RELEASE(notifications);
}
}
- (NSInteger) backendExecute: (NSArray*)info

View file

@ -1401,10 +1401,11 @@ SQLCLIENT_PRIVATE
* If the 'Payload' value is not nil, then it is a string providing extra
* information about the notification.<br />
* Notifications are posted asynchronously using the default notification
* queue for the current thread, so they should be delivered to the
* observer after the database statement in which they were detected
* queue for the thread which receives them (or the main thread if the
* receiving thread run loop is not active), so they should be delivered to
* the observer after the database statement in which they were detected
* has completed. However, delivery of the notification could still
* occur inside a transaction is the -begin and -commit statements
* occur inside a transaction if the -begin and -commit statements
* are used. For this reason, observing code may want to use the
* -lockBeforeDate: -isInTransaction and -unlock methods to ensure
* that they don't interfere with ongoing transactions.<br />
@ -1434,7 +1435,7 @@ SQLCLIENT_PRIVATE
* If anObserver is nil, the removal will be performed for all current
* observers (and if both name and anObserver are nil, then all observations
* are removed).<br />
* Any atempt to remove a non existent observation is silently ignored.
* Any attempt to remove a non existent observation is silently ignored.
*/
- (void) removeObserver: (id)anObserver name: (NSString*)name;
@end