|
|
|
@ -865,7 +865,8 @@ begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn) |
|
|
|
|
if (ctx->callbacks.begin_prepare_cb == NULL) |
|
|
|
|
ereport(ERROR, |
|
|
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
|
|
|
|
errmsg("logical replication at prepare time requires begin_prepare_cb callback"))); |
|
|
|
|
errmsg("logical replication at prepare time requires a %s callback", |
|
|
|
|
"begin_prepare_cb"))); |
|
|
|
|
|
|
|
|
|
/* do the actual work: call callback */ |
|
|
|
|
ctx->callbacks.begin_prepare_cb(ctx, txn); |
|
|
|
@ -908,7 +909,8 @@ prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, |
|
|
|
|
if (ctx->callbacks.prepare_cb == NULL) |
|
|
|
|
ereport(ERROR, |
|
|
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
|
|
|
|
errmsg("logical replication at prepare time requires prepare_cb callback"))); |
|
|
|
|
errmsg("logical replication at prepare time requires a %s callback", |
|
|
|
|
"prepare_cb"))); |
|
|
|
|
|
|
|
|
|
/* do the actual work: call callback */ |
|
|
|
|
ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn); |
|
|
|
@ -951,7 +953,8 @@ commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, |
|
|
|
|
if (ctx->callbacks.commit_prepared_cb == NULL) |
|
|
|
|
ereport(ERROR, |
|
|
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
|
|
|
|
errmsg("logical replication at prepare time requires commit_prepared_cb callback"))); |
|
|
|
|
errmsg("logical replication at prepare time requires a %s callback", |
|
|
|
|
"commit_prepared_cb"))); |
|
|
|
|
|
|
|
|
|
/* do the actual work: call callback */ |
|
|
|
|
ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn); |
|
|
|
@ -995,7 +998,8 @@ rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, |
|
|
|
|
if (ctx->callbacks.rollback_prepared_cb == NULL) |
|
|
|
|
ereport(ERROR, |
|
|
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
|
|
|
|
errmsg("logical replication at prepare time requires rollback_prepared_cb callback"))); |
|
|
|
|
errmsg("logical replication at prepare time requires a %s callback", |
|
|
|
|
"rollback_prepared_cb"))); |
|
|
|
|
|
|
|
|
|
/* do the actual work: call callback */ |
|
|
|
|
ctx->callbacks.rollback_prepared_cb(ctx, txn, prepare_end_lsn, |
|
|
|
@ -1217,7 +1221,8 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, |
|
|
|
|
if (ctx->callbacks.stream_start_cb == NULL) |
|
|
|
|
ereport(ERROR, |
|
|
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
|
|
|
|
errmsg("logical streaming requires a stream_start_cb callback"))); |
|
|
|
|
errmsg("logical streaming requires a %s callback", |
|
|
|
|
"stream_start_cb"))); |
|
|
|
|
|
|
|
|
|
ctx->callbacks.stream_start_cb(ctx, txn); |
|
|
|
|
|
|
|
|
@ -1263,7 +1268,8 @@ stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, |
|
|
|
|
if (ctx->callbacks.stream_stop_cb == NULL) |
|
|
|
|
ereport(ERROR, |
|
|
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
|
|
|
|
errmsg("logical streaming requires a stream_stop_cb callback"))); |
|
|
|
|
errmsg("logical streaming requires a %s callback", |
|
|
|
|
"stream_stop_cb"))); |
|
|
|
|
|
|
|
|
|
ctx->callbacks.stream_stop_cb(ctx, txn); |
|
|
|
|
|
|
|
|
@ -1302,7 +1308,8 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, |
|
|
|
|
if (ctx->callbacks.stream_abort_cb == NULL) |
|
|
|
|
ereport(ERROR, |
|
|
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
|
|
|
|
errmsg("logical streaming requires a stream_abort_cb callback"))); |
|
|
|
|
errmsg("logical streaming requires a %s callback", |
|
|
|
|
"stream_abort_cb"))); |
|
|
|
|
|
|
|
|
|
ctx->callbacks.stream_abort_cb(ctx, txn, abort_lsn); |
|
|
|
|
|
|
|
|
@ -1345,7 +1352,8 @@ stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, |
|
|
|
|
if (ctx->callbacks.stream_prepare_cb == NULL) |
|
|
|
|
ereport(ERROR, |
|
|
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
|
|
|
|
errmsg("logical streaming at prepare time requires a stream_prepare_cb callback"))); |
|
|
|
|
errmsg("logical streaming at prepare time requires a %s callback", |
|
|
|
|
"stream_prepare_cb"))); |
|
|
|
|
|
|
|
|
|
ctx->callbacks.stream_prepare_cb(ctx, txn, prepare_lsn); |
|
|
|
|
|
|
|
|
@ -1384,7 +1392,8 @@ stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, |
|
|
|
|
if (ctx->callbacks.stream_commit_cb == NULL) |
|
|
|
|
ereport(ERROR, |
|
|
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
|
|
|
|
errmsg("logical streaming requires a stream_commit_cb callback"))); |
|
|
|
|
errmsg("logical streaming requires a %s callback", |
|
|
|
|
"stream_commit_cb"))); |
|
|
|
|
|
|
|
|
|
ctx->callbacks.stream_commit_cb(ctx, txn, commit_lsn); |
|
|
|
|
|
|
|
|
@ -1430,7 +1439,8 @@ stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, |
|
|
|
|
if (ctx->callbacks.stream_change_cb == NULL) |
|
|
|
|
ereport(ERROR, |
|
|
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
|
|
|
|
errmsg("logical streaming requires a stream_change_cb callback"))); |
|
|
|
|
errmsg("logical streaming requires a %s callback", |
|
|
|
|
"stream_change_cb"))); |
|
|
|
|
|
|
|
|
|
ctx->callbacks.stream_change_cb(ctx, txn, relation, change); |
|
|
|
|
|
|
|
|
|