summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <[email protected]>2022-03-31 20:31:29 +0300
committervvvv <[email protected]>2022-03-31 20:31:29 +0300
commitfa4b82d3614d68d8c7ede2bf4f58ffa9f9487784 (patch)
tree9aa7b7eb604f042818369009ed8a525cb9b4992c
parentb109b0629a87c94e27eb2b296c7b8c63a446a034 (diff)
YQL-13710 setup pg memory contexts once
ref:16c481951cdfe145829e99a3e94d7e54e3c83a2d
-rw-r--r--ydb/library/yql/minikql/mkql_alloc.h10
-rw-r--r--ydb/library/yql/parser/pg_wrapper/comp_factory.cpp87
-rw-r--r--ydb/library/yql/parser/pg_wrapper/parser.cpp10
-rwxr-xr-xydb/library/yql/parser/pg_wrapper/verify.sh2
-rw-r--r--ydb/library/yql/public/udf/ut/CMakeLists.darwin.txt1
-rw-r--r--ydb/library/yql/public/udf/ut/CMakeLists.linux.txt1
-rw-r--r--ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp16
7 files changed, 75 insertions, 52 deletions
diff --git a/ydb/library/yql/minikql/mkql_alloc.h b/ydb/library/yql/minikql/mkql_alloc.h
index 733f6502dea..bf17e527bca 100644
--- a/ydb/library/yql/minikql/mkql_alloc.h
+++ b/ydb/library/yql/minikql/mkql_alloc.h
@@ -57,6 +57,7 @@ struct TAllocState : public TAlignedPagePool
TListEntry OffloadedBlocksRoot;
TListEntry GlobalPAllocList;
TListEntry* CurrentPAllocList;
+ void* MainContext = nullptr;
void* CurrentContext = nullptr;
::NKikimr::NUdf::TBoxedValueLink Root;
@@ -74,6 +75,11 @@ struct TAllocState : public TAlignedPagePool
extern Y_POD_THREAD(TAllocState*) TlsAllocState;
+void* PgInitializeMainContext();
+void PgDestroyMainContext(void* ctx);
+void PgAcquireThreadContext(void* ctx);
+void PgReleaseThreadContext(void* ctx);
+
class TPAllocScope {
public:
TPAllocScope() {
@@ -115,6 +121,7 @@ public:
explicit TScopedAlloc(const TAlignedPagePoolCounters& counters = TAlignedPagePoolCounters(), bool supportsSizedAllocators = false)
: MyState_(counters, supportsSizedAllocators)
{
+ MyState_.MainContext = PgInitializeMainContext();
Acquire();
}
@@ -122,6 +129,7 @@ public:
{
MyState_.KillAllBoxed();
Release();
+ PgDestroyMainContext(MyState_.MainContext);
}
TAllocState& Ref() {
@@ -132,6 +140,7 @@ public:
if (!AttachedCount_) {
PrevState_ = TlsAllocState;
TlsAllocState = &MyState_;
+ PgAcquireThreadContext(MyState_.MainContext);
} else {
Y_VERIFY(TlsAllocState == &MyState_, "Mismatch allocator in thread");
}
@@ -142,6 +151,7 @@ public:
void Release() {
if (AttachedCount_ && --AttachedCount_ == 0) {
Y_VERIFY(TlsAllocState == &MyState_, "Mismatch allocator in thread");
+ PgReleaseThreadContext(MyState_.MainContext);
TlsAllocState = PrevState_;
PrevState_ = nullptr;
}
diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
index f9685302763..71b5fe66e09 100644
--- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
+++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
@@ -51,6 +51,12 @@ namespace NYql {
using namespace NKikimr::NMiniKQL;
+struct TMainContext {
+ MemoryContextData Data;
+ MemoryContext PrevCurrentMemoryContext = nullptr;
+ MemoryContext PrevErrorContext = nullptr;
+};
+
ui32 GetFullVarSize(const text* s) {
return VARSIZE(s);
}
@@ -201,22 +207,6 @@ const MemoryContextMethods MkqlMethods = {
#endif
};
-struct TMkqlPgAdapter {
- TMkqlPgAdapter() {
- MemoryContextCreate((MemoryContext)&Data,
- T_AllocSetContext,
- &MkqlMethods,
- nullptr,
- "mkql");
- }
-
- static MemoryContext Instance() {
- return (MemoryContext)&Singleton<TMkqlPgAdapter>()->Data;
- }
-
- MemoryContextData Data;
-};
-
class TVPtrHolder {
public:
TVPtrHolder() {
@@ -235,9 +225,6 @@ private:
TVPtrHolder TVPtrHolder::Instance;
-#define SET_MEMORY_CONTEXT \
- CurrentMemoryContext = ErrorContext = TMkqlPgAdapter::Instance();
-
inline ui32 MakeTypeIOParam(const NPg::TTypeDesc& desc) {
return desc.ElementTypeId ? desc.ElementTypeId : desc.TypeId;
}
@@ -261,8 +248,6 @@ public:
}
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& compCtx) const {
- SET_MEMORY_CONTEXT;
-
LOCAL_FCINFO(callInfo, 3);
Zero(*callInfo);
callInfo->flinfo = const_cast<FmgrInfo*>(&FInfo);
@@ -402,7 +387,6 @@ public:
}
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& compCtx) const {
- SET_MEMORY_CONTEXT;
auto& state = GetState(compCtx);
auto& callInfo = state.CallInfo.Ref();
if (UseContext) {
@@ -591,7 +575,6 @@ public:
return value.Release();
}
- SET_MEMORY_CONTEXT;
TPAllocScope call;
auto& state = GetState(compCtx);
auto& callInfo1 = state.CallInfo1.Ref();
@@ -794,7 +777,6 @@ public:
case NUdf::EDataSlot::String:
case NUdf::EDataSlot::Utf8: {
const auto& ref = value.AsStringRef();
- SET_MEMORY_CONTEXT;
return PointerDatumToPod((Datum)MakeVar(ref));
}
default:
@@ -812,7 +794,6 @@ private:
TComputationNodeFactory GetPgFactory() {
return [] (TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
- pg_thread_init();
TStringBuf name = callable.GetType()->GetName();
if (name == "PgConst") {
const auto typeIdData = AS_VALUE(TDataLiteral, callable.GetInput(0));
@@ -976,7 +957,6 @@ void WriteYsonValueInTableFormatPg(TOutputBuf& buf, TPgType* type, const NUdf::T
break;
}
default:
- SET_MEMORY_CONTEXT;
TPAllocScope call;
const auto& typeInfo = NPg::LookupType(type->GetTypeId());
FmgrInfo finfo;
@@ -1049,7 +1029,6 @@ void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& v
break;
}
default:
- SET_MEMORY_CONTEXT;
TPAllocScope call;
const auto& typeInfo = NPg::LookupType(type->GetTypeId());
FmgrInfo finfo;
@@ -1123,19 +1102,16 @@ NUdf::TUnboxedValue ReadYsonValuePg(TPgType* type, char cmd, TInputBuf& buf) {
case TEXTOID: {
CHECK_EXPECTED(cmd, StringMarker);
auto s = buf.ReadYtString();
- SET_MEMORY_CONTEXT;
auto ret = MakeVar(s);
return PointerDatumToPod((Datum)ret);
}
case CSTRINGOID: {
CHECK_EXPECTED(cmd, StringMarker);
auto s = buf.ReadYtString();
- SET_MEMORY_CONTEXT;
auto ret = MakeCString(s);
return PointerDatumToPod((Datum)ret);
}
default:
- SET_MEMORY_CONTEXT;
TPAllocScope call;
auto s = buf.ReadYtString();
StringInfoData stringInfo;
@@ -1212,7 +1188,6 @@ NKikimr::NUdf::TUnboxedValue ReadSkiffPg(NKikimr::NMiniKQL::TPgType* type, NComm
ui32 size;
buf.ReadMany((char*)&size, sizeof(size));
CHECK_STRING_LENGTH_UNSIGNED(size);
- SET_MEMORY_CONTEXT;
text* s = (text*)palloc(size + VARHDRSZ);
auto mem = s;
Y_DEFER {
@@ -1232,7 +1207,6 @@ NKikimr::NUdf::TUnboxedValue ReadSkiffPg(NKikimr::NMiniKQL::TPgType* type, NComm
ui32 size;
buf.ReadMany((char*)&size, sizeof(size));
CHECK_STRING_LENGTH_UNSIGNED(size);
- SET_MEMORY_CONTEXT;
char* s = (char*)palloc(size + 1);
auto mem = s;
Y_DEFER {
@@ -1248,7 +1222,6 @@ NKikimr::NUdf::TUnboxedValue ReadSkiffPg(NKikimr::NMiniKQL::TPgType* type, NComm
return PointerDatumToPod((Datum)s);
}
default:
- SET_MEMORY_CONTEXT;
TPAllocScope call;
ui32 size;
buf.ReadMany((char*)&size, sizeof(size));
@@ -1348,7 +1321,6 @@ void WriteSkiffPg(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxe
break;
}
default:
- SET_MEMORY_CONTEXT;
TPAllocScope call;
const auto& typeInfo = NPg::LookupType(type->GetTypeId());
FmgrInfo finfo;
@@ -1492,7 +1464,6 @@ void PGPackImpl(const TPgType* type, const NUdf::TUnboxedValuePod& value, TBuffe
break;
}
default:
- SET_MEMORY_CONTEXT;
TPAllocScope call;
const auto& typeInfo = NPg::LookupType(type->GetTypeId());
FmgrInfo finfo;
@@ -1552,7 +1523,6 @@ NUdf::TUnboxedValue PGUnpackImpl(const TPgType* type, TStringBuf& buf) {
case BYTEAOID:
case VARCHAROID:
case TEXTOID: {
- SET_MEMORY_CONTEXT;
auto size = NDetails::UnpackUInt32(buf);
MKQL_ENSURE(size <= buf.size(), "Bad packed data. Buffer too small");
const char* ptr = buf.data();
@@ -1561,7 +1531,6 @@ NUdf::TUnboxedValue PGUnpackImpl(const TPgType* type, TStringBuf& buf) {
return PointerDatumToPod((Datum)ret);
}
case CSTRINGOID: {
- SET_MEMORY_CONTEXT;
auto size = NDetails::UnpackUInt32(buf);
MKQL_ENSURE(size <= buf.size(), "Bad packed data. Buffer too small");
const char* ptr = buf.data();
@@ -1570,7 +1539,6 @@ NUdf::TUnboxedValue PGUnpackImpl(const TPgType* type, TStringBuf& buf) {
return PointerDatumToPod((Datum)ret);
}
default:
- SET_MEMORY_CONTEXT;
TPAllocScope call;
auto size = NDetails::UnpackUInt32(buf);
MKQL_ENSURE(size <= buf.size(), "Bad packed data. Buffer too small");
@@ -1652,7 +1620,6 @@ void EncodePresortPGValue(TPgType* type, const NUdf::TUnboxedValue& value, TVect
break;
}
default:
- SET_MEMORY_CONTEXT;
TPAllocScope call;
const auto& typeInfo = NPg::LookupType(type->GetTypeId());
FmgrInfo finfo;
@@ -1713,14 +1680,12 @@ NUdf::TUnboxedValue DecodePresortPGValue(TPgType* type, TStringBuf& input, TVect
case TEXTOID: {
buffer.clear();
const auto s = NDetail::DecodeString<false>(input, buffer);
- SET_MEMORY_CONTEXT;
auto ret = MakeVar(s);
return PointerDatumToPod((Datum)ret);
}
case CSTRINGOID: {
buffer.clear();
const auto s = NDetail::DecodeString<false>(input, buffer);
- SET_MEMORY_CONTEXT;
auto ret = MakeCString(s);
return PointerDatumToPod((Datum)ret);
}
@@ -1767,13 +1732,13 @@ void* PgInitializeContext(const std::string_view& contextType) {
*(NodeTag*)ctx = T_AggState;
ctx->curaggcontext = (ExprContext*)MKQLAllocWithSize(sizeof(ExprContext));
Zero(*ctx->curaggcontext);
- ctx->curaggcontext->ecxt_per_tuple_memory = TMkqlPgAdapter::Instance();
+ ctx->curaggcontext->ecxt_per_tuple_memory = (MemoryContext)&((TMainContext*)TlsAllocState->MainContext)->Data;
return ctx;
} else if (contextType == "WinAgg") {
auto ctx = (WindowAggState*)MKQLAllocWithSize(sizeof(WindowAggState));
Zero(*ctx);
*(NodeTag*)ctx = T_WindowAggState;
- ctx->curaggcontext = TMkqlPgAdapter::Instance();
+ ctx->curaggcontext = (MemoryContext)&((TMainContext*)TlsAllocState->MainContext)->Data;
return ctx;
} else {
ythrow yexception() << "Unsupported context type: " << contextType;
@@ -1807,7 +1772,6 @@ public:
}
ui64 Hash(NUdf::TUnboxedValuePod lhs) const override {
- SET_MEMORY_CONTEXT;
LOCAL_FCINFO(callInfo, 1);
Zero(*callInfo);
callInfo->flinfo = const_cast<FmgrInfo*>(&FInfoHash);
@@ -1861,7 +1825,6 @@ public:
}
bool Less(NUdf::TUnboxedValuePod lhs, NUdf::TUnboxedValuePod rhs) const override {
- SET_MEMORY_CONTEXT;
LOCAL_FCINFO(callInfo, 2);
Zero(*callInfo);
callInfo->flinfo = const_cast<FmgrInfo*>(&FInfoLess);
@@ -1893,7 +1856,6 @@ public:
}
int Compare(NUdf::TUnboxedValuePod lhs, NUdf::TUnboxedValuePod rhs) const override {
- SET_MEMORY_CONTEXT;
LOCAL_FCINFO(callInfo, 2);
Zero(*callInfo);
callInfo->flinfo = const_cast<FmgrInfo*>(&FInfoCompare);
@@ -1951,7 +1913,6 @@ public:
}
bool Equals(NUdf::TUnboxedValuePod lhs, NUdf::TUnboxedValuePod rhs) const override {
- SET_MEMORY_CONTEXT;
LOCAL_FCINFO(callInfo, 2);
Zero(*callInfo);
callInfo->flinfo = const_cast<FmgrInfo*>(&FInfoEquate);
@@ -1993,6 +1954,38 @@ NUdf::IEquate::TPtr MakePgEquate(const NMiniKQL::TPgType* type) {
return new TPgEquate(type);
}
+void* PgInitializeMainContext() {
+ auto ctx = (TMainContext*)malloc(sizeof(TMainContext));
+ MemoryContextCreate((MemoryContext)&ctx->Data,
+ T_AllocSetContext,
+ &MkqlMethods,
+ nullptr,
+ "mkql");
+ return ctx;
+}
+
+void PgDestroyMainContext(void* ctx) {
+ free(ctx);
+}
+
+void PgAcquireThreadContext(void* ctx) {
+ if (ctx) {
+ pg_thread_init();
+ auto main = (TMainContext*)ctx;
+ main->PrevCurrentMemoryContext = CurrentMemoryContext;
+ main->PrevErrorContext = ErrorContext;
+ CurrentMemoryContext = ErrorContext = (MemoryContext)&main->Data;
+ }
+}
+
+void PgReleaseThreadContext(void* ctx) {
+ if (ctx) {
+ auto main = (TMainContext*)ctx;
+ CurrentMemoryContext = main->PrevCurrentMemoryContext;
+ ErrorContext = main->PrevErrorContext;
+ }
+}
+
} // namespace NMiniKQL
} // namespace NKikimr
diff --git a/ydb/library/yql/parser/pg_wrapper/parser.cpp b/ydb/library/yql/parser/pg_wrapper/parser.cpp
index b83d1978354..4f13ed66c72 100644
--- a/ydb/library/yql/parser/pg_wrapper/parser.cpp
+++ b/ydb/library/yql/parser/pg_wrapper/parser.cpp
@@ -233,21 +233,23 @@ static struct TGlobalInit {
void PGParse(const TString& input, IPGParseEvents& events) {
pg_thread_init();
- MemoryContext ctx = NULL;
PgQueryInternalParsetreeAndError parsetree_and_error;
+ auto prevCurrentMemoryContext = CurrentMemoryContext;
+ auto prevErrorContext = ErrorContext;
+
CurrentMemoryContext = (MemoryContext)malloc(sizeof(MemoryContextData));
MemoryContextCreate(CurrentMemoryContext,
T_AllocSetContext,
&MyMethods,
nullptr,
- "yql");
+ "parser");
ErrorContext = CurrentMemoryContext;
Y_DEFER {
free(CurrentMemoryContext);
- CurrentMemoryContext = nullptr;
- ErrorContext = nullptr;
+ CurrentMemoryContext = prevCurrentMemoryContext;
+ ErrorContext = prevErrorContext;
};
TAlloc alloc;
diff --git a/ydb/library/yql/parser/pg_wrapper/verify.sh b/ydb/library/yql/parser/pg_wrapper/verify.sh
index f562e60202b..870a7ab93f8 100755
--- a/ydb/library/yql/parser/pg_wrapper/verify.sh
+++ b/ydb/library/yql/parser/pg_wrapper/verify.sh
@@ -7,7 +7,7 @@ ya make || exit $?
echo -n "Checking static variables: "
data=$(objdump libyql-parser-pg_wrapper.a -t | grep -E "\.data\.|\.bss\." | \
-grep -v -E "progname|pg_popcount32|pg_popcount64|pg_comp_crc32c|TMkqlPgAdapter|_ZN4NYqlL10GlobalInitE|BlockSig|StartupBlockSig|UnBlockSig" | \
+grep -v -E "progname|pg_popcount32|pg_popcount64|pg_comp_crc32c|_ZN4NYqlL10GlobalInitE|BlockSig|StartupBlockSig|UnBlockSig" | \
grep -v -E "local_my_wait_event_info|my_wait_event_info|maxSems|nextSemKey|numSems|sharedSemas|AnonymousShmem|AnonymousShmemSize" | \
grep -v -E "UsedShmemSegAddr|UsedShmemSegID|_ZN4NYql11TVPtrHolder8InstanceE" | \
grep -v -E "on_proc_exit_index|on_shmem_exit_index|before_shmem_exit_index")
diff --git a/ydb/library/yql/public/udf/ut/CMakeLists.darwin.txt b/ydb/library/yql/public/udf/ut/CMakeLists.darwin.txt
index 90e056f401d..7b9ad9f1f12 100644
--- a/ydb/library/yql/public/udf/ut/CMakeLists.darwin.txt
+++ b/ydb/library/yql/public/udf/ut/CMakeLists.darwin.txt
@@ -21,6 +21,7 @@ target_link_libraries(ydb-library-yql-public-udf-ut PUBLIC
cpp-testing-unittest_main
yql-public-udf
udf-service-exception_policy
+ yql-sql-pg_dummy
)
target_sources(ydb-library-yql-public-udf-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/udf_counter_ut.cpp
diff --git a/ydb/library/yql/public/udf/ut/CMakeLists.linux.txt b/ydb/library/yql/public/udf/ut/CMakeLists.linux.txt
index 1cfaa0bb179..51a6d1255cc 100644
--- a/ydb/library/yql/public/udf/ut/CMakeLists.linux.txt
+++ b/ydb/library/yql/public/udf/ut/CMakeLists.linux.txt
@@ -22,6 +22,7 @@ target_link_libraries(ydb-library-yql-public-udf-ut PUBLIC
cpp-testing-unittest_main
yql-public-udf
udf-service-exception_policy
+ yql-sql-pg_dummy
)
target_sources(ydb-library-yql-public-udf-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/public/udf/udf_counter_ut.cpp
diff --git a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp
index 8d777c9a490..9ca7fa1d151 100644
--- a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp
+++ b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp
@@ -76,6 +76,22 @@ extern "C" void WriteSkiffPgValue(NKikimr::NMiniKQL::TPgType* type, const NKikim
namespace NKikimr {
namespace NMiniKQL {
+void* PgInitializeMainContext() {
+ return nullptr;
+}
+
+void PgDestroyMainContext(void* ctx) {
+ Y_UNUSED(ctx);
+}
+
+void PgAcquireThreadContext(void* ctx) {
+ Y_UNUSED(ctx);
+}
+
+void PgReleaseThreadContext(void* ctx) {
+ Y_UNUSED(ctx);
+}
+
void PGPackImpl(const TPgType* type, const NUdf::TUnboxedValuePod& value, TBuffer& buf) {
Y_UNUSED(type);
Y_UNUSED(value);