diff options
author | vvvv <vvvv@ydb.tech> | 2023-06-02 19:02:05 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2023-06-02 19:02:05 +0300 |
commit | e403aee7f9d78faf4d4ea7c528da9ebd0a8bbd86 (patch) | |
tree | 4c02271c040d25212483417289f5254bb0f35c8e | |
parent | 3332cdd408c34bd067db2bde8bc0f322e392946b (diff) | |
download | ydb-e403aee7f9d78faf4d4ea7c528da9ebd0a8bbd86.tar.gz |
Avoid most PG_TRY in PG interop
9 files changed, 205 insertions, 338 deletions
diff --git a/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp b/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp index 5921c8bbe48..46a04cc001c 100644 --- a/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp +++ b/ydb/library/yql/minikql/computation/mkql_value_builder_ut.cpp @@ -122,21 +122,21 @@ private: TStringValue error(""); auto r = GetPgBuilder().ValueFromText(BoolOid, "", error); UNIT_ASSERT(!r); - UNIT_ASSERT_STRING_CONTAINS(AsString(error), "invalid input syntax for type boolean: \"\""); + UNIT_ASSERT_STRING_CONTAINS(AsString(error), "ERROR: invalid input syntax for type boolean: \"\""); } { TStringValue error(""); auto r = GetPgBuilder().ValueFromText(BoolOid, "zzzz", error); UNIT_ASSERT(!r); - UNIT_ASSERT_STRING_CONTAINS(AsString(error), "Terminate was called, reason(85): Error in 'in' function: boolin, reason: invalid input syntax for type boolean: \"zzzz\""); + UNIT_ASSERT_STRING_CONTAINS(AsString(error), "ERROR: invalid input syntax for type boolean: \"zzzz\""); } { TStringValue error(""); auto r = GetPgBuilder().ValueFromBinary(BoolOid, "", error); UNIT_ASSERT(!r); - UNIT_ASSERT_STRING_CONTAINS(AsString(error), "Error in 'recv' function: boolrecv, reason: no data left in message"); + UNIT_ASSERT_STRING_CONTAINS(AsString(error), "ERROR: no data left in message"); } { diff --git a/ydb/library/yql/parser/pg_wrapper/arrow.h b/ydb/library/yql/parser/pg_wrapper/arrow.h index 52bdce5c70f..72f37124710 100644 --- a/ydb/library/yql/parser/pg_wrapper/arrow.h +++ b/ydb/library/yql/parser/pg_wrapper/arrow.h @@ -233,7 +233,6 @@ struct TDefaultArgsPolicy { }; extern "C" TPgKernelState& GetPGKernelState(arrow::compute::KernelContext* ctx); -extern "C" void WithPgTry(const TString& funcName, const std::function<void()>& func); template <typename TFunc, bool IsStrict, bool IsFixedResult, typename TArgsPolicy = TDefaultArgsPolicy> struct TGenericExec { @@ -275,10 +274,7 @@ struct TGenericExec { Y_ENSURE(state.flinfo.fn_strict == IsStrict); Y_ENSURE(state.IsFixedResult == IsFixedResult); TArenaMemoryContext arena; - WithPgTry(state.Name, [&]() { - Dispatch1(hasScalars, hasNulls, ctx, batch, length, state, res); - }); - + Dispatch1(hasScalars, hasNulls, ctx, batch, length, state, res); return arrow::Status::OK(); } @@ -517,18 +513,16 @@ public: auto ret = *typedState; if constexpr (HasFunc) { if (!IsStrict || !typedState->isnull) { - WithPgTry(Name_, [&]() { - LOCAL_FCINFO(callInfo, 1); - callInfo->flinfo = FuncInfo_; - callInfo->nargs = 1; - callInfo->fncollation = DEFAULT_COLLATION_OID; - callInfo->context = (Node*)NKikimr::NMiniKQL::TlsAllocState->CurrentContext; - callInfo->isnull = false; - callInfo->args[0].isnull = typedState->isnull; - callInfo->args[0].value = typedState->value; - ret.value = Func_(callInfo); - ret.isnull = callInfo->isnull; - }); + LOCAL_FCINFO(callInfo, 1); + callInfo->flinfo = FuncInfo_; + callInfo->nargs = 1; + callInfo->fncollation = DEFAULT_COLLATION_OID; + callInfo->context = (Node*)NKikimr::NMiniKQL::TlsAllocState->CurrentContext; + callInfo->isnull = false; + callInfo->args[0].isnull = typedState->isnull; + callInfo->args[0].value = typedState->value; + ret.value = Func_(callInfo); + ret.isnull = callInfo->isnull; } } @@ -623,20 +617,18 @@ private: fmgr_info(inFuncId, &InFuncInfo_); Y_ENSURE(InFuncInfo_.fn_addr); - WithPgTry(this->AggDesc_.Name, [&]() { - LOCAL_FCINFO(inCallInfo, 3); - inCallInfo->flinfo = &this->InFuncInfo_; - inCallInfo->nargs = 3; - inCallInfo->fncollation = DEFAULT_COLLATION_OID; - inCallInfo->isnull = false; - inCallInfo->args[0] = { (Datum)this->AggDesc_.InitValue.c_str(), false }; - inCallInfo->args[1] = { ObjectIdGetDatum(this->TypeIOParam_), false }; - inCallInfo->args[2] = { Int32GetDatum(-1), false }; - - auto state = this->InFuncInfo_.fn_addr(inCallInfo); - Y_ENSURE(!inCallInfo->isnull); - PreparedInitValue_ = AnyDatumToPod(state, IsTransTypeFixed); - }); + LOCAL_FCINFO(inCallInfo, 3); + inCallInfo->flinfo = &this->InFuncInfo_; + inCallInfo->nargs = 3; + inCallInfo->fncollation = DEFAULT_COLLATION_OID; + inCallInfo->isnull = false; + inCallInfo->args[0] = { (Datum)this->AggDesc_.InitValue.c_str(), false }; + inCallInfo->args[1] = { ObjectIdGetDatum(this->TypeIOParam_), false }; + inCallInfo->args[2] = { Int32GetDatum(-1), false }; + + auto state = this->InFuncInfo_.fn_addr(inCallInfo); + Y_ENSURE(!inCallInfo->isnull); + PreparedInitValue_ = AnyDatumToPod(state, IsTransTypeFixed); } } @@ -711,21 +703,19 @@ private: filterBitmap = filterArray->template GetValues<uint8_t>(1); } - WithPgTry(this->AggDesc_.Name, [&]() { - if (hasNulls) { - if (hasScalars) { - AddManyImpl<true, true>(typedState, values, batchLength, filterBitmap); - } else { - AddManyImpl<true, false>(typedState, values, batchLength, filterBitmap); - } + if (hasNulls) { + if (hasScalars) { + AddManyImpl<true, true>(typedState, values, batchLength, filterBitmap); } else { - if (hasScalars) { - AddManyImpl<false, true>(typedState, values, batchLength, filterBitmap); - } else { - AddManyImpl<false, false>(typedState, values, batchLength, filterBitmap); - } + AddManyImpl<true, false>(typedState, values, batchLength, filterBitmap); } - }); + } else { + if (hasScalars) { + AddManyImpl<false, true>(typedState, values, batchLength, filterBitmap); + } else { + AddManyImpl<false, false>(typedState, values, batchLength, filterBitmap); + } + } } template <bool HasNulls, bool HasScalars> @@ -837,27 +827,25 @@ SkipCall:; if constexpr (HasSerialize) { NUdf::TUnboxedValue ret; - WithPgTry(this->AggDesc_.Name, [&]() { - LOCAL_FCINFO(serializeCallInfo, 1); - serializeCallInfo->flinfo = &this->SerializeFuncInfo_; - serializeCallInfo->nargs = 1; - serializeCallInfo->fncollation = DEFAULT_COLLATION_OID; - serializeCallInfo->context = (Node*)NKikimr::NMiniKQL::TlsAllocState->CurrentContext; - serializeCallInfo->isnull = false; - serializeCallInfo->args[0].isnull = false; - serializeCallInfo->args[0].value = typedState->value; - auto ser = this->SerializeFunc_(serializeCallInfo); - Y_ENSURE(!serializeCallInfo->isnull); - if constexpr (IsSerializedTypeFixed) { - ret = ScalarDatumToPod(ser); - } else { - ret = PointerDatumToPod(ser); - if (ser == typedState->value) { - typedState->value = 0; - typedState->isnull = true; - } + LOCAL_FCINFO(serializeCallInfo, 1); + serializeCallInfo->flinfo = &this->SerializeFuncInfo_; + serializeCallInfo->nargs = 1; + serializeCallInfo->fncollation = DEFAULT_COLLATION_OID; + serializeCallInfo->context = (Node*)NKikimr::NMiniKQL::TlsAllocState->CurrentContext; + serializeCallInfo->isnull = false; + serializeCallInfo->args[0].isnull = false; + serializeCallInfo->args[0].value = typedState->value; + auto ser = this->SerializeFunc_(serializeCallInfo); + Y_ENSURE(!serializeCallInfo->isnull); + if constexpr (IsSerializedTypeFixed) { + ret = ScalarDatumToPod(ser); + } else { + ret = PointerDatumToPod(ser); + if (ser == typedState->value) { + typedState->value = 0; + typedState->isnull = true; } - }); + } return ret; } else { @@ -980,9 +968,7 @@ SkipCall:; } transCallInfo->isnull = false; - WithPgTry(this->AggDesc_.Name, [&]() { - ret = this->TransFunc_(transCallInfo); - }); + ret = this->TransFunc_(transCallInfo); CopyState<IsTransTypeFixed>({ret, transCallInfo->isnull}, *typedState); SaveToAggContext<IsTransTypeFixed>(*typedState, this->AggDesc_.TransTypeId == CSTRINGOID); @@ -1043,18 +1029,16 @@ SkipCall:; } void Deserialize(Datum ser, NullableDatum& result) { - WithPgTry(this->AggDesc_.Name, [&]() { - LOCAL_FCINFO(deserializeCallInfo, 1); - deserializeCallInfo->flinfo = &this->DeserializeFuncInfo_; - deserializeCallInfo->nargs = 1; - deserializeCallInfo->fncollation = DEFAULT_COLLATION_OID; - deserializeCallInfo->context = (Node*)NKikimr::NMiniKQL::TlsAllocState->CurrentContext; - deserializeCallInfo->isnull = false; - deserializeCallInfo->args[0].isnull = false; - deserializeCallInfo->args[0].value = ser; - result.value = this->DeserializeFunc_(deserializeCallInfo); - result.isnull = deserializeCallInfo->isnull; - }); + LOCAL_FCINFO(deserializeCallInfo, 1); + deserializeCallInfo->flinfo = &this->DeserializeFuncInfo_; + deserializeCallInfo->nargs = 1; + deserializeCallInfo->fncollation = DEFAULT_COLLATION_OID; + deserializeCallInfo->context = (Node*)NKikimr::NMiniKQL::TlsAllocState->CurrentContext; + deserializeCallInfo->isnull = false; + deserializeCallInfo->args[0].isnull = false; + deserializeCallInfo->args[0].value = ser; + result.value = this->DeserializeFunc_(deserializeCallInfo); + result.isnull = deserializeCallInfo->isnull; } void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { @@ -1130,28 +1114,24 @@ SkipCall:; } } - WithPgTry(this->AggDesc_.Name, [&]() { - LOCAL_FCINFO(combineCallInfo, 2); - combineCallInfo->flinfo = &this->CombineFuncInfo_; - combineCallInfo->nargs = 2; - combineCallInfo->fncollation = DEFAULT_COLLATION_OID; - combineCallInfo->context = (Node*)NKikimr::NMiniKQL::TlsAllocState->CurrentContext; - combineCallInfo->isnull = false; - combineCallInfo->args[0] = *typedState; - combineCallInfo->args[1] = deser; - auto ret = this->CombineFunc_(combineCallInfo); - if constexpr (!HasDeserialize) { - if (!combineCallInfo->isnull && ret == d.value) { - typedState->isnull = false; - typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(d.value, this->AggDesc_.TransTypeId == CSTRINGOID); - return; - } + LOCAL_FCINFO(combineCallInfo, 2); + combineCallInfo->flinfo = &this->CombineFuncInfo_; + combineCallInfo->nargs = 2; + combineCallInfo->fncollation = DEFAULT_COLLATION_OID; + combineCallInfo->context = (Node*)NKikimr::NMiniKQL::TlsAllocState->CurrentContext; + combineCallInfo->isnull = false; + combineCallInfo->args[0] = *typedState; + combineCallInfo->args[1] = deser; + auto ret = this->CombineFunc_(combineCallInfo); + if constexpr (!HasDeserialize) { + if (!combineCallInfo->isnull && ret == d.value) { + typedState->isnull = false; + typedState->value = CloneDatumToAggContext<IsTransTypeFixed>(d.value, this->AggDesc_.TransTypeId == CSTRINGOID); + return; } + } - CopyState<IsTransTypeFixed>({ret, combineCallInfo->isnull}, *typedState); - }); - - + CopyState<IsTransTypeFixed>({ret, combineCallInfo->isnull}, *typedState); SaveToAggContext<IsTransTypeFixed>(*typedState, this->AggDesc_.TransTypeId == CSTRINGOID); } diff --git a/ydb/library/yql/parser/pg_wrapper/arrow_impl.cpp b/ydb/library/yql/parser/pg_wrapper/arrow_impl.cpp index 881f460ecbb..24137349c3c 100644 --- a/ydb/library/yql/parser/pg_wrapper/arrow_impl.cpp +++ b/ydb/library/yql/parser/pg_wrapper/arrow_impl.cpp @@ -6,21 +6,4 @@ extern "C" TPgKernelState& GetPGKernelState(arrow::compute::KernelContext* ctx) return dynamic_cast<TPgKernelState&>(*ctx->state()); } -extern "C" void WithPgTry(const TString& funcName, const std::function<void()>& func) { - PG_TRY(); - { - func(); - } - PG_CATCH(); - { - auto error_data = CopyErrorData(); - TStringBuilder errMsg; - errMsg << "Error in function: " << funcName << ", reason: " << error_data->message; - FreeErrorData(error_data); - FlushErrorState(); - UdfTerminate(errMsg.c_str()); - } - PG_END_TRY(); -} - } diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp index 931da4dc1fc..58e3a0c94e8 100644 --- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp +++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp @@ -80,6 +80,7 @@ struct TMainContext { MemoryContext PrevErrorContext = nullptr; TimestampTz StartTimestamp; pg_stack_base_t PrevStackBase; + TString LastError; }; NUdf::TUnboxedValue CreatePgString(i32 typeLen, ui32 targetTypeId, TStringBuf data) { @@ -209,22 +210,9 @@ public: callInfo->args[2] = { Int32GetDatum(typeMod), false }; TPAllocScope call; - PG_TRY(); - { - auto ret = FInfo.fn_addr(callInfo); - Y_ENSURE(!callInfo->isnull); - return AnyDatumToPod(ret, TypeDesc.PassByValue); - } - PG_CATCH(); - { - auto error_data = CopyErrorData(); - TStringBuilder errMsg; - errMsg << "Error in function: " << NPg::LookupProc(TypeDesc.InFuncId).Name << ", reason: " << error_data->message; - FreeErrorData(error_data); - FlushErrorState(); - UdfTerminate(errMsg.c_str()); - } - PG_END_TRY(); + auto ret = FInfo.fn_addr(callInfo); + Y_ENSURE(!callInfo->isnull); + return AnyDatumToPod(ret, TypeDesc.PassByValue); } private: @@ -453,25 +441,12 @@ public: private: NUdf::TUnboxedValuePod DoCall(FunctionCallInfoBaseData& callInfo) const { - PG_TRY(); - { - auto ret = this->FInfo.fn_addr(&callInfo); - if (callInfo.isnull) { - return NUdf::TUnboxedValuePod(); - } - - return AnyDatumToPod(ret, this->RetTypeDesc.PassByValue); + auto ret = this->FInfo.fn_addr(&callInfo); + if (callInfo.isnull) { + return NUdf::TUnboxedValuePod(); } - PG_CATCH(); - { - auto error_data = CopyErrorData(); - TStringBuilder errMsg; - errMsg << "Error in function: " << this->Name << ", reason: " << error_data->message; - FreeErrorData(error_data); - FlushErrorState(); - UdfTerminate(errMsg.c_str()); - } - PG_END_TRY(); + + return AnyDatumToPod(ret, this->RetTypeDesc.PassByValue); } TPgResolvedCallState& GetState(TComputationContext& compCtx) const { @@ -530,33 +505,20 @@ private: } auto& callInfo = CallInfo.Ref(); - PG_TRY(); - { - callInfo.isnull = false; - auto ret = callInfo.flinfo->fn_addr(&callInfo); - if (RSInfo.Ref().isDone == ExprEndResult) { - IsFinished = true; - return false; - } - - if (callInfo.isnull) { - value = NUdf::TUnboxedValuePod(); - } else { - value = AnyDatumToPod(ret, RetTypeDesc.PassByValue); - } - - return true; + callInfo.isnull = false; + auto ret = callInfo.flinfo->fn_addr(&callInfo); + if (RSInfo.Ref().isDone == ExprEndResult) { + IsFinished = true; + return false; } - PG_CATCH(); - { - auto error_data = CopyErrorData(); - TStringBuilder errMsg; - errMsg << "Error in function: " << Name << ", reason: " << error_data->message; - FreeErrorData(error_data); - FlushErrorState(); - UdfTerminate(errMsg.c_str()); + + if (callInfo.isnull) { + value = NUdf::TUnboxedValuePod(); + } else { + value = AnyDatumToPod(ret, RetTypeDesc.PassByValue); } - PG_END_TRY(); + + return true; } const std::string_view Name; @@ -862,7 +824,6 @@ private: } }; - PG_TRY(); { auto ret = FInfo1.fn_addr(&callInfo1); Y_ENSURE(!callInfo1.isnull); @@ -892,16 +853,6 @@ private: return ret; } - PG_CATCH(); - { - auto error_data = CopyErrorData(); - TStringBuilder errMsg; - errMsg << "Error in cast, reason: " << error_data->message; - FreeErrorData(error_data); - FlushErrorState(); - UdfTerminate(errMsg.c_str()); - } - PG_END_TRY(); } const ui32 StateIndex; @@ -1191,7 +1142,6 @@ public: } } - PG_TRY(); { int ndims = 0; int dims[MAXDIM]; @@ -1382,16 +1332,6 @@ public: return PointerDatumToPod(PointerGetDatum(result)); } } - PG_CATCH(); - { - auto error_data = CopyErrorData(); - TStringBuilder errMsg; - errMsg << "Error in PgArray, reason: " << error_data->message; - FreeErrorData(error_data); - FlushErrorState(); - UdfTerminate(errMsg.c_str()); - } - PG_END_TRY(); } private: @@ -1971,7 +1911,6 @@ TString PgValueToNativeText(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId) } }; - PG_TRY(); { FmgrInfo finfo; Zero(finfo); @@ -1994,16 +1933,6 @@ TString PgValueToNativeText(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId) return TString(str); } - PG_CATCH(); - { - auto error_data = CopyErrorData(); - TStringBuilder errMsg; - errMsg << "Error in 'out' function: " << NPg::LookupProc(outFuncId).Name << ", reason: " << error_data->message; - FreeErrorData(error_data); - FlushErrorState(); - UdfTerminate(errMsg.c_str()); - } - PG_END_TRY(); } template <typename F> @@ -2030,7 +1959,6 @@ void PgValueToNativeBinaryImpl(const NUdf::TUnboxedValuePod& value, ui32 pgTypeI } }; - PG_TRY(); { FmgrInfo finfo; Zero(finfo); @@ -2056,16 +1984,6 @@ void PgValueToNativeBinaryImpl(const NUdf::TUnboxedValuePod& value, ui32 pgTypeI ui32 len = s.Size(); f(TStringBuf(s.Data(), s.Size())); } - PG_CATCH(); - { - auto error_data = CopyErrorData(); - TStringBuilder errMsg; - errMsg << "Error in 'send' function: " << NPg::LookupProc(sendFuncId).Name << ", reason: " << error_data->message; - FreeErrorData(error_data); - FlushErrorState(); - UdfTerminate(errMsg.c_str()); - } - PG_END_TRY(); } TString PgValueToNativeBinary(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId) { @@ -2253,7 +2171,6 @@ NUdf::TUnboxedValue PgValueFromNativeBinary(const TStringBuf binary, ui32 pgType receiveFuncId = NPg::LookupProc("array_recv", { 0,0,0 }).ProcId; } - PG_TRY(); { FmgrInfo finfo; Zero(finfo); @@ -2281,16 +2198,6 @@ NUdf::TUnboxedValue PgValueFromNativeBinary(const TStringBuf binary, ui32 pgType } return AnyDatumToPod(x, typeInfo.PassByValue); } - PG_CATCH(); - { - auto error_data = CopyErrorData(); - TStringBuilder errMsg; - errMsg << "Error in 'recv' function: " << NPg::LookupProc(receiveFuncId).Name << ", reason: " << error_data->message; - FreeErrorData(error_data); - FlushErrorState(); - UdfTerminate(errMsg.c_str()); - } - PG_END_TRY(); } NUdf::TUnboxedValue PgValueFromNativeText(const TStringBuf text, ui32 pgTypeId) { @@ -2304,7 +2211,6 @@ NUdf::TUnboxedValue PgValueFromNativeText(const TStringBuf text, ui32 pgTypeId) inFuncId = NPg::LookupProc("array_in", { 0,0,0 }).ProcId; } - PG_TRY(); { FmgrInfo finfo; Zero(finfo); @@ -2327,16 +2233,6 @@ NUdf::TUnboxedValue PgValueFromNativeText(const TStringBuf text, ui32 pgTypeId) Y_ENSURE(!callInfo->isnull); return AnyDatumToPod(x, typeInfo.PassByValue); } - PG_CATCH(); - { - auto error_data = CopyErrorData(); - TStringBuilder errMsg; - errMsg << "Error in 'in' function: " << NPg::LookupProc(inFuncId).Name << ", reason: " << error_data->message; - FreeErrorData(error_data); - FlushErrorState(); - UdfTerminate(errMsg.c_str()); - } - PG_END_TRY(); } NUdf::TUnboxedValue PgValueFromString(const TStringBuf s, ui32 pgTypeId) { @@ -3274,7 +3170,7 @@ NUdf::IEquate::TPtr MakePgEquate(const TPgType* type) { } void* PgInitializeMainContext() { - auto ctx = (TMainContext*)malloc(sizeof(TMainContext)); + auto ctx = new TMainContext(); MemoryContextCreate((MemoryContext)&ctx->Data, T_AllocSetContext, &MkqlMethods, @@ -3285,7 +3181,7 @@ void* PgInitializeMainContext() { } void PgDestroyMainContext(void* ctx) { - free(ctx); + delete (TMainContext*)ctx; } void PgAcquireThreadContext(void* ctx) { @@ -3297,6 +3193,7 @@ void PgAcquireThreadContext(void* ctx) { CurrentMemoryContext = ErrorContext = (MemoryContext)&main->Data; SetParallelStartTimestamps(main->StartTimestamp, main->StartTimestamp); main->PrevStackBase = set_stack_base(); + yql_error_report_active = true; } } @@ -3306,9 +3203,20 @@ void PgReleaseThreadContext(void* ctx) { CurrentMemoryContext = main->PrevCurrentMemoryContext; ErrorContext = main->PrevErrorContext; restore_stack_base(main->PrevStackBase); + yql_error_report_active = false; } } +extern "C" void yql_prepare_error(const char* msg) { + auto ctx = (TMainContext*)TlsAllocState->MainContext; + ctx->LastError = msg; +} + +extern "C" void yql_raise_error() { + auto ctx = (TMainContext*)TlsAllocState->MainContext; + UdfTerminate(ctx->LastError.c_str()); +} + } // namespace NMiniKQL } // namespace NKikimr @@ -3485,32 +3393,23 @@ public: pfree((void*)datumR); } }; - PG_TRY(); - { - datumL = Receive(dataL, sizeL); - datumR = Receive(dataR, sizeR); - FmgrInfo finfo; - InitFunc(CompareProcId, &finfo, 2, 2); - LOCAL_FCINFO(callInfo, 2); - Zero(*callInfo); - callInfo->flinfo = &finfo; - callInfo->nargs = 2; - callInfo->fncollation = DEFAULT_COLLATION_OID; - callInfo->isnull = false; - callInfo->args[0] = { datumL, false }; - callInfo->args[1] = { datumR, false }; - auto result = finfo.fn_addr(callInfo); - Y_ENSURE(!callInfo->isnull); - return DatumGetInt32(result); - } - PG_CATCH(); - { - // TODO - Y_FAIL("PG error in Compare"); - } - PG_END_TRY(); - return 0; + datumL = Receive(dataL, sizeL); + datumR = Receive(dataR, sizeR); + FmgrInfo finfo; + InitFunc(CompareProcId, &finfo, 2, 2); + LOCAL_FCINFO(callInfo, 2); + Zero(*callInfo); + callInfo->flinfo = &finfo; + callInfo->nargs = 2; + callInfo->fncollation = DEFAULT_COLLATION_OID; + callInfo->isnull = false; + callInfo->args[0] = { datumL, false }; + callInfo->args[1] = { datumR, false }; + + auto result = finfo.fn_addr(callInfo); + Y_ENSURE(!callInfo->isnull); + return DatumGetInt32(result); } ui64 Hash(const char* data, size_t size) const { @@ -3522,30 +3421,20 @@ public: pfree((void*)datum); } }; - PG_TRY(); - { - datum = Receive(data, size); - FmgrInfo finfo; - InitFunc(HashProcId, &finfo, 1, 1); - LOCAL_FCINFO(callInfo, 1); - Zero(*callInfo); - callInfo->flinfo = &finfo; - callInfo->nargs = 1; - callInfo->fncollation = DEFAULT_COLLATION_OID; - callInfo->isnull = false; - callInfo->args[0] = { datum, false }; + datum = Receive(data, size); + FmgrInfo finfo; + InitFunc(HashProcId, &finfo, 1, 1); + LOCAL_FCINFO(callInfo, 1); + Zero(*callInfo); + callInfo->flinfo = &finfo; + callInfo->nargs = 1; + callInfo->fncollation = DEFAULT_COLLATION_OID; + callInfo->isnull = false; + callInfo->args[0] = { datum, false }; - auto result = finfo.fn_addr(callInfo); - Y_ENSURE(!callInfo->isnull); - return DatumGetUInt32(result); - } - PG_CATCH(); - { - // TODO - Y_FAIL("PG error in Hash"); - } - PG_END_TRY(); - return 0; + auto result = finfo.fn_addr(callInfo); + Y_ENSURE(!callInfo->isnull); + return DatumGetUInt32(result); } TConvertResult NativeBinaryFromNativeText(const TString& str) const { @@ -3563,36 +3452,36 @@ public: }; PG_TRY(); { - { - FmgrInfo finfo; - InitFunc(InFuncId, &finfo, 1, 3); - LOCAL_FCINFO(callInfo, 3); - Zero(*callInfo); - callInfo->flinfo = &finfo; - callInfo->nargs = 3; - callInfo->fncollation = DEFAULT_COLLATION_OID; - callInfo->isnull = false; - callInfo->args[0] = { (Datum)str.c_str(), false }; - callInfo->args[1] = { ObjectIdGetDatum(NMiniKQL::MakeTypeIOParam(*this)), false }; - callInfo->args[2] = { Int32GetDatum(-1), false }; - - datum = finfo.fn_addr(callInfo); - Y_ENSURE(!callInfo->isnull); - } + { FmgrInfo finfo; - InitFunc(SendFuncId, &finfo, 1, 1); - LOCAL_FCINFO(callInfo, 1); + InitFunc(InFuncId, &finfo, 1, 3); + LOCAL_FCINFO(callInfo, 3); Zero(*callInfo); callInfo->flinfo = &finfo; - callInfo->nargs = 1; + callInfo->nargs = 3; callInfo->fncollation = DEFAULT_COLLATION_OID; callInfo->isnull = false; - callInfo->args[0] = { datum, false }; + callInfo->args[0] = { (Datum)str.c_str(), false }; + callInfo->args[1] = { ObjectIdGetDatum(NMiniKQL::MakeTypeIOParam(*this)), false }; + callInfo->args[2] = { Int32GetDatum(-1), false }; - serialized = (text*)finfo.fn_addr(callInfo); + datum = finfo.fn_addr(callInfo); Y_ENSURE(!callInfo->isnull); - return {TString(NMiniKQL::GetVarBuf(serialized)), {}}; } + FmgrInfo finfo; + InitFunc(SendFuncId, &finfo, 1, 1); + LOCAL_FCINFO(callInfo, 1); + Zero(*callInfo); + callInfo->flinfo = &finfo; + callInfo->nargs = 1; + callInfo->fncollation = DEFAULT_COLLATION_OID; + callInfo->isnull = false; + callInfo->args[0] = { datum, false }; + + serialized = (text*)finfo.fn_addr(callInfo); + Y_ENSURE(!callInfo->isnull); + return {TString(NMiniKQL::GetVarBuf(serialized)), {}}; + } PG_CATCH(); { auto error_data = CopyErrorData(); @@ -3620,20 +3509,20 @@ public: }; PG_TRY(); { - datum = Receive(binary.Data(), binary.Size()); - FmgrInfo finfo; - InitFunc(OutFuncId, &finfo, 1, 1); - LOCAL_FCINFO(callInfo, 1); - Zero(*callInfo); - callInfo->flinfo = &finfo; - callInfo->nargs = 1; - callInfo->fncollation = DEFAULT_COLLATION_OID; - callInfo->isnull = false; - callInfo->args[0] = { datum, false }; + datum = Receive(binary.Data(), binary.Size()); + FmgrInfo finfo; + InitFunc(OutFuncId, &finfo, 1, 1); + LOCAL_FCINFO(callInfo, 1); + Zero(*callInfo); + callInfo->flinfo = &finfo; + callInfo->nargs = 1; + callInfo->fncollation = DEFAULT_COLLATION_OID; + callInfo->isnull = false; + callInfo->args[0] = { datum, false }; - str = (char*)finfo.fn_addr(callInfo); - Y_ENSURE(!callInfo->isnull); - return {TString(str), {}}; + str = (char*)finfo.fn_addr(callInfo); + Y_ENSURE(!callInfo->isnull); + return {TString(str), {}}; } PG_CATCH(); { @@ -3751,9 +3640,9 @@ public: }; PG_TRY(); { - datum = Receive(binary.Data(), binary.Size()); - return {}; - } + datum = Receive(binary.Data(), binary.Size()); + return {}; + } PG_CATCH(); { auto error_data = CopyErrorData(); diff --git a/ydb/library/yql/parser/pg_wrapper/parser.cpp b/ydb/library/yql/parser/pg_wrapper/parser.cpp index bff1b533bb4..57124b87e7f 100644 --- a/ydb/library/yql/parser/pg_wrapper/parser.cpp +++ b/ydb/library/yql/parser/pg_wrapper/parser.cpp @@ -200,7 +200,7 @@ void PGParse(const TString& input, IPGParseEvents& events) { walker.Advance(input[i]); } - events.OnError(TIssue(position, TString(parsetree_and_error.error->message))); + events.OnError(TIssue(position, "ERROR: " + TString(parsetree_and_error.error->message) + "\n")); } else { events.OnResult(parsetree_and_error.tree); } @@ -230,6 +230,7 @@ extern "C" void setup_pg_thread_cleanup() { }; static thread_local TThreadCleanup ThreadCleanup; + Log_error_verbosity = PGERROR_DEFAULT; SetDatabaseEncoding(PG_UTF8); MemoryContextInit(); auto owner = ResourceOwnerCreate(NULL, "TopTransaction"); diff --git a/ydb/library/yql/parser/pg_wrapper/postgresql/src/backend/utils/error/elog.c b/ydb/library/yql/parser/pg_wrapper/postgresql/src/backend/utils/error/elog.c index a872af2499c..89386e8cbd4 100644 --- a/ydb/library/yql/parser/pg_wrapper/postgresql/src/backend/utils/error/elog.c +++ b/ydb/library/yql/parser/pg_wrapper/postgresql/src/backend/utils/error/elog.c @@ -93,6 +93,7 @@ __thread ErrorContextCallback *error_context_stack = NULL; __thread sigjmp_buf *PG_exception_stack = NULL; +__thread bool yql_error_report_active = false; extern __thread bool redirection_done; @@ -371,7 +372,7 @@ errstart(int elevel, const char *domain) */ if (elevel == ERROR) { - if (PG_exception_stack == NULL || + if ((PG_exception_stack == NULL && !yql_error_report_active) || ExitOnAnyError || proc_exit_inprogress) elevel = FATAL; @@ -1801,7 +1802,12 @@ pg_re_throw(void) /* If possible, throw the error to the next outer setjmp handler */ if (PG_exception_stack != NULL) siglongjmp(*PG_exception_stack, 1); - else + else if (yql_error_report_active) { + ErrorData *edata = &errordata[errordata_stack_depth]; + send_message_to_server_log(edata); + FlushErrorState(); + yql_raise_error(); + } { /* * If we get here, elog(ERROR) was thrown inside a PG_TRY block, which @@ -3110,6 +3116,11 @@ send_message_to_server_log(ErrorData *edata) appendStringInfoChar(&buf, '\n'); } + if (yql_error_report_active) { + yql_prepare_error(buf.data); + return; + } + #ifdef HAVE_SYSLOG /* Write to syslog, if enabled */ if (Log_destination & LOG_DESTINATION_SYSLOG) diff --git a/ydb/library/yql/parser/pg_wrapper/postgresql/src/include/utils/elog.h b/ydb/library/yql/parser/pg_wrapper/postgresql/src/include/utils/elog.h index e9a5944d338..7e9970f25b5 100644 --- a/ydb/library/yql/parser/pg_wrapper/postgresql/src/include/utils/elog.h +++ b/ydb/library/yql/parser/pg_wrapper/postgresql/src/include/utils/elog.h @@ -356,6 +356,7 @@ extern __thread PGDLLIMPORT ErrorContextCallback *error_context_stack; #endif extern __thread PGDLLIMPORT sigjmp_buf *PG_exception_stack; +extern __thread PGDLLIMPORT bool yql_error_report_active; /* Stuff that error handlers might want to use */ @@ -400,6 +401,9 @@ typedef struct ErrorData struct MemoryContextData *assoc_context; } ErrorData; +extern void yql_prepare_error(const char* msg); +extern void yql_raise_error(); + extern void EmitErrorReport(void); extern ErrorData *CopyErrorData(void); extern void FreeErrorData(ErrorData *edata); diff --git a/ydb/library/yql/parser/pg_wrapper/ut/codegen_ut.cpp b/ydb/library/yql/parser/pg_wrapper/ut/codegen_ut.cpp index 1d9388de199..0c7e03603d3 100644 --- a/ydb/library/yql/parser/pg_wrapper/ut/codegen_ut.cpp +++ b/ydb/library/yql/parser/pg_wrapper/ut/codegen_ut.cpp @@ -69,7 +69,6 @@ Y_UNIT_TEST_SUITE(TPgCodegen) { auto func = codegen->GetModule().getFunction(std::string("arrow_" + name)); Y_ENSURE(func); codegen->AddGlobalMapping("GetPGKernelState", (const void*)&GetPGKernelState); - codegen->AddGlobalMapping("WithPgTry", (const void*)&WithPgTry); codegen->Verify(); codegen->ExportSymbol(func); codegen->Compile(); diff --git a/ydb/library/yql/parser/pg_wrapper/ut/parser_ut.cpp b/ydb/library/yql/parser/pg_wrapper/ut/parser_ut.cpp index 972cfec0125..0c9368d0097 100644 --- a/ydb/library/yql/parser/pg_wrapper/ut/parser_ut.cpp +++ b/ydb/library/yql/parser/pg_wrapper/ut/parser_ut.cpp @@ -40,7 +40,7 @@ Y_UNIT_TEST_SUITE(TWrapperTests) { UNIT_ASSERT(!events.Result); UNIT_ASSERT(events.Issue); auto msg = events.Issue->GetMessage(); - UNIT_ASSERT_NO_DIFF(msg, "syntax error at or near \"SELECT1\""); + UNIT_ASSERT_NO_DIFF(msg, "ERROR: syntax error at or near \"SELECT1\"\n"); UNIT_ASSERT_VALUES_EQUAL(events.Issue->Position.Row, 2); UNIT_ASSERT_VALUES_EQUAL(events.Issue->Position.Column, 3); } @@ -95,7 +95,7 @@ Y_UNIT_TEST_SUITE(TMTWrapperTests) { Y_ENSURE(!events.Result); Y_ENSURE(events.Issue); auto msg = events.Issue->GetMessage(); - Y_ENSURE(msg == "syntax error at or near \"SELECT1\""); + Y_ENSURE(msg == "ERROR: syntax error at or near \"SELECT1\"\n"); Y_ENSURE(events.Issue->Position.Row == 2); Y_ENSURE(events.Issue->Position.Column == 3); } |