diff options
author | a-romanov <a-romanov@yandex-team.ru> | 2022-04-26 21:35:42 +0300 |
---|---|---|
committer | a-romanov <a-romanov@yandex-team.ru> | 2022-04-26 21:35:42 +0300 |
commit | 92c6a57a032dd359d57a40d5a05a19e10deb0360 (patch) | |
tree | f07b0c22efb90fe33e89a316a77903c26b7c22b6 | |
parent | adc1bef0e6c8d177a6e2fecb7c822696582b29df (diff) | |
download | ydb-92c6a57a032dd359d57a40d5a05a19e10deb0360.tar.gz |
YQ-727 Use mkql force aquire & release.
ref:4baab85ae0f41c83c868808d880f13d096708675
-rw-r--r-- | ydb/core/tablet_flat/ut_large/CMakeLists.darwin.txt | 1 | ||||
-rw-r--r-- | ydb/core/tablet_flat/ut_large/CMakeLists.linux.txt | 1 | ||||
-rw-r--r-- | ydb/library/yql/minikql/mkql_alloc.cpp | 21 | ||||
-rw-r--r-- | ydb/library/yql/minikql/mkql_alloc.h | 22 | ||||
-rw-r--r-- | ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp | 20 |
5 files changed, 38 insertions, 27 deletions
diff --git a/ydb/core/tablet_flat/ut_large/CMakeLists.darwin.txt b/ydb/core/tablet_flat/ut_large/CMakeLists.darwin.txt index f0101fd708..324ba024e0 100644 --- a/ydb/core/tablet_flat/ut_large/CMakeLists.darwin.txt +++ b/ydb/core/tablet_flat/ut_large/CMakeLists.darwin.txt @@ -21,6 +21,7 @@ target_link_libraries(ydb-core-tablet_flat-ut_large PUBLIC test-libs-exec test-libs-table udf-service-exception_policy + yql-sql-pg_dummy ) target_link_options(ydb-core-tablet_flat-ut_large PRIVATE -Wl,-no_deduplicate diff --git a/ydb/core/tablet_flat/ut_large/CMakeLists.linux.txt b/ydb/core/tablet_flat/ut_large/CMakeLists.linux.txt index 00687a11c6..8cabc32a13 100644 --- a/ydb/core/tablet_flat/ut_large/CMakeLists.linux.txt +++ b/ydb/core/tablet_flat/ut_large/CMakeLists.linux.txt @@ -22,6 +22,7 @@ target_link_libraries(ydb-core-tablet_flat-ut_large PUBLIC test-libs-exec test-libs-table udf-service-exception_policy + yql-sql-pg_dummy ) target_link_options(ydb-core-tablet_flat-ut_large PRIVATE -ldl diff --git a/ydb/library/yql/minikql/mkql_alloc.cpp b/ydb/library/yql/minikql/mkql_alloc.cpp index e691bf13ee..edaedd39b6 100644 --- a/ydb/library/yql/minikql/mkql_alloc.cpp +++ b/ydb/library/yql/minikql/mkql_alloc.cpp @@ -94,6 +94,27 @@ size_t TAllocState::GetDeallocatedInPages() const { return deallocated; } +void TScopedAlloc::Acquire() { + if (!AttachedCount_) { + PrevState_ = TlsAllocState; + TlsAllocState = &MyState_; + PgAcquireThreadContext(MyState_.MainContext); + } else { + Y_VERIFY(TlsAllocState == &MyState_, "Mismatch allocator in thread"); + } + + ++AttachedCount_; +} + +void TScopedAlloc::Release() { + if (AttachedCount_ && --AttachedCount_ == 0) { + Y_VERIFY(TlsAllocState == &MyState_, "Mismatch allocator in thread"); + PgReleaseThreadContext(MyState_.MainContext); + TlsAllocState = PrevState_; + PrevState_ = nullptr; + } +} + void* MKQLAllocSlow(size_t sz, TAllocState* state) { auto roundedSize = AlignUp(sz + sizeof(TAllocPageHeader), MKQL_ALIGNMENT); auto capacity = Max(ui64(TAlignedPagePool::POOL_PAGE_SIZE), roundedSize); diff --git a/ydb/library/yql/minikql/mkql_alloc.h b/ydb/library/yql/minikql/mkql_alloc.h index 33d02f3a77..62fb1771df 100644 --- a/ydb/library/yql/minikql/mkql_alloc.h +++ b/ydb/library/yql/minikql/mkql_alloc.h @@ -136,26 +136,8 @@ public: return MyState_; } - void Acquire() { - if (!AttachedCount_) { - PrevState_ = TlsAllocState; - TlsAllocState = &MyState_; - PgAcquireThreadContext(MyState_.MainContext); - } else { - Y_VERIFY(TlsAllocState == &MyState_, "Mismatch allocator in thread"); - } - - ++AttachedCount_; - } - - void Release() { - if (AttachedCount_ && --AttachedCount_ == 0) { - Y_VERIFY(TlsAllocState == &MyState_, "Mismatch allocator in thread"); - PgReleaseThreadContext(MyState_.MainContext); - TlsAllocState = PrevState_; - PrevState_ = nullptr; - } - } + void Acquire(); + void Release(); size_t GetUsed() const { return MyState_.GetUsed(); } size_t GetPeakUsed() const { return MyState_.GetPeakUsed(); } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index b53f3dd3b1..715f174928 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -297,11 +297,12 @@ public: return false; TAllocState *const allocState = TlsAllocState; - Y_VERIFY(allocState == AllocState, "Wrong TLS alloc state pre check."); - TypeEnv.GetAllocator().Release(); + PgReleaseThreadContext(allocState->MainContext); + TlsAllocState = nullptr; const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadResult, TEvPrivate::TEvReadError, TEvPrivate::TEvReadFinished>(); - TypeEnv.GetAllocator().Acquire(); - Y_VERIFY(allocState == AllocState, "Wrong TLS alloc state post check."); + TlsAllocState = allocState; + PgAcquireThreadContext(allocState->MainContext); + switch (const auto etype = ev->GetTypeRewrite()) { case TEvPrivate::TEvReadFinished::EventType: Finished = true; @@ -326,8 +327,14 @@ private: const auto randStub = CreateDeterministicRandomProvider(1); const auto timeStub = CreateDeterministicTimeProvider(10000000); - const auto alloc = TypeEnv.BindAllocator(); - AllocState = TlsAllocState; + Y_VERIFY(!TlsAllocState); + TlsAllocState = &TypeEnv.GetAllocator().Ref(); + PgAcquireThreadContext(TypeEnv.GetAllocator().Ref().MainContext); + Y_DEFER{ + PgReleaseThreadContext(TypeEnv.GetAllocator().Ref().MainContext); + TlsAllocState = nullptr; + }; + const auto pb = std::make_unique<TProgramBuilder>(TypeEnv, FunctionRegistry); TCallableBuilder callableBuilder(TypeEnv, "CoroStream", pb->NewStreamType(pb->NewDataType(NUdf::EDataSlot::String))); @@ -371,7 +378,6 @@ private: const TString Format, RowType, Compression; const NActors::TActorId ComputeActorId; TOutput::TPtr Outputs; - TAllocState * AllocState = nullptr; bool Finished = false; }; |