diff options
Diffstat (limited to 'doc/src/sgml/logicaldecoding.sgml')
-rw-r--r-- | doc/src/sgml/logicaldecoding.sgml | 560 |
1 files changed, 560 insertions, 0 deletions
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml new file mode 100644 index 00000000000..eabdd5f592e --- /dev/null +++ b/doc/src/sgml/logicaldecoding.sgml @@ -0,0 +1,560 @@ +<!-- doc/src/sgml/logicaldecoding.sgml --> + <chapter id="logicaldecoding"> + <title>Logical Decoding</title> + <indexterm zone="logicaldecoding"> + <primary>Logical Decoding</primary> + </indexterm> + <para> + PostgreSQL provides infrastructure to stream the modifications performed + via SQL to external consumers. This functionality can be used to for a + variety of purposes, including replication solutions and auditing. + </para> + + <para> + Changes are sent out in streams identified by logical replication slots. + Each stream outputs each change exactly once. + </para> + + <para> + The format in which those changes are streamed is determined by the output + plugin used. An example plugin is provided, and additional plugins can be + written to extend the choice of available formats without modifying any + core code. + Every output plugin has access to each individual new row produced + by <command>INSERT</command> and the new row version created + by <command>UPDATE</command>. Availability of old row versions for + <command>UPDATE</command> and delete <command>DELETE</command> depends on + the configured + <link linkend="SQL-CREATETABLE-REPLICA-IDENTITY"><literal>REPLICA + IDENTITY</literal></link>. + </para> + + <para> + Changes can be consumed either using the streaming replication protocol + (see <xref linkend="protocol-replication"> and + <xref linkend="logicaldecoding-walsender">), or by calling functions + via SQL (see <xref linkend="logicaldecoding-sql">). It is also possible + to write additional methods of consuming the output of a replication slot + without modifying core code + (see <xref linkend="logicaldecoding-writer">). + </para> + + <sect1 id="logicaldecoding-example"> + <title>Logical Decoding Example</title> + <para> + The following example demonstartes the SQL interface. + </para> + <para> + Before you can use logical decoding, you must set + <xref linkend="guc-wal-level"> to logical and + <xref linkend="guc-max-replication-slots"> ot at least 1. + Then, you should connect to the target database (in the example + below, <literal>postgres</literal>) as a superuser. + </para> + <programlisting> +postgres=# -- Create a slot named 'regression_slot' using the output plugin 'test_decoding' +postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + slotname | xlog_position +-----------------+--------------- + regression_slot | 0/16B1970 +(1 row) + +postgres=# SELECT * FROM pg_replication_slots; + slot_name | plugin | slot_type | datoid | database | active | xmin | catalog_xmin | restart_lsn +-----------------+---------------+-----------+--------+----------+--------+--------+--------------+------------- + regression_slot | test_decoding | logical | 12052 | postgres | f | | 684 | 0/16A4408 +(1 row) + +postgres=# -- There are no changes to see yet +postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); + location | xid | data +----------+-----+------ +(0 rows) + +postgres=# CREATE TABLE data(id serial primary key, data text); +CREATE TABLE + +postgres=# -- DDL isn't replicated, so all you'll see is the transaction +postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); + location | xid | data +-----------+-----+------------ + 0/16D5D48 | 688 | BEGIN 688 + 0/16E0380 | 688 | COMMIT 688 +(2 rows) + +postgres=# -- Once changes are read, they're consumed and not emitted +postgres=# -- in a subsequent call: +postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); + location | xid | data +----------+-----+------ +(0 rows) + +postgres=# BEGIN; +postgres=# INSERT INTO data(data) VALUES('1'); +postgres=# INSERT INTO data(data) VALUES('2'); +postgres=# COMMIT; + +postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); + location | xid | data +-----------+-----+----------------------------------------------- + 0/16E0478 | 689 | BEGIN 689 + 0/16E0478 | 689 | table public.data: INSERT: id[int4]:1 data[text]:'1' + 0/16E0580 | 689 | table public.data: INSERT: id[int4]:2 data[text]:'2' + 0/16E0650 | 689 | COMMIT 689 +(4 rows) + +postgres=# INSERT INTO data(data) VALUES('3'); + +postgres=# -- You can also peek ahead in the change stream without consuming changes +postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL); + location | xid | data +-----------+-----+----------------------------------------------- + 0/16E09C0 | 690 | BEGIN 690 + 0/16E09C0 | 690 | table public.data: INSERT: id[int4]:3 data[text]:'3' + 0/16E0B90 | 690 | COMMIT 690 +(3 rows) + +postgres=# -- You can also peek ahead in the change stream without consuming changes +postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL); + location | xid | data +-----------+-----+----------------------------------------------- + 0/16E09C0 | 690 | BEGIN 690 + 0/16E09C0 | 690 | table public.data: INSERT: id[int4]:3 data[text]:'3' + 0/16E0B90 | 690 | COMMIT 690 +(3 rows) + +postgres=# -- options can be passed to output plugin, to influence the formatting +postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-timestamp', 'on'); + location | xid | data +-----------+-----+----------------------------------------------- + 0/16E09C0 | 690 | BEGIN 690 + 0/16E09C0 | 690 | table public.data: INSERT: id[int4]:3 data[text]:'3' + 0/16E0B90 | 690 | COMMIT 690 (at 2014-02-27 16:41:51.863092+01) +(3 rows) + +postgres=# -- Remember to destroy a slot you no longer need to stop it consuming +postgres=# -- server resources: +postgres=# SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +----------------------- + +(1 row) + </programlisting> + <para> + The following example shows usage of the walsender interface using + the <link linkend="app-pgrecvlogical"><command>pg_recvlogical</command></link> + shell command. It requires the replication configurations to be allowed + (see <xref linkend="streaming-replication-authentication">) + and <varname>max_wal_senders</varname> to be set sufficiently high for + another connection. + </para> + <programlisting> +# pg_recvlogical -d testdb --slot test --create +# pg_recvlogical -d testdb --slot test --start -f - +CTRL-Z +# psql -c "INSERT INTO data(data) VALUES('4');" +# fg +BEGIN 693 +table public.data: INSERT: id[int4]:4 data[text]:'4' +COMMIT 693 +CTRL-C +# pg_recvlogical -d testdb --slot test --drop + </programlisting> + </sect1> + <sect1 id="logicaldecoding-explanation"> + <title>Logical Decoding Concepts</title> + <sect2> + <indexterm> + <primary>Logical Decoding</primary> + </indexterm> + <title>Logical Decoding</title> + <para> + Logical decoding is the the process of extracting all persistent changes + to a database's tables into a coherent, easy to understand format which + can be interpreted without detailed knowledge of the database's internal + state. + </para> + <para> + In <productname>PostgreSQL</productname>, logical decoding is implemented + by decoding the contents of the <link linkend="wal">write-ahead + log</link>, which describe changes on a storage level, into an + application-specific form such as a stream of tuples or SQL statements. + </para> + </sect2> + + <sect2> + <indexterm> + <primary>Logical Replication Slot</primary> + </indexterm> + <indexterm> + <primary>Replication Slot</primary> + </indexterm> + <title>Replication Slots</title> + <para> + In the context of logical replication, a slot represents a stream of + changes which can be replayed to a client in the order they were made on + the origin server. Each slot streams a sequence of changes from a single + database, sending each change exactly once (except when peeking forward + in the stream). + </para> + <note> + <para>PostgreSQL also has streaming replication slots + (see <xref linkend="streaming-replication">), but they are used somewhat + differently there. + </para> + </note> + <para> + Replication slots have an identifier which is unique across all databases + in a <productname>PostgreSQL</productname> cluster. Slots persist + independently of the connection using them and are crash-safe. + </para> + <para> + Multiple independent slots may exist for a single database. Each slot has + its own state, allowing different consumers to receive changes from + different points in the database change stream. For most applications, a + separate slot will be required for each consumer. + </para> + <para> + A logical replication slot knows nothing about the state of the + receiver(s). It's even possible to have multiple different receivers using + the same slot at different times; they'll just get the changes following + on from when the last receiver stopped consuming them. Only one receiver + may consume changes from a slot at any given time. + </para> + <note> + <para> + Replication slots persist across crashes and know nothing about the state + of their consumer(s). They will prevent removal of required resources + even when there is no connection using them. This consumes storage + because neither required WAL nor required rows from the system catalogs + can be removed by VACUUM as long as they are required by a replication + slot, so if a slot is no longer required it should be dropped. + </para> + </note> + </sect2> + <sect2> + <title>Output Plugins</title> + <para> + Output plugins transform the data from the write-ahead log's internal + representation into the format the consumer of a replication slot desires. + </para> + </sect2> + <sect2> + <title>Exported Snapshots</title> + <para> + When a new replication slot is created using the walsender interface a + snapshot is exported + (see <xref linkend="functions-snapshot-synchronization">) which will show + exactly the state of the database after which all changes will be + included in the change stream. This can be used to create a new replica by + using <link linkend="sql-set-transaction"><literal>SET TRANSACTION + SNAPSHOT</literal></link> to read the state of the database at the moment + the slot was created. This transaction can then be used to dump the + database's state at that point in time which afterwards can be updated + using the slot's contents without loosing any changes. + </para> + </sect2> + </sect1> + <sect1 id="logicaldecoding-walsender"> + <title>Streaming Replication Protocol Interface</title> + <para> + The <literal>CREATE_REPLICATION_SLOT SLOT slotname LOGICAL + options</literal>, <literal>DROP_REPLICATION_SLOT SLOT slotname</literal> + and <literal>START_REPLICATION SLOT slotname LOGICAL options</literal> + commands can be used to create, drop and stream changes from a replication + slot respectively. These commands are only available over a replication + connection; they cannot be used via SQL. + See <xref linkend="protocol-replication">. + </para> + <para> + The <command>pg_recvlogical</command> command + (see <xref linkend="app-pgrecvlogical">) can be used to control logical + decoding over a walsender connection. + </para> + </sect1> + <sect1 id="logicaldecoding-sql"> + <title>Logical Decoding <acronym>SQL</acronym> Interface</title> + <para> + See <xref linkend="functions-replication"> for detailed documentation on + the SQL-level API for interacting with logical decoding. + </para> + <para> + Synchronous replication (see <xref linkend="synchronous-replication">) is + only supported on replication slots used over the walsender interface. The + function interface and additional, non-core interfaces do not support + synchronous replication. + </para> + </sect1> + <sect1 id="logicaldecoding-catalogs"> + <title>System catalogs related to logical decoding</title> + <para> + The <link linkend="catalog-pg-replication-slots"><structname>pg_replication_slots</structname></link> + view and the + <link linkend="monitoring-stats-views-table"><structname>pg_stat_replication</structname></link> + view provide information about the current state of replication slots and + walsender connections respectively. These views apply to both physical and + logical replication. + </para> + </sect1> + <sect1 id="logicaldecoding-output-plugin"> + <title>Logical Decoding Output Plugins</title> + <para> + An example output plugin can be found in the + <link linkend="test-decoding"> + <filename>contrib/test_decoding</filename> + </link> + subdirectory of the PostgreSQL source tree. + </para> + <sect2 id="logicaldecoding-output-init"> + <title>Initialization Function</title> + <indexterm zone="logicaldecoding"> + <primary>_PG_output_plugin_init</primary> + </indexterm> + <para> + An output plugin is loaded by dynamically loading a shared library with + the output plugin's name as the library basename. The normal library + search path is used to locate the library. To provide the required output + plugin callbacks and to indicate that the library is actually an output + plugin it needs to provide a function named + <function>_PG_output_plugin_init</function>. This function is passed a + struct that needs to be filled with the callback function pointers for + individual actions. + <programlisting> +typedef struct OutputPluginCallbacks +{ + LogicalDecodeStartupCB startup_cb; + LogicalDecodeBeginCB begin_cb; + LogicalDecodeChangeCB change_cb; + LogicalDecodeCommitCB commit_cb; + LogicalDecodeShutdownCB shutdown_cb; +} OutputPluginCallbacks; +typedef void (*LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb); + </programlisting> + The <function>begin_cb</function>, <function>change_cb</function> + and <function>commit_cb</function> callbacks are required, + while <function>startup_cb</function> + and <function>shutdown_cb</function> are optional. + </para> + </sect2> + + <sect2 id="logicaldecoding-capabilities"> + <title>Capabilities</title> + <para> + To decode, format and output changes, output plugins can use most of the + backend's normal infrastructure, including calling output functions. Read + only access to relations is permitted as long as only relations are + accessed that either have been created by <command>initdb</command> in + the <literal>pg_catalog</literal> schema, or have are marked as user + provided catalog tables using + <programlisting> +ALTER TABLE user_catalog_table SET (user_catalog_table = true); +CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true); + </programlisting> + Any actions leading to xid assignment are prohibited. That, among others, + includes writing to tables, performing DDL changes and + calling <literal>txid_current()</literal>. + </para> + </sect2> + + <sect2 id="logicaldecoding-output-plugin-callbacks"> + <title>Output Plugin Callbacks</title> + <para> + An output plugin gets notified about changes that are happening via + various callbacks it needs to provide. + </para> + <para> + Concurrent transactions are decoded in commit order and only changes + belonging to a specific transaction are decoded inbetween + the <literal>begin</literal> and <literal>commit</literal> + callbacks. Transactions that were rolled back explicitly or implicitly + never get + decoded. Successfull <link linkend="SQL-SAVEPOINT">SAVEPOINTs</link> are + folded into the transaction containing them in the order they were + exectuded within that transaction. + </para> + <note> + <para> + Only transactions that have already safely been flushed to disk will be + decoded. That can lead to a COMMIT not immediately being decoded in a + directly following <literal>pg_logical_slot_get_changes()</literal> + when <varname>synchronous_commit</varname> is set + to <literal>off</literal>. + </para> + </note> + <sect3 id="logicaldecoding-output-plugin-startup"> + <title>Startup Callback</title> + <para> + The optional <function>startup_cb</function> callback is called whenever + an replication slot is created or asked to stream changes, independent + of the number of changes that are ready to be put out. + <programlisting> +typedef void (*LogicalDecodeStartupCB) ( + struct LogicalDecodingContext *ctx, + OutputPluginOptions *options, + bool is_init +); + </programlisting> + The <literal>is_init</literal> paramter will be true when the + replication slot is being created and false + otherwise. <parameter>options</parameter> points to a struct of options + that output plugins can set: + <programlisting> +typedef struct OutputPluginOptions +{ + OutputPluginOutputType output_type; +} OutputPluginOptions; + </programlisting> + <literal>output_type</literal> has to either be set to + <literal>OUTPUT_PLUGIN_TEXTUAL_OUTPUT</literal> + or <literal>OUTPUT_PLUGIN_BINARY_OUTPUT</literal>. + </para> + <para> + The startup callback should validate the options present in + <literal>ctx->output_plugin_options</literal>. If the output plugin + needs to have a state, it can + use <literal>ctx->output_plugin_private</literal> to store it. + </para> + </sect3> + <sect3 id="logicaldecoding-output-plugin-shutdown"> + <title>Shutdown Callback</title> + <para> + The optional <function>shutdown_cb</function> callback is called + whenever a formerly active replication slot is not used anymore and can + be used to deallocate resources private to the output plugin. The slot + isn't necessarily being dropped, streaming is just being stopped. + <programlisting> +typedef void (*LogicalDecodeShutdownCB) ( + struct LogicalDecodingContext *ctx +); + </programlisting> + </para> + </sect3> + <sect3 id="logicaldecoding-output-plugin-begin"> + <title>Transaction Begin Callback</title> + <para> + The required <function>begin_cb</function> callback is called whenever a + start of a commited transaction has been decoded. Aborted transactions + and their contents never get decoded. + <programlisting> +typedef void (*LogicalDecodeBeginCB) ( + struct LogicalDecodingContext *, + ReorderBufferTXN *txn +); + </programlisting> + The <parameter>txn</parameter> parameter contains meta information about + the transaction, like the timestamp at which it has been committed and + its XID. + </para> + </sect3> + <sect3 id="logicaldecoding-output-plugin-commit"> + <title>Transaction End Callback</title> + <para> + The required <function>commit_cb</function> callback is called whenever + a transaction commit has been + decoded. The <function>change_cb</function> callbacks for all modified + rows will have been called before this, if there have been any modified + rows. + <programlisting> +typedef void (*LogicalDecodeCommitCB) ( + struct LogicalDecodingContext *, + ReorderBufferTXN *txn +); + </programlisting> + </para> + </sect3> + <sect3 id="logicaldecoding-output-plugin-change"> + <title>Callback called for each individual change in a + transaction</title> + <para> + The required <function>change_cb</function> callback is called for every + individual row modification inside a transaction, may it be + an <command>INSERT</command>, <command>UPDATE</command> + or <command>DELETE</command>. Even if the original command modified + several rows at once the callback will be called indvidually for each + row. + <programlisting> +typedef void (*LogicalDecodeChangeCB) ( + struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + Relation relation, + ReorderBufferChange *change +); + </programlisting> + The <parameter>ctx</parameter> and <parameter>txn</parameter> parameters + have the same contents as for the <function>begin_cb</function> + and <function>commit_cb</function> callbacks, but additionally the + relation descriptor <parameter>relation</parameter> points to the + relation the row belongs to and a struct + <parameter>change</parameter> describing the row modification are passed + in. + </para> + <note> + <para> + Only changes in user defined tables that are not unlogged + (see <xref linkend="SQL-CREATETABLE-UNLOGGED">) and not temporary + (see <xref linkend="SQL-CREATETABLE-TEMPORARY">) can be extracted using + logical decoding. + </para> + </note> + </sect3> + </sect2> + <sect2 id="logicaldecoding-output-plugin-output"> + <title>Functions for producing output from an output plugin</title> + <para> + To actually produce output, output plugins can write data to + the <literal>StringInfo</literal> output buffer + in <literal>ctx->out</literal> when inside + the <function>begin_cb</function>, <function>commit_cb</function> + or <function>change_cb</function> callbacks. Before writing to the output + buffer <function>OutputPluginPrepareWrite(ctx, last_write)</function> has + to be called, and after finishing writing to the + buffer <function>OutputPluginWrite(ctx, last_write)</function> has to be + called to perform the write. The <parameter>last_write</parameter> + indicates whether a particular write was the callback's last write. + </para> + <para> + The following example shows how to output data to the consumer of an + output plugin: + <programlisting> +OutputPluginPrepareWrite(ctx, true); +appendStringInfo(ctx->out, "BEGIN %u", txn->xid); +OutputPluginWrite(ctx, true); + </programlisting> + </para> + </sect2> + </sect1> + <sect1 id="logicaldecoding-writer"> + <title>Logical Decoding Output Writers</title> + <para> + It is possible to add more output methods for logical decoding. + For details, see + <filename>src/backend/replication/logical/logicalfuncs.c</filename>. + Essentially, three functions need to be provided: one to read WAL, one to + prepare writing output, and one to write the output + (see <xref linkend="logicaldecoding-output-plugin-output">). + </para> + </sect1> + <sect1 id="logicaldecoding-synchronous"> + <title>Synchronous replication support for Logical Decoding</title> + <para> + Logical decoding may be used to to build + <link linkend="synchronous-replication">synchronous + replication</link> solutions with the same user interface as synchronous + replication for <link linkend="streaming-replication">streaming + replication</link>. To do this, the walsender interface + (see <xref linkend="logicaldecoding-walsender">) must be used to stream out + data. Clients have to send <literal>Standby status update (F)</literal> + (see <xref linkend="protocol-replication">) messages, just like streaming + replication clients do. + </para> + <note> + <para> + A synchronous replica receiving changes via logical decoding will work in + the scope of a single database. Since, in contrast to + that, <parameter>synchronous_standby_names</parameter> currently is + server wide, this means this technique will not work properly if more + than one database is actively used. + </para> + </note> + </sect1> + </chapter> |