diff options
author | ivanmorozov333 <ivanmorozov@ydb.tech> | 2025-02-02 14:56:25 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-02 14:56:25 +0300 |
commit | 46b9294803d39012d50d2ccbba1172dd01f9297b (patch) | |
tree | 613afecb0976bd510932c5277cfd46a656e062d5 | |
parent | 645f5bd38971d748dcb1460aa3a0ec1c57ce5025 (diff) | |
download | ydb-46b9294803d39012d50d2ccbba1172dd01f9297b.tar.gz |
correct scripts initialization (#14012)
5 files changed, 83 insertions, 34 deletions
diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h index de64e86645..bfbf637921 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h @@ -173,6 +173,64 @@ public: ui32 Execute(const ui32 startStepIdx, const std::shared_ptr<IDataSource>& source) const; }; +class TFetchingScriptOwner: TNonCopyable { +private: + TAtomic InitializationDetector = 0; + std::shared_ptr<TFetchingScript> Script; + + void FinishInitialization(std::shared_ptr<TFetchingScript>&& script) { + AFL_VERIFY(AtomicCas(&InitializationDetector, 1, 2)); + Script = std::move(script); + } + +public: + const std::shared_ptr<TFetchingScript>& GetScriptVerified() const { + AFL_VERIFY(Script); + return Script; + } + + TString DebugString() const { + if (Script) { + return TStringBuilder() << Script->DebugString() << Endl; + } else { + return TStringBuilder() << "NO_SCRIPT" << Endl; + } + } + + bool HasScript() const { + return !!Script; + } + + bool NeedInitialization() const { + return AtomicGet(InitializationDetector) != 1; + } + + class TInitializationGuard: TNonCopyable { + private: + TFetchingScriptOwner& Owner; + + public: + TInitializationGuard(TFetchingScriptOwner& owner) + : Owner(owner) { + Owner.StartInitialization(); + } + void InitializationFinished(std::shared_ptr<TFetchingScript>&& script) { + Owner.FinishInitialization(std::move(script)); + } + ~TInitializationGuard() { + AFL_VERIFY(!Owner.NeedInitialization()); + } + }; + + std::optional<TInitializationGuard> StartInitialization() { + if (AtomicCas(&InitializationDetector, 2, 0)) { + return std::optional<TInitializationGuard>(*this); + } else { + return std::nullopt; + } + } +}; + class TColumnsAccumulator { private: TColumnsSetIds FetchingReadyColumns; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp index bd7816f177..53c370af40 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp @@ -59,22 +59,18 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::DoGetColumnsFetchingPlan(c } } { - auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][isWholeExclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0] + auto& result = CacheFetchingScripts[needSnapshots ? 1 : 0][isWholeExclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0] [needShardingFilter ? 1 : 0][hasDeletions ? 1 : 0]; - if (!result) { - TGuard<TMutex> wg(Mutex); - result = CacheFetchingScripts[needSnapshots ? 1 : 0][isWholeExclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0] - [needShardingFilter ? 1 : 0][hasDeletions ? 1 : 0]; - if (!result) { - result = BuildColumnsFetchingPlan( - needSnapshots, isWholeExclusiveSource, partialUsageByPK, useIndexes, needShardingFilter, hasDeletions); - CacheFetchingScripts[needSnapshots ? 1 : 0][isWholeExclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0] - [needShardingFilter ? 1 : 0][hasDeletions ? 1 : 0] = result; + if (result.NeedInitialization()) { + TGuard<TMutex> g(Mutex); + if (auto gInit = result.StartInitialization()) { + gInit->InitializationFinished(BuildColumnsFetchingPlan( + needSnapshots, isWholeExclusiveSource, partialUsageByPK, useIndexes, needShardingFilter, hasDeletions)); } + AFL_VERIFY(!result.NeedInitialization()); } - AFL_VERIFY(result); - if (*result) { - return *result; + if (result.HasScript()) { + return result.GetScriptVerified(); } else { std::shared_ptr<TFetchingScript> result = std::make_shared<TFetchingScript>(*this); result->SetBranchName("FAKE"); @@ -234,9 +230,9 @@ TString TSpecialReadContext::ProfileDebugString() const { }; for (ui32 i = 0; i < (1 << 6); ++i) { - auto script = CacheFetchingScripts[GetBit(i, 0)][GetBit(i, 1)][GetBit(i, 2)][GetBit(i, 3)][GetBit(i, 4)][GetBit(i, 5)]; - if (script && *script) { - sb << (*script)->DebugString() << ";"; + auto& script = CacheFetchingScripts[GetBit(i, 0)][GetBit(i, 1)][GetBit(i, 2)][GetBit(i, 3)][GetBit(i, 4)][GetBit(i, 5)]; + if (script.HasScript()) { + sb << script.DebugString() << ";"; } } return sb; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h index 37ca2b265c..4d35be2eca 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h @@ -28,7 +28,7 @@ private: std::shared_ptr<TFetchingScript> BuildColumnsFetchingPlan(const bool needSnapshotsFilter, const bool exclusiveSource, const bool partialUsageByPredicate, const bool useIndexes, const bool needFilterSharding, const bool needFilterDeletion) const; TMutex Mutex; - std::array<std::array<std::array<std::array<std::array<std::array<std::optional<std::shared_ptr<TFetchingScript>>, 2>, 2>, 2>, 2>, 2>, 2> + std::array<std::array<std::array<std::array<std::array<std::array<NCommon::TFetchingScriptOwner, 2>, 2>, 2>, 2>, 2>, 2> CacheFetchingScripts; std::shared_ptr<TFetchingScript> AskAccumulatorsScript; diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp index a14f1c17c9..bfc3d549a9 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp @@ -48,21 +48,17 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::DoGetColumnsFetchingPlan(c } } { - auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0][needShardingFilter ? 1 : 0] + auto& result = CacheFetchingScripts[needSnapshots ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0][needShardingFilter ? 1 : 0] [hasDeletions ? 1 : 0]; - if (!result) { - TGuard<TMutex> wg(Mutex); - result = CacheFetchingScripts[needSnapshots ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0][needShardingFilter ? 1 : 0] - [hasDeletions ? 1 : 0]; - if (!result) { - result = BuildColumnsFetchingPlan(needSnapshots, partialUsageByPK, useIndexes, needShardingFilter, hasDeletions); - CacheFetchingScripts[needSnapshots ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0][needShardingFilter ? 1 : 0] - [hasDeletions ? 1 : 0] = result; + if (result.NeedInitialization()) { + TGuard<TMutex> g(Mutex); + if (auto gInit = result.StartInitialization()) { + gInit->InitializationFinished( + BuildColumnsFetchingPlan(needSnapshots, partialUsageByPK, useIndexes, needShardingFilter, hasDeletions)); } + AFL_VERIFY(!result.NeedInitialization()); } - AFL_VERIFY(result); - AFL_VERIFY(*result); - return *result; + return result.GetScriptVerified(); } } @@ -144,9 +140,9 @@ TString TSpecialReadContext::ProfileDebugString() const { }; for (ui32 i = 0; i < (1 << 5); ++i) { - auto script = CacheFetchingScripts[GetBit(i, 0)][GetBit(i, 1)][GetBit(i, 2)][GetBit(i, 3)][GetBit(i, 4)]; - if (script && *script) { - sb << (*script)->DebugString() << ";"; + auto& script = CacheFetchingScripts[GetBit(i, 0)][GetBit(i, 1)][GetBit(i, 2)][GetBit(i, 3)][GetBit(i, 4)]; + if (script.HasScript()) { + sb << script.DebugString() << ";"; } } return sb; diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h index d1dd942a61..f4e3d0ab36 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h @@ -23,8 +23,7 @@ private: std::shared_ptr<TFetchingScript> BuildColumnsFetchingPlan(const bool needSnapshots, const bool partialUsageByPredicateExt, const bool useIndexes, const bool needFilterSharding, const bool needFilterDeletion) const; TMutex Mutex; - std::array<std::array<std::array<std::array<std::array<std::optional<std::shared_ptr<TFetchingScript>>, 2>, 2>, 2>, 2>, 2> - CacheFetchingScripts; + std::array<std::array<std::array<std::array<std::array<NCommon::TFetchingScriptOwner, 2>, 2>, 2>, 2>, 2> CacheFetchingScripts; std::shared_ptr<TFetchingScript> AskAccumulatorsScript; virtual std::shared_ptr<TFetchingScript> DoGetColumnsFetchingPlan(const std::shared_ptr<NCommon::IDataSource>& source) override; |