diff options
Diffstat (limited to 'src/backend/commands/async.c')
-rw-r--r-- | src/backend/commands/async.c | 331 |
1 files changed, 247 insertions, 84 deletions
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 6e9c580ec6d..6cb2d445f0d 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -135,6 +135,7 @@ #include "storage/sinval.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" +#include "utils/hashutils.h" #include "utils/memutils.h" #include "utils/ps_status.h" #include "utils/snapmgr.h" @@ -323,14 +324,25 @@ static List *upperPendingActions = NIL; /* list of upper-xact lists */ /* * State for outbound notifies consists of a list of all channels+payloads - * NOTIFYed in the current transaction. We do not actually perform a NOTIFY - * until and unless the transaction commits. pendingNotifies is NIL if no - * NOTIFYs have been done in the current transaction. + * NOTIFYed in the current transaction. We do not actually perform a NOTIFY + * until and unless the transaction commits. pendingNotifies is NULL if no + * NOTIFYs have been done in the current (sub) transaction. + * + * We discard duplicate notify events issued in the same transaction. + * Hence, in addition to the list proper (which we need to track the order + * of the events, since we guarantee to deliver them in order), we build a + * hash table which we can probe to detect duplicates. Since building the + * hash table is somewhat expensive, we do so only once we have at least + * MIN_HASHABLE_NOTIFIES events queued in the current (sub) transaction; + * before that we just scan the events linearly. * * The list is kept in CurTransactionContext. In subtransactions, each * subtransaction has its own list in its own CurTransactionContext, but - * successful subtransactions attach their lists to their parent's list. - * Failed subtransactions simply discard their lists. + * successful subtransactions add their entries to their parent's list. + * Failed subtransactions simply discard their lists. Since these lists + * are independent, there may be notify events in a subtransaction's list + * that duplicate events in some ancestor (sub) transaction; we get rid of + * the dups when merging the subtransaction's list into its parent's. * * Note: the action and notify lists do not interact within a transaction. * In particular, if a transaction does NOTIFY and then LISTEN on the same @@ -339,11 +351,26 @@ static List *upperPendingActions = NIL; /* list of upper-xact lists */ */ typedef struct Notification { - char *channel; /* channel name */ - char *payload; /* payload string (can be empty) */ + uint16 channel_len; /* length of channel-name string */ + uint16 payload_len; /* length of payload string */ + /* null-terminated channel name, then null-terminated payload follow */ + char data[FLEXIBLE_ARRAY_MEMBER]; } Notification; -static List *pendingNotifies = NIL; /* list of Notifications */ +typedef struct NotificationList +{ + List *events; /* list of Notification structs */ + HTAB *hashtab; /* hash of NotificationHash structs, or NULL */ +} NotificationList; + +#define MIN_HASHABLE_NOTIFIES 16 /* threshold to build hashtab */ + +typedef struct NotificationHash +{ + Notification *event; /* => the actual Notification struct */ +} NotificationHash; + +static NotificationList *pendingNotifies = NULL; /* current list, if any */ static List *upperPendingNotifies = NIL; /* list of upper-xact lists */ @@ -392,7 +419,10 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, Snapshot snapshot); static void asyncQueueAdvanceTail(void); static void ProcessIncomingNotify(void); -static bool AsyncExistsPendingNotify(const char *channel, const char *payload); +static bool AsyncExistsPendingNotify(Notification *n); +static void AddEventToPendingNotifies(Notification *n); +static uint32 notification_hash(const void *key, Size keysize); +static int notification_match(const void *key1, const void *key2, Size keysize); static void ClearPendingActionsAndNotifies(void); /* @@ -541,6 +571,8 @@ pg_notify(PG_FUNCTION_ARGS) void Async_Notify(const char *channel, const char *payload) { + size_t channel_len; + size_t payload_len; Notification *n; MemoryContext oldcontext; @@ -550,47 +582,67 @@ Async_Notify(const char *channel, const char *payload) if (Trace_notify) elog(DEBUG1, "Async_Notify(%s)", channel); + channel_len = channel ? strlen(channel) : 0; + payload_len = payload ? strlen(payload) : 0; + /* a channel name must be specified */ - if (!channel || !strlen(channel)) + if (channel_len == 0) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("channel name cannot be empty"))); - if (strlen(channel) >= NAMEDATALEN) + /* enforce length limits */ + if (channel_len >= NAMEDATALEN) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("channel name too long"))); - if (payload) - { - if (strlen(payload) >= NOTIFY_PAYLOAD_MAX_LENGTH) - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("payload string too long"))); - } - - /* no point in making duplicate entries in the list ... */ - if (AsyncExistsPendingNotify(channel, payload)) - return; + if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("payload string too long"))); /* + * We must construct the Notification entry, even if we end up not using + * it, in order to compare it cheaply to existing list entries. + * * The notification list needs to live until end of transaction, so store * it in the transaction context. */ oldcontext = MemoryContextSwitchTo(CurTransactionContext); - n = (Notification *) palloc(sizeof(Notification)); - n->channel = pstrdup(channel); + n = (Notification *) palloc(offsetof(Notification, data) + + channel_len + payload_len + 2); + n->channel_len = channel_len; + n->payload_len = payload_len; + strcpy(n->data, channel); if (payload) - n->payload = pstrdup(payload); + strcpy(n->data + channel_len + 1, payload); else - n->payload = ""; + n->data[channel_len + 1] = '\0'; - /* - * We want to preserve the order so we need to append every notification. - * See comments at AsyncExistsPendingNotify(). - */ - pendingNotifies = lappend(pendingNotifies, n); + /* Now check for duplicates */ + if (AsyncExistsPendingNotify(n)) + { + /* It's a dup, so forget it */ + pfree(n); + MemoryContextSwitchTo(oldcontext); + return; + } + + if (pendingNotifies == NULL) + { + /* First notify event in current (sub)xact */ + pendingNotifies = (NotificationList *) palloc(sizeof(NotificationList)); + pendingNotifies->events = list_make1(n); + /* We certainly don't need a hashtable yet */ + pendingNotifies->hashtab = NULL; + } + else + { + /* Append more events to existing list */ + AddEventToPendingNotifies(n); + } MemoryContextSwitchTo(oldcontext); } @@ -761,7 +813,7 @@ PreCommit_Notify(void) { ListCell *p; - if (pendingActions == NIL && pendingNotifies == NIL) + if (!pendingActions && !pendingNotifies) return; /* no relevant statements in this xact */ if (Trace_notify) @@ -821,7 +873,7 @@ PreCommit_Notify(void) /* Now push the notifications into the queue */ backendHasSentNotifications = true; - nextNotify = list_head(pendingNotifies); + nextNotify = list_head(pendingNotifies->events); while (nextNotify != NULL) { /* @@ -1267,8 +1319,8 @@ asyncQueueAdvance(volatile QueuePosition *position, int entryLength) static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe) { - size_t channellen = strlen(n->channel); - size_t payloadlen = strlen(n->payload); + size_t channellen = n->channel_len; + size_t payloadlen = n->payload_len; int entryLength; Assert(channellen < NAMEDATALEN); @@ -1281,8 +1333,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe) qe->dboid = MyDatabaseId; qe->xid = GetCurrentTransactionId(); qe->srcPid = MyProcPid; - memcpy(qe->data, n->channel, channellen + 1); - memcpy(qe->data + channellen + 1, n->payload, payloadlen + 1); + memcpy(qe->data, n->data, channellen + payloadlen + 2); } /* @@ -1294,7 +1345,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe) * database OID in order to fill the page. So every page is always used up to * the last byte which simplifies reading the page later. * - * We are passed the list cell (in pendingNotifies) containing the next + * We are passed the list cell (in pendingNotifies->events) containing the next * notification to write and return the first still-unwritten cell back. * Eventually we will return NULL indicating all is done. * @@ -1345,7 +1396,7 @@ asyncQueueAddEntries(ListCell *nextNotify) if (offset + qe.length <= QUEUE_PAGESIZE) { /* OK, so advance nextNotify past this item */ - nextNotify = lnext(pendingNotifies, nextNotify); + nextNotify = lnext(pendingNotifies->events, nextNotify); } else { @@ -1607,7 +1658,7 @@ AtSubStart_Notify(void) Assert(list_length(upperPendingNotifies) == GetCurrentTransactionNestLevel() - 1); - pendingNotifies = NIL; + pendingNotifies = NULL; MemoryContextSwitchTo(old_cxt); } @@ -1621,7 +1672,7 @@ void AtSubCommit_Notify(void) { List *parentPendingActions; - List *parentPendingNotifies; + NotificationList *parentPendingNotifies; parentPendingActions = linitial_node(List, upperPendingActions); upperPendingActions = list_delete_first(upperPendingActions); @@ -1634,16 +1685,41 @@ AtSubCommit_Notify(void) */ pendingActions = list_concat(parentPendingActions, pendingActions); - parentPendingNotifies = linitial_node(List, upperPendingNotifies); + parentPendingNotifies = (NotificationList *) linitial(upperPendingNotifies); upperPendingNotifies = list_delete_first(upperPendingNotifies); Assert(list_length(upperPendingNotifies) == GetCurrentTransactionNestLevel() - 2); - /* - * We could try to eliminate duplicates here, but it seems not worthwhile. - */ - pendingNotifies = list_concat(parentPendingNotifies, pendingNotifies); + if (pendingNotifies == NULL) + { + /* easy, no notify events happened in current subxact */ + pendingNotifies = parentPendingNotifies; + } + else if (parentPendingNotifies == NULL) + { + /* easy, subxact's list becomes parent's */ + } + else + { + /* + * Formerly, we didn't bother to eliminate duplicates here, but now we + * must, else we fall foul of "Assert(!found)", either here or during + * a later attempt to build the parent-level hashtable. + */ + NotificationList *childPendingNotifies = pendingNotifies; + ListCell *l; + + pendingNotifies = parentPendingNotifies; + /* Insert all the subxact's events into parent, except for dups */ + foreach(l, childPendingNotifies->events) + { + Notification *childn = (Notification *) lfirst(l); + + if (!AsyncExistsPendingNotify(childn)) + AddEventToPendingNotifies(childn); + } + } } /* @@ -1672,7 +1748,7 @@ AtSubAbort_Notify(void) while (list_length(upperPendingNotifies) > my_level - 2) { - pendingNotifies = linitial_node(List, upperPendingNotifies); + pendingNotifies = (NotificationList *) linitial(upperPendingNotifies); upperPendingNotifies = list_delete_first(upperPendingNotifies); } } @@ -2098,52 +2174,139 @@ NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid) elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload); } -/* Does pendingNotifies include the given channel/payload? */ +/* Does pendingNotifies include a match for the given event? */ static bool -AsyncExistsPendingNotify(const char *channel, const char *payload) +AsyncExistsPendingNotify(Notification *n) { - ListCell *p; - Notification *n; - - if (pendingNotifies == NIL) + if (pendingNotifies == NULL) return false; - if (payload == NULL) - payload = ""; + if (pendingNotifies->hashtab != NULL) + { + /* Use the hash table to probe for a match */ + if (hash_search(pendingNotifies->hashtab, + &n, + HASH_FIND, + NULL)) + return true; + } + else + { + /* Must scan the event list */ + ListCell *l; - /*---------- - * We need to append new elements to the end of the list in order to keep - * the order. However, on the other hand we'd like to check the list - * backwards in order to make duplicate-elimination a tad faster when the - * same condition is signaled many times in a row. So as a compromise we - * check the tail element first which we can access directly. If this - * doesn't match, we check the whole list. - * - * As we are not checking our parents' lists, we can still get duplicates - * in combination with subtransactions, like in: - * - * begin; - * notify foo '1'; - * savepoint foo; - * notify foo '1'; - * commit; - *---------- - */ - n = (Notification *) llast(pendingNotifies); - if (strcmp(n->channel, channel) == 0 && - strcmp(n->payload, payload) == 0) - return true; + foreach(l, pendingNotifies->events) + { + Notification *oldn = (Notification *) lfirst(l); + + if (n->channel_len == oldn->channel_len && + n->payload_len == oldn->payload_len && + memcmp(n->data, oldn->data, + n->channel_len + n->payload_len + 2) == 0) + return true; + } + } + + return false; +} + +/* + * Add a notification event to a pre-existing pendingNotifies list. + * + * Because pendingNotifies->events is already nonempty, this works + * correctly no matter what CurrentMemoryContext is. + */ +static void +AddEventToPendingNotifies(Notification *n) +{ + Assert(pendingNotifies->events != NIL); - foreach(p, pendingNotifies) + /* Create the hash table if it's time to */ + if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES && + pendingNotifies->hashtab == NULL) { - n = (Notification *) lfirst(p); + HASHCTL hash_ctl; + ListCell *l; + + /* Create the hash table */ + MemSet(&hash_ctl, 0, sizeof(hash_ctl)); + hash_ctl.keysize = sizeof(Notification *); + hash_ctl.entrysize = sizeof(NotificationHash); + hash_ctl.hash = notification_hash; + hash_ctl.match = notification_match; + hash_ctl.hcxt = CurTransactionContext; + pendingNotifies->hashtab = + hash_create("Pending Notifies", + 256L, + &hash_ctl, + HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT); + + /* Insert all the already-existing events */ + foreach(l, pendingNotifies->events) + { + Notification *oldn = (Notification *) lfirst(l); + NotificationHash *hentry; + bool found; + + hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab, + &oldn, + HASH_ENTER, + &found); + Assert(!found); + hentry->event = oldn; + } + } - if (strcmp(n->channel, channel) == 0 && - strcmp(n->payload, payload) == 0) - return true; + /* Add new event to the list, in order */ + pendingNotifies->events = lappend(pendingNotifies->events, n); + + /* Add event to the hash table if needed */ + if (pendingNotifies->hashtab != NULL) + { + NotificationHash *hentry; + bool found; + + hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab, + &n, + HASH_ENTER, + &found); + Assert(!found); + hentry->event = n; } +} - return false; +/* + * notification_hash: hash function for notification hash table + * + * The hash "keys" are pointers to Notification structs. + */ +static uint32 +notification_hash(const void *key, Size keysize) +{ + const Notification *k = *(const Notification *const *) key; + + Assert(keysize == sizeof(Notification *)); + /* We don't bother to include the payload's trailing null in the hash */ + return DatumGetUInt32(hash_any((const unsigned char *) k->data, + k->channel_len + k->payload_len + 1)); +} + +/* + * notification_match: match function to use with notification_hash + */ +static int +notification_match(const void *key1, const void *key2, Size keysize) +{ + const Notification *k1 = *(const Notification *const *) key1; + const Notification *k2 = *(const Notification *const *) key2; + + Assert(keysize == sizeof(Notification *)); + if (k1->channel_len == k2->channel_len && + k1->payload_len == k2->payload_len && + memcmp(k1->data, k2->data, + k1->channel_len + k1->payload_len + 2) == 0) + return 0; /* equal */ + return 1; /* not equal */ } /* Clear the pendingActions and pendingNotifies lists. */ @@ -2158,5 +2321,5 @@ ClearPendingActionsAndNotifies(void) * pointers. */ pendingActions = NIL; - pendingNotifies = NIL; + pendingNotifies = NULL; } |