aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/logicalfuncs.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/logicalfuncs.c')
-rw-r--r--src/backend/replication/logical/logicalfuncs.c82
1 files changed, 40 insertions, 42 deletions
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 012987a9727..c7f1597f96e 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -276,25 +276,31 @@ logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
static Datum
pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool binary)
{
- Name name = PG_GETARG_NAME(0);
+ Name name;
XLogRecPtr upto_lsn;
int32 upto_nchanges;
-
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
MemoryContext per_query_ctx;
MemoryContext oldcontext;
-
XLogRecPtr end_of_wal;
XLogRecPtr startptr;
-
LogicalDecodingContext *ctx;
-
ResourceOwner old_resowner = CurrentResourceOwner;
ArrayType *arr;
Size ndim;
List *options = NIL;
DecodingOutputState *p;
+ check_permissions();
+
+ CheckLogicalDecodingRequirements();
+
+ if (PG_ARGISNULL(0))
+ ereport(ERROR,
+ (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+ errmsg("slot name must not be null")));
+ name = PG_GETARG_NAME(0);
+
if (PG_ARGISNULL(1))
upto_lsn = InvalidXLogRecPtr;
else
@@ -305,6 +311,12 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
else
upto_nchanges = PG_GETARG_INT32(2);
+ if (PG_ARGISNULL(3))
+ ereport(ERROR,
+ (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+ errmsg("options array must not be null")));
+ arr = PG_GETARG_ARRAYTYPE_P(3);
+
/* check to see if caller supports us returning a tuplestore */
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
ereport(ERROR,
@@ -324,16 +336,11 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
if (get_call_result_type(fcinfo, NULL, &p->tupdesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type");
- check_permissions();
-
- CheckLogicalDecodingRequirements();
-
- arr = PG_GETARG_ARRAYTYPE_P(3);
- ndim = ARR_NDIM(arr);
-
per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
oldcontext = MemoryContextSwitchTo(per_query_ctx);
+ /* Deconstruct options array */
+ ndim = ARR_NDIM(arr);
if (ndim > 1)
{
ereport(ERROR,
@@ -382,7 +389,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
else
end_of_wal = GetXLogReplayRecPtr(NULL);
- CheckLogicalDecodingRequirements();
ReplicationSlotAcquire(NameStr(*name));
PG_TRY();
@@ -444,6 +450,23 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
break;
CHECK_FOR_INTERRUPTS();
}
+
+ tuplestore_donestoring(tupstore);
+
+ CurrentResourceOwner = old_resowner;
+
+ /*
+ * Next time, start where we left off. (Hunting things, the family
+ * business..)
+ */
+ if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm)
+ LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr);
+
+ /* free context, call shutdown callback */
+ FreeDecodingContext(ctx);
+
+ ReplicationSlotRelease();
+ InvalidateSystemCaches();
}
PG_CATCH();
{
@@ -454,23 +477,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
}
PG_END_TRY();
- tuplestore_donestoring(tupstore);
-
- CurrentResourceOwner = old_resowner;
-
- /*
- * Next time, start where we left off. (Hunting things, the family
- * business..)
- */
- if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm)
- LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr);
-
- /* free context, call shutdown callback */
- FreeDecodingContext(ctx);
-
- ReplicationSlotRelease();
- InvalidateSystemCaches();
-
return (Datum) 0;
}
@@ -480,9 +486,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
Datum
pg_logical_slot_get_changes(PG_FUNCTION_ARGS)
{
- Datum ret = pg_logical_slot_get_changes_guts(fcinfo, true, false);
-
- return ret;
+ return pg_logical_slot_get_changes_guts(fcinfo, true, false);
}
/*
@@ -491,9 +495,7 @@ pg_logical_slot_get_changes(PG_FUNCTION_ARGS)
Datum
pg_logical_slot_peek_changes(PG_FUNCTION_ARGS)
{
- Datum ret = pg_logical_slot_get_changes_guts(fcinfo, false, false);
-
- return ret;
+ return pg_logical_slot_get_changes_guts(fcinfo, false, false);
}
/*
@@ -502,9 +504,7 @@ pg_logical_slot_peek_changes(PG_FUNCTION_ARGS)
Datum
pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS)
{
- Datum ret = pg_logical_slot_get_changes_guts(fcinfo, true, true);
-
- return ret;
+ return pg_logical_slot_get_changes_guts(fcinfo, true, true);
}
/*
@@ -513,7 +513,5 @@ pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS)
Datum
pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
{
- Datum ret = pg_logical_slot_get_changes_guts(fcinfo, false, true);
-
- return ret;
+ return pg_logical_slot_get_changes_guts(fcinfo, false, true);
}