aboutsummaryrefslogtreecommitdiff
path: root/contrib/rserv/RServ.pm
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/rserv/RServ.pm')
-rw-r--r--contrib/rserv/RServ.pm761
1 files changed, 761 insertions, 0 deletions
diff --git a/contrib/rserv/RServ.pm b/contrib/rserv/RServ.pm
new file mode 100644
index 00000000000..de0f037cbe0
--- /dev/null
+++ b/contrib/rserv/RServ.pm
@@ -0,0 +1,761 @@
+# -*- perl -*-
+# RServ.pm
+# Vadim Mikheev, (c) 2000, PostgreSQL Inc.
+
+package RServ;
+
+require Exporter;
+@ISA = qw(Exporter);
+@EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog);
+@EXPORT_OK = qw();
+
+use Pg;
+
+$debug = 0;
+$quiet = 1;
+
+my %Mtables = ();
+my %Stables = ();
+
+sub PrepareSnapshot
+{
+ my ($conn, $outf, $server) = @_; # (@_[0], @_[1], @_[2]);
+
+ my $result = $conn->exec("BEGIN");
+ if ($result->resultStatus ne PGRES_COMMAND_OK)
+ {
+ print STDERR $conn->errorMessage unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-1);
+ }
+ $result = $conn->exec("set transaction isolation level serializable");
+ if ($result->resultStatus ne PGRES_COMMAND_OK)
+ {
+ print STDERR $conn->errorMessage unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-1);
+ }
+
+ # MAP oid --> tabname, keyname
+ $result = $conn->exec("select pgc.oid, pgc.relname, pga.attname" .
+ " from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" .
+ " where pgc.oid = rt.reloid and pga.attrelid = rt.reloid" .
+ " and pga.attnum = rt.key");
+ if ($result->resultStatus ne PGRES_TUPLES_OK)
+ {
+ print STDERR $conn->errorMessage unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-1);
+ }
+
+ my @row;
+ while (@row = $result->fetchrow)
+ {
+ # printf "$row[0], $row[1], $row[2]\n";
+ push @{$Mtables{$row[0]}}, $row[1], $row[2];
+ }
+
+ # Read last succeeded sync
+ $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" .
+ " where server = $server and syncid = (select max(syncid) from" .
+ " _RSERV_SYNC_ where server = $server and status > 0)";
+
+ printf "$sql\n" if $debug;
+
+ $result = $conn->exec($sql);
+ if ($result->resultStatus ne PGRES_TUPLES_OK)
+ {
+ print STDERR $conn->errorMessage unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-1);
+ }
+
+ my @lastsync = $result->fetchrow;
+
+ my $sinfo = "";
+ if ($lastsync[3] ne '') # sync info
+ {
+ $sinfo = "and (l.logid >= $lastsync[3]";
+ $sinfo .= " or l.logid in ($lastsync[4])" if $lastsync[4] ne '';
+ $sinfo .= ")";
+ }
+
+ my $havedeal = 0;
+
+ # DELETED rows
+ $sql = "select l.reloid, l.key from _RSERV_LOG_ l" .
+ " where l.deleted = 1 $sinfo order by l.reloid";
+
+ printf "$sql\n" if $debug;
+
+ $result = $conn->exec($sql);
+ if ($result->resultStatus ne PGRES_TUPLES_OK)
+ {
+ print STDERR $conn->errorMessage unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-1);
+ }
+
+ $lastoid = '';
+ while (@row = $result->fetchrow)
+ {
+ next unless exists $Mtables{$row[0]};
+ if ($lastoid != $row[0])
+ {
+ if ($lastoid eq '')
+ {
+ my $syncid = GetSYNCID($conn, $outf);
+ return($syncid) if $syncid < 0;
+ $havedeal = 1;
+ }
+ else
+ {
+ printf $outf "\\.\n";
+ }
+ printf $outf "-- DELETE $Mtables{$row[0]}[0]\n";
+ $lastoid = $row[0];
+ }
+ if (! defined $row[1])
+ {
+ print STDERR "NULL key\n" unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-2);
+ }
+ printf $outf "%s\n", OutputValue($row[1]);
+ }
+ printf $outf "\\.\n" if $lastoid ne '';
+
+ # UPDATED rows
+
+ my ($taboid, $tabname, $tabkey);
+ foreach $taboid (keys %Mtables)
+ {
+ ($tabname, $tabkey) = @{$Mtables{$taboid}};
+ my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';
+ $sql = sprintf "select $oidkey _$tabname.* from _RSERV_LOG_ l," .
+ " $tabname _$tabname where l.reloid = $taboid and l.deleted = 0 $sinfo" .
+ " and l.key = _$tabname.${tabkey}::text";
+
+ printf "$sql\n" if $debug;
+
+ $result = $conn->exec($sql);
+ if ($result->resultStatus ne PGRES_TUPLES_OK)
+ {
+ printf $outf "-- ERROR\n" if $havedeal;
+ print STDERR $conn->errorMessage unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-1);
+ }
+ next if $result->ntuples <= 0;
+ if (! $havedeal)
+ {
+ my $syncid = GetSYNCID($conn, $outf);
+ return($syncid) if $syncid < 0;
+ $havedeal = 1;
+ }
+ printf $outf "-- UPDATE $tabname\n";
+ while (@row = $result->fetchrow)
+ {
+ for ($i = 0; $i <= $#row; $i++)
+ {
+ printf $outf " " if $i;
+ printf $outf "%s", OutputValue($row[$i]);
+ }
+ printf $outf "\n";
+ }
+ printf $outf "\\.\n";
+ }
+
+ unless ($havedeal)
+ {
+ $conn->exec("ROLLBACK");
+ return(0);
+ }
+
+ # Remember this snapshot info
+ $result = $conn->exec("select _rserv_sync_($server)");
+ if ($result->resultStatus ne PGRES_TUPLES_OK)
+ {
+ printf $outf "-- ERROR\n";
+ print STDERR $conn->errorMessage unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-1);
+ }
+
+ $result = $conn->exec("COMMIT");
+ if ($result->resultStatus ne PGRES_COMMAND_OK)
+ {
+ printf $outf "-- ERROR\n";
+ print STDERR $conn->errorMessage unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-1);
+ }
+ printf $outf "-- OK\n";
+
+ return(1);
+
+}
+
+sub OutputValue
+{
+ my ($val) = @_; # @_[0];
+
+ return("\\N") unless defined $val;
+
+ $val =~ s/\\/\\\\/g;
+ $val =~ s/ /\\011/g;
+ $val =~ s/\n/\\012/g;
+ $val =~ s/\'/\\047/g;
+
+ return($val);
+}
+
+# Get syncid for new snapshot
+sub GetSYNCID
+{
+ my ($conn, $outf) = @_; # (@_[0], @_[1]);
+
+ my $result = $conn->exec("select nextval('_rserv_sync_seq_')");
+ if ($result->resultStatus ne PGRES_TUPLES_OK)
+ {
+ print STDERR $conn->errorMessage unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-1);
+ }
+
+ my @row = $result->fetchrow;
+
+ printf $outf "-- SYNCID $row[0]\n";
+ return($row[0]);
+}
+
+
+sub CleanLog
+{
+ my ($conn, $howold) = @_; # (@_[0], @_[1]);
+
+ my $result = $conn->exec("BEGIN");
+ if ($result->resultStatus ne PGRES_COMMAND_OK)
+ {
+ print STDERR $conn->errorMessage unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-1);
+ }
+
+ my $sql = "select rs.maxid, rs.active from _RSERV_SYNC_ rs" .
+ " where rs.syncid = (select max(rs2.syncid) from _RSERV_SYNC_ rs2" .
+ " where rs2.server = rs.server and rs2.status > 0) order by rs.maxid";
+
+ printf "$sql\n" if $debug;
+
+ $result = $conn->exec($sql);
+ if ($result->resultStatus ne PGRES_TUPLES_OK)
+ {
+ print STDERR $conn->errorMessage unless ($quiet);
+ return(-1);
+ }
+ my $maxid = '';
+ my %active = ();
+ while (my @row = $result->fetchrow)
+ {
+ $maxid = $row[0] if $maxid eq '';
+ last if $row[0] > $maxid;
+ my @ids = split(/[ ]+,[ ]+/, $row[1]);
+ foreach $aid (@ids)
+ {
+ $active{$aid} = 1 unless exists $active{$aid};
+ }
+ }
+ if ($maxid eq '')
+ {
+ print STDERR "No Sync IDs\n" unless ($quiet);
+ return(0);
+ }
+ my $alist = join(',', keys %active);
+ my $sinfo = "logid < $maxid";
+ $sinfo .= " and logid not in ($alist)" if $alist ne '';
+
+ $sql = "delete from _RSERV_LOG_ where " .
+ "logtime < now() - '$howold second'::interval and $sinfo";
+
+ printf "$sql\n" if $debug;
+
+ $result = $conn->exec($sql);
+ if ($result->resultStatus ne PGRES_COMMAND_OK)
+ {
+ print STDERR $conn->errorMessage unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-1);
+ }
+ $maxid = $result->cmdTuples;
+
+ $result = $conn->exec("COMMIT");
+ if ($result->resultStatus ne PGRES_COMMAND_OK)
+ {
+ print STDERR $conn->errorMessage unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-1);
+ }
+
+ return($maxid);
+}
+
+sub ApplySnapshot
+{
+ my ($conn, $inpf) = @_; # (@_[0], @_[1]);
+
+ my $result = $conn->exec("BEGIN");
+ if ($result->resultStatus ne PGRES_COMMAND_OK)
+ {
+ print STDERR $conn->errorMessage unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-1);
+ }
+
+ $result = $conn->exec("SET CONSTRAINTS ALL DEFERRED");
+ if ($result->resultStatus ne PGRES_COMMAND_OK)
+ {
+ print STDERR $conn->errorMessage unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-1);
+ }
+
+ # MAP name --> oid, keyname, keynum
+ my $sql = "select pgc.oid, pgc.relname, pga.attname, rt.key" .
+ " from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga" .
+ " where pgc.oid = rt.reloid and pga.attrelid = rt.reloid" .
+ " and pga.attnum = rt.key";
+ $result = $conn->exec($sql);
+ if ($result->resultStatus ne PGRES_TUPLES_OK)
+ {
+ print STDERR $conn->errorMessage unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-1);
+ }
+
+ while (@row = $result->fetchrow)
+ {
+ # printf " %s %s\n", $row[1], $row[0];
+ push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];
+ }
+
+ my $ok = 0;
+ my $syncid = '';
+ while(<$inpf>)
+ {
+ $_ =~ s/\n//;
+ my ($cmt, $cmd, $prm) = split (/[ ]+/, $_, 3);
+ if ($cmt ne '--')
+ {
+ printf STDERR "Invalid format\n" unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-2);
+ }
+ if ($cmd eq 'DELETE')
+ {
+ if ($syncid eq '')
+ {
+ printf STDERR "Sync ID unspecified\n" unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-2);
+ }
+ $result = DoDelete($conn, $inpf, $prm);
+ if ($result)
+ {
+ $conn->exec("ROLLBACK");
+ return($result);
+ }
+ }
+ elsif ($cmd eq 'UPDATE')
+ {
+ if ($syncid eq '')
+ {
+ printf STDERR "Sync ID unspecified\n" unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-2);
+ }
+ $result = DoUpdate($conn, $inpf, $prm);
+ if ($result)
+ {
+ $conn->exec("ROLLBACK");
+ return($result);
+ }
+ }
+ elsif ($cmd eq 'SYNCID')
+ {
+ if ($syncid ne '')
+ {
+ printf STDERR "Second Sync ID ?!\n" unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-2);
+ }
+ if ($prm !~ /^\d+$/)
+ {
+ printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-2);
+ }
+ $syncid = $prm;
+
+ printf STDERR "Sync ID $syncid\n" unless ($quiet);
+
+ $result = $conn->exec("select syncid, synctime from " .
+ "_RSERV_SLAVE_SYNC_ where syncid = " .
+ "(select max(syncid) from _RSERV_SLAVE_SYNC_)");
+ if ($result->resultStatus ne PGRES_TUPLES_OK)
+ {
+ print STDERR $conn->errorMessage unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-1);
+ }
+ my @row = $result->fetchrow;
+ if (! defined $row[0])
+ {
+ $result = $conn->exec("insert into" .
+ " _RSERV_SLAVE_SYNC_(syncid, synctime) values ($syncid, now())");
+ }
+ elsif ($row[0] >= $prm)
+ {
+ printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(0);
+ }
+ else
+ {
+ $result = $conn->exec("update _RSERV_SLAVE_SYNC_" .
+ " set syncid = $syncid, synctime = now()");
+ }
+ if ($result->resultStatus ne PGRES_COMMAND_OK)
+ {
+ print STDERR $conn->errorMessage unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-1);
+ }
+ }
+ elsif ($cmd eq 'OK')
+ {
+ $ok = 1;
+ last;
+ }
+ elsif ($cmd eq 'ERROR')
+ {
+ printf STDERR "ERROR signaled\n" unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-2);
+ }
+ else
+ {
+ printf STDERR "Unknown command $cmd\n" unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-2);
+ }
+ }
+
+ if (! $ok)
+ {
+ printf STDERR "No OK flag in input\n" unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-2);
+ }
+
+ $result = $conn->exec("COMMIT");
+ if ($result->resultStatus ne PGRES_COMMAND_OK)
+ {
+ print STDERR $conn->errorMessage unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-1);
+ }
+
+ return(1);
+}
+
+sub DoDelete
+{
+ my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
+
+ my $ok = 0;
+ while(<$inpf>)
+ {
+ if ($_ !~ /\n$/)
+ {
+ printf STDERR "Invalid format\n" unless ($quiet);
+ return(-2);
+ }
+ my $key = $_;
+ $key =~ s/\n//;
+ if ($key eq '\.')
+ {
+ $ok = 1;
+ last;
+ }
+
+ my $sql = "delete from $tabname where $Stables{$tabname}->[1] = '$key'";
+
+ printf "$sql\n" if $debug;
+
+ my $result = $conn->exec($sql);
+ if ($result->resultStatus ne PGRES_COMMAND_OK)
+ {
+ print STDERR $conn->errorMessage unless ($quiet);
+ return(-1);
+ }
+ }
+
+ if (! $ok)
+ {
+ printf STDERR "No end of input in DELETE section\n" unless ($quiet);
+ return(-2);
+ }
+
+ return(0);
+}
+
+
+sub DoUpdate
+{
+ my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
+ my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;
+
+ my @CopyBuf = ();
+ my $CBufLen = 0;
+ my $CBufMax = 16 * 1024 * 1024; # max size of buf for copy
+
+ my $sql = "select attnum, attname from pg_attribute" .
+ " where attrelid = $Stables{$tabname}->[0] and attnum > 0";
+
+ my $result = $conn->exec($sql);
+ if ($result->resultStatus ne PGRES_TUPLES_OK)
+ {
+ print STDERR $conn->errorMessage unless ($quiet);
+ return(-1);
+ }
+
+ my @anames = ();
+ while (@row = $result->fetchrow)
+ {
+ $anames[$row[0]] = $row[1];
+ }
+
+ my $istring;
+ my $ok = 0;
+ while(<$inpf>)
+ {
+ if ($_ !~ /\n$/)
+ {
+ printf STDERR "Invalid format\n" unless ($quiet);
+ return(-2);
+ }
+ $istring = $_;
+ $istring =~ s/\n//;
+ if ($istring eq '\.')
+ {
+ $ok = 1;
+ last;
+ }
+ my @vals = split(/ /, $istring);
+ if ($oidkey)
+ {
+ if ($vals[0] !~ /^\d+$/ || $vals[0] <= 0)
+ {
+ printf STDERR "Invalid OID\n" unless ($quiet);
+ return(-2);
+ }
+ $oidkey = $vals[0];
+ }
+ else
+ {
+ unshift @vals, '';
+ }
+
+ $sql = "update $tabname set ";
+ my $ocnt = 0;
+ for (my $i = 1; $i <= $#anames; $i++)
+ {
+ if ($vals[$i] eq '\N')
+ {
+ if ($i == $Stables{$tabname}->[2])
+ {
+ printf STDERR "NULL key\n" unless ($quiet);
+ return(-2);
+ }
+ $vals[$i] = 'null';
+ }
+ else
+ {
+ $vals[$i] = "'" . $vals[$i] . "'";
+ next if $i == $Stables{$tabname}->[2];
+ }
+ $ocnt++;
+ $sql .= ', ' if $ocnt > 1;
+ $sql .= "$anames[$i] = $vals[$i]";
+ }
+ if ($oidkey)
+ {
+ $sql .= " where $Stables{$tabname}->[1] = $oidkey";
+ }
+ else
+ {
+ $sql .= " where $Stables{$tabname}->[1] = $vals[$Stables{$tabname}->[2]]";
+ }
+
+ printf "$sql\n" if $debug;
+
+ $result = $conn->exec($sql);
+ if ($result->resultStatus ne PGRES_COMMAND_OK)
+ {
+ print STDERR $conn->errorMessage unless ($quiet);
+ return(-1);
+ }
+ next if $result->cmdTuples == 1; # updated
+
+ if ($result->cmdTuples > 1)
+ {
+ printf STDERR "Duplicate keys\n" unless ($quiet);
+ return(-2);
+ }
+
+ # no key - copy
+ push @CopyBuf, "$istring\n";
+ $CBufLen += length($istring);
+
+ if ($CBufLen >= $CBufMax)
+ {
+ $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);
+ return($result) if $result;
+ @CopyBuf = ();
+ $CBufLen = 0;
+ }
+ }
+
+ if (! $ok)
+ {
+ printf STDERR "No end of input in UPDATE section\n" unless ($quiet);
+ return(-2);
+ }
+
+ if ($CBufLen)
+ {
+ $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);
+ return($result) if $result;
+ }
+
+ return(0);
+}
+
+
+sub DoCopy
+{
+ my ($conn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);
+
+ my $sql = "COPY $tabname " . (($withoids) ? "WITH OIDS " : '') .
+"FROM STDIN";
+ my $result = $conn->exec($sql);
+ if ($result->resultStatus ne PGRES_COPY_IN)
+ {
+ print STDERR $conn->errorMessage unless ($quiet);
+ return(-1);
+ }
+
+ foreach $str (@{$CBuf})
+ {
+ $conn->putline($str);
+ }
+
+ $conn->putline("\\.\n");
+
+ if ($conn->endcopy)
+ {
+ print STDERR $conn->errorMessage unless ($quiet);
+ return(-1);
+ }
+
+ return(0);
+
+}
+
+
+#
+# Returns last SyncID applied on Slave
+#
+sub GetSyncID
+{
+ my ($conn) = @_; # (@_[0]);
+
+ my $result = $conn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");
+ if ($result->resultStatus ne PGRES_TUPLES_OK)
+ {
+ print STDERR $conn->errorMessage unless ($quiet);
+ return(-1);
+ }
+ my @row = $result->fetchrow;
+ return(undef) unless defined $row[0]; # null
+ return($row[0]);
+}
+
+#
+# Updates _RSERV_SYNC_ on Master with Slave SyncID
+#
+sub SyncSyncID
+{
+ my ($conn, $server, $syncid) = @_; # (@_[0], @_[1], @_[2]);
+
+ my $result = $conn->exec("BEGIN");
+ if ($result->resultStatus ne PGRES_COMMAND_OK)
+ {
+ print STDERR $conn->errorMessage unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-1);
+ }
+
+ $result = $conn->exec("select synctime, status from _RSERV_SYNC_" .
+ " where server = $server and syncid = $syncid" .
+ " for update");
+ if ($result->resultStatus ne PGRES_TUPLES_OK)
+ {
+ print STDERR $conn->errorMessage unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-1);
+ }
+ my @row = $result->fetchrow;
+ if (! defined $row[0])
+ {
+ printf STDERR "No SyncID $syncid found for server $server\n" unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(0);
+ }
+ if ($row[1] > 0)
+ {
+ printf STDERR "SyncID $syncid for server $server already updated\n" unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(0);
+ }
+ $result = $conn->exec("update _RSERV_SYNC_" .
+ " set synctime = now(), status = 1" .
+ " where server = $server and syncid = $syncid");
+ if ($result->resultStatus ne PGRES_COMMAND_OK)
+ {
+ print STDERR $conn->errorMessage unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-1);
+ }
+ $result = $conn->exec("delete from _RSERV_SYNC_" .
+ " where server = $server and syncid < $syncid");
+ if ($result->resultStatus ne PGRES_COMMAND_OK)
+ {
+ print STDERR $conn->errorMessage unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-1);
+ }
+
+ $result = $conn->exec("COMMIT");
+ if ($result->resultStatus ne PGRES_COMMAND_OK)
+ {
+ print STDERR $conn->errorMessage unless ($quiet);
+ $conn->exec("ROLLBACK");
+ return(-1);
+ }
+
+ return(1);
+}
+
+1;