aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <111685085+ivanmorozov333@users.noreply.github.com>2024-03-05 17:25:47 +0300
committerGitHub <noreply@github.com>2024-03-05 17:25:47 +0300
commitae4590afe2997e7154a290eb72f2d365cce0fafb (patch)
treebce6d21cdf3293218bafeeb5aa56961a4dfd3d83
parentac8e6193476f9d04caa460aaee0be01ba9634229 (diff)
downloadydb-ae4590afe2997e7154a290eb72f2d365cce0fafb.tar.gz
fix race in case abort and merge processing in time (#2463)
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h22
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.h2
3 files changed, 7 insertions, 19 deletions
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp
index dce2a8a3e1..dee65b0bda 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp
@@ -146,7 +146,7 @@ void TFetchingInterval::ConstructResult() {
} else {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "start_construct_result")("interval_idx", IntervalIdx);
}
- if (AtomicCas(&ResultConstructionInProgress, 1, 0)) {
+ if (AtomicCas(&SourcesFinalized, 1, 0)) {
auto task = std::make_shared<TMergeTask>(std::move(MergingContext), Context, std::move(Sources));
task->SetPriority(NConveyor::ITask::EPriority::High);
NConveyor::TScanServiceOperator::SendTaskToExecute(task);
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h
index 50f7fe76a8..c7d3687229 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h
@@ -48,26 +48,12 @@ class TFetchingInterval: public TNonCopyable, public NResourceBroker::NSubscribe
private:
using TTaskBase = NResourceBroker::NSubscribe::ITask;
std::shared_ptr<TMergingContext> MergingContext;
- TAtomic ResultConstructionInProgress = 0;
+ TAtomic SourcesFinalized = 0;
std::shared_ptr<TSpecialReadContext> Context;
NColumnShard::TCounterGuard TaskGuard;
std::map<ui32, std::shared_ptr<IDataSource>> Sources;
void ConstructResult();
- IDataSource& GetSourceVerified(const ui32 idx) {
- auto it = Sources.find(idx);
- Y_ABORT_UNLESS(it != Sources.end());
- return *it->second;
- }
-
- std::shared_ptr<IDataSource> ExtractSourceVerified(const ui32 idx) {
- auto it = Sources.find(idx);
- Y_ABORT_UNLESS(it != Sources.end());
- auto result = it->second;
- Sources.erase(it);
- return result;
- }
-
std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard> ResourcesGuard;
const ui32 IntervalIdx;
TAtomicCounter ReadySourcesCount = 0;
@@ -91,8 +77,10 @@ public:
}
void Abort() {
- for (auto&& i : Sources) {
- i.second->Abort();
+ if (AtomicCas(&SourcesFinalized, 1, 0)) {
+ for (auto&& i : Sources) {
+ i.second->Abort();
+ }
}
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.h
index 13e1f25f0d..8b5e138be8 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.h
@@ -27,7 +27,7 @@ protected:
return sb;
}
- virtual std::vector<TPartialReadResult> DoExtractReadyResults(const int64_t /*maxRowsInBatch*/) override;
+ virtual std::vector<TPartialReadResult> DoExtractReadyResults(const int64_t maxRowsInBatch) override;
virtual bool DoReadNextInterval() override;
virtual void DoAbort() override {