aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <a-romanov@yandex-team.ru>2022-04-26 21:35:42 +0300
committera-romanov <a-romanov@yandex-team.ru>2022-04-26 21:35:42 +0300
commit92c6a57a032dd359d57a40d5a05a19e10deb0360 (patch)
treef07b0c22efb90fe33e89a316a77903c26b7c22b6
parentadc1bef0e6c8d177a6e2fecb7c822696582b29df (diff)
downloadydb-92c6a57a032dd359d57a40d5a05a19e10deb0360.tar.gz
YQ-727 Use mkql force aquire & release.
ref:4baab85ae0f41c83c868808d880f13d096708675
-rw-r--r--ydb/core/tablet_flat/ut_large/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/tablet_flat/ut_large/CMakeLists.linux.txt1
-rw-r--r--ydb/library/yql/minikql/mkql_alloc.cpp21
-rw-r--r--ydb/library/yql/minikql/mkql_alloc.h22
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp20
5 files changed, 38 insertions, 27 deletions
diff --git a/ydb/core/tablet_flat/ut_large/CMakeLists.darwin.txt b/ydb/core/tablet_flat/ut_large/CMakeLists.darwin.txt
index f0101fd708..324ba024e0 100644
--- a/ydb/core/tablet_flat/ut_large/CMakeLists.darwin.txt
+++ b/ydb/core/tablet_flat/ut_large/CMakeLists.darwin.txt
@@ -21,6 +21,7 @@ target_link_libraries(ydb-core-tablet_flat-ut_large PUBLIC
test-libs-exec
test-libs-table
udf-service-exception_policy
+ yql-sql-pg_dummy
)
target_link_options(ydb-core-tablet_flat-ut_large PRIVATE
-Wl,-no_deduplicate
diff --git a/ydb/core/tablet_flat/ut_large/CMakeLists.linux.txt b/ydb/core/tablet_flat/ut_large/CMakeLists.linux.txt
index 00687a11c6..8cabc32a13 100644
--- a/ydb/core/tablet_flat/ut_large/CMakeLists.linux.txt
+++ b/ydb/core/tablet_flat/ut_large/CMakeLists.linux.txt
@@ -22,6 +22,7 @@ target_link_libraries(ydb-core-tablet_flat-ut_large PUBLIC
test-libs-exec
test-libs-table
udf-service-exception_policy
+ yql-sql-pg_dummy
)
target_link_options(ydb-core-tablet_flat-ut_large PRIVATE
-ldl
diff --git a/ydb/library/yql/minikql/mkql_alloc.cpp b/ydb/library/yql/minikql/mkql_alloc.cpp
index e691bf13ee..edaedd39b6 100644
--- a/ydb/library/yql/minikql/mkql_alloc.cpp
+++ b/ydb/library/yql/minikql/mkql_alloc.cpp
@@ -94,6 +94,27 @@ size_t TAllocState::GetDeallocatedInPages() const {
return deallocated;
}
+void TScopedAlloc::Acquire() {
+ if (!AttachedCount_) {
+ PrevState_ = TlsAllocState;
+ TlsAllocState = &MyState_;
+ PgAcquireThreadContext(MyState_.MainContext);
+ } else {
+ Y_VERIFY(TlsAllocState == &MyState_, "Mismatch allocator in thread");
+ }
+
+ ++AttachedCount_;
+}
+
+void TScopedAlloc::Release() {
+ if (AttachedCount_ && --AttachedCount_ == 0) {
+ Y_VERIFY(TlsAllocState == &MyState_, "Mismatch allocator in thread");
+ PgReleaseThreadContext(MyState_.MainContext);
+ TlsAllocState = PrevState_;
+ PrevState_ = nullptr;
+ }
+}
+
void* MKQLAllocSlow(size_t sz, TAllocState* state) {
auto roundedSize = AlignUp(sz + sizeof(TAllocPageHeader), MKQL_ALIGNMENT);
auto capacity = Max(ui64(TAlignedPagePool::POOL_PAGE_SIZE), roundedSize);
diff --git a/ydb/library/yql/minikql/mkql_alloc.h b/ydb/library/yql/minikql/mkql_alloc.h
index 33d02f3a77..62fb1771df 100644
--- a/ydb/library/yql/minikql/mkql_alloc.h
+++ b/ydb/library/yql/minikql/mkql_alloc.h
@@ -136,26 +136,8 @@ public:
return MyState_;
}
- void Acquire() {
- if (!AttachedCount_) {
- PrevState_ = TlsAllocState;
- TlsAllocState = &MyState_;
- PgAcquireThreadContext(MyState_.MainContext);
- } else {
- Y_VERIFY(TlsAllocState == &MyState_, "Mismatch allocator in thread");
- }
-
- ++AttachedCount_;
- }
-
- void Release() {
- if (AttachedCount_ && --AttachedCount_ == 0) {
- Y_VERIFY(TlsAllocState == &MyState_, "Mismatch allocator in thread");
- PgReleaseThreadContext(MyState_.MainContext);
- TlsAllocState = PrevState_;
- PrevState_ = nullptr;
- }
- }
+ void Acquire();
+ void Release();
size_t GetUsed() const { return MyState_.GetUsed(); }
size_t GetPeakUsed() const { return MyState_.GetPeakUsed(); }
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index b53f3dd3b1..715f174928 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -297,11 +297,12 @@ public:
return false;
TAllocState *const allocState = TlsAllocState;
- Y_VERIFY(allocState == AllocState, "Wrong TLS alloc state pre check.");
- TypeEnv.GetAllocator().Release();
+ PgReleaseThreadContext(allocState->MainContext);
+ TlsAllocState = nullptr;
const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadResult, TEvPrivate::TEvReadError, TEvPrivate::TEvReadFinished>();
- TypeEnv.GetAllocator().Acquire();
- Y_VERIFY(allocState == AllocState, "Wrong TLS alloc state post check.");
+ TlsAllocState = allocState;
+ PgAcquireThreadContext(allocState->MainContext);
+
switch (const auto etype = ev->GetTypeRewrite()) {
case TEvPrivate::TEvReadFinished::EventType:
Finished = true;
@@ -326,8 +327,14 @@ private:
const auto randStub = CreateDeterministicRandomProvider(1);
const auto timeStub = CreateDeterministicTimeProvider(10000000);
- const auto alloc = TypeEnv.BindAllocator();
- AllocState = TlsAllocState;
+ Y_VERIFY(!TlsAllocState);
+ TlsAllocState = &TypeEnv.GetAllocator().Ref();
+ PgAcquireThreadContext(TypeEnv.GetAllocator().Ref().MainContext);
+ Y_DEFER{
+ PgReleaseThreadContext(TypeEnv.GetAllocator().Ref().MainContext);
+ TlsAllocState = nullptr;
+ };
+
const auto pb = std::make_unique<TProgramBuilder>(TypeEnv, FunctionRegistry);
TCallableBuilder callableBuilder(TypeEnv, "CoroStream", pb->NewStreamType(pb->NewDataType(NUdf::EDataSlot::String)));
@@ -371,7 +378,6 @@ private:
const TString Format, RowType, Compression;
const NActors::TActorId ComputeActorId;
TOutput::TPtr Outputs;
- TAllocState * AllocState = nullptr;
bool Finished = false;
};