aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <ivanmorozov@ydb.tech>2025-02-02 14:56:25 +0300
committerGitHub <noreply@github.com>2025-02-02 14:56:25 +0300
commit46b9294803d39012d50d2ccbba1172dd01f9297b (patch)
tree613afecb0976bd510932c5277cfd46a656e062d5
parent645f5bd38971d748dcb1460aa3a0ec1c57ce5025 (diff)
downloadydb-46b9294803d39012d50d2ccbba1172dd01f9297b.tar.gz
correct scripts initialization (#14012)
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h58
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp28
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp26
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h3
5 files changed, 83 insertions, 34 deletions
diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h
index de64e86645..bfbf637921 100644
--- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h
+++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h
@@ -173,6 +173,64 @@ public:
ui32 Execute(const ui32 startStepIdx, const std::shared_ptr<IDataSource>& source) const;
};
+class TFetchingScriptOwner: TNonCopyable {
+private:
+ TAtomic InitializationDetector = 0;
+ std::shared_ptr<TFetchingScript> Script;
+
+ void FinishInitialization(std::shared_ptr<TFetchingScript>&& script) {
+ AFL_VERIFY(AtomicCas(&InitializationDetector, 1, 2));
+ Script = std::move(script);
+ }
+
+public:
+ const std::shared_ptr<TFetchingScript>& GetScriptVerified() const {
+ AFL_VERIFY(Script);
+ return Script;
+ }
+
+ TString DebugString() const {
+ if (Script) {
+ return TStringBuilder() << Script->DebugString() << Endl;
+ } else {
+ return TStringBuilder() << "NO_SCRIPT" << Endl;
+ }
+ }
+
+ bool HasScript() const {
+ return !!Script;
+ }
+
+ bool NeedInitialization() const {
+ return AtomicGet(InitializationDetector) != 1;
+ }
+
+ class TInitializationGuard: TNonCopyable {
+ private:
+ TFetchingScriptOwner& Owner;
+
+ public:
+ TInitializationGuard(TFetchingScriptOwner& owner)
+ : Owner(owner) {
+ Owner.StartInitialization();
+ }
+ void InitializationFinished(std::shared_ptr<TFetchingScript>&& script) {
+ Owner.FinishInitialization(std::move(script));
+ }
+ ~TInitializationGuard() {
+ AFL_VERIFY(!Owner.NeedInitialization());
+ }
+ };
+
+ std::optional<TInitializationGuard> StartInitialization() {
+ if (AtomicCas(&InitializationDetector, 2, 0)) {
+ return std::optional<TInitializationGuard>(*this);
+ } else {
+ return std::nullopt;
+ }
+ }
+};
+
class TColumnsAccumulator {
private:
TColumnsSetIds FetchingReadyColumns;
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp
index bd7816f177..53c370af40 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp
@@ -59,22 +59,18 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::DoGetColumnsFetchingPlan(c
}
}
{
- auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][isWholeExclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0]
+ auto& result = CacheFetchingScripts[needSnapshots ? 1 : 0][isWholeExclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0]
[needShardingFilter ? 1 : 0][hasDeletions ? 1 : 0];
- if (!result) {
- TGuard<TMutex> wg(Mutex);
- result = CacheFetchingScripts[needSnapshots ? 1 : 0][isWholeExclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0]
- [needShardingFilter ? 1 : 0][hasDeletions ? 1 : 0];
- if (!result) {
- result = BuildColumnsFetchingPlan(
- needSnapshots, isWholeExclusiveSource, partialUsageByPK, useIndexes, needShardingFilter, hasDeletions);
- CacheFetchingScripts[needSnapshots ? 1 : 0][isWholeExclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0]
- [needShardingFilter ? 1 : 0][hasDeletions ? 1 : 0] = result;
+ if (result.NeedInitialization()) {
+ TGuard<TMutex> g(Mutex);
+ if (auto gInit = result.StartInitialization()) {
+ gInit->InitializationFinished(BuildColumnsFetchingPlan(
+ needSnapshots, isWholeExclusiveSource, partialUsageByPK, useIndexes, needShardingFilter, hasDeletions));
}
+ AFL_VERIFY(!result.NeedInitialization());
}
- AFL_VERIFY(result);
- if (*result) {
- return *result;
+ if (result.HasScript()) {
+ return result.GetScriptVerified();
} else {
std::shared_ptr<TFetchingScript> result = std::make_shared<TFetchingScript>(*this);
result->SetBranchName("FAKE");
@@ -234,9 +230,9 @@ TString TSpecialReadContext::ProfileDebugString() const {
};
for (ui32 i = 0; i < (1 << 6); ++i) {
- auto script = CacheFetchingScripts[GetBit(i, 0)][GetBit(i, 1)][GetBit(i, 2)][GetBit(i, 3)][GetBit(i, 4)][GetBit(i, 5)];
- if (script && *script) {
- sb << (*script)->DebugString() << ";";
+ auto& script = CacheFetchingScripts[GetBit(i, 0)][GetBit(i, 1)][GetBit(i, 2)][GetBit(i, 3)][GetBit(i, 4)][GetBit(i, 5)];
+ if (script.HasScript()) {
+ sb << script.DebugString() << ";";
}
}
return sb;
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h
index 37ca2b265c..4d35be2eca 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h
@@ -28,7 +28,7 @@ private:
std::shared_ptr<TFetchingScript> BuildColumnsFetchingPlan(const bool needSnapshotsFilter, const bool exclusiveSource,
const bool partialUsageByPredicate, const bool useIndexes, const bool needFilterSharding, const bool needFilterDeletion) const;
TMutex Mutex;
- std::array<std::array<std::array<std::array<std::array<std::array<std::optional<std::shared_ptr<TFetchingScript>>, 2>, 2>, 2>, 2>, 2>, 2>
+ std::array<std::array<std::array<std::array<std::array<std::array<NCommon::TFetchingScriptOwner, 2>, 2>, 2>, 2>, 2>, 2>
CacheFetchingScripts;
std::shared_ptr<TFetchingScript> AskAccumulatorsScript;
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp
index a14f1c17c9..bfc3d549a9 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp
@@ -48,21 +48,17 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::DoGetColumnsFetchingPlan(c
}
}
{
- auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0][needShardingFilter ? 1 : 0]
+ auto& result = CacheFetchingScripts[needSnapshots ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0][needShardingFilter ? 1 : 0]
[hasDeletions ? 1 : 0];
- if (!result) {
- TGuard<TMutex> wg(Mutex);
- result = CacheFetchingScripts[needSnapshots ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0][needShardingFilter ? 1 : 0]
- [hasDeletions ? 1 : 0];
- if (!result) {
- result = BuildColumnsFetchingPlan(needSnapshots, partialUsageByPK, useIndexes, needShardingFilter, hasDeletions);
- CacheFetchingScripts[needSnapshots ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0][needShardingFilter ? 1 : 0]
- [hasDeletions ? 1 : 0] = result;
+ if (result.NeedInitialization()) {
+ TGuard<TMutex> g(Mutex);
+ if (auto gInit = result.StartInitialization()) {
+ gInit->InitializationFinished(
+ BuildColumnsFetchingPlan(needSnapshots, partialUsageByPK, useIndexes, needShardingFilter, hasDeletions));
}
+ AFL_VERIFY(!result.NeedInitialization());
}
- AFL_VERIFY(result);
- AFL_VERIFY(*result);
- return *result;
+ return result.GetScriptVerified();
}
}
@@ -144,9 +140,9 @@ TString TSpecialReadContext::ProfileDebugString() const {
};
for (ui32 i = 0; i < (1 << 5); ++i) {
- auto script = CacheFetchingScripts[GetBit(i, 0)][GetBit(i, 1)][GetBit(i, 2)][GetBit(i, 3)][GetBit(i, 4)];
- if (script && *script) {
- sb << (*script)->DebugString() << ";";
+ auto& script = CacheFetchingScripts[GetBit(i, 0)][GetBit(i, 1)][GetBit(i, 2)][GetBit(i, 3)][GetBit(i, 4)];
+ if (script.HasScript()) {
+ sb << script.DebugString() << ";";
}
}
return sb;
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h
index d1dd942a61..f4e3d0ab36 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h
@@ -23,8 +23,7 @@ private:
std::shared_ptr<TFetchingScript> BuildColumnsFetchingPlan(const bool needSnapshots, const bool partialUsageByPredicateExt,
const bool useIndexes, const bool needFilterSharding, const bool needFilterDeletion) const;
TMutex Mutex;
- std::array<std::array<std::array<std::array<std::array<std::optional<std::shared_ptr<TFetchingScript>>, 2>, 2>, 2>, 2>, 2>
- CacheFetchingScripts;
+ std::array<std::array<std::array<std::array<std::array<NCommon::TFetchingScriptOwner, 2>, 2>, 2>, 2>, 2> CacheFetchingScripts;
std::shared_ptr<TFetchingScript> AskAccumulatorsScript;
virtual std::shared_ptr<TFetchingScript> DoGetColumnsFetchingPlan(const std::shared_ptr<NCommon::IDataSource>& source) override;