|
|
|
|
@ -314,7 +314,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) |
|
|
|
|
*/ |
|
|
|
|
ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid, |
|
|
|
|
MyLogicalRepWorker->relid, |
|
|
|
|
syncslotname); |
|
|
|
|
syncslotname, |
|
|
|
|
sizeof(syncslotname)); |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* It is important to give an error if we are unable to drop the slot, |
|
|
|
|
@ -462,7 +463,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) |
|
|
|
|
*/ |
|
|
|
|
ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, |
|
|
|
|
rstate->relid, |
|
|
|
|
originname); |
|
|
|
|
originname, |
|
|
|
|
sizeof(originname)); |
|
|
|
|
replorigin_drop_by_name(originname, true, false); |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
@ -871,27 +873,20 @@ copy_table(Relation rel) |
|
|
|
|
* pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum |
|
|
|
|
* length of slot_name will be 50. |
|
|
|
|
* |
|
|
|
|
* The returned slot name is either: |
|
|
|
|
* - stored in the supplied buffer (syncslotname), or |
|
|
|
|
* - palloc'ed in current memory context (if syncslotname = NULL). |
|
|
|
|
* The returned slot name is stored in the supplied buffer (syncslotname) with |
|
|
|
|
* the given size. |
|
|
|
|
* |
|
|
|
|
* Note: We don't use the subscription slot name as part of tablesync slot name |
|
|
|
|
* because we are responsible for cleaning up these slots and it could become |
|
|
|
|
* impossible to recalculate what name to cleanup if the subscription slot name |
|
|
|
|
* had changed. |
|
|
|
|
*/ |
|
|
|
|
char * |
|
|
|
|
void |
|
|
|
|
ReplicationSlotNameForTablesync(Oid suboid, Oid relid, |
|
|
|
|
char syncslotname[NAMEDATALEN]) |
|
|
|
|
char *syncslotname, int szslot) |
|
|
|
|
{ |
|
|
|
|
if (syncslotname) |
|
|
|
|
sprintf(syncslotname, "pg_%u_sync_%u_" UINT64_FORMAT, suboid, relid, |
|
|
|
|
GetSystemIdentifier()); |
|
|
|
|
else |
|
|
|
|
syncslotname = psprintf("pg_%u_sync_%u_" UINT64_FORMAT, suboid, relid, |
|
|
|
|
GetSystemIdentifier()); |
|
|
|
|
|
|
|
|
|
return syncslotname; |
|
|
|
|
snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid, |
|
|
|
|
relid, GetSystemIdentifier()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
@ -901,9 +896,9 @@ ReplicationSlotNameForTablesync(Oid suboid, Oid relid, |
|
|
|
|
*/ |
|
|
|
|
void |
|
|
|
|
ReplicationOriginNameForTablesync(Oid suboid, Oid relid, |
|
|
|
|
char originname[NAMEDATALEN]) |
|
|
|
|
char *originname, int szorgname) |
|
|
|
|
{ |
|
|
|
|
snprintf(originname, NAMEDATALEN, "pg_%u_%u", suboid, relid); |
|
|
|
|
snprintf(originname, szorgname, "pg_%u_%u", suboid, relid); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
@ -951,9 +946,11 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Calculate the name of the tablesync slot. */ |
|
|
|
|
slotname = ReplicationSlotNameForTablesync(MySubscription->oid, |
|
|
|
|
MyLogicalRepWorker->relid, |
|
|
|
|
NULL /* use palloc */ ); |
|
|
|
|
slotname = (char *) palloc(NAMEDATALEN); |
|
|
|
|
ReplicationSlotNameForTablesync(MySubscription->oid, |
|
|
|
|
MyLogicalRepWorker->relid, |
|
|
|
|
slotname, |
|
|
|
|
NAMEDATALEN); |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Here we use the slot name instead of the subscription name as the |
|
|
|
|
@ -972,7 +969,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) |
|
|
|
|
/* Assign the origin tracking record name. */ |
|
|
|
|
ReplicationOriginNameForTablesync(MySubscription->oid, |
|
|
|
|
MyLogicalRepWorker->relid, |
|
|
|
|
originname); |
|
|
|
|
originname, |
|
|
|
|
sizeof(originname)); |
|
|
|
|
|
|
|
|
|
if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC) |
|
|
|
|
{ |
|
|
|
|
|