diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-03 08:11:14 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-03 08:11:14 +0300 |
commit | 1818d24064c3177c8ca6226b878f3f69d90965dc (patch) | |
tree | a60d63016c6b41fd9d7dfcae0413ab0e88377d17 | |
parent | f577e5f412ee2a58aa7121733735fc8dd5c0fa2e (diff) | |
download | ydb-1818d24064c3177c8ca6226b878f3f69d90965dc.tar.gz |
correct condition for multiple metadata
4 files changed, 25 insertions, 4 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp index 45edb94bd0..337eb9fd0e 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp @@ -46,7 +46,7 @@ void TColumnShardScanIterator::AddData(const TBlobRange& blobRange, TString data } else { auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{ blobId, 0, 0 }); if (cmt.empty()) { - return; // ignore duplicates + return; // ignore duplicates from another read metadata ranges } const NOlap::TCommittedBlob& cmtBlob = cmt.key(); ui32 batchNo = cmt.mapped(); @@ -112,7 +112,7 @@ TColumnShardScanIterator::~TColumnShardScanIterator() { } void TColumnShardScanIterator::Apply(IDataTasksProcessor::ITask::TPtr task) { - if (!task->IsDataProcessed() || DataTasksProcessor.IsStopped()) { + if (!task->IsDataProcessed() || DataTasksProcessor.IsStopped() || !task->IsSameProcessor(DataTasksProcessor)) { return; } Y_VERIFY(task->Apply(IndexedData.GetGranulesContext())); diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index 56497ad8b0..ec1ddd2cd8 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -436,7 +436,7 @@ private: void NextReadMetadata() { auto g = Stats.MakeGuard("NextReadMetadata"); ScanIterator.reset(); - + Stats.NextReadMetadata(); ++ReadMetadataIndex; if (ReadMetadataIndex == ReadMetadataRanges.size()) { @@ -640,8 +640,14 @@ private: THashMap<TString, TInstant> StartGuards; THashMap<TString, TInstant> SectionFirst; THashMap<TString, TInstant> SectionLast; + bool FirstMetadataCurrently = true; public: + void NextReadMetadata() { + StartBlobRequest.clear(); + FirstMetadataCurrently = false; + } + TString DebugString() const { const TInstant now = TInstant::Now(); TStringBuilder sb; @@ -722,7 +728,11 @@ private: void BlobReceived(const NBlobCache::TBlobRange& br, const bool fromCache, const TInstant replyInstant) { auto it = StartBlobRequest.find(br); - Y_VERIFY(it != StartBlobRequest.end()); + if (FirstMetadataCurrently) { + Y_VERIFY(it != StartBlobRequest.end()); + } else if (it == StartBlobRequest.end()) { + return; + } const TDuration d = replyInstant - it->second; if (fromCache) { CacheBlobs.Received(br, d); diff --git a/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp index a26e4daf1b..7da0a05e6d 100644 --- a/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp +++ b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp @@ -26,6 +26,10 @@ TDataTasksProcessorContainer IDataTasksProcessor::ITask::GetTasksProcessorContai return TDataTasksProcessorContainer(OwnerOperator); } +bool IDataTasksProcessor::ITask::IsSameProcessor(const TDataTasksProcessorContainer& receivedProcessor) const { + return receivedProcessor.IsSameProcessor(GetTasksProcessorContainer()); +} + bool IDataTasksProcessor::Add(ITask::TPtr task) { if (IsStopped()) { return false; diff --git a/ydb/core/tx/columnshard/engines/reader/conveyor_task.h b/ydb/core/tx/columnshard/engines/reader/conveyor_task.h index fa0a8b1ce2..040a2fe730 100644 --- a/ydb/core/tx/columnshard/engines/reader/conveyor_task.h +++ b/ydb/core/tx/columnshard/engines/reader/conveyor_task.h @@ -32,6 +32,9 @@ public: : OwnerOperator(ownerOperator) { } + + bool IsSameProcessor(const TDataTasksProcessorContainer& receivedProcessor) const; + using TPtr = std::shared_ptr<ITask>; virtual ~ITask() = default; bool Apply(NOlap::NIndexedReader::TGranulesFillingContext& indexedDataRead) const; @@ -70,6 +73,10 @@ public: } + bool IsSameProcessor(const TDataTasksProcessorContainer& container) const { + return (ui64)Object.get() == (ui64)container.Object.get(); + } + void Stop() { if (Object) { Object->Stop(); |