aboutsummaryrefslogtreecommitdiff
path: root/contrib/rserv/rserv.c
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/rserv/rserv.c')
-rw-r--r--contrib/rserv/rserv.c319
1 files changed, 319 insertions, 0 deletions
diff --git a/contrib/rserv/rserv.c b/contrib/rserv/rserv.c
new file mode 100644
index 00000000000..518dd68a539
--- /dev/null
+++ b/contrib/rserv/rserv.c
@@ -0,0 +1,319 @@
+/* rserv.c
+ * Support functions for erServer replication.
+ * (c) 2000 Vadim Mikheev, PostgreSQL Inc.
+ */
+
+#include "executor/spi.h" /* this is what you need to work with SPI */
+#include "commands/trigger.h" /* -"- and triggers */
+#include "utils/tqual.h" /* -"- and SnapshotData */
+#include <ctype.h> /* tolower () */
+
+#ifdef PG_FUNCTION_INFO_V1
+#define CurrentTriggerData ((TriggerData *) fcinfo->context)
+#endif
+
+#ifdef PG_FUNCTION_INFO_V1
+PG_FUNCTION_INFO_V1(_rserv_log_);
+PG_FUNCTION_INFO_V1(_rserv_sync_);
+PG_FUNCTION_INFO_V1(_rserv_debug_);
+Datum _rserv_log_(PG_FUNCTION_ARGS);
+Datum _rserv_sync_(PG_FUNCTION_ARGS);
+Datum _rserv_debug_(PG_FUNCTION_ARGS);
+#else
+HeapTuple _rserv_log_(void);
+int32 _rserv_sync_(int32);
+int32 _rserv_debug_(int32);
+#endif
+
+static int debug = 0;
+
+static char* OutputValue(char *key, char *buf, int size);
+
+#ifdef PG_FUNCTION_INFO_V1
+Datum
+_rserv_log_(PG_FUNCTION_ARGS)
+#else
+HeapTuple
+_rserv_log_()
+#endif
+{
+ Trigger *trigger; /* to get trigger name */
+ int nargs; /* # of args specified in CREATE TRIGGER */
+ char **args; /* argument: argnum */
+ Relation rel; /* triggered relation */
+ HeapTuple tuple; /* tuple to return */
+ HeapTuple newtuple = NULL;/* tuple to return */
+ TupleDesc tupdesc; /* tuple description */
+ int keynum;
+ char *key;
+ char *okey;
+ char *newkey = NULL;
+ int deleted;
+ char sql[8192];
+ char outbuf[8192];
+ char oidbuf[64];
+ int ret;
+
+ /* Called by trigger manager ? */
+ if (!CurrentTriggerData)
+ elog(ERROR, "_rserv_log_: triggers are not initialized");
+
+ /* Should be called for ROW trigger */
+ if (TRIGGER_FIRED_FOR_STATEMENT(CurrentTriggerData->tg_event))
+ elog(ERROR, "_rserv_log_: can't process STATEMENT events");
+
+ tuple = CurrentTriggerData->tg_trigtuple;
+
+ trigger = CurrentTriggerData->tg_trigger;
+ nargs = trigger->tgnargs;
+ args = trigger->tgargs;
+
+ if (nargs != 1) /* odd number of arguments! */
+ elog(ERROR, "_rserv_log_: need in *one* argument");
+
+ keynum = atoi(args[0]);
+
+ if (keynum < 0 && keynum != ObjectIdAttributeNumber)
+ elog(ERROR, "_rserv_log_: invalid keynum %d", keynum);
+
+ rel = CurrentTriggerData->tg_relation;
+ tupdesc = rel->rd_att;
+
+ deleted = (TRIGGER_FIRED_BY_DELETE(CurrentTriggerData->tg_event)) ?
+ 1 : 0;
+
+ if (TRIGGER_FIRED_BY_UPDATE(CurrentTriggerData->tg_event))
+ newtuple = CurrentTriggerData->tg_newtuple;
+
+ /*
+ * Setting CurrentTriggerData to NULL prevents direct calls to trigger
+ * functions in queries. Normally, trigger functions have to be called
+ * by trigger manager code only.
+ */
+ CurrentTriggerData = NULL;
+
+ /* Connect to SPI manager */
+ if ((ret = SPI_connect()) < 0)
+ elog(ERROR, "_rserv_log_: SPI_connect returned %d", ret);
+
+ if (keynum == ObjectIdAttributeNumber)
+ {
+ sprintf(oidbuf, "%u", tuple->t_data->t_oid);
+ key = oidbuf;
+ }
+ else
+ key = SPI_getvalue(tuple, tupdesc, keynum);
+
+ if (key == NULL)
+ elog(ERROR, "_rserv_log_: key must be not null");
+
+ if (newtuple && keynum != ObjectIdAttributeNumber)
+ {
+ newkey = SPI_getvalue(newtuple, tupdesc, keynum);
+ if (newkey == NULL)
+ elog(ERROR, "_rserv_log_: key must be not null");
+ if (strcmp(newkey, key) == 0)
+ newkey = NULL;
+ else
+ deleted = 1; /* old key was deleted */
+ }
+
+ if (strpbrk(key, "\\ \n'"))
+ okey = OutputValue(key, outbuf, sizeof(outbuf));
+ else
+ okey = key;
+
+ sprintf(sql, "update _RSERV_LOG_ set logid = %d, logtime = now(), "
+ "deleted = %d where reloid = %u and key = '%s'",
+ GetCurrentTransactionId(), deleted, rel->rd_id, okey);
+
+ if (debug)
+ elog(NOTICE, sql);
+
+ ret = SPI_exec(sql, 0);
+
+ if (ret < 0)
+ elog(ERROR, "_rserv_log_: SPI_exec(update) returned %d", ret);
+
+ /*
+ * If no tuple was UPDATEd then do INSERT...
+ */
+ if (SPI_processed > 1)
+ elog(ERROR, "_rserv_log_: duplicate tuples");
+ else if (SPI_processed == 0)
+ {
+ sprintf(sql, "insert into _RSERV_LOG_ "
+ "(reloid, logid, logtime, deleted, key) "
+ "values (%u, %d, now(), %d, '%s')",
+ rel->rd_id, GetCurrentTransactionId(),
+ deleted, okey);
+
+ if (debug)
+ elog(NOTICE, sql);
+
+ ret = SPI_exec(sql, 0);
+
+ if (ret < 0)
+ elog(ERROR, "_rserv_log_: SPI_exec(insert) returned %d", ret);
+ }
+
+ if (okey != key && okey != outbuf)
+ pfree(okey);
+
+ if (newkey)
+ {
+ if (strpbrk(newkey, "\\ \n'"))
+ okey = OutputValue(newkey, outbuf, sizeof(outbuf));
+ else
+ okey = newkey;
+
+ sprintf(sql, "insert into _RSERV_LOG_ "
+ "(reloid, logid, logtime, deleted, key) "
+ "values (%u, %d, now(), 0, '%s')",
+ rel->rd_id, GetCurrentTransactionId(), okey);
+
+ if (debug)
+ elog(NOTICE, sql);
+
+ ret = SPI_exec(sql, 0);
+
+ if (ret < 0)
+ elog(ERROR, "_rserv_log_: SPI_exec returned %d", ret);
+
+ if (okey != newkey && okey != outbuf)
+ pfree(okey);
+ }
+
+ SPI_finish();
+
+#ifdef PG_FUNCTION_INFO_V1
+ return (PointerGetDatum(tuple));
+#else
+ return (tuple);
+#endif
+}
+
+#ifdef PG_FUNCTION_INFO_V1
+Datum
+_rserv_sync_(PG_FUNCTION_ARGS)
+#else
+int32
+_rserv_sync_(int32 server)
+#endif
+{
+#ifdef PG_FUNCTION_INFO_V1
+ int32 server = PG_GETARG_INT32(0);
+#endif
+ char sql[8192];
+ char buf[8192];
+ char *active = buf;
+ uint32 xcnt;
+ int ret;
+
+ if (SerializableSnapshot == NULL)
+ elog(ERROR, "_rserv_sync_: SerializableSnapshot is NULL");
+
+ buf[0] = 0;
+ for (xcnt = 0; xcnt < SerializableSnapshot->xcnt; xcnt++)
+ {
+ sprintf(buf + strlen(buf), "%s%u", (xcnt) ? ", " : "",
+ SerializableSnapshot->xip[xcnt]);
+ }
+
+ if ((ret = SPI_connect()) < 0)
+ elog(ERROR, "_rserv_sync_: SPI_connect returned %d", ret);
+
+ sprintf(sql, "insert into _RSERV_SYNC_ "
+ "(server, syncid, synctime, status, minid, maxid, active) "
+ "values (%u, currval('_rserv_sync_seq_'), now(), 0, %d, %d, '%s')",
+ server, SerializableSnapshot->xmin, SerializableSnapshot->xmax, active);
+
+ ret = SPI_exec(sql, 0);
+
+ if (ret < 0)
+ elog(ERROR, "_rserv_sync_: SPI_exec returned %d", ret);
+
+ SPI_finish();
+
+ return (0);
+}
+
+#ifdef PG_FUNCTION_INFO_V1
+Datum
+_rserv_debug_(PG_FUNCTION_ARGS)
+#else
+int32
+_rserv_debug_(int32 newval)
+#endif
+{
+#ifdef PG_FUNCTION_INFO_V1
+ int32 newval = PG_GETARG_INT32(0);
+#endif
+ int32 oldval = debug;
+
+ debug = newval;
+
+ return (oldval);
+}
+
+#define ExtendBy 1024
+
+static char*
+OutputValue(char *key, char *buf, int size)
+{
+ int i = 0;
+ char *out = buf;
+ char *subst = NULL;
+ int slen = 0;
+
+ size--;
+ for ( ; ; )
+ {
+ switch (*key)
+ {
+ case '\\': subst ="\\\\";
+ slen = 2;
+ break;
+ case ' ': subst = "\\011";
+ slen = 4;
+ break;
+ case '\n': subst = "\\012";
+ slen = 4;
+ break;
+ case '\'': subst = "\\047";
+ slen = 4;
+ break;
+ case '\0': out[i] = 0;
+ return(out);
+ default: slen = 1;
+ break;
+ }
+
+ if (i + slen >= size)
+ {
+ if (out == buf)
+ {
+ out = (char*) palloc(size + ExtendBy);
+ strncpy(out, buf, i);
+ size += ExtendBy;
+ }
+ else
+ {
+ out = (char*) repalloc(out, size + ExtendBy);
+ size += ExtendBy;
+ }
+ }
+
+ if (slen == 1)
+ out[i++] = *key;
+ else
+ {
+ memcpy(out + i, subst, slen);
+ i += slen;
+ }
+ key++;
+ }
+
+ return(out);
+
+}