diff options
Diffstat (limited to 'src/backend/replication/logical/snapbuild.c')
-rw-r--r-- | src/backend/replication/logical/snapbuild.c | 50 |
1 files changed, 31 insertions, 19 deletions
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 1ff2c12240d..bf72ad45ec7 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -969,28 +969,40 @@ SnapBuildPurgeOlderTxn(SnapBuild *builder) pfree(workspace); /* - * Either all the xacts got purged or none. It is only possible to - * partially remove the xids from this array if one or more of the xids - * are still running but not all. That can happen if we start decoding - * from a point (LSN where the snapshot state became consistent) where all - * the xacts in this were running and then at least one of those got - * committed and a few are still running. We will never start from such a - * point because we won't move the slot's restart_lsn past the point where - * the oldest running transaction's restart_decoding_lsn is. + * Purge xids in ->catchange as well. The purged array must also be sorted + * in xidComparator order. */ - if (builder->catchange.xcnt == 0 || - TransactionIdFollowsOrEquals(builder->catchange.xip[0], - builder->xmin)) - return; + if (builder->catchange.xcnt > 0) + { + /* + * Since catchange.xip is sorted, we find the lower bound of xids that + * are still interesting. + */ + for (off = 0; off < builder->catchange.xcnt; off++) + { + if (TransactionIdFollowsOrEquals(builder->catchange.xip[off], + builder->xmin)) + break; + } - Assert(TransactionIdFollows(builder->xmin, - builder->catchange.xip[builder->catchange.xcnt - 1])); - pfree(builder->catchange.xip); - builder->catchange.xip = NULL; - builder->catchange.xcnt = 0; + surviving_xids = builder->catchange.xcnt - off; - elog(DEBUG3, "purged catalog modifying transactions, oldest running xid %u", - builder->xmin); + if (surviving_xids > 0) + { + memmove(builder->catchange.xip, &(builder->catchange.xip[off]), + surviving_xids * sizeof(TransactionId)); + } + else + { + pfree(builder->catchange.xip); + builder->catchange.xip = NULL; + } + + elog(DEBUG3, "purged catalog modifying transactions from %u to %u, xmin: %u, xmax: %u", + (uint32) builder->catchange.xcnt, (uint32) surviving_xids, + builder->xmin, builder->xmax); + builder->catchange.xcnt = surviving_xids; + } } /* |