aboutsummaryrefslogtreecommitdiff
path: root/src/include
diff options
context:
space:
mode:
Diffstat (limited to 'src/include')
-rw-r--r--src/include/access/heapam.h3
-rw-r--r--src/include/access/heapam_xlog.h14
-rw-r--r--src/include/access/rewriteheap.h29
-rw-r--r--src/include/access/transam.h5
-rw-r--r--src/include/access/tuptoaster.h27
-rw-r--r--src/include/access/xlog.h1
-rw-r--r--src/include/catalog/catversion.h2
-rw-r--r--src/include/catalog/pg_proc.h12
-rw-r--r--src/include/commands/vacuum.h4
-rw-r--r--src/include/replication/decode.h19
-rw-r--r--src/include/replication/logical.h100
-rw-r--r--src/include/replication/logicalfuncs.h24
-rw-r--r--src/include/replication/output_plugin.h98
-rw-r--r--src/include/replication/reorderbuffer.h351
-rw-r--r--src/include/replication/slot.h64
-rw-r--r--src/include/replication/snapbuild.h83
-rw-r--r--src/include/storage/itemptr.h3
-rw-r--r--src/include/storage/proc.h6
-rw-r--r--src/include/storage/procarray.h10
-rw-r--r--src/include/storage/sinval.h2
-rw-r--r--src/include/utils/inval.h1
-rw-r--r--src/include/utils/snapmgr.h11
-rw-r--r--src/include/utils/snapshot.h32
-rw-r--r--src/include/utils/tqual.h15
24 files changed, 897 insertions, 19 deletions
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index bfdadc3d5bb..0f802577c70 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -164,8 +164,7 @@ extern void heap_restrpos(HeapScanDesc scan);
extern void heap_sync(Relation relation);
/* in heap/pruneheap.c */
-extern void heap_page_prune_opt(Relation relation, Buffer buffer,
- TransactionId OldestXmin);
+extern void heap_page_prune_opt(Relation relation, Buffer buffer);
extern int heap_page_prune(Relation relation, Buffer buffer,
TransactionId OldestXmin,
bool report_stats, TransactionId *latestRemovedXid);
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index d4383ab2cbe..194635952cb 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -48,7 +48,7 @@
* the ones above associated with RM_HEAP_ID. XLOG_HEAP_OPMASK applies to
* these, too.
*/
-/* 0x00 is free, was XLOG_HEAP2_FREEZE */
+#define XLOG_HEAP2_REWRITE 0x00
#define XLOG_HEAP2_CLEAN 0x10
#define XLOG_HEAP2_FREEZE_PAGE 0x20
#define XLOG_HEAP2_CLEANUP_INFO 0x30
@@ -332,6 +332,17 @@ typedef struct xl_heap_new_cid
xl_heaptid target;
} xl_heap_new_cid;
+/* logical rewrite xlog record header */
+typedef struct xl_heap_rewrite_mapping
+{
+ TransactionId mapped_xid; /* xid that might need to see the row */
+ Oid mapped_db; /* DbOid or InvalidOid for shared rels */
+ Oid mapped_rel; /* Oid of the mapped relation */
+ off_t offset; /* How far have we written so far */
+ uint32 num_mappings; /* Number of in-memory mappings */
+ XLogRecPtr start_lsn; /* Insert LSN at begin of rewrite */
+} xl_heap_rewrite_mapping;
+
#define SizeOfHeapNewCid (offsetof(xl_heap_new_cid, target) + SizeOfHeapTid)
extern void HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple,
@@ -341,6 +352,7 @@ extern void heap_redo(XLogRecPtr lsn, XLogRecord *rptr);
extern void heap_desc(StringInfo buf, uint8 xl_info, char *rec);
extern void heap2_redo(XLogRecPtr lsn, XLogRecord *rptr);
extern void heap2_desc(StringInfo buf, uint8 xl_info, char *rec);
+extern void heap_xlog_logical_rewrite(XLogRecPtr lsn, XLogRecord *r);
extern XLogRecPtr log_heap_cleanup_info(RelFileNode rnode,
TransactionId latestRemovedXid);
diff --git a/src/include/access/rewriteheap.h b/src/include/access/rewriteheap.h
index d098a0b1711..07df3b4f2b0 100644
--- a/src/include/access/rewriteheap.h
+++ b/src/include/access/rewriteheap.h
@@ -14,12 +14,14 @@
#define REWRITE_HEAP_H
#include "access/htup.h"
+#include "storage/itemptr.h"
+#include "storage/relfilenode.h"
#include "utils/relcache.h"
/* struct definition is private to rewriteheap.c */
typedef struct RewriteStateData *RewriteState;
-extern RewriteState begin_heap_rewrite(Relation NewHeap,
+extern RewriteState begin_heap_rewrite(Relation OldHeap, Relation NewHeap,
TransactionId OldestXmin, TransactionId FreezeXid,
MultiXactId MultiXactCutoff, bool use_wal);
extern void end_heap_rewrite(RewriteState state);
@@ -27,4 +29,29 @@ extern void rewrite_heap_tuple(RewriteState state, HeapTuple oldTuple,
HeapTuple newTuple);
extern bool rewrite_heap_dead_tuple(RewriteState state, HeapTuple oldTuple);
+/*
+ * On-Disk data format for an individual logical rewrite mapping.
+ */
+typedef struct LogicalRewriteMappingData
+{
+ RelFileNode old_node;
+ RelFileNode new_node;
+ ItemPointerData old_tid;
+ ItemPointerData new_tid;
+} LogicalRewriteMappingData;
+
+/* ---
+ * The filename consists out of the following, dash separated,
+ * components:
+ * 1) database oid or InvalidOid for shared relations
+ * 2) the oid of the relation
+ * 3) xid we are mapping for
+ * 4) upper 32bit of the LSN at which a rewrite started
+ * 5) lower 32bit of the LSN at which a rewrite started
+ * 6) xid of the xact performing the mapping
+ * ---
+ */
+#define LOGICAL_REWRITE_FORMAT "map-%x-%x-%X_%X-%x-%x"
+void CheckPointLogicalRewriteHeap(void);
+
#endif /* REWRITE_HEAP_H */
diff --git a/src/include/access/transam.h b/src/include/access/transam.h
index 8376dfd669b..a9774e9f593 100644
--- a/src/include/access/transam.h
+++ b/src/include/access/transam.h
@@ -63,6 +63,11 @@
(AssertMacro(TransactionIdIsNormal(id1) && TransactionIdIsNormal(id2)), \
(int32) ((id1) - (id2)) < 0)
+/* compare two XIDs already known to be normal; this is a macro for speed */
+#define NormalTransactionIdFollows(id1, id2) \
+ (AssertMacro(TransactionIdIsNormal(id1) && TransactionIdIsNormal(id2)), \
+ (int32) ((id1) - (id2)) > 0)
+
/* ----------
* Object ID (OID) zero is InvalidOid.
*
diff --git a/src/include/access/tuptoaster.h b/src/include/access/tuptoaster.h
index 5adf4f28169..296d016c9fc 100644
--- a/src/include/access/tuptoaster.h
+++ b/src/include/access/tuptoaster.h
@@ -98,9 +98,34 @@
/* Size of an EXTERNAL datum that contains a standard TOAST pointer */
#define TOAST_POINTER_SIZE (VARHDRSZ_EXTERNAL + sizeof(struct varatt_external))
-/* Size of an indirect datum that contains an indirect TOAST pointer */
+/* Size of an indirect datum that contains a standard TOAST pointer */
#define INDIRECT_POINTER_SIZE (VARHDRSZ_EXTERNAL + sizeof(struct varatt_indirect))
+/*
+ * Testing whether an externally-stored value is compressed now requires
+ * comparing extsize (the actual length of the external data) to rawsize
+ * (the original uncompressed datum's size). The latter includes VARHDRSZ
+ * overhead, the former doesn't. We never use compression unless it actually
+ * saves space, so we expect either equality or less-than.
+ */
+#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer) \
+ ((toast_pointer).va_extsize < (toast_pointer).va_rawsize - VARHDRSZ)
+
+/*
+ * Macro to fetch the possibly-unaligned contents of an EXTERNAL datum
+ * into a local "struct varatt_external" toast pointer. This should be
+ * just a memcpy, but some versions of gcc seem to produce broken code
+ * that assumes the datum contents are aligned. Introducing an explicit
+ * intermediate "varattrib_1b_e *" variable seems to fix it.
+ */
+#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr) \
+do { \
+ varattrib_1b_e *attre = (varattrib_1b_e *) (attr); \
+ Assert(VARATT_IS_EXTERNAL(attre)); \
+ Assert(VARSIZE_EXTERNAL(attre) == sizeof(toast_pointer) + VARHDRSZ_EXTERNAL); \
+ memcpy(&(toast_pointer), VARDATA_EXTERNAL(attre), sizeof(toast_pointer)); \
+} while (0)
+
/* ----------
* toast_insert_or_update -
*
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 11ab2771990..a238292b76e 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -288,6 +288,7 @@ extern int XLogFileOpen(XLogSegNo segno);
extern XLogRecPtr XLogSaveBufferForHint(Buffer buffer, bool buffer_std);
extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli);
+extern XLogSegNo XLogGetLastRemovedSegno(void);
extern void XLogSetAsyncXactLSN(XLogRecPtr record);
extern void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn);
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 80560574bf5..22dd0fc58e8 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 201403031
+#define CATALOG_VERSION_NO 201403032
#endif
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 7a11721ba44..c5706518722 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -4804,8 +4804,18 @@ DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0
DESCR("create a physical replication slot");
DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "19" _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
DESCR("drop a replication slot");
-DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{19,25,26,16,28,3220}" "{o,o,o,o,o,o}" "{slot_name,slot_type,datoid,active,xmin,restart_lsn}" _null_ pg_get_replication_slots _null_ _null_ _null_ ));
+DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{19,19,25,26,16,28,28,3220}" "{o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,xmin,catalog_xmin,restart_lsn}" _null_ pg_get_replication_slots _null_ _null_ _null_ ));
DESCR("information about replication slots currently in use");
+DATA(insert OID = 3786 ( pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2249 "19 19" "{19,19,25,3220}" "{i,i,o,o}" "{slotname,plugin,slotname,xlog_position}" _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
+DESCR("set up a logical replication slot");
+DATA(insert OID = 3782 ( pg_logical_slot_get_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,25}" "{i,i,i,v,o,o,o}" "{slotname,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ pg_logical_slot_get_changes _null_ _null_ _null_ ));
+DESCR("get changes from replication slot");
+DATA(insert OID = 3783 ( pg_logical_slot_get_binary_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,17}" "{i,i,i,v,o,o,o}" "{slotname,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ pg_logical_slot_get_binary_changes _null_ _null_ _null_ ));
+DESCR("get binary changes from replication slot");
+DATA(insert OID = 3784 ( pg_logical_slot_peek_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,25}" "{i,i,i,v,o,o,o}" "{slotname,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ pg_logical_slot_peek_changes _null_ _null_ _null_ ));
+DESCR("peek at changes from replication slot");
+DATA(insert OID = 3785 ( pg_logical_slot_peek_binary_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,17}" "{i,i,i,v,o,o,o}" "{slotname,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ pg_logical_slot_peek_binary_changes _null_ _null_ _null_ ));
+DESCR("peek at binary changes from replication slot");
/* event triggers */
DATA(insert OID = 3566 ( pg_event_trigger_dropped_objects PGNSP PGUID 12 10 100 0 0 f f f f t t s 0 0 2249 "" "{26,26,23,25,25,25,25}" "{o,o,o,o,o,o,o}" "{classid, objid, objsubid, object_type, schema_name, object_name, object_identity}" _null_ pg_event_trigger_dropped_objects _null_ _null_ _null_ ));
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index 70350e02cb2..058dc5f6675 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -157,10 +157,10 @@ extern void vac_update_relstats(Relation relation,
bool hasindex,
TransactionId frozenxid,
MultiXactId minmulti);
-extern void vacuum_set_xid_limits(int freeze_min_age, int freeze_table_age,
+extern void vacuum_set_xid_limits(Relation rel,
+ int freeze_min_age, int freeze_table_age,
int multixact_freeze_min_age,
int multixact_freeze_table_age,
- bool sharedRel,
TransactionId *oldestXmin,
TransactionId *freezeLimit,
TransactionId *xidFullScanLimit,
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
new file mode 100644
index 00000000000..7f55d789a23
--- /dev/null
+++ b/src/include/replication/decode.h
@@ -0,0 +1,19 @@
+/*-------------------------------------------------------------------------
+ * decode.h
+ * PostgreSQL WAL to logical transformation
+ *
+ * Portions Copyright (c) 2012-2014, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef DECODE_H
+#define DECODE_H
+
+#include "access/xlogreader.h"
+#include "replication/reorderbuffer.h"
+#include "replication/logical.h"
+
+void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
+ XLogRecord *record);
+
+#endif
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
new file mode 100644
index 00000000000..e65c8b8075f
--- /dev/null
+++ b/src/include/replication/logical.h
@@ -0,0 +1,100 @@
+/*-------------------------------------------------------------------------
+ * logical.h
+ * PostgreSQL logical decoding coordination
+ *
+ * Copyright (c) 2012-2014, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LOGICAL_H
+#define LOGICAL_H
+
+#include "replication/slot.h"
+
+#include "access/xlog.h"
+#include "access/xlogreader.h"
+#include "replication/output_plugin.h"
+
+struct LogicalDecodingContext;
+
+typedef void (*LogicalOutputPluginWriterWrite) (
+ struct LogicalDecodingContext *lr,
+ XLogRecPtr Ptr,
+ TransactionId xid,
+ bool last_write
+);
+
+typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
+
+typedef struct LogicalDecodingContext
+{
+ /* memory context this is all allocated in */
+ MemoryContext context;
+
+ /* infrastructure pieces */
+ XLogReaderState *reader;
+ ReplicationSlot *slot;
+ struct ReorderBuffer *reorder;
+ struct SnapBuild *snapshot_builder;
+
+ OutputPluginCallbacks callbacks;
+ OutputPluginOptions options;
+
+ /*
+ * User specified options
+ */
+ List *output_plugin_options;
+
+ /*
+ * User-Provided callback for writing/streaming out data.
+ */
+ LogicalOutputPluginWriterPrepareWrite prepare_write;
+ LogicalOutputPluginWriterWrite write;
+
+ /*
+ * Output buffer.
+ */
+ StringInfo out;
+
+ /*
+ * Private data pointer of the output plugin.
+ */
+ void *output_plugin_private;
+
+ /*
+ * Private data pointer for the data writer.
+ */
+ void *output_writer_private;
+
+ /*
+ * State for writing output.
+ */
+ bool accept_writes;
+ bool prepared_write;
+ XLogRecPtr write_location;
+ TransactionId write_xid;
+} LogicalDecodingContext;
+
+extern void CheckLogicalDecodingRequirements(void);
+
+extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
+ List *output_plugin_options,
+ XLogPageReadCB read_page,
+ LogicalOutputPluginWriterPrepareWrite prepare_write,
+ LogicalOutputPluginWriterWrite do_write);
+extern LogicalDecodingContext *CreateDecodingContext(
+ XLogRecPtr start_lsn,
+ List *output_plugin_options,
+ XLogPageReadCB read_page,
+ LogicalOutputPluginWriterPrepareWrite prepare_write,
+ LogicalOutputPluginWriterWrite do_write);
+extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
+extern bool DecodingContextReady(LogicalDecodingContext *ctx);
+extern void FreeDecodingContext(LogicalDecodingContext *ctx);
+
+extern void LogicalIncreaseXminForSlot(XLogRecPtr lsn, TransactionId xmin);
+extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
+ XLogRecPtr restart_lsn);
+extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
+
+#endif
diff --git a/src/include/replication/logicalfuncs.h b/src/include/replication/logicalfuncs.h
new file mode 100644
index 00000000000..21bf44ec4b7
--- /dev/null
+++ b/src/include/replication/logicalfuncs.h
@@ -0,0 +1,24 @@
+/*-------------------------------------------------------------------------
+ * logicalfuncs.h
+ * PostgreSQL WAL to logical transformation support functions
+ *
+ * Copyright (c) 2012-2014, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LOGICALFUNCS_H
+#define LOGICALFUNCS_H
+
+#include "replication/logical.h"
+
+extern int logical_read_local_xlog_page(XLogReaderState *state,
+ XLogRecPtr targetPagePtr,
+ int reqLen, XLogRecPtr targetRecPtr,
+ char *cur_page, TimeLineID *pageTLI);
+
+extern Datum pg_logical_slot_get_changes(PG_FUNCTION_ARGS);
+extern Datum pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS);
+extern Datum pg_logical_slot_peek_changes(PG_FUNCTION_ARGS);
+extern Datum pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS);
+
+#endif
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
new file mode 100644
index 00000000000..c47c24c8dbe
--- /dev/null
+++ b/src/include/replication/output_plugin.h
@@ -0,0 +1,98 @@
+/*-------------------------------------------------------------------------
+ * output_plugin.h
+ * PostgreSQL Logical Decode Plugin Interface
+ *
+ * Copyright (c) 2012-2014, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef OUTPUT_PLUGIN_H
+#define OUTPUT_PLUGIN_H
+
+#include "replication/reorderbuffer.h"
+
+struct LogicalDecodingContext;
+struct OutputPluginCallbacks;
+
+typedef enum OutputPluginOutputType
+{
+ OUTPUT_PLUGIN_BINARY_OUTPUT,
+ OUTPUT_PLUGIN_TEXTUAL_OUTPUT
+} OutputPluginOutputType;
+
+/*
+ * Options set by the output plugin, in the startup callback.
+ */
+typedef struct OutputPluginOptions
+{
+ OutputPluginOutputType output_type;
+} OutputPluginOptions;
+
+/*
+ * Type of the shared library symbol _PG_output_plugin_init that is looked up
+ * when loading an output plugin shared library.
+ */
+typedef void (*LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb);
+
+/*
+ * Callback that gets called in a user-defined plugin. ctx->private_data can
+ * be set to some private data.
+ *
+ * "is_init" will be set to "true" if the decoding slot just got defined. When
+ * the same slot is used from there one, it will be "false".
+ */
+typedef void (*LogicalDecodeStartupCB) (
+ struct LogicalDecodingContext *ctx,
+ OutputPluginOptions *options,
+ bool is_init
+);
+
+/*
+ * Callback called for every (explicit or implicit) BEGIN of a successful
+ * transaction.
+ */
+typedef void (*LogicalDecodeBeginCB) (
+ struct LogicalDecodingContext *,
+ ReorderBufferTXN *txn);
+
+/*
+ * Callback for every individual change in a successful transaction.
+ */
+typedef void (*LogicalDecodeChangeCB) (
+ struct LogicalDecodingContext *,
+ ReorderBufferTXN *txn,
+ Relation relation,
+ ReorderBufferChange *change
+);
+
+/*
+ * Called for every (explicit or implicit) COMMIT of a successful transaction.
+ */
+typedef void (*LogicalDecodeCommitCB) (
+ struct LogicalDecodingContext *,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+
+/*
+ * Called to shutdown an output plugin.
+ */
+typedef void (*LogicalDecodeShutdownCB) (
+ struct LogicalDecodingContext *
+);
+
+/*
+ * Output plugin callbacks
+ */
+typedef struct OutputPluginCallbacks
+{
+ LogicalDecodeStartupCB startup_cb;
+ LogicalDecodeBeginCB begin_cb;
+ LogicalDecodeChangeCB change_cb;
+ LogicalDecodeCommitCB commit_cb;
+ LogicalDecodeShutdownCB shutdown_cb;
+} OutputPluginCallbacks;
+
+void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write);
+void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write);
+
+#endif /* OUTPUT_PLUGIN_H */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
new file mode 100644
index 00000000000..01eabfb7be7
--- /dev/null
+++ b/src/include/replication/reorderbuffer.h
@@ -0,0 +1,351 @@
+/*
+ * reorderbuffer.h
+ * PostgreSQL logical replay/reorder buffer management.
+ *
+ * Copyright (c) 2012-2014, PostgreSQL Global Development Group
+ *
+ * src/include/replication/reorderbuffer.h
+ */
+#ifndef REORDERBUFFER_H
+#define REORDERBUFFER_H
+
+#include "access/htup_details.h"
+
+#include "lib/ilist.h"
+
+#include "storage/sinval.h"
+
+#include "utils/hsearch.h"
+#include "utils/rel.h"
+#include "utils/snapshot.h"
+#include "utils/timestamp.h"
+
+/* an individual tuple, stored in one chunk of memory */
+typedef struct ReorderBufferTupleBuf
+{
+ /* position in preallocated list */
+ slist_node node;
+
+ /* tuple, stored sequentially */
+ HeapTupleData tuple;
+ HeapTupleHeaderData header;
+ char data[MaxHeapTupleSize];
+} ReorderBufferTupleBuf;
+
+/* types of the change passed to a 'change' callback */
+enum ReorderBufferChangeType
+{
+ REORDER_BUFFER_CHANGE_INSERT,
+ REORDER_BUFFER_CHANGE_UPDATE,
+ REORDER_BUFFER_CHANGE_DELETE
+};
+
+/*
+ * a single 'change', can be an insert (with one tuple), an update (old, new),
+ * or a delete (old).
+ *
+ * The same struct is also used internally for other purposes but that should
+ * never be visible outside reorderbuffer.c.
+ */
+typedef struct ReorderBufferChange
+{
+ XLogRecPtr lsn;
+
+ /* type of change */
+ union
+ {
+ enum ReorderBufferChangeType action;
+ /* do not leak internal enum values to the outside */
+ int action_internal;
+ };
+
+ /*
+ * Context data for the change, which part of the union is valid depends
+ * on action/action_internal.
+ */
+ union
+ {
+ /* old, new tuples when action == *_INSERT|UPDATE|DELETE */
+ struct
+ {
+ /* relation that has been changed */
+ RelFileNode relnode;
+ /* valid for DELETE || UPDATE */
+ ReorderBufferTupleBuf *oldtuple;
+ /* valid for INSERT || UPDATE */
+ ReorderBufferTupleBuf *newtuple;
+ } tp;
+
+ /* new snapshot */
+ Snapshot snapshot;
+
+ /* new command id for existing snapshot in a catalog changing tx */
+ CommandId command_id;
+
+ /* new cid mapping for catalog changing transaction */
+ struct
+ {
+ RelFileNode node;
+ ItemPointerData tid;
+ CommandId cmin;
+ CommandId cmax;
+ CommandId combocid;
+ } tuplecid;
+ };
+
+ /*
+ * While in use this is how a change is linked into a transactions,
+ * otherwise it's the preallocated list.
+ */
+ dlist_node node;
+} ReorderBufferChange;
+
+typedef struct ReorderBufferTXN
+{
+ /*
+ * The transactions transaction id, can be a toplevel or sub xid.
+ */
+ TransactionId xid;
+
+ /* did the TX have catalog changes */
+ bool has_catalog_changes;
+
+ /*
+ * Do we know this is a subxact?
+ */
+ bool is_known_as_subxact;
+
+ /*
+ * LSN of the first data carrying, WAL record with knowledge about this
+ * xid. This is allowed to *not* be first record adorned with this xid, if
+ * the previous records aren't relevant for logical decoding.
+ */
+ XLogRecPtr first_lsn;
+
+ /* ----
+ * LSN of the record that lead to this xact to be committed or
+ * aborted. This can be a
+ * * plain commit record
+ * * plain commit record, of a parent transaction
+ * * prepared transaction commit
+ * * plain abort record
+ * * prepared transaction abort
+ * * error during decoding
+ * ----
+ */
+ XLogRecPtr final_lsn;
+
+ /*
+ * LSN pointing to the end of the commit record + 1.
+ */
+ XLogRecPtr end_lsn;
+
+ /*
+ * LSN of the last lsn at which snapshot information reside, so we can
+ * restart decoding from there and fully recover this transaction from
+ * WAL.
+ */
+ XLogRecPtr restart_decoding_lsn;
+
+ /*
+ * Commit time, only known when we read the actual commit record.
+ */
+ TimestampTz commit_time;
+
+ /*
+ * Base snapshot or NULL.
+ */
+ Snapshot base_snapshot;
+ XLogRecPtr base_snapshot_lsn;
+
+ /*
+ * How many ReorderBufferChange's do we have in this txn.
+ *
+ * Changes in subtransactions are *not* included but tracked separately.
+ */
+ uint64 nentries;
+
+ /*
+ * How many of the above entries are stored in memory in contrast to being
+ * spilled to disk.
+ */
+ uint64 nentries_mem;
+
+ /*
+ * List of ReorderBufferChange structs, including new Snapshots and new
+ * CommandIds
+ */
+ dlist_head changes;
+
+ /*
+ * List of (relation, ctid) => (cmin, cmax) mappings for catalog tuples.
+ * Those are always assigned to the toplevel transaction. (Keep track of
+ * #entries to create a hash of the right size)
+ */
+ dlist_head tuplecids;
+ uint64 ntuplecids;
+
+ /*
+ * On-demand built hash for looking up the above values.
+ */
+ HTAB *tuplecid_hash;
+
+ /*
+ * Hash containing (potentially partial) toast entries. NULL if no toast
+ * tuples have been found for the current change.
+ */
+ HTAB *toast_hash;
+
+ /*
+ * non-hierarchical list of subtransactions that are *not* aborted. Only
+ * used in toplevel transactions.
+ */
+ dlist_head subtxns;
+ uint32 nsubtxns;
+
+ /*
+ * Stored cache invalidations. This is not a linked list because we get
+ * all the invalidations at once.
+ */
+ uint32 ninvalidations;
+ SharedInvalidationMessage *invalidations;
+
+ /* ---
+ * Position in one of three lists:
+ * * list of subtransactions if we are *known* to be subxact
+ * * list of toplevel xacts (can be a as-yet unknown subxact)
+ * * list of preallocated ReorderBufferTXNs
+ * ---
+ */
+ dlist_node node;
+
+} ReorderBufferTXN;
+
+/* so we can define the callbacks used inside struct ReorderBuffer itself */
+typedef struct ReorderBuffer ReorderBuffer;
+
+/* change callback signature */
+typedef void (*ReorderBufferApplyChangeCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ Relation relation,
+ ReorderBufferChange *change);
+
+/* begin callback signature */
+typedef void (*ReorderBufferBeginCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn);
+
+/* commit callback signature */
+typedef void (*ReorderBufferCommitCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+
+struct ReorderBuffer
+{
+ /*
+ * xid => ReorderBufferTXN lookup table
+ */
+ HTAB *by_txn;
+
+ /*
+ * Transactions that could be a toplevel xact, ordered by LSN of the first
+ * record bearing that xid..
+ */
+ dlist_head toplevel_by_lsn;
+
+ /*
+ * one-entry sized cache for by_txn. Very frequently the same txn gets
+ * looked up over and over again.
+ */
+ TransactionId by_txn_last_xid;
+ ReorderBufferTXN *by_txn_last_txn;
+
+ /*
+ * Callacks to be called when a transactions commits.
+ */
+ ReorderBufferBeginCB begin;
+ ReorderBufferApplyChangeCB apply_change;
+ ReorderBufferCommitCB commit;
+
+ /*
+ * Pointer that will be passed untouched to the callbacks.
+ */
+ void *private_data;
+
+ /*
+ * Private memory context.
+ */
+ MemoryContext context;
+
+ /*
+ * Data structure slab cache.
+ *
+ * We allocate/deallocate some structures very frequently, to avoid bigger
+ * overhead we cache some unused ones here.
+ *
+ * The maximum number of cached entries is controlled by const variables
+ * ontop of reorderbuffer.c
+ */
+
+ /* cached ReorderBufferTXNs */
+ dlist_head cached_transactions;
+ Size nr_cached_transactions;
+
+ /* cached ReorderBufferChanges */
+ dlist_head cached_changes;
+ Size nr_cached_changes;
+
+ /* cached ReorderBufferTupleBufs */
+ slist_head cached_tuplebufs;
+ Size nr_cached_tuplebufs;
+
+ XLogRecPtr current_restart_decoding_lsn;
+
+ /* buffer for disk<->memory conversions */
+ char *outbuf;
+ Size outbufsize;
+};
+
+
+ReorderBuffer *ReorderBufferAllocate(void);
+void ReorderBufferFree(ReorderBuffer *);
+
+ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *);
+void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple);
+ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *);
+void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);
+
+void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *);
+void ReorderBufferCommit(ReorderBuffer *, TransactionId,
+ XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+ TimestampTz commit_time);
+void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn);
+void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId,
+ XLogRecPtr commit_lsn, XLogRecPtr end_lsn);
+void ReorderBufferAbort(ReorderBuffer *, TransactionId, XLogRecPtr lsn);
+void ReorderBufferAbortOld(ReorderBuffer *, TransactionId xid);
+void ReorderBufferForget(ReorderBuffer *, TransactionId, XLogRecPtr lsn);
+
+void ReorderBufferSetBaseSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap);
+void ReorderBufferAddSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap);
+void ReorderBufferAddNewCommandId(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
+ CommandId cid);
+void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
+ RelFileNode node, ItemPointerData pt,
+ CommandId cmin, CommandId cmax, CommandId combocid);
+void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
+ Size nmsgs, SharedInvalidationMessage *msgs);
+bool ReorderBufferIsXidKnown(ReorderBuffer *, TransactionId xid);
+void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
+bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
+bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);
+
+ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *);
+
+void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
+
+void StartupReorderBuffer(void);
+
+#endif
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 089b0f4b70c..c354c9133bf 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -16,6 +16,24 @@
#include "storage/shmem.h"
#include "storage/spin.h"
+/*
+ * Behaviour of replication slots, upon release or crash.
+ *
+ * Slots marked as PERSISTENT are crashsafe and will not be dropped when
+ * released. Slots marked as EPHEMERAL will be dropped when released or after
+ * restarts.
+ *
+ * EPHEMERAL slots can be made PERSISTENT by calling ReplicationSlotPersist().
+ */
+typedef enum ReplicationSlotPersistency
+{
+ RS_PERSISTENT,
+ RS_EPHEMERAL
+} ReplicationSlotPersistency;
+
+/*
+ * On-Disk data of a replication slot, preserved across restarts.
+ */
typedef struct ReplicationSlotPersistentData
{
/* The slot's identifier */
@@ -25,6 +43,11 @@ typedef struct ReplicationSlotPersistentData
Oid database;
/*
+ * The slot's behaviour when being dropped (or restored after a crash).
+ */
+ ReplicationSlotPersistency persistency;
+
+ /*
* xmin horizon for data
*
* NB: This may represent a value that hasn't been written to disk yet;
@@ -32,9 +55,22 @@ typedef struct ReplicationSlotPersistentData
*/
TransactionId xmin;
+ /*
+ * xmin horizon for catalog tuples
+ *
+ * NB: This may represent a value that hasn't been written to disk yet;
+ * see notes for effective_xmin, below.
+ */
+ TransactionId catalog_xmin;
+
/* oldest LSN that might be required by this replication slot */
XLogRecPtr restart_lsn;
+ /* oldest LSN that the client has acked receipt for */
+ XLogRecPtr confirmed_flush;
+
+ /* plugin name */
+ NameData plugin;
} ReplicationSlotPersistentData;
/*
@@ -67,12 +103,26 @@ typedef struct ReplicationSlot
* same as the persistent value (data.xmin).
*/
TransactionId effective_xmin;
+ TransactionId effective_catalog_xmin;
/* data surviving shutdowns and crashes */
ReplicationSlotPersistentData data;
/* is somebody performing io on this slot? */
LWLock *io_in_progress_lock;
+
+ /* all the remaining data is only used for logical slots */
+
+ /* ----
+ * When the client has confirmed flushes >= candidate_xmin_lsn we can
+ * advance the catalog xmin, when restart_valid has been passed,
+ * restart_lsn can be increased.
+ * ----
+ */
+ TransactionId candidate_catalog_xmin;
+ XLogRecPtr candidate_xmin_lsn;
+ XLogRecPtr candidate_restart_valid;
+ XLogRecPtr candidate_restart_lsn;
} ReplicationSlot;
/*
@@ -97,8 +147,11 @@ extern Size ReplicationSlotsShmemSize(void);
extern void ReplicationSlotsShmemInit(void);
/* management of individual slots */
-extern void ReplicationSlotCreate(const char *name, bool db_specific);
+extern void ReplicationSlotCreate(const char *name, bool db_specific,
+ ReplicationSlotPersistency p);
+extern void ReplicationSlotPersist(void);
extern void ReplicationSlotDrop(const char *name);
+
extern void ReplicationSlotAcquire(const char *name);
extern void ReplicationSlotRelease(void);
extern void ReplicationSlotSave(void);
@@ -106,15 +159,20 @@ extern void ReplicationSlotMarkDirty(void);
/* misc stuff */
extern bool ReplicationSlotValidateName(const char *name, int elevel);
-extern void ReplicationSlotsComputeRequiredXmin(void);
+extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
extern void ReplicationSlotsComputeRequiredLSN(void);
+extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
+extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
+
extern void StartupReplicationSlots(XLogRecPtr checkPointRedo);
extern void CheckPointReplicationSlots(void);
extern void CheckSlotRequirements(void);
-extern void ReplicationSlotAtProcExit(void);
/* SQL callable functions */
+extern Datum pg_create_physical_replication_slot(PG_FUNCTION_ARGS);
+extern Datum pg_create_logical_replication_slot(PG_FUNCTION_ARGS);
+extern Datum pg_drop_replication_slot(PG_FUNCTION_ARGS);
extern Datum pg_get_replication_slots(PG_FUNCTION_ARGS);
#endif /* SLOT_H */
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
new file mode 100644
index 00000000000..087c0e510d5
--- /dev/null
+++ b/src/include/replication/snapbuild.h
@@ -0,0 +1,83 @@
+/*-------------------------------------------------------------------------
+ *
+ * snapbuild.h
+ * Exports from replication/logical/snapbuild.c.
+ *
+ * Copyright (c) 2012-2014, PostgreSQL Global Development Group
+ *
+ * src/include/replication/snapbuild.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef SNAPBUILD_H
+#define SNAPBUILD_H
+
+#include "access/xlogdefs.h"
+#include "utils/snapmgr.h"
+
+typedef enum
+{
+ /*
+ * Initial state, we can't do much yet.
+ */
+ SNAPBUILD_START,
+
+ /*
+ * We have collected enough information to decode tuples in transactions
+ * that started after this.
+ *
+ * Once we reached this we start to collect changes. We cannot apply them
+ * yet because the might be based on transactions that were still running
+ * when we reached them yet.
+ */
+ SNAPBUILD_FULL_SNAPSHOT,
+
+ /*
+ * Found a point after hitting built_full_snapshot where all transactions
+ * that were running at that point finished. Till we reach that we hold
+ * off calling any commit callbacks.
+ */
+ SNAPBUILD_CONSISTENT
+} SnapBuildState;
+
+/* forward declare so we don't have to expose the struct to the public */
+struct SnapBuild;
+typedef struct SnapBuild SnapBuild;
+
+/* forward declare so we don't have to include reorderbuffer.h */
+struct ReorderBuffer;
+
+/* forward declare so we don't have to include heapam_xlog.h */
+struct xl_heap_new_cid;
+struct xl_running_xacts;
+
+extern void CheckPointSnapBuild(void);
+
+extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache,
+ TransactionId xmin_horizon, XLogRecPtr start_lsn);
+extern void FreeSnapshotBuilder(SnapBuild *cache);
+
+extern void SnapBuildSnapDecRefcount(Snapshot snap);
+
+extern const char *SnapBuildExportSnapshot(SnapBuild *snapstate);
+extern void SnapBuildClearExportedSnapshot(void);
+
+extern SnapBuildState SnapBuildCurrentState(SnapBuild *snapstate);
+
+extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr);
+
+extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn,
+ TransactionId xid, int nsubxacts,
+ TransactionId *subxacts);
+extern void SnapBuildAbortTxn(SnapBuild *builder, XLogRecPtr lsn,
+ TransactionId xid, int nsubxacts,
+ TransactionId *subxacts);
+extern bool SnapBuildProcessChange(SnapBuild *builder, TransactionId xid,
+ XLogRecPtr lsn);
+extern void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
+ XLogRecPtr lsn, struct xl_heap_new_cid *cid);
+extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn,
+ struct xl_running_xacts *running);
+extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn);
+
+#endif /* SNAPBUILD_H */
diff --git a/src/include/storage/itemptr.h b/src/include/storage/itemptr.h
index 67bbdbb988a..0b81d53f5f8 100644
--- a/src/include/storage/itemptr.h
+++ b/src/include/storage/itemptr.h
@@ -116,6 +116,9 @@ typedef ItemPointerData *ItemPointer;
/*
* ItemPointerCopy
* Copies the contents of one disk item pointer to another.
+ *
+ * Should there ever be padding in an ItemPointer this would need to be handled
+ * differently as it's used as hash key.
*/
#define ItemPointerCopy(fromPointer, toPointer) \
( \
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index a3cadd9a017..5218b448cd6 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -41,10 +41,12 @@ struct XidCache
#define PROC_IS_AUTOVACUUM 0x01 /* is it an autovac worker? */
#define PROC_IN_VACUUM 0x02 /* currently running lazy vacuum */
#define PROC_IN_ANALYZE 0x04 /* currently running analyze */
-#define PROC_VACUUM_FOR_WRAPAROUND 0x08 /* set by autovac only */
+#define PROC_VACUUM_FOR_WRAPAROUND 0x08 /* set by autovac only */
+#define PROC_IN_LOGICAL_DECODING 0x10 /* currently doing logical decoding */
/* flags reset at EOXact */
-#define PROC_VACUUM_STATE_MASK (0x0E)
+#define PROC_VACUUM_STATE_MASK \
+ (PROC_IN_VACUUM | PROC_IN_ANALYZE | PROC_VACUUM_FOR_WRAPAROUND)
/*
* We allow a small number of "weak" relation locks (AccesShareLock,
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index d1a58a3661b..d0b4103a09e 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -15,6 +15,7 @@
#define PROCARRAY_H
#include "storage/standby.h"
+#include "utils/relcache.h"
#include "utils/snapshot.h"
@@ -50,8 +51,9 @@ extern RunningTransactions GetRunningTransactionData(void);
extern bool TransactionIdIsInProgress(TransactionId xid);
extern bool TransactionIdIsActive(TransactionId xid);
-extern TransactionId GetOldestXmin(bool allDbs, bool ignoreVacuum);
+extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum);
extern TransactionId GetOldestActiveTransactionId(void);
+extern TransactionId GetOldestSafeDecodingTransactionId(void);
extern VirtualTransactionId *GetVirtualXIDsDelayingChkpt(int *nvxids);
extern bool HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids);
@@ -77,6 +79,10 @@ extern void XidCacheRemoveRunningXids(TransactionId xid,
int nxids, const TransactionId *xids,
TransactionId latestXid);
-extern void ProcArraySetReplicationSlotXmin(TransactionId xmin);
+extern void ProcArraySetReplicationSlotXmin(TransactionId xmin,
+ TransactionId catalog_xmin, bool already_locked);
+
+extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
+ TransactionId *catalog_xmin);
#endif /* PROCARRAY_H */
diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h
index 0cae810f49e..d5bb850337d 100644
--- a/src/include/storage/sinval.h
+++ b/src/include/storage/sinval.h
@@ -147,4 +147,6 @@ extern void ProcessCommittedInvalidationMessages(SharedInvalidationMessage *msgs
int nmsgs, bool RelcacheInitFileInval,
Oid dbid, Oid tsid);
+extern void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg);
+
#endif /* SINVAL_H */
diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h
index c1409c863db..6156e0219d0 100644
--- a/src/include/utils/inval.h
+++ b/src/include/utils/inval.h
@@ -64,4 +64,5 @@ extern void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
extern void CallSyscacheCallbacks(int cacheid, uint32 hashvalue);
+extern void InvalidateSystemCaches(void);
#endif /* INVAL_H */
diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h
index c601770ec99..abe7016d040 100644
--- a/src/include/utils/snapmgr.h
+++ b/src/include/utils/snapmgr.h
@@ -23,12 +23,14 @@ extern bool FirstSnapshotSet;
extern TransactionId TransactionXmin;
extern TransactionId RecentXmin;
extern TransactionId RecentGlobalXmin;
+extern TransactionId RecentGlobalDataXmin;
extern Snapshot GetTransactionSnapshot(void);
extern Snapshot GetLatestSnapshot(void);
extern void SnapshotSetCommandId(CommandId curcid);
extern Snapshot GetCatalogSnapshot(Oid relid);
+extern Snapshot GetNonHistoricCatalogSnapshot(Oid relid);
extern void InvalidateCatalogSnapshot(void);
extern void PushActiveSnapshot(Snapshot snapshot);
@@ -53,4 +55,13 @@ extern bool XactHasExportedSnapshots(void);
extern void DeleteAllExportedSnapshotFiles(void);
extern bool ThereAreNoPriorRegisteredSnapshots(void);
+extern char *ExportSnapshot(Snapshot snapshot);
+
+/* Support for catalog timetravel for logical decoding */
+struct HTAB;
+extern struct HTAB *HistoricSnapshotGetTupleCids(void);
+extern void SetupHistoricSnapshot(Snapshot snapshot_now, struct HTAB *tuplecids);
+extern void TeardownHistoricSnapshot(bool is_error);
+extern bool HistoricSnapshotActive(void);
+
#endif /* SNAPMGR_H */
diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h
index c542238825a..4b256074b0b 100644
--- a/src/include/utils/snapshot.h
+++ b/src/include/utils/snapshot.h
@@ -30,6 +30,22 @@ typedef struct SnapshotData *Snapshot;
typedef bool (*SnapshotSatisfiesFunc) (HeapTuple htup,
Snapshot snapshot, Buffer buffer);
+/*
+ * Struct representing all kind of possible snapshots.
+ *
+ * There are several different kinds of snapshots:
+ * * Normal MVCC snapshots
+ * * MVCC snapshots taken during recovery (in Hot-Standby mode)
+ * * Historic MVCC snapshots used during logical decoding
+ * * snapshots passed to HeapTupleSatisfiesDirty()
+ * * snapshots used for SatisfiesAny, Toast, Self where no members are
+ * accessed.
+ *
+ * TODO: It's probably a good idea to split this struct using a NodeTag
+ * similar to how parser and executor nodes are handled, with one type for
+ * each different kind of snapshot to avoid overloading the meaning of
+ * individual fields.
+ */
typedef struct SnapshotData
{
SnapshotSatisfiesFunc satisfies; /* tuple test function */
@@ -46,11 +62,23 @@ typedef struct SnapshotData
*/
TransactionId xmin; /* all XID < xmin are visible to me */
TransactionId xmax; /* all XID >= xmax are invisible to me */
- TransactionId *xip; /* array of xact IDs in progress */
+ /*
+ * For normal MVCC snapshot this contains the all xact IDs that are in
+ * progress, unless the snapshot was taken during recovery in which case
+ * it's empty. For historic MVCC snapshots, the meaning is inverted,
+ * i.e. it contains *committed* transactions between xmin and xmax.
+ */
+ TransactionId *xip;
uint32 xcnt; /* # of xact ids in xip[] */
/* note: all ids in xip[] satisfy xmin <= xip[i] < xmax */
int32 subxcnt; /* # of xact ids in subxip[] */
- TransactionId *subxip; /* array of subxact IDs in progress */
+ /*
+ * For non-historic MVCC snapshots, this contains subxact IDs that are in
+ * progress (and other transactions that are in progress if taken during
+ * recovery). For historic snapshot it contains *all* xids assigned to the
+ * replayed transaction, including the toplevel xid.
+ */
+ TransactionId *subxip;
bool suboverflowed; /* has the subxip array overflowed? */
bool takenDuringRecovery; /* recovery-shaped snapshot? */
bool copied; /* false if it's a static snapshot */
diff --git a/src/include/utils/tqual.h b/src/include/utils/tqual.h
index e34c28a4f78..48abe62983d 100644
--- a/src/include/utils/tqual.h
+++ b/src/include/utils/tqual.h
@@ -22,6 +22,7 @@
extern PGDLLIMPORT SnapshotData SnapshotSelfData;
extern PGDLLIMPORT SnapshotData SnapshotAnyData;
extern PGDLLIMPORT SnapshotData SnapshotToastData;
+extern PGDLLIMPORT SnapshotData CatalogSnapshotData;
#define SnapshotSelf (&SnapshotSelfData)
#define SnapshotAny (&SnapshotAnyData)
@@ -37,7 +38,8 @@ extern PGDLLIMPORT SnapshotData SnapshotToastData;
/* This macro encodes the knowledge of which snapshots are MVCC-safe */
#define IsMVCCSnapshot(snapshot) \
- ((snapshot)->satisfies == HeapTupleSatisfiesMVCC)
+ ((snapshot)->satisfies == HeapTupleSatisfiesMVCC || \
+ (snapshot)->satisfies == HeapTupleSatisfiesHistoricMVCC)
/*
* HeapTupleSatisfiesVisibility
@@ -73,6 +75,8 @@ extern bool HeapTupleSatisfiesToast(HeapTuple htup,
Snapshot snapshot, Buffer buffer);
extern bool HeapTupleSatisfiesDirty(HeapTuple htup,
Snapshot snapshot, Buffer buffer);
+extern bool HeapTupleSatisfiesHistoricMVCC(HeapTuple htup,
+ Snapshot snapshot, Buffer buffer);
/* Special "satisfies" routines with different APIs */
extern HTSU_Result HeapTupleSatisfiesUpdate(HeapTuple htup,
@@ -86,4 +90,13 @@ extern void HeapTupleSetHintBits(HeapTupleHeader tuple, Buffer buffer,
uint16 infomask, TransactionId xid);
extern bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple);
+/*
+ * To avoid leaking to much knowledge about reorderbuffer implementation
+ * details this is implemented in reorderbuffer.c not tqual.c.
+ */
+extern bool ResolveCminCmaxDuringDecoding(struct HTAB *tuplecid_data,
+ Snapshot snapshot,
+ HeapTuple htup,
+ Buffer buffer,
+ CommandId *cmin, CommandId *cmax);
#endif /* TQUAL_H */