1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
|
#include <yql/essentials/parser/pg_wrapper/interface/raw_parser.h>
#include "arena_ctx.h"
#include <util/generic/scope.h>
#include <fcntl.h>
#include <stdint.h>
#ifndef WIN32
#include <pthread.h>
#endif
#include <signal.h>
#define TypeName PG_TypeName
#define SortBy PG_SortBy
#define Sort PG_Sort
#define Unique PG_Unique
#undef SIZEOF_SIZE_T
extern "C" {
#include "postgres.h"
#include "access/session.h"
#include "access/xact.h"
#include "catalog/namespace.h"
#include "mb/pg_wchar.h"
#include "nodes/pg_list.h"
#include "nodes/parsenodes.h"
#include "nodes/value.h"
#include "parser/parser.h"
#include "utils/guc.h"
#include "utils/palloc.h"
#include "utils/memutils.h"
#include "utils/memdebug.h"
#include "utils/resowner.h"
#include "utils/timestamp.h"
#include "utils/guc_hooks.h"
#include "port/pg_bitutils.h"
#include "port/pg_crc32c.h"
#include "postmaster/postmaster.h"
#include "storage/latch.h"
#include "storage/proc.h"
#include "miscadmin.h"
#include "tcop/tcopprot.h"
#include "tcop/utility.h"
#include "thread_inits.h"
#undef Abs
#undef Min
#undef Max
#undef TypeName
#undef SortBy
#undef LOG
#undef INFO
#undef NOTICE
#undef WARNING
#undef ERROR
#undef FATAL
#undef PANIC
#undef open
#undef fopen
#undef bind
#undef locale_t
}
#include "utils.h"
extern "C" {
extern __thread Latch LocalLatchData;
extern void destroy_timezone_hashtable();
extern void destroy_typecache_hashtable();
extern void free_current_locale_conv();
extern void RE_cleanup_cache();
const char *progname;
#define STDERR_BUFFER_LEN 4096
#define DEBUG
// Heap-allocated (malloc/strdup) description of a parser failure.
// Release with pg_query_free_error().
typedef struct PgQueryError {
    // exception message
    char* message;
    // source function of exception (e.g. SearchSysCache)
    char* funcname;
    // source file of exception (e.g. parse.l)
    char* filename;
    // source line of exception (e.g. 104)
    int lineno;
    // char in query at which exception occurred
    int cursorpos;
    // additional context (optional, can be NULL)
    char* context;
} PgQueryError;
// Outcome of pg_query_raw_parse(): either a parse tree or an error.
typedef struct PgQueryInternalParsetreeAndError {
    // parse tree returned by raw_parser(); left zeroed when parsing failed
    List *tree;
    // strdup'ed copy of the captured stderr text; only set on successful parse
    char* stderr_buffer;
    // malloc'ed error description; NULL when parsing succeeded
    PgQueryError* error;
} PgQueryInternalParsetreeAndError;
PgQueryInternalParsetreeAndError pg_query_raw_parse(const char* input) {
PgQueryInternalParsetreeAndError result = { 0 };
char stderr_buffer[STDERR_BUFFER_LEN + 1] = { 0 };
#ifndef DEBUG
int stderr_global;
int stderr_pipe[2];
#endif
#ifndef DEBUG
// Setup pipe for stderr redirection
if (pipe(stderr_pipe) != 0) {
PgQueryError* error = (PgQueryError*)malloc(sizeof(PgQueryError));
error->message = strdup("Failed to open pipe, too many open file descriptors");
result.error = error;
return result;
}
fcntl(stderr_pipe[0], F_SETFL, fcntl(stderr_pipe[0], F_GETFL) | O_NONBLOCK);
// Redirect stderr to the pipe
stderr_global = dup(STDERR_FILENO);
dup2(stderr_pipe[1], STDERR_FILENO);
close(stderr_pipe[1]);
#endif
PG_TRY();
{
result.tree = raw_parser(input, RAW_PARSE_DEFAULT);
#ifndef DEBUG
// Save stderr for result
read(stderr_pipe[0], stderr_buffer, STDERR_BUFFER_LEN);
#endif
result.stderr_buffer = strdup(stderr_buffer);
}
PG_CATCH();
{
ErrorData* error_data;
PgQueryError* error;
error_data = CopyErrorData();
// Note: This is intentionally malloc so exiting the memory context doesn't free this
error = (PgQueryError*)malloc(sizeof(PgQueryError));
error->message = strdup(error_data->message);
error->filename = strdup(error_data->filename);
error->funcname = strdup(error_data->funcname);
error->context = NULL;
error->lineno = error_data->lineno;
error->cursorpos = error_data->cursorpos;
result.error = error;
FlushErrorState();
}
PG_END_TRY();
#ifndef DEBUG
// Restore stderr, close pipe
dup2(stderr_global, STDERR_FILENO);
close(stderr_pipe[0]);
close(stderr_global);
#endif
return result;
}
// Releases a PgQueryError together with the strings it owns.
// Accepts NULL (no-op), mirroring free(), so callers need no guard.
void pg_query_free_error(PgQueryError *error) {
    if (error == NULL) {
        return;
    }
    // free(NULL) is a no-op, so unset fields need no separate check.
    free(error->message);
    free(error->funcname);
    free(error->filename);
    free(error->context);
    free(error);
}
}
namespace NYql {
static struct TGlobalInit {
TGlobalInit() {
pg_crc32c crc = 0;
pg_popcount32(0);
pg_popcount64(0);
COMP_CRC32C(crc,"",0);
}
} GlobalInit;
void PGParse(const TString& input, IPGParseEvents& events) {
pg_thread_init();
PgQueryInternalParsetreeAndError parsetree_and_error;
TArenaMemoryContext arena;
auto prevErrorContext = ErrorContext;
ErrorContext = CurrentMemoryContext;
Y_DEFER {
ErrorContext = prevErrorContext;
};
parsetree_and_error = pg_query_raw_parse(input.c_str());
Y_DEFER {
if (parsetree_and_error.error) {
pg_query_free_error(parsetree_and_error.error);
}
free(parsetree_and_error.stderr_buffer);
};
if (parsetree_and_error.error) {
TPosition position(0, 1);
// cursorpos is about codepoints, not bytes
TTextWalker walker(position, true);
auto cursorpos = parsetree_and_error.error->cursorpos;
size_t codepoints = 0;
if (cursorpos >= 0) {
for (size_t i = 0; i < input.size(); ++i) {
if (codepoints == cursorpos) {
break;
}
if (!TTextWalker::IsUtf8Intermediate(input[i])) {
++codepoints;
}
walker.Advance(input[i]);
}
}
events.OnError(TIssue(position, "ERROR: " + TString(parsetree_and_error.error->message) + "\n"));
} else {
events.OnResult(parsetree_and_error.tree);
}
}
// Serializes a raw parse tree via nodeToString() and returns the text as a
// TString; the buffer returned by nodeToString() is released with pfree().
TString PrintPGTree(const List* raw) {
    char* const serialized = nodeToString(raw);
    Y_DEFER {
        pfree(serialized);
    };
    TString text(serialized);
    return text;
}
// Returns the command name that CreateCommandName() reports for `node`.
TString GetCommandName(Node* node) {
    auto commandName = CreateCommandName(node);
    return TString(commandName);
}
}
// Initializes the PostgreSQL globals used by the calling thread (encoding,
// memory contexts, resource owner, latch, MyProc, transaction and session
// state) and registers a thread_local object whose destructor tears that
// state down again.
extern "C" void setup_pg_thread_cleanup() {
// Helper whose destructor performs the teardown; it runs when the
// thread_local instance declared below is destroyed.
struct TThreadCleanup {
~TThreadCleanup() {
NYql::TExtensionsRegistry::Instance().CleanupThread();
destroy_timezone_hashtable();
destroy_typecache_hashtable();
RE_cleanup_cache();
free_current_locale_conv();
ResourceOwnerDelete(CurrentResourceOwner);
MemoryContextDelete(TopMemoryContext);
// Counterpart of the malloc() of MyProc further down in this function.
free(MyProc);
}
};
// Instantiating the object here is what schedules the destructor above.
static thread_local TThreadCleanup ThreadCleanup;
Log_error_verbosity = PGERROR_DEFAULT;
// Both the database and the client encoding are set to UTF-8.
SetDatabaseEncoding(PG_UTF8);
SetClientEncoding(PG_UTF8);
InitializeClientEncoding();
MemoryContextInit();
// A single resource owner serves as top-level, current-transaction and
// current owner.
auto owner = ResourceOwnerCreate(NULL, "TopTransaction");
TopTransactionResourceOwner = owner;
CurTransactionResourceOwner = owner;
CurrentResourceOwner = owner;
MyProcPid = getpid();
MyStartTimestamp = GetCurrentTimestamp();
MyStartTime = timestamptz_to_time_t(MyStartTimestamp);
InitializeLatchSupport();
// The latch storage is the __thread variable LocalLatchData declared above.
MyLatch = &LocalLatchData;
InitLatch(MyLatch);
InitializeLatchWaitSet();
// MyProc is a zero-filled, malloc-owned PGPROC; released in ~TThreadCleanup.
MyProc = (PGPROC*)malloc(sizeof(PGPROC));
Zero(*MyProc);
StartTransactionCommand();
InitializeSession();
work_mem = MAX_KILOBYTES; // a way to postpone spilling for tuple stores
// NOTE(review): 1024 is presumably in kilobytes, as for the max_stack_depth
// setting — confirm.
assign_max_stack_depth(1024, nullptr);
MyDatabaseId = 3; // from catalog.pg_database
namespace_search_path = pstrdup("public");
// NOTE(review): the meaning of the arguments (nullptr, 1) is not visible
// here — verify against InitializeSessionUserId's declaration.
InitializeSessionUserId(nullptr, 1);
};
|