diff options
Diffstat (limited to 'src/include')
24 files changed, 897 insertions, 19 deletions
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index bfdadc3d5bb..0f802577c70 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -164,8 +164,7 @@ extern void heap_restrpos(HeapScanDesc scan); extern void heap_sync(Relation relation); /* in heap/pruneheap.c */ -extern void heap_page_prune_opt(Relation relation, Buffer buffer, - TransactionId OldestXmin); +extern void heap_page_prune_opt(Relation relation, Buffer buffer); extern int heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin, bool report_stats, TransactionId *latestRemovedXid); diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h index d4383ab2cbe..194635952cb 100644 --- a/src/include/access/heapam_xlog.h +++ b/src/include/access/heapam_xlog.h @@ -48,7 +48,7 @@ * the ones above associated with RM_HEAP_ID. XLOG_HEAP_OPMASK applies to * these, too. */ -/* 0x00 is free, was XLOG_HEAP2_FREEZE */ +#define XLOG_HEAP2_REWRITE 0x00 #define XLOG_HEAP2_CLEAN 0x10 #define XLOG_HEAP2_FREEZE_PAGE 0x20 #define XLOG_HEAP2_CLEANUP_INFO 0x30 @@ -332,6 +332,17 @@ typedef struct xl_heap_new_cid xl_heaptid target; } xl_heap_new_cid; +/* logical rewrite xlog record header */ +typedef struct xl_heap_rewrite_mapping +{ + TransactionId mapped_xid; /* xid that might need to see the row */ + Oid mapped_db; /* DbOid or InvalidOid for shared rels */ + Oid mapped_rel; /* Oid of the mapped relation */ + off_t offset; /* How far have we written so far */ + uint32 num_mappings; /* Number of in-memory mappings */ + XLogRecPtr start_lsn; /* Insert LSN at begin of rewrite */ +} xl_heap_rewrite_mapping; + #define SizeOfHeapNewCid (offsetof(xl_heap_new_cid, target) + SizeOfHeapTid) extern void HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple, @@ -341,6 +352,7 @@ extern void heap_redo(XLogRecPtr lsn, XLogRecord *rptr); extern void heap_desc(StringInfo buf, uint8 xl_info, char *rec); extern void heap2_redo(XLogRecPtr lsn, XLogRecord *rptr); extern void heap2_desc(StringInfo buf, uint8 xl_info, char *rec); +extern void heap_xlog_logical_rewrite(XLogRecPtr lsn, XLogRecord *r); extern XLogRecPtr log_heap_cleanup_info(RelFileNode rnode, TransactionId latestRemovedXid); diff --git a/src/include/access/rewriteheap.h b/src/include/access/rewriteheap.h index d098a0b1711..07df3b4f2b0 100644 --- a/src/include/access/rewriteheap.h +++ b/src/include/access/rewriteheap.h @@ -14,12 +14,14 @@ #define REWRITE_HEAP_H #include "access/htup.h" +#include "storage/itemptr.h" +#include "storage/relfilenode.h" #include "utils/relcache.h" /* struct definition is private to rewriteheap.c */ typedef struct RewriteStateData *RewriteState; -extern RewriteState begin_heap_rewrite(Relation NewHeap, +extern RewriteState begin_heap_rewrite(Relation OldHeap, Relation NewHeap, TransactionId OldestXmin, TransactionId FreezeXid, MultiXactId MultiXactCutoff, bool use_wal); extern void end_heap_rewrite(RewriteState state); @@ -27,4 +29,29 @@ extern void rewrite_heap_tuple(RewriteState state, HeapTuple oldTuple, HeapTuple newTuple); extern bool rewrite_heap_dead_tuple(RewriteState state, HeapTuple oldTuple); +/* + * On-Disk data format for an individual logical rewrite mapping. + */ +typedef struct LogicalRewriteMappingData +{ + RelFileNode old_node; + RelFileNode new_node; + ItemPointerData old_tid; + ItemPointerData new_tid; +} LogicalRewriteMappingData; + +/* --- + * The filename consists out of the following, dash separated, + * components: + * 1) database oid or InvalidOid for shared relations + * 2) the oid of the relation + * 3) xid we are mapping for + * 4) upper 32bit of the LSN at which a rewrite started + * 5) lower 32bit of the LSN at which a rewrite started + * 6) xid of the xact performing the mapping + * --- + */ +#define LOGICAL_REWRITE_FORMAT "map-%x-%x-%X_%X-%x-%x" +void CheckPointLogicalRewriteHeap(void); + #endif /* REWRITE_HEAP_H */ diff --git a/src/include/access/transam.h b/src/include/access/transam.h index 8376dfd669b..a9774e9f593 100644 --- a/src/include/access/transam.h +++ b/src/include/access/transam.h @@ -63,6 +63,11 @@ (AssertMacro(TransactionIdIsNormal(id1) && TransactionIdIsNormal(id2)), \ (int32) ((id1) - (id2)) < 0) +/* compare two XIDs already known to be normal; this is a macro for speed */ +#define NormalTransactionIdFollows(id1, id2) \ + (AssertMacro(TransactionIdIsNormal(id1) && TransactionIdIsNormal(id2)), \ + (int32) ((id1) - (id2)) > 0) + /* ---------- * Object ID (OID) zero is InvalidOid. * diff --git a/src/include/access/tuptoaster.h b/src/include/access/tuptoaster.h index 5adf4f28169..296d016c9fc 100644 --- a/src/include/access/tuptoaster.h +++ b/src/include/access/tuptoaster.h @@ -98,9 +98,34 @@ /* Size of an EXTERNAL datum that contains a standard TOAST pointer */ #define TOAST_POINTER_SIZE (VARHDRSZ_EXTERNAL + sizeof(struct varatt_external)) -/* Size of an indirect datum that contains an indirect TOAST pointer */ +/* Size of an indirect datum that contains a standard TOAST pointer */ #define INDIRECT_POINTER_SIZE (VARHDRSZ_EXTERNAL + sizeof(struct varatt_indirect)) +/* + * Testing whether an externally-stored value is compressed now requires + * comparing extsize (the actual length of the external data) to rawsize + * (the original uncompressed datum's size). The latter includes VARHDRSZ + * overhead, the former doesn't. We never use compression unless it actually + * saves space, so we expect either equality or less-than. + */ +#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer) \ + ((toast_pointer).va_extsize < (toast_pointer).va_rawsize - VARHDRSZ) + +/* + * Macro to fetch the possibly-unaligned contents of an EXTERNAL datum + * into a local "struct varatt_external" toast pointer. This should be + * just a memcpy, but some versions of gcc seem to produce broken code + * that assumes the datum contents are aligned. Introducing an explicit + * intermediate "varattrib_1b_e *" variable seems to fix it. + */ +#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr) \ +do { \ + varattrib_1b_e *attre = (varattrib_1b_e *) (attr); \ + Assert(VARATT_IS_EXTERNAL(attre)); \ + Assert(VARSIZE_EXTERNAL(attre) == sizeof(toast_pointer) + VARHDRSZ_EXTERNAL); \ + memcpy(&(toast_pointer), VARDATA_EXTERNAL(attre), sizeof(toast_pointer)); \ +} while (0) + /* ---------- * toast_insert_or_update - * diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 11ab2771990..a238292b76e 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -288,6 +288,7 @@ extern int XLogFileOpen(XLogSegNo segno); extern XLogRecPtr XLogSaveBufferForHint(Buffer buffer, bool buffer_std); extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli); +extern XLogSegNo XLogGetLastRemovedSegno(void); extern void XLogSetAsyncXactLSN(XLogRecPtr record); extern void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn); diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 80560574bf5..22dd0fc58e8 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 201403031 +#define CATALOG_VERSION_NO 201403032 #endif diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 7a11721ba44..c5706518722 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -4804,8 +4804,18 @@ DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 DESCR("create a physical replication slot"); DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "19" _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ )); DESCR("drop a replication slot"); -DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{19,25,26,16,28,3220}" "{o,o,o,o,o,o}" "{slot_name,slot_type,datoid,active,xmin,restart_lsn}" _null_ pg_get_replication_slots _null_ _null_ _null_ )); +DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{19,19,25,26,16,28,28,3220}" "{o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,xmin,catalog_xmin,restart_lsn}" _null_ pg_get_replication_slots _null_ _null_ _null_ )); DESCR("information about replication slots currently in use"); +DATA(insert OID = 3786 ( pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2249 "19 19" "{19,19,25,3220}" "{i,i,o,o}" "{slotname,plugin,slotname,xlog_position}" _null_ pg_create_logical_replication_slot _null_ _null_ _null_ )); +DESCR("set up a logical replication slot"); +DATA(insert OID = 3782 ( pg_logical_slot_get_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,25}" "{i,i,i,v,o,o,o}" "{slotname,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ pg_logical_slot_get_changes _null_ _null_ _null_ )); +DESCR("get changes from replication slot"); +DATA(insert OID = 3783 ( pg_logical_slot_get_binary_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,17}" "{i,i,i,v,o,o,o}" "{slotname,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ pg_logical_slot_get_binary_changes _null_ _null_ _null_ )); +DESCR("get binary changes from replication slot"); +DATA(insert OID = 3784 ( pg_logical_slot_peek_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,25}" "{i,i,i,v,o,o,o}" "{slotname,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ pg_logical_slot_peek_changes _null_ _null_ _null_ )); +DESCR("peek at changes from replication slot"); +DATA(insert OID = 3785 ( pg_logical_slot_peek_binary_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,17}" "{i,i,i,v,o,o,o}" "{slotname,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ pg_logical_slot_peek_binary_changes _null_ _null_ _null_ )); +DESCR("peek at binary changes from replication slot"); /* event triggers */ DATA(insert OID = 3566 ( pg_event_trigger_dropped_objects PGNSP PGUID 12 10 100 0 0 f f f f t t s 0 0 2249 "" "{26,26,23,25,25,25,25}" "{o,o,o,o,o,o,o}" "{classid, objid, objsubid, object_type, schema_name, object_name, object_identity}" _null_ pg_event_trigger_dropped_objects _null_ _null_ _null_ )); diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h index 70350e02cb2..058dc5f6675 100644 --- a/src/include/commands/vacuum.h +++ b/src/include/commands/vacuum.h @@ -157,10 +157,10 @@ extern void vac_update_relstats(Relation relation, bool hasindex, TransactionId frozenxid, MultiXactId minmulti); -extern void vacuum_set_xid_limits(int freeze_min_age, int freeze_table_age, +extern void vacuum_set_xid_limits(Relation rel, + int freeze_min_age, int freeze_table_age, int multixact_freeze_min_age, int multixact_freeze_table_age, - bool sharedRel, TransactionId *oldestXmin, TransactionId *freezeLimit, TransactionId *xidFullScanLimit, diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h new file mode 100644 index 00000000000..7f55d789a23 --- /dev/null +++ b/src/include/replication/decode.h @@ -0,0 +1,19 @@ +/*------------------------------------------------------------------------- + * decode.h + * PostgreSQL WAL to logical transformation + * + * Portions Copyright (c) 2012-2014, PostgreSQL Global Development Group + * + *------------------------------------------------------------------------- + */ +#ifndef DECODE_H +#define DECODE_H + +#include "access/xlogreader.h" +#include "replication/reorderbuffer.h" +#include "replication/logical.h" + +void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, + XLogRecord *record); + +#endif diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h new file mode 100644 index 00000000000..e65c8b8075f --- /dev/null +++ b/src/include/replication/logical.h @@ -0,0 +1,100 @@ +/*------------------------------------------------------------------------- + * logical.h + * PostgreSQL logical decoding coordination + * + * Copyright (c) 2012-2014, PostgreSQL Global Development Group + * + *------------------------------------------------------------------------- + */ +#ifndef LOGICAL_H +#define LOGICAL_H + +#include "replication/slot.h" + +#include "access/xlog.h" +#include "access/xlogreader.h" +#include "replication/output_plugin.h" + +struct LogicalDecodingContext; + +typedef void (*LogicalOutputPluginWriterWrite) ( + struct LogicalDecodingContext *lr, + XLogRecPtr Ptr, + TransactionId xid, + bool last_write +); + +typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite; + +typedef struct LogicalDecodingContext +{ + /* memory context this is all allocated in */ + MemoryContext context; + + /* infrastructure pieces */ + XLogReaderState *reader; + ReplicationSlot *slot; + struct ReorderBuffer *reorder; + struct SnapBuild *snapshot_builder; + + OutputPluginCallbacks callbacks; + OutputPluginOptions options; + + /* + * User specified options + */ + List *output_plugin_options; + + /* + * User-Provided callback for writing/streaming out data. + */ + LogicalOutputPluginWriterPrepareWrite prepare_write; + LogicalOutputPluginWriterWrite write; + + /* + * Output buffer. + */ + StringInfo out; + + /* + * Private data pointer of the output plugin. + */ + void *output_plugin_private; + + /* + * Private data pointer for the data writer. + */ + void *output_writer_private; + + /* + * State for writing output. + */ + bool accept_writes; + bool prepared_write; + XLogRecPtr write_location; + TransactionId write_xid; +} LogicalDecodingContext; + +extern void CheckLogicalDecodingRequirements(void); + +extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin, + List *output_plugin_options, + XLogPageReadCB read_page, + LogicalOutputPluginWriterPrepareWrite prepare_write, + LogicalOutputPluginWriterWrite do_write); +extern LogicalDecodingContext *CreateDecodingContext( + XLogRecPtr start_lsn, + List *output_plugin_options, + XLogPageReadCB read_page, + LogicalOutputPluginWriterPrepareWrite prepare_write, + LogicalOutputPluginWriterWrite do_write); +extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx); +extern bool DecodingContextReady(LogicalDecodingContext *ctx); +extern void FreeDecodingContext(LogicalDecodingContext *ctx); + +extern void LogicalIncreaseXminForSlot(XLogRecPtr lsn, TransactionId xmin); +extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, + XLogRecPtr restart_lsn); +extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn); + +#endif diff --git a/src/include/replication/logicalfuncs.h b/src/include/replication/logicalfuncs.h new file mode 100644 index 00000000000..21bf44ec4b7 --- /dev/null +++ b/src/include/replication/logicalfuncs.h @@ -0,0 +1,24 @@ +/*------------------------------------------------------------------------- + * logicalfuncs.h + * PostgreSQL WAL to logical transformation support functions + * + * Copyright (c) 2012-2014, PostgreSQL Global Development Group + * + *------------------------------------------------------------------------- + */ +#ifndef LOGICALFUNCS_H +#define LOGICALFUNCS_H + +#include "replication/logical.h" + +extern int logical_read_local_xlog_page(XLogReaderState *state, + XLogRecPtr targetPagePtr, + int reqLen, XLogRecPtr targetRecPtr, + char *cur_page, TimeLineID *pageTLI); + +extern Datum pg_logical_slot_get_changes(PG_FUNCTION_ARGS); +extern Datum pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS); +extern Datum pg_logical_slot_peek_changes(PG_FUNCTION_ARGS); +extern Datum pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS); + +#endif diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h new file mode 100644 index 00000000000..c47c24c8dbe --- /dev/null +++ b/src/include/replication/output_plugin.h @@ -0,0 +1,98 @@ +/*------------------------------------------------------------------------- + * output_plugin.h + * PostgreSQL Logical Decode Plugin Interface + * + * Copyright (c) 2012-2014, PostgreSQL Global Development Group + * + *------------------------------------------------------------------------- + */ +#ifndef OUTPUT_PLUGIN_H +#define OUTPUT_PLUGIN_H + +#include "replication/reorderbuffer.h" + +struct LogicalDecodingContext; +struct OutputPluginCallbacks; + +typedef enum OutputPluginOutputType +{ + OUTPUT_PLUGIN_BINARY_OUTPUT, + OUTPUT_PLUGIN_TEXTUAL_OUTPUT +} OutputPluginOutputType; + +/* + * Options set by the output plugin, in the startup callback. + */ +typedef struct OutputPluginOptions +{ + OutputPluginOutputType output_type; +} OutputPluginOptions; + +/* + * Type of the shared library symbol _PG_output_plugin_init that is looked up + * when loading an output plugin shared library. + */ +typedef void (*LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb); + +/* + * Callback that gets called in a user-defined plugin. ctx->private_data can + * be set to some private data. + * + * "is_init" will be set to "true" if the decoding slot just got defined. When + * the same slot is used from there one, it will be "false". + */ +typedef void (*LogicalDecodeStartupCB) ( + struct LogicalDecodingContext *ctx, + OutputPluginOptions *options, + bool is_init +); + +/* + * Callback called for every (explicit or implicit) BEGIN of a successful + * transaction. + */ +typedef void (*LogicalDecodeBeginCB) ( + struct LogicalDecodingContext *, + ReorderBufferTXN *txn); + +/* + * Callback for every individual change in a successful transaction. + */ +typedef void (*LogicalDecodeChangeCB) ( + struct LogicalDecodingContext *, + ReorderBufferTXN *txn, + Relation relation, + ReorderBufferChange *change +); + +/* + * Called for every (explicit or implicit) COMMIT of a successful transaction. + */ +typedef void (*LogicalDecodeCommitCB) ( + struct LogicalDecodingContext *, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* + * Called to shutdown an output plugin. + */ +typedef void (*LogicalDecodeShutdownCB) ( + struct LogicalDecodingContext * +); + +/* + * Output plugin callbacks + */ +typedef struct OutputPluginCallbacks +{ + LogicalDecodeStartupCB startup_cb; + LogicalDecodeBeginCB begin_cb; + LogicalDecodeChangeCB change_cb; + LogicalDecodeCommitCB commit_cb; + LogicalDecodeShutdownCB shutdown_cb; +} OutputPluginCallbacks; + +void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write); +void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write); + +#endif /* OUTPUT_PLUGIN_H */ diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h new file mode 100644 index 00000000000..01eabfb7be7 --- /dev/null +++ b/src/include/replication/reorderbuffer.h @@ -0,0 +1,351 @@ +/* + * reorderbuffer.h + * PostgreSQL logical replay/reorder buffer management. + * + * Copyright (c) 2012-2014, PostgreSQL Global Development Group + * + * src/include/replication/reorderbuffer.h + */ +#ifndef REORDERBUFFER_H +#define REORDERBUFFER_H + +#include "access/htup_details.h" + +#include "lib/ilist.h" + +#include "storage/sinval.h" + +#include "utils/hsearch.h" +#include "utils/rel.h" +#include "utils/snapshot.h" +#include "utils/timestamp.h" + +/* an individual tuple, stored in one chunk of memory */ +typedef struct ReorderBufferTupleBuf +{ + /* position in preallocated list */ + slist_node node; + + /* tuple, stored sequentially */ + HeapTupleData tuple; + HeapTupleHeaderData header; + char data[MaxHeapTupleSize]; +} ReorderBufferTupleBuf; + +/* types of the change passed to a 'change' callback */ +enum ReorderBufferChangeType +{ + REORDER_BUFFER_CHANGE_INSERT, + REORDER_BUFFER_CHANGE_UPDATE, + REORDER_BUFFER_CHANGE_DELETE +}; + +/* + * a single 'change', can be an insert (with one tuple), an update (old, new), + * or a delete (old). + * + * The same struct is also used internally for other purposes but that should + * never be visible outside reorderbuffer.c. + */ +typedef struct ReorderBufferChange +{ + XLogRecPtr lsn; + + /* type of change */ + union + { + enum ReorderBufferChangeType action; + /* do not leak internal enum values to the outside */ + int action_internal; + }; + + /* + * Context data for the change, which part of the union is valid depends + * on action/action_internal. + */ + union + { + /* old, new tuples when action == *_INSERT|UPDATE|DELETE */ + struct + { + /* relation that has been changed */ + RelFileNode relnode; + /* valid for DELETE || UPDATE */ + ReorderBufferTupleBuf *oldtuple; + /* valid for INSERT || UPDATE */ + ReorderBufferTupleBuf *newtuple; + } tp; + + /* new snapshot */ + Snapshot snapshot; + + /* new command id for existing snapshot in a catalog changing tx */ + CommandId command_id; + + /* new cid mapping for catalog changing transaction */ + struct + { + RelFileNode node; + ItemPointerData tid; + CommandId cmin; + CommandId cmax; + CommandId combocid; + } tuplecid; + }; + + /* + * While in use this is how a change is linked into a transactions, + * otherwise it's the preallocated list. + */ + dlist_node node; +} ReorderBufferChange; + +typedef struct ReorderBufferTXN +{ + /* + * The transactions transaction id, can be a toplevel or sub xid. + */ + TransactionId xid; + + /* did the TX have catalog changes */ + bool has_catalog_changes; + + /* + * Do we know this is a subxact? + */ + bool is_known_as_subxact; + + /* + * LSN of the first data carrying, WAL record with knowledge about this + * xid. This is allowed to *not* be first record adorned with this xid, if + * the previous records aren't relevant for logical decoding. + */ + XLogRecPtr first_lsn; + + /* ---- + * LSN of the record that lead to this xact to be committed or + * aborted. This can be a + * * plain commit record + * * plain commit record, of a parent transaction + * * prepared transaction commit + * * plain abort record + * * prepared transaction abort + * * error during decoding + * ---- + */ + XLogRecPtr final_lsn; + + /* + * LSN pointing to the end of the commit record + 1. + */ + XLogRecPtr end_lsn; + + /* + * LSN of the last lsn at which snapshot information reside, so we can + * restart decoding from there and fully recover this transaction from + * WAL. + */ + XLogRecPtr restart_decoding_lsn; + + /* + * Commit time, only known when we read the actual commit record. + */ + TimestampTz commit_time; + + /* + * Base snapshot or NULL. + */ + Snapshot base_snapshot; + XLogRecPtr base_snapshot_lsn; + + /* + * How many ReorderBufferChange's do we have in this txn. + * + * Changes in subtransactions are *not* included but tracked separately. + */ + uint64 nentries; + + /* + * How many of the above entries are stored in memory in contrast to being + * spilled to disk. + */ + uint64 nentries_mem; + + /* + * List of ReorderBufferChange structs, including new Snapshots and new + * CommandIds + */ + dlist_head changes; + + /* + * List of (relation, ctid) => (cmin, cmax) mappings for catalog tuples. + * Those are always assigned to the toplevel transaction. (Keep track of + * #entries to create a hash of the right size) + */ + dlist_head tuplecids; + uint64 ntuplecids; + + /* + * On-demand built hash for looking up the above values. + */ + HTAB *tuplecid_hash; + + /* + * Hash containing (potentially partial) toast entries. NULL if no toast + * tuples have been found for the current change. + */ + HTAB *toast_hash; + + /* + * non-hierarchical list of subtransactions that are *not* aborted. Only + * used in toplevel transactions. + */ + dlist_head subtxns; + uint32 nsubtxns; + + /* + * Stored cache invalidations. This is not a linked list because we get + * all the invalidations at once. + */ + uint32 ninvalidations; + SharedInvalidationMessage *invalidations; + + /* --- + * Position in one of three lists: + * * list of subtransactions if we are *known* to be subxact + * * list of toplevel xacts (can be a as-yet unknown subxact) + * * list of preallocated ReorderBufferTXNs + * --- + */ + dlist_node node; + +} ReorderBufferTXN; + +/* so we can define the callbacks used inside struct ReorderBuffer itself */ +typedef struct ReorderBuffer ReorderBuffer; + +/* change callback signature */ +typedef void (*ReorderBufferApplyChangeCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + Relation relation, + ReorderBufferChange *change); + +/* begin callback signature */ +typedef void (*ReorderBufferBeginCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn); + +/* commit callback signature */ +typedef void (*ReorderBufferCommitCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +struct ReorderBuffer +{ + /* + * xid => ReorderBufferTXN lookup table + */ + HTAB *by_txn; + + /* + * Transactions that could be a toplevel xact, ordered by LSN of the first + * record bearing that xid.. + */ + dlist_head toplevel_by_lsn; + + /* + * one-entry sized cache for by_txn. Very frequently the same txn gets + * looked up over and over again. + */ + TransactionId by_txn_last_xid; + ReorderBufferTXN *by_txn_last_txn; + + /* + * Callacks to be called when a transactions commits. + */ + ReorderBufferBeginCB begin; + ReorderBufferApplyChangeCB apply_change; + ReorderBufferCommitCB commit; + + /* + * Pointer that will be passed untouched to the callbacks. + */ + void *private_data; + + /* + * Private memory context. + */ + MemoryContext context; + + /* + * Data structure slab cache. + * + * We allocate/deallocate some structures very frequently, to avoid bigger + * overhead we cache some unused ones here. + * + * The maximum number of cached entries is controlled by const variables + * ontop of reorderbuffer.c + */ + + /* cached ReorderBufferTXNs */ + dlist_head cached_transactions; + Size nr_cached_transactions; + + /* cached ReorderBufferChanges */ + dlist_head cached_changes; + Size nr_cached_changes; + + /* cached ReorderBufferTupleBufs */ + slist_head cached_tuplebufs; + Size nr_cached_tuplebufs; + + XLogRecPtr current_restart_decoding_lsn; + + /* buffer for disk<->memory conversions */ + char *outbuf; + Size outbufsize; +}; + + +ReorderBuffer *ReorderBufferAllocate(void); +void ReorderBufferFree(ReorderBuffer *); + +ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *); +void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple); +ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *); +void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *); + +void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *); +void ReorderBufferCommit(ReorderBuffer *, TransactionId, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time); +void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn); +void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn); +void ReorderBufferAbort(ReorderBuffer *, TransactionId, XLogRecPtr lsn); +void ReorderBufferAbortOld(ReorderBuffer *, TransactionId xid); +void ReorderBufferForget(ReorderBuffer *, TransactionId, XLogRecPtr lsn); + +void ReorderBufferSetBaseSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap); +void ReorderBufferAddSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap); +void ReorderBufferAddNewCommandId(ReorderBuffer *, TransactionId, XLogRecPtr lsn, + CommandId cid); +void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn, + RelFileNode node, ItemPointerData pt, + CommandId cmin, CommandId cmax, CommandId combocid); +void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn, + Size nmsgs, SharedInvalidationMessage *msgs); +bool ReorderBufferIsXidKnown(ReorderBuffer *, TransactionId xid); +void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn); +bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid); +bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid); + +ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *); + +void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr); + +void StartupReorderBuffer(void); + +#endif diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 089b0f4b70c..c354c9133bf 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -16,6 +16,24 @@ #include "storage/shmem.h" #include "storage/spin.h" +/* + * Behaviour of replication slots, upon release or crash. + * + * Slots marked as PERSISTENT are crashsafe and will not be dropped when + * released. Slots marked as EPHEMERAL will be dropped when released or after + * restarts. + * + * EPHEMERAL slots can be made PERSISTENT by calling ReplicationSlotPersist(). + */ +typedef enum ReplicationSlotPersistency +{ + RS_PERSISTENT, + RS_EPHEMERAL +} ReplicationSlotPersistency; + +/* + * On-Disk data of a replication slot, preserved across restarts. + */ typedef struct ReplicationSlotPersistentData { /* The slot's identifier */ @@ -25,6 +43,11 @@ typedef struct ReplicationSlotPersistentData Oid database; /* + * The slot's behaviour when being dropped (or restored after a crash). + */ + ReplicationSlotPersistency persistency; + + /* * xmin horizon for data * * NB: This may represent a value that hasn't been written to disk yet; @@ -32,9 +55,22 @@ typedef struct ReplicationSlotPersistentData */ TransactionId xmin; + /* + * xmin horizon for catalog tuples + * + * NB: This may represent a value that hasn't been written to disk yet; + * see notes for effective_xmin, below. + */ + TransactionId catalog_xmin; + /* oldest LSN that might be required by this replication slot */ XLogRecPtr restart_lsn; + /* oldest LSN that the client has acked receipt for */ + XLogRecPtr confirmed_flush; + + /* plugin name */ + NameData plugin; } ReplicationSlotPersistentData; /* @@ -67,12 +103,26 @@ typedef struct ReplicationSlot * same as the persistent value (data.xmin). */ TransactionId effective_xmin; + TransactionId effective_catalog_xmin; /* data surviving shutdowns and crashes */ ReplicationSlotPersistentData data; /* is somebody performing io on this slot? */ LWLock *io_in_progress_lock; + + /* all the remaining data is only used for logical slots */ + + /* ---- + * When the client has confirmed flushes >= candidate_xmin_lsn we can + * advance the catalog xmin, when restart_valid has been passed, + * restart_lsn can be increased. + * ---- + */ + TransactionId candidate_catalog_xmin; + XLogRecPtr candidate_xmin_lsn; + XLogRecPtr candidate_restart_valid; + XLogRecPtr candidate_restart_lsn; } ReplicationSlot; /* @@ -97,8 +147,11 @@ extern Size ReplicationSlotsShmemSize(void); extern void ReplicationSlotsShmemInit(void); /* management of individual slots */ -extern void ReplicationSlotCreate(const char *name, bool db_specific); +extern void ReplicationSlotCreate(const char *name, bool db_specific, + ReplicationSlotPersistency p); +extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name); + extern void ReplicationSlotAcquire(const char *name); extern void ReplicationSlotRelease(void); extern void ReplicationSlotSave(void); @@ -106,15 +159,20 @@ extern void ReplicationSlotMarkDirty(void); /* misc stuff */ extern bool ReplicationSlotValidateName(const char *name, int elevel); -extern void ReplicationSlotsComputeRequiredXmin(void); +extern void ReplicationSlotsComputeRequiredXmin(bool already_locked); extern void ReplicationSlotsComputeRequiredLSN(void); +extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); +extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); + extern void StartupReplicationSlots(XLogRecPtr checkPointRedo); extern void CheckPointReplicationSlots(void); extern void CheckSlotRequirements(void); -extern void ReplicationSlotAtProcExit(void); /* SQL callable functions */ +extern Datum pg_create_physical_replication_slot(PG_FUNCTION_ARGS); +extern Datum pg_create_logical_replication_slot(PG_FUNCTION_ARGS); +extern Datum pg_drop_replication_slot(PG_FUNCTION_ARGS); extern Datum pg_get_replication_slots(PG_FUNCTION_ARGS); #endif /* SLOT_H */ diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h new file mode 100644 index 00000000000..087c0e510d5 --- /dev/null +++ b/src/include/replication/snapbuild.h @@ -0,0 +1,83 @@ +/*------------------------------------------------------------------------- + * + * snapbuild.h + * Exports from replication/logical/snapbuild.c. + * + * Copyright (c) 2012-2014, PostgreSQL Global Development Group + * + * src/include/replication/snapbuild.h + * + *------------------------------------------------------------------------- + */ +#ifndef SNAPBUILD_H +#define SNAPBUILD_H + +#include "access/xlogdefs.h" +#include "utils/snapmgr.h" + +typedef enum +{ + /* + * Initial state, we can't do much yet. + */ + SNAPBUILD_START, + + /* + * We have collected enough information to decode tuples in transactions + * that started after this. + * + * Once we reached this we start to collect changes. We cannot apply them + * yet because the might be based on transactions that were still running + * when we reached them yet. + */ + SNAPBUILD_FULL_SNAPSHOT, + + /* + * Found a point after hitting built_full_snapshot where all transactions + * that were running at that point finished. Till we reach that we hold + * off calling any commit callbacks. + */ + SNAPBUILD_CONSISTENT +} SnapBuildState; + +/* forward declare so we don't have to expose the struct to the public */ +struct SnapBuild; +typedef struct SnapBuild SnapBuild; + +/* forward declare so we don't have to include reorderbuffer.h */ +struct ReorderBuffer; + +/* forward declare so we don't have to include heapam_xlog.h */ +struct xl_heap_new_cid; +struct xl_running_xacts; + +extern void CheckPointSnapBuild(void); + +extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache, + TransactionId xmin_horizon, XLogRecPtr start_lsn); +extern void FreeSnapshotBuilder(SnapBuild *cache); + +extern void SnapBuildSnapDecRefcount(Snapshot snap); + +extern const char *SnapBuildExportSnapshot(SnapBuild *snapstate); +extern void SnapBuildClearExportedSnapshot(void); + +extern SnapBuildState SnapBuildCurrentState(SnapBuild *snapstate); + +extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr); + +extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, + TransactionId xid, int nsubxacts, + TransactionId *subxacts); +extern void SnapBuildAbortTxn(SnapBuild *builder, XLogRecPtr lsn, + TransactionId xid, int nsubxacts, + TransactionId *subxacts); +extern bool SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, + XLogRecPtr lsn); +extern void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, + XLogRecPtr lsn, struct xl_heap_new_cid *cid); +extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, + struct xl_running_xacts *running); +extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn); + +#endif /* SNAPBUILD_H */ diff --git a/src/include/storage/itemptr.h b/src/include/storage/itemptr.h index 67bbdbb988a..0b81d53f5f8 100644 --- a/src/include/storage/itemptr.h +++ b/src/include/storage/itemptr.h @@ -116,6 +116,9 @@ typedef ItemPointerData *ItemPointer; /* * ItemPointerCopy * Copies the contents of one disk item pointer to another. + * + * Should there ever be padding in an ItemPointer this would need to be handled + * differently as it's used as hash key. */ #define ItemPointerCopy(fromPointer, toPointer) \ ( \ diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index a3cadd9a017..5218b448cd6 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -41,10 +41,12 @@ struct XidCache #define PROC_IS_AUTOVACUUM 0x01 /* is it an autovac worker? */ #define PROC_IN_VACUUM 0x02 /* currently running lazy vacuum */ #define PROC_IN_ANALYZE 0x04 /* currently running analyze */ -#define PROC_VACUUM_FOR_WRAPAROUND 0x08 /* set by autovac only */ +#define PROC_VACUUM_FOR_WRAPAROUND 0x08 /* set by autovac only */ +#define PROC_IN_LOGICAL_DECODING 0x10 /* currently doing logical decoding */ /* flags reset at EOXact */ -#define PROC_VACUUM_STATE_MASK (0x0E) +#define PROC_VACUUM_STATE_MASK \ + (PROC_IN_VACUUM | PROC_IN_ANALYZE | PROC_VACUUM_FOR_WRAPAROUND) /* * We allow a small number of "weak" relation locks (AccesShareLock, diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index d1a58a3661b..d0b4103a09e 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -15,6 +15,7 @@ #define PROCARRAY_H #include "storage/standby.h" +#include "utils/relcache.h" #include "utils/snapshot.h" @@ -50,8 +51,9 @@ extern RunningTransactions GetRunningTransactionData(void); extern bool TransactionIdIsInProgress(TransactionId xid); extern bool TransactionIdIsActive(TransactionId xid); -extern TransactionId GetOldestXmin(bool allDbs, bool ignoreVacuum); +extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum); extern TransactionId GetOldestActiveTransactionId(void); +extern TransactionId GetOldestSafeDecodingTransactionId(void); extern VirtualTransactionId *GetVirtualXIDsDelayingChkpt(int *nvxids); extern bool HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids); @@ -77,6 +79,10 @@ extern void XidCacheRemoveRunningXids(TransactionId xid, int nxids, const TransactionId *xids, TransactionId latestXid); -extern void ProcArraySetReplicationSlotXmin(TransactionId xmin); +extern void ProcArraySetReplicationSlotXmin(TransactionId xmin, + TransactionId catalog_xmin, bool already_locked); + +extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, + TransactionId *catalog_xmin); #endif /* PROCARRAY_H */ diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h index 0cae810f49e..d5bb850337d 100644 --- a/src/include/storage/sinval.h +++ b/src/include/storage/sinval.h @@ -147,4 +147,6 @@ extern void ProcessCommittedInvalidationMessages(SharedInvalidationMessage *msgs int nmsgs, bool RelcacheInitFileInval, Oid dbid, Oid tsid); +extern void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg); + #endif /* SINVAL_H */ diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h index c1409c863db..6156e0219d0 100644 --- a/src/include/utils/inval.h +++ b/src/include/utils/inval.h @@ -64,4 +64,5 @@ extern void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, extern void CallSyscacheCallbacks(int cacheid, uint32 hashvalue); +extern void InvalidateSystemCaches(void); #endif /* INVAL_H */ diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h index c601770ec99..abe7016d040 100644 --- a/src/include/utils/snapmgr.h +++ b/src/include/utils/snapmgr.h @@ -23,12 +23,14 @@ extern bool FirstSnapshotSet; extern TransactionId TransactionXmin; extern TransactionId RecentXmin; extern TransactionId RecentGlobalXmin; +extern TransactionId RecentGlobalDataXmin; extern Snapshot GetTransactionSnapshot(void); extern Snapshot GetLatestSnapshot(void); extern void SnapshotSetCommandId(CommandId curcid); extern Snapshot GetCatalogSnapshot(Oid relid); +extern Snapshot GetNonHistoricCatalogSnapshot(Oid relid); extern void InvalidateCatalogSnapshot(void); extern void PushActiveSnapshot(Snapshot snapshot); @@ -53,4 +55,13 @@ extern bool XactHasExportedSnapshots(void); extern void DeleteAllExportedSnapshotFiles(void); extern bool ThereAreNoPriorRegisteredSnapshots(void); +extern char *ExportSnapshot(Snapshot snapshot); + +/* Support for catalog timetravel for logical decoding */ +struct HTAB; +extern struct HTAB *HistoricSnapshotGetTupleCids(void); +extern void SetupHistoricSnapshot(Snapshot snapshot_now, struct HTAB *tuplecids); +extern void TeardownHistoricSnapshot(bool is_error); +extern bool HistoricSnapshotActive(void); + #endif /* SNAPMGR_H */ diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h index c542238825a..4b256074b0b 100644 --- a/src/include/utils/snapshot.h +++ b/src/include/utils/snapshot.h @@ -30,6 +30,22 @@ typedef struct SnapshotData *Snapshot; typedef bool (*SnapshotSatisfiesFunc) (HeapTuple htup, Snapshot snapshot, Buffer buffer); +/* + * Struct representing all kind of possible snapshots. + * + * There are several different kinds of snapshots: + * * Normal MVCC snapshots + * * MVCC snapshots taken during recovery (in Hot-Standby mode) + * * Historic MVCC snapshots used during logical decoding + * * snapshots passed to HeapTupleSatisfiesDirty() + * * snapshots used for SatisfiesAny, Toast, Self where no members are + * accessed. + * + * TODO: It's probably a good idea to split this struct using a NodeTag + * similar to how parser and executor nodes are handled, with one type for + * each different kind of snapshot to avoid overloading the meaning of + * individual fields. + */ typedef struct SnapshotData { SnapshotSatisfiesFunc satisfies; /* tuple test function */ @@ -46,11 +62,23 @@ typedef struct SnapshotData */ TransactionId xmin; /* all XID < xmin are visible to me */ TransactionId xmax; /* all XID >= xmax are invisible to me */ - TransactionId *xip; /* array of xact IDs in progress */ + /* + * For normal MVCC snapshot this contains the all xact IDs that are in + * progress, unless the snapshot was taken during recovery in which case + * it's empty. For historic MVCC snapshots, the meaning is inverted, + * i.e. it contains *committed* transactions between xmin and xmax. + */ + TransactionId *xip; uint32 xcnt; /* # of xact ids in xip[] */ /* note: all ids in xip[] satisfy xmin <= xip[i] < xmax */ int32 subxcnt; /* # of xact ids in subxip[] */ - TransactionId *subxip; /* array of subxact IDs in progress */ + /* + * For non-historic MVCC snapshots, this contains subxact IDs that are in + * progress (and other transactions that are in progress if taken during + * recovery). For historic snapshot it contains *all* xids assigned to the + * replayed transaction, including the toplevel xid. + */ + TransactionId *subxip; bool suboverflowed; /* has the subxip array overflowed? */ bool takenDuringRecovery; /* recovery-shaped snapshot? */ bool copied; /* false if it's a static snapshot */ diff --git a/src/include/utils/tqual.h b/src/include/utils/tqual.h index e34c28a4f78..48abe62983d 100644 --- a/src/include/utils/tqual.h +++ b/src/include/utils/tqual.h @@ -22,6 +22,7 @@ extern PGDLLIMPORT SnapshotData SnapshotSelfData; extern PGDLLIMPORT SnapshotData SnapshotAnyData; extern PGDLLIMPORT SnapshotData SnapshotToastData; +extern PGDLLIMPORT SnapshotData CatalogSnapshotData; #define SnapshotSelf (&SnapshotSelfData) #define SnapshotAny (&SnapshotAnyData) @@ -37,7 +38,8 @@ extern PGDLLIMPORT SnapshotData SnapshotToastData; /* This macro encodes the knowledge of which snapshots are MVCC-safe */ #define IsMVCCSnapshot(snapshot) \ - ((snapshot)->satisfies == HeapTupleSatisfiesMVCC) + ((snapshot)->satisfies == HeapTupleSatisfiesMVCC || \ + (snapshot)->satisfies == HeapTupleSatisfiesHistoricMVCC) /* * HeapTupleSatisfiesVisibility @@ -73,6 +75,8 @@ extern bool HeapTupleSatisfiesToast(HeapTuple htup, Snapshot snapshot, Buffer buffer); extern bool HeapTupleSatisfiesDirty(HeapTuple htup, Snapshot snapshot, Buffer buffer); +extern bool HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, + Snapshot snapshot, Buffer buffer); /* Special "satisfies" routines with different APIs */ extern HTSU_Result HeapTupleSatisfiesUpdate(HeapTuple htup, @@ -86,4 +90,13 @@ extern void HeapTupleSetHintBits(HeapTupleHeader tuple, Buffer buffer, uint16 infomask, TransactionId xid); extern bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple); +/* + * To avoid leaking to much knowledge about reorderbuffer implementation + * details this is implemented in reorderbuffer.c not tqual.c. + */ +extern bool ResolveCminCmaxDuringDecoding(struct HTAB *tuplecid_data, + Snapshot snapshot, + HeapTuple htup, + Buffer buffer, + CommandId *cmin, CommandId *cmax); #endif /* TQUAL_H */ |