aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@ydb.tech>2023-06-02 19:02:05 +0300
committervvvv <vvvv@ydb.tech>2023-06-02 19:02:05 +0300
commite403aee7f9d78faf4d4ea7c528da9ebd0a8bbd86 (patch)
tree4c02271c040d25212483417289f5254bb0f35c8e
parent3332cdd408c34bd067db2bde8bc0f322e392946b (diff)
downloadydb-e403aee7f9d78faf4d4ea7c528da9ebd0a8bbd86.tar.gz
Avoid most PG_TRY in PG interop
-rw-r--r--ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp6
-rw-r--r--ydb/library/yql/parser/pg_wrapper/arrow.h178
-rw-r--r--ydb/library/yql/parser/pg_wrapper/arrow_impl.cpp17
-rw-r--r--ydb/library/yql/parser/pg_wrapper/comp_factory.cpp315
-rw-r--r--ydb/library/yql/parser/pg_wrapper/parser.cpp3
-rw-r--r--ydb/library/yql/parser/pg_wrapper/postgresql/src/backend/utils/error/elog.c15
-rw-r--r--ydb/library/yql/parser/pg_wrapper/postgresql/src/include/utils/elog.h4
-rw-r--r--ydb/library/yql/parser/pg_wrapper/ut/codegen_ut.cpp1
-rw-r--r--ydb/library/yql/parser/pg_wrapper/ut/parser_ut.cpp4
9 files changed, 205 insertions, 338 deletions
diff --git a/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp b/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp
index 5921c8bbe48..46a04cc001c 100644
--- a/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp
@@ -122,21 +122,21 @@ private:
TStringValue error("");
auto r = GetPgBuilder().ValueFromText(BoolOid, "", error);
UNIT_ASSERT(!r);
- UNIT_ASSERT_STRING_CONTAINS(AsString(error), "invalid input syntax for type boolean: \"\"");
+ UNIT_ASSERT_STRING_CONTAINS(AsString(error), "ERROR: invalid input syntax for type boolean: \"\"");
}
{
TStringValue error("");
auto r = GetPgBuilder().ValueFromText(BoolOid, "zzzz", error);
UNIT_ASSERT(!r);
- UNIT_ASSERT_STRING_CONTAINS(AsString(error), "Terminate was called, reason(85): Error in 'in' function: boolin, reason: invalid input syntax for type boolean: \"zzzz\"");
+ UNIT_ASSERT_STRING_CONTAINS(AsString(error), "ERROR: invalid input syntax for type boolean: \"zzzz\"");
}
{
TStringValue error("");
auto r = GetPgBuilder().ValueFromBinary(BoolOid, "", error);
UNIT_ASSERT(!r);
- UNIT_ASSERT_STRING_CONTAINS(AsString(error), "Error in 'recv' function: boolrecv, reason: no data left in message");
+ UNIT_ASSERT_STRING_CONTAINS(AsString(error), "ERROR: no data left in message");
}
{
diff --git a/ydb/library/yql/parser/pg_wrapper/arrow.h b/ydb/library/yql/parser/pg_wrapper/arrow.h
index 52bdce5c70f..72f37124710 100644
--- a/ydb/library/yql/parser/pg_wrapper/arrow.h
+++ b/ydb/library/yql/parser/pg_wrapper/arrow.h
@@ -233,7 +233,6 @@ struct TDefaultArgsPolicy {
};
extern "C" TPgKernelState& GetPGKernelState(arrow::compute::KernelContext* ctx);
-extern "C" void WithPgTry(const TString& funcName, const std::function<void()>& func);
template <typename TFunc, bool IsStrict, bool IsFixedResult, typename TArgsPolicy = TDefaultArgsPolicy>
struct TGenericExec {
@@ -275,10 +274,7 @@ struct TGenericExec {
Y_ENSURE(state.flinfo.fn_strict == IsStrict);
Y_ENSURE(state.IsFixedResult == IsFixedResult);
TArenaMemoryContext arena;
- WithPgTry(state.Name, [&]() {
- Dispatch1(hasScalars, hasNulls, ctx, batch, length, state, res);
- });
-
+ Dispatch1(hasScalars, hasNulls, ctx, batch, length, state, res);
return arrow::Status::OK();
}
@@ -517,18 +513,16 @@ public:
auto ret = *typedState;
if constexpr (HasFunc) {
if (!IsStrict || !typedState->isnull) {
- WithPgTry(Name_, [&]() {
- LOCAL_FCINFO(callInfo, 1);
- callInfo->flinfo = FuncInfo_;
- callInfo->nargs = 1;
- callInfo->fncollation = DEFAULT_COLLATION_OID;
- callInfo->context = (Node*)NKikimr::NMiniKQL::TlsAllocState->CurrentContext;
- callInfo->isnull = false;
- callInfo->args[0].isnull = typedState->isnull;
- callInfo->args[0].value = typedState->value;
- ret.value = Func_(callInfo);
- ret.isnull = callInfo->isnull;
- });
+ LOCAL_FCINFO(callInfo, 1);
+ callInfo->flinfo = FuncInfo_;
+ callInfo->nargs = 1;
+ callInfo->fncollation = DEFAULT_COLLATION_OID;
+ callInfo->context = (Node*)NKikimr::NMiniKQL::TlsAllocState->CurrentContext;
+ callInfo->isnull = false;
+ callInfo->args[0].isnull = typedState->isnull;
+ callInfo->args[0].value = typedState->value;
+ ret.value = Func_(callInfo);
+ ret.isnull = callInfo->isnull;
}
}
@@ -623,20 +617,18 @@ private:
fmgr_info(inFuncId, &InFuncInfo_);
Y_ENSURE(InFuncInfo_.fn_addr);
- WithPgTry(this->AggDesc_.Name, [&]() {
- LOCAL_FCINFO(inCallInfo, 3);
- inCallInfo->flinfo = &this->InFuncInfo_;
- inCallInfo->nargs = 3;
- inCallInfo->fncollation = DEFAULT_COLLATION_OID;
- inCallInfo->isnull = false;
- inCallInfo->args[0] = { (Datum)this->AggDesc_.InitValue.c_str(), false };
- inCallInfo->args[1] = { ObjectIdGetDatum(this->TypeIOParam_), false };
- inCallInfo->args[2] = { Int32GetDatum(-1), false };
-
- auto state = this->InFuncInfo_.fn_addr(inCallInfo);
- Y_ENSURE(!inCallInfo->isnull);
- PreparedInitValue_ = AnyDatumToPod(state, IsTransTypeFixed);
- });
+ LOCAL_FCINFO(inCallInfo, 3);
+ inCallInfo->flinfo = &this->InFuncInfo_;
+ inCallInfo->nargs = 3;
+ inCallInfo->fncollation = DEFAULT_COLLATION_OID;
+ inCallInfo->isnull = false;
+ inCallInfo->args[0] = { (Datum)this->AggDesc_.InitValue.c_str(), false };
+ inCallInfo->args[1] = { ObjectIdGetDatum(this->TypeIOParam_), false };
+ inCallInfo->args[2] = { Int32GetDatum(-1), false };
+
+ auto state = this->InFuncInfo_.fn_addr(inCallInfo);
+ Y_ENSURE(!inCallInfo->isnull);
+ PreparedInitValue_ = AnyDatumToPod(state, IsTransTypeFixed);
}
}
@@ -711,21 +703,19 @@ private:
filterBitmap = filterArray->template GetValues<uint8_t>(1);
}
- WithPgTry(this->AggDesc_.Name, [&]() {
- if (hasNulls) {
- if (hasScalars) {
- AddManyImpl<true, true>(typedState, values, batchLength, filterBitmap);
- } else {
- AddManyImpl<true, false>(typedState, values, batchLength, filterBitmap);
- }
+ if (hasNulls) {
+ if (hasScalars) {
+ AddManyImpl<true, true>(typedState, values, batchLength, filterBitmap);
} else {
- if (hasScalars) {
- AddManyImpl<false, true>(typedState, values, batchLength, filterBitmap);
- } else {
- AddManyImpl<false, false>(typedState, values, batchLength, filterBitmap);
- }
+ AddManyImpl<true, false>(typedState, values, batchLength, filterBitmap);
}
- });
+ } else {
+ if (hasScalars) {
+ AddManyImpl<false, true>(typedState, values, batchLength, filterBitmap);
+ } else {
+ AddManyImpl<false, false>(typedState, values, batchLength, filterBitmap);
+ }
+ }
}
template <bool HasNulls, bool HasScalars>
@@ -837,27 +827,25 @@ SkipCall:;
if constexpr (HasSerialize) {
NUdf::TUnboxedValue ret;
- WithPgTry(this->AggDesc_.Name, [&]() {
- LOCAL_FCINFO(serializeCallInfo, 1);
- serializeCallInfo->flinfo = &this->SerializeFuncInfo_;
- serializeCallInfo->nargs = 1;
- serializeCallInfo->fncollation = DEFAULT_COLLATION_OID;
- serializeCallInfo->context = (Node*)NKikimr::NMiniKQL::TlsAllocState->CurrentContext;
- serializeCallInfo->isnull = false;
- serializeCallInfo->args[0].isnull = false;
- serializeCallInfo->args[0].value = typedState->value;
- auto ser = this->SerializeFunc_(serializeCallInfo);
- Y_ENSURE(!serializeCallInfo->isnull);
- if constexpr (IsSerializedTypeFixed) {
- ret = ScalarDatumToPod(ser);
- } else {
- ret = PointerDatumToPod(ser);
- if (ser == typedState->value) {
- typedState->value = 0;
- typedState->isnull = true;
- }
+ LOCAL_FCINFO(serializeCallInfo, 1);
+ serializeCallInfo->flinfo = &this->SerializeFuncInfo_;
+ serializeCallInfo->nargs = 1;
+ serializeCallInfo->fncollation = DEFAULT_COLLATION_OID;
+ serializeCallInfo->context = (Node*)NKikimr::NMiniKQL::TlsAllocState->CurrentContext;
+ serializeCallInfo->isnull = false;
+ serializeCallInfo->args[0].isnull = false;
+ serializeCallInfo->args[0].value = typedState->value;
+ auto ser = this->SerializeFunc_(serializeCallInfo);
+ Y_ENSURE(!serializeCallInfo->isnull);
+ if constexpr (IsSerializedTypeFixed) {
+ ret = ScalarDatumToPod(ser);
+ } else {
+ ret = PointerDatumToPod(ser);
+ if (ser == typedState->value) {
+ typedState->value = 0;
+ typedState->isnull = true;
}
- });
+ }
return ret;
} else {
@@ -980,9 +968,7 @@ SkipCall:;
}
transCallInfo->isnull = false;
- WithPgTry(this->AggDesc_.Name, [&]() {
- ret = this->TransFunc_(transCallInfo);
- });
+ ret = this->TransFunc_(transCallInfo);
CopyState<IsTransTypeFixed>({ret, transCallInfo->isnull}, *typedState);
SaveToAggContext<IsTransTypeFixed>(*typedState, this->AggDesc_.TransTypeId == CSTRINGOID);
@@ -1043,18 +1029,16 @@ SkipCall:;
}
void Deserialize(Datum ser, NullableDatum& result) {
- WithPgTry(this->AggDesc_.Name, [&]() {
- LOCAL_FCINFO(deserializeCallInfo, 1);
- deserializeCallInfo->flinfo = &this->DeserializeFuncInfo_;
- deserializeCallInfo->nargs = 1;
- deserializeCallInfo->fncollation = DEFAULT_COLLATION_OID;
- deserializeCallInfo->context = (Node*)NKikimr::NMiniKQL::TlsAllocState->CurrentContext;
- deserializeCallInfo->isnull = false;
- deserializeCallInfo->args[0].isnull = false;
- deserializeCallInfo->args[0].value = ser;
- result.value = this->DeserializeFunc_(deserializeCallInfo);
- result.isnull = deserializeCallInfo->isnull;
- });
+ LOCAL_FCINFO(deserializeCallInfo, 1);
+ deserializeCallInfo->flinfo = &this->DeserializeFuncInfo_;
+ deserializeCallInfo->nargs = 1;
+ deserializeCallInfo->fncollation = DEFAULT_COLLATION_OID;
+ deserializeCallInfo->context = (Node*)NKikimr::NMiniKQL::TlsAllocState->CurrentContext;
+ deserializeCallInfo->isnull = false;
+ deserializeCallInfo->args[0].isnull = false;
+ deserializeCallInfo->args[0].value = ser;
+ result.value = this->DeserializeFunc_(deserializeCallInfo);
+ result.isnull = deserializeCallInfo->isnull;
}
void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
@@ -1130,28 +1114,24 @@ SkipCall:;
}
}
- WithPgTry(this->AggDesc_.Name, [&]() {
- LOCAL_FCINFO(combineCallInfo, 2);
- combineCallInfo->flinfo = &this->CombineFuncInfo_;
- combineCallInfo->nargs = 2;
- combineCallInfo->fncollation = DEFAULT_COLLATION_OID;
- combineCallInfo->context = (Node*)NKikimr::NMiniKQL::TlsAllocState->CurrentContext;
- combineCallInfo->isnull = false;
- combineCallInfo->args[0] = *typedState;
- combineCallInfo->args[1] = deser;
- auto ret = this->CombineFunc_(combineCallInfo);
- if constexpr (!HasDeserialize) {
- if (!combineCallInfo->isnull && ret == d.value) {
- typedState->isnull = false;
- typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(d.value, this->AggDesc_.TransTypeId == CSTRINGOID);
- return;
- }
+ LOCAL_FCINFO(combineCallInfo, 2);
+ combineCallInfo->flinfo = &this->CombineFuncInfo_;
+ combineCallInfo->nargs = 2;
+ combineCallInfo->fncollation = DEFAULT_COLLATION_OID;
+ combineCallInfo->context = (Node*)NKikimr::NMiniKQL::TlsAllocState->CurrentContext;
+ combineCallInfo->isnull = false;
+ combineCallInfo->args[0] = *typedState;
+ combineCallInfo->args[1] = deser;
+ auto ret = this->CombineFunc_(combineCallInfo);
+ if constexpr (!HasDeserialize) {
+ if (!combineCallInfo->isnull && ret == d.value) {
+ typedState->isnull = false;
+ typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(d.value, this->AggDesc_.TransTypeId == CSTRINGOID);
+ return;
}
+ }
- CopyState<IsTransTypeFixed>({ret, combineCallInfo->isnull}, *typedState);
- });
-
-
+ CopyState<IsTransTypeFixed>({ret, combineCallInfo->isnull}, *typedState);
SaveToAggContext<IsTransTypeFixed>(*typedState, this->AggDesc_.TransTypeId == CSTRINGOID);
}
diff --git a/ydb/library/yql/parser/pg_wrapper/arrow_impl.cpp b/ydb/library/yql/parser/pg_wrapper/arrow_impl.cpp
index 881f460ecbb..24137349c3c 100644
--- a/ydb/library/yql/parser/pg_wrapper/arrow_impl.cpp
+++ b/ydb/library/yql/parser/pg_wrapper/arrow_impl.cpp
@@ -6,21 +6,4 @@ extern "C" TPgKernelState& GetPGKernelState(arrow::compute::KernelContext* ctx)
return dynamic_cast<TPgKernelState&>(*ctx->state());
}
-extern "C" void WithPgTry(const TString& funcName, const std::function<void()>& func) {
- PG_TRY();
- {
- func();
- }
- PG_CATCH();
- {
- auto error_data = CopyErrorData();
- TStringBuilder errMsg;
- errMsg << "Error in function: " << funcName << ", reason: " << error_data->message;
- FreeErrorData(error_data);
- FlushErrorState();
- UdfTerminate(errMsg.c_str());
- }
- PG_END_TRY();
-}
-
}
diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
index 931da4dc1fc..58e3a0c94e8 100644
--- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
+++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
@@ -80,6 +80,7 @@ struct TMainContext {
MemoryContext PrevErrorContext = nullptr;
TimestampTz StartTimestamp;
pg_stack_base_t PrevStackBase;
+ TString LastError;
};
NUdf::TUnboxedValue CreatePgString(i32 typeLen, ui32 targetTypeId, TStringBuf data) {
@@ -209,22 +210,9 @@ public:
callInfo->args[2] = { Int32GetDatum(typeMod), false };
TPAllocScope call;
- PG_TRY();
- {
- auto ret = FInfo.fn_addr(callInfo);
- Y_ENSURE(!callInfo->isnull);
- return AnyDatumToPod(ret, TypeDesc.PassByValue);
- }
- PG_CATCH();
- {
- auto error_data = CopyErrorData();
- TStringBuilder errMsg;
- errMsg << "Error in function: " << NPg::LookupProc(TypeDesc.InFuncId).Name << ", reason: " << error_data->message;
- FreeErrorData(error_data);
- FlushErrorState();
- UdfTerminate(errMsg.c_str());
- }
- PG_END_TRY();
+ auto ret = FInfo.fn_addr(callInfo);
+ Y_ENSURE(!callInfo->isnull);
+ return AnyDatumToPod(ret, TypeDesc.PassByValue);
}
private:
@@ -453,25 +441,12 @@ public:
private:
NUdf::TUnboxedValuePod DoCall(FunctionCallInfoBaseData& callInfo) const {
- PG_TRY();
- {
- auto ret = this->FInfo.fn_addr(&callInfo);
- if (callInfo.isnull) {
- return NUdf::TUnboxedValuePod();
- }
-
- return AnyDatumToPod(ret, this->RetTypeDesc.PassByValue);
+ auto ret = this->FInfo.fn_addr(&callInfo);
+ if (callInfo.isnull) {
+ return NUdf::TUnboxedValuePod();
}
- PG_CATCH();
- {
- auto error_data = CopyErrorData();
- TStringBuilder errMsg;
- errMsg << "Error in function: " << this->Name << ", reason: " << error_data->message;
- FreeErrorData(error_data);
- FlushErrorState();
- UdfTerminate(errMsg.c_str());
- }
- PG_END_TRY();
+
+ return AnyDatumToPod(ret, this->RetTypeDesc.PassByValue);
}
TPgResolvedCallState& GetState(TComputationContext& compCtx) const {
@@ -530,33 +505,20 @@ private:
}
auto& callInfo = CallInfo.Ref();
- PG_TRY();
- {
- callInfo.isnull = false;
- auto ret = callInfo.flinfo->fn_addr(&callInfo);
- if (RSInfo.Ref().isDone == ExprEndResult) {
- IsFinished = true;
- return false;
- }
-
- if (callInfo.isnull) {
- value = NUdf::TUnboxedValuePod();
- } else {
- value = AnyDatumToPod(ret, RetTypeDesc.PassByValue);
- }
-
- return true;
+ callInfo.isnull = false;
+ auto ret = callInfo.flinfo->fn_addr(&callInfo);
+ if (RSInfo.Ref().isDone == ExprEndResult) {
+ IsFinished = true;
+ return false;
}
- PG_CATCH();
- {
- auto error_data = CopyErrorData();
- TStringBuilder errMsg;
- errMsg << "Error in function: " << Name << ", reason: " << error_data->message;
- FreeErrorData(error_data);
- FlushErrorState();
- UdfTerminate(errMsg.c_str());
+
+ if (callInfo.isnull) {
+ value = NUdf::TUnboxedValuePod();
+ } else {
+ value = AnyDatumToPod(ret, RetTypeDesc.PassByValue);
}
- PG_END_TRY();
+
+ return true;
}
const std::string_view Name;
@@ -862,7 +824,6 @@ private:
}
};
- PG_TRY();
{
auto ret = FInfo1.fn_addr(&callInfo1);
Y_ENSURE(!callInfo1.isnull);
@@ -892,16 +853,6 @@ private:
return ret;
}
- PG_CATCH();
- {
- auto error_data = CopyErrorData();
- TStringBuilder errMsg;
- errMsg << "Error in cast, reason: " << error_data->message;
- FreeErrorData(error_data);
- FlushErrorState();
- UdfTerminate(errMsg.c_str());
- }
- PG_END_TRY();
}
const ui32 StateIndex;
@@ -1191,7 +1142,6 @@ public:
}
}
- PG_TRY();
{
int ndims = 0;
int dims[MAXDIM];
@@ -1382,16 +1332,6 @@ public:
return PointerDatumToPod(PointerGetDatum(result));
}
}
- PG_CATCH();
- {
- auto error_data = CopyErrorData();
- TStringBuilder errMsg;
- errMsg << "Error in PgArray, reason: " << error_data->message;
- FreeErrorData(error_data);
- FlushErrorState();
- UdfTerminate(errMsg.c_str());
- }
- PG_END_TRY();
}
private:
@@ -1971,7 +1911,6 @@ TString PgValueToNativeText(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId)
}
};
- PG_TRY();
{
FmgrInfo finfo;
Zero(finfo);
@@ -1994,16 +1933,6 @@ TString PgValueToNativeText(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId)
return TString(str);
}
- PG_CATCH();
- {
- auto error_data = CopyErrorData();
- TStringBuilder errMsg;
- errMsg << "Error in 'out' function: " << NPg::LookupProc(outFuncId).Name << ", reason: " << error_data->message;
- FreeErrorData(error_data);
- FlushErrorState();
- UdfTerminate(errMsg.c_str());
- }
- PG_END_TRY();
}
template <typename F>
@@ -2030,7 +1959,6 @@ void PgValueToNativeBinaryImpl(const NUdf::TUnboxedValuePod& value, ui32 pgTypeI
}
};
- PG_TRY();
{
FmgrInfo finfo;
Zero(finfo);
@@ -2056,16 +1984,6 @@ void PgValueToNativeBinaryImpl(const NUdf::TUnboxedValuePod& value, ui32 pgTypeI
ui32 len = s.Size();
f(TStringBuf(s.Data(), s.Size()));
}
- PG_CATCH();
- {
- auto error_data = CopyErrorData();
- TStringBuilder errMsg;
- errMsg << "Error in 'send' function: " << NPg::LookupProc(sendFuncId).Name << ", reason: " << error_data->message;
- FreeErrorData(error_data);
- FlushErrorState();
- UdfTerminate(errMsg.c_str());
- }
- PG_END_TRY();
}
TString PgValueToNativeBinary(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId) {
@@ -2253,7 +2171,6 @@ NUdf::TUnboxedValue PgValueFromNativeBinary(const TStringBuf binary, ui32 pgType
receiveFuncId = NPg::LookupProc("array_recv", { 0,0,0 }).ProcId;
}
- PG_TRY();
{
FmgrInfo finfo;
Zero(finfo);
@@ -2281,16 +2198,6 @@ NUdf::TUnboxedValue PgValueFromNativeBinary(const TStringBuf binary, ui32 pgType
}
return AnyDatumToPod(x, typeInfo.PassByValue);
}
- PG_CATCH();
- {
- auto error_data = CopyErrorData();
- TStringBuilder errMsg;
- errMsg << "Error in 'recv' function: " << NPg::LookupProc(receiveFuncId).Name << ", reason: " << error_data->message;
- FreeErrorData(error_data);
- FlushErrorState();
- UdfTerminate(errMsg.c_str());
- }
- PG_END_TRY();
}
NUdf::TUnboxedValue PgValueFromNativeText(const TStringBuf text, ui32 pgTypeId) {
@@ -2304,7 +2211,6 @@ NUdf::TUnboxedValue PgValueFromNativeText(const TStringBuf text, ui32 pgTypeId)
inFuncId = NPg::LookupProc("array_in", { 0,0,0 }).ProcId;
}
- PG_TRY();
{
FmgrInfo finfo;
Zero(finfo);
@@ -2327,16 +2233,6 @@ NUdf::TUnboxedValue PgValueFromNativeText(const TStringBuf text, ui32 pgTypeId)
Y_ENSURE(!callInfo->isnull);
return AnyDatumToPod(x, typeInfo.PassByValue);
}
- PG_CATCH();
- {
- auto error_data = CopyErrorData();
- TStringBuilder errMsg;
- errMsg << "Error in 'in' function: " << NPg::LookupProc(inFuncId).Name << ", reason: " << error_data->message;
- FreeErrorData(error_data);
- FlushErrorState();
- UdfTerminate(errMsg.c_str());
- }
- PG_END_TRY();
}
NUdf::TUnboxedValue PgValueFromString(const TStringBuf s, ui32 pgTypeId) {
@@ -3274,7 +3170,7 @@ NUdf::IEquate::TPtr MakePgEquate(const TPgType* type) {
}
void* PgInitializeMainContext() {
- auto ctx = (TMainContext*)malloc(sizeof(TMainContext));
+ auto ctx = new TMainContext();
MemoryContextCreate((MemoryContext)&ctx->Data,
T_AllocSetContext,
&MkqlMethods,
@@ -3285,7 +3181,7 @@ void* PgInitializeMainContext() {
}
void PgDestroyMainContext(void* ctx) {
- free(ctx);
+ delete (TMainContext*)ctx;
}
void PgAcquireThreadContext(void* ctx) {
@@ -3297,6 +3193,7 @@ void PgAcquireThreadContext(void* ctx) {
CurrentMemoryContext = ErrorContext = (MemoryContext)&main->Data;
SetParallelStartTimestamps(main->StartTimestamp, main->StartTimestamp);
main->PrevStackBase = set_stack_base();
+ yql_error_report_active = true;
}
}
@@ -3306,9 +3203,20 @@ void PgReleaseThreadContext(void* ctx) {
CurrentMemoryContext = main->PrevCurrentMemoryContext;
ErrorContext = main->PrevErrorContext;
restore_stack_base(main->PrevStackBase);
+ yql_error_report_active = false;
}
}
+extern "C" void yql_prepare_error(const char* msg) {
+ auto ctx = (TMainContext*)TlsAllocState->MainContext;
+ ctx->LastError = msg;
+}
+
+extern "C" void yql_raise_error() {
+ auto ctx = (TMainContext*)TlsAllocState->MainContext;
+ UdfTerminate(ctx->LastError.c_str());
+}
+
} // namespace NMiniKQL
} // namespace NKikimr
@@ -3485,32 +3393,23 @@ public:
pfree((void*)datumR);
}
};
- PG_TRY();
- {
- datumL = Receive(dataL, sizeL);
- datumR = Receive(dataR, sizeR);
- FmgrInfo finfo;
- InitFunc(CompareProcId, &finfo, 2, 2);
- LOCAL_FCINFO(callInfo, 2);
- Zero(*callInfo);
- callInfo->flinfo = &finfo;
- callInfo->nargs = 2;
- callInfo->fncollation = DEFAULT_COLLATION_OID;
- callInfo->isnull = false;
- callInfo->args[0] = { datumL, false };
- callInfo->args[1] = { datumR, false };
- auto result = finfo.fn_addr(callInfo);
- Y_ENSURE(!callInfo->isnull);
- return DatumGetInt32(result);
- }
- PG_CATCH();
- {
- // TODO
- Y_FAIL("PG error in Compare");
- }
- PG_END_TRY();
- return 0;
+ datumL = Receive(dataL, sizeL);
+ datumR = Receive(dataR, sizeR);
+ FmgrInfo finfo;
+ InitFunc(CompareProcId, &finfo, 2, 2);
+ LOCAL_FCINFO(callInfo, 2);
+ Zero(*callInfo);
+ callInfo->flinfo = &finfo;
+ callInfo->nargs = 2;
+ callInfo->fncollation = DEFAULT_COLLATION_OID;
+ callInfo->isnull = false;
+ callInfo->args[0] = { datumL, false };
+ callInfo->args[1] = { datumR, false };
+
+ auto result = finfo.fn_addr(callInfo);
+ Y_ENSURE(!callInfo->isnull);
+ return DatumGetInt32(result);
}
ui64 Hash(const char* data, size_t size) const {
@@ -3522,30 +3421,20 @@ public:
pfree((void*)datum);
}
};
- PG_TRY();
- {
- datum = Receive(data, size);
- FmgrInfo finfo;
- InitFunc(HashProcId, &finfo, 1, 1);
- LOCAL_FCINFO(callInfo, 1);
- Zero(*callInfo);
- callInfo->flinfo = &finfo;
- callInfo->nargs = 1;
- callInfo->fncollation = DEFAULT_COLLATION_OID;
- callInfo->isnull = false;
- callInfo->args[0] = { datum, false };
+ datum = Receive(data, size);
+ FmgrInfo finfo;
+ InitFunc(HashProcId, &finfo, 1, 1);
+ LOCAL_FCINFO(callInfo, 1);
+ Zero(*callInfo);
+ callInfo->flinfo = &finfo;
+ callInfo->nargs = 1;
+ callInfo->fncollation = DEFAULT_COLLATION_OID;
+ callInfo->isnull = false;
+ callInfo->args[0] = { datum, false };
- auto result = finfo.fn_addr(callInfo);
- Y_ENSURE(!callInfo->isnull);
- return DatumGetUInt32(result);
- }
- PG_CATCH();
- {
- // TODO
- Y_FAIL("PG error in Hash");
- }
- PG_END_TRY();
- return 0;
+ auto result = finfo.fn_addr(callInfo);
+ Y_ENSURE(!callInfo->isnull);
+ return DatumGetUInt32(result);
}
TConvertResult NativeBinaryFromNativeText(const TString& str) const {
@@ -3563,36 +3452,36 @@ public:
};
PG_TRY();
{
- {
- FmgrInfo finfo;
- InitFunc(InFuncId, &finfo, 1, 3);
- LOCAL_FCINFO(callInfo, 3);
- Zero(*callInfo);
- callInfo->flinfo = &finfo;
- callInfo->nargs = 3;
- callInfo->fncollation = DEFAULT_COLLATION_OID;
- callInfo->isnull = false;
- callInfo->args[0] = { (Datum)str.c_str(), false };
- callInfo->args[1] = { ObjectIdGetDatum(NMiniKQL::MakeTypeIOParam(*this)), false };
- callInfo->args[2] = { Int32GetDatum(-1), false };
-
- datum = finfo.fn_addr(callInfo);
- Y_ENSURE(!callInfo->isnull);
- }
+ {
FmgrInfo finfo;
- InitFunc(SendFuncId, &finfo, 1, 1);
- LOCAL_FCINFO(callInfo, 1);
+ InitFunc(InFuncId, &finfo, 1, 3);
+ LOCAL_FCINFO(callInfo, 3);
Zero(*callInfo);
callInfo->flinfo = &finfo;
- callInfo->nargs = 1;
+ callInfo->nargs = 3;
callInfo->fncollation = DEFAULT_COLLATION_OID;
callInfo->isnull = false;
- callInfo->args[0] = { datum, false };
+ callInfo->args[0] = { (Datum)str.c_str(), false };
+ callInfo->args[1] = { ObjectIdGetDatum(NMiniKQL::MakeTypeIOParam(*this)), false };
+ callInfo->args[2] = { Int32GetDatum(-1), false };
- serialized = (text*)finfo.fn_addr(callInfo);
+ datum = finfo.fn_addr(callInfo);
Y_ENSURE(!callInfo->isnull);
- return {TString(NMiniKQL::GetVarBuf(serialized)), {}};
}
+ FmgrInfo finfo;
+ InitFunc(SendFuncId, &finfo, 1, 1);
+ LOCAL_FCINFO(callInfo, 1);
+ Zero(*callInfo);
+ callInfo->flinfo = &finfo;
+ callInfo->nargs = 1;
+ callInfo->fncollation = DEFAULT_COLLATION_OID;
+ callInfo->isnull = false;
+ callInfo->args[0] = { datum, false };
+
+ serialized = (text*)finfo.fn_addr(callInfo);
+ Y_ENSURE(!callInfo->isnull);
+ return {TString(NMiniKQL::GetVarBuf(serialized)), {}};
+ }
PG_CATCH();
{
auto error_data = CopyErrorData();
@@ -3620,20 +3509,20 @@ public:
};
PG_TRY();
{
- datum = Receive(binary.Data(), binary.Size());
- FmgrInfo finfo;
- InitFunc(OutFuncId, &finfo, 1, 1);
- LOCAL_FCINFO(callInfo, 1);
- Zero(*callInfo);
- callInfo->flinfo = &finfo;
- callInfo->nargs = 1;
- callInfo->fncollation = DEFAULT_COLLATION_OID;
- callInfo->isnull = false;
- callInfo->args[0] = { datum, false };
+ datum = Receive(binary.Data(), binary.Size());
+ FmgrInfo finfo;
+ InitFunc(OutFuncId, &finfo, 1, 1);
+ LOCAL_FCINFO(callInfo, 1);
+ Zero(*callInfo);
+ callInfo->flinfo = &finfo;
+ callInfo->nargs = 1;
+ callInfo->fncollation = DEFAULT_COLLATION_OID;
+ callInfo->isnull = false;
+ callInfo->args[0] = { datum, false };
- str = (char*)finfo.fn_addr(callInfo);
- Y_ENSURE(!callInfo->isnull);
- return {TString(str), {}};
+ str = (char*)finfo.fn_addr(callInfo);
+ Y_ENSURE(!callInfo->isnull);
+ return {TString(str), {}};
}
PG_CATCH();
{
@@ -3751,9 +3640,9 @@ public:
};
PG_TRY();
{
- datum = Receive(binary.Data(), binary.Size());
- return {};
- }
+ datum = Receive(binary.Data(), binary.Size());
+ return {};
+ }
PG_CATCH();
{
auto error_data = CopyErrorData();
diff --git a/ydb/library/yql/parser/pg_wrapper/parser.cpp b/ydb/library/yql/parser/pg_wrapper/parser.cpp
index bff1b533bb4..57124b87e7f 100644
--- a/ydb/library/yql/parser/pg_wrapper/parser.cpp
+++ b/ydb/library/yql/parser/pg_wrapper/parser.cpp
@@ -200,7 +200,7 @@ void PGParse(const TString& input, IPGParseEvents& events) {
walker.Advance(input[i]);
}
- events.OnError(TIssue(position, TString(parsetree_and_error.error->message)));
+ events.OnError(TIssue(position, "ERROR: " + TString(parsetree_and_error.error->message) + "\n"));
} else {
events.OnResult(parsetree_and_error.tree);
}
@@ -230,6 +230,7 @@ extern "C" void setup_pg_thread_cleanup() {
};
static thread_local TThreadCleanup ThreadCleanup;
+ Log_error_verbosity = PGERROR_DEFAULT;
SetDatabaseEncoding(PG_UTF8);
MemoryContextInit();
auto owner = ResourceOwnerCreate(NULL, "TopTransaction");
diff --git a/ydb/library/yql/parser/pg_wrapper/postgresql/src/backend/utils/error/elog.c b/ydb/library/yql/parser/pg_wrapper/postgresql/src/backend/utils/error/elog.c
index a872af2499c..89386e8cbd4 100644
--- a/ydb/library/yql/parser/pg_wrapper/postgresql/src/backend/utils/error/elog.c
+++ b/ydb/library/yql/parser/pg_wrapper/postgresql/src/backend/utils/error/elog.c
@@ -93,6 +93,7 @@
__thread ErrorContextCallback *error_context_stack = NULL;
__thread sigjmp_buf *PG_exception_stack = NULL;
+__thread bool yql_error_report_active = false;
extern __thread bool redirection_done;
@@ -371,7 +372,7 @@ errstart(int elevel, const char *domain)
*/
if (elevel == ERROR)
{
- if (PG_exception_stack == NULL ||
+ if ((PG_exception_stack == NULL && !yql_error_report_active) ||
ExitOnAnyError ||
proc_exit_inprogress)
elevel = FATAL;
@@ -1801,7 +1802,12 @@ pg_re_throw(void)
/* If possible, throw the error to the next outer setjmp handler */
if (PG_exception_stack != NULL)
siglongjmp(*PG_exception_stack, 1);
- else
+ else if (yql_error_report_active) {
+ ErrorData *edata = &errordata[errordata_stack_depth];
+ send_message_to_server_log(edata);
+ FlushErrorState();
+ yql_raise_error();
+ }
{
/*
* If we get here, elog(ERROR) was thrown inside a PG_TRY block, which
@@ -3110,6 +3116,11 @@ send_message_to_server_log(ErrorData *edata)
appendStringInfoChar(&buf, '\n');
}
+ if (yql_error_report_active) {
+ yql_prepare_error(buf.data);
+ return;
+ }
+
#ifdef HAVE_SYSLOG
/* Write to syslog, if enabled */
if (Log_destination & LOG_DESTINATION_SYSLOG)
diff --git a/ydb/library/yql/parser/pg_wrapper/postgresql/src/include/utils/elog.h b/ydb/library/yql/parser/pg_wrapper/postgresql/src/include/utils/elog.h
index e9a5944d338..7e9970f25b5 100644
--- a/ydb/library/yql/parser/pg_wrapper/postgresql/src/include/utils/elog.h
+++ b/ydb/library/yql/parser/pg_wrapper/postgresql/src/include/utils/elog.h
@@ -356,6 +356,7 @@ extern __thread PGDLLIMPORT ErrorContextCallback *error_context_stack;
#endif
extern __thread PGDLLIMPORT sigjmp_buf *PG_exception_stack;
+extern __thread PGDLLIMPORT bool yql_error_report_active;
/* Stuff that error handlers might want to use */
@@ -400,6 +401,9 @@ typedef struct ErrorData
struct MemoryContextData *assoc_context;
} ErrorData;
+extern void yql_prepare_error(const char* msg);
+extern void yql_raise_error();
+
extern void EmitErrorReport(void);
extern ErrorData *CopyErrorData(void);
extern void FreeErrorData(ErrorData *edata);
diff --git a/ydb/library/yql/parser/pg_wrapper/ut/codegen_ut.cpp b/ydb/library/yql/parser/pg_wrapper/ut/codegen_ut.cpp
index 1d9388de199..0c7e03603d3 100644
--- a/ydb/library/yql/parser/pg_wrapper/ut/codegen_ut.cpp
+++ b/ydb/library/yql/parser/pg_wrapper/ut/codegen_ut.cpp
@@ -69,7 +69,6 @@ Y_UNIT_TEST_SUITE(TPgCodegen) {
auto func = codegen->GetModule().getFunction(std::string("arrow_" + name));
Y_ENSURE(func);
codegen->AddGlobalMapping("GetPGKernelState", (const void*)&GetPGKernelState);
- codegen->AddGlobalMapping("WithPgTry", (const void*)&WithPgTry);
codegen->Verify();
codegen->ExportSymbol(func);
codegen->Compile();
diff --git a/ydb/library/yql/parser/pg_wrapper/ut/parser_ut.cpp b/ydb/library/yql/parser/pg_wrapper/ut/parser_ut.cpp
index 972cfec0125..0c9368d0097 100644
--- a/ydb/library/yql/parser/pg_wrapper/ut/parser_ut.cpp
+++ b/ydb/library/yql/parser/pg_wrapper/ut/parser_ut.cpp
@@ -40,7 +40,7 @@ Y_UNIT_TEST_SUITE(TWrapperTests) {
UNIT_ASSERT(!events.Result);
UNIT_ASSERT(events.Issue);
auto msg = events.Issue->GetMessage();
- UNIT_ASSERT_NO_DIFF(msg, "syntax error at or near \"SELECT1\"");
+ UNIT_ASSERT_NO_DIFF(msg, "ERROR: syntax error at or near \"SELECT1\"\n");
UNIT_ASSERT_VALUES_EQUAL(events.Issue->Position.Row, 2);
UNIT_ASSERT_VALUES_EQUAL(events.Issue->Position.Column, 3);
}
@@ -95,7 +95,7 @@ Y_UNIT_TEST_SUITE(TMTWrapperTests) {
Y_ENSURE(!events.Result);
Y_ENSURE(events.Issue);
auto msg = events.Issue->GetMessage();
- Y_ENSURE(msg == "syntax error at or near \"SELECT1\"");
+ Y_ENSURE(msg == "ERROR: syntax error at or near \"SELECT1\"\n");
Y_ENSURE(events.Issue->Position.Row == 2);
Y_ENSURE(events.Issue->Position.Column == 3);
}