|
|
|
@ -237,8 +237,8 @@ typedef struct AsyncQueueControl |
|
|
|
|
QueuePosition tail; /* the global tail is equivalent to the tail
|
|
|
|
|
* of the "slowest" backend */ |
|
|
|
|
TimestampTz lastQueueFillWarn; /* time of last queue-full msg */ |
|
|
|
|
QueueBackendStatus backend[1]; /* actually of length MaxBackends+1 */ |
|
|
|
|
/* DO NOT ADD FURTHER STRUCT MEMBERS HERE */ |
|
|
|
|
QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER]; |
|
|
|
|
/* backend[0] is not used; used entries are from [1] to [MaxBackends] */ |
|
|
|
|
} AsyncQueueControl; |
|
|
|
|
|
|
|
|
|
static AsyncQueueControl *asyncQueueControl; |
|
|
|
@ -303,7 +303,7 @@ typedef enum |
|
|
|
|
typedef struct |
|
|
|
|
{ |
|
|
|
|
ListenActionKind action; |
|
|
|
|
char channel[1]; /* actually, as long as needed */ |
|
|
|
|
char channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */ |
|
|
|
|
} ListenAction; |
|
|
|
|
|
|
|
|
|
static List *pendingActions = NIL; /* list of ListenAction */ |
|
|
|
@ -417,8 +417,8 @@ AsyncShmemSize(void) |
|
|
|
|
Size size; |
|
|
|
|
|
|
|
|
|
/* This had better match AsyncShmemInit */ |
|
|
|
|
size = mul_size(MaxBackends, sizeof(QueueBackendStatus)); |
|
|
|
|
size = add_size(size, sizeof(AsyncQueueControl)); |
|
|
|
|
size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus)); |
|
|
|
|
size = add_size(size, offsetof(AsyncQueueControl, backend)); |
|
|
|
|
|
|
|
|
|
size = add_size(size, SimpleLruShmemSize(NUM_ASYNC_BUFFERS, 0)); |
|
|
|
|
|
|
|
|
@ -438,12 +438,11 @@ AsyncShmemInit(void) |
|
|
|
|
/*
|
|
|
|
|
* Create or attach to the AsyncQueueControl structure. |
|
|
|
|
* |
|
|
|
|
* The used entries in the backend[] array run from 1 to MaxBackends. |
|
|
|
|
* sizeof(AsyncQueueControl) already includes space for the unused zero'th |
|
|
|
|
* entry, but we need to add on space for the used entries. |
|
|
|
|
* The used entries in the backend[] array run from 1 to MaxBackends; the |
|
|
|
|
* zero'th entry is unused but must be allocated. |
|
|
|
|
*/ |
|
|
|
|
size = mul_size(MaxBackends, sizeof(QueueBackendStatus)); |
|
|
|
|
size = add_size(size, sizeof(AsyncQueueControl)); |
|
|
|
|
size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus)); |
|
|
|
|
size = add_size(size, offsetof(AsyncQueueControl, backend)); |
|
|
|
|
|
|
|
|
|
asyncQueueControl = (AsyncQueueControl *) |
|
|
|
|
ShmemInitStruct("Async Queue Control", size, &found); |
|
|
|
@ -605,7 +604,8 @@ queue_listen(ListenActionKind action, const char *channel) |
|
|
|
|
oldcontext = MemoryContextSwitchTo(CurTransactionContext); |
|
|
|
|
|
|
|
|
|
/* space for terminating null is included in sizeof(ListenAction) */ |
|
|
|
|
actrec = (ListenAction *) palloc(sizeof(ListenAction) + strlen(channel)); |
|
|
|
|
actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) + |
|
|
|
|
strlen(channel) + 1); |
|
|
|
|
actrec->action = action; |
|
|
|
|
strcpy(actrec->channel, channel); |
|
|
|
|
|
|
|
|
|