diff options
Diffstat (limited to 'contrib/dbmirror/DBMirror.pl')
-rwxr-xr-x | contrib/dbmirror/DBMirror.pl | 869 |
1 files changed, 869 insertions, 0 deletions
diff --git a/contrib/dbmirror/DBMirror.pl b/contrib/dbmirror/DBMirror.pl new file mode 100755 index 00000000000..c934c869b78 --- /dev/null +++ b/contrib/dbmirror/DBMirror.pl @@ -0,0 +1,869 @@ +#!/usr/bin/perl +############################################################################# +# +# DBMirror.pl +# Contains the Database mirroring script. +# This script queries the pending table off the database specified +# (along with the associated schema) for updates that are pending on a +# specific host. The database on that host is then updated with the changes. +# +# +# Written by Steven Singer (ssinger@navtechinc.com) +# (c) 2001-2002 Navtech Systems Support Inc. +# Released under the GNU Public License version 2. See COPYING. +# +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +############################################################################## +# $Id: DBMirror.pl,v 1.1 2002/06/23 21:58:07 momjian Exp $ +# +############################################################################## + +=head1 NAME + +DBMirror.pl - A Perl module to mirror database changes from a master database +to a slave. + +=head1 SYNPOSIS + + +DBMirror.pl slaveConfigfile.conf + + +=head1 DESCRIPTION + +This Perl script will connect to the master database and query its pending +table for a list of pending changes. + +The transactions of the original changes to the master will be preserved +when sending things to the slave. + +=cut + + +=head1 METHODS + +=over 4 + +=cut + + +BEGIN { + # add in a global path to files + # Pg should be included. +} + + +use strict; +use Pg; +use IO::Handle; +sub mirrorCommand($$$$$$); +sub mirrorInsert($$$$$); +sub mirrorDelete($$$$$); +sub mirrorUpdate($$$$$); +sub sendQueryToSlaves($$); +sub logErrorMessage($); +sub openSlaveConnection($); +sub updateMirrorHostTable($$); + sub extractData($$); +local $::masterHost; +local $::masterDb; +local $::masterUser; +local $::masterPassword; +local $::errorThreshold=5; +local $::errorEmailAddr=undef; + +my %slaveInfoHash; +local $::slaveInfo = \%slaveInfoHash; + +my $lastErrorMsg; +my $repeatErrorCount=0; + +my $lastXID; +my $commandCount=0; + +my $masterConn; + +Main(); + +sub Main() { + +#run the configuration file. + if ($#ARGV != 0) { + die "usage: DBMirror.pl configFile\n"; + } + if( ! defined do $ARGV[0]) { + logErrorMessage("Invalid Configuration file $ARGV[0]"); + die; + } + + + my $connectString = "host=$::masterHost dbname=$::masterDb user=$::masterUser password=$::masterPassword"; + + $masterConn = Pg::connectdb($connectString); + + unless($masterConn->status == PGRES_CONNECTION_OK) { + logErrorMessage("Can't connect to master database\n" . + $masterConn->errorMessage); + die; + } + + + my $firstTime = 1; + while(1) { + if($firstTime == 0) { + sleep 60; + } + $firstTime = 0; +# Open up the connection to the slave. + if(! defined $::slaveInfo->{"status"} || + $::slaveInfo->{"status"} == -1) { + openSlaveConnection($::slaveInfo); + } + + + + sendQueryToSlaves(undef,"SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"); + sendQueryToSlaves(undef,"SET CONSTRAINTS ALL DEFERRED"); + + + #Obtain a list of pending transactions using ordering by our approximation + #to the commit time. The commit time approximation is taken to be the + #SeqId of the last row edit in the transaction. + my $pendingTransQuery = "SELECT pd.\"XID\",MAX(\"SeqId\") FROM \"Pending\" pd"; + $pendingTransQuery .= " LEFT JOIN \"MirroredTransaction\" mt INNER JOIN"; + $pendingTransQuery .= " \"MirrorHost\" mh ON mt.\"MirrorHostId\" = "; + $pendingTransQuery .= " mh.\"MirrorHostId\" AND mh.\"HostName\"="; + $pendingTransQuery .= " '$::slaveInfo->{\"slaveHost\"}' "; + $pendingTransQuery .= " ON pd.\"XID\""; + $pendingTransQuery .= " = mt.\"XID\" WHERE mt.\"XID\" is null "; + $pendingTransQuery .= " GROUP BY pd.\"XID\" "; + $pendingTransQuery .= " ORDER BY MAX(pd.\"SeqId\")"; + + + my $pendingTransResults = $masterConn->exec($pendingTransQuery); + unless($pendingTransResults->resultStatus==PGRES_TUPLES_OK) { + logErrorMessage("Can't query pending table\n" . $masterConn->errorMessage); + die; + } + + my $numPendingTrans = $pendingTransResults->ntuples; + my $curTransTuple = 0; + + + # + # This loop loops through each pending transaction in the proper order. + # The Pending row edits for that transaction will be queried from the + # master and sent + committed to the slaves. + while($curTransTuple < $numPendingTrans) { + my $XID = $pendingTransResults->getvalue($curTransTuple,0); + my $maxSeqId = $pendingTransResults->getvalue($curTransTuple,1); + my $seqId; + + my $pendingQuery = "SELECT pnd.\"SeqId\",pnd.\"TableName\","; + $pendingQuery .= " pnd.\"Op\",pnddata.\"IsKey\", pnddata.\"Data\" AS \"Data\" "; + $pendingQuery .= " FROM \"Pending\" pnd, \"PendingData\" pnddata "; + $pendingQuery .= " WHERE pnd.\"SeqId\" = pnddata.\"SeqId\" AND "; + + $pendingQuery .= " pnd.\"XID\"=$XID ORDER BY \"SeqId\", \"IsKey\" DESC"; + + + my $pendingResults = $masterConn->exec($pendingQuery); + unless($pendingResults->resultStatus==PGRES_TUPLES_OK) { + logErrorMessage("Can't query pending table\n" . $masterConn->errorMessage); + die; + } + + + + my $numPending = $pendingResults->ntuples; + my $curTuple = 0; + sendQueryToSlaves(undef,"BEGIN"); + while ($curTuple < $numPending) { + $seqId = $pendingResults->getvalue($curTuple,0); + my $tableName = $pendingResults->getvalue($curTuple,1); + my $op = $pendingResults->getvalue($curTuple,2); + + $curTuple = mirrorCommand($seqId,$tableName,$op,$XID, + $pendingResults,$curTuple) +1; + if($::slaveInfo->{"status"}==-1) { + last; + } + + } + #Now commit the transaction. + if($::slaveInfo->{"status"}==-1) { + last; + } + sendQueryToSlaves(undef,"COMMIT"); + updateMirrorHostTable($XID,$seqId); + if($commandCount > 5000) { + $commandCount = 0; + $::slaveInfo->{"status"} = -1; + $::slaveInfo->{"slaveConn"}->reset; + #Open the connection right away. + openSlaveConnection($::slaveInfo); + + } + + $pendingResults = undef; + $curTransTuple = $curTransTuple +1; + }#while transactions left. + + $pendingTransResults = undef; + + }#while(1) +}#Main + + + +=item mirrorCommand(SeqId,tableName,op,transId,pendingResults,curTuple) + +Mirrors a single SQL Command(change to a single row) to the slave. + +=over 4 + +=item * SeqId + +The id number of the change to mirror. This is the +primary key of the pending table. + + +=item * tableName + +The name of the table the transaction takes place on. + +=item * op + +The type of operation this transaction is. 'i' for insert, 'u' for update or +'d' for delete. + +=item * transId + +The Transaction of of the Transaction that this command is part of. + +=item * pendingResults + +A Results set structure returned from Pg::execute that contains the +join of the Pending and PendingData tables for all of the pending row +edits in this transaction. + +=item * currentTuple + + +The tuple(or row) number of the pendingRow for the command that is about +to be edited. If the command is an update then this points to the row +with IsKey equal to true. The next row, curTuple+1 is the contains the +PendingData with IsKey false for the update. + + +=item returns + + +The tuple number of last tuple for this command. This might be equal to +currentTuple or it might be larger (+1 in the case of an Update). + + +=back + +=cut + + +sub mirrorCommand($$$$$$) { + my $seqId = $_[0]; + my $tableName = $_[1]; + my $op = $_[2]; + my $transId = $_[3]; + my $pendingResults = $_[4]; + my $currentTuple = $_[5]; + + if($op eq 'i') { + $currentTuple = mirrorInsert($seqId,$tableName,$transId,$pendingResults + ,$currentTuple); + } + if($op eq 'd') { + $currentTuple = mirrorDelete($seqId,$tableName,$transId,$pendingResults, + $currentTuple); + } + if($op eq 'u') { + $currentTuple = mirrorUpdate($seqId,$tableName,$transId,$pendingResults, + $currentTuple); + } + $commandCount = $commandCount +1; + if($commandCount % 100 == 0) { + # print "Sent 100 commmands on SeqId $seqId \n"; + # flush STDOUT; + } + return $currentTuple + } + + +=item mirrorInsert(transId,tableName,transId,pendingResults,currentTuple) + +Mirrors an INSERT operation to the slave database. A new row is placed +in the slave database containing the primary key from pendingKeys along with +the data fields contained in the row identified by sourceOid. + +=over 4 + +=item * transId + +The sequence id of the INSERT operation being mirrored. This is the primary +key of the pending table. + +=item * tableName + + +The name of the table the transaction takes place on. + +=item * sourceOid + +The OID of the row in the master database for which this transaction effects. +If the transaction is a delete then the operation is not valid. + +=item * transId + +The Transaction Id of transaction that this insert is part of. + + + +=item * pendingResults + +A Results set structure returned from Pg::execute that contains the +join of the Pending and PendingData tables for all of the pending row +edits in this transaction. + +=item * currentTuple + + +The tuple(or row) number of the pendingRow for the command that is about +to be edited. In the case of an insert this should point to the one +row for the row edit. + +=item returns + +The tuple number of the last tuple for the row edit. This should be +currentTuple. + + +=back + +=cut + + +sub mirrorInsert($$$$$) { + my $seqId = $_[0]; + my $tableName = $_[1]; + my $transId = $_[2]; + my $pendingResults = $_[3]; + my $currentTuple = $_[4]; + my $counter; + my $column; + + my $firstIteration=1; + my %recordValues = extractData($pendingResults,$currentTuple); + + + #Now build the insert query. + my $insertQuery = "INSERT INTO \"$tableName\" ("; + my $valuesQuery = ") VALUES ("; + foreach $column (keys (%recordValues)) { + if($firstIteration==0) { + $insertQuery .= " ,"; + $valuesQuery .= " ,"; + } + $insertQuery .= "\"$column\""; + if(defined $recordValues{$column}) { + my $quotedValue = $recordValues{$column}; + $quotedValue =~ s/\\/\\\\/g; + $quotedValue =~ s/'/\\'/g; + $valuesQuery .= "'$quotedValue'"; + } + else { + $valuesQuery .= "null"; + } + $firstIteration=0; + } + $valuesQuery .= ")"; + sendQueryToSlaves(undef,$insertQuery . $valuesQuery); + return $currentTuple; +} + +=item mirrorDelete(SeqId,tableName,transId,pendingResult,currentTuple) + +Deletes a single row from the slave database. The row is identified by the +primary key for the transaction in the pendingKeys table. + +=over 4 + +=item * SeqId + +The Sequence id for this delete request. + +=item * tableName + +The name of the table to delete the row from. + +=item * transId + +The Transaction Id of the transaction that this command is part of. + + + +=item * pendingResults + +A Results set structure returned from Pg::execute that contains the +join of the Pending and PendingData tables for all of the pending row +edits in this transaction. + +=item * currentTuple + + +The tuple(or row) number of the pendingRow for the command that is about +to be edited. In the case of a delete this should point to the one +row for the row edit. + +=item returns + +The tuple number of the last tuple for the row edit. This should be +currentTuple. + + +=back + +=cut + + +sub mirrorDelete($$$$$) { + my $seqId = $_[0]; + my $tableName = $_[1]; + my $transId = $_[2]; + my $pendingResult = $_[3]; + my $currentTuple = $_[4]; + my %dataHash; + my $currentField; + my $firstField=1; + %dataHash = extractData($pendingResult,$currentTuple); + + my $counter=0; + my $deleteQuery = "DELETE FROM \"$tableName\" WHERE "; + foreach $currentField (keys %dataHash) { + if($firstField==0) { + $deleteQuery .= " AND "; + } + my $currentValue = $dataHash{$currentField}; + $deleteQuery .= "\""; + $deleteQuery .= $currentField; + if(defined $currentValue) { + $deleteQuery .= "\"='"; + $deleteQuery .= $currentValue; + $deleteQuery .= "'"; + } + else { + $deleteQuery .= " is null "; + } + $counter++; + $firstField=0; + } + + sendQueryToSlaves($transId,$deleteQuery); + return $currentTuple; +} + + +=item mirrorUpdate(seqId,tableName,transId,pendingResult,currentTuple) + +Mirrors over an edit request to a single row of the database. +The primary key from before the edit is used to determine which row in the +slave should be changed. + +After the edit takes place on the slave its primary key will match the primary +key the master had immediatly following the edit. All other fields will be set +to the current values. + +Data integrity is maintained because the mirroring is performed in an +SQL transcation so either all pending changes are made or none are. + +=over 4 + +=item * seqId + +The Sequence id of the update. + +=item * tableName + +The name of the table to perform the update on. + +=item * transId + +The transaction Id for the transaction that this command is part of. + + +=item * pendingResults + +A Results set structure returned from Pg::execute that contains the +join of the Pending and PendingData tables for all of the pending row +edits in this transaction. + +=item * currentTuple + + +The tuple(or row) number of the pendingRow for the command that is about +to be edited. In the case of a delete this should point to the one +row for the row edit. + +=item returns + +The tuple number of the last tuple for the row edit. This should be +currentTuple +1. Which points to the non key row of the update. + + +=back + +=cut + +sub mirrorUpdate($$$$$) { + my $seqId = $_[0]; + my $tableName = $_[1]; + my $transId = $_[2]; + my $pendingResult = $_[3]; + my $currentTuple = $_[4]; + + my $counter; + my $quotedValue; + my $updateQuery = "UPDATE \"$tableName\" SET "; + my $currentField; + + + + my %keyValueHash; + my %dataValueHash; + my $firstIteration=1; + + #Extract the Key values. This row contains the values of the + # key fields before the update occours(the WHERE clause) + %keyValueHash = extractData($pendingResult,$currentTuple); + + + #Extract the data values. This is a SET clause that contains + #values for the entire row AFTER the update. + %dataValueHash = extractData($pendingResult,$currentTuple+1); + + $firstIteration=1; + foreach $currentField (keys (%dataValueHash)) { + if($firstIteration==0) { + $updateQuery .= ", "; + } + $updateQuery .= " \"$currentField\"="; + my $currentValue = $dataValueHash{$currentField}; + if(defined $currentValue ) { + $quotedValue = $currentValue; + $quotedValue =~ s/\\/\\\\/g; + $quotedValue =~ s/'/\\'/g; + $updateQuery .= "'$quotedValue'"; + } + else { + $updateQuery .= "null "; + } + $firstIteration=0; + } + + + $updateQuery .= " WHERE "; + $firstIteration=1; + foreach $currentField (keys (%keyValueHash)) { + my $currentValue; + if($firstIteration==0) { + $updateQuery .= " AND "; + } + $updateQuery .= "\"$currentField\"="; + $currentValue = $keyValueHash{$currentField}; + if(defined $currentValue) { + $quotedValue = $currentValue; + $quotedValue =~ s/\\/\\\\/g; + $quotedValue =~ s/'/\\'/g; + $updateQuery .= "'$quotedValue'"; + } + else { + $updateQuery .= " null "; + } + $firstIteration=0; + } + + sendQueryToSlaves($transId,$updateQuery); + return $currentTuple+1; +} + + + +=item sendQueryToSlaves(seqId,sqlQuery) + +Sends an SQL query to the slave. + + +=over 4 + +=item * seqId + +The sequence Id of the command being sent. Undef if no command is associated +with the query being sent. + +=item * sqlQuery + + +SQL operation to perform on the slave. + +=back + +=cut + +sub sendQueryToSlaves($$) { + my $seqId = $_[0]; + my $sqlQuery = $_[1]; + + if($::slaveInfo->{"status"} == 0) { + my $queryResult = $::slaveInfo->{"slaveConn"}->exec($sqlQuery); + unless($queryResult->resultStatus == PGRES_COMMAND_OK) { + my $errorMessage; + $errorMessage = "Error sending query $seqId to " ; + $errorMessage .= $::slaveInfo->{"slaveHost"}; + $errorMessage .=$::slaveInfo->{"slaveConn"}->errorMessage; + $errorMessage .= "\n" . $sqlQuery; + logErrorMessage($errorMessage); + $::slaveInfo->{"slaveConn"}->exec("ROLLBACK"); + $::slaveInfo->{"status"} = -1; + } + } + +} + + +=item logErrorMessage(error) + +Mails an error message to the users specified $errorEmailAddr +The error message is also printed to STDERR. + +=over 4 + +=item * error + +The error message to log. + +=back + +=cut + +sub logErrorMessage($) { + my $error = $_[0]; + + if(defined $lastErrorMsg and $error eq $lastErrorMsg) { + if($repeatErrorCount<$::errorThreshold) { + $repeatErrorCount++; + warn($error); + return; + } + + } + $repeatErrorCount=0; + if(defined $::errorEmailAddr) { + my $mailPipe; + open (mailPipe, "|/bin/mail -s DBMirror.pl $::errorEmailAddr"); + print mailPipe "=====================================================\n"; + print mailPipe " DBMirror.pl \n"; + print mailPipe "\n"; + print mailPipe " The DBMirror.pl script has encountred an error. \n"; + print mailPipe " It might indicate that either the master database has\n"; + print mailPipe " gone down or that the connection to a slave database can\n"; + print mailPipe " not be made. \n"; + print mailPipe " Process-Id: $$ on $::masterHost database $::masterDb\n"; + print mailPipe "\n"; + print mailPipe $error; + print mailPipe "\n\n\n=================================================\n"; + close mailPipe; + } + warn($error); + + $lastErrorMsg = $error; + +} + +sub openSlaveConnection($) { + my $slavePtr = $_[0]; + my $slaveConn; + + + my $slaveConnString = "host=" . $slavePtr->{"slaveHost"}; + $slaveConnString .= " dbname=" . $slavePtr->{"slaveDb"}; + $slaveConnString .= " user=" . $slavePtr->{"slaveUser"}; + $slaveConnString .= " password=" . $slavePtr->{"slavePassword"}; + + $slaveConn = Pg::connectdb($slaveConnString); + + if($slaveConn->status !=PGRES_CONNECTION_OK) { + my $errorMessage = "Can't connect to slave database " ; + $errorMessage .= $slavePtr->{"slaveHost"} . "\n"; + $errorMessage .= $slaveConn->errorMessage; + logErrorMessage($errorMessage); + $slavePtr->{"status"} = -1; + } + else { + $slavePtr->{"slaveConn"} = $slaveConn; + $slavePtr->{"status"} = 0; + #Determine the MirrorHostId for the slave from the master's database + my $resultSet = $masterConn->exec('SELECT "MirrorHostId" FROM ' + . ' "MirrorHost" WHERE "HostName"' + . '=\'' . $slavePtr->{"slaveHost"} + . '\''); + if($resultSet->ntuples !=1) { + my $errorMessage .= $slavePtr->{"slaveHost"} ."\n"; + $errorMessage .= "Has no MirrorHost entry on master\n"; + logErrorMessage($errorMessage); + $slavePtr->{"status"}=-1; + return; + + } + $slavePtr->{"MirrorHostId"} = $resultSet->getvalue(0,0); + + + + } + +} + + +=item updateMirrorHostTable(lastTransId,lastSeqId) + +Updates the MirroredTransaction table to reflect the fact that +this transaction has been sent to the current slave. + +=over 4 + +=item * lastTransId + +The Transaction id for the last transaction that has been succesfully mirrored to +the currently open slaves. + +=item * lastSeqId + +The Sequence Id of the last command that has been succefully mirrored + + +=back + + +=cut + +sub updateMirrorHostTable($$) { + my $lastTransId = shift; + my $lastSeqId = shift; + + if($::slaveInfo->{"status"}==0) { + my $deleteTransactionQuery; + my $deleteResult; + my $updateMasterQuery = "INSERT INTO \"MirroredTransaction\" "; + $updateMasterQuery .= " (\"XID\",\"LastSeqId\",\"MirrorHostId\")"; + $updateMasterQuery .= " VALUES ($lastTransId,$lastSeqId,$::slaveInfo->{\"MirrorHostId\"}) "; + + my $updateResult = $masterConn->exec($updateMasterQuery); + unless($updateResult->resultStatus == PGRES_COMMAND_OK) { + my $errorMessage = $masterConn->errorMessage . "\n"; + $errorMessage .= $updateMasterQuery; + logErrorMessage($errorMessage); + die; + } +# print "Updated slaves to transaction $lastTransId\n" ; +# flush STDOUT; + + #If this transaction has now been mirrored to all mirror hosts + #then it can be deleted. + $deleteTransactionQuery = 'DELETE FROM "Pending" WHERE "XID"=' + . $lastTransId . ' AND (SELECT COUNT(*) FROM "MirroredTransaction"' + . ' WHERE "XID"=' . $lastTransId . ')=(SELECT COUNT(*) FROM' + . ' "MirrorHost")'; + + $deleteResult = $masterConn->exec($deleteTransactionQuery); + if($deleteResult->resultStatus!=PGRES_COMMAND_OK) { + logErrorMessage($masterConn->errorMessage . "\n" . + $deleteTransactionQuery); + die; + } + + } + +} + + +sub extractData($$) { + my $pendingResult = $_[0]; + my $currentTuple = $_[1]; + my $fnumber; + my %valuesHash; + $fnumber = 4; + my $dataField = $pendingResult->getvalue($currentTuple,$fnumber); + + while(length($dataField)>0) { + # Extract the field name that is surronded by double quotes + $dataField =~ m/(\".*?\")/s; + my $fieldName = $1; + $dataField = substr $dataField ,length($fieldName); + $fieldName =~ s/\"//g; #Remove the surronding " signs. + + if($dataField =~ m/(^= )/s) { + #Matched null + $dataField = substr $dataField , length($1); + $valuesHash{$fieldName}=undef; + } + elsif ($dataField =~ m/(^=\')/s) { + #Has data. + my $value; + $dataField = substr $dataField ,2; #Skip the =' + LOOP: { #This is to allow us to use last from a do loop. + #Recommended in perlsyn manpage. + do { + my $matchString; + #Find the substring ending with the first ' or first \ + $dataField =~ m/(.*?[\'\\])?/s; + $matchString = $1; + $value .= substr $matchString,0,length($matchString)-1; + + if($matchString =~ m/(\'$)/s) { + # $1 runs to the end of the field value. + $dataField = substr $dataField,length($matchString)+1; + last; + + } + else { + #deal with the escape character. + #It The character following the escape gets appended. + $dataField = substr $dataField,length($matchString); + $dataField =~ s/(^.)//s; + $value .= $1; + + + + } + + + } until(length($dataField)==0); + } + $valuesHash{$fieldName} = $value; + + + }#else if + else { + + logErrorMessage "Error in PendingData Sequence Id " . + $pendingResult->getvalue($currentTuple,0); + die; + } + + + + } #while + return %valuesHash; + +} |