diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2005-06-17 22:32:51 +0000 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2005-06-17 22:32:51 +0000 |
commit | d0a89683a3a4dd8e76ef0a99101355999e519df5 (patch) | |
tree | b19aaf7f03cbcc851b00ca6e472cc7d3e5a20ca1 /src/backend/access | |
parent | 5495575903e35ceb40d32055ab55e9377460208f (diff) | |
download | postgresql-d0a89683a3a4dd8e76ef0a99101355999e519df5.tar.gz postgresql-d0a89683a3a4dd8e76ef0a99101355999e519df5.zip |
Two-phase commit. Original patch by Heikki Linnakangas, with additional
hacking by Alvaro Herrera and Tom Lane.
Diffstat (limited to 'src/backend/access')
-rw-r--r-- | src/backend/access/transam/Makefile | 4 | ||||
-rw-r--r-- | src/backend/access/transam/subtrans.c | 19 | ||||
-rw-r--r-- | src/backend/access/transam/transam.c | 25 | ||||
-rw-r--r-- | src/backend/access/transam/twophase.c | 1659 | ||||
-rw-r--r-- | src/backend/access/transam/twophase_rmgr.c | 49 | ||||
-rw-r--r-- | src/backend/access/transam/xact.c | 640 | ||||
-rw-r--r-- | src/backend/access/transam/xlog.c | 92 |
7 files changed, 2273 insertions, 215 deletions
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index 295ecf9e14f..c6ecef17246 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -4,7 +4,7 @@ # Makefile for access/transam # # IDENTIFICATION -# $PostgreSQL: pgsql/src/backend/access/transam/Makefile,v 1.20 2005/04/28 21:47:10 tgl Exp $ +# $PostgreSQL: pgsql/src/backend/access/transam/Makefile,v 1.21 2005/06/17 22:32:42 tgl Exp $ # #------------------------------------------------------------------------- @@ -12,7 +12,7 @@ subdir = src/backend/access/transam top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global -OBJS = clog.o transam.o varsup.o xact.o xlog.o xlogutils.o rmgr.o slru.o subtrans.o multixact.o +OBJS = clog.o transam.o varsup.o xact.o xlog.o xlogutils.o rmgr.o slru.o subtrans.o multixact.o twophase.o twophase_rmgr.o all: SUBSYS.o diff --git a/src/backend/access/transam/subtrans.c b/src/backend/access/transam/subtrans.c index 0b774363888..cea778d6a1c 100644 --- a/src/backend/access/transam/subtrans.c +++ b/src/backend/access/transam/subtrans.c @@ -22,7 +22,7 @@ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/backend/access/transam/subtrans.c,v 1.8 2005/05/19 21:35:45 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/transam/subtrans.c,v 1.9 2005/06/17 22:32:42 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -222,22 +222,33 @@ ZeroSUBTRANSPage(int pageno) /* * This must be called ONCE during postmaster or standalone-backend startup, * after StartupXLOG has initialized ShmemVariableCache->nextXid. + * + * oldestActiveXID is the oldest XID of any prepared transaction, or nextXid + * if there are none. */ void -StartupSUBTRANS(void) +StartupSUBTRANS(TransactionId oldestActiveXID) { int startPage; + int endPage; /* * Since we don't expect pg_subtrans to be valid across crashes, we - * initialize the currently-active page to zeroes during startup. + * initialize the currently-active page(s) to zeroes during startup. * Whenever we advance into a new page, ExtendSUBTRANS will likewise * zero the new page without regard to whatever was previously on * disk. */ LWLockAcquire(SubtransControlLock, LW_EXCLUSIVE); - startPage = TransactionIdToPage(ShmemVariableCache->nextXid); + startPage = TransactionIdToPage(oldestActiveXID); + endPage = TransactionIdToPage(ShmemVariableCache->nextXid); + + while (startPage != endPage) + { + (void) ZeroSUBTRANSPage(startPage); + startPage++; + } (void) ZeroSUBTRANSPage(startPage); LWLockRelease(SubtransControlLock); diff --git a/src/backend/access/transam/transam.c b/src/backend/access/transam/transam.c index f88c25a37db..5fa6f82daf4 100644 --- a/src/backend/access/transam/transam.c +++ b/src/backend/access/transam/transam.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/transam/transam.c,v 1.64 2005/02/20 21:46:48 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/transam/transam.c,v 1.65 2005/06/17 22:32:42 tgl Exp $ * * NOTES * This file contains the high level access-method interface to the @@ -173,6 +173,14 @@ TransactionIdDidCommit(TransactionId transactionId) * recursively. However, if it's older than TransactionXmin, we can't * look at pg_subtrans; instead assume that the parent crashed without * cleaning up its children. + * + * Originally we Assert'ed that the result of SubTransGetParent was + * not zero. However with the introduction of prepared transactions, + * there can be a window just after database startup where we do not + * have complete knowledge in pg_subtrans of the transactions after + * TransactionXmin. StartupSUBTRANS() has ensured that any missing + * information will be zeroed. Since this case should not happen under + * normal conditions, it seems reasonable to emit a WARNING for it. */ if (xidstatus == TRANSACTION_STATUS_SUB_COMMITTED) { @@ -181,7 +189,12 @@ TransactionIdDidCommit(TransactionId transactionId) if (TransactionIdPrecedes(transactionId, TransactionXmin)) return false; parentXid = SubTransGetParent(transactionId); - Assert(TransactionIdIsValid(parentXid)); + if (!TransactionIdIsValid(parentXid)) + { + elog(WARNING, "no pg_subtrans entry for subcommitted XID %u", + transactionId); + return false; + } return TransactionIdDidCommit(parentXid); } @@ -224,7 +237,13 @@ TransactionIdDidAbort(TransactionId transactionId) if (TransactionIdPrecedes(transactionId, TransactionXmin)) return true; parentXid = SubTransGetParent(transactionId); - Assert(TransactionIdIsValid(parentXid)); + if (!TransactionIdIsValid(parentXid)) + { + /* see notes in TransactionIdDidCommit */ + elog(WARNING, "no pg_subtrans entry for subcommitted XID %u", + transactionId); + return true; + } return TransactionIdDidAbort(parentXid); } diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c new file mode 100644 index 00000000000..01cc50a6a46 --- /dev/null +++ b/src/backend/access/transam/twophase.c @@ -0,0 +1,1659 @@ +/*------------------------------------------------------------------------- + * + * twophase.c + * Two-phase commit support functions. + * + * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * $PostgreSQL: pgsql/src/backend/access/transam/twophase.c,v 1.1 2005/06/17 22:32:42 tgl Exp $ + * + * NOTES + * Each global transaction is associated with a global transaction + * identifier (GID). The client assigns a GID to a postgres + * transaction with the PREPARE TRANSACTION command. + * + * We keep all active global transactions in a shared memory array. + * When the PREPARE TRANSACTION command is issued, the GID is + * reserved for the transaction in the array. This is done before + * a WAL entry is made, because the reservation checks for duplicate + * GIDs and aborts the transaction if there already is a global + * transaction in prepared state with the same GID. + * + * A global transaction (gxact) also has a dummy PGPROC that is entered + * into the ProcArray array; this is what keeps the XID considered + * running by TransactionIdIsInProgress. It is also convenient as a + * PGPROC to hook the gxact's locks to. + * + * In order to survive crashes and shutdowns, all prepared + * transactions must be stored in permanent storage. This includes + * locking information, pending notifications etc. All that state + * information is written to the per-transaction state file in + * the pg_twophase directory. + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <unistd.h> + +#include "access/heapam.h" +#include "access/subtrans.h" +#include "access/twophase.h" +#include "access/twophase_rmgr.h" +#include "access/xact.h" +#include "catalog/pg_type.h" +#include "funcapi.h" +#include "miscadmin.h" +#include "storage/fd.h" +#include "storage/proc.h" +#include "storage/procarray.h" +#include "storage/smgr.h" +#include "utils/builtins.h" +#include "pgstat.h" + + +/* + * Directory where Two-phase commit files reside within PGDATA + */ +#define TWOPHASE_DIR "pg_twophase" + +/* GUC variable, can't be changed after startup */ +int max_prepared_xacts = 50; + +/* + * This struct describes one global transaction that is in prepared state + * or attempting to become prepared. + * + * The first component of the struct is a dummy PGPROC that is inserted + * into the global ProcArray so that the transaction appears to still be + * running and holding locks. It must be first because we cast pointers + * to PGPROC and pointers to GlobalTransactionData back and forth. + * + * The lifecycle of a global transaction is: + * + * 1. After checking that the requested GID is not in use, set up an + * entry in the TwoPhaseState->prepXacts array with the correct XID and GID, + * with locking_xid = my own XID and valid = false. + * + * 2. After successfully completing prepare, set valid = true and enter the + * contained PGPROC into the global ProcArray. + * + * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry + * is valid and its locking_xid is no longer active, then store my current + * XID into locking_xid. This prevents concurrent attempts to commit or + * rollback the same prepared xact. + * + * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry + * from the ProcArray and the TwoPhaseState->prepXacts array and return it to + * the freelist. + * + * Note that if the preparing transaction fails between steps 1 and 2, the + * entry will remain in prepXacts until recycled. We can detect recyclable + * entries by checking for valid = false and locking_xid no longer active. + * + * typedef struct GlobalTransactionData *GlobalTransaction appears in + * twophase.h + */ +#define GIDSIZE 200 + +typedef struct GlobalTransactionData +{ + PGPROC proc; /* dummy proc */ + AclId owner; /* ID of user that executed the xact */ + TransactionId locking_xid; /* top-level XID of backend working on xact */ + bool valid; /* TRUE if fully prepared */ + char gid[GIDSIZE]; /* The GID assigned to the prepared xact */ +} GlobalTransactionData; + +/* + * Two Phase Commit shared state. Access to this struct is protected + * by TwoPhaseStateLock. + */ +typedef struct TwoPhaseStateData +{ + /* Head of linked list of free GlobalTransactionData structs */ + SHMEM_OFFSET freeGXacts; + + /* Number of valid prepXacts entries. */ + int numPrepXacts; + + /* + * There are max_prepared_xacts items in this array, but C wants a + * fixed-size array. + */ + GlobalTransaction prepXacts[1]; /* VARIABLE LENGTH ARRAY */ +} TwoPhaseStateData; /* VARIABLE LENGTH STRUCT */ + +static TwoPhaseStateData *TwoPhaseState; + + +static void RecordTransactionCommitPrepared(TransactionId xid, + int nchildren, + TransactionId *children, + int nrels, + RelFileNode *rels); +static void RecordTransactionAbortPrepared(TransactionId xid, + int nchildren, + TransactionId *children, + int nrels, + RelFileNode *rels); +static void ProcessRecords(char *bufptr, TransactionId xid, + const TwoPhaseCallback callbacks[]); + + +/* + * Initialization of shared memory + */ +int +TwoPhaseShmemSize(void) +{ + /* Need the fixed struct, the array of pointers, and the GTD structs */ + return MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) + + sizeof(GlobalTransaction) * max_prepared_xacts) + + sizeof(GlobalTransactionData) * max_prepared_xacts; +} + +void +TwoPhaseShmemInit(void) +{ + bool found; + + TwoPhaseState = ShmemInitStruct("Prepared Transaction Table", + TwoPhaseShmemSize(), + &found); + if (!IsUnderPostmaster) + { + GlobalTransaction gxacts; + int i; + + Assert(!found); + TwoPhaseState->freeGXacts = INVALID_OFFSET; + TwoPhaseState->numPrepXacts = 0; + + /* + * Initialize the linked list of free GlobalTransactionData structs + */ + gxacts = (GlobalTransaction) + ((char *) TwoPhaseState + + MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) + + sizeof(GlobalTransaction) * max_prepared_xacts)); + for (i = 0; i < max_prepared_xacts; i++) + { + gxacts[i].proc.links.next = TwoPhaseState->freeGXacts; + TwoPhaseState->freeGXacts = MAKE_OFFSET(&gxacts[i]); + } + } + else + Assert(found); +} + + +/* + * MarkAsPreparing + * Reserve the GID for the given transaction. + * + * Internally, this creates a gxact struct and puts it into the active array. + * NOTE: this is also used when reloading a gxact after a crash; so avoid + * assuming that we can use very much backend context. + */ +GlobalTransaction +MarkAsPreparing(TransactionId xid, Oid databaseid, char *gid, AclId owner) +{ + GlobalTransaction gxact; + int i; + + if (strlen(gid) >= GIDSIZE) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("global transaction identifier \"%s\" is too long", + gid))); + + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + + /* + * First, find and recycle any gxacts that failed during prepare. + * We do this partly to ensure we don't mistakenly say their GIDs + * are still reserved, and partly so we don't fail on out-of-slots + * unnecessarily. + */ + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + gxact = TwoPhaseState->prepXacts[i]; + if (!gxact->valid && !TransactionIdIsActive(gxact->locking_xid)) + { + /* It's dead Jim ... remove from the active array */ + TwoPhaseState->numPrepXacts--; + TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts]; + /* and put it back in the freelist */ + gxact->proc.links.next = TwoPhaseState->freeGXacts; + TwoPhaseState->freeGXacts = MAKE_OFFSET(gxact); + /* Back up index count too, so we don't miss scanning one */ + i--; + } + } + + /* Check for conflicting GID */ + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + gxact = TwoPhaseState->prepXacts[i]; + if (strcmp(gxact->gid, gid) == 0) + { + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("global transaction identifier \"%s\" is already in use", + gid))); + } + } + + /* Get a free gxact from the freelist */ + if (TwoPhaseState->freeGXacts == INVALID_OFFSET) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("maximum number of prepared transactions reached"), + errhint("Increase max_prepared_transactions (currently %d).", + max_prepared_xacts))); + gxact = (GlobalTransaction) MAKE_PTR(TwoPhaseState->freeGXacts); + TwoPhaseState->freeGXacts = gxact->proc.links.next; + + /* Initialize it */ + MemSet(&gxact->proc, 0, sizeof(PGPROC)); + SHMQueueElemInit(&(gxact->proc.links)); + gxact->proc.waitStatus = STATUS_OK; + gxact->proc.xid = xid; + gxact->proc.xmin = InvalidTransactionId; + gxact->proc.pid = 0; + gxact->proc.databaseId = databaseid; + gxact->proc.lwWaiting = false; + gxact->proc.lwExclusive = false; + gxact->proc.lwWaitLink = NULL; + gxact->proc.waitLock = NULL; + gxact->proc.waitProcLock = NULL; + SHMQueueInit(&(gxact->proc.procLocks)); + /* subxid data must be filled later by GXactLoadSubxactData */ + gxact->proc.subxids.overflowed = false; + gxact->proc.subxids.nxids = 0; + + gxact->owner = owner; + gxact->locking_xid = xid; + gxact->valid = false; + strcpy(gxact->gid, gid); + + /* And insert it into the active array */ + Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts); + TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact; + + LWLockRelease(TwoPhaseStateLock); + + return gxact; +} + +/* + * GXactLoadSubxactData + * + * If the transaction being persisted had any subtransactions, this must + * be called before MarkAsPrepared() to load information into the dummy + * PGPROC. + */ +static void +GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts, + TransactionId *children) +{ + /* We need no extra lock since the GXACT isn't valid yet */ + if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS) + { + gxact->proc.subxids.overflowed = true; + nsubxacts = PGPROC_MAX_CACHED_SUBXIDS; + } + if (nsubxacts > 0) + { + memcpy(gxact->proc.subxids.xids, children, + nsubxacts * sizeof(TransactionId)); + gxact->proc.subxids.nxids = nsubxacts; + } +} + +/* + * MarkAsPrepared + * Mark the GXACT as fully valid, and enter it into the global ProcArray. + */ +void +MarkAsPrepared(GlobalTransaction gxact) +{ + /* Lock here may be overkill, but I'm not convinced of that ... */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + Assert(!gxact->valid); + gxact->valid = true; + LWLockRelease(TwoPhaseStateLock); + + /* + * Put it into the global ProcArray so TransactionIdInProgress considers + * the XID as still running. + */ + ProcArrayAdd(&gxact->proc); +} + +/* + * LockGXact + * Locate the prepared transaction and mark it busy for COMMIT or PREPARE. + */ +static GlobalTransaction +LockGXact(char *gid, AclId user) +{ + int i; + + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + + /* Ignore not-yet-valid GIDs */ + if (!gxact->valid) + continue; + if (strcmp(gxact->gid, gid) != 0) + continue; + + /* Found it, but has someone else got it locked? */ + if (TransactionIdIsValid(gxact->locking_xid)) + { + if (TransactionIdIsActive(gxact->locking_xid)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("prepared transaction with gid \"%s\" is busy", + gid))); + gxact->locking_xid = InvalidTransactionId; + } + + if (user != gxact->owner && !superuser_arg(user)) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied to finish prepared transaction"), + errhint("Must be superuser or the user that prepared the transaction."))); + + /* OK for me to lock it */ + gxact->locking_xid = GetTopTransactionId(); + + LWLockRelease(TwoPhaseStateLock); + + return gxact; + } + + LWLockRelease(TwoPhaseStateLock); + + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("prepared transaction with gid \"%s\" does not exist", + gid))); + + /* NOTREACHED */ + return NULL; +} + +/* + * RemoveGXact + * Remove the prepared transaction from the shared memory array. + * + * NB: caller should have already removed it from ProcArray + */ +static void +RemoveGXact(GlobalTransaction gxact) +{ + int i; + + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + if (gxact == TwoPhaseState->prepXacts[i]) + { + /* remove from the active array */ + TwoPhaseState->numPrepXacts--; + TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts]; + + /* and put it back in the freelist */ + gxact->proc.links.next = TwoPhaseState->freeGXacts; + TwoPhaseState->freeGXacts = MAKE_OFFSET(gxact); + + LWLockRelease(TwoPhaseStateLock); + + return; + } + } + + LWLockRelease(TwoPhaseStateLock); + + elog(ERROR, "failed to find %p in GlobalTransaction array", gxact); +} + +/* + * Returns an array of all prepared transactions for the user-level + * function pg_prepared_xact. + * + * The returned array and all its elements are copies of internal data + * structures, to minimize the time we need to hold the TwoPhaseStateLock. + * + * WARNING -- we return even those transactions that are not fully prepared + * yet. The caller should filter them out if he doesn't want them. + * + * The returned array is palloc'd. + */ +static int +GetPreparedTransactionList(GlobalTransaction *gxacts) +{ + GlobalTransaction array; + int num; + int i; + + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + + if (TwoPhaseState->numPrepXacts == 0) + { + LWLockRelease(TwoPhaseStateLock); + + *gxacts = NULL; + return 0; + } + + num = TwoPhaseState->numPrepXacts; + array = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * num); + *gxacts = array; + for (i = 0; i < num; i++) + memcpy(array + i, TwoPhaseState->prepXacts[i], + sizeof(GlobalTransactionData)); + + LWLockRelease(TwoPhaseStateLock); + + return num; +} + + +/* Working status for pg_prepared_xact */ +typedef struct +{ + GlobalTransaction array; + int ngxacts; + int currIdx; +} Working_State; + +/* + * pg_prepared_xact + * Produce a view with one row per prepared transaction. + * + * This function is here so we don't have to export the + * GlobalTransactionData struct definition. + */ +Datum +pg_prepared_xact(PG_FUNCTION_ARGS) +{ + FuncCallContext *funcctx; + Working_State *status; + + if (SRF_IS_FIRSTCALL()) + { + TupleDesc tupdesc; + MemoryContext oldcontext; + + /* create a function context for cross-call persistence */ + funcctx = SRF_FIRSTCALL_INIT(); + + /* + * Switch to memory context appropriate for multiple function + * calls + */ + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + /* build tupdesc for result tuples */ + /* this had better match pg_prepared_xacts view in system_views.sql */ + tupdesc = CreateTemplateTupleDesc(4, false); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction", + XIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "ownerid", + INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "dbid", + OIDOID, -1, 0); + + funcctx->tuple_desc = BlessTupleDesc(tupdesc); + + /* + * Collect all the 2PC status information that we will format and + * send out as a result set. + */ + status = (Working_State *) palloc(sizeof(Working_State)); + funcctx->user_fctx = (void *) status; + + status->ngxacts = GetPreparedTransactionList(&status->array); + status->currIdx = 0; + + MemoryContextSwitchTo(oldcontext); + } + + funcctx = SRF_PERCALL_SETUP(); + status = (Working_State *) funcctx->user_fctx; + + while (status->array != NULL && status->currIdx < status->ngxacts) + { + GlobalTransaction gxact = &status->array[status->currIdx++]; + Datum values[4]; + bool nulls[4]; + HeapTuple tuple; + Datum result; + + if (!gxact->valid) + continue; + + /* + * Form tuple with appropriate data. + */ + MemSet(values, 0, sizeof(values)); + MemSet(nulls, 0, sizeof(nulls)); + + values[0] = TransactionIdGetDatum(gxact->proc.xid); + values[1] = DirectFunctionCall1(textin, CStringGetDatum(gxact->gid)); + values[2] = Int32GetDatum(gxact->owner); + values[3] = ObjectIdGetDatum(gxact->proc.databaseId); + + tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls); + result = HeapTupleGetDatum(tuple); + SRF_RETURN_NEXT(funcctx, result); + } + + SRF_RETURN_DONE(funcctx); +} + +/* + * TwoPhaseGetDummyProc + * Get the PGPROC that represents a prepared transaction specified by XID + */ +PGPROC * +TwoPhaseGetDummyProc(TransactionId xid) +{ + PGPROC *result = NULL; + int i; + + static TransactionId cached_xid = InvalidTransactionId; + static PGPROC *cached_proc = NULL; + + /* + * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called + * repeatedly for the same XID. We can save work with a simple cache. + */ + if (xid == cached_xid) + return cached_proc; + + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + + if (gxact->proc.xid == xid) + { + result = &gxact->proc; + break; + } + } + + LWLockRelease(TwoPhaseStateLock); + + if (result == NULL) /* should not happen */ + elog(ERROR, "failed to find dummy PGPROC for xid %u", xid); + + cached_xid = xid; + cached_proc = result; + + return result; +} + +/************************************************************************/ +/* State file support */ +/************************************************************************/ + +#define TwoPhaseFilePath(path, xid) \ + snprintf(path, MAXPGPATH, "%s/%s/%08X", DataDir, TWOPHASE_DIR, xid) + +/* + * 2PC state file format: + * + * 1. TwoPhaseFileHeader + * 2. TransactionId[] (subtransactions) + * 3. RelFileNode[] (files to be deleted at commit) + * 4. RelFileNode[] (files to be deleted at abort) + * 5. TwoPhaseRecordOnDisk + * 6. ... + * 7. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID) + * 8. CRC32 + * + * Each segment except the final CRC32 is MAXALIGN'd. + */ + +/* + * Header for a 2PC state file + */ +#define TWOPHASE_MAGIC 0x57F94530 /* format identifier */ + +typedef struct TwoPhaseFileHeader +{ + uint32 magic; /* format identifier */ + uint32 total_len; /* actual file length */ + TransactionId xid; /* original transaction XID */ + Oid database; /* OID of database it was in */ + AclId owner; /* user running the transaction */ + int32 nsubxacts; /* number of following subxact XIDs */ + int32 ncommitrels; /* number of delete-on-commit rels */ + int32 nabortrels; /* number of delete-on-abort rels */ + char gid[GIDSIZE]; /* GID for transaction */ +} TwoPhaseFileHeader; + +/* + * Header for each record in a state file + * + * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header. + * The rmgr data will be stored starting on a MAXALIGN boundary. + */ +typedef struct TwoPhaseRecordOnDisk +{ + uint32 len; /* length of rmgr data */ + TwoPhaseRmgrId rmid; /* resource manager for this record */ + uint16 info; /* flag bits for use by rmgr */ +} TwoPhaseRecordOnDisk; + +/* + * During prepare, the state file is assembled in memory before writing it + * to WAL and the actual state file. We use a chain of XLogRecData blocks + * so that we will be able to pass the state file contents directly to + * XLogInsert. + */ +static struct xllist +{ + XLogRecData *head; /* first data block in the chain */ + XLogRecData *tail; /* last block in chain */ + uint32 bytes_free; /* free bytes left in tail block */ + uint32 total_len; /* total data bytes in chain */ +} records; + + +/* + * Append a block of data to records data structure. + * + * NB: each block is padded to a MAXALIGN multiple. This must be + * accounted for when the file is later read! + * + * The data is copied, so the caller is free to modify it afterwards. + */ +static void +save_state_data(const void *data, uint32 len) +{ + uint32 padlen = MAXALIGN(len); + + if (padlen > records.bytes_free) + { + records.tail->next = palloc0(sizeof(XLogRecData)); + records.tail = records.tail->next; + records.tail->buffer = InvalidBuffer; + records.tail->len = 0; + records.tail->next = NULL; + + records.bytes_free = Max(padlen, 512); + records.tail->data = palloc(records.bytes_free); + } + + memcpy(((char *) records.tail->data) + records.tail->len, data, len); + records.tail->len += padlen; + records.bytes_free -= padlen; + records.total_len += padlen; +} + +/* + * Start preparing a state file. + * + * Initializes data structure and inserts the 2PC file header record. + */ +void +StartPrepare(GlobalTransaction gxact) +{ + TransactionId xid = gxact->proc.xid; + TwoPhaseFileHeader hdr; + TransactionId *children; + RelFileNode *commitrels; + RelFileNode *abortrels; + + /* Initialize linked list */ + records.head = palloc0(sizeof(XLogRecData)); + records.head->buffer = InvalidBuffer; + records.head->len = 0; + records.head->next = NULL; + + records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512); + records.head->data = palloc(records.bytes_free); + + records.tail = records.head; + + records.total_len = 0; + + /* Create header */ + hdr.magic = TWOPHASE_MAGIC; + hdr.total_len = 0; /* EndPrepare will fill this in */ + hdr.xid = xid; + hdr.database = MyDatabaseId; + hdr.owner = GetUserId(); + hdr.nsubxacts = xactGetCommittedChildren(&children); + hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels); + hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels); + StrNCpy(hdr.gid, gxact->gid, GIDSIZE); + + save_state_data(&hdr, sizeof(TwoPhaseFileHeader)); + + /* Add the additional info about subxacts and deletable files */ + if (hdr.nsubxacts > 0) + { + save_state_data(children, hdr.nsubxacts * sizeof(TransactionId)); + /* While we have the child-xact data, stuff it in the gxact too */ + GXactLoadSubxactData(gxact, hdr.nsubxacts, children); + pfree(children); + } + if (hdr.ncommitrels > 0) + { + save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileNode)); + pfree(commitrels); + } + if (hdr.nabortrels > 0) + { + save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode)); + pfree(abortrels); + } +} + +/* + * Finish preparing state file. + * + * Calculates CRC and writes state file to WAL and in pg_twophase directory. + */ +void +EndPrepare(GlobalTransaction gxact) +{ + TransactionId xid = gxact->proc.xid; + TwoPhaseFileHeader *hdr; + char path[MAXPGPATH]; + XLogRecData *record; + XLogRecPtr recptr; + pg_crc32 statefile_crc; + pg_crc32 bogus_crc; + int fd; + + /* Add the end sentinel to the list of 2PC records */ + RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0, + NULL, 0); + + /* Go back and fill in total_len in the file header record */ + hdr = (TwoPhaseFileHeader *) records.head->data; + Assert(hdr->magic == TWOPHASE_MAGIC); + hdr->total_len = records.total_len + sizeof(pg_crc32); + + /* + * Create the 2PC state file. + * + * Note: because we use BasicOpenFile(), we are responsible for ensuring + * the FD gets closed in any error exit path. Once we get into the + * critical section, though, it doesn't matter since any failure causes + * PANIC anyway. + */ + TwoPhaseFilePath(path, xid); + + fd = BasicOpenFile(path, + O_CREAT | O_EXCL | O_WRONLY | PG_BINARY, + S_IRUSR | S_IWUSR); + if (fd < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create twophase state file \"%s\": %m", + path))); + + /* Write data to file, and calculate CRC as we pass over it */ + INIT_CRC32(statefile_crc); + + for (record = records.head; record != NULL; record = record->next) + { + COMP_CRC32(statefile_crc, record->data, record->len); + if ((write(fd, record->data, record->len)) != record->len) + { + close(fd); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write twophase state file: %m"))); + } + } + + FIN_CRC32(statefile_crc); + + /* + * Write a deliberately bogus CRC to the state file, and flush it to disk. + * This is to minimize the odds of failure within the critical section + * below --- in particular, running out of disk space. + * + * On most filesystems, write() rather than fsync() detects out-of-space, + * so the fsync might be considered optional. Using it means there + * are three fsyncs not two associated with preparing a transaction; is + * the risk of an error from fsync high enough to justify that? + */ + bogus_crc = ~ statefile_crc; + + if ((write(fd, &bogus_crc, sizeof(pg_crc32))) != sizeof(pg_crc32)) + { + close(fd); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write twophase state file: %m"))); + } + + if (pg_fsync(fd) != 0) + { + close(fd); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not fsync twophase state file: %m"))); + } + + /* Back up to prepare for rewriting the CRC */ + if (lseek(fd, -((off_t) sizeof(pg_crc32)), SEEK_CUR) < 0) + { + close(fd); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not seek twophase state file: %m"))); + } + + /* + * The state file isn't valid yet, because we haven't written the correct + * CRC yet. Before we do that, insert entry in WAL and flush it to disk. + * + * Between the time we have written the WAL entry and the time we + * flush the correct state file CRC to disk, we have an inconsistency: + * the xact is prepared according to WAL but not according to our on-disk + * state. We use a critical section to force a PANIC if we are unable to + * complete the flush --- then, WAL replay should repair the + * inconsistency. + * + * We have to lock out checkpoint start here, too; otherwise a checkpoint + * starting immediately after the WAL record is inserted could complete + * before we've finished flushing, meaning that the WAL record would not + * get replayed if a crash follows. + */ + START_CRIT_SECTION(); + + LWLockAcquire(CheckpointStartLock, LW_SHARED); + + recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE, records.head); + XLogFlush(recptr); + + /* If we crash now, we have prepared: WAL replay will fix things */ + + /* write correct CRC, flush, and close file */ + if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32)) + { + close(fd); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write twophase state file: %m"))); + } + + if (pg_fsync(fd) != 0) + { + close(fd); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not fsync twophase state file: %m"))); + } + + if (close(fd) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not close twophase state file: %m"))); + + LWLockRelease(CheckpointStartLock); + + END_CRIT_SECTION(); + + records.tail = records.head = NULL; +} + +/* + * Register a 2PC record to be written to state file. + */ +void +RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info, + const void *data, uint32 len) +{ + TwoPhaseRecordOnDisk record; + + record.rmid = rmid; + record.info = info; + record.len = len; + save_state_data(&record, sizeof(TwoPhaseRecordOnDisk)); + if (len > 0) + save_state_data(data, len); +} + + +/* + * Read and validate the state file for xid. + * + * If it looks OK (has a valid magic number and CRC), return the palloc'd + * contents of the file. Otherwise return NULL. + */ +static char * +ReadTwoPhaseFile(TransactionId xid) +{ + char path[MAXPGPATH]; + char *buf; + TwoPhaseFileHeader *hdr; + int fd; + struct stat stat; + uint32 crc_offset; + pg_crc32 calc_crc, file_crc; + + TwoPhaseFilePath(path, xid); + + fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0); + if (fd < 0) + { + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not open twophase state file \"%s\": %m", + path))); + return NULL; + } + + /* + * Check file length. We can determine a lower bound pretty easily. + * We set an upper bound mainly to avoid palloc() failure on a corrupt + * file. + */ + if (fstat(fd, &stat)) + { + close(fd); + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not stat twophase state file \"%s\": %m", + path))); + return NULL; + } + + if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) + + MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) + + sizeof(pg_crc32)) || + stat.st_size > 10000000) + { + close(fd); + return NULL; + } + + crc_offset = stat.st_size - sizeof(pg_crc32); + if (crc_offset != MAXALIGN(crc_offset)) + { + close(fd); + return NULL; + } + + /* + * OK, slurp in the file. + */ + buf = (char *) palloc(stat.st_size); + + if (read(fd, buf, stat.st_size) != stat.st_size) + { + close(fd); + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not read twophase state file \"%s\": %m", + path))); + pfree(buf); + return NULL; + } + + close(fd); + + hdr = (TwoPhaseFileHeader *) buf; + if (hdr->magic != TWOPHASE_MAGIC || hdr->total_len != stat.st_size) + { + pfree(buf); + return NULL; + } + + INIT_CRC32(calc_crc); + COMP_CRC32(calc_crc, buf, crc_offset); + FIN_CRC32(calc_crc); + + file_crc = *((pg_crc32 *) (buf + crc_offset)); + + if (!EQ_CRC32(calc_crc, file_crc)) + { + pfree(buf); + return NULL; + } + + return buf; +} + + +/* + * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED + */ +void +FinishPreparedTransaction(char *gid, bool isCommit) +{ + GlobalTransaction gxact; + TransactionId xid; + char *buf; + char *bufptr; + TwoPhaseFileHeader *hdr; + TransactionId *children; + RelFileNode *commitrels; + RelFileNode *abortrels; + int i; + + /* + * Validate the GID, and lock the GXACT to ensure that two backends + * do not try to commit the same GID at once. + */ + gxact = LockGXact(gid, GetUserId()); + xid = gxact->proc.xid; + + /* + * Read and validate the state file + */ + buf = ReadTwoPhaseFile(xid); + if (buf == NULL) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("twophase state file for transaction %u is corrupt", + xid))); + + /* + * Disassemble the header area + */ + hdr = (TwoPhaseFileHeader *) buf; + Assert(TransactionIdEquals(hdr->xid, xid)); + bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader)); + children = (TransactionId *) bufptr; + bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId)); + commitrels = (RelFileNode *) bufptr; + bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode)); + abortrels = (RelFileNode *) bufptr; + bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode)); + + /* + * The order of operations here is critical: make the XLOG entry for + * commit or abort, then mark the transaction committed or aborted in + * pg_clog, then remove its PGPROC from the global ProcArray (which + * means TransactionIdIsInProgress will stop saying the prepared xact + * is in progress), then run the post-commit or post-abort callbacks. + * The callbacks will release the locks the transaction held. + */ + if (isCommit) + RecordTransactionCommitPrepared(xid, + hdr->nsubxacts, children, + hdr->ncommitrels, commitrels); + else + RecordTransactionAbortPrepared(xid, + hdr->nsubxacts, children, + hdr->nabortrels, abortrels); + + ProcArrayRemove(&gxact->proc); + + /* + * In case we fail while running the callbacks, mark the gxact invalid + * so no one else will try to commit/rollback, and so it can be recycled + * properly later. It is still locked by our XID so it won't go away yet. + */ + gxact->valid = false; + + if (isCommit) + ProcessRecords(bufptr, xid, twophase_postcommit_callbacks); + else + ProcessRecords(bufptr, xid, twophase_postabort_callbacks); + + /* + * We also have to remove any files that were supposed to be dropped. + * NB: this code knows that we couldn't be dropping any temp rels ... + */ + if (isCommit) + { + for (i = 0; i < hdr->ncommitrels; i++) + smgrdounlink(smgropen(commitrels[i]), false, false); + } + else + { + for (i = 0; i < hdr->nabortrels; i++) + smgrdounlink(smgropen(abortrels[i]), false, false); + } + + pgstat_count_xact_commit(); + + /* + * And now we can clean up our mess. + */ + RemoveTwoPhaseFile(xid, true); + + RemoveGXact(gxact); + + pfree(buf); +} + +/* + * Scan a 2PC state file (already read into memory by ReadTwoPhaseFile) + * and call the indicated callbacks for each 2PC record. + */ +static void +ProcessRecords(char *bufptr, TransactionId xid, + const TwoPhaseCallback callbacks[]) +{ + for (;;) + { + TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr; + + Assert(record->rmid <= TWOPHASE_RM_MAX_ID); + if (record->rmid == TWOPHASE_RM_END_ID) + break; + + bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk)); + + if (callbacks[record->rmid] != NULL) + callbacks[record->rmid](xid, record->info, + (void *) bufptr, record->len); + + bufptr += MAXALIGN(record->len); + } +} + +/* + * Remove the 2PC file for the specified XID. + * + * If giveWarning is false, do not complain about file-not-present; + * this is an expected case during WAL replay. + */ +void +RemoveTwoPhaseFile(TransactionId xid, bool giveWarning) +{ + char path[MAXPGPATH]; + + TwoPhaseFilePath(path, xid); + if (unlink(path)) + if (errno != ENOENT || giveWarning) + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not remove two-phase state file \"%s\": %m", + path))); +} + +/* + * Recreates a state file. This is used in WAL replay. + * + * Note: content and len don't include CRC. + */ +void +RecreateTwoPhaseFile(TransactionId xid, void *content, int len) +{ + char path[MAXPGPATH]; + pg_crc32 statefile_crc; + int fd; + + /* Recompute CRC */ + INIT_CRC32(statefile_crc); + COMP_CRC32(statefile_crc, content, len); + FIN_CRC32(statefile_crc); + + TwoPhaseFilePath(path, xid); + + fd = BasicOpenFile(path, + O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY, + S_IRUSR | S_IWUSR); + if (fd < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not recreate twophase state file \"%s\": %m", + path))); + + /* Write content and CRC */ + if (write(fd, content, len) != len) + { + close(fd); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write twophase state file: %m"))); + } + if (write(fd, &statefile_crc, sizeof(pg_crc32)) != sizeof(pg_crc32)) + { + close(fd); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write twophase state file: %m"))); + } + + /* Sync and close the file */ + if (pg_fsync(fd) != 0) + { + close(fd); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not fsync twophase state file: %m"))); + } + + if (close(fd) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not close twophase state file: %m"))); +} + +/* + * PrescanPreparedTransactions + * + * Scan the pg_twophase directory and determine the range of valid XIDs + * present. This is run during database startup, after we have completed + * reading WAL. ShmemVariableCache->nextXid has been set to one more than + * the highest XID for which evidence exists in WAL. + * + * We throw away any prepared xacts with main XID beyond nextXid --- if any + * are present, it suggests that the DBA has done a PITR recovery to an + * earlier point in time without cleaning out pg_twophase. We dare not + * try to recover such prepared xacts since they likely depend on database + * state that doesn't exist now. + * + * However, we will advance nextXid beyond any subxact XIDs belonging to + * valid prepared xacts. We need to do this since subxact commit doesn't + * write a WAL entry, and so there might be no evidence in WAL of those + * subxact XIDs. + * + * Our other responsibility is to determine and return the oldest valid XID + * among the prepared xacts (if none, return ShmemVariableCache->nextXid). + * This is needed to synchronize pg_subtrans startup properly. + */ +TransactionId +PrescanPreparedTransactions(void) +{ + TransactionId origNextXid = ShmemVariableCache->nextXid; + TransactionId result = origNextXid; + char dir[MAXPGPATH]; + DIR *cldir; + struct dirent *clde; + + snprintf(dir, MAXPGPATH, "%s/%s", DataDir, TWOPHASE_DIR); + + cldir = AllocateDir(dir); + if (cldir == NULL) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open directory \"%s\": %m", dir))); + + errno = 0; + while ((clde = readdir(cldir)) != NULL) + { + if (strlen(clde->d_name) == 8 && + strspn(clde->d_name, "0123456789ABCDEF") == 8) + { + TransactionId xid; + char *buf; + TwoPhaseFileHeader *hdr; + TransactionId *subxids; + int i; + + xid = (TransactionId) strtoul(clde->d_name, NULL, 16); + + /* Reject XID if too new */ + if (TransactionIdFollowsOrEquals(xid, origNextXid)) + { + ereport(WARNING, + (errmsg("removing future twophase state file \"%s\"", + clde->d_name))); + RemoveTwoPhaseFile(xid, true); + errno = 0; + continue; + } + + /* + * Note: we can't check if already processed because clog + * subsystem isn't up yet. + */ + + /* Read and validate file */ + buf = ReadTwoPhaseFile(xid); + if (buf == NULL) + { + ereport(WARNING, + (errmsg("removing corrupt twophase state file \"%s\"", + clde->d_name))); + RemoveTwoPhaseFile(xid, true); + errno = 0; + continue; + } + + /* Deconstruct header */ + hdr = (TwoPhaseFileHeader *) buf; + if (!TransactionIdEquals(hdr->xid, xid)) + { + ereport(WARNING, + (errmsg("removing corrupt twophase state file \"%s\"", + clde->d_name))); + RemoveTwoPhaseFile(xid, true); + pfree(buf); + errno = 0; + continue; + } + + /* + * OK, we think this file is valid. Incorporate xid into the + * running-minimum result. + */ + if (TransactionIdPrecedes(xid, result)) + result = xid; + + /* + * Examine subtransaction XIDs ... they should all follow main + * XID, and they may force us to advance nextXid. + */ + subxids = (TransactionId *) + (buf + MAXALIGN(sizeof(TwoPhaseFileHeader))); + for (i = 0; i < hdr->nsubxacts; i++) + { + TransactionId subxid = subxids[i]; + + Assert(TransactionIdFollows(subxid, xid)); + if (TransactionIdFollowsOrEquals(subxid, + ShmemVariableCache->nextXid)) + { + ShmemVariableCache->nextXid = subxid; + TransactionIdAdvance(ShmemVariableCache->nextXid); + } + } + + pfree(buf); + } + errno = 0; + } +#ifdef WIN32 + + /* + * This fix is in mingw cvs (runtime/mingwex/dirent.c rev 1.4), but + * not in released version + */ + if (GetLastError() == ERROR_NO_MORE_FILES) + errno = 0; +#endif + if (errno) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read directory \"%s\": %m", dir))); + + FreeDir(cldir); + + return result; +} + +/* + * RecoverPreparedTransactions + * + * Scan the pg_twophase directory and reload shared-memory state for each + * prepared transaction (reacquire locks, etc). This is run during database + * startup. + */ +void +RecoverPreparedTransactions(void) +{ + char dir[MAXPGPATH]; + DIR *cldir; + struct dirent *clde; + + snprintf(dir, MAXPGPATH, "%s/%s", DataDir, TWOPHASE_DIR); + + cldir = AllocateDir(dir); + if (cldir == NULL) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open directory \"%s\": %m", dir))); + + errno = 0; + while ((clde = readdir(cldir)) != NULL) + { + if (strlen(clde->d_name) == 8 && + strspn(clde->d_name, "0123456789ABCDEF") == 8) + { + TransactionId xid; + char *buf; + char *bufptr; + TwoPhaseFileHeader *hdr; + TransactionId *subxids; + GlobalTransaction gxact; + int i; + + xid = (TransactionId) strtoul(clde->d_name, NULL, 16); + + /* Already processed? */ + if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid)) + { + ereport(WARNING, + (errmsg("removing stale twophase state file \"%s\"", + clde->d_name))); + RemoveTwoPhaseFile(xid, true); + errno = 0; + continue; + } + + /* Read and validate file */ + buf = ReadTwoPhaseFile(xid); + if (buf == NULL) + { + ereport(WARNING, + (errmsg("removing corrupt twophase state file \"%s\"", + clde->d_name))); + RemoveTwoPhaseFile(xid, true); + errno = 0; + continue; + } + + ereport(LOG, + (errmsg("recovering prepared transaction %u", xid))); + + /* Deconstruct header */ + hdr = (TwoPhaseFileHeader *) buf; + Assert(TransactionIdEquals(hdr->xid, xid)); + bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader)); + subxids = (TransactionId *) bufptr; + bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId)); + bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode)); + bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode)); + + /* + * Reconstruct subtrans state for the transaction --- needed + * because pg_subtrans is not preserved over a restart + */ + for (i = 0; i < hdr->nsubxacts; i++) + SubTransSetParent(subxids[i], xid); + + /* + * Recreate its GXACT and dummy PGPROC + */ + gxact = MarkAsPreparing(xid, hdr->database, hdr->gid, hdr->owner); + GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids); + MarkAsPrepared(gxact); + + /* + * Recover other state (notably locks) using resource managers + */ + ProcessRecords(bufptr, xid, twophase_recover_callbacks); + + pfree(buf); + } + errno = 0; + } +#ifdef WIN32 + + /* + * This fix is in mingw cvs (runtime/mingwex/dirent.c rev 1.4), but + * not in released version + */ + if (GetLastError() == ERROR_NO_MORE_FILES) + errno = 0; +#endif + if (errno) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read directory \"%s\": %m", dir))); + + FreeDir(cldir); +} + +/* + * RecordTransactionCommitPrepared + * + * This is basically the same as RecordTransactionCommit: in particular, + * we must take the CheckpointStartLock to avoid a race condition. + * + * We know the transaction made at least one XLOG entry (its PREPARE), + * so it is never possible to optimize out the commit record. + */ +static void +RecordTransactionCommitPrepared(TransactionId xid, + int nchildren, + TransactionId *children, + int nrels, + RelFileNode *rels) +{ + XLogRecData rdata[3]; + int lastrdata = 0; + xl_xact_commit_prepared xlrec; + XLogRecPtr recptr; + + START_CRIT_SECTION(); + + /* See notes in RecordTransactionCommit */ + LWLockAcquire(CheckpointStartLock, LW_SHARED); + + /* Emit the XLOG commit record */ + xlrec.xid = xid; + xlrec.crec.xtime = time(NULL); + xlrec.crec.nrels = nrels; + xlrec.crec.nsubxacts = nchildren; + rdata[0].data = (char *) (&xlrec); + rdata[0].len = MinSizeOfXactCommitPrepared; + rdata[0].buffer = InvalidBuffer; + /* dump rels to delete */ + if (nrels > 0) + { + rdata[0].next = &(rdata[1]); + rdata[1].data = (char *) rels; + rdata[1].len = nrels * sizeof(RelFileNode); + rdata[1].buffer = InvalidBuffer; + lastrdata = 1; + } + /* dump committed child Xids */ + if (nchildren > 0) + { + rdata[lastrdata].next = &(rdata[2]); + rdata[2].data = (char *) children; + rdata[2].len = nchildren * sizeof(TransactionId); + rdata[2].buffer = InvalidBuffer; + lastrdata = 2; + } + rdata[lastrdata].next = NULL; + + recptr = XLogInsert(RM_XACT_ID, + XLOG_XACT_COMMIT_PREPARED | XLOG_NO_TRAN, + rdata); + + /* we don't currently try to sleep before flush here ... */ + + /* Flush XLOG to disk */ + XLogFlush(recptr); + + /* Mark the transaction committed in pg_clog */ + TransactionIdCommit(xid); + /* to avoid race conditions, the parent must commit first */ + TransactionIdCommitTree(nchildren, children); + + /* Checkpoint is allowed again */ + LWLockRelease(CheckpointStartLock); + + END_CRIT_SECTION(); +} + +/* + * RecordTransactionAbortPrepared + * + * This is basically the same as RecordTransactionAbort. + * + * We know the transaction made at least one XLOG entry (its PREPARE), + * so it is never possible to optimize out the abort record. + */ +static void +RecordTransactionAbortPrepared(TransactionId xid, + int nchildren, + TransactionId *children, + int nrels, + RelFileNode *rels) +{ + XLogRecData rdata[3]; + int lastrdata = 0; + xl_xact_abort_prepared xlrec; + XLogRecPtr recptr; + + /* + * Catch the scenario where we aborted partway through + * RecordTransactionCommitPrepared ... + */ + if (TransactionIdDidCommit(xid)) + elog(PANIC, "cannot abort transaction %u, it was already committed", + xid); + + START_CRIT_SECTION(); + + /* Emit the XLOG abort record */ + xlrec.xid = xid; + xlrec.arec.xtime = time(NULL); + xlrec.arec.nrels = nrels; + xlrec.arec.nsubxacts = nchildren; + rdata[0].data = (char *) (&xlrec); + rdata[0].len = MinSizeOfXactAbortPrepared; + rdata[0].buffer = InvalidBuffer; + /* dump rels to delete */ + if (nrels > 0) + { + rdata[0].next = &(rdata[1]); + rdata[1].data = (char *) rels; + rdata[1].len = nrels * sizeof(RelFileNode); + rdata[1].buffer = InvalidBuffer; + lastrdata = 1; + } + /* dump committed child Xids */ + if (nchildren > 0) + { + rdata[lastrdata].next = &(rdata[2]); + rdata[2].data = (char *) children; + rdata[2].len = nchildren * sizeof(TransactionId); + rdata[2].buffer = InvalidBuffer; + lastrdata = 2; + } + rdata[lastrdata].next = NULL; + + recptr = XLogInsert(RM_XACT_ID, + XLOG_XACT_ABORT_PREPARED | XLOG_NO_TRAN, + rdata); + + /* Always flush, since we're about to remove the 2PC state file */ + XLogFlush(recptr); + + /* + * Mark the transaction aborted in clog. This is not absolutely + * necessary but we may as well do it while we are here. + */ + TransactionIdAbort(xid); + TransactionIdAbortTree(nchildren, children); + + END_CRIT_SECTION(); +} + diff --git a/src/backend/access/transam/twophase_rmgr.c b/src/backend/access/transam/twophase_rmgr.c new file mode 100644 index 00000000000..e78f8b2fbb3 --- /dev/null +++ b/src/backend/access/transam/twophase_rmgr.c @@ -0,0 +1,49 @@ +/*------------------------------------------------------------------------- + * + * twophase_rmgr.c + * Two-phase-commit resource managers tables + * + * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * $PostgreSQL: pgsql/src/backend/access/transam/twophase_rmgr.c,v 1.1 2005/06/17 22:32:42 tgl Exp $ + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/twophase_rmgr.h" +#include "commands/async.h" +#include "storage/lock.h" +#include "utils/flatfiles.h" +#include "utils/inval.h" + + +const TwoPhaseCallback twophase_recover_callbacks[TWOPHASE_RM_MAX_ID + 1] = +{ + NULL, /* END ID */ + lock_twophase_recover, /* Lock */ + NULL, /* Inval */ + NULL, /* flat file update */ + NULL /* notify/listen */ +}; + +const TwoPhaseCallback twophase_postcommit_callbacks[TWOPHASE_RM_MAX_ID + 1] = +{ + NULL, /* END ID */ + lock_twophase_postcommit, /* Lock */ + inval_twophase_postcommit, /* Inval */ + flatfile_twophase_postcommit, /* flat file update */ + notify_twophase_postcommit /* notify/listen */ +}; + +const TwoPhaseCallback twophase_postabort_callbacks[TWOPHASE_RM_MAX_ID + 1] = +{ + NULL, /* END ID */ + lock_twophase_postabort, /* Lock */ + NULL, /* Inval */ + NULL, /* flat file update */ + NULL /* notify/listen */ +}; diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 040a4ab0b79..74163b7f576 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -10,7 +10,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/transam/xact.c,v 1.204 2005/06/06 20:22:57 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/transam/xact.c,v 1.205 2005/06/17 22:32:42 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -22,6 +22,7 @@ #include "access/multixact.h" #include "access/subtrans.h" +#include "access/twophase.h" #include "access/xact.h" #include "catalog/heap.h" #include "catalog/index.h" @@ -68,7 +69,8 @@ typedef enum TransState TRANS_START, TRANS_INPROGRESS, TRANS_COMMIT, - TRANS_ABORT + TRANS_ABORT, + TRANS_PREPARE } TransState; /* @@ -90,6 +92,7 @@ typedef enum TBlockState TBLOCK_ABORT, /* failed xact, awaiting ROLLBACK */ TBLOCK_ABORT_END, /* failed xact, ROLLBACK received */ TBLOCK_ABORT_PENDING, /* live xact, ROLLBACK received */ + TBLOCK_PREPARE, /* live xact, PREPARE received */ /* subtransaction states */ TBLOCK_SUBBEGIN, /* starting a subtransaction */ @@ -172,6 +175,12 @@ static CommandId currentCommandId; static AbsoluteTime xactStartTime; /* integer part */ static int xactStartTimeUsec; /* microsecond part */ +/* + * GID to be used for preparing the current transaction. This is also + * global to a whole transaction, so we don't keep it in the state stack. + */ +static char *prepareGID; + /* * List of add-on start- and end-of-xact callbacks @@ -267,10 +276,12 @@ IsTransactionState(void) return true; case TRANS_ABORT: return true; + case TRANS_PREPARE: + return true; } /* - * Shouldn't get here, but lint is not happy with this... + * Shouldn't get here, but lint is not happy without this... */ return false; } @@ -660,12 +671,12 @@ void RecordTransactionCommit(void) { int nrels; - RelFileNode *rptr; + RelFileNode *rels; int nchildren; TransactionId *children; /* Get data needed for commit record */ - nrels = smgrGetPendingDeletes(true, &rptr); + nrels = smgrGetPendingDeletes(true, &rels); nchildren = xactGetCommittedChildren(&children); /* @@ -726,7 +737,7 @@ RecordTransactionCommit(void) if (nrels > 0) { rdata[0].next = &(rdata[1]); - rdata[1].data = (char *) rptr; + rdata[1].data = (char *) rels; rdata[1].len = nrels * sizeof(RelFileNode); rdata[1].buffer = InvalidBuffer; lastrdata = 1; @@ -809,12 +820,9 @@ RecordTransactionCommit(void) MyXactMadeXLogEntry = false; MyXactMadeTempRelUpdate = false; - /* Show myself as out of the transaction in PGPROC array */ - MyProc->logRec.xrecoff = 0; - /* And clean up local data */ - if (rptr) - pfree(rptr); + if (rels) + pfree(rels); if (children) pfree(children); } @@ -970,12 +978,12 @@ static void RecordTransactionAbort(void) { int nrels; - RelFileNode *rptr; + RelFileNode *rels; int nchildren; TransactionId *children; /* Get data needed for abort record */ - nrels = smgrGetPendingDeletes(false, &rptr); + nrels = smgrGetPendingDeletes(false, &rels); nchildren = xactGetCommittedChildren(&children); /* @@ -1026,7 +1034,7 @@ RecordTransactionAbort(void) if (nrels > 0) { rdata[0].next = &(rdata[1]); - rdata[1].data = (char *) rptr; + rdata[1].data = (char *) rels; rdata[1].len = nrels * sizeof(RelFileNode); rdata[1].buffer = InvalidBuffer; lastrdata = 1; @@ -1069,12 +1077,9 @@ RecordTransactionAbort(void) MyXactMadeXLogEntry = false; MyXactMadeTempRelUpdate = false; - /* Show myself as out of the transaction in PGPROC array */ - MyProc->logRec.xrecoff = 0; - /* And clean up local data */ - if (rptr) - pfree(rptr); + if (rels) + pfree(rels); if (children) pfree(children); } @@ -1166,13 +1171,13 @@ static void RecordSubTransactionAbort(void) { int nrels; - RelFileNode *rptr; + RelFileNode *rels; TransactionId xid = GetCurrentTransactionId(); int nchildren; TransactionId *children; /* Get data needed for abort record */ - nrels = smgrGetPendingDeletes(false, &rptr); + nrels = smgrGetPendingDeletes(false, &rels); nchildren = xactGetCommittedChildren(&children); /* @@ -1212,7 +1217,7 @@ RecordSubTransactionAbort(void) if (nrels > 0) { rdata[0].next = &(rdata[1]); - rdata[1].data = (char *) rptr; + rdata[1].data = (char *) rels; rdata[1].len = nrels * sizeof(RelFileNode); rdata[1].buffer = InvalidBuffer; lastrdata = 1; @@ -1256,8 +1261,8 @@ RecordSubTransactionAbort(void) XidCacheRemoveRunningXids(xid, nchildren, children); /* And clean up local data */ - if (rptr) - pfree(rptr); + if (rels) + pfree(rels); if (children) pfree(children); } @@ -1419,8 +1424,11 @@ StartTransaction(void) ShowTransactionState("StartTransaction"); } + /* * CommitTransaction + * + * NB: if you change this routine, better look at PrepareTransaction too! */ static void CommitTransaction(void) @@ -1510,6 +1518,8 @@ CommitTransaction(void) * xid 0 as running as well, or it will be able to see two tuple versions * - one deleted by xid 1 and one inserted by xid 0. See notes in * GetSnapshotData. + * + * Note: MyProc may be null during bootstrap. *---------- */ if (MyProc != NULL) @@ -1608,6 +1618,225 @@ CommitTransaction(void) RESUME_INTERRUPTS(); } + +/* + * PrepareTransaction + * + * NB: if you change this routine, better look at CommitTransaction too! + */ +static void +PrepareTransaction(void) +{ + TransactionState s = CurrentTransactionState; + TransactionId xid = GetCurrentTransactionId(); + GlobalTransaction gxact; + + ShowTransactionState("PrepareTransaction"); + + /* + * check the current transaction state + */ + if (s->state != TRANS_INPROGRESS) + elog(WARNING, "PrepareTransaction while in %s state", + TransStateAsString(s->state)); + Assert(s->parent == NULL); + + /* + * Do pre-commit processing (most of this stuff requires database + * access, and in fact could still cause an error...) + * + * It is possible for PrepareHoldablePortals to invoke functions that + * queue deferred triggers, and it's also possible that triggers create + * holdable cursors. So we have to loop until there's nothing left to + * do. + */ + for (;;) + { + /* + * Fire all currently pending deferred triggers. + */ + AfterTriggerFireDeferred(); + + /* + * Convert any open holdable cursors into static portals. If there + * weren't any, we are done ... otherwise loop back to check if they + * queued deferred triggers. Lather, rinse, repeat. + */ + if (!PrepareHoldablePortals()) + break; + } + + /* Now we can shut down the deferred-trigger manager */ + AfterTriggerEndXact(true); + + /* Close any open regular cursors */ + AtCommit_Portals(); + + /* + * Let ON COMMIT management do its thing (must happen after closing + * cursors, to avoid dangling-reference problems) + */ + PreCommit_on_commit_actions(); + + /* close large objects before lower-level cleanup */ + AtEOXact_LargeObject(true); + + /* NOTIFY and flatfiles will be handled below */ + + /* Prevent cancel/die interrupt while cleaning up */ + HOLD_INTERRUPTS(); + + /* + * set the current transaction state information appropriately during + * the processing + */ + s->state = TRANS_PREPARE; + + /* Tell bufmgr and smgr to prepare for commit */ + BufmgrCommit(); + + /* + * Reserve the GID for this transaction. This could fail if the + * requested GID is invalid or already in use. + */ + gxact = MarkAsPreparing(xid, MyDatabaseId, prepareGID, GetUserId()); + prepareGID = NULL; + + /* + * Collect data for the 2PC state file. Note that in general, no actual + * state change should happen in the called modules during this step, + * since it's still possible to fail before commit, and in that case we + * want transaction abort to be able to clean up. (In particular, the + * AtPrepare routines may error out if they find cases they cannot + * handle.) State cleanup should happen in the PostPrepare routines + * below. However, some modules can go ahead and clear state here + * because they wouldn't do anything with it during abort anyway. + * + * Note: because the 2PC state file records will be replayed in the same + * order they are made, the order of these calls has to match the order + * in which we want things to happen during COMMIT PREPARED or + * ROLLBACK PREPARED; in particular, pay attention to whether things + * should happen before or after releasing the transaction's locks. + */ + StartPrepare(gxact); + + AtPrepare_Notify(); + AtPrepare_UpdateFlatFiles(); + AtPrepare_Inval(); + AtPrepare_Locks(); + + /* + * Here is where we really truly prepare. + * + * We have to record transaction prepares even if we didn't + * make any updates, because the transaction manager might + * get confused if we lose a global transaction. + */ + EndPrepare(gxact); + + /* + * Mark the prepared transaction as valid. As soon as we mark ourselves + * not running in MyProc below, others can commit/rollback the xact. + * + * NB: a side effect of this is to make a dummy ProcArray entry for the + * prepared XID. This must happen before we clear the XID from MyProc, + * else there is a window where the XID is not running according to + * TransactionIdInProgress, and onlookers would be entitled to assume + * the xact crashed. Instead we have a window where the same XID + * appears twice in ProcArray, which is OK. + */ + MarkAsPrepared(gxact); + + /* + * Now we clean up backend-internal state and release internal + * resources. + */ + + /* Break the chain of back-links in the XLOG records I output */ + MyLastRecPtr.xrecoff = 0; + MyXactMadeXLogEntry = false; + MyXactMadeTempRelUpdate = false; + + /* + * Let others know about no transaction in progress by me. This has + * to be done *after* the prepared transaction has been marked valid, + * else someone may think it is unlocked and recyclable. + */ + + /* Lock ProcArrayLock because that's what GetSnapshotData uses. */ + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + MyProc->xid = InvalidTransactionId; + MyProc->xmin = InvalidTransactionId; + + /* Clear the subtransaction-XID cache too while holding the lock */ + MyProc->subxids.nxids = 0; + MyProc->subxids.overflowed = false; + + LWLockRelease(ProcArrayLock); + + /* + * This is all post-transaction cleanup. Note that if an error is raised + * here, it's too late to abort the transaction. This should be just + * noncritical resource releasing. See notes in CommitTransaction. + */ + + CallXactCallbacks(XACT_EVENT_PREPARE); + + ResourceOwnerRelease(TopTransactionResourceOwner, + RESOURCE_RELEASE_BEFORE_LOCKS, + true, true); + + /* Check we've released all buffer pins */ + AtEOXact_Buffers(true); + + /* notify and flatfiles don't need a postprepare call */ + + PostPrepare_Inval(); + + PostPrepare_smgr(); + + AtEOXact_MultiXact(); + + PostPrepare_Locks(xid); + + ResourceOwnerRelease(TopTransactionResourceOwner, + RESOURCE_RELEASE_LOCKS, + true, true); + ResourceOwnerRelease(TopTransactionResourceOwner, + RESOURCE_RELEASE_AFTER_LOCKS, + true, true); + + /* PREPARE acts the same as COMMIT as far as GUC is concerned */ + AtEOXact_GUC(true, false); + AtEOXact_SPI(true); + AtEOXact_on_commit_actions(true); + AtEOXact_Namespace(true); + /* smgrcommit already done */ + AtEOXact_Files(); + + CurrentResourceOwner = NULL; + ResourceOwnerDelete(TopTransactionResourceOwner); + s->curTransactionOwner = NULL; + CurTransactionResourceOwner = NULL; + TopTransactionResourceOwner = NULL; + + AtCommit_Memory(); + + s->transactionId = InvalidTransactionId; + s->subTransactionId = InvalidSubTransactionId; + s->nestingLevel = 0; + s->childXids = NIL; + + /* + * done with 1st phase commit processing, set current transaction + * state back to default + */ + s->state = TRANS_DEFAULT; + + RESUME_INTERRUPTS(); +} + + /* * AbortTransaction */ @@ -1640,7 +1869,7 @@ AbortTransaction(void) /* * check the current transaction state */ - if (s->state != TRANS_INPROGRESS) + if (s->state != TRANS_INPROGRESS && s->state != TRANS_PREPARE) elog(WARNING, "AbortTransaction while in %s state", TransStateAsString(s->state)); Assert(s->parent == NULL); @@ -1833,6 +2062,7 @@ StartTransactionCommand(void) case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: + case TBLOCK_PREPARE: elog(ERROR, "StartTransactionCommand: unexpected state %s", BlockStateAsString(s->blockState)); break; @@ -1935,6 +2165,15 @@ CommitTransactionCommand(void) break; /* + * We are completing a "PREPARE TRANSACTION" command. Do it and + * return to the idle state. + */ + case TBLOCK_PREPARE: + PrepareTransaction(); + s->blockState = TBLOCK_DEFAULT; + break; + + /* * We were just issued a SAVEPOINT inside a transaction block. * Start a subtransaction. (DefineSavepoint already did * PushTransaction, so as to have someplace to put the @@ -1964,6 +2203,12 @@ CommitTransactionCommand(void) CommitTransaction(); s->blockState = TBLOCK_DEFAULT; } + else if (s->blockState == TBLOCK_PREPARE) + { + Assert(s->parent == NULL); + PrepareTransaction(); + s->blockState = TBLOCK_DEFAULT; + } else { Assert(s->blockState == TBLOCK_INPROGRESS || @@ -2156,6 +2401,17 @@ AbortCurrentTransaction(void) break; /* + * Here, we failed while trying to PREPARE. Clean up the + * transaction and return to idle state (we do not want to + * stay in the transaction). + */ + case TBLOCK_PREPARE: + AbortTransaction(); + CleanupTransaction(); + s->blockState = TBLOCK_DEFAULT; + break; + + /* * We got an error inside a subtransaction. Abort just the * subtransaction, and go to the persistent SUBABORT state * until we get ROLLBACK. @@ -2487,6 +2743,7 @@ BeginTransactionBlock(void) case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: + case TBLOCK_PREPARE: elog(FATAL, "BeginTransactionBlock: unexpected state %s", BlockStateAsString(s->blockState)); break; @@ -2494,6 +2751,57 @@ BeginTransactionBlock(void) } /* + * PrepareTransactionBlock + * This executes a PREPARE command. + * + * Since PREPARE may actually do a ROLLBACK, the result indicates what + * happened: TRUE for PREPARE, FALSE for ROLLBACK. + * + * Note that we don't actually do anything here except change blockState. + * The real work will be done in the upcoming PrepareTransaction(). + * We do it this way because it's not convenient to change memory context, + * resource owner, etc while executing inside a Portal. + */ +bool +PrepareTransactionBlock(char *gid) +{ + TransactionState s; + bool result; + + /* Set up to commit the current transaction */ + result = EndTransactionBlock(); + + /* If successful, change outer tblock state to PREPARE */ + if (result) + { + s = CurrentTransactionState; + + while (s->parent != NULL) + s = s->parent; + + if (s->blockState == TBLOCK_END) + { + /* Save GID where PrepareTransaction can find it again */ + prepareGID = MemoryContextStrdup(TopTransactionContext, gid); + + s->blockState = TBLOCK_PREPARE; + } + else + { + /* + * ignore case where we are not in a transaction; + * EndTransactionBlock already issued a warning. + */ + Assert(s->blockState == TBLOCK_STARTED); + /* Don't send back a PREPARE result tag... */ + result = false; + } + } + + return result; +} + +/* * EndTransactionBlock * This executes a COMMIT command. * @@ -2603,6 +2911,7 @@ EndTransactionBlock(void) case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: + case TBLOCK_PREPARE: elog(FATAL, "EndTransactionBlock: unexpected state %s", BlockStateAsString(s->blockState)); break; @@ -2694,6 +3003,7 @@ UserAbortTransactionBlock(void) case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: + case TBLOCK_PREPARE: elog(FATAL, "UserAbortTransactionBlock: unexpected state %s", BlockStateAsString(s->blockState)); break; @@ -2740,6 +3050,7 @@ DefineSavepoint(char *name) case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: + case TBLOCK_PREPARE: elog(FATAL, "DefineSavepoint: unexpected state %s", BlockStateAsString(s->blockState)); break; @@ -2795,6 +3106,7 @@ ReleaseSavepoint(List *options) case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: + case TBLOCK_PREPARE: elog(FATAL, "ReleaseSavepoint: unexpected state %s", BlockStateAsString(s->blockState)); break; @@ -2892,6 +3204,7 @@ RollbackToSavepoint(List *options) case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: + case TBLOCK_PREPARE: elog(FATAL, "RollbackToSavepoint: unexpected state %s", BlockStateAsString(s->blockState)); break; @@ -2999,6 +3312,7 @@ BeginInternalSubTransaction(char *name) case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: + case TBLOCK_PREPARE: elog(FATAL, "BeginInternalSubTransaction: unexpected state %s", BlockStateAsString(s->blockState)); break; @@ -3064,6 +3378,7 @@ RollbackAndReleaseCurrentSubTransaction(void) case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: + case TBLOCK_PREPARE: elog(FATAL, "RollbackAndReleaseCurrentSubTransaction: unexpected state %s", BlockStateAsString(s->blockState)); break; @@ -3111,6 +3426,7 @@ AbortOutOfAnyTransaction(void) case TBLOCK_INPROGRESS: case TBLOCK_END: case TBLOCK_ABORT_PENDING: + case TBLOCK_PREPARE: /* In a transaction, so clean up */ AbortTransaction(); CleanupTransaction(); @@ -3202,6 +3518,7 @@ TransactionBlockStatusCode(void) case TBLOCK_SUBINPROGRESS: case TBLOCK_END: case TBLOCK_SUBEND: + case TBLOCK_PREPARE: return 'T'; /* in transaction */ case TBLOCK_ABORT: case TBLOCK_SUBABORT: @@ -3684,6 +4001,8 @@ BlockStateAsString(TBlockState blockState) return "ABORT END"; case TBLOCK_ABORT_PENDING: return "ABORT PEND"; + case TBLOCK_PREPARE: + return "PREPARE"; case TBLOCK_SUBBEGIN: return "SUB BEGIN"; case TBLOCK_SUBINPROGRESS: @@ -3717,12 +4036,14 @@ TransStateAsString(TransState state) return "DEFAULT"; case TRANS_START: return "START"; + case TRANS_INPROGRESS: + return "INPROGR"; case TRANS_COMMIT: return "COMMIT"; case TRANS_ABORT: return "ABORT"; - case TRANS_INPROGRESS: - return "INPROGR"; + case TRANS_PREPARE: + return "PREPARE"; } return "UNRECOGNIZED"; } @@ -3767,6 +4088,76 @@ xactGetCommittedChildren(TransactionId **ptr) * XLOG support routines */ +static void +xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid) +{ + TransactionId *sub_xids; + TransactionId max_xid; + int i; + + TransactionIdCommit(xid); + + /* Mark committed subtransactions as committed */ + sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); + TransactionIdCommitTree(xlrec->nsubxacts, sub_xids); + + /* Make sure nextXid is beyond any XID mentioned in the record */ + max_xid = xid; + for (i = 0; i < xlrec->nsubxacts; i++) + { + if (TransactionIdPrecedes(max_xid, sub_xids[i])) + max_xid = sub_xids[i]; + } + if (TransactionIdFollowsOrEquals(max_xid, + ShmemVariableCache->nextXid)) + { + ShmemVariableCache->nextXid = max_xid; + TransactionIdAdvance(ShmemVariableCache->nextXid); + } + + /* Make sure files supposed to be dropped are dropped */ + for (i = 0; i < xlrec->nrels; i++) + { + XLogCloseRelation(xlrec->xnodes[i]); + smgrdounlink(smgropen(xlrec->xnodes[i]), false, true); + } +} + +static void +xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid) +{ + TransactionId *sub_xids; + TransactionId max_xid; + int i; + + TransactionIdAbort(xid); + + /* Mark subtransactions as aborted */ + sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); + TransactionIdAbortTree(xlrec->nsubxacts, sub_xids); + + /* Make sure nextXid is beyond any XID mentioned in the record */ + max_xid = xid; + for (i = 0; i < xlrec->nsubxacts; i++) + { + if (TransactionIdPrecedes(max_xid, sub_xids[i])) + max_xid = sub_xids[i]; + } + if (TransactionIdFollowsOrEquals(max_xid, + ShmemVariableCache->nextXid)) + { + ShmemVariableCache->nextXid = max_xid; + TransactionIdAdvance(ShmemVariableCache->nextXid); + } + + /* Make sure files supposed to be dropped are dropped */ + for (i = 0; i < xlrec->nrels; i++) + { + XLogCloseRelation(xlrec->xnodes[i]); + smgrdounlink(smgropen(xlrec->xnodes[i]), false, true); + } +} + void xact_redo(XLogRecPtr lsn, XLogRecord *record) { @@ -3775,138 +4166,137 @@ xact_redo(XLogRecPtr lsn, XLogRecord *record) if (info == XLOG_XACT_COMMIT) { xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record); - TransactionId *sub_xids; - TransactionId max_xid; - int i; - TransactionIdCommit(record->xl_xid); + xact_redo_commit(xlrec, record->xl_xid); + } + else if (info == XLOG_XACT_ABORT) + { + xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record); - /* Mark committed subtransactions as committed */ - sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); - TransactionIdCommitTree(xlrec->nsubxacts, sub_xids); + xact_redo_abort(xlrec, record->xl_xid); + } + else if (info == XLOG_XACT_PREPARE) + { + /* the record contents are exactly the 2PC file */ + RecreateTwoPhaseFile(record->xl_xid, + XLogRecGetData(record), record->xl_len); + } + else if (info == XLOG_XACT_COMMIT_PREPARED) + { + xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) XLogRecGetData(record); - /* Make sure nextXid is beyond any XID mentioned in the record */ - max_xid = record->xl_xid; - for (i = 0; i < xlrec->nsubxacts; i++) - { - if (TransactionIdPrecedes(max_xid, sub_xids[i])) - max_xid = sub_xids[i]; - } - if (TransactionIdFollowsOrEquals(max_xid, - ShmemVariableCache->nextXid)) - { - ShmemVariableCache->nextXid = max_xid; - TransactionIdAdvance(ShmemVariableCache->nextXid); - } + xact_redo_commit(&xlrec->crec, xlrec->xid); + RemoveTwoPhaseFile(xlrec->xid, false); + } + else if (info == XLOG_XACT_ABORT_PREPARED) + { + xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) XLogRecGetData(record); - /* Make sure files supposed to be dropped are dropped */ + xact_redo_abort(&xlrec->arec, xlrec->xid); + RemoveTwoPhaseFile(xlrec->xid, false); + } + else + elog(PANIC, "xact_redo: unknown op code %u", info); +} + +static void +xact_desc_commit(char *buf, xl_xact_commit *xlrec) +{ + struct tm *tm = localtime(&xlrec->xtime); + int i; + + sprintf(buf + strlen(buf), "%04u-%02u-%02u %02u:%02u:%02u", + tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday, + tm->tm_hour, tm->tm_min, tm->tm_sec); + if (xlrec->nrels > 0) + { + sprintf(buf + strlen(buf), "; rels:"); for (i = 0; i < xlrec->nrels; i++) { - XLogCloseRelation(xlrec->xnodes[i]); - smgrdounlink(smgropen(xlrec->xnodes[i]), false, true); + RelFileNode rnode = xlrec->xnodes[i]; + + sprintf(buf + strlen(buf), " %u/%u/%u", + rnode.spcNode, rnode.dbNode, rnode.relNode); } } - else if (info == XLOG_XACT_ABORT) + if (xlrec->nsubxacts > 0) { - xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record); - TransactionId *sub_xids; - TransactionId max_xid; - int i; - - TransactionIdAbort(record->xl_xid); - - /* Mark subtransactions as aborted */ - sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); - TransactionIdAbortTree(xlrec->nsubxacts, sub_xids); + TransactionId *xacts = (TransactionId *) + &xlrec->xnodes[xlrec->nrels]; - /* Make sure nextXid is beyond any XID mentioned in the record */ - max_xid = record->xl_xid; + sprintf(buf + strlen(buf), "; subxacts:"); for (i = 0; i < xlrec->nsubxacts; i++) - { - if (TransactionIdPrecedes(max_xid, sub_xids[i])) - max_xid = sub_xids[i]; - } - if (TransactionIdFollowsOrEquals(max_xid, - ShmemVariableCache->nextXid)) - { - ShmemVariableCache->nextXid = max_xid; - TransactionIdAdvance(ShmemVariableCache->nextXid); - } + sprintf(buf + strlen(buf), " %u", xacts[i]); + } +} + +static void +xact_desc_abort(char *buf, xl_xact_abort *xlrec) +{ + struct tm *tm = localtime(&xlrec->xtime); + int i; - /* Make sure files supposed to be dropped are dropped */ + sprintf(buf + strlen(buf), "%04u-%02u-%02u %02u:%02u:%02u", + tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday, + tm->tm_hour, tm->tm_min, tm->tm_sec); + if (xlrec->nrels > 0) + { + sprintf(buf + strlen(buf), "; rels:"); for (i = 0; i < xlrec->nrels; i++) { - XLogCloseRelation(xlrec->xnodes[i]); - smgrdounlink(smgropen(xlrec->xnodes[i]), false, true); + RelFileNode rnode = xlrec->xnodes[i]; + + sprintf(buf + strlen(buf), " %u/%u/%u", + rnode.spcNode, rnode.dbNode, rnode.relNode); } } - else - elog(PANIC, "xact_redo: unknown op code %u", info); + if (xlrec->nsubxacts > 0) + { + TransactionId *xacts = (TransactionId *) + &xlrec->xnodes[xlrec->nrels]; + + sprintf(buf + strlen(buf), "; subxacts:"); + for (i = 0; i < xlrec->nsubxacts; i++) + sprintf(buf + strlen(buf), " %u", xacts[i]); + } } void xact_desc(char *buf, uint8 xl_info, char *rec) { uint8 info = xl_info & ~XLR_INFO_MASK; - int i; if (info == XLOG_XACT_COMMIT) { xl_xact_commit *xlrec = (xl_xact_commit *) rec; - struct tm *tm = localtime(&xlrec->xtime); - - sprintf(buf + strlen(buf), "commit: %04u-%02u-%02u %02u:%02u:%02u", - tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday, - tm->tm_hour, tm->tm_min, tm->tm_sec); - if (xlrec->nrels > 0) - { - sprintf(buf + strlen(buf), "; rels:"); - for (i = 0; i < xlrec->nrels; i++) - { - RelFileNode rnode = xlrec->xnodes[i]; - sprintf(buf + strlen(buf), " %u/%u/%u", - rnode.spcNode, rnode.dbNode, rnode.relNode); - } - } - if (xlrec->nsubxacts > 0) - { - TransactionId *xacts = (TransactionId *) - &xlrec->xnodes[xlrec->nrels]; - - sprintf(buf + strlen(buf), "; subxacts:"); - for (i = 0; i < xlrec->nsubxacts; i++) - sprintf(buf + strlen(buf), " %u", xacts[i]); - } + strcat(buf, "commit: "); + xact_desc_commit(buf, xlrec); } else if (info == XLOG_XACT_ABORT) { xl_xact_abort *xlrec = (xl_xact_abort *) rec; - struct tm *tm = localtime(&xlrec->xtime); - sprintf(buf + strlen(buf), "abort: %04u-%02u-%02u %02u:%02u:%02u", - tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday, - tm->tm_hour, tm->tm_min, tm->tm_sec); - if (xlrec->nrels > 0) - { - sprintf(buf + strlen(buf), "; rels:"); - for (i = 0; i < xlrec->nrels; i++) - { - RelFileNode rnode = xlrec->xnodes[i]; + strcat(buf, "abort: "); + xact_desc_abort(buf, xlrec); + } + else if (info == XLOG_XACT_PREPARE) + { + strcat(buf, "prepare"); + } + else if (info == XLOG_XACT_COMMIT_PREPARED) + { + xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) rec; - sprintf(buf + strlen(buf), " %u/%u/%u", - rnode.spcNode, rnode.dbNode, rnode.relNode); - } - } - if (xlrec->nsubxacts > 0) - { - TransactionId *xacts = (TransactionId *) - &xlrec->xnodes[xlrec->nrels]; + sprintf(buf + strlen(buf), "commit %u: ", xlrec->xid); + xact_desc_commit(buf, &xlrec->crec); + } + else if (info == XLOG_XACT_ABORT_PREPARED) + { + xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) rec; - sprintf(buf + strlen(buf), "; subxacts:"); - for (i = 0; i < xlrec->nsubxacts; i++) - sprintf(buf + strlen(buf), " %u", xacts[i]); - } + sprintf(buf + strlen(buf), "abort %u: ", xlrec->xid); + xact_desc_abort(buf, &xlrec->arec); } else strcat(buf, "UNKNOWN"); diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index c5469d174f2..15b82ee9be8 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.200 2005/06/15 01:36:08 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.201 2005/06/17 22:32:43 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -25,6 +25,7 @@ #include "access/clog.h" #include "access/multixact.h" #include "access/subtrans.h" +#include "access/twophase.h" #include "access/xact.h" #include "access/xlog.h" #include "access/xlog_internal.h" @@ -814,18 +815,6 @@ begin:; /* Compute record's XLOG location */ INSERT_RECPTR(RecPtr, Insert, curridx); - /* If first XLOG record of transaction, save it in PGPROC array */ - if (MyLastRecPtr.xrecoff == 0 && !no_tran) - { - /* - * We do not acquire ProcArrayLock here because of possible deadlock. - * Anyone who wants to inspect other procs' logRec must acquire - * WALInsertLock, instead. A better solution would be a per-PROC - * spinlock, but no time for that before 7.2 --- tgl 12/19/01. - */ - MyProc->logRec = RecPtr; - } - #ifdef WAL_DEBUG if (XLOG_DEBUG) { @@ -3827,6 +3816,7 @@ BootStrapXLOG(void) BootStrapCLOG(); BootStrapSUBTRANS(); BootStrapMultiXact(); + free(buffer); } @@ -4268,6 +4258,7 @@ StartupXLOG(void) uint32 endLogSeg; XLogRecord *record; uint32 freespace; + TransactionId oldestActiveXID; CritSectionCount++; @@ -4678,33 +4669,8 @@ StartupXLOG(void) XLogCtl->Write.curridx = NextBufIdx(0); } -#ifdef NOT_USED - /* UNDO */ - if (InRecovery) - { - RecPtr = ReadRecPtr; - if (XLByteLT(checkPoint.undo, RecPtr)) - { - ereport(LOG, - (errmsg("undo starts at %X/%X", - RecPtr.xlogid, RecPtr.xrecoff))); - do - { - record = ReadRecord(&RecPtr, PANIC); - if (TransactionIdIsValid(record->xl_xid) && - !TransactionIdDidCommit(record->xl_xid)) - RmgrTable[record->xl_rmid].rm_undo(EndRecPtr, record); - RecPtr = record->xl_prev; - } while (XLByteLE(checkPoint.undo, RecPtr)); - ereport(LOG, - (errmsg("undo done at %X/%X", - ReadRecPtr.xlogid, ReadRecPtr.xrecoff))); - } - else - ereport(LOG, - (errmsg("undo is not required"))); - } -#endif + /* Pre-scan prepared transactions to find out the range of XIDs present */ + oldestActiveXID = PrescanPreparedTransactions(); if (InRecovery) { @@ -4767,9 +4733,12 @@ StartupXLOG(void) /* Start up the commit log and related stuff, too */ StartupCLOG(); - StartupSUBTRANS(); + StartupSUBTRANS(oldestActiveXID); StartupMultiXact(); + /* Reload shared-memory state for prepared transactions */ + RecoverPreparedTransactions(); + ereport(LOG, (errmsg("database system is ready"))); CritSectionCount--; @@ -5096,31 +5065,6 @@ CreateCheckPoint(bool shutdown, bool force) } /* - * Get UNDO record ptr - this is oldest of PGPROC->logRec values. We - * do this while holding insert lock to ensure that we won't miss any - * about-to-commit transactions (UNDO must include all xacts that have - * commits after REDO point). - * - * XXX temporarily ifdef'd out to avoid three-way deadlock condition: - * GetUndoRecPtr needs to grab ProcArrayLock to ensure that it is looking - * at a stable set of proc records, but grabbing ProcArrayLock while - * holding WALInsertLock is no good. GetNewTransactionId may cause a - * WAL record to be written while holding XidGenLock, and - * GetSnapshotData needs to get XidGenLock while holding ProcArrayLock, - * so there's a risk of deadlock. Need to find a better solution. See - * pgsql-hackers discussion of 17-Dec-01. - * - * XXX actually, the whole UNDO code is dead code and unlikely to ever be - * revived, so the lack of a good solution here is not troubling. - */ -#ifdef NOT_USED - checkPoint.undo = GetUndoRecPtr(); - - if (shutdown && checkPoint.undo.xrecoff != 0) - elog(PANIC, "active transaction while database system is shutting down"); -#endif - - /* * Now we can release insert lock and checkpoint start lock, allowing * other xacts to proceed even while we are flushing disk buffers. */ @@ -5195,22 +5139,8 @@ CreateCheckPoint(bool shutdown, bool force) /* * Select point at which we can truncate the log, which we base on the * prior checkpoint's earliest info. - * - * With UNDO support: oldest item is redo or undo, whichever is older; - * but watch out for case that undo = 0. - * - * Without UNDO support: just use the redo pointer. This allows xlog - * space to be freed much faster when there are long-running - * transactions. */ -#ifdef NOT_USED - if (ControlFile->checkPointCopy.undo.xrecoff != 0 && - XLByteLT(ControlFile->checkPointCopy.undo, - ControlFile->checkPointCopy.redo)) - XLByteToSeg(ControlFile->checkPointCopy.undo, _logId, _logSeg); - else -#endif - XLByteToSeg(ControlFile->checkPointCopy.redo, _logId, _logSeg); + XLByteToSeg(ControlFile->checkPointCopy.redo, _logId, _logSeg); /* * Update the control file. |