diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-29 20:02:40 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-29 20:02:40 +0300 |
commit | f5e6392913227cb1a4299c7722690a3d64616255 (patch) | |
tree | 78d2605d1fd0d088d580f3ad69a3c89097c80237 | |
parent | 48ac40272844f8e875067d7e785b879304bdb9ce (diff) | |
download | ydb-f5e6392913227cb1a4299c7722690a3d64616255.tar.gz |
use different queues instead of front/back
10 files changed, 74 insertions, 23 deletions
diff --git a/ydb/core/tx/columnshard/counters/scan.cpp b/ydb/core/tx/columnshard/counters/scan.cpp index ec06d0ccdf8..68fd73c8674 100644 --- a/ydb/core/tx/columnshard/counters/scan.cpp +++ b/ydb/core/tx/columnshard/counters/scan.cpp @@ -8,6 +8,10 @@ TScanCounters::TScanCounters(const TString& module) : TBase(module) , ProcessingOverload(TBase::GetDeriviative("ProcessingOverload")) , ReadingOverload(TBase::GetDeriviative("ReadingOverload")) + , PriorityFetchBytes(TBase::GetDeriviative("PriorityFetch/Bytes")) + , PriorityFetchCount(TBase::GetDeriviative("PriorityFetch/Count")) + , GeneralFetchBytes(TBase::GetDeriviative("GeneralFetch/Bytes")) + , GeneralFetchCount(TBase::GetDeriviative("GeneralFetch/Count")) , PortionBytes(TBase::GetDeriviative("PortionBytes")) , FilterBytes(TBase::GetDeriviative("FilterBytes")) , PostFilterBytes(TBase::GetDeriviative("PostFilterBytes")) diff --git a/ydb/core/tx/columnshard/counters/scan.h b/ydb/core/tx/columnshard/counters/scan.h index 1e1597e533a..f905b001d1f 100644 --- a/ydb/core/tx/columnshard/counters/scan.h +++ b/ydb/core/tx/columnshard/counters/scan.h @@ -92,6 +92,11 @@ private: using TBase = TCommonCountersOwner; NMonitoring::TDynamicCounters::TCounterPtr ProcessingOverload; NMonitoring::TDynamicCounters::TCounterPtr ReadingOverload; + + NMonitoring::TDynamicCounters::TCounterPtr PriorityFetchBytes; + NMonitoring::TDynamicCounters::TCounterPtr PriorityFetchCount; + NMonitoring::TDynamicCounters::TCounterPtr GeneralFetchBytes; + NMonitoring::TDynamicCounters::TCounterPtr GeneralFetchCount; public: NMonitoring::TDynamicCounters::TCounterPtr PortionBytes; NMonitoring::TDynamicCounters::TCounterPtr FilterBytes; @@ -125,7 +130,17 @@ public: TScanCounters(const TString& module = "Scan"); - void OnProcessingOverloaded() { + void OnPriorityFetch(const ui64 size) const { + PriorityFetchBytes->Add(size); + PriorityFetchCount->Add(1); + } + + void OnGeneralFetch(const ui64 size) const { + GeneralFetchBytes->Add(size); + GeneralFetchCount->Add(1); + } + + void OnProcessingOverloaded() const { ProcessingOverload->Add(1); } void OnReadingOverloaded() const { diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index d54f79f9b62..90758cc318a 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -109,7 +109,7 @@ void TIndexedReadData::AddBlobForFetch(const TBlobRange& range, NIndexedReader:: if (batch.GetFetchedInfo().GetFilter()) { Counters.PostFilterBytes->Add(range.Size); ReadMetadata->ReadStats->DataAdditionalBytes += range.Size; - FetchBlobsQueue.emplace_front(batch.GetGranule(), range); + PriorityBlobsQueue.emplace_back(batch.GetGranule(), range); } else { Counters.FilterBytes->Add(range.Size); ReadMetadata->ReadStats->DataFilterBytes += range.Size; @@ -129,12 +129,18 @@ void TIndexedReadData::InitRead(ui32 inputBatch) { Y_VERIFY(!GranulesContext); GranulesContext = std::make_unique<NIndexedReader::TGranulesFillingContext>(ReadMetadata, *this, OnePhaseReadMode); ui64 portionsBytes = 0; + std::set<ui64> granulesReady; + ui64 prevGranule = 0; for (auto& portionInfo : ReadMetadata->SelectInfo->GetPortionsOrdered(ReadMetadata->IsDescSorted())) { portionsBytes += portionInfo.BlobsBytes(); Y_VERIFY_S(portionInfo.Records.size(), "ReadMeatadata: " << *ReadMetadata); NIndexedReader::TGranule::TPtr granule = GranulesContext->UpsertGranule(portionInfo.Records[0].Granule); granule->RegisterBatchForFetching(portionInfo); + if (prevGranule != portionInfo.Granule()) { + Y_VERIFY(granulesReady.emplace(portionInfo.Granule()).second); + prevGranule = portionInfo.Granule(); + } } GranulesContext->PrepareForStart(); @@ -445,22 +451,33 @@ TIndexedReadData::TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, bool TIndexedReadData::IsFinished() const { Y_VERIFY(GranulesContext); - return NotIndexed.empty() && FetchBlobsQueue.empty() && GranulesContext->IsFinished(); + return NotIndexed.empty() && FetchBlobsQueue.empty() && PriorityBlobsQueue.empty() && GranulesContext->IsFinished(); } void TIndexedReadData::Abort() { Y_VERIFY(GranulesContext); FetchBlobsQueue.Stop(); + PriorityBlobsQueue.Stop(); GranulesContext->Abort(); } NKikimr::NOlap::TBlobRange TIndexedReadData::ExtractNextBlob() { Y_VERIFY(GranulesContext); + { + auto* f = PriorityBlobsQueue.front(); + if (f) { + GranulesContext->ForceStartProcessGranule(f->GetGranuleId(), f->GetRange()); + Counters.OnPriorityFetch(f->GetRange().Size); + return PriorityBlobsQueue.pop_front(); + } + } + auto* f = FetchBlobsQueue.front(); if (!f) { return TBlobRange(); } - if (!f->GetGranuleId() || GranulesContext->TryStartProcessGranule(f->GetGranuleId(), f->GetRange())) { + if (GranulesContext->TryStartProcessGranule(f->GetGranuleId(), f->GetRange())) { + Counters.OnGeneralFetch(f->GetRange().Size); return FetchBlobsQueue.pop_front(); } else { Counters.OnProcessingOverloaded(); diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h index 8e26fc05e97..3ea10230cf2 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.h +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h @@ -24,6 +24,7 @@ private: NColumnShard::TConcreteScanCounters Counters; NColumnShard::TDataTasksProcessorContainer TasksProcessor; TFetchBlobsQueue FetchBlobsQueue; + TFetchBlobsQueue PriorityBlobsQueue; NOlap::TReadMetadata::TConstPtr ReadMetadata; bool OnePhaseReadMode = false; std::vector<std::shared_ptr<arrow::RecordBatch>> NotIndexed; @@ -87,11 +88,11 @@ public: } void AddBlobToFetchInFront(const ui64 granuleId, const TBlobRange& range) { - FetchBlobsQueue.emplace_front(granuleId, range); + PriorityBlobsQueue.emplace_back(granuleId, range); } bool HasMoreBlobs() const { - return FetchBlobsQueue.size(); + return FetchBlobsQueue.size() || PriorityBlobsQueue.size(); } TBlobRange ExtractNextBlob(); diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp index b91877b993c..7aeed88a99d 100644 --- a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp @@ -52,6 +52,10 @@ NIndexedReader::TBatch* TGranulesFillingContext::GetBatchInfo(const TBatchAddres return Processing.GetBatchInfo(address); } +NIndexedReader::TBatch& TGranulesFillingContext::GetBatchInfoVerified(const TBatchAddress& address) { + return Processing.GetBatchInfoVerified(address); +} + NKikimr::NColumnShard::TDataTasksProcessorContainer TGranulesFillingContext::GetTasksProcessor() const { return Owner.GetTasksProcessor(); } @@ -78,18 +82,19 @@ bool TGranulesFillingContext::CheckBufferAvailable() const { Result.GetBlobsSize() + Processing.GetBlobsSize() < ProcessingBytesLimit; } +bool TGranulesFillingContext::ForceStartProcessGranule(const ui64 granuleId, const TBlobRange& range) { + Y_VERIFY_DEBUG(!Result.IsReady(granuleId)); + Processing.StartBlobProcessing(granuleId, range); + return true; +} + void TGranulesFillingContext::OnGranuleReady(const ui64 granuleId) { Result.AddResult(Processing.ExtractReadyVerified(granuleId)); } std::vector<NKikimr::NOlap::NIndexedReader::TGranule::TPtr> TGranulesFillingContext::DetachReadyInOrder() { Y_VERIFY(SortingPolicy); - const ui32 sizeBefore = Result.GetCount(); - auto result = SortingPolicy->DetachReadyGranules(Result); - if (sizeBefore == Result.GetCount()) { - Y_VERIFY(InternalReading || CheckBufferAvailable() || Processing.GetProcessingGranulesCount()); - } - return result; + return SortingPolicy->DetachReadyGranules(Result); } } diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h index c8d2ea4963a..571f88f1698 100644 --- a/ydb/core/tx/columnshard/engines/reader/filling_context.h +++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h @@ -11,7 +11,7 @@ class TIndexedReadData; namespace NKikimr::NOlap::NIndexedReader { -class TGranulesFillingContext { +class TGranulesFillingContext: TNonCopyable { private: YDB_READONLY_DEF(std::vector<std::string>, PKColumnNames); TReadMetadata::TConstPtr ReadMetadata; @@ -33,7 +33,8 @@ private: static constexpr i64 ProcessingBytesLimit = GranulesCountProcessingLimit * ExpectedBytesForGranule; bool CheckBufferAvailable() const; public: - bool TryStartProcessGranule(const ui64 granuleId, const TBlobRange& range); + bool ForceStartProcessGranule(const ui64 granuleId, const TBlobRange& range); + bool TryStartProcessGranule(const ui64 granuleId, const TBlobRange & range); TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData & owner, const bool internalReading); void OnBlobReady(const ui64 /*granuleId*/, const TBlobRange& /*range*/) noexcept { @@ -55,7 +56,7 @@ public: return SortingPolicy; } - NColumnShard::TScanCounters GetCounters() const noexcept { + const NColumnShard::TConcreteScanCounters& GetCounters() const noexcept { return Counters; } @@ -63,6 +64,7 @@ public: void DrainNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>* batches); NIndexedReader::TBatch* GetBatchInfo(const TBatchAddress& address); + NIndexedReader::TBatch& GetBatchInfoVerified(const TBatchAddress& address); void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch); void OnBatchReady(const NIndexedReader::TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch); diff --git a/ydb/core/tx/columnshard/engines/reader/processing_context.cpp b/ydb/core/tx/columnshard/engines/reader/processing_context.cpp index 3c31ff4a9aa..f4eb1186188 100644 --- a/ydb/core/tx/columnshard/engines/reader/processing_context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/processing_context.cpp @@ -8,6 +8,7 @@ void TProcessingController::DrainNotIndexedBatches(THashMap<ui64, std::shared_pt return; } NotIndexedBatchesInitialized = true; + GranulesInProcessing.erase(0); auto granules = GranulesWaiting; for (auto&& [_, gPtr] : granules) { if (!batches) { @@ -68,8 +69,13 @@ TGranule::TPtr TProcessingController::InsertGranule(TGranule::TPtr g) { void TProcessingController::StartBlobProcessing(const ui64 granuleId, const TBlobRange& range) { Counters.Aggregations->AddGranuleProcessingBytes(range.Size); if (GranulesInProcessing.emplace(granuleId).second) { - Y_VERIFY(GranulesWaiting.contains(granuleId)); - Counters.Aggregations->AddGranuleProcessing(); + if (granuleId) { + Y_VERIFY(GranulesWaiting.contains(granuleId)); + Counters.Aggregations->AddGranuleProcessing(); + } + } + if (!granuleId) { + Y_VERIFY(!NotIndexedBatchesInitialized); } BlobsSize += range.Size; } @@ -81,4 +87,10 @@ void TProcessingController::Abort() { BlobsSize = 0; } +NKikimr::NOlap::NIndexedReader::TBatch& TProcessingController::GetBatchInfoVerified(const TBatchAddress& address) { + NIndexedReader::TBatch* bInfo = GetBatchInfo(address); + Y_VERIFY(bInfo); + return *bInfo; +} + } diff --git a/ydb/core/tx/columnshard/engines/reader/processing_context.h b/ydb/core/tx/columnshard/engines/reader/processing_context.h index ba8f3d22d43..de40c312a27 100644 --- a/ydb/core/tx/columnshard/engines/reader/processing_context.h +++ b/ydb/core/tx/columnshard/engines/reader/processing_context.h @@ -20,6 +20,7 @@ public: void DrainNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>* batches); NIndexedReader::TBatch* GetBatchInfo(const TBatchAddress& address); + NIndexedReader::TBatch& GetBatchInfoVerified(const TBatchAddress& address); ui32 GetProcessingGranulesCount() const { return GranulesInProcessing.size(); diff --git a/ydb/core/tx/columnshard/engines/reader/queue.cpp b/ydb/core/tx/columnshard/engines/reader/queue.cpp index c578cd857dd..fc857a54291 100644 --- a/ydb/core/tx/columnshard/engines/reader/queue.cpp +++ b/ydb/core/tx/columnshard/engines/reader/queue.cpp @@ -12,11 +12,6 @@ NKikimr::NOlap::TBlobRange TFetchBlobsQueue::pop_front() { } } -void TFetchBlobsQueue::emplace_front(const ui64 granuleId, const TBlobRange& range) { - Y_VERIFY(!StoppedFlag); - IteratorBlobsSequential.emplace_front(granuleId, range); -} - void TFetchBlobsQueue::emplace_back(const ui64 granuleId, const TBlobRange& range) { Y_VERIFY(!StoppedFlag); IteratorBlobsSequential.emplace_back(granuleId, range); diff --git a/ydb/core/tx/columnshard/engines/reader/queue.h b/ydb/core/tx/columnshard/engines/reader/queue.h index 383b0cc0937..7c5415000af 100644 --- a/ydb/core/tx/columnshard/engines/reader/queue.h +++ b/ydb/core/tx/columnshard/engines/reader/queue.h @@ -59,7 +59,6 @@ public: } TBlobRange pop_front(); - void emplace_front(const ui64 granuleId, const TBlobRange& range); void emplace_back(const ui64 granuleId, const TBlobRange& range); }; |