|
|
|
@ -44,8 +44,8 @@ |
|
|
|
|
* point it sets state to READY and stops tracking. Again, there might |
|
|
|
|
* be zero changes in between. |
|
|
|
|
* |
|
|
|
|
* So the state progression is always: INIT -> DATASYNC -> SYNCWAIT -> CATCHUP -> |
|
|
|
|
* SYNCDONE -> READY. |
|
|
|
|
* So the state progression is always: INIT -> DATASYNC -> SYNCWAIT -> CATCHUP -> |
|
|
|
|
* SYNCDONE -> READY. |
|
|
|
|
* |
|
|
|
|
* The catalog pg_subscription_rel is used to keep information about |
|
|
|
|
* subscribed tables and their state. Some transient state during data |
|
|
|
@ -136,7 +136,8 @@ finish_sync_worker(void) |
|
|
|
|
StartTransactionCommand(); |
|
|
|
|
ereport(LOG, |
|
|
|
|
(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", |
|
|
|
|
MySubscription->name, get_rel_name(MyLogicalRepWorker->relid)))); |
|
|
|
|
MySubscription->name, |
|
|
|
|
get_rel_name(MyLogicalRepWorker->relid)))); |
|
|
|
|
CommitTransactionCommand(); |
|
|
|
|
|
|
|
|
|
/* Find the main apply worker and signal it. */ |
|
|
|
@ -164,7 +165,7 @@ wait_for_relation_state_change(Oid relid, char expected_state) |
|
|
|
|
for (;;) |
|
|
|
|
{ |
|
|
|
|
LogicalRepWorker *worker; |
|
|
|
|
XLogRecPtr statelsn; |
|
|
|
|
XLogRecPtr statelsn; |
|
|
|
|
|
|
|
|
|
CHECK_FOR_INTERRUPTS(); |
|
|
|
|
|
|
|
|
@ -185,7 +186,7 @@ wait_for_relation_state_change(Oid relid, char expected_state) |
|
|
|
|
|
|
|
|
|
/* Check if the opposite worker is still running and bail if not. */ |
|
|
|
|
worker = logicalrep_worker_find(MyLogicalRepWorker->subid, |
|
|
|
|
am_tablesync_worker() ? InvalidOid : relid, |
|
|
|
|
am_tablesync_worker() ? InvalidOid : relid, |
|
|
|
|
false); |
|
|
|
|
LWLockRelease(LogicalRepWorkerLock); |
|
|
|
|
if (!worker) |
|
|
|
@ -401,8 +402,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) |
|
|
|
|
{ |
|
|
|
|
/*
|
|
|
|
|
* Apply has caught up to the position where the table sync has |
|
|
|
|
* finished. Mark the table as ready so that the apply will |
|
|
|
|
* just continue to replicate it normally. |
|
|
|
|
* finished. Mark the table as ready so that the apply will just |
|
|
|
|
* continue to replicate it normally. |
|
|
|
|
*/ |
|
|
|
|
if (current_lsn >= rstate->lsn) |
|
|
|
|
{ |
|
|
|
@ -436,9 +437,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) |
|
|
|
|
else |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* If there is no sync worker for this table yet, count running sync |
|
|
|
|
* workers for this subscription, while we have the lock, for |
|
|
|
|
* later. |
|
|
|
|
* If there is no sync worker for this table yet, count |
|
|
|
|
* running sync workers for this subscription, while we have |
|
|
|
|
* the lock, for later. |
|
|
|
|
*/ |
|
|
|
|
nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid); |
|
|
|
|
LWLockRelease(LogicalRepWorkerLock); |
|
|
|
@ -858,17 +859,17 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Use a standard write lock here. It might be better to |
|
|
|
|
* disallow access to the table while it's being synchronized. But |
|
|
|
|
* we don't want to block the main apply process from working |
|
|
|
|
* and it has to open the relation in RowExclusiveLock when |
|
|
|
|
* remapping remote relation id to local one. |
|
|
|
|
* disallow access to the table while it's being synchronized. |
|
|
|
|
* But we don't want to block the main apply process from |
|
|
|
|
* working and it has to open the relation in RowExclusiveLock |
|
|
|
|
* when remapping remote relation id to local one. |
|
|
|
|
*/ |
|
|
|
|
rel = heap_open(MyLogicalRepWorker->relid, RowExclusiveLock); |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Create a temporary slot for the sync process. We do this |
|
|
|
|
* inside the transaction so that we can use the snapshot made by |
|
|
|
|
* the slot to get existing data. |
|
|
|
|
* inside the transaction so that we can use the snapshot made |
|
|
|
|
* by the slot to get existing data. |
|
|
|
|
*/ |
|
|
|
|
res = walrcv_exec(wrconn, |
|
|
|
|
"BEGIN READ ONLY ISOLATION LEVEL " |
|
|
|
@ -916,14 +917,15 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) |
|
|
|
|
/* Wait for main apply worker to tell us to catchup. */ |
|
|
|
|
wait_for_worker_state_change(SUBREL_STATE_CATCHUP); |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
/*----------
|
|
|
|
|
* There are now two possible states here: |
|
|
|
|
* a) Sync is behind the apply. If that's the case we need to |
|
|
|
|
* catch up with it by consuming the logical replication |
|
|
|
|
* stream up to the relstate_lsn. For that, we exit this |
|
|
|
|
* function and continue in ApplyWorkerMain(). |
|
|
|
|
* catch up with it by consuming the logical replication |
|
|
|
|
* stream up to the relstate_lsn. For that, we exit this |
|
|
|
|
* function and continue in ApplyWorkerMain(). |
|
|
|
|
* b) Sync is caught up with the apply. So it can just set |
|
|
|
|
* the state to SYNCDONE and finish. |
|
|
|
|
* the state to SYNCDONE and finish. |
|
|
|
|
*---------- |
|
|
|
|
*/ |
|
|
|
|
if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn) |
|
|
|
|
{ |
|
|
|
@ -943,9 +945,12 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) |
|
|
|
|
case SUBREL_STATE_SYNCDONE: |
|
|
|
|
case SUBREL_STATE_READY: |
|
|
|
|
case SUBREL_STATE_UNKNOWN: |
|
|
|
|
/* Nothing to do here but finish. (UNKNOWN means the relation was
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Nothing to do here but finish. (UNKNOWN means the relation was |
|
|
|
|
* removed from pg_subscription_rel before the sync worker could |
|
|
|
|
* start.) */ |
|
|
|
|
* start.) |
|
|
|
|
*/ |
|
|
|
|
finish_sync_worker(); |
|
|
|
|
break; |
|
|
|
|
default: |
|
|
|
|