mirror of https://github.com/postgres/postgres
A background worker can use pq_redirect_to_shm_mq() to direct protocol that would normally be sent to the frontend to a shm_mq so that another process may read them. The receiving process may use pq_parse_errornotice() to parse an ErrorResponse or NoticeResponse from the background worker and, if it wishes, ThrowErrorData() to propagate the error (with or without further modification). Patch by me. Review by Andres Freund.pull/14/head
parent
252e652ede
commit
2bd9e412f9
@ -0,0 +1,261 @@ |
||||
/*-------------------------------------------------------------------------
|
||||
* |
||||
* pqmq.c |
||||
* Use the frontend/backend protocol for communication over a shm_mq |
||||
* |
||||
* Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group |
||||
* Portions Copyright (c) 1994, Regents of the University of California |
||||
* |
||||
* src/backend/libpq/pqmq.c |
||||
* |
||||
*------------------------------------------------------------------------- |
||||
*/ |
||||
|
||||
#include "postgres.h" |
||||
|
||||
#include "libpq/libpq.h" |
||||
#include "libpq/pqformat.h" |
||||
#include "libpq/pqmq.h" |
||||
#include "tcop/tcopprot.h" |
||||
#include "utils/builtins.h" |
||||
|
||||
static shm_mq *pq_mq; |
||||
static shm_mq_handle *pq_mq_handle; |
||||
static bool pq_mq_busy = false; |
||||
|
||||
static void mq_comm_reset(void); |
||||
static int mq_flush(void); |
||||
static int mq_flush_if_writable(void); |
||||
static bool mq_is_send_pending(void); |
||||
static int mq_putmessage(char msgtype, const char *s, size_t len); |
||||
static void mq_putmessage_noblock(char msgtype, const char *s, size_t len); |
||||
static void mq_startcopyout(void); |
||||
static void mq_endcopyout(bool errorAbort); |
||||
|
||||
static PQcommMethods PqCommMqMethods = { |
||||
mq_comm_reset, |
||||
mq_flush, |
||||
mq_flush_if_writable, |
||||
mq_is_send_pending, |
||||
mq_putmessage, |
||||
mq_putmessage_noblock, |
||||
mq_startcopyout, |
||||
mq_endcopyout |
||||
}; |
||||
|
||||
/*
|
||||
* Arrange to redirect frontend/backend protocol messages to a shared-memory |
||||
* message queue. |
||||
*/ |
||||
void |
||||
pq_redirect_to_shm_mq(shm_mq *mq, shm_mq_handle *mqh) |
||||
{ |
||||
PqCommMethods = &PqCommMqMethods; |
||||
pq_mq = mq; |
||||
pq_mq_handle = mqh; |
||||
whereToSendOutput = DestRemote; |
||||
FrontendProtocol = PG_PROTOCOL_LATEST; |
||||
} |
||||
|
||||
static void |
||||
mq_comm_reset(void) |
||||
{ |
||||
/* Nothing to do. */ |
||||
} |
||||
|
||||
static int |
||||
mq_flush(void) |
||||
{ |
||||
/* Nothing to do. */ |
||||
return 0; |
||||
} |
||||
|
||||
static int |
||||
mq_flush_if_writable(void) |
||||
{ |
||||
/* Nothing to do. */ |
||||
return 0; |
||||
} |
||||
|
||||
static bool |
||||
mq_is_send_pending(void) |
||||
{ |
||||
/* There's never anything pending. */ |
||||
return 0; |
||||
} |
||||
|
||||
/*
|
||||
* Transmit a libpq protocol message to the shared memory message queue |
||||
* selected via pq_mq_handle. We don't include a length word, because the |
||||
* receiver will know the length of the message from shm_mq_receive(). |
||||
*/ |
||||
static int |
||||
mq_putmessage(char msgtype, const char *s, size_t len) |
||||
{ |
||||
shm_mq_iovec iov[2]; |
||||
shm_mq_result result; |
||||
|
||||
/*
|
||||
* If we're sending a message, and we have to wait because the |
||||
* queue is full, and then we get interrupted, and that interrupt |
||||
* results in trying to send another message, we respond by detaching |
||||
* the queue. There's no way to return to the original context, but |
||||
* even if there were, just queueing the message would amount to |
||||
* indefinitely postponing the response to the interrupt. So we do |
||||
* this instead. |
||||
*/ |
||||
if (pq_mq_busy) |
||||
{ |
||||
if (pq_mq != NULL) |
||||
shm_mq_detach(pq_mq); |
||||
pq_mq = NULL; |
||||
return EOF; |
||||
} |
||||
|
||||
pq_mq_busy = true; |
||||
|
||||
iov[0].data = &msgtype; |
||||
iov[0].len = 1; |
||||
iov[1].data = s; |
||||
iov[1].len = len; |
||||
|
||||
Assert(pq_mq_handle != NULL); |
||||
result = shm_mq_sendv(pq_mq_handle, iov, 2, false); |
||||
|
||||
pq_mq_busy = false; |
||||
|
||||
Assert(result == SHM_MQ_SUCCESS || result == SHM_MQ_DETACHED); |
||||
if (result != SHM_MQ_SUCCESS) |
||||
return EOF; |
||||
return 0; |
||||
} |
||||
|
||||
static void |
||||
mq_putmessage_noblock(char msgtype, const char *s, size_t len) |
||||
{ |
||||
/*
|
||||
* While the shm_mq machinery does support sending a message in |
||||
* non-blocking mode, there's currently no way to try sending beginning |
||||
* to send the message that doesn't also commit us to completing the |
||||
* transmission. This could be improved in the future, but for now |
||||
* we don't need it. |
||||
*/ |
||||
elog(ERROR, "not currently supported"); |
||||
} |
||||
|
||||
static void |
||||
mq_startcopyout(void) |
||||
{ |
||||
/* Nothing to do. */ |
||||
} |
||||
|
||||
static void |
||||
mq_endcopyout(bool errorAbort) |
||||
{ |
||||
/* Nothing to do. */ |
||||
} |
||||
|
||||
/*
|
||||
* Parse an ErrorResponse or NoticeResponse payload and populate an ErrorData |
||||
* structure with the results. |
||||
*/ |
||||
void |
||||
pq_parse_errornotice(StringInfo msg, ErrorData *edata) |
||||
{ |
||||
/* Initialize edata with reasonable defaults. */ |
||||
MemSet(edata, 0, sizeof(ErrorData)); |
||||
edata->elevel = ERROR; |
||||
edata->assoc_context = CurrentMemoryContext; |
||||
|
||||
/* Loop over fields and extract each one. */ |
||||
for (;;) |
||||
{ |
||||
char code = pq_getmsgbyte(msg); |
||||
const char *value; |
||||
|
||||
if (code == '\0') |
||||
{ |
||||
pq_getmsgend(msg); |
||||
break; |
||||
} |
||||
value = pq_getmsgstring(msg); |
||||
|
||||
switch (code) |
||||
{ |
||||
case PG_DIAG_SEVERITY: |
||||
if (strcmp(value, "DEBUG") == 0) |
||||
edata->elevel = DEBUG1; /* or some other DEBUG level */ |
||||
else if (strcmp(value, "LOG") == 0) |
||||
edata->elevel = LOG; /* can't be COMMERROR */ |
||||
else if (strcmp(value, "INFO") == 0) |
||||
edata->elevel = INFO; |
||||
else if (strcmp(value, "NOTICE") == 0) |
||||
edata->elevel = NOTICE; |
||||
else if (strcmp(value, "WARNING") == 0) |
||||
edata->elevel = WARNING; |
||||
else if (strcmp(value, "ERROR") == 0) |
||||
edata->elevel = ERROR; |
||||
else if (strcmp(value, "FATAL") == 0) |
||||
edata->elevel = FATAL; |
||||
else if (strcmp(value, "PANIC") == 0) |
||||
edata->elevel = PANIC; |
||||
else |
||||
elog(ERROR, "unknown error severity"); |
||||
break; |
||||
case PG_DIAG_SQLSTATE: |
||||
if (strlen(value) != 5) |
||||
elog(ERROR, "malformed sql state"); |
||||
edata->sqlerrcode = MAKE_SQLSTATE(value[0], value[1], value[2], |
||||
value[3], value[4]); |
||||
break; |
||||
case PG_DIAG_MESSAGE_PRIMARY: |
||||
edata->message = pstrdup(value); |
||||
break; |
||||
case PG_DIAG_MESSAGE_DETAIL: |
||||
edata->detail = pstrdup(value); |
||||
break; |
||||
case PG_DIAG_MESSAGE_HINT: |
||||
edata->hint = pstrdup(value); |
||||
break; |
||||
case PG_DIAG_STATEMENT_POSITION: |
||||
edata->cursorpos = pg_atoi(value, sizeof(int), '\0'); |
||||
break; |
||||
case PG_DIAG_INTERNAL_POSITION: |
||||
edata->internalpos = pg_atoi(value, sizeof(int), '\0'); |
||||
break; |
||||
case PG_DIAG_INTERNAL_QUERY: |
||||
edata->internalquery = pstrdup(value); |
||||
break; |
||||
case PG_DIAG_CONTEXT: |
||||
edata->context = pstrdup(value); |
||||
break; |
||||
case PG_DIAG_SCHEMA_NAME: |
||||
edata->schema_name = pstrdup(value); |
||||
break; |
||||
case PG_DIAG_TABLE_NAME: |
||||
edata->table_name = pstrdup(value); |
||||
break; |
||||
case PG_DIAG_COLUMN_NAME: |
||||
edata->column_name = pstrdup(value); |
||||
break; |
||||
case PG_DIAG_DATATYPE_NAME: |
||||
edata->datatype_name = pstrdup(value); |
||||
break; |
||||
case PG_DIAG_CONSTRAINT_NAME: |
||||
edata->constraint_name = pstrdup(value); |
||||
break; |
||||
case PG_DIAG_SOURCE_FILE: |
||||
edata->filename = pstrdup(value); |
||||
break; |
||||
case PG_DIAG_SOURCE_LINE: |
||||
edata->lineno = pg_atoi(value, sizeof(int), '\0'); |
||||
break; |
||||
case PG_DIAG_SOURCE_FUNCTION: |
||||
edata->funcname = pstrdup(value); |
||||
break; |
||||
default: |
||||
elog(ERROR, "unknown error field: %d", (int) code); |
||||
break; |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,22 @@ |
||||
/*-------------------------------------------------------------------------
|
||||
* |
||||
* pqmq.h |
||||
* Use the frontend/backend protocol for communication over a shm_mq |
||||
* |
||||
* Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group |
||||
* Portions Copyright (c) 1994, Regents of the University of California |
||||
* |
||||
* src/include/libpq/pqmq.h |
||||
* |
||||
*------------------------------------------------------------------------- |
||||
*/ |
||||
#ifndef PQMQ_H |
||||
#define PQMQ_H |
||||
|
||||
#include "storage/shm_mq.h" |
||||
|
||||
extern void pq_redirect_to_shm_mq(shm_mq *, shm_mq_handle *); |
||||
|
||||
extern void pq_parse_errornotice(StringInfo str, ErrorData *edata); |
||||
|
||||
#endif /* PQMQ_H */ |
Loading…
Reference in new issue