Для приготовления асинхронных уведомлений listen/notify в реплике нам понадобится postgres. Как говорится в документации:
Транзакции, запущенные в режиме горячего резерва, никогда не получают ID транзакции и не могут быть записаны в журнал предзаписи. Поэтому при попытке выполнить следующие действия возникнут ошибки:
LISTEN, NOTIFY
Поэтому берём файл async.c из исходников, переименовываем в нём все публичные методы (не static-функции), удаляем связь с транзакциями и добавляем обработку сигнала SIGUSR1, чтобы получилось так:
src/backend/commands/async.c
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 5739d2b40f..9f62d4ca6b 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -1,3 +1,5 @@
+#include <include.h>
+
/*-------------------------------------------------------------------------
*
* async.c
@@ -46,7 +48,7 @@
* to. In case there is a match it delivers the notification event to its
* frontend. Non-matching events are simply skipped.
*
- * 4. The NOTIFY statement (routine Async_Notify) stores the notification in
+ * 4. The NOTIFY statement (routine Async_Notify_My) stores the notification in
* a backend-local list which will not be processed until transaction end.
*
* Duplicate notifications from the same transaction are sent out as one
@@ -56,7 +58,7 @@
* that has been sent, it can easily add some unique string into the extra
* payload parameter.
*
- * When the transaction is ready to commit, PreCommit_Notify() adds the
+ * When the transaction is ready to commit, PreCommit_Notify_My() adds the
* pending notifications to the head of the queue. The head pointer of the
* queue always points to the next free position and a position is just a
* page number and the offset in that page. This is done before marking the
@@ -67,7 +69,7 @@
* Once we have put all of the notifications into the queue, we return to
* CommitTransaction() which will then do the actual transaction commit.
*
- * After commit we are called another time (AtCommit_Notify()). Here we
+ * After commit we are called another time (AtCommit_Notify_My()). Here we
* make the actual updates to the effective listen state (listenChannels).
*
* Finally, after we are out of the transaction altogether, we check if
@@ -171,7 +173,7 @@ typedef struct AsyncQueueEntry
{
int length; /* total allocated length of entry */
Oid dboid; /* sender's database OID */
- TransactionId xid; /* sender's XID */
+// TransactionId xid; /* sender's XID */
int32 srcPid; /* sender's PID */
char data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
} AsyncQueueEntry;
@@ -414,14 +416,16 @@ typedef struct NotificationHash
static NotificationList *pendingNotifies = NULL;
+static pqsigfunc pg_async_signal_original = NULL;
+
/*
- * Inbound notifications are initially processed by HandleNotifyInterrupt(),
+ * Inbound notifications are initially processed by HandleNotifyInterruptMy(),
* called from inside a signal handler. That just sets the
* notifyInterruptPending flag and sets the process
- * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to
+ * latch. ProcessNotifyInterruptMy() will then be called whenever it's safe to
* actually deal with the interrupt.
*/
-volatile sig_atomic_t notifyInterruptPending = false;
+//volatile sig_atomic_t notifyInterruptPending = false;
/* True if we've registered an on_shmem_exit cleanup */
static bool unlistenExitRegistered = false;
@@ -436,7 +440,7 @@ static bool backendHasSentNotifications = false;
static bool backendTryAdvanceTail = false;
/* GUC parameter */
-bool Trace_notify = false;
+//bool Trace_notify = false;
/* local function prototypes */
static int asyncQueuePageDiff(int p, int q);
@@ -469,6 +473,12 @@ static uint32 notification_hash(const void *key, Size keysize);
static int notification_match(const void *key1, const void *key2, Size keysize);
static void ClearPendingActionsAndNotifies(void);
+static void pg_async_signal(SIGNAL_ARGS) {
+ HandleNotifyInterruptMy();
+ if (notifyInterruptPending) ProcessNotifyInterruptMy();
+ pg_async_signal_original(postgres_signal_arg);
+}
+
/*
* Compute the difference between two queue page numbers (i.e., p - q),
* accounting for wraparound.
@@ -509,11 +519,11 @@ asyncQueuePagePrecedes(int p, int q)
* Report space needed for our shared memory area
*/
Size
-AsyncShmemSize(void)
+AsyncShmemSizeMy(void)
{
Size size;
- /* This had better match AsyncShmemInit */
+ /* This had better match AsyncShmemInitMy */
size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
size = add_size(size, offsetof(AsyncQueueControl, backend));
@@ -526,7 +536,7 @@ AsyncShmemSize(void)
* Initialize our shared memory area
*/
void
-AsyncShmemInit(void)
+AsyncShmemInitMy(void)
{
bool found;
Size size;
@@ -585,7 +595,7 @@ AsyncShmemInit(void)
* SQL function to send a notification event
*/
Datum
-pg_notify(PG_FUNCTION_ARGS)
+pg_notify_my(PG_FUNCTION_ARGS)
{
const char *channel;
const char *payload;
@@ -601,16 +611,16 @@ pg_notify(PG_FUNCTION_ARGS)
payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
/* For NOTIFY as a statement, this is checked in ProcessUtility */
- PreventCommandDuringRecovery("NOTIFY");
+// PreventCommandDuringRecovery("NOTIFY");
- Async_Notify(channel, payload);
+ Async_Notify_My(channel, payload);
PG_RETURN_VOID();
}
/*
- * Async_Notify
+ * Async_Notify_My
*
* This is executed by the SQL notify command.
*
@@ -619,7 +629,7 @@ pg_notify(PG_FUNCTION_ARGS)
* ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
*/
void
-Async_Notify(const char *channel, const char *payload)
+Async_Notify_My(const char *channel, const char *payload)
{
int my_level = GetCurrentTransactionNestLevel();
size_t channel_len;
@@ -631,7 +641,7 @@ Async_Notify(const char *channel, const char *payload)
elog(ERROR, "cannot send notifications from a parallel worker");
if (Trace_notify)
- elog(DEBUG1, "Async_Notify(%s)", channel);
+ elog(DEBUG1, "Async_Notify_My(%s)", channel);
channel_len = channel ? strlen(channel) : 0;
payload_len = payload ? strlen(payload) : 0;
@@ -679,7 +689,7 @@ Async_Notify(const char *channel, const char *payload)
/*
* First notify event in current (sub)xact. Note that we allocate the
* NotificationList in TopTransactionContext; the nestingLevel might
- * get changed later by AtSubCommit_Notify.
+ * get changed later by AtSubCommit_Notify_My.
*/
notifies = (NotificationList *)
MemoryContextAlloc(TopTransactionContext,
@@ -725,7 +735,7 @@ queue_listen(ListenActionKind action, const char *channel)
int my_level = GetCurrentTransactionNestLevel();
/*
- * Unlike Async_Notify, we don't try to collapse out duplicates. It would
+ * Unlike Async_Notify_My, we don't try to collapse out duplicates. It would
* be too complicated to ensure we get the right interactions of
* conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there
* would be any performance benefit anyway in sane applications.
@@ -745,7 +755,7 @@ queue_listen(ListenActionKind action, const char *channel)
/*
* First action in current sub(xact). Note that we allocate the
* ActionList in TopTransactionContext; the nestingLevel might get
- * changed later by AtSubCommit_Notify.
+ * changed later by AtSubCommit_Notify_My.
*/
actions = (ActionList *)
MemoryContextAlloc(TopTransactionContext, sizeof(ActionList));
@@ -761,29 +771,29 @@ queue_listen(ListenActionKind action, const char *channel)
}
/*
- * Async_Listen
+ * Async_Listen_My
*
* This is executed by the SQL listen command.
*/
void
-Async_Listen(const char *channel)
+Async_Listen_My(const char *channel)
{
if (Trace_notify)
- elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
+ elog(DEBUG1, "Async_Listen_My(%s,%d)", channel, MyProcPid);
queue_listen(LISTEN_LISTEN, channel);
}
/*
- * Async_Unlisten
+ * Async_Unlisten_My
*
* This is executed by the SQL unlisten command.
*/
void
-Async_Unlisten(const char *channel)
+Async_Unlisten_My(const char *channel)
{
if (Trace_notify)
- elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
+ elog(DEBUG1, "Async_Unlisten_My(%s,%d)", channel, MyProcPid);
/* If we couldn't possibly be listening, no need to queue anything */
if (pendingActions == NULL && !unlistenExitRegistered)
@@ -793,15 +803,15 @@ Async_Unlisten(const char *channel)
}
/*
- * Async_UnlistenAll
+ * Async_UnlistenAll_My
*
* This is invoked by UNLISTEN * command, and also at backend exit.
*/
void
-Async_UnlistenAll(void)
+Async_UnlistenAll_My(void)
{
if (Trace_notify)
- elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
+ elog(DEBUG1, "Async_UnlistenAll_My(%d)", MyProcPid);
/* If we couldn't possibly be listening, no need to queue anything */
if (pendingActions == NULL && !unlistenExitRegistered)
@@ -818,7 +828,7 @@ Async_UnlistenAll(void)
* change within a transaction.
*/
Datum
-pg_listening_channels(PG_FUNCTION_ARGS)
+pg_listening_channels_my(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx;
@@ -858,13 +868,13 @@ Async_UnlistenOnExit(int code, Datum arg)
}
/*
- * AtPrepare_Notify
+ * AtPrepare_Notify_My
*
* This is called at the prepare phase of a two-phase
* transaction. Save the state for possible commit later.
*/
void
-AtPrepare_Notify(void)
+AtPrepare_Notify_My(void)
{
/* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
if (pendingActions || pendingNotifies)
@@ -874,7 +884,7 @@ AtPrepare_Notify(void)
}
/*
- * PreCommit_Notify
+ * PreCommit_Notify_My
*
* This is called at transaction commit, before actually committing to
* clog.
@@ -889,7 +899,7 @@ AtPrepare_Notify(void)
* we can still throw error if we run out of queue space.
*/
void
-PreCommit_Notify(void)
+PreCommit_Notify_My(void)
{
ListCell *p;
@@ -897,7 +907,7 @@ PreCommit_Notify(void)
return; /* no relevant statements in this xact */
if (Trace_notify)
- elog(DEBUG1, "PreCommit_Notify");
+ elog(DEBUG1, "PreCommit_Notify_My");
/* Preflight for any pending listen/unlisten actions */
if (pendingActions != NULL)
@@ -932,7 +942,7 @@ PreCommit_Notify(void)
* so cheap if we don't, and we'd prefer not to do that work while
* holding NotifyQueueLock.
*/
- (void) GetCurrentTransactionId();
+// (void) GetCurrentTransactionId();
/*
* Serialize writers by acquiring a special lock that we hold till
@@ -951,7 +961,7 @@ PreCommit_Notify(void)
* used by the flatfiles mechanism.)
*/
LockSharedObject(DatabaseRelationId, InvalidOid, 0,
- AccessExclusiveLock);
+ RowExclusiveLock);
/* Now push the notifications into the queue */
backendHasSentNotifications = true;
@@ -984,14 +994,14 @@ PreCommit_Notify(void)
}
/*
- * AtCommit_Notify
+ * AtCommit_Notify_My
*
* This is called at transaction commit, after committing to clog.
*
* Update listenChannels and clear transaction-local state.
*/
void
-AtCommit_Notify(void)
+AtCommit_Notify_My(void)
{
ListCell *p;
@@ -1003,7 +1013,7 @@ AtCommit_Notify(void)
return;
if (Trace_notify)
- elog(DEBUG1, "AtCommit_Notify");
+ elog(DEBUG1, "AtCommit_Notify_My");
/* Perform any pending listen/unlisten actions */
if (pendingActions != NULL)
@@ -1036,7 +1046,7 @@ AtCommit_Notify(void)
}
/*
- * Exec_ListenPreCommit --- subroutine for PreCommit_Notify
+ * Exec_ListenPreCommit --- subroutine for PreCommit_Notify_My
*
* This function must make sure we are ready to catch any incoming messages.
*/
@@ -1131,7 +1141,7 @@ Exec_ListenPreCommit(void)
}
/*
- * Exec_ListenCommit --- subroutine for AtCommit_Notify
+ * Exec_ListenCommit --- subroutine for AtCommit_Notify_My
*
* Add the channel to the list of channels we are listening on.
*/
@@ -1155,10 +1165,12 @@ Exec_ListenCommit(const char *channel)
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
listenChannels = lappend(listenChannels, pstrdup(channel));
MemoryContextSwitchTo(oldcontext);
+
+ if (!pg_async_signal_original) pg_async_signal_original = pqsignal(SIGUSR1, pg_async_signal);
}
/*
- * Exec_UnlistenCommit --- subroutine for AtCommit_Notify
+ * Exec_UnlistenCommit --- subroutine for AtCommit_Notify_My
*
* Remove the specified channel name from listenChannels.
*/
@@ -1186,10 +1198,15 @@ Exec_UnlistenCommit(const char *channel)
* We do not complain about unlistening something not being listened;
* should we?
*/
+
+ if (!list_length(listenChannels) && pg_async_signal_original) {
+ pqsignal(SIGUSR1, pg_async_signal_original);
+ pg_async_signal_original = NULL;
+ }
}
/*
- * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify
+ * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify_My
*
* Unlisten on all channels for this backend.
*/
@@ -1201,10 +1218,15 @@ Exec_UnlistenAllCommit(void)
list_free_deep(listenChannels);
listenChannels = NIL;
+
+ if (pg_async_signal_original) {
+ pqsignal(SIGUSR1, pg_async_signal_original);
+ pg_async_signal_original = NULL;
+ }
}
/*
- * ProcessCompletedNotifies --- send out signals and self-notifies
+ * ProcessCompletedNotifiesMy --- send out signals and self-notifies
*
* This is called from postgres.c just before going idle at the completion
* of a transaction. If we issued any notifications in the just-completed
@@ -1213,10 +1235,10 @@ Exec_UnlistenAllCommit(void)
* Also, if we filled enough queue pages with new notifies, try to advance
* the queue tail pointer.
*
- * The reason that this is not done in AtCommit_Notify is that there is
+ * The reason that this is not done in AtCommit_Notify_My is that there is
* a nonzero chance of errors here (for example, encoding conversion errors
* while trying to format messages to our frontend). An error during
- * AtCommit_Notify would be a PANIC condition. The timing is also arranged
+ * AtCommit_Notify_My would be a PANIC condition. The timing is also arranged
* to ensure that a transaction's self-notifies are delivered to the frontend
* before it gets the terminating ReadyForQuery message.
*
@@ -1227,8 +1249,9 @@ Exec_UnlistenAllCommit(void)
* NOTE: we are outside of any transaction here.
*/
void
-ProcessCompletedNotifies(void)
+ProcessCompletedNotifiesMy(void)
{
+ bool idle = !IsTransactionOrTransactionBlock();
MemoryContext caller_context;
/* Nothing to do if we didn't send any notifications */
@@ -1249,12 +1272,13 @@ ProcessCompletedNotifies(void)
caller_context = CurrentMemoryContext;
if (Trace_notify)
- elog(DEBUG1, "ProcessCompletedNotifies");
+ elog(DEBUG1, "ProcessCompletedNotifiesMy");
/*
* We must run asyncQueueReadAllNotifications inside a transaction, else
* bad things happen if it gets an error.
*/
+ if (idle)
StartTransactionCommand();
/* Send signals to other backends */
@@ -1275,6 +1299,7 @@ ProcessCompletedNotifies(void)
asyncQueueAdvanceTail();
}
+ if (idle)
CommitTransactionCommand();
MemoryContextSwitchTo(caller_context);
@@ -1431,7 +1456,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
entryLength = QUEUEALIGN(entryLength);
qe->length = entryLength;
qe->dboid = MyDatabaseId;
- qe->xid = GetCurrentTransactionId();
+// qe->xid = GetCurrentTransactionId();
qe->srcPid = MyProcPid;
memcpy(qe->data, n->data, channellen + payloadlen + 2);
}
@@ -1567,7 +1592,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
* occupied.
*/
Datum
-pg_notification_queue_usage(PG_FUNCTION_ARGS)
+pg_notification_queue_usage_my(PG_FUNCTION_ARGS)
{
double usage;
@@ -1749,7 +1774,7 @@ SignalBackends(void)
}
/*
- * AtAbort_Notify
+ * AtAbort_Notify_My
*
* This is called at transaction abort.
*
@@ -1757,10 +1782,10 @@ SignalBackends(void)
* executed if the transaction got committed.
*/
void
-AtAbort_Notify(void)
+AtAbort_Notify_My(void)
{
/*
- * If we LISTEN but then roll back the transaction after PreCommit_Notify,
+ * If we LISTEN but then roll back the transaction after PreCommit_Notify_My,
* we have registered as a listener but have not made any entry in
* listenChannels. In that case, deregister again.
*/
@@ -1772,12 +1797,12 @@ AtAbort_Notify(void)
}
/*
- * AtSubCommit_Notify() --- Take care of subtransaction commit.
+ * AtSubCommit_Notify_My() --- Take care of subtransaction commit.
*
* Reassign all items in the pending lists to the parent transaction.
*/
void
-AtSubCommit_Notify(void)
+AtSubCommit_Notify_My(void)
{
int my_level = GetCurrentTransactionNestLevel();
@@ -1844,10 +1869,10 @@ AtSubCommit_Notify(void)
}
/*
- * AtSubAbort_Notify() --- Take care of subtransaction abort.
+ * AtSubAbort_Notify_My() --- Take care of subtransaction abort.
*/
void
-AtSubAbort_Notify(void)
+AtSubAbort_Notify_My(void)
{
int my_level = GetCurrentTransactionNestLevel();
@@ -1882,15 +1907,15 @@ AtSubAbort_Notify(void)
}
/*
- * HandleNotifyInterrupt
+ * HandleNotifyInterruptMy
*
* Signal handler portion of interrupt handling. Let the backend know
* that there's a pending notify interrupt. If we're currently reading
* from the client, this will interrupt the read and
- * ProcessClientReadInterrupt() will call ProcessNotifyInterrupt().
+ * ProcessClientReadInterrupt() will call ProcessNotifyInterruptMy().
*/
void
-HandleNotifyInterrupt(void)
+HandleNotifyInterruptMy(void)
{
/*
* Note: this is called by a SIGNAL HANDLER. You must be very wary what
@@ -1905,18 +1930,18 @@ HandleNotifyInterrupt(void)
}
/*
- * ProcessNotifyInterrupt
+ * ProcessNotifyInterruptMy
*
* This is called if we see notifyInterruptPending set, just before
* transmitting ReadyForQuery at the end of a frontend command, and
* also if a notify signal occurs while reading from the frontend.
- * HandleNotifyInterrupt() will cause the read to be interrupted
+ * HandleNotifyInterruptMy() will cause the read to be interrupted
* via the process's latch, and this routine will get called.
* If we are truly idle (ie, *not* inside a transaction block),
* process the incoming notifies.
*/
void
-ProcessNotifyInterrupt(void)
+ProcessNotifyInterruptMy(void)
{
if (IsTransactionOrTransactionBlock())
return; /* not really idle */
@@ -1999,7 +2024,7 @@ asyncQueueReadAllNotifications(void)
* before we see them.
*----------
*/
- snapshot = RegisterSnapshot(GetLatestSnapshot());
+// snapshot = RegisterSnapshot(GetLatestSnapshot());
/*
* It is possible that we fail while trying to send a message to our
@@ -2078,7 +2103,7 @@ asyncQueueReadAllNotifications(void)
PG_END_TRY();
/* Done with snapshot */
- UnregisterSnapshot(snapshot);
+// UnregisterSnapshot(snapshot);
}
/*
@@ -2126,6 +2151,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
/* Ignore messages destined for other databases */
if (qe->dboid == MyDatabaseId)
{
+#if 0
if (XidInMVCCSnapshot(qe->xid, snapshot))
{
/*
@@ -2153,6 +2179,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
}
else if (TransactionIdDidCommit(qe->xid))
{
+#endif
/* qe->data is the null-terminated channel name */
char *channel = qe->data;
@@ -2161,8 +2188,9 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
/* payload follows channel name */
char *payload = qe->data + strlen(channel) + 1;
- NotifyMyFrontEnd(channel, payload, qe->srcPid);
+ NotifyMyFrontEndMy(channel, payload, qe->srcPid);
}
+#if 0
}
else
{
@@ -2171,6 +2199,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
* ignore its notifications.
*/
}
+#endif
}
/* Loop back if we're not at end of page */
@@ -2271,6 +2300,7 @@ static void
ProcessIncomingNotify(void)
{
/* We *must* reset the flag */
+ bool idle = !IsTransactionOrTransactionBlock();
notifyInterruptPending = false;
/* Do nothing else if we aren't actively listening */
@@ -2286,10 +2316,12 @@ ProcessIncomingNotify(void)
* We must run asyncQueueReadAllNotifications inside a transaction, else
* bad things happen if it gets an error.
*/
+ if (idle)
StartTransactionCommand();
asyncQueueReadAllNotifications();
+ if (idle)
CommitTransactionCommand();
/*
@@ -2307,7 +2339,7 @@ ProcessIncomingNotify(void)
* Send NOTIFY message to my front end.
*/
void
-NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
+NotifyMyFrontEndMy(const char *channel, const char *payload, int32 srcPid)
{
if (whereToSendOutput == DestRemote)
{
Теперь делаем расширение для функций
pg_async--1.0.sql
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION pg_async" to load this file. \quit

-- NOTE(review): every function below is STRICT and the text parameters
-- default to NULL, so a call that relies on the default presumably returns
-- NULL without reaching the C code -- confirm that this is intended.

-- Function form of LISTEN; implemented by pg_async_listen.
CREATE FUNCTION pg_listen(channel pg_catalog.text DEFAULT NULL)
    RETURNS pg_catalog.void
    AS 'MODULE_PATHNAME', 'pg_async_listen'
    LANGUAGE C STRICT;

-- Set of channel names; implemented by pg_async_listening_channels.
CREATE FUNCTION pg_listening_channels()
    RETURNS SETOF pg_catalog.text
    AS 'MODULE_PATHNAME', 'pg_async_listening_channels'
    LANGUAGE C STRICT;

-- Notification queue usage; implemented by pg_async_notification_queue_usage.
CREATE FUNCTION pg_notification_queue_usage()
    RETURNS pg_catalog.float8
    AS 'MODULE_PATHNAME', 'pg_async_notification_queue_usage'
    LANGUAGE C STRICT;

-- Function form of NOTIFY; implemented by pg_async_notify.
CREATE FUNCTION pg_notify(channel pg_catalog.text DEFAULT NULL, payload pg_catalog.text DEFAULT NULL)
    RETURNS pg_catalog.void
    AS 'MODULE_PATHNAME', 'pg_async_notify'
    LANGUAGE C STRICT;

-- Function form of UNLISTEN *; implemented by pg_async_unlisten_all.
CREATE FUNCTION pg_unlisten_all()
    RETURNS pg_catalog.void
    AS 'MODULE_PATHNAME', 'pg_async_unlisten_all'
    LANGUAGE C STRICT;

-- Function form of UNLISTEN; implemented by pg_async_unlisten.
CREATE FUNCTION pg_unlisten(channel pg_catalog.text DEFAULT NULL)
    RETURNS pg_catalog.void
    AS 'MODULE_PATHNAME', 'pg_async_unlisten'
    LANGUAGE C STRICT;
Здесь к стандартным pg_listening_channels, pg_notification_queue_usage и pg_notify добавлены новые удобные функции pg_listen, pg_unlisten и pg_unlisten_all, дополняющие соответствующие команды LISTEN, UNLISTEN и UNLISTEN *.
Делаем реализацию этих функций, вызывая на ведущем оригинальные функции, а на реплике функции из изменённого скопированного файла async.c:
pg_async.c
/*
 * EXTENSION(function) --- boilerplate for one SQL-callable C function.
 *
 * Expands to a prototype for `function`, its PG_FUNCTION_INFO_V1()
 * registration, and the header of its definition, so the macro invocation
 * must be followed directly by the function body in braces.
 */
#define EXTENSION(function) Datum (function)(PG_FUNCTION_ARGS); PG_FUNCTION_INFO_V1(function); Datum (function)(PG_FUNCTION_ARGS)
/*
 * pg_async_listen --- C implementation behind the SQL function pg_listen().
 *
 * Takes the channel name from argument 0 (a NULL argument is treated as the
 * empty string) and passes it to Async_Listen() in a read-write transaction
 * or to Async_Listen_My() when XactReadOnly is set.  Always returns void.
 *
 * NOTE(review): XactReadOnly is presumably used as a stand-in for "running
 * on a replica"; it would also be set in a read-only transaction on the
 * primary --- confirm that this is intended.
 */
EXTENSION(pg_async_listen) {
    const char *channel = "";

    if (!PG_ARGISNULL(0))
        channel = text_to_cstring(PG_GETARG_TEXT_PP(0));

    if (XactReadOnly)
        Async_Listen_My(channel);
    else
        Async_Listen(channel);

    PG_RETURN_VOID();
}
/*
 * pg_async_listening_channels --- C implementation behind the SQL function
 * pg_listening_channels().
 *
 * Hands the call, with its fcinfo untouched, to pg_listening_channels_my()
 * when XactReadOnly is set and to pg_listening_channels() otherwise, and
 * returns whatever that function returns.
 */
EXTENSION(pg_async_listening_channels) {
    if (XactReadOnly)
        return pg_listening_channels_my(fcinfo);

    return pg_listening_channels(fcinfo);
}
/*
 * pg_async_notification_queue_usage --- C implementation behind the SQL
 * function pg_notification_queue_usage().
 *
 * Hands the call to pg_notification_queue_usage_my() when XactReadOnly is
 * set and to pg_notification_queue_usage() otherwise, returning the callee's
 * result unchanged.
 */
EXTENSION(pg_async_notification_queue_usage) {
    if (XactReadOnly)
        return pg_notification_queue_usage_my(fcinfo);

    return pg_notification_queue_usage(fcinfo);
}
/*
 * pg_async_notify --- C implementation behind the SQL function pg_notify().
 *
 * Hands the call to pg_notify_my() when XactReadOnly is set and to
 * pg_notify() otherwise; argument decoding is left entirely to the callee.
 */
EXTENSION(pg_async_notify) {
    if (XactReadOnly)
        return pg_notify_my(fcinfo);

    return pg_notify(fcinfo);
}
/*
 * pg_async_unlisten_all --- C implementation behind the SQL function
 * pg_unlisten_all().
 *
 * Calls Async_UnlistenAll_My() when XactReadOnly is set and
 * Async_UnlistenAll() otherwise.  Always returns void.
 */
EXTENSION(pg_async_unlisten_all) {
    if (XactReadOnly)
        Async_UnlistenAll_My();
    else
        Async_UnlistenAll();

    PG_RETURN_VOID();
}
/*
 * pg_async_unlisten --- C implementation behind the SQL function
 * pg_unlisten().
 *
 * Takes the channel name from argument 0 (a NULL argument is treated as the
 * empty string) and passes it to Async_Unlisten_My() when XactReadOnly is
 * set or to Async_Unlisten() otherwise.  Always returns void.
 */
EXTENSION(pg_async_unlisten) {
    const char *channel = "";

    if (!PG_ARGISNULL(0))
        channel = text_to_cstring(PG_GETARG_TEXT_PP(0));

    if (XactReadOnly)
        Async_Unlisten_My(channel);
    else
        Async_Unlisten(channel);

    PG_RETURN_VOID();
}
Также регистрируем хуки на выполнение команд, на транзакции и на разделяемую память:
pg_async.c
/*
 * Hook values that were installed before this module replaced them.
 * _PG_init() fills them in and _PG_fini() restores them; the module's own
 * hooks call through them when they are not NULL.
 */
static ProcessUtility_hook_type pg_async_ProcessUtility_hook_original = NULL;
static shmem_startup_hook_type pg_async_shmem_startup_hook_original = NULL;
void _PG_init(void);

/*
 * _PG_init --- module load entry point.
 *
 * Does nothing unless process_shared_preload_libraries_in_progress is set.
 * Otherwise it remembers the current ProcessUtility and shmem-startup hooks
 * and installs this module's replacements, requests AsyncShmemSizeMy() bytes
 * of add-in shared memory, and registers the subtransaction and transaction
 * callbacks.
 */
void
_PG_init(void)
{
    if (!process_shared_preload_libraries_in_progress)
        return;

    /* Chain into utility-command processing, keeping the previous hook. */
    pg_async_ProcessUtility_hook_original = ProcessUtility_hook;
    ProcessUtility_hook = pg_async_ProcessUtility_hook;

    /* Chain into shared-memory startup, keeping the previous hook. */
    pg_async_shmem_startup_hook_original = shmem_startup_hook;
    shmem_startup_hook = pg_async_shmem_startup_hook;

    /* Reserve room for the private notification queue state. */
    RequestAddinShmemSpace(AsyncShmemSizeMy());

    /* Follow (sub)transaction events. */
    RegisterSubXactCallback(pg_async_SubXactCallback, NULL);
    RegisterXactCallback(pg_async_XactCallback, NULL);
}
void _PG_fini(void);

/*
 * _PG_fini --- module unload entry point.
 *
 * Puts back the ProcessUtility and shmem-startup hook values saved in the
 * *_original variables and unregisters the subtransaction and transaction
 * callbacks.
 */
void
_PG_fini(void)
{
    /* Restore the saved hook values. */
    ProcessUtility_hook = pg_async_ProcessUtility_hook_original;
    shmem_startup_hook = pg_async_shmem_startup_hook_original;

    /* Stop following (sub)transaction events. */
    UnregisterSubXactCallback(pg_async_SubXactCallback, NULL);
    UnregisterXactCallback(pg_async_XactCallback, NULL);
}
В хуке на разделяемую память регистрируем её из изменённого скопированного файла async.c:
pg_async.c
static void pg_async_shmem_startup_hook(void) {
if (pg_async_shmem_startup_hook_original) pg_async_shmem_startup_hook_original();
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
AsyncShmemInitMy();
LWLockRelease(AddinShmemInitLock);
}
В хуке на транзакции на реплике вызываем соответствующие функции из изменённого скопированного файла async.c:
pg_async.c
/*
 * pg_async_XactCallback --- transaction event callback.
 *
 * Ignores every event unless XactReadOnly is set.  Otherwise it maps the
 * event onto the matching routine of the renamed async.c copy:
 *   PREPARE    -> AtPrepare_Notify_My()
 *   PRE_COMMIT -> PreCommit_Notify_My()
 *   COMMIT     -> AtCommit_Notify_My(), then ProcessCompletedNotifiesMy()
 *   ABORT      -> AtAbort_Notify_My()
 * All other events are ignored.  `arg` is unused.
 *
 * NOTE(review): ProcessCompletedNotifiesMy() is invoked from the COMMIT
 * event; the comments carried over from async.c say an error during
 * AtCommit_Notify would be a PANIC condition --- confirm that an error
 * raised here is acceptable.
 */
static void
pg_async_XactCallback(XactEvent event, void *arg)
{
    if (!XactReadOnly)
        return;

    switch (event)
    {
        case XACT_EVENT_PREPARE:
            AtPrepare_Notify_My();
            break;

        case XACT_EVENT_PRE_COMMIT:
            PreCommit_Notify_My();
            break;

        case XACT_EVENT_COMMIT:
            AtCommit_Notify_My();
            ProcessCompletedNotifiesMy();
            break;

        case XACT_EVENT_ABORT:
            AtAbort_Notify_My();
            break;

        default:
            break;
    }
}
В хуке на выполнение команд на реплике для команд LISTEN, UNLISTEN и NOTIFY вызываем соответствующие функции из изменённого скопированного файла async.c:
pg_async.c
static void CheckRestrictedOperation(const char *cmdname) {
if (InSecurityRestrictedOperation()) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("cannot execute %s within security-restricted operation", cmdname)));
}
/*
 * pg_async_ProcessUtility_next --- pass a utility statement down the chain.
 *
 * Calls the ProcessUtility hook that was installed before this module, or
 * standard_ProcessUtility() when there was none, with the arguments
 * unchanged.
 */
static void
pg_async_ProcessUtility_next(PlannedStmt *pstmt, const char *queryString, ProcessUtilityContext context, ParamListInfo params, QueryEnvironment *queryEnv, DestReceiver *dest, QueryCompletion *qc)
{
    if (pg_async_ProcessUtility_hook_original != NULL)
        pg_async_ProcessUtility_hook_original(pstmt, queryString, context, params, queryEnv, dest, qc);
    else
        standard_ProcessUtility(pstmt, queryString, context, params, queryEnv, dest, qc);
}

/*
 * pg_async_ProcessUtility_hook --- ProcessUtility hook.
 *
 * In a read-write transaction (XactReadOnly not set) every statement is
 * passed straight down the hook chain.  When XactReadOnly is set, LISTEN,
 * NOTIFY and UNLISTEN are handled here by the routines of the renamed
 * async.c copy, followed by CommandCounterIncrement(); every other statement
 * is passed down the chain.
 *
 * LISTEN and UNLISTEN are refused inside a security-restricted operation.
 * UNLISTEN without a channel name means "unlisten all".
 */
static void
pg_async_ProcessUtility_hook(PlannedStmt *pstmt, const char *queryString, ProcessUtilityContext context, ParamListInfo params, QueryEnvironment *queryEnv, DestReceiver *dest, QueryCompletion *qc)
{
    Node *parsetree = pstmt->utilityStmt;

    if (!XactReadOnly)
    {
        pg_async_ProcessUtility_next(pstmt, queryString, context, params, queryEnv, dest, qc);
        return;
    }

    check_stack_depth();

    switch (nodeTag(parsetree))
    {
        case T_ListenStmt:
        {
            ListenStmt *stmt = (ListenStmt *) parsetree;

            CheckRestrictedOperation("LISTEN");
            Async_Listen_My(stmt->conditionname);
            break;
        }

        case T_NotifyStmt:
        {
            NotifyStmt *stmt = (NotifyStmt *) parsetree;

            Async_Notify_My(stmt->conditionname, stmt->payload);
            break;
        }

        case T_UnlistenStmt:
        {
            UnlistenStmt *stmt = (UnlistenStmt *) parsetree;

            CheckRestrictedOperation("UNLISTEN");
            if (stmt->conditionname)
                Async_Unlisten_My(stmt->conditionname);
            else
                Async_UnlistenAll_My();
            break;
        }

        default:
            /* Not ours: the rest of the chain handles it entirely. */
            pg_async_ProcessUtility_next(pstmt, queryString, context, params, queryEnv, dest, qc);
            return;
    }

    /* Reached only for the statements handled above. */
    CommandCounterIncrement();
}
Всё это можно посмотреть в репозитории.