diff options
Diffstat (limited to 'src/backend/replication/logical/logicalfuncs.c')
-rw-r--r-- | src/backend/replication/logical/logicalfuncs.c | 82 |
1 files changed, 40 insertions, 42 deletions
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 012987a9727..c7f1597f96e 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -276,25 +276,31 @@ logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, static Datum pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool binary) { - Name name = PG_GETARG_NAME(0); + Name name; XLogRecPtr upto_lsn; int32 upto_nchanges; - ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; MemoryContext per_query_ctx; MemoryContext oldcontext; - XLogRecPtr end_of_wal; XLogRecPtr startptr; - LogicalDecodingContext *ctx; - ResourceOwner old_resowner = CurrentResourceOwner; ArrayType *arr; Size ndim; List *options = NIL; DecodingOutputState *p; + check_permissions(); + + CheckLogicalDecodingRequirements(); + + if (PG_ARGISNULL(0)) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("slot name must not be null"))); + name = PG_GETARG_NAME(0); + if (PG_ARGISNULL(1)) upto_lsn = InvalidXLogRecPtr; else @@ -305,6 +311,12 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin else upto_nchanges = PG_GETARG_INT32(2); + if (PG_ARGISNULL(3)) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("options array must not be null"))); + arr = PG_GETARG_ARRAYTYPE_P(3); + /* check to see if caller supports us returning a tuplestore */ if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) ereport(ERROR, @@ -324,16 +336,11 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin if (get_call_result_type(fcinfo, NULL, &p->tupdesc) != TYPEFUNC_COMPOSITE) elog(ERROR, "return type must be a row type"); - check_permissions(); - - CheckLogicalDecodingRequirements(); - - arr = PG_GETARG_ARRAYTYPE_P(3); - ndim = ARR_NDIM(arr); - per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; oldcontext = MemoryContextSwitchTo(per_query_ctx); + /* Deconstruct options array */ + ndim = ARR_NDIM(arr); if (ndim > 1) { ereport(ERROR, @@ -382,7 +389,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin else end_of_wal = GetXLogReplayRecPtr(NULL); - CheckLogicalDecodingRequirements(); ReplicationSlotAcquire(NameStr(*name)); PG_TRY(); @@ -444,6 +450,23 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin break; CHECK_FOR_INTERRUPTS(); } + + tuplestore_donestoring(tupstore); + + CurrentResourceOwner = old_resowner; + + /* + * Next time, start where we left off. (Hunting things, the family + * business..) + */ + if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm) + LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr); + + /* free context, call shutdown callback */ + FreeDecodingContext(ctx); + + ReplicationSlotRelease(); + InvalidateSystemCaches(); } PG_CATCH(); { @@ -454,23 +477,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin } PG_END_TRY(); - tuplestore_donestoring(tupstore); - - CurrentResourceOwner = old_resowner; - - /* - * Next time, start where we left off. (Hunting things, the family - * business..) - */ - if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm) - LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr); - - /* free context, call shutdown callback */ - FreeDecodingContext(ctx); - - ReplicationSlotRelease(); - InvalidateSystemCaches(); - return (Datum) 0; } @@ -480,9 +486,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin Datum pg_logical_slot_get_changes(PG_FUNCTION_ARGS) { - Datum ret = pg_logical_slot_get_changes_guts(fcinfo, true, false); - - return ret; + return pg_logical_slot_get_changes_guts(fcinfo, true, false); } /* @@ -491,9 +495,7 @@ pg_logical_slot_get_changes(PG_FUNCTION_ARGS) Datum pg_logical_slot_peek_changes(PG_FUNCTION_ARGS) { - Datum ret = pg_logical_slot_get_changes_guts(fcinfo, false, false); - - return ret; + return pg_logical_slot_get_changes_guts(fcinfo, false, false); } /* @@ -502,9 +504,7 @@ pg_logical_slot_peek_changes(PG_FUNCTION_ARGS) Datum pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS) { - Datum ret = pg_logical_slot_get_changes_guts(fcinfo, true, true); - - return ret; + return pg_logical_slot_get_changes_guts(fcinfo, true, true); } /* @@ -513,7 +513,5 @@ pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS) Datum pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS) { - Datum ret = pg_logical_slot_get_changes_guts(fcinfo, false, true); - - return ret; + return pg_logical_slot_get_changes_guts(fcinfo, false, true); } |