diff options
author | ivanmorozov333 <111685085+ivanmorozov333@users.noreply.github.com> | 2024-03-05 17:25:47 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-03-05 17:25:47 +0300 |
commit | ae4590afe2997e7154a290eb72f2d365cce0fafb (patch) | |
tree | bce6d21cdf3293218bafeeb5aa56961a4dfd3d83 | |
parent | ac8e6193476f9d04caa460aaee0be01ba9634229 (diff) | |
download | ydb-ae4590afe2997e7154a290eb72f2d365cce0fafb.tar.gz |
fix race in case abort and merge processing in time (#2463)
3 files changed, 7 insertions, 19 deletions
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp index dce2a8a3e1..dee65b0bda 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp @@ -146,7 +146,7 @@ void TFetchingInterval::ConstructResult() { } else { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "start_construct_result")("interval_idx", IntervalIdx); } - if (AtomicCas(&ResultConstructionInProgress, 1, 0)) { + if (AtomicCas(&SourcesFinalized, 1, 0)) { auto task = std::make_shared<TMergeTask>(std::move(MergingContext), Context, std::move(Sources)); task->SetPriority(NConveyor::ITask::EPriority::High); NConveyor::TScanServiceOperator::SendTaskToExecute(task); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h index 50f7fe76a8..c7d3687229 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.h @@ -48,26 +48,12 @@ class TFetchingInterval: public TNonCopyable, public NResourceBroker::NSubscribe private: using TTaskBase = NResourceBroker::NSubscribe::ITask; std::shared_ptr<TMergingContext> MergingContext; - TAtomic ResultConstructionInProgress = 0; + TAtomic SourcesFinalized = 0; std::shared_ptr<TSpecialReadContext> Context; NColumnShard::TCounterGuard TaskGuard; std::map<ui32, std::shared_ptr<IDataSource>> Sources; void ConstructResult(); - IDataSource& GetSourceVerified(const ui32 idx) { - auto it = Sources.find(idx); - Y_ABORT_UNLESS(it != Sources.end()); - return *it->second; - } - - std::shared_ptr<IDataSource> ExtractSourceVerified(const ui32 idx) { - auto it = Sources.find(idx); - Y_ABORT_UNLESS(it != Sources.end()); - auto result = it->second; - Sources.erase(it); - return result; - } - std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard> ResourcesGuard; const ui32 IntervalIdx; TAtomicCounter ReadySourcesCount = 0; @@ -91,8 +77,10 @@ public: } void Abort() { - for (auto&& i : Sources) { - i.second->Abort(); + if (AtomicCas(&SourcesFinalized, 1, 0)) { + for (auto&& i : Sources) { + i.second->Abort(); + } } } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.h index 13e1f25f0d..8b5e138be8 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/plain_read_data.h @@ -27,7 +27,7 @@ protected: return sb; } - virtual std::vector<TPartialReadResult> DoExtractReadyResults(const int64_t /*maxRowsInBatch*/) override; + virtual std::vector<TPartialReadResult> DoExtractReadyResults(const int64_t maxRowsInBatch) override; virtual bool DoReadNextInterval() override; virtual void DoAbort() override { |