1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
/*-------------------------------------------------------------------------
* logical.h
* PostgreSQL logical decoding coordination
*
* Copyright (c) 2012-2023, PostgreSQL Global Development Group
*
*-------------------------------------------------------------------------
*/
#ifndef LOGICAL_H
#define LOGICAL_H
#include "access/xlog.h"
#include "access/xlogreader.h"
#include "replication/output_plugin.h"
#include "replication/slot.h"
struct LogicalDecodingContext;
typedef void (*LogicalOutputPluginWriterWrite) (struct LogicalDecodingContext *lr,
XLogRecPtr Ptr,
TransactionId xid,
bool last_write
);
typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingContext *lr,
XLogRecPtr Ptr,
TransactionId xid,
bool skipped_xact
);
typedef struct LogicalDecodingContext
{
/* memory context this is all allocated in */
MemoryContext context;
/* The associated replication slot */
ReplicationSlot *slot;
/* infrastructure pieces for decoding */
XLogReaderState *reader;
struct ReorderBuffer *reorder;
struct SnapBuild *snapshot_builder;
/*
* Marks the logical decoding context as fast forward decoding one. Such a
* context does not have plugin loaded so most of the following properties
* are unused.
*/
bool fast_forward;
OutputPluginCallbacks callbacks;
OutputPluginOptions options;
/*
* User specified options
*/
List *output_plugin_options;
/*
* User-Provided callback for writing/streaming out data.
*/
LogicalOutputPluginWriterPrepareWrite prepare_write;
LogicalOutputPluginWriterWrite write;
LogicalOutputPluginWriterUpdateProgress update_progress;
/*
* Output buffer.
*/
StringInfo out;
/*
* Private data pointer of the output plugin.
*/
void *output_plugin_private;
/*
* Private data pointer for the data writer.
*/
void *output_writer_private;
/*
* Does the output plugin support streaming, and is it enabled?
*/
bool streaming;
/*
* Does the output plugin support two-phase decoding, and is it enabled?
*/
bool twophase;
/*
* Is two-phase option given by output plugin?
*
* This flag indicates that the plugin passed in the two-phase option as
* part of the START_STREAMING command. We can't rely solely on the
* twophase flag which only tells whether the plugin provided all the
* necessary two-phase callbacks.
*/
bool twophase_opt_given;
/*
* State for writing output.
*/
bool accept_writes;
bool prepared_write;
XLogRecPtr write_location;
TransactionId write_xid;
/* Are we processing the end LSN of a transaction? */
bool end_xact;
} LogicalDecodingContext;
extern void CheckLogicalDecodingRequirements(void);
extern LogicalDecodingContext *CreateInitDecodingContext(const char *plugin,
List *output_plugin_options,
bool need_full_snapshot,
XLogRecPtr restart_lsn,
XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress);
extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn,
List *output_plugin_options,
bool fast_forward,
XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress);
extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
extern bool DecodingContextReady(LogicalDecodingContext *ctx);
extern void FreeDecodingContext(LogicalDecodingContext *ctx);
extern void LogicalIncreaseXminForSlot(XLogRecPtr current_lsn,
TransactionId xmin);
extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
XLogRecPtr restart_lsn);
extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
TransactionId xid, const char *gid);
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
extern void ResetLogicalStreamingState(void);
extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
#endif
|