aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-05-03 08:11:14 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-05-03 08:11:14 +0300
commit1818d24064c3177c8ca6226b878f3f69d90965dc (patch)
treea60d63016c6b41fd9d7dfcae0413ab0e88377d17
parentf577e5f412ee2a58aa7121733735fc8dd5c0fa2e (diff)
downloadydb-1818d24064c3177c8ca6226b878f3f69d90965dc.tar.gz
correct condition for multiple metadata
-rw-r--r--ydb/core/tx/columnshard/columnshard__index_scan.cpp4
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.cpp14
-rw-r--r--ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/conveyor_task.h7
4 files changed, 25 insertions, 4 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
index 45edb94bd0..337eb9fd0e 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
@@ -46,7 +46,7 @@ void TColumnShardScanIterator::AddData(const TBlobRange& blobRange, TString data
} else {
auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{ blobId, 0, 0 });
if (cmt.empty()) {
- return; // ignore duplicates
+ return; // ignore duplicates from another read metadata ranges
}
const NOlap::TCommittedBlob& cmtBlob = cmt.key();
ui32 batchNo = cmt.mapped();
@@ -112,7 +112,7 @@ TColumnShardScanIterator::~TColumnShardScanIterator() {
}
void TColumnShardScanIterator::Apply(IDataTasksProcessor::ITask::TPtr task) {
- if (!task->IsDataProcessed() || DataTasksProcessor.IsStopped()) {
+ if (!task->IsDataProcessed() || DataTasksProcessor.IsStopped() || !task->IsSameProcessor(DataTasksProcessor)) {
return;
}
Y_VERIFY(task->Apply(IndexedData.GetGranulesContext()));
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 56497ad8b0..ec1ddd2cd8 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -436,7 +436,7 @@ private:
void NextReadMetadata() {
auto g = Stats.MakeGuard("NextReadMetadata");
ScanIterator.reset();
-
+ Stats.NextReadMetadata();
++ReadMetadataIndex;
if (ReadMetadataIndex == ReadMetadataRanges.size()) {
@@ -640,8 +640,14 @@ private:
THashMap<TString, TInstant> StartGuards;
THashMap<TString, TInstant> SectionFirst;
THashMap<TString, TInstant> SectionLast;
+ bool FirstMetadataCurrently = true;
public:
+ void NextReadMetadata() {
+ StartBlobRequest.clear();
+ FirstMetadataCurrently = false;
+ }
+
TString DebugString() const {
const TInstant now = TInstant::Now();
TStringBuilder sb;
@@ -722,7 +728,11 @@ private:
void BlobReceived(const NBlobCache::TBlobRange& br, const bool fromCache, const TInstant replyInstant) {
auto it = StartBlobRequest.find(br);
- Y_VERIFY(it != StartBlobRequest.end());
+ if (FirstMetadataCurrently) {
+ Y_VERIFY(it != StartBlobRequest.end());
+ } else if (it == StartBlobRequest.end()) {
+ return;
+ }
const TDuration d = replyInstant - it->second;
if (fromCache) {
CacheBlobs.Received(br, d);
diff --git a/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
index a26e4daf1b..7da0a05e6d 100644
--- a/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
@@ -26,6 +26,10 @@ TDataTasksProcessorContainer IDataTasksProcessor::ITask::GetTasksProcessorContai
return TDataTasksProcessorContainer(OwnerOperator);
}
+bool IDataTasksProcessor::ITask::IsSameProcessor(const TDataTasksProcessorContainer& receivedProcessor) const {
+ return receivedProcessor.IsSameProcessor(GetTasksProcessorContainer());
+}
+
bool IDataTasksProcessor::Add(ITask::TPtr task) {
if (IsStopped()) {
return false;
diff --git a/ydb/core/tx/columnshard/engines/reader/conveyor_task.h b/ydb/core/tx/columnshard/engines/reader/conveyor_task.h
index fa0a8b1ce2..040a2fe730 100644
--- a/ydb/core/tx/columnshard/engines/reader/conveyor_task.h
+++ b/ydb/core/tx/columnshard/engines/reader/conveyor_task.h
@@ -32,6 +32,9 @@ public:
: OwnerOperator(ownerOperator) {
}
+
+ bool IsSameProcessor(const TDataTasksProcessorContainer& receivedProcessor) const;
+
using TPtr = std::shared_ptr<ITask>;
virtual ~ITask() = default;
bool Apply(NOlap::NIndexedReader::TGranulesFillingContext& indexedDataRead) const;
@@ -70,6 +73,10 @@ public:
}
+ bool IsSameProcessor(const TDataTasksProcessorContainer& container) const {
+ return (ui64)Object.get() == (ui64)container.Object.get();
+ }
+
void Stop() {
if (Object) {
Object->Stop();