diff options
| author | vvvv <[email protected]> | 2022-03-31 20:31:29 +0300 |
|---|---|---|
| committer | vvvv <[email protected]> | 2022-03-31 20:31:29 +0300 |
| commit | fa4b82d3614d68d8c7ede2bf4f58ffa9f9487784 (patch) | |
| tree | 9aa7b7eb604f042818369009ed8a525cb9b4992c | |
| parent | b109b0629a87c94e27eb2b296c7b8c63a446a034 (diff) | |
YQL-13710 setup pg memory contexts once
ref:16c481951cdfe145829e99a3e94d7e54e3c83a2d
| -rw-r--r-- | ydb/library/yql/minikql/mkql_alloc.h | 10 | ||||
| -rw-r--r-- | ydb/library/yql/parser/pg_wrapper/comp_factory.cpp | 87 | ||||
| -rw-r--r-- | ydb/library/yql/parser/pg_wrapper/parser.cpp | 10 | ||||
| -rwxr-xr-x | ydb/library/yql/parser/pg_wrapper/verify.sh | 2 | ||||
| -rw-r--r-- | ydb/library/yql/public/udf/ut/CMakeLists.darwin.txt | 1 | ||||
| -rw-r--r-- | ydb/library/yql/public/udf/ut/CMakeLists.linux.txt | 1 | ||||
| -rw-r--r-- | ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp | 16 |
7 files changed, 75 insertions, 52 deletions
diff --git a/ydb/library/yql/minikql/mkql_alloc.h b/ydb/library/yql/minikql/mkql_alloc.h index 733f6502dea..bf17e527bca 100644 --- a/ydb/library/yql/minikql/mkql_alloc.h +++ b/ydb/library/yql/minikql/mkql_alloc.h @@ -57,6 +57,7 @@ struct TAllocState : public TAlignedPagePool TListEntry OffloadedBlocksRoot; TListEntry GlobalPAllocList; TListEntry* CurrentPAllocList; + void* MainContext = nullptr; void* CurrentContext = nullptr; ::NKikimr::NUdf::TBoxedValueLink Root; @@ -74,6 +75,11 @@ struct TAllocState : public TAlignedPagePool extern Y_POD_THREAD(TAllocState*) TlsAllocState; +void* PgInitializeMainContext(); +void PgDestroyMainContext(void* ctx); +void PgAcquireThreadContext(void* ctx); +void PgReleaseThreadContext(void* ctx); + class TPAllocScope { public: TPAllocScope() { @@ -115,6 +121,7 @@ public: explicit TScopedAlloc(const TAlignedPagePoolCounters& counters = TAlignedPagePoolCounters(), bool supportsSizedAllocators = false) : MyState_(counters, supportsSizedAllocators) { + MyState_.MainContext = PgInitializeMainContext(); Acquire(); } @@ -122,6 +129,7 @@ public: { MyState_.KillAllBoxed(); Release(); + PgDestroyMainContext(MyState_.MainContext); } TAllocState& Ref() { @@ -132,6 +140,7 @@ public: if (!AttachedCount_) { PrevState_ = TlsAllocState; TlsAllocState = &MyState_; + PgAcquireThreadContext(MyState_.MainContext); } else { Y_VERIFY(TlsAllocState == &MyState_, "Mismatch allocator in thread"); } @@ -142,6 +151,7 @@ public: void Release() { if (AttachedCount_ && --AttachedCount_ == 0) { Y_VERIFY(TlsAllocState == &MyState_, "Mismatch allocator in thread"); + PgReleaseThreadContext(MyState_.MainContext); TlsAllocState = PrevState_; PrevState_ = nullptr; } diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp index f9685302763..71b5fe66e09 100644 --- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp +++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp @@ -51,6 +51,12 @@ namespace NYql { using namespace NKikimr::NMiniKQL; +struct TMainContext { + MemoryContextData Data; + MemoryContext PrevCurrentMemoryContext = nullptr; + MemoryContext PrevErrorContext = nullptr; +}; + ui32 GetFullVarSize(const text* s) { return VARSIZE(s); } @@ -201,22 +207,6 @@ const MemoryContextMethods MkqlMethods = { #endif }; -struct TMkqlPgAdapter { - TMkqlPgAdapter() { - MemoryContextCreate((MemoryContext)&Data, - T_AllocSetContext, - &MkqlMethods, - nullptr, - "mkql"); - } - - static MemoryContext Instance() { - return (MemoryContext)&Singleton<TMkqlPgAdapter>()->Data; - } - - MemoryContextData Data; -}; - class TVPtrHolder { public: TVPtrHolder() { @@ -235,9 +225,6 @@ private: TVPtrHolder TVPtrHolder::Instance; -#define SET_MEMORY_CONTEXT \ - CurrentMemoryContext = ErrorContext = TMkqlPgAdapter::Instance(); - inline ui32 MakeTypeIOParam(const NPg::TTypeDesc& desc) { return desc.ElementTypeId ? desc.ElementTypeId : desc.TypeId; } @@ -261,8 +248,6 @@ public: } NUdf::TUnboxedValuePod DoCalculate(TComputationContext& compCtx) const { - SET_MEMORY_CONTEXT; - LOCAL_FCINFO(callInfo, 3); Zero(*callInfo); callInfo->flinfo = const_cast<FmgrInfo*>(&FInfo); @@ -402,7 +387,6 @@ public: } NUdf::TUnboxedValuePod DoCalculate(TComputationContext& compCtx) const { - SET_MEMORY_CONTEXT; auto& state = GetState(compCtx); auto& callInfo = state.CallInfo.Ref(); if (UseContext) { @@ -591,7 +575,6 @@ public: return value.Release(); } - SET_MEMORY_CONTEXT; TPAllocScope call; auto& state = GetState(compCtx); auto& callInfo1 = state.CallInfo1.Ref(); @@ -794,7 +777,6 @@ public: case NUdf::EDataSlot::String: case NUdf::EDataSlot::Utf8: { const auto& ref = value.AsStringRef(); - SET_MEMORY_CONTEXT; return PointerDatumToPod((Datum)MakeVar(ref)); } default: @@ -812,7 +794,6 @@ private: TComputationNodeFactory GetPgFactory() { return [] (TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* { - pg_thread_init(); TStringBuf name = callable.GetType()->GetName(); if (name == "PgConst") { const auto typeIdData = AS_VALUE(TDataLiteral, callable.GetInput(0)); @@ -976,7 +957,6 @@ void WriteYsonValueInTableFormatPg(TOutputBuf& buf, TPgType* type, const NUdf::T break; } default: - SET_MEMORY_CONTEXT; TPAllocScope call; const auto& typeInfo = NPg::LookupType(type->GetTypeId()); FmgrInfo finfo; @@ -1049,7 +1029,6 @@ void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& v break; } default: - SET_MEMORY_CONTEXT; TPAllocScope call; const auto& typeInfo = NPg::LookupType(type->GetTypeId()); FmgrInfo finfo; @@ -1123,19 +1102,16 @@ NUdf::TUnboxedValue ReadYsonValuePg(TPgType* type, char cmd, TInputBuf& buf) { case TEXTOID: { CHECK_EXPECTED(cmd, StringMarker); auto s = buf.ReadYtString(); - SET_MEMORY_CONTEXT; auto ret = MakeVar(s); return PointerDatumToPod((Datum)ret); } case CSTRINGOID: { CHECK_EXPECTED(cmd, StringMarker); auto s = buf.ReadYtString(); - SET_MEMORY_CONTEXT; auto ret = MakeCString(s); return PointerDatumToPod((Datum)ret); } default: - SET_MEMORY_CONTEXT; TPAllocScope call; auto s = buf.ReadYtString(); StringInfoData stringInfo; @@ -1212,7 +1188,6 @@ NKikimr::NUdf::TUnboxedValue ReadSkiffPg(NKikimr::NMiniKQL::TPgType* type, NComm ui32 size; buf.ReadMany((char*)&size, sizeof(size)); CHECK_STRING_LENGTH_UNSIGNED(size); - SET_MEMORY_CONTEXT; text* s = (text*)palloc(size + VARHDRSZ); auto mem = s; Y_DEFER { @@ -1232,7 +1207,6 @@ NKikimr::NUdf::TUnboxedValue ReadSkiffPg(NKikimr::NMiniKQL::TPgType* type, NComm ui32 size; buf.ReadMany((char*)&size, sizeof(size)); CHECK_STRING_LENGTH_UNSIGNED(size); - SET_MEMORY_CONTEXT; char* s = (char*)palloc(size + 1); auto mem = s; Y_DEFER { @@ -1248,7 +1222,6 @@ NKikimr::NUdf::TUnboxedValue ReadSkiffPg(NKikimr::NMiniKQL::TPgType* type, NComm return PointerDatumToPod((Datum)s); } default: - SET_MEMORY_CONTEXT; TPAllocScope call; ui32 size; buf.ReadMany((char*)&size, sizeof(size)); @@ -1348,7 +1321,6 @@ void WriteSkiffPg(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxe break; } default: - SET_MEMORY_CONTEXT; TPAllocScope call; const auto& typeInfo = NPg::LookupType(type->GetTypeId()); FmgrInfo finfo; @@ -1492,7 +1464,6 @@ void PGPackImpl(const TPgType* type, const NUdf::TUnboxedValuePod& value, TBuffe break; } default: - SET_MEMORY_CONTEXT; TPAllocScope call; const auto& typeInfo = NPg::LookupType(type->GetTypeId()); FmgrInfo finfo; @@ -1552,7 +1523,6 @@ NUdf::TUnboxedValue PGUnpackImpl(const TPgType* type, TStringBuf& buf) { case BYTEAOID: case VARCHAROID: case TEXTOID: { - SET_MEMORY_CONTEXT; auto size = NDetails::UnpackUInt32(buf); MKQL_ENSURE(size <= buf.size(), "Bad packed data. Buffer too small"); const char* ptr = buf.data(); @@ -1561,7 +1531,6 @@ NUdf::TUnboxedValue PGUnpackImpl(const TPgType* type, TStringBuf& buf) { return PointerDatumToPod((Datum)ret); } case CSTRINGOID: { - SET_MEMORY_CONTEXT; auto size = NDetails::UnpackUInt32(buf); MKQL_ENSURE(size <= buf.size(), "Bad packed data. Buffer too small"); const char* ptr = buf.data(); @@ -1570,7 +1539,6 @@ NUdf::TUnboxedValue PGUnpackImpl(const TPgType* type, TStringBuf& buf) { return PointerDatumToPod((Datum)ret); } default: - SET_MEMORY_CONTEXT; TPAllocScope call; auto size = NDetails::UnpackUInt32(buf); MKQL_ENSURE(size <= buf.size(), "Bad packed data. Buffer too small"); @@ -1652,7 +1620,6 @@ void EncodePresortPGValue(TPgType* type, const NUdf::TUnboxedValue& value, TVect break; } default: - SET_MEMORY_CONTEXT; TPAllocScope call; const auto& typeInfo = NPg::LookupType(type->GetTypeId()); FmgrInfo finfo; @@ -1713,14 +1680,12 @@ NUdf::TUnboxedValue DecodePresortPGValue(TPgType* type, TStringBuf& input, TVect case TEXTOID: { buffer.clear(); const auto s = NDetail::DecodeString<false>(input, buffer); - SET_MEMORY_CONTEXT; auto ret = MakeVar(s); return PointerDatumToPod((Datum)ret); } case CSTRINGOID: { buffer.clear(); const auto s = NDetail::DecodeString<false>(input, buffer); - SET_MEMORY_CONTEXT; auto ret = MakeCString(s); return PointerDatumToPod((Datum)ret); } @@ -1767,13 +1732,13 @@ void* PgInitializeContext(const std::string_view& contextType) { *(NodeTag*)ctx = T_AggState; ctx->curaggcontext = (ExprContext*)MKQLAllocWithSize(sizeof(ExprContext)); Zero(*ctx->curaggcontext); - ctx->curaggcontext->ecxt_per_tuple_memory = TMkqlPgAdapter::Instance(); + ctx->curaggcontext->ecxt_per_tuple_memory = (MemoryContext)&((TMainContext*)TlsAllocState->MainContext)->Data; return ctx; } else if (contextType == "WinAgg") { auto ctx = (WindowAggState*)MKQLAllocWithSize(sizeof(WindowAggState)); Zero(*ctx); *(NodeTag*)ctx = T_WindowAggState; - ctx->curaggcontext = TMkqlPgAdapter::Instance(); + ctx->curaggcontext = (MemoryContext)&((TMainContext*)TlsAllocState->MainContext)->Data; return ctx; } else { ythrow yexception() << "Unsupported context type: " << contextType; @@ -1807,7 +1772,6 @@ public: } ui64 Hash(NUdf::TUnboxedValuePod lhs) const override { - SET_MEMORY_CONTEXT; LOCAL_FCINFO(callInfo, 1); Zero(*callInfo); callInfo->flinfo = const_cast<FmgrInfo*>(&FInfoHash); @@ -1861,7 +1825,6 @@ public: } bool Less(NUdf::TUnboxedValuePod lhs, NUdf::TUnboxedValuePod rhs) const override { - SET_MEMORY_CONTEXT; LOCAL_FCINFO(callInfo, 2); Zero(*callInfo); callInfo->flinfo = const_cast<FmgrInfo*>(&FInfoLess); @@ -1893,7 +1856,6 @@ public: } int Compare(NUdf::TUnboxedValuePod lhs, NUdf::TUnboxedValuePod rhs) const override { - SET_MEMORY_CONTEXT; LOCAL_FCINFO(callInfo, 2); Zero(*callInfo); callInfo->flinfo = const_cast<FmgrInfo*>(&FInfoCompare); @@ -1951,7 +1913,6 @@ public: } bool Equals(NUdf::TUnboxedValuePod lhs, NUdf::TUnboxedValuePod rhs) const override { - SET_MEMORY_CONTEXT; LOCAL_FCINFO(callInfo, 2); Zero(*callInfo); callInfo->flinfo = const_cast<FmgrInfo*>(&FInfoEquate); @@ -1993,6 +1954,38 @@ NUdf::IEquate::TPtr MakePgEquate(const NMiniKQL::TPgType* type) { return new TPgEquate(type); } +void* PgInitializeMainContext() { + auto ctx = (TMainContext*)malloc(sizeof(TMainContext)); + MemoryContextCreate((MemoryContext)&ctx->Data, + T_AllocSetContext, + &MkqlMethods, + nullptr, + "mkql"); + return ctx; +} + +void PgDestroyMainContext(void* ctx) { + free(ctx); +} + +void PgAcquireThreadContext(void* ctx) { + if (ctx) { + pg_thread_init(); + auto main = (TMainContext*)ctx; + main->PrevCurrentMemoryContext = CurrentMemoryContext; + main->PrevErrorContext = ErrorContext; + CurrentMemoryContext = ErrorContext = (MemoryContext)&main->Data; + } +} + +void PgReleaseThreadContext(void* ctx) { + if (ctx) { + auto main = (TMainContext*)ctx; + CurrentMemoryContext = main->PrevCurrentMemoryContext; + ErrorContext = main->PrevErrorContext; + } +} + } // namespace NMiniKQL } // namespace NKikimr diff --git a/ydb/library/yql/parser/pg_wrapper/parser.cpp b/ydb/library/yql/parser/pg_wrapper/parser.cpp index b83d1978354..4f13ed66c72 100644 --- a/ydb/library/yql/parser/pg_wrapper/parser.cpp +++ b/ydb/library/yql/parser/pg_wrapper/parser.cpp @@ -233,21 +233,23 @@ static struct TGlobalInit { void PGParse(const TString& input, IPGParseEvents& events) { pg_thread_init(); - MemoryContext ctx = NULL; PgQueryInternalParsetreeAndError parsetree_and_error; + auto prevCurrentMemoryContext = CurrentMemoryContext; + auto prevErrorContext = ErrorContext; + CurrentMemoryContext = (MemoryContext)malloc(sizeof(MemoryContextData)); MemoryContextCreate(CurrentMemoryContext, T_AllocSetContext, &MyMethods, nullptr, - "yql"); + "parser"); ErrorContext = CurrentMemoryContext; Y_DEFER { free(CurrentMemoryContext); - CurrentMemoryContext = nullptr; - ErrorContext = nullptr; + CurrentMemoryContext = prevCurrentMemoryContext; + ErrorContext = prevErrorContext; }; TAlloc alloc; diff --git a/ydb/library/yql/parser/pg_wrapper/verify.sh b/ydb/library/yql/parser/pg_wrapper/verify.sh index f562e60202b..870a7ab93f8 100755 --- a/ydb/library/yql/parser/pg_wrapper/verify.sh +++ b/ydb/library/yql/parser/pg_wrapper/verify.sh @@ -7,7 +7,7 @@ ya make || exit $? echo -n "Checking static variables: " data=$(objdump libyql-parser-pg_wrapper.a -t | grep -E "\.data\.|\.bss\." | \ -grep -v -E "progname|pg_popcount32|pg_popcount64|pg_comp_crc32c|TMkqlPgAdapter|_ZN4NYqlL10GlobalInitE|BlockSig|StartupBlockSig|UnBlockSig" | \ +grep -v -E "progname|pg_popcount32|pg_popcount64|pg_comp_crc32c|_ZN4NYqlL10GlobalInitE|BlockSig|StartupBlockSig|UnBlockSig" | \ grep -v -E "local_my_wait_event_info|my_wait_event_info|maxSems|nextSemKey|numSems|sharedSemas|AnonymousShmem|AnonymousShmemSize" | \ grep -v -E "UsedShmemSegAddr|UsedShmemSegID|_ZN4NYql11TVPtrHolder8InstanceE" | \ grep -v -E "on_proc_exit_index|on_shmem_exit_index|before_shmem_exit_index") diff --git a/ydb/library/yql/public/udf/ut/CMakeLists.darwin.txt b/ydb/library/yql/public/udf/ut/CMakeLists.darwin.txt index 90e056f401d..7b9ad9f1f12 100644 --- a/ydb/library/yql/public/udf/ut/CMakeLists.darwin.txt +++ b/ydb/library/yql/public/udf/ut/CMakeLists.darwin.txt @@ -21,6 +21,7 @@ target_link_libraries(ydb-library-yql-public-udf-ut PUBLIC cpp-testing-unittest_main yql-public-udf udf-service-exception_policy + yql-sql-pg_dummy ) target_sources(ydb-library-yql-public-udf-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/udf_counter_ut.cpp diff --git a/ydb/library/yql/public/udf/ut/CMakeLists.linux.txt b/ydb/library/yql/public/udf/ut/CMakeLists.linux.txt index 1cfaa0bb179..51a6d1255cc 100644 --- a/ydb/library/yql/public/udf/ut/CMakeLists.linux.txt +++ b/ydb/library/yql/public/udf/ut/CMakeLists.linux.txt @@ -22,6 +22,7 @@ target_link_libraries(ydb-library-yql-public-udf-ut PUBLIC cpp-testing-unittest_main yql-public-udf udf-service-exception_policy + yql-sql-pg_dummy ) target_sources(ydb-library-yql-public-udf-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/udf_counter_ut.cpp diff --git a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp index 8d777c9a490..9ca7fa1d151 100644 --- a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp +++ b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp @@ -76,6 +76,22 @@ extern "C" void WriteSkiffPgValue(NKikimr::NMiniKQL::TPgType* type, const NKikim namespace NKikimr { namespace NMiniKQL { +void* PgInitializeMainContext() { + return nullptr; +} + +void PgDestroyMainContext(void* ctx) { + Y_UNUSED(ctx); +} + +void PgAcquireThreadContext(void* ctx) { + Y_UNUSED(ctx); +} + +void PgReleaseThreadContext(void* ctx) { + Y_UNUSED(ctx); +} + void PGPackImpl(const TPgType* type, const NUdf::TUnboxedValuePod& value, TBuffer& buf) { Y_UNUSED(type); Y_UNUSED(value); |
