diff options
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 159 |
1 files changed, 113 insertions, 46 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index c9c1d1036e0..57854b0aa57 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -401,6 +401,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change) case REORDER_BUFFER_CHANGE_INSERT: case REORDER_BUFFER_CHANGE_UPDATE: case REORDER_BUFFER_CHANGE_DELETE: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: if (change->data.tp.newtuple) { ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple); @@ -420,8 +421,9 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change) change->data.snapshot = NULL; } break; + /* no data in addition to the struct itself */ + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: - break; case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: break; } @@ -1317,6 +1319,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, PG_TRY(); { ReorderBufferChange *change; + ReorderBufferChange *specinsert = NULL; if (using_subtxn) BeginInternalSubTransaction("replay"); @@ -1333,6 +1336,17 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, switch (change->action) { + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + /* + * Confirmation for speculative insertion arrived. Simply + * use as a normal record. It'll be cleaned up at the end + * of INSERT processing. + */ + Assert(specinsert->data.tp.oldtuple == NULL); + change = specinsert; + change->action = REORDER_BUFFER_CHANGE_INSERT; + + /* intentionally fall through */ case REORDER_BUFFER_CHANGE_INSERT: case REORDER_BUFFER_CHANGE_UPDATE: case REORDER_BUFFER_CHANGE_DELETE: @@ -1348,7 +1362,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, if (reloid == InvalidOid && change->data.tp.newtuple == NULL && change->data.tp.oldtuple == NULL) - continue; + goto change_done; else if (reloid == InvalidOid) elog(ERROR, "could not map filenode \"%s\" to relation OID", relpathperm(change->data.tp.relnode, @@ -1362,50 +1376,92 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, relpathperm(change->data.tp.relnode, MAIN_FORKNUM)); - if (RelationIsLogicallyLogged(relation)) + if (!RelationIsLogicallyLogged(relation)) + goto change_done; + + /* + * For now ignore sequence changes entirely. Most of + * the time they don't log changes using records we + * understand, so it doesn't make sense to handle the + * few cases we do. + */ + if (relation->rd_rel->relkind == RELKIND_SEQUENCE) + goto change_done; + + /* user-triggered change */ + if (!IsToastRelation(relation)) { + ReorderBufferToastReplace(rb, txn, relation, change); + rb->apply_change(rb, txn, relation, change); + /* - * For now ignore sequence changes entirely. Most of - * the time they don't log changes using records we - * understand, so it doesn't make sense to handle the - * few cases we do. + * Only clear reassembled toast chunks if we're + * sure they're not required anymore. The creator + * of the tuple tells us. */ - if (relation->rd_rel->relkind == RELKIND_SEQUENCE) - { - } - /* user-triggered change */ - else if (!IsToastRelation(relation)) - { - ReorderBufferToastReplace(rb, txn, relation, change); - rb->apply_change(rb, txn, relation, change); - - /* - * Only clear reassembled toast chunks if we're - * sure they're not required anymore. The creator - * of the tuple tells us. - */ - if (change->data.tp.clear_toast_afterwards) - ReorderBufferToastReset(rb, txn); - } - /* we're not interested in toast deletions */ - else if (change->action == REORDER_BUFFER_CHANGE_INSERT) - { - /* - * Need to reassemble the full toasted Datum in - * memory, to ensure the chunks don't get reused - * till we're done remove it from the list of this - * transaction's changes. Otherwise it will get - * freed/reused while restoring spooled data from - * disk. - */ - dlist_delete(&change->node); - ReorderBufferToastAppendChunk(rb, txn, relation, - change); - } + if (change->data.tp.clear_toast_afterwards) + ReorderBufferToastReset(rb, txn); + } + /* we're not interested in toast deletions */ + else if (change->action == REORDER_BUFFER_CHANGE_INSERT) + { + /* + * Need to reassemble the full toasted Datum in + * memory, to ensure the chunks don't get reused till + * we're done remove it from the list of this + * transaction's changes. Otherwise it will get + * freed/reused while restoring spooled data from + * disk. + */ + dlist_delete(&change->node); + ReorderBufferToastAppendChunk(rb, txn, relation, + change); + } + + change_done: + /* + * Either speculative insertion was confirmed, or it was + * unsuccessful and the record isn't needed anymore. + */ + if (specinsert != NULL) + { + ReorderBufferReturnChange(rb, specinsert); + specinsert = NULL; + } + if (relation != NULL) + { + RelationClose(relation); + relation = NULL; } - RelationClose(relation); break; + + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: + /* + * Speculative insertions are dealt with by delaying the + * processing of the insert until the confirmation record + * arrives. For that we simply unlink the record from the + * chain, so it does not get freed/reused while restoring + * spooled data from disk. + * + * This is safe in the face of concurrent catalog changes + * because the relevant relation can't be changed between + * speculative insertion and confirmation due to + * CheckTableNotInUse() and locking. + */ + + /* clear out a pending (and thus failed) speculation */ + if (specinsert != NULL) + { + ReorderBufferReturnChange(rb, specinsert); + specinsert = NULL; + } + + /* and memorize the pending insertion */ + dlist_delete(&change->node); + specinsert = change; + break; + case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: /* get rid of the old */ TeardownHistoricSnapshot(false); @@ -1474,6 +1530,17 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, } } + /* + * There's a a speculative insertion remaining, just clean in up, it + * can't have been successful, otherwise we'd gotten a confirmation + * record. + */ + if (specinsert) + { + ReorderBufferReturnChange(rb, specinsert); + specinsert = NULL; + } + /* clean up the iterator */ ReorderBufferIterTXNFinish(rb, iterstate); iterstate = NULL; @@ -2001,11 +2068,11 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, switch (change->action) { + /* fall through these, they're all similar enough */ case REORDER_BUFFER_CHANGE_INSERT: - /* fall through */ case REORDER_BUFFER_CHANGE_UPDATE: - /* fall through */ case REORDER_BUFFER_CHANGE_DELETE: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: { char *data; ReorderBufferTupleBuf *oldtup, @@ -2083,9 +2150,8 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, } break; } + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: - /* ReorderBufferChange contains everything important */ - break; case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: /* ReorderBufferChange contains everything important */ break; @@ -2256,11 +2322,11 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, /* restore individual stuff */ switch (change->action) { + /* fall through these, they're all similar enough */ case REORDER_BUFFER_CHANGE_INSERT: - /* fall through */ case REORDER_BUFFER_CHANGE_UPDATE: - /* fall through */ case REORDER_BUFFER_CHANGE_DELETE: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: if (change->data.tp.newtuple) { Size len = offsetof(ReorderBufferTupleBuf, t_data) + @@ -2309,6 +2375,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } /* the base struct contains all the data, easy peasy */ + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: break; |