mirror of https://github.com/postgres/postgres
parent
5a15149736
commit
d64b97ae37
@ -0,0 +1,5 @@ |
|||||||
|
|
||||||
|
CREATE TRIGGER "MyTableName_Trig" AFTER INSERT OR DELETE OR UPDATE |
||||||
|
ON "MyTableName" FOR EACH ROW EXECUTE PROCEDURE |
||||||
|
"recordchange" (); |
||||||
|
|
@ -0,0 +1,869 @@ |
|||||||
|
#!/usr/bin/perl |
||||||
|
############################################################################# |
||||||
|
# |
||||||
|
# DBMirror.pl |
||||||
|
# Contains the Database mirroring script. |
||||||
|
# This script queries the pending table off the database specified |
||||||
|
# (along with the associated schema) for updates that are pending on a |
||||||
|
# specific host. The database on that host is then updated with the changes. |
||||||
|
# |
||||||
|
# |
||||||
|
# Written by Steven Singer (ssinger@navtechinc.com) |
||||||
|
# (c) 2001-2002 Navtech Systems Support Inc. |
||||||
|
# Released under the GNU Public License version 2. See COPYING. |
||||||
|
# |
||||||
|
# |
||||||
|
# This program is distributed in the hope that it will be useful, |
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||||
|
# GNU General Public License for more details. |
||||||
|
# |
||||||
|
############################################################################## |
||||||
|
# $Id: DBMirror.pl,v 1.1 2002/06/23 21:58:07 momjian Exp $ |
||||||
|
# |
||||||
|
############################################################################## |
||||||
|
|
||||||
|
=head1 NAME |
||||||
|
|
||||||
|
DBMirror.pl - A Perl module to mirror database changes from a master database |
||||||
|
to a slave. |
||||||
|
|
||||||
|
=head1 SYNPOSIS |
||||||
|
|
||||||
|
|
||||||
|
DBMirror.pl slaveConfigfile.conf |
||||||
|
|
||||||
|
|
||||||
|
=head1 DESCRIPTION |
||||||
|
|
||||||
|
This Perl script will connect to the master database and query its pending |
||||||
|
table for a list of pending changes. |
||||||
|
|
||||||
|
The transactions of the original changes to the master will be preserved |
||||||
|
when sending things to the slave. |
||||||
|
|
||||||
|
=cut |
||||||
|
|
||||||
|
|
||||||
|
=head1 METHODS |
||||||
|
|
||||||
|
=over 4 |
||||||
|
|
||||||
|
=cut |
||||||
|
|
||||||
|
|
||||||
|
BEGIN { |
||||||
|
# add in a global path to files |
||||||
|
# Pg should be included. |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
use strict; |
||||||
|
use Pg; |
||||||
|
use IO::Handle; |
||||||
|
sub mirrorCommand($$$$$$); |
||||||
|
sub mirrorInsert($$$$$); |
||||||
|
sub mirrorDelete($$$$$); |
||||||
|
sub mirrorUpdate($$$$$); |
||||||
|
sub sendQueryToSlaves($$); |
||||||
|
sub logErrorMessage($); |
||||||
|
sub openSlaveConnection($); |
||||||
|
sub updateMirrorHostTable($$); |
||||||
|
sub extractData($$); |
||||||
|
local $::masterHost; |
||||||
|
local $::masterDb; |
||||||
|
local $::masterUser; |
||||||
|
local $::masterPassword; |
||||||
|
local $::errorThreshold=5; |
||||||
|
local $::errorEmailAddr=undef; |
||||||
|
|
||||||
|
my %slaveInfoHash; |
||||||
|
local $::slaveInfo = \%slaveInfoHash; |
||||||
|
|
||||||
|
my $lastErrorMsg; |
||||||
|
my $repeatErrorCount=0; |
||||||
|
|
||||||
|
my $lastXID; |
||||||
|
my $commandCount=0; |
||||||
|
|
||||||
|
my $masterConn; |
||||||
|
|
||||||
|
Main(); |
||||||
|
|
||||||
|
sub Main() { |
||||||
|
|
||||||
|
#run the configuration file. |
||||||
|
if ($#ARGV != 0) { |
||||||
|
die "usage: DBMirror.pl configFile\n"; |
||||||
|
} |
||||||
|
if( ! defined do $ARGV[0]) { |
||||||
|
logErrorMessage("Invalid Configuration file $ARGV[0]"); |
||||||
|
die; |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
my $connectString = "host=$::masterHost dbname=$::masterDb user=$::masterUser password=$::masterPassword"; |
||||||
|
|
||||||
|
$masterConn = Pg::connectdb($connectString); |
||||||
|
|
||||||
|
unless($masterConn->status == PGRES_CONNECTION_OK) { |
||||||
|
logErrorMessage("Can't connect to master database\n" . |
||||||
|
$masterConn->errorMessage); |
||||||
|
die; |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
my $firstTime = 1; |
||||||
|
while(1) { |
||||||
|
if($firstTime == 0) { |
||||||
|
sleep 60; |
||||||
|
} |
||||||
|
$firstTime = 0; |
||||||
|
# Open up the connection to the slave. |
||||||
|
if(! defined $::slaveInfo->{"status"} || |
||||||
|
$::slaveInfo->{"status"} == -1) { |
||||||
|
openSlaveConnection($::slaveInfo); |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
sendQueryToSlaves(undef,"SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"); |
||||||
|
sendQueryToSlaves(undef,"SET CONSTRAINTS ALL DEFERRED"); |
||||||
|
|
||||||
|
|
||||||
|
#Obtain a list of pending transactions using ordering by our approximation |
||||||
|
#to the commit time. The commit time approximation is taken to be the |
||||||
|
#SeqId of the last row edit in the transaction. |
||||||
|
my $pendingTransQuery = "SELECT pd.\"XID\",MAX(\"SeqId\") FROM \"Pending\" pd"; |
||||||
|
$pendingTransQuery .= " LEFT JOIN \"MirroredTransaction\" mt INNER JOIN"; |
||||||
|
$pendingTransQuery .= " \"MirrorHost\" mh ON mt.\"MirrorHostId\" = "; |
||||||
|
$pendingTransQuery .= " mh.\"MirrorHostId\" AND mh.\"HostName\"="; |
||||||
|
$pendingTransQuery .= " '$::slaveInfo->{\"slaveHost\"}' "; |
||||||
|
$pendingTransQuery .= " ON pd.\"XID\""; |
||||||
|
$pendingTransQuery .= " = mt.\"XID\" WHERE mt.\"XID\" is null "; |
||||||
|
$pendingTransQuery .= " GROUP BY pd.\"XID\" "; |
||||||
|
$pendingTransQuery .= " ORDER BY MAX(pd.\"SeqId\")"; |
||||||
|
|
||||||
|
|
||||||
|
my $pendingTransResults = $masterConn->exec($pendingTransQuery); |
||||||
|
unless($pendingTransResults->resultStatus==PGRES_TUPLES_OK) { |
||||||
|
logErrorMessage("Can't query pending table\n" . $masterConn->errorMessage); |
||||||
|
die; |
||||||
|
} |
||||||
|
|
||||||
|
my $numPendingTrans = $pendingTransResults->ntuples; |
||||||
|
my $curTransTuple = 0; |
||||||
|
|
||||||
|
|
||||||
|
# |
||||||
|
# This loop loops through each pending transaction in the proper order. |
||||||
|
# The Pending row edits for that transaction will be queried from the |
||||||
|
# master and sent + committed to the slaves. |
||||||
|
while($curTransTuple < $numPendingTrans) { |
||||||
|
my $XID = $pendingTransResults->getvalue($curTransTuple,0); |
||||||
|
my $maxSeqId = $pendingTransResults->getvalue($curTransTuple,1); |
||||||
|
my $seqId; |
||||||
|
|
||||||
|
my $pendingQuery = "SELECT pnd.\"SeqId\",pnd.\"TableName\","; |
||||||
|
$pendingQuery .= " pnd.\"Op\",pnddata.\"IsKey\", pnddata.\"Data\" AS \"Data\" "; |
||||||
|
$pendingQuery .= " FROM \"Pending\" pnd, \"PendingData\" pnddata "; |
||||||
|
$pendingQuery .= " WHERE pnd.\"SeqId\" = pnddata.\"SeqId\" AND "; |
||||||
|
|
||||||
|
$pendingQuery .= " pnd.\"XID\"=$XID ORDER BY \"SeqId\", \"IsKey\" DESC"; |
||||||
|
|
||||||
|
|
||||||
|
my $pendingResults = $masterConn->exec($pendingQuery); |
||||||
|
unless($pendingResults->resultStatus==PGRES_TUPLES_OK) { |
||||||
|
logErrorMessage("Can't query pending table\n" . $masterConn->errorMessage); |
||||||
|
die; |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
my $numPending = $pendingResults->ntuples; |
||||||
|
my $curTuple = 0; |
||||||
|
sendQueryToSlaves(undef,"BEGIN"); |
||||||
|
while ($curTuple < $numPending) { |
||||||
|
$seqId = $pendingResults->getvalue($curTuple,0); |
||||||
|
my $tableName = $pendingResults->getvalue($curTuple,1); |
||||||
|
my $op = $pendingResults->getvalue($curTuple,2); |
||||||
|
|
||||||
|
$curTuple = mirrorCommand($seqId,$tableName,$op,$XID, |
||||||
|
$pendingResults,$curTuple) +1; |
||||||
|
if($::slaveInfo->{"status"}==-1) { |
||||||
|
last; |
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
|
#Now commit the transaction. |
||||||
|
if($::slaveInfo->{"status"}==-1) { |
||||||
|
last; |
||||||
|
} |
||||||
|
sendQueryToSlaves(undef,"COMMIT"); |
||||||
|
updateMirrorHostTable($XID,$seqId); |
||||||
|
if($commandCount > 5000) { |
||||||
|
$commandCount = 0; |
||||||
|
$::slaveInfo->{"status"} = -1; |
||||||
|
$::slaveInfo->{"slaveConn"}->reset; |
||||||
|
#Open the connection right away. |
||||||
|
openSlaveConnection($::slaveInfo); |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
$pendingResults = undef; |
||||||
|
$curTransTuple = $curTransTuple +1; |
||||||
|
}#while transactions left. |
||||||
|
|
||||||
|
$pendingTransResults = undef; |
||||||
|
|
||||||
|
}#while(1) |
||||||
|
}#Main |
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
=item mirrorCommand(SeqId,tableName,op,transId,pendingResults,curTuple) |
||||||
|
|
||||||
|
Mirrors a single SQL Command(change to a single row) to the slave. |
||||||
|
|
||||||
|
=over 4 |
||||||
|
|
||||||
|
=item * SeqId |
||||||
|
|
||||||
|
The id number of the change to mirror. This is the |
||||||
|
primary key of the pending table. |
||||||
|
|
||||||
|
|
||||||
|
=item * tableName |
||||||
|
|
||||||
|
The name of the table the transaction takes place on. |
||||||
|
|
||||||
|
=item * op |
||||||
|
|
||||||
|
The type of operation this transaction is. 'i' for insert, 'u' for update or |
||||||
|
'd' for delete. |
||||||
|
|
||||||
|
=item * transId |
||||||
|
|
||||||
|
The Transaction of of the Transaction that this command is part of. |
||||||
|
|
||||||
|
=item * pendingResults |
||||||
|
|
||||||
|
A Results set structure returned from Pg::execute that contains the |
||||||
|
join of the Pending and PendingData tables for all of the pending row |
||||||
|
edits in this transaction. |
||||||
|
|
||||||
|
=item * currentTuple |
||||||
|
|
||||||
|
|
||||||
|
The tuple(or row) number of the pendingRow for the command that is about |
||||||
|
to be edited. If the command is an update then this points to the row |
||||||
|
with IsKey equal to true. The next row, curTuple+1 is the contains the |
||||||
|
PendingData with IsKey false for the update. |
||||||
|
|
||||||
|
|
||||||
|
=item returns |
||||||
|
|
||||||
|
|
||||||
|
The tuple number of last tuple for this command. This might be equal to |
||||||
|
currentTuple or it might be larger (+1 in the case of an Update). |
||||||
|
|
||||||
|
|
||||||
|
=back |
||||||
|
|
||||||
|
=cut |
||||||
|
|
||||||
|
|
||||||
|
sub mirrorCommand($$$$$$) { |
||||||
|
my $seqId = $_[0]; |
||||||
|
my $tableName = $_[1]; |
||||||
|
my $op = $_[2]; |
||||||
|
my $transId = $_[3]; |
||||||
|
my $pendingResults = $_[4]; |
||||||
|
my $currentTuple = $_[5]; |
||||||
|
|
||||||
|
if($op eq 'i') { |
||||||
|
$currentTuple = mirrorInsert($seqId,$tableName,$transId,$pendingResults |
||||||
|
,$currentTuple); |
||||||
|
} |
||||||
|
if($op eq 'd') { |
||||||
|
$currentTuple = mirrorDelete($seqId,$tableName,$transId,$pendingResults, |
||||||
|
$currentTuple); |
||||||
|
} |
||||||
|
if($op eq 'u') { |
||||||
|
$currentTuple = mirrorUpdate($seqId,$tableName,$transId,$pendingResults, |
||||||
|
$currentTuple); |
||||||
|
} |
||||||
|
$commandCount = $commandCount +1; |
||||||
|
if($commandCount % 100 == 0) { |
||||||
|
# print "Sent 100 commmands on SeqId $seqId \n"; |
||||||
|
# flush STDOUT; |
||||||
|
} |
||||||
|
return $currentTuple |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
=item mirrorInsert(transId,tableName,transId,pendingResults,currentTuple) |
||||||
|
|
||||||
|
Mirrors an INSERT operation to the slave database. A new row is placed |
||||||
|
in the slave database containing the primary key from pendingKeys along with |
||||||
|
the data fields contained in the row identified by sourceOid. |
||||||
|
|
||||||
|
=over 4 |
||||||
|
|
||||||
|
=item * transId |
||||||
|
|
||||||
|
The sequence id of the INSERT operation being mirrored. This is the primary |
||||||
|
key of the pending table. |
||||||
|
|
||||||
|
=item * tableName |
||||||
|
|
||||||
|
|
||||||
|
The name of the table the transaction takes place on. |
||||||
|
|
||||||
|
=item * sourceOid |
||||||
|
|
||||||
|
The OID of the row in the master database for which this transaction effects. |
||||||
|
If the transaction is a delete then the operation is not valid. |
||||||
|
|
||||||
|
=item * transId |
||||||
|
|
||||||
|
The Transaction Id of transaction that this insert is part of. |
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
=item * pendingResults |
||||||
|
|
||||||
|
A Results set structure returned from Pg::execute that contains the |
||||||
|
join of the Pending and PendingData tables for all of the pending row |
||||||
|
edits in this transaction. |
||||||
|
|
||||||
|
=item * currentTuple |
||||||
|
|
||||||
|
|
||||||
|
The tuple(or row) number of the pendingRow for the command that is about |
||||||
|
to be edited. In the case of an insert this should point to the one |
||||||
|
row for the row edit. |
||||||
|
|
||||||
|
=item returns |
||||||
|
|
||||||
|
The tuple number of the last tuple for the row edit. This should be |
||||||
|
currentTuple. |
||||||
|
|
||||||
|
|
||||||
|
=back |
||||||
|
|
||||||
|
=cut |
||||||
|
|
||||||
|
|
||||||
|
sub mirrorInsert($$$$$) { |
||||||
|
my $seqId = $_[0]; |
||||||
|
my $tableName = $_[1]; |
||||||
|
my $transId = $_[2]; |
||||||
|
my $pendingResults = $_[3]; |
||||||
|
my $currentTuple = $_[4]; |
||||||
|
my $counter; |
||||||
|
my $column; |
||||||
|
|
||||||
|
my $firstIteration=1; |
||||||
|
my %recordValues = extractData($pendingResults,$currentTuple); |
||||||
|
|
||||||
|
|
||||||
|
#Now build the insert query. |
||||||
|
my $insertQuery = "INSERT INTO \"$tableName\" ("; |
||||||
|
my $valuesQuery = ") VALUES ("; |
||||||
|
foreach $column (keys (%recordValues)) { |
||||||
|
if($firstIteration==0) { |
||||||
|
$insertQuery .= " ,"; |
||||||
|
$valuesQuery .= " ,"; |
||||||
|
} |
||||||
|
$insertQuery .= "\"$column\""; |
||||||
|
if(defined $recordValues{$column}) { |
||||||
|
my $quotedValue = $recordValues{$column}; |
||||||
|
$quotedValue =~ s/\\/\\\\/g; |
||||||
|
$quotedValue =~ s/'/\\'/g; |
||||||
|
$valuesQuery .= "'$quotedValue'"; |
||||||
|
} |
||||||
|
else { |
||||||
|
$valuesQuery .= "null"; |
||||||
|
} |
||||||
|
$firstIteration=0; |
||||||
|
} |
||||||
|
$valuesQuery .= ")"; |
||||||
|
sendQueryToSlaves(undef,$insertQuery . $valuesQuery); |
||||||
|
return $currentTuple; |
||||||
|
} |
||||||
|
|
||||||
|
=item mirrorDelete(SeqId,tableName,transId,pendingResult,currentTuple) |
||||||
|
|
||||||
|
Deletes a single row from the slave database. The row is identified by the |
||||||
|
primary key for the transaction in the pendingKeys table. |
||||||
|
|
||||||
|
=over 4 |
||||||
|
|
||||||
|
=item * SeqId |
||||||
|
|
||||||
|
The Sequence id for this delete request. |
||||||
|
|
||||||
|
=item * tableName |
||||||
|
|
||||||
|
The name of the table to delete the row from. |
||||||
|
|
||||||
|
=item * transId |
||||||
|
|
||||||
|
The Transaction Id of the transaction that this command is part of. |
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
=item * pendingResults |
||||||
|
|
||||||
|
A Results set structure returned from Pg::execute that contains the |
||||||
|
join of the Pending and PendingData tables for all of the pending row |
||||||
|
edits in this transaction. |
||||||
|
|
||||||
|
=item * currentTuple |
||||||
|
|
||||||
|
|
||||||
|
The tuple(or row) number of the pendingRow for the command that is about |
||||||
|
to be edited. In the case of a delete this should point to the one |
||||||
|
row for the row edit. |
||||||
|
|
||||||
|
=item returns |
||||||
|
|
||||||
|
The tuple number of the last tuple for the row edit. This should be |
||||||
|
currentTuple. |
||||||
|
|
||||||
|
|
||||||
|
=back |
||||||
|
|
||||||
|
=cut |
||||||
|
|
||||||
|
|
||||||
|
sub mirrorDelete($$$$$) { |
||||||
|
my $seqId = $_[0]; |
||||||
|
my $tableName = $_[1]; |
||||||
|
my $transId = $_[2]; |
||||||
|
my $pendingResult = $_[3]; |
||||||
|
my $currentTuple = $_[4]; |
||||||
|
my %dataHash; |
||||||
|
my $currentField; |
||||||
|
my $firstField=1; |
||||||
|
%dataHash = extractData($pendingResult,$currentTuple); |
||||||
|
|
||||||
|
my $counter=0; |
||||||
|
my $deleteQuery = "DELETE FROM \"$tableName\" WHERE "; |
||||||
|
foreach $currentField (keys %dataHash) { |
||||||
|
if($firstField==0) { |
||||||
|
$deleteQuery .= " AND "; |
||||||
|
} |
||||||
|
my $currentValue = $dataHash{$currentField}; |
||||||
|
$deleteQuery .= "\""; |
||||||
|
$deleteQuery .= $currentField; |
||||||
|
if(defined $currentValue) { |
||||||
|
$deleteQuery .= "\"='"; |
||||||
|
$deleteQuery .= $currentValue; |
||||||
|
$deleteQuery .= "'"; |
||||||
|
} |
||||||
|
else { |
||||||
|
$deleteQuery .= " is null "; |
||||||
|
} |
||||||
|
$counter++; |
||||||
|
$firstField=0; |
||||||
|
} |
||||||
|
|
||||||
|
sendQueryToSlaves($transId,$deleteQuery); |
||||||
|
return $currentTuple; |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
=item mirrorUpdate(seqId,tableName,transId,pendingResult,currentTuple) |
||||||
|
|
||||||
|
Mirrors over an edit request to a single row of the database. |
||||||
|
The primary key from before the edit is used to determine which row in the |
||||||
|
slave should be changed. |
||||||
|
|
||||||
|
After the edit takes place on the slave its primary key will match the primary |
||||||
|
key the master had immediatly following the edit. All other fields will be set |
||||||
|
to the current values. |
||||||
|
|
||||||
|
Data integrity is maintained because the mirroring is performed in an |
||||||
|
SQL transcation so either all pending changes are made or none are. |
||||||
|
|
||||||
|
=over 4 |
||||||
|
|
||||||
|
=item * seqId |
||||||
|
|
||||||
|
The Sequence id of the update. |
||||||
|
|
||||||
|
=item * tableName |
||||||
|
|
||||||
|
The name of the table to perform the update on. |
||||||
|
|
||||||
|
=item * transId |
||||||
|
|
||||||
|
The transaction Id for the transaction that this command is part of. |
||||||
|
|
||||||
|
|
||||||
|
=item * pendingResults |
||||||
|
|
||||||
|
A Results set structure returned from Pg::execute that contains the |
||||||
|
join of the Pending and PendingData tables for all of the pending row |
||||||
|
edits in this transaction. |
||||||
|
|
||||||
|
=item * currentTuple |
||||||
|
|
||||||
|
|
||||||
|
The tuple(or row) number of the pendingRow for the command that is about |
||||||
|
to be edited. In the case of a delete this should point to the one |
||||||
|
row for the row edit. |
||||||
|
|
||||||
|
=item returns |
||||||
|
|
||||||
|
The tuple number of the last tuple for the row edit. This should be |
||||||
|
currentTuple +1. Which points to the non key row of the update. |
||||||
|
|
||||||
|
|
||||||
|
=back |
||||||
|
|
||||||
|
=cut |
||||||
|
|
||||||
|
sub mirrorUpdate($$$$$) { |
||||||
|
my $seqId = $_[0]; |
||||||
|
my $tableName = $_[1]; |
||||||
|
my $transId = $_[2]; |
||||||
|
my $pendingResult = $_[3]; |
||||||
|
my $currentTuple = $_[4]; |
||||||
|
|
||||||
|
my $counter; |
||||||
|
my $quotedValue; |
||||||
|
my $updateQuery = "UPDATE \"$tableName\" SET "; |
||||||
|
my $currentField; |
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
my %keyValueHash; |
||||||
|
my %dataValueHash; |
||||||
|
my $firstIteration=1; |
||||||
|
|
||||||
|
#Extract the Key values. This row contains the values of the |
||||||
|
# key fields before the update occours(the WHERE clause) |
||||||
|
%keyValueHash = extractData($pendingResult,$currentTuple); |
||||||
|
|
||||||
|
|
||||||
|
#Extract the data values. This is a SET clause that contains |
||||||
|
#values for the entire row AFTER the update. |
||||||
|
%dataValueHash = extractData($pendingResult,$currentTuple+1); |
||||||
|
|
||||||
|
$firstIteration=1; |
||||||
|
foreach $currentField (keys (%dataValueHash)) { |
||||||
|
if($firstIteration==0) { |
||||||
|
$updateQuery .= ", "; |
||||||
|
} |
||||||
|
$updateQuery .= " \"$currentField\"="; |
||||||
|
my $currentValue = $dataValueHash{$currentField}; |
||||||
|
if(defined $currentValue ) { |
||||||
|
$quotedValue = $currentValue; |
||||||
|
$quotedValue =~ s/\\/\\\\/g; |
||||||
|
$quotedValue =~ s/'/\\'/g; |
||||||
|
$updateQuery .= "'$quotedValue'"; |
||||||
|
} |
||||||
|
else { |
||||||
|
$updateQuery .= "null "; |
||||||
|
} |
||||||
|
$firstIteration=0; |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
$updateQuery .= " WHERE "; |
||||||
|
$firstIteration=1; |
||||||
|
foreach $currentField (keys (%keyValueHash)) { |
||||||
|
my $currentValue; |
||||||
|
if($firstIteration==0) { |
||||||
|
$updateQuery .= " AND "; |
||||||
|
} |
||||||
|
$updateQuery .= "\"$currentField\"="; |
||||||
|
$currentValue = $keyValueHash{$currentField}; |
||||||
|
if(defined $currentValue) { |
||||||
|
$quotedValue = $currentValue; |
||||||
|
$quotedValue =~ s/\\/\\\\/g; |
||||||
|
$quotedValue =~ s/'/\\'/g; |
||||||
|
$updateQuery .= "'$quotedValue'"; |
||||||
|
} |
||||||
|
else { |
||||||
|
$updateQuery .= " null "; |
||||||
|
} |
||||||
|
$firstIteration=0; |
||||||
|
} |
||||||
|
|
||||||
|
sendQueryToSlaves($transId,$updateQuery); |
||||||
|
return $currentTuple+1; |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
=item sendQueryToSlaves(seqId,sqlQuery) |
||||||
|
|
||||||
|
Sends an SQL query to the slave. |
||||||
|
|
||||||
|
|
||||||
|
=over 4 |
||||||
|
|
||||||
|
=item * seqId |
||||||
|
|
||||||
|
The sequence Id of the command being sent. Undef if no command is associated |
||||||
|
with the query being sent. |
||||||
|
|
||||||
|
=item * sqlQuery |
||||||
|
|
||||||
|
|
||||||
|
SQL operation to perform on the slave. |
||||||
|
|
||||||
|
=back |
||||||
|
|
||||||
|
=cut |
||||||
|
|
||||||
|
sub sendQueryToSlaves($$) { |
||||||
|
my $seqId = $_[0]; |
||||||
|
my $sqlQuery = $_[1]; |
||||||
|
|
||||||
|
if($::slaveInfo->{"status"} == 0) { |
||||||
|
my $queryResult = $::slaveInfo->{"slaveConn"}->exec($sqlQuery); |
||||||
|
unless($queryResult->resultStatus == PGRES_COMMAND_OK) { |
||||||
|
my $errorMessage; |
||||||
|
$errorMessage = "Error sending query $seqId to " ; |
||||||
|
$errorMessage .= $::slaveInfo->{"slaveHost"}; |
||||||
|
$errorMessage .=$::slaveInfo->{"slaveConn"}->errorMessage; |
||||||
|
$errorMessage .= "\n" . $sqlQuery; |
||||||
|
logErrorMessage($errorMessage); |
||||||
|
$::slaveInfo->{"slaveConn"}->exec("ROLLBACK"); |
||||||
|
$::slaveInfo->{"status"} = -1; |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
=item logErrorMessage(error) |
||||||
|
|
||||||
|
Mails an error message to the users specified $errorEmailAddr |
||||||
|
The error message is also printed to STDERR. |
||||||
|
|
||||||
|
=over 4 |
||||||
|
|
||||||
|
=item * error |
||||||
|
|
||||||
|
The error message to log. |
||||||
|
|
||||||
|
=back |
||||||
|
|
||||||
|
=cut |
||||||
|
|
||||||
|
sub logErrorMessage($) { |
||||||
|
my $error = $_[0]; |
||||||
|
|
||||||
|
if(defined $lastErrorMsg and $error eq $lastErrorMsg) { |
||||||
|
if($repeatErrorCount<$::errorThreshold) { |
||||||
|
$repeatErrorCount++; |
||||||
|
warn($error); |
||||||
|
return; |
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
|
$repeatErrorCount=0; |
||||||
|
if(defined $::errorEmailAddr) { |
||||||
|
my $mailPipe; |
||||||
|
open (mailPipe, "|/bin/mail -s DBMirror.pl $::errorEmailAddr"); |
||||||
|
print mailPipe "=====================================================\n"; |
||||||
|
print mailPipe " DBMirror.pl \n"; |
||||||
|
print mailPipe "\n"; |
||||||
|
print mailPipe " The DBMirror.pl script has encountred an error. \n"; |
||||||
|
print mailPipe " It might indicate that either the master database has\n"; |
||||||
|
print mailPipe " gone down or that the connection to a slave database can\n"; |
||||||
|
print mailPipe " not be made. \n"; |
||||||
|
print mailPipe " Process-Id: $$ on $::masterHost database $::masterDb\n"; |
||||||
|
print mailPipe "\n"; |
||||||
|
print mailPipe $error; |
||||||
|
print mailPipe "\n\n\n=================================================\n"; |
||||||
|
close mailPipe; |
||||||
|
} |
||||||
|
warn($error); |
||||||
|
|
||||||
|
$lastErrorMsg = $error; |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
sub openSlaveConnection($) { |
||||||
|
my $slavePtr = $_[0]; |
||||||
|
my $slaveConn; |
||||||
|
|
||||||
|
|
||||||
|
my $slaveConnString = "host=" . $slavePtr->{"slaveHost"}; |
||||||
|
$slaveConnString .= " dbname=" . $slavePtr->{"slaveDb"}; |
||||||
|
$slaveConnString .= " user=" . $slavePtr->{"slaveUser"}; |
||||||
|
$slaveConnString .= " password=" . $slavePtr->{"slavePassword"}; |
||||||
|
|
||||||
|
$slaveConn = Pg::connectdb($slaveConnString); |
||||||
|
|
||||||
|
if($slaveConn->status !=PGRES_CONNECTION_OK) { |
||||||
|
my $errorMessage = "Can't connect to slave database " ; |
||||||
|
$errorMessage .= $slavePtr->{"slaveHost"} . "\n"; |
||||||
|
$errorMessage .= $slaveConn->errorMessage; |
||||||
|
logErrorMessage($errorMessage); |
||||||
|
$slavePtr->{"status"} = -1; |
||||||
|
} |
||||||
|
else { |
||||||
|
$slavePtr->{"slaveConn"} = $slaveConn; |
||||||
|
$slavePtr->{"status"} = 0; |
||||||
|
#Determine the MirrorHostId for the slave from the master's database |
||||||
|
my $resultSet = $masterConn->exec('SELECT "MirrorHostId" FROM ' |
||||||
|
. ' "MirrorHost" WHERE "HostName"' |
||||||
|
. '=\'' . $slavePtr->{"slaveHost"} |
||||||
|
. '\''); |
||||||
|
if($resultSet->ntuples !=1) { |
||||||
|
my $errorMessage .= $slavePtr->{"slaveHost"} ."\n"; |
||||||
|
$errorMessage .= "Has no MirrorHost entry on master\n"; |
||||||
|
logErrorMessage($errorMessage); |
||||||
|
$slavePtr->{"status"}=-1; |
||||||
|
return; |
||||||
|
|
||||||
|
} |
||||||
|
$slavePtr->{"MirrorHostId"} = $resultSet->getvalue(0,0); |
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
=item updateMirrorHostTable(lastTransId,lastSeqId) |
||||||
|
|
||||||
|
Updates the MirroredTransaction table to reflect the fact that |
||||||
|
this transaction has been sent to the current slave. |
||||||
|
|
||||||
|
=over 4 |
||||||
|
|
||||||
|
=item * lastTransId |
||||||
|
|
||||||
|
The Transaction id for the last transaction that has been succesfully mirrored to |
||||||
|
the currently open slaves. |
||||||
|
|
||||||
|
=item * lastSeqId |
||||||
|
|
||||||
|
The Sequence Id of the last command that has been succefully mirrored |
||||||
|
|
||||||
|
|
||||||
|
=back |
||||||
|
|
||||||
|
|
||||||
|
=cut |
||||||
|
|
||||||
|
sub updateMirrorHostTable($$) { |
||||||
|
my $lastTransId = shift; |
||||||
|
my $lastSeqId = shift; |
||||||
|
|
||||||
|
if($::slaveInfo->{"status"}==0) { |
||||||
|
my $deleteTransactionQuery; |
||||||
|
my $deleteResult; |
||||||
|
my $updateMasterQuery = "INSERT INTO \"MirroredTransaction\" "; |
||||||
|
$updateMasterQuery .= " (\"XID\",\"LastSeqId\",\"MirrorHostId\")"; |
||||||
|
$updateMasterQuery .= " VALUES ($lastTransId,$lastSeqId,$::slaveInfo->{\"MirrorHostId\"}) "; |
||||||
|
|
||||||
|
my $updateResult = $masterConn->exec($updateMasterQuery); |
||||||
|
unless($updateResult->resultStatus == PGRES_COMMAND_OK) { |
||||||
|
my $errorMessage = $masterConn->errorMessage . "\n"; |
||||||
|
$errorMessage .= $updateMasterQuery; |
||||||
|
logErrorMessage($errorMessage); |
||||||
|
die; |
||||||
|
} |
||||||
|
# print "Updated slaves to transaction $lastTransId\n" ; |
||||||
|
# flush STDOUT; |
||||||
|
|
||||||
|
#If this transaction has now been mirrored to all mirror hosts |
||||||
|
#then it can be deleted. |
||||||
|
$deleteTransactionQuery = 'DELETE FROM "Pending" WHERE "XID"=' |
||||||
|
. $lastTransId . ' AND (SELECT COUNT(*) FROM "MirroredTransaction"' |
||||||
|
. ' WHERE "XID"=' . $lastTransId . ')=(SELECT COUNT(*) FROM' |
||||||
|
. ' "MirrorHost")'; |
||||||
|
|
||||||
|
$deleteResult = $masterConn->exec($deleteTransactionQuery); |
||||||
|
if($deleteResult->resultStatus!=PGRES_COMMAND_OK) { |
||||||
|
logErrorMessage($masterConn->errorMessage . "\n" . |
||||||
|
$deleteTransactionQuery); |
||||||
|
die; |
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
sub extractData($$) { |
||||||
|
my $pendingResult = $_[0]; |
||||||
|
my $currentTuple = $_[1]; |
||||||
|
my $fnumber; |
||||||
|
my %valuesHash; |
||||||
|
$fnumber = 4; |
||||||
|
my $dataField = $pendingResult->getvalue($currentTuple,$fnumber); |
||||||
|
|
||||||
|
while(length($dataField)>0) { |
||||||
|
# Extract the field name that is surronded by double quotes |
||||||
|
$dataField =~ m/(\".*?\")/s; |
||||||
|
my $fieldName = $1; |
||||||
|
$dataField = substr $dataField ,length($fieldName); |
||||||
|
$fieldName =~ s/\"//g; #Remove the surronding " signs. |
||||||
|
|
||||||
|
if($dataField =~ m/(^= )/s) { |
||||||
|
#Matched null |
||||||
|
$dataField = substr $dataField , length($1); |
||||||
|
$valuesHash{$fieldName}=undef; |
||||||
|
} |
||||||
|
elsif ($dataField =~ m/(^=\')/s) { |
||||||
|
#Has data. |
||||||
|
my $value; |
||||||
|
$dataField = substr $dataField ,2; #Skip the =' |
||||||
|
LOOP: { #This is to allow us to use last from a do loop. |
||||||
|
#Recommended in perlsyn manpage. |
||||||
|
do { |
||||||
|
my $matchString; |
||||||
|
#Find the substring ending with the first ' or first \ |
||||||
|
$dataField =~ m/(.*?[\'\\])?/s; |
||||||
|
$matchString = $1; |
||||||
|
$value .= substr $matchString,0,length($matchString)-1; |
||||||
|
|
||||||
|
if($matchString =~ m/(\'$)/s) { |
||||||
|
# $1 runs to the end of the field value. |
||||||
|
$dataField = substr $dataField,length($matchString)+1; |
||||||
|
last; |
||||||
|
|
||||||
|
} |
||||||
|
else { |
||||||
|
#deal with the escape character. |
||||||
|
#It The character following the escape gets appended. |
||||||
|
$dataField = substr $dataField,length($matchString); |
||||||
|
$dataField =~ s/(^.)//s; |
||||||
|
$value .= $1; |
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
} until(length($dataField)==0); |
||||||
|
} |
||||||
|
$valuesHash{$fieldName} = $value; |
||||||
|
|
||||||
|
|
||||||
|
}#else if |
||||||
|
else { |
||||||
|
|
||||||
|
logErrorMessage "Error in PendingData Sequence Id " . |
||||||
|
$pendingResult->getvalue($currentTuple,0); |
||||||
|
die; |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
} #while |
||||||
|
return %valuesHash; |
||||||
|
|
||||||
|
} |
@ -0,0 +1,10 @@ |
|||||||
|
# $Header: /cvsroot/pgsql/contrib/dbmirror/Attic/Makefile,v 1.1 2002/06/23 21:58:07 momjian Exp $
|
||||||
|
|
||||||
|
subdir = contrib/dbmirror
|
||||||
|
top_builddir = ../..
|
||||||
|
include $(top_builddir)/src/Makefile.global |
||||||
|
|
||||||
|
MODULES = pending
|
||||||
|
DOCS = README.dbmirror
|
||||||
|
|
||||||
|
include $(top_srcdir)/contrib/contrib-global.mk |
@ -0,0 +1,42 @@ |
|||||||
|
|
||||||
|
CREATE FUNCTION "recordchange" () RETURNS opaque AS |
||||||
|
'/usr/local/pgsql/lib/pending.so', 'recordchange' LANGUAGE 'C'; |
||||||
|
|
||||||
|
CREATE TABLE "MirrorHost" ( |
||||||
|
"MirrorHostId" serial, |
||||||
|
"HostName" varchar NOT NULL |
||||||
|
); |
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
CREATE TABLE "Pending" ( |
||||||
|
"SeqId" serial, |
||||||
|
"TableName" varchar NOT NULL, |
||||||
|
"Op" character, |
||||||
|
"XID" int4 NOT NULL, |
||||||
|
PRIMARY KEY ("SeqId") |
||||||
|
|
||||||
|
); |
||||||
|
|
||||||
|
CREATE INDEX "Pending_XID_Index" ON "Pending" ("XID"); |
||||||
|
|
||||||
|
CREATE TABLE "PendingData" ( |
||||||
|
"SeqId" int4 NOT NULL, |
||||||
|
"IsKey" bool NOT NULL, |
||||||
|
"Data" varchar, |
||||||
|
PRIMARY KEY ("SeqId", "IsKey") , |
||||||
|
FOREIGN KEY ("SeqId") REFERENCES "Pending" ("SeqId") ON UPDATE CASCADE ON DELETE CASCADE |
||||||
|
); |
||||||
|
|
||||||
|
|
||||||
|
CREATE TABLE "MirroredTransaction" ( |
||||||
|
"XID" int4 NOT NULL, |
||||||
|
"LastSeqId" int4 NOT NULL, |
||||||
|
"MirrorHostId" int4 NOT NULL, |
||||||
|
PRIMARY KEY ("XID","MirrorHostId"), |
||||||
|
FOREIGN KEY ("MirrorHostId") REFERENCES "MirrorHost" ("MirrorHostId") ON UPDATE CASCADE ON DELETE CASCADE, |
||||||
|
FOREIGN KEY ("LastSeqId") REFERENCES "Pending" ("SeqId") ON UPDATE |
||||||
|
CASCADE ON DELETE CASCADE |
||||||
|
); |
@ -0,0 +1,204 @@ |
|||||||
|
DBMirror - Postgres Database Mirroring |
||||||
|
=================================================== |
||||||
|
|
||||||
|
|
||||||
|
DBMirror is a database mirroring system developed for the Postgres |
||||||
|
database Written and maintened by Steven Singer(ssinger@navtechinc.com) |
||||||
|
|
||||||
|
|
||||||
|
(c) 2001-2002 Navtech Systems Support Inc. |
||||||
|
Released under the GNU Public License version 2. See COPYING. |
||||||
|
|
||||||
|
|
||||||
|
This program is distributed in the hope that it will be useful, |
||||||
|
but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||||
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||||
|
GNU General Public License for more details. |
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
Overrview |
||||||
|
-------------------------------------------------------------------- |
||||||
|
|
||||||
|
The mirroring system is trigger based and provides the following key features: |
||||||
|
|
||||||
|
-Support for multiple mirror slaves |
||||||
|
-Transactions are maintained |
||||||
|
-Per table selection of what gets mirrored. |
||||||
|
|
||||||
|
|
||||||
|
The system is based on the idea that a master database exist where all |
||||||
|
edits are made to the tables being mirrored. A trigger attatched to the |
||||||
|
tables being mirrored runs logging information about the edit to |
||||||
|
the Pending table and PendingData table. |
||||||
|
|
||||||
|
A perl script(DBMirror.pl) runs continiously for each slave database(A database |
||||||
|
that the change is supposed to be mirrored to) examining the Pending |
||||||
|
table; searching for transactions that need to be sent to that particular slave |
||||||
|
database. Those transactions are then mirrored to the slave database and |
||||||
|
the MirroredTransaction table is updated to reflect that the transaction has |
||||||
|
been sent. |
||||||
|
|
||||||
|
If the transaction has been sent to all know slave hosts (All entries |
||||||
|
in the MirrorHost table) then all records of it are purged from the |
||||||
|
Pending tables. |
||||||
|
|
||||||
|
Installation Instructions |
||||||
|
------------------------------------------------------------------------ |
||||||
|
|
||||||
|
1) Compile pending.c |
||||||
|
|
||||||
|
The file pending.c contains the recordchange trigger. This runs every |
||||||
|
time a row inside of a table being mirrored changes. |
||||||
|
|
||||||
|
|
||||||
|
To build the trigger run make on the "Makefile" in the DBMirror directory. |
||||||
|
|
||||||
|
The Makefile supplied assumes that the postgres include files are in |
||||||
|
/usr/local/pgsql/include/server. |
||||||
|
|
||||||
|
Postgres-7.1.x installations should change this to |
||||||
|
/usr/local/pgsql/include (The server part is for 7.2+) |
||||||
|
|
||||||
|
If you have installed the postgres include files to another location then |
||||||
|
modify the Makefile to reflect this. |
||||||
|
|
||||||
|
The trigger requires that all postgres headers be installed, this is |
||||||
|
accomplished in postgresql(7.1 or 7.2) by running "make install-all-headers" |
||||||
|
in the postgres source directory. |
||||||
|
|
||||||
|
The Makefile should create a file named pending.so that contains the trigger. |
||||||
|
|
||||||
|
Install this file in /usr/local/pgsql/lib (or another suitable location). |
||||||
|
|
||||||
|
If you choose a different location the MirrorSetup.sql script will need |
||||||
|
to be modified to reflect your new location. The CREATE FUNCTION command |
||||||
|
in the MirrorSetup.sql script associates the trigger function with the |
||||||
|
pending.so shared library. Modify the arguments to this command if you |
||||||
|
choose to install the trigger elsewhere. |
||||||
|
|
||||||
|
2) Run MirroSetup.sql |
||||||
|
|
||||||
|
This file contains SQL commands to setup the Mirroring environment. |
||||||
|
This includes |
||||||
|
|
||||||
|
-Telling Postgres about the "recordchange" trigger function. |
||||||
|
-Creating the Pending,PendingData, MirrorHost, MirroredTransaction tables |
||||||
|
|
||||||
|
|
||||||
|
To execute the script use psql as follows |
||||||
|
|
||||||
|
"psql -f MirrorSetup.sql MyDatabaseName" |
||||||
|
|
||||||
|
where MyDatabaseName is the name of the database you wish to install mirroring |
||||||
|
on(Your master). |
||||||
|
|
||||||
|
|
||||||
|
3) Create slaveDatabase.conf files. |
||||||
|
|
||||||
|
Each slave database needs its own configuration file for the |
||||||
|
DBMirror.pl script. See slaveDatabase.conf for a sample. |
||||||
|
|
||||||
|
The master settings refer to the master database(The one that is |
||||||
|
being mirrored). |
||||||
|
|
||||||
|
The slave settings refer to the database that the data is being mirrored to. |
||||||
|
The slaveHost parameter must refer to the machine name of the slave (Either |
||||||
|
a resolvable hostname or an IP address). The value for slave host |
||||||
|
must match the Hostname field in the MirrorHost table(See step 6). |
||||||
|
|
||||||
|
The master user must have sufficient permissions to modify the Pending |
||||||
|
tables and to read all of the tables being mirrored. |
||||||
|
|
||||||
|
The slave user must have enough permissions on the slave database to |
||||||
|
modify(INSERT,UPDATE,DELETE) any tables on the slave system that are being |
||||||
|
mirrored. |
||||||
|
|
||||||
|
4) Add the trigger to tables. |
||||||
|
|
||||||
|
Execute the SQL code in AddTrigger.sql once for each table that should |
||||||
|
be mirrored. Replace MyTableName with the name of the table that should |
||||||
|
be mirrored. |
||||||
|
|
||||||
|
5) Create the slave database. |
||||||
|
|
||||||
|
The DBMirror system keeps the contents of mirrored tables identical on the |
||||||
|
master and slave databases. When you first install the mirror triggers the |
||||||
|
master and slave databases must be the same. |
||||||
|
|
||||||
|
If you are starting with an empty master database then the slave should |
||||||
|
be empty as well. Otherwise use pg_dump to ensure that the slave database |
||||||
|
tables are initially identical to the master. |
||||||
|
|
||||||
|
6) Add entries in the MirrorHost table. |
||||||
|
|
||||||
|
Each slave database must have an entry in the MirrorHost table. |
||||||
|
|
||||||
|
The name of the host in the MirrorHost table must exactly match the |
||||||
|
slaveHost variable for that slave in the configuration file. |
||||||
|
|
||||||
|
For example |
||||||
|
INSERT INTO "MirrorHost" ("HostName") VALUES ('mySlaveMachine.mycompany.com'); |
||||||
|
|
||||||
|
|
||||||
|
6) Start DBMirror.pl |
||||||
|
|
||||||
|
|
||||||
|
DBMirror.pl is the perl script that handles the mirroring. |
||||||
|
|
||||||
|
It requires the Perl library Pg(See src/interfaces/perl5 in the postgres |
||||||
|
source distribution). |
||||||
|
|
||||||
|
It takes its configuration file as an argument(The one from step 3) |
||||||
|
One instance of DBMirror.pl runs for each slave machine that is receiving |
||||||
|
mirrored data. |
||||||
|
|
||||||
|
Any errors are printed to standard out and emailed to the address specified in |
||||||
|
the configuration file. |
||||||
|
|
||||||
|
DBMirror can be run from the master, the slave, or a third machine as long |
||||||
|
as it is able to access both the master and slave databases. |
||||||
|
|
||||||
|
7) Periodically run clean_pending.pl |
||||||
|
clean_pending.pl cleans out any entries from the Pending tables that |
||||||
|
have already been mirrored to all hosts in the MirrorHost table. |
||||||
|
It uses the same configuration file as DBMirror.pl. |
||||||
|
|
||||||
|
Normally DBMirror.pl will clean these tables as it goes but in some |
||||||
|
circumstances this will not happen. |
||||||
|
|
||||||
|
For example if a transaction has been mirrored to all slaves except for |
||||||
|
one, then that host is removed from the MirrorHost table(It stops being |
||||||
|
a mirror slave) the transactions that had already been mirrored to |
||||||
|
all the other hosts will not be deleted from the Pending tables by |
||||||
|
DBMirror.pl since DBMirror.pl will run against these transactions again |
||||||
|
since they have already been sent to all the other hosts. |
||||||
|
|
||||||
|
clean_pending.pl will remove these transactions. |
||||||
|
|
||||||
|
TODO(Current Limitations) |
||||||
|
---------- |
||||||
|
-Support for selective mirroring based on the content of data. |
||||||
|
-Support for BLOB's. |
||||||
|
-Support for conflict resolution. |
||||||
|
-Batching SQL commands in DBMirror for better performance over WAN's. |
||||||
|
-Better support for dealing with Schema changes. |
||||||
|
|
||||||
|
Tested Platforms: |
||||||
|
------------------ |
||||||
|
|
||||||
|
DBMirror has been tested on the following configurations but should |
||||||
|
work on any platform with Postgres >= 7.1 and Perl 5.6. |
||||||
|
|
||||||
|
RedHat Linux 7.1 & 6.2 |
||||||
|
-Postgres 7.1.2 |
||||||
|
-Perl 5.6 |
||||||
|
|
||||||
|
Mandrake Linux 8.0(Limited Testing) |
||||||
|
-Postgres 7.2 |
||||||
|
-Perl 5.6 |
||||||
|
|
||||||
|
|
||||||
|
Steven Singer |
||||||
|
Navtech Systems Support Inc. |
||||||
|
ssinger@navtechinc.com |
@ -0,0 +1,106 @@ |
|||||||
|
#!/usr/bin/perl |
||||||
|
# clean_pending.pl |
||||||
|
# This perl script removes entries from the pending,pendingKeys, |
||||||
|
# pendingDeleteData tables that have already been mirrored to all hosts. |
||||||
|
# |
||||||
|
# |
||||||
|
# |
||||||
|
# Written by Steven Singer (ssinger@navtechinc.com) |
||||||
|
# (c) 2001-2002 Navtech Systems Support Inc. |
||||||
|
# Released under the GNU Public License version 2. See COPYING. |
||||||
|
# |
||||||
|
# |
||||||
|
# This program is distributed in the hope that it will be useful, |
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||||
|
# GNU General Public License for more details. |
||||||
|
# |
||||||
|
############################################################################## |
||||||
|
# $Id: clean_pending.pl,v 1.1 2002/06/23 21:58:08 momjian Exp $ |
||||||
|
############################################################################## |
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
=head1 NAME |
||||||
|
|
||||||
|
clean_pending.pl - A Perl script to remove old entries from the |
||||||
|
pending, pendingKeys, and pendingDeleteData tables. |
||||||
|
|
||||||
|
|
||||||
|
=head1 SYNPOSIS |
||||||
|
|
||||||
|
|
||||||
|
clean_pending.pl databasename |
||||||
|
|
||||||
|
|
||||||
|
=head1 DESCRIPTION |
||||||
|
|
||||||
|
|
||||||
|
This Perl script connects to the database specified as a command line argument |
||||||
|
on the local system. It uses a hard-coded username and password. |
||||||
|
It then removes any entries from the pending, pendingDeleteData, and |
||||||
|
pendingKeys tables that have already been sent to all hosts in mirrorHosts. |
||||||
|
|
||||||
|
|
||||||
|
=cut |
||||||
|
|
||||||
|
BEGIN { |
||||||
|
# add in a global path to files |
||||||
|
#Ensure that Pg is in the path. |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
use strict; |
||||||
|
use Pg; |
||||||
|
if ($#ARGV != 0) { |
||||||
|
die "usage: clean_pending.pl configFile\n"; |
||||||
|
} |
||||||
|
|
||||||
|
if( ! defined do $ARGV[0]) { |
||||||
|
die("Invalid Configuration file $ARGV[0]"); |
||||||
|
} |
||||||
|
|
||||||
|
#connect to the database. |
||||||
|
|
||||||
|
my $connectString = "host=$::masterHost dbname=$::masterDb user=$::masterUser password=$::masterPassword"; |
||||||
|
|
||||||
|
my $dbConn = Pg::connectdb($connectString); |
||||||
|
unless($dbConn->status == PGRES_CONNECTION_OK) { |
||||||
|
printf("Can't connect to database\n"); |
||||||
|
die; |
||||||
|
} |
||||||
|
my $result = $dbConn->exec("BEGIN"); |
||||||
|
unless($result->resultStatus == PGRES_COMMAND_OK) { |
||||||
|
die $dbConn->errorMessage; |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
#delete all transactions that have been sent to all mirrorhosts |
||||||
|
#or delete everything if no mirror hosts are defined. |
||||||
|
# Postgres takes the "SELECT COUNT(*) FROM "MirrorHost" and makes it into |
||||||
|
# an InitPlan. EXPLAIN show's this. |
||||||
|
my $deletePendingQuery = 'DELETE FROM "Pending" WHERE (SELECT '; |
||||||
|
$deletePendingQuery .= ' COUNT(*) FROM "MirroredTransaction" WHERE '; |
||||||
|
$deletePendingQuery .= ' "XID"="Pending"."XID") = (SELECT COUNT(*) FROM '; |
||||||
|
$deletePendingQuery .= ' "MirrorHost") OR (SELECT COUNT(*) FROM '; |
||||||
|
$deletePendingQuery .= ' "MirrorHost") = 0'; |
||||||
|
|
||||||
|
my $result = $dbConn->exec($deletePendingQuery); |
||||||
|
unless ($result->resultStatus == PGRES_COMMAND_OK ) { |
||||||
|
printf($dbConn->errorMessage); |
||||||
|
die; |
||||||
|
} |
||||||
|
$dbConn->exec("COMMIT"); |
||||||
|
$result = $dbConn->exec('VACUUM "Pending"'); |
||||||
|
unless ($result->resultStatus == PGRES_COMMAND_OK) { |
||||||
|
printf($dbConn->errorMessage); |
||||||
|
} |
||||||
|
$result = $dbConn->exec('VACUUM "PendingData"'); |
||||||
|
unless($result->resultStatus == PGRES_COMMAND_OK) { |
||||||
|
printf($dbConn->errorMessage); |
||||||
|
} |
||||||
|
$result = $dbConn->exec('VACUUM "MirroredTransaction"'); |
||||||
|
unless($result->resultStatus == PGRES_COMMAND_OK) { |
||||||
|
printf($dbConn->errorMessage); |
||||||
|
} |
||||||
|
|
@ -0,0 +1,447 @@ |
|||||||
|
/****************************************************************************
|
||||||
|
* pending.c |
||||||
|
* $Id: pending.c,v 1.1 2002/06/23 21:58:08 momjian Exp $
|
||||||
|
* |
||||||
|
* This file contains a trigger for Postgresql-7.x to record changes to tables |
||||||
|
* to a pending table for mirroring. |
||||||
|
* All tables that should be mirrored should have this trigger hooked up to it. |
||||||
|
* |
||||||
|
* Written by Steven Singer (ssinger@navtechinc.com) |
||||||
|
* (c) 2001-2002 Navtech Systems Support Inc. |
||||||
|
* Released under the GNU Public License version 2. See COPYING. |
||||||
|
* |
||||||
|
* |
||||||
|
* This program is distributed in the hope that it will be useful, |
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||||
|
* GNU General Public License for more details. |
||||||
|
* |
||||||
|
* |
||||||
|
***************************************************************************/ |
||||||
|
#include <executor/spi.h> |
||||||
|
#include <commands/trigger.h> |
||||||
|
#include <postgres.h> |
||||||
|
|
||||||
|
enum FieldUsage {PRIMARY=0,NONPRIMARY,ALL,NUM_FIELDUSAGE}; |
||||||
|
|
||||||
|
int storePending(char * cpTableName, HeapTuple tBeforeTuple,
|
||||||
|
HeapTuple tAfterTuple, |
||||||
|
TupleDesc tTupdesc, |
||||||
|
TriggerData * tpTrigdata,char cOp); |
||||||
|
|
||||||
|
int storeKeyInfo(char * cpTableName, HeapTuple tTupleData, TupleDesc tTuplDesc, |
||||||
|
TriggerData * tpTrigdata); |
||||||
|
int storeData(char * cpTableName,HeapTuple tTupleData,TupleDesc tTupleDesc, |
||||||
|
TriggerData * tpTrigData,int iIncludeKeyData); |
||||||
|
|
||||||
|
int2vector * getPrimaryKey(Oid tblOid); |
||||||
|
|
||||||
|
char * packageData(HeapTuple tTupleData, TupleDesc tTupleDecs, |
||||||
|
TriggerData * tTrigData, |
||||||
|
enum FieldUsage eKeyUsage ); |
||||||
|
|
||||||
|
#define BUFFER_SIZE 256 |
||||||
|
#define MAX_OID_LEN 10 |
||||||
|
|
||||||
|
|
||||||
|
extern Datum recordchange(PG_FUNCTION_ARGS); |
||||||
|
PG_FUNCTION_INFO_V1(recordchange); |
||||||
|
|
||||||
|
|
||||||
|
/*****************************************************************************
|
||||||
|
* The entry point for the trigger function. |
||||||
|
* The Trigger takes a single SQL 'text' argument indicating the name of the |
||||||
|
* table the trigger was applied to. If this name is incorrect so will the |
||||||
|
* mirroring. |
||||||
|
****************************************************************************/ |
||||||
|
Datum recordchange(PG_FUNCTION_ARGS) { |
||||||
|
TriggerData * trigdata;
|
||||||
|
TupleDesc tupdesc; |
||||||
|
HeapTuple beforeTuple=NULL; |
||||||
|
HeapTuple afterTuple=NULL; |
||||||
|
HeapTuple retTuple=NULL; |
||||||
|
char * tblname; |
||||||
|
char op; |
||||||
|
if(fcinfo->context!=NULL) { |
||||||
|
|
||||||
|
if(SPI_connect() < 0) { |
||||||
|
elog(NOTICE,"storePending could not connect to SPI"); |
||||||
|
return -1; |
||||||
|
} |
||||||
|
trigdata = (TriggerData*)fcinfo->context; |
||||||
|
/* Extract the table name */ |
||||||
|
tblname = SPI_getrelname(trigdata->tg_relation); |
||||||
|
tupdesc = trigdata->tg_relation->rd_att; |
||||||
|
if(TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)) { |
||||||
|
retTuple = trigdata->tg_newtuple; |
||||||
|
beforeTuple = trigdata->tg_trigtuple; |
||||||
|
afterTuple = trigdata->tg_newtuple; |
||||||
|
op='u'; |
||||||
|
|
||||||
|
} |
||||||
|
else if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event)) { |
||||||
|
retTuple = trigdata->tg_trigtuple; |
||||||
|
afterTuple = trigdata->tg_trigtuple; |
||||||
|
op = 'i'; |
||||||
|
} |
||||||
|
else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event)) { |
||||||
|
retTuple = trigdata->tg_trigtuple; |
||||||
|
beforeTuple = trigdata->tg_trigtuple; |
||||||
|
op = 'd'; |
||||||
|
} |
||||||
|
|
||||||
|
if(storePending(tblname,beforeTuple,afterTuple,tupdesc,trigdata,op)) { |
||||||
|
/* An error occoured. Skip the operation. */ |
||||||
|
elog(ERROR,"Operation could not be mirrored"); |
||||||
|
return PointerGetDatum(NULL); |
||||||
|
|
||||||
|
} |
||||||
|
#if defined DEBUG_OUTPUT |
||||||
|
elog(NOTICE,"Returning on success"); |
||||||
|
#endif |
||||||
|
SPI_finish(); |
||||||
|
return PointerGetDatum(retTuple); |
||||||
|
} |
||||||
|
else { |
||||||
|
/*
|
||||||
|
* Not being called as a trigger. |
||||||
|
*/ |
||||||
|
return PointerGetDatum(NULL); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
/*****************************************************************************
|
||||||
|
* Constructs and executes an SQL query to write a record of this tuple change |
||||||
|
* to the pending table.
|
||||||
|
*****************************************************************************/ |
||||||
|
int storePending(char * cpTableName, HeapTuple tBeforeTuple,
|
||||||
|
HeapTuple tAfterTuple, |
||||||
|
TupleDesc tTupDesc, |
||||||
|
TriggerData * tpTrigData,char cOp) { |
||||||
|
char * cpQueryBase = "INSERT INTO \"Pending\" (\"TableName\",\"Op\",\"XID\") VALUES ($1,$2,$3)"; |
||||||
|
|
||||||
|
int iResult=0; |
||||||
|
HeapTuple tCurTuple; // Points the current tuple(before or after)
|
||||||
|
Datum saPlanData[4]; |
||||||
|
Oid taPlanArgTypes[3] = {NAMEOID,CHAROID,INT4OID}; |
||||||
|
void * vpPlan; |
||||||
|
|
||||||
|
tCurTuple = tBeforeTuple ? tBeforeTuple : tAfterTuple; |
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
vpPlan = SPI_prepare(cpQueryBase,3,taPlanArgTypes); |
||||||
|
if(vpPlan==NULL) { |
||||||
|
elog(NOTICE,"Error creating plan"); |
||||||
|
} |
||||||
|
// SPI_saveplan(vpPlan);
|
||||||
|
|
||||||
|
saPlanData[0] = PointerGetDatum(cpTableName); |
||||||
|
saPlanData[1] = CharGetDatum(cOp); |
||||||
|
saPlanData[2] = Int32GetDatum(GetCurrentTransactionId()); |
||||||
|
|
||||||
|
|
||||||
|
iResult = SPI_execp(vpPlan,saPlanData,NULL,1); |
||||||
|
if(iResult < 0) { |
||||||
|
elog(NOTICE,"storedPending fired (%s) returned %d",cpQueryBase,iResult); |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
#if defined DEBUG_OUTPUT |
||||||
|
elog(NOTICE,"row successfully stored in pending table"); |
||||||
|
#endif |
||||||
|
|
||||||
|
if(cOp=='d') { |
||||||
|
/**
|
||||||
|
* This is a record of a delete operation. |
||||||
|
* Just store the key data. |
||||||
|
*/ |
||||||
|
iResult = storeKeyInfo(cpTableName,tBeforeTuple,tTupDesc,tpTrigData); |
||||||
|
} |
||||||
|
else if (cOp=='i') { |
||||||
|
/**
|
||||||
|
* An Insert operation. |
||||||
|
* Store all data |
||||||
|
*/ |
||||||
|
iResult = storeData(cpTableName,tAfterTuple,tTupDesc,tpTrigData,TRUE); |
||||||
|
|
||||||
|
} |
||||||
|
else { |
||||||
|
/* op must be an update. */ |
||||||
|
iResult = storeKeyInfo(cpTableName,tBeforeTuple,tTupDesc,tpTrigData); |
||||||
|
iResult = iResult ? iResult : storeData(cpTableName,tAfterTuple,tTupDesc, |
||||||
|
tpTrigData,TRUE); |
||||||
|
} |
||||||
|
|
||||||
|
#if defined DEBUG_OUTPUT |
||||||
|
elog(NOTICE,"DOne storing keyinfo"); |
||||||
|
#endif |
||||||
|
|
||||||
|
return iResult; |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
int storeKeyInfo(char * cpTableName, HeapTuple tTupleData,
|
||||||
|
TupleDesc tTupleDesc, |
||||||
|
TriggerData * tpTrigData) { |
||||||
|
|
||||||
|
Oid saPlanArgTypes[1] = {NAMEOID}; |
||||||
|
char * insQuery = "INSERT INTO \"PendingData\" (\"SeqId\",\"IsKey\",\"Data\") VALUES(currval('\"Pending_SeqId_seq\"'),'t',$1)"; |
||||||
|
void * pplan; |
||||||
|
Datum saPlanData[1]; |
||||||
|
char * cpKeyData; |
||||||
|
int iRetCode; |
||||||
|
|
||||||
|
pplan = SPI_prepare(insQuery,1,saPlanArgTypes); |
||||||
|
if(pplan==NULL) { |
||||||
|
elog(NOTICE,"Could not prepare INSERT plan"); |
||||||
|
return -1; |
||||||
|
} |
||||||
|
|
||||||
|
// pplan = SPI_saveplan(pplan);
|
||||||
|
cpKeyData = packageData(tTupleData, tTupleDesc,tpTrigData,PRIMARY); |
||||||
|
#if defined DEBUG_OUTPUT |
||||||
|
elog(NOTICE,cpKeyData); |
||||||
|
#endif |
||||||
|
saPlanData[0] = PointerGetDatum(cpKeyData); |
||||||
|
|
||||||
|
iRetCode = SPI_execp(pplan,saPlanData,NULL,1);
|
||||||
|
|
||||||
|
if(cpKeyData!=NULL) { |
||||||
|
SPI_pfree(cpKeyData); |
||||||
|
} |
||||||
|
|
||||||
|
if(iRetCode != SPI_OK_INSERT ) { |
||||||
|
elog(NOTICE,"Error inserting row in pendingDelete"); |
||||||
|
return -1; |
||||||
|
} |
||||||
|
#if defined DEBUG_OUTPUT |
||||||
|
elog(NOTICE,"INSERT SUCCESFULL"); |
||||||
|
#endif |
||||||
|
|
||||||
|
return 0; |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
int2vector * getPrimaryKey(Oid tblOid) { |
||||||
|
char * queryBase; |
||||||
|
char * query; |
||||||
|
bool isNull; |
||||||
|
int2vector * resultKey; |
||||||
|
int2vector * tpResultKey; |
||||||
|
HeapTuple resTuple; |
||||||
|
Datum resDatum; |
||||||
|
int ret; |
||||||
|
queryBase = "SELECT indkey FROM pg_index WHERE indisprimary='t' AND indrelid="; |
||||||
|
query = SPI_palloc(strlen(queryBase) + MAX_OID_LEN+1); |
||||||
|
sprintf(query,"%s%d",queryBase,tblOid); |
||||||
|
ret = SPI_exec(query,1); |
||||||
|
if(ret != SPI_OK_SELECT || SPI_processed != 1 ) { |
||||||
|
elog(NOTICE,"Could not select primary index key"); |
||||||
|
return NULL; |
||||||
|
} |
||||||
|
|
||||||
|
resTuple = SPI_tuptable->vals[0]; |
||||||
|
resDatum = SPI_getbinval(resTuple,SPI_tuptable->tupdesc,1,&isNull); |
||||||
|
|
||||||
|
tpResultKey = (int2vector*) DatumGetPointer(resDatum); |
||||||
|
resultKey = SPI_palloc(sizeof(int2vector)); |
||||||
|
memcpy(resultKey,tpResultKey,sizeof(int2vector)); |
||||||
|
|
||||||
|
SPI_pfree(query); |
||||||
|
return resultKey; |
||||||
|
} |
||||||
|
|
||||||
|
/******************************************************************************
|
||||||
|
* Stores a copy of the non-key data for the row. |
||||||
|
*****************************************************************************/ |
||||||
|
int storeData(char * cpTableName,HeapTuple tTupleData,TupleDesc tTupleDesc, |
||||||
|
TriggerData * tpTrigData,int iIncludeKeyData) { |
||||||
|
|
||||||
|
Oid planArgTypes[1] = {NAMEOID}; |
||||||
|
char * insQuery = "INSERT INTO \"PendingData\" (\"SeqId\",\"IsKey\",\"Data\") VALUES(currval('\"Pending_SeqId_seq\"'),'f',$1)"; |
||||||
|
void * pplan; |
||||||
|
Datum planData[1]; |
||||||
|
char * cpKeyData; |
||||||
|
int iRetValue; |
||||||
|
|
||||||
|
pplan = SPI_prepare(insQuery,1,planArgTypes); |
||||||
|
if(pplan==NULL) { |
||||||
|
elog(NOTICE,"Could not prepare INSERT plan"); |
||||||
|
return -1; |
||||||
|
} |
||||||
|
|
||||||
|
// pplan = SPI_saveplan(pplan);
|
||||||
|
if(iIncludeKeyData==0) { |
||||||
|
cpKeyData = packageData(tTupleData, tTupleDesc,tpTrigData,NONPRIMARY);
|
||||||
|
} |
||||||
|
else { |
||||||
|
cpKeyData = packageData(tTupleData,tTupleDesc,tpTrigData,ALL); |
||||||
|
} |
||||||
|
|
||||||
|
planData[0] = PointerGetDatum(cpKeyData); |
||||||
|
iRetValue = SPI_execp(pplan,planData,NULL,1); |
||||||
|
|
||||||
|
if(cpKeyData!=0) { |
||||||
|
SPI_pfree(cpKeyData); |
||||||
|
} |
||||||
|
|
||||||
|
if(iRetValue != SPI_OK_INSERT ) { |
||||||
|
elog(NOTICE,"Error inserting row in pendingDelete"); |
||||||
|
return -1; |
||||||
|
} |
||||||
|
#if defined DEBUG_OUTPUT |
||||||
|
elog(NOTICE,"INSERT SUCCESFULL"); |
||||||
|
#endif |
||||||
|
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
/**
|
||||||
|
* Packages the data in tTupleData into a string of the format
|
||||||
|
* FieldName='value text' where any quotes inside of value text |
||||||
|
* are escaped with a backslash and any backslashes in value text |
||||||
|
* are esacped by a second back slash. |
||||||
|
* |
||||||
|
* tTupleDesc should be a description of the tuple stored in
|
||||||
|
* tTupleData.
|
||||||
|
* |
||||||
|
* eFieldUsage specifies which fields to use. |
||||||
|
* PRIMARY implies include only primary key fields. |
||||||
|
* NONPRIMARY implies include only non-primary key fields. |
||||||
|
* ALL implies include all fields. |
||||||
|
*/ |
||||||
|
char * packageData(HeapTuple tTupleData, TupleDesc tTupleDesc, |
||||||
|
TriggerData * tpTrigData, |
||||||
|
enum FieldUsage eKeyUsage ) { |
||||||
|
int iNumCols; |
||||||
|
int2vector * tpPKeys=NULL; |
||||||
|
int iColumnCounter; |
||||||
|
char * cpDataBlock; |
||||||
|
int iDataBlockSize; |
||||||
|
int iUsedDataBlock; |
||||||
|
|
||||||
|
iNumCols = tTupleDesc->natts; |
||||||
|
|
||||||
|
if(eKeyUsage!=ALL) { |
||||||
|
tpPKeys = getPrimaryKey(tpTrigData->tg_relation->rd_id); |
||||||
|
if(tpPKeys==NULL) { |
||||||
|
return NULL; |
||||||
|
} |
||||||
|
} |
||||||
|
#if defined DEBUG_OUTPUT |
||||||
|
if(tpPKeys!=NULL) { |
||||||
|
elog(NOTICE,"Have primary keys"); |
||||||
|
} |
||||||
|
#endif |
||||||
|
cpDataBlock = SPI_palloc(BUFFER_SIZE); |
||||||
|
iDataBlockSize = BUFFER_SIZE; |
||||||
|
iUsedDataBlock = 0; /* To account for the null */ |
||||||
|
|
||||||
|
for(iColumnCounter=1; iColumnCounter <=iNumCols; iColumnCounter++) { |
||||||
|
int iIsPrimaryKey; |
||||||
|
int iPrimaryKeyIndex; |
||||||
|
char * cpUnFormatedPtr; |
||||||
|
char * cpFormatedPtr; |
||||||
|
|
||||||
|
char * cpFieldName; |
||||||
|
char * cpFieldData; |
||||||
|
if(eKeyUsage!=ALL) { |
||||||
|
//Determine if this is a primary key or not.
|
||||||
|
iIsPrimaryKey=0; |
||||||
|
for(iPrimaryKeyIndex=0; (*tpPKeys)[iPrimaryKeyIndex]!=0; |
||||||
|
iPrimaryKeyIndex++) { |
||||||
|
if((*tpPKeys)[iPrimaryKeyIndex]==iColumnCounter) {
|
||||||
|
iIsPrimaryKey=1; |
||||||
|
break; |
||||||
|
} |
||||||
|
} |
||||||
|
if( iIsPrimaryKey ? (eKeyUsage!=PRIMARY) : (eKeyUsage!=NONPRIMARY)) { |
||||||
|
/**
|
||||||
|
* Don't use. |
||||||
|
*/ |
||||||
|
#if defined DEBUG_OUTPUT |
||||||
|
elog(NOTICE,"Skipping column"); |
||||||
|
#endif |
||||||
|
continue; |
||||||
|
} |
||||||
|
} /* KeyUsage!=ALL */ |
||||||
|
cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs |
||||||
|
[iColumnCounter-1]->attname)); |
||||||
|
#if defined DEBUG_OUTPUT |
||||||
|
elog(NOTICE,cpFieldName); |
||||||
|
#endif |
||||||
|
while(iDataBlockSize - iUsedDataBlock < strlen(cpFieldName) +4) { |
||||||
|
cpDataBlock = SPI_repalloc(cpDataBlock,iDataBlockSize + BUFFER_SIZE); |
||||||
|
iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
|
||||||
|
} |
||||||
|
sprintf(cpDataBlock+iUsedDataBlock,"\"%s\"=",cpFieldName); |
||||||
|
iUsedDataBlock = iUsedDataBlock + strlen(cpFieldName)+3; |
||||||
|
cpFieldData=SPI_getvalue(tTupleData,tTupleDesc,iColumnCounter); |
||||||
|
|
||||||
|
cpUnFormatedPtr = cpFieldData; |
||||||
|
cpFormatedPtr = cpDataBlock + iUsedDataBlock; |
||||||
|
if(cpFieldData!=NULL) { |
||||||
|
*cpFormatedPtr='\''; |
||||||
|
iUsedDataBlock++; |
||||||
|
cpFormatedPtr++; |
||||||
|
} |
||||||
|
else { |
||||||
|
*cpFormatedPtr=' '; |
||||||
|
iUsedDataBlock++; |
||||||
|
cpFormatedPtr++; |
||||||
|
continue; |
||||||
|
|
||||||
|
} |
||||||
|
#if defined DEBUG_OUTPUT |
||||||
|
elog(NOTICE,cpFieldData); |
||||||
|
elog(NOTICE,"Starting format loop"); |
||||||
|
#endif |
||||||
|
while(*cpUnFormatedPtr!=0) { |
||||||
|
while(iDataBlockSize - iUsedDataBlock < 2) { |
||||||
|
cpDataBlock = SPI_repalloc(cpDataBlock,iDataBlockSize+BUFFER_SIZE); |
||||||
|
iDataBlockSize = iDataBlockSize + BUFFER_SIZE; |
||||||
|
cpFormatedPtr = cpDataBlock + iUsedDataBlock; |
||||||
|
} |
||||||
|
if(*cpUnFormatedPtr=='\\' || *cpUnFormatedPtr=='\'') { |
||||||
|
*cpFormatedPtr='\\'; |
||||||
|
cpFormatedPtr++; |
||||||
|
iUsedDataBlock++; |
||||||
|
} |
||||||
|
*cpFormatedPtr=*cpUnFormatedPtr; |
||||||
|
cpFormatedPtr++; |
||||||
|
cpUnFormatedPtr++; |
||||||
|
iUsedDataBlock++; |
||||||
|
} |
||||||
|
|
||||||
|
SPI_pfree(cpFieldData); |
||||||
|
|
||||||
|
while(iDataBlockSize - iUsedDataBlock < 3) { |
||||||
|
cpDataBlock = SPI_repalloc(cpDataBlock,iDataBlockSize+BUFFER_SIZE); |
||||||
|
iDataBlockSize = iDataBlockSize + BUFFER_SIZE; |
||||||
|
cpFormatedPtr = cpDataBlock + iUsedDataBlock; |
||||||
|
} |
||||||
|
sprintf(cpFormatedPtr,"' "); |
||||||
|
iUsedDataBlock = iUsedDataBlock +2; |
||||||
|
#if defined DEBUG_OUTPUT |
||||||
|
elog(NOTICE,cpDataBlock); |
||||||
|
#endif |
||||||
|
|
||||||
|
} /* for iColumnCounter */ |
||||||
|
if(tpPKeys!=NULL) { |
||||||
|
SPI_pfree(tpPKeys);
|
||||||
|
} |
||||||
|
#if defined DEBUG_OUTPUT |
||||||
|
elog(NOTICE,"Returning"); |
||||||
|
#endif |
||||||
|
memset(cpDataBlock + iUsedDataBlock,0,iDataBlockSize - iUsedDataBlock); |
||||||
|
|
||||||
|
return cpDataBlock; |
||||||
|
|
||||||
|
} |
@ -0,0 +1,22 @@ |
|||||||
|
######################################################################### |
||||||
|
# Config file for DBMirror.pl |
||||||
|
# This file contains a sample configuration file for DBMirror.pl |
||||||
|
# It contains configuration information to mirror data from |
||||||
|
# the master database to a single slave system. |
||||||
|
# |
||||||
|
# $Id: slaveDatabase.conf,v 1.1 2002/06/23 21:58:08 momjian Exp $ |
||||||
|
####################################################################### |
||||||
|
|
||||||
|
$masterHost = "masterMachine.mydomain.com"; |
||||||
|
$masterDb = "myDatabase"; |
||||||
|
$masterUser = "postgres"; |
||||||
|
$masterPassword = "postgrespassword"; |
||||||
|
|
||||||
|
# Where to email Error messages to |
||||||
|
# $errorEmailAddr = "me@mydomain.com"; |
||||||
|
|
||||||
|
$slaveInfo->{"slaveHost"} = "backupMachine.mydomain.com"; |
||||||
|
$slaveInfo->{"slaveDb"} = "myDatabase"; |
||||||
|
$slaveInfo->{"slaveUser"} = "postgres"; |
||||||
|
$slaveInfo->{"slavePassword"} = "postgrespassword"; |
||||||
|
|
Loading…
Reference in new issue