49.6. ロジカルデコーディングの出力プラグイン #

PostgreSQLのソースコードのサブディレクトリ contrib/test_decoding にサンプル出力プラグインがあります。

49.6.1. 初期化関数 #

出力プラグインは、出力プラグインの名前をライブラリのベース名として持つ共有ライブラリを動的にロードすることによってロードされます。 必要な出力プラグインコールバックを提供し、そのライブラリが実際に出力プラグインであることを示すために、_PG_output_plugin_initという名前の関数を作成しなければなりません。 この関数には、各々のアクションに対応するコールバック関数へのポインタを持つ構造体が渡されます。

typedef struct OutputPluginCallbacks
{
    LogicalDecodeStartupCB startup_cb;
    LogicalDecodeBeginCB begin_cb;
    LogicalDecodeChangeCB change_cb;
    LogicalDecodeTruncateCB truncate_cb;
    LogicalDecodeCommitCB commit_cb;
    LogicalDecodeMessageCB message_cb;
    LogicalDecodeFilterByOriginCB filter_by_origin_cb;
    LogicalDecodeShutdownCB shutdown_cb;
    LogicalDecodeFilterPrepareCB filter_prepare_cb;
    LogicalDecodeBeginPrepareCB begin_prepare_cb;
    LogicalDecodePrepareCB prepare_cb;
    LogicalDecodeCommitPreparedCB commit_prepared_cb;
    LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
    LogicalDecodeStreamStartCB stream_start_cb;
    LogicalDecodeStreamStopCB stream_stop_cb;
    LogicalDecodeStreamAbortCB stream_abort_cb;
    LogicalDecodeStreamPrepareCB stream_prepare_cb;
    LogicalDecodeStreamCommitCB stream_commit_cb;
    LogicalDecodeStreamChangeCB stream_change_cb;
    LogicalDecodeStreamMessageCB stream_message_cb;
    LogicalDecodeStreamTruncateCB stream_truncate_cb;
} OutputPluginCallbacks;

typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);

《マッチ度[]》コールバック関数のbegin_cbchange_cb、および、commit_cbは必須ですが、startup_cbfilter_by_origin_cbtruncate_cb、および、shutdown_cbは必須ではありません。 truncate_cbが設定されていないけれども、TRUNCATEがデコードされることになった場合、この動作は無視されます。 《機械翻訳》begin_cbchange_cbのコールバックは必須ですが、startup_cbtruncate_cbmessage_cbfilter_by_origin_cbshutdown_cbはオプションです。 truncate_cbが設定されていないが、TRUNCATEをデコードする場合、アクションは無視されます。

《マッチ度[71.157495]》出力プラグインは、大きな継続中(in-progress)トランザクションのストリーミングをサポートする関数を定義することもできます。 stream_start_cbstream_stop_cbstream_abort_cbstream_commit_cbstream_change_cbstream_prepare_cbは必須ですが、stream_message_cbstream_truncate_cbは必須ではありません。 《機械翻訳》出力プラグインは、大きな進行中のトランザクションのストリーミングをサポートする関数も定義できます。 stream_start_cbstream_stop_cbstream_abort_cbstream_commit_cbstream_change_cbは必須ですが、stream_message_cbstream_truncate_cbはオプションです。 出力プラグインが2フェーズコミットもサポートする場合は、stream_prepare_cbも必要です。

《マッチ度[65.129151]》出力プラグインは、PREPARE TRANSACTIONでアクションをデコードできるようにする2相コミットをサポートする関数を定義することもできます。 begin_prepare_cbprepare_cbstream_prepare_cbcommit_prepared_cbrollback_prepared_cbコールバックは必須ですが、filter_prepare_cbは必須ではありません。 《機械翻訳》出力プラグインは、2フェーズコミットをサポートする関数を定義することもできます。 これにより、PREPARE TRANSACTIONでアクションをデコードできます。 begin_prepare_cbprepare_cbcommit_prepared_cbrollback_prepared_cbコールバックが必須です。 また、filter_prepare_cbもオプションです。 出力プラグインが大きな進行中のトランザクションのストリーミングもサポートしている場合は、stream_prepare_cbも必須です。

49.6.2. 機能 #

更新データをデコード、整形、出力するために、出力関数を呼び出すことを含め、出力プラグインはバックエンドの通常のインフラストラクチャのほとんどを利用できます。 テーブルは、initdbで作られ、pg_catalogスキーマに含まれているか、以下のコマンドでユーザ定義のカタログテーブルであると印が付けられている限り、読み込み専用のアクセスが許可されます。

ALTER TABLE user_catalog_table SET (user_catalog_table = true);
CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);

出力プラグイン内のユーザカタログテーブルまたは通常のシステムカタログテーブルへのアクセスは、systable_*スキャンAPIを介してのみ行う必要があることに注意してください。 heap_*スキャンAPIを介したアクセスはエラーになります。 さらに、トランザクションIDの割り当てにつながるアクションは禁止されています。 これには、テーブルへの書き込み、DDL変更の実行、pg_current_xact_id()の呼び出しなどが含まれます。

49.6.3. 出力モード #

出力プラグインコールバックは、かなり自由な形式で消費者にデータを渡すことができます。 SQLで変更データを見るような場合、任意のかたちでデータを返すことのできるデータ型(たとえばbytea)は扱いにくいです。 出力プラグインがサーバエンコーディングのテキストデータのみを含むことにするには、起動コールバックで、OutputPluginOptions.output_typeOUTPUT_PLUGIN_BINARY_OUTPUTではなく、OUTPUT_PLUGIN_TEXTUAL_OUTPUTを設定することによって宣言できます。 この場合、textdatumが格納することができるように、すべてのデータはサーバエンコーディングでエンコードされていなければなりません。

49.6.4. 出力プラグインコールバック #

出力プラグインには、必要に応じて発生した更新に関する通知が様々なコールバックを通じて送られます。

同時に実行されたトランザクションは、コミットした順番にデコードされます。 指定したトランザクションに含まれる更新だけがbegincommitの間のコールバックによってデコードされます。 明示的あるいは暗黙的にロールバックされたトランザクションは、決してデコードされません。 成功したセーブポイントは、実行された順番にセーブポイントが実行されたトランザクションの中に折り込まれます。 PREPARE TRANSACTIONを使用して2相コミット用に準備されたトランザクションも、デコードに必要な出力プラグインコールバックが提供されていればデコードされます。 ROLLBACK PREPAREDコマンドを使用して、現在準備されているトランザクションが同時にアボートされる可能性があります。 その場合、このトランザクションのロジカルデコーディングもアボートされます。 そのようなトランザクションのすべての変更は、アボートが検出され、prepare_cbコールバックが呼び出されるとスキップされます。 このように、同時にアボートされた場合でも、デコードされたROLLBACK PREPAREDを適切に処理するために十分な情報が出力プラグインに提供されます。

注記

ディスクに安全にフラッシュされたトランザクションだけがデコードされます。 そのため、synchronous_commitoffの場合には、直後に呼び出されたpg_logical_slot_get_changes()がそのCOMMITをデコードしないことがあります。

49.6.4.1. 開始コールバック #

ストリームに投入可能な更新の数に関係なく、レプリケーションスロットが作られるか、ストリームの変更がリクエストされた場合にオプションのstartup_cbコールバック呼び出されます。

typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
                                        OutputPluginOptions *options,
                                        bool is_init);

is_init パラメータは、レプリケーションスロットが作られる際にはtrue、それ以外ではfalseになります。 optionsは、出力プラグインが書き込む以下の構造体を指します。

typedef struct OutputPluginOptions
{
    OutputPluginOutputType output_type;
    bool        receive_rewrites;
} OutputPluginOptions;

output_typeOUTPUT_PLUGIN_TEXTUAL_OUTPUTOUTPUT_PLUGIN_BINARY_OUTPUTのどちらかです。 49.6.3も参照してください。 receive_rewritesが真なら、何らかDDL操作時のヒープ書き換えで生じた変更に対して、出力プラグインも呼ばれます。 これはDDLレプリケーションを処理するプラグインを対象としていますが、これらは特別な処理を必要とします。

開始コールバックでは、ctx->output_plugin_optionsで指定されるオプションを検証しましょう。 出力プラグインが状態を持つ必要がある場合には、ctx->output_plugin_privateを利用できます。

49.6.4.2. 終了コールバック #

以前アクティブだったレプリケーションスロットが使われなくなったら、いつでもshutdown_cbコールバックが呼び出され、出力プラグインのプライベートリソースが解放されます。 スロットは削除される必要はありません。単にストリームが停止します。

typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);

49.6.4.3. トランザクション開始コールバック #

必須であるbegin_cbコールバックは、コミットしたトランザクションの開始がデコードされる際に必ず呼び出されます。 アボートしたトランザクションとその内容は決してデコードされません。

typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
                                      ReorderBufferTXN *txn);

txn引数は、コミット時のタイムスタンプやトランザクションIDなどのトランザクションに関するメタ情報を含みます。

49.6.4.4. トランザクション終了コールバック #

必須であるcommit_cbコールバックは、トランザクションのコミットがデコードされる際に必ず呼び出されます。 行が更新された場合は、それぞれの行に対してchange_cbコールバックが、commit_cbの前に呼び出されます。

typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       XLogRecPtr commit_lsn);

49.6.4.5. 更新コールバック #

トランザクション内のINSERTUPDATEDELETEの更新に対して、必須コールバックであるchange_cbが呼び出されます。 元の更新コマンドが複数の行を一度に更新する場合は、それぞれの行に対してこのコールバックが呼び出されます。 change_cbコールバックは、システムまたはユーザカタログテーブルにアクセスして、行変更の詳細を出力する処理を支援することができます。 準備された(まだコミットされていない)トランザクションをデコードする場合、またはコミットされていないトランザクションをデコードする場合、この変更コールバックは、まったく同じトランザクションが同時にロールバックされるためにエラーになることもあります。 この場合、このアボートされたトランザクションのロジカルデコーディングは正常に停止されます。

typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       Relation relation,
                                       ReorderBufferChange *change);

ctxtxnは、begin_cbcommit_cbコールバックでは同じ内容になります。 これに加えてrelationは行が属するリレーションを指定し、行の変更を記述するchangeパラメータが渡されます。

注記

unloggedテーブル(UNLOGGED参照)と(TEMPORARYまたはTEMP参照)以外のユーザ定義テーブルだけが、ロジカルデコーディングを使って更新データを取得できます。

49.6.4.6. TRUNCATEコールバック #

《マッチ度[]》truncate_cbコールバックは、TRUNCATEコマンドに対して呼ばれます。

typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
                                         ReorderBufferTXN *txn,
                                         int nrelations,
                                         Relation relations[],
                                         ReorderBufferChange *change);

パラメータはchange_cbコールバックと似ています。 しかしながら、外部キーで結びついたテーブル群のTRUNCATE動作は一緒に実行される必要があるため、このコールバックは単一リレーションではなく、リレーションの配列を受け取ります。 詳しくはTRUNCATE文の説明を参照してください。

49.6.4.7. オリジンフィルターコールバック #

オプションのfilter_by_origin_cbコールバックは、origin_idからリプレイされたデータがアウトプットプラグインの対象となるかどうかを判定するために呼び出されます。

typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
                                               RepOriginId origin_id);

ctxパラメータは、他のコールバックと同じ内容を持ちます。 オリジンの情報だけが得られます。 渡されたノードで発生した変更が無関係であることを伝えるには、trueを返します。 これにより、その変更は無視されることになります。 無視されたトランザクション変更に関わる他のコールバックは呼び出されません。

これは、カスケード、あるいは双方向レプリケーションソリューションを実装する際に有用です。 オリジンでフィルターすることにより、そのような構成で、同じ変更のレプリケーションが往復するのを防ぐことができます。 トランザクションや変更もオリジンに関する情報を持っていますが、このコールバックでフィルターするほうがずっと効率的です。

49.6.4.8. 汎用メッセージコールバック #

オプションのmessage_cbコールバックは、ロジカルデコーディングメッセージがデコードされる度に呼び出されます。

typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
                                        XLogRecPtr message_lsn,
                                        bool transactional,
                                        const char *prefix,
                                        Size message_size,
                                        const char *message);

txnパラメータは、コミット時のタイムスタンプとXIDのような、トランザクションに関するメタ情報を含んでいます。 ただし、そのメッセージがトランザクション扱いではなく、メッセージをログしたトランザクションにXIDが割り当てられてない場合はNULLになることに注意してください。 lsnは、メッセージに対応するWALの位置です。 transactionalは、メッセージがトランザクションとして送られたものかどうかを表しています。 変更コールバックと同様に、準備された(まだコミットされていない)トランザクションをデコードする場合、またはコミットされていないトランザクションをデコードする場合、このメッセージコールバックも、まったく同じトランザクションの同時ロールバックのためにエラーになることがあります。 この場合、アボートされたトランザクションのロジカルデコーディングは正常に停止されます。 prefixはnull終端された任意の接頭辞で、現在のプラグインが興味のあるメッセージを特定するために利用できます。 最後に、messageパラメータは、大きさがmessage_sizeの、実際のメッセージを保持します。

出力プラグインが利用を考慮している接頭辞が一意になるように、特に注意を払ってください。 拡張の名前か、出力プラグインの名前を使うのが良い場合が多いです。

49.6.4.9. フィルタコールバックの準備 #

オプションのfilter_prepare_cbコールバックは、現在の2相コミットトランザクションの一部であるデータを、この準備段階でデコードするか、またはCOMMIT PREPARED時に通常の1相トランザクションとしてデコードするかを決定するために呼び出されます。 デコードをスキップするように合図するには、trueを返します。 そうでなければfalseを返します。 コールバックが定義されていない場合、falseが想定されます(すなわち、フィルタリングなしで、2相コミットを使用するすべてのトランザクションも同様に2相でデコードされます)。

typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
                                              TransactionId xid,
                                              const char *gid);

ctxパラメータは他のコールバックと同じ内容です。 パラメータxidgidは、トランザクションを識別するための2つの異なる方法を提供します。 後のCOMMIT PREPAREDまたはROLLBACK PREPAREDは両方の識別子を持ち、出力プラグインに何を使用するかの選択を提供します。

このコールバックは、デコードするトランザクションごとに複数回呼び出すことができ、呼び出されるたびにxidgidの与えられたペアに対して同じ静的な答えを提供しなければなりません。

49.6.4.10. トランザクション開始準備コールバック #

必須であるbegin_prepare_cbコールバックは、準備されたトランザクションの開始がデコードされるたびに呼び出されます。 txnパラメータの一部であるgidフィールドをこのコールバックで使用して、プラグインがこのPREPAREを既に受信しているかどうかをチェックできます。 この場合、エラーになるか、トランザクションの残りの変更をスキップできます。

typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
                                             ReorderBufferTXN *txn);

49.6.4.11. トランザクション準備コールバック #

必須であるprepare_cbコールバックは、2相コミット用に準備されたトランザクションがデコードされるたびに呼び出されます。 修正された行がある場合、すべての修正された行に対するchange_cbコールバックはこの前に呼び出されています。 txnパラメータの一部であるgidフィールドは、このコールバックで使用できます。

typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
                                        XLogRecPtr prepare_lsn);

49.6.4.12. トランザクションコミット準備コールバック #

必須であるcommit_prepared_cbコールバックは、トランザクションCOMMIT PREPAREDがデコードされるたびに呼び出されます。 txnパラメータの一部であるgidフィールドは、このコールバックで使用できます。

typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
                                               ReorderBufferTXN *txn,
                                               XLogRecPtr commit_lsn);

49.6.4.13. トランザクションロールバック準備コールバック #

必須であるrollback_prepared_cbコールバックは、トランザクションROLLBACK PREPAREDがデコードされるたびに呼び出されます。 txnパラメータの一部であるgidフィールドは、このコールバックで使用できます。 パラメータprepare_end_lsnprepare_timeは、プラグインがこのPREPARE TRANSACTIONを受信したかどうかをチェックするために使用できます。 この場合、プラグインはロールバックを適用できます。 そうでない場合は、ロールバック操作をスキップできます。 gidだけでは十分ではありません。 なぜなら、下流ノードは同じ識別子を持つ準備されたトランザクションを持つことができるからです。

typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
                                                 ReorderBufferTXN *txn,
                                                 XLogRecPtr prepare_end_lsn,
                                                 TimestampTz prepare_time);

49.6.4.14. ストリーム開始コールバック #

《機械翻訳》必要なLogicalDecodeStreamStartCBコールバックは、進行中のトランザクションからストリーム化された変更ブロックを開くときに呼び出されます。

typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
                                            ReorderBufferTXN *txn);

49.6.4.15. ストリーム停止コールバック #

《機械翻訳》必要なstream_stop_cbコールバックは、進行中のトランザクションからのストリーミング変更ブロックを閉じるときに呼び出されます。

typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
                                           ReorderBufferTXN *txn);

49.6.4.16. ストリームアボートコールバック #

《機械翻訳》必要なstream_abort_cbコールバックは、以前にストリームされたトランザクションを中止するために呼び出されます。

typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
                                            ReorderBufferTXN *txn,
                                            XLogRecPtr abort_lsn);

49.6.4.17. ストリーム準備コールバック #

《機械翻訳》stream_prepare_cbコールバックは、2フェーズコミットの一部としてストリーミングされているトランザクションを準備するために呼び出されます。 このコールバックは、出力プラグインが大きな進行中のトランザクションと2フェーズコミットの両方をストリーミングする場合に必要です。

typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
                                              ReorderBufferTXN *txn,
                                              XLogRecPtr prepare_lsn);

49.6.4.18. ストリームコミットコールバック #

《機械翻訳》必要なstream_commit_cbコールバックは、以前にストリーミングされたトランザクションをコミットするために呼び出されます。

typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
                                             ReorderBufferTXN *txn,
                                             XLogRecPtr commit_lsn);

49.6.4.19. ストリーム変更コールバック #

《マッチ度[60.000000]》stream_change_cbコールバックは、ストリーム化された変更のブロック(stream_start_cbstream_stop_cb呼び出しで区切られます)で変更を送信するときに呼び出されます。 実際の変更は表示されません。 なぜなら、トランザクションは後の時点でアボートする可能性があり、アボートされたトランザクションの変更はデコードされないからです。 《機械翻訳》ストリーム化された変更のブロック内で変更が送信されるときに、必要なstream_change_cbコールバックが呼び出されます(stream_start_cbstream_stop_cbの呼び出しによって区切られます)。 実際の変更は後で表示される可能性があり、中止された変更の変更をデコードすることはできないため、トランザクションは中止されません。

typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
                                             ReorderBufferTXN *txn,
                                             Relation relation,
                                             ReorderBufferChange *change);

49.6.4.20. ストリームメッセージコールバック #

《マッチ度[57.603687]》stream_message_cbコールバックは、ストリーム化された変更のブロック(stream_start_cbstream_stop_cbコールで区切られた)で汎用メッセージを送信するときに呼び出されます。 トランザクションメッセージのメッセージ内容は表示されません。 なぜなら、トランザクションは後の時点でアボートする可能性があり、アボートされたトランザクションの変更はデコードされないからです。 《機械翻訳》オプションのstream_message_cbコールバックは、ブロックされた変更のストリームで一般的なメッセージを送信するときに呼び出されます(stream_start_cbstream_stop_cbの呼び出しによって区切られます)。 トランザクションメッセージのメッセージ内容は、後で中止される可能性があるため、トランザクションが中止されるときに表示されず、中止された変更のメッセージをデコードしません。

typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
                                              ReorderBufferTXN *txn,
                                              XLogRecPtr message_lsn,
                                              bool transactional,
                                              const char *prefix,
                                              Size message_size,
                                              const char *message);

49.6.4.21. ストリームTRUNCATEコールバック #

《マッチ度[]》stream_truncate_cbコールバックは、ストリーム化された変更のブロック(stream_start_cbstream_stop_cb呼び出しで区切られます)内のTRUNCATEコマンドに対して呼び出されます。

typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
                                               ReorderBufferTXN *txn,
                                               int nrelations,
                                               Relation relations[],
                                               ReorderBufferChange *change);

パラメータはstream_change_cbコールバックに類似しています。 ただし、外部キーで接続されたテーブルに対するTRUNCATEアクションは一緒に実行する必要があるため、このコールバックは単一のリレーションではなくリレーションの配列を受け取ります。 詳細はTRUNCATE文の説明を参照してください。

49.6.5. 出力生成関数 #

begin_cbcommit_cbchange_cbコールバックにおいて、出力プラグインは実際にデータ出力するためにctx->outStringInfo出力バッファに書き込みます。 出力バッファに書き込む前に、OutputPluginPrepareWrite(ctx, last_write)を呼び出します。 また、書き込みバッファにデータを書き終えたら、OutputPluginWrite(ctx, last_write)を呼び出してデータの書き込みを実施します。 last_write引数により、その書き込みがコールバックの最終的な書き込みであるかどうかを指定します。

以下の例では、出力プラグインにおいて消費者に向けてデータを出力する方法を示します。

OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
OutputPluginWrite(ctx, true);