diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-03 19:10:27 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-03 19:10:27 +0300 |
commit | 76c944bc91c97bc0dec94800cfc19fc1d9f5c734 (patch) | |
tree | a2a6e260c2598cebc2b43524c5ab3e0e3fd6f8e9 | |
parent | 7b101672fe85f87bdadd7b110b1aba1f9aa24414 (diff) | |
download | ydb-76c944bc91c97bc0dec94800cfc19fc1d9f5c734.tar.gz |
control start merge for granule
remove useless hashmap
remove useless checkers
7 files changed, 100 insertions, 61 deletions
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index 1cf45874a5..9ed9ebc93d 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -223,7 +223,7 @@ TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxR if (NotIndexed.size()) { auto mergedBatch = MergeNotIndexed(std::move(NotIndexed)); // merged has no dups if (mergedBatch) { - // Init split by granules structs + // Init split by granules structures Y_VERIFY(ReadMetadata->SelectInfo); TColumnEngineForLogs::TMarksGranules marksGranules(*ReadMetadata->SelectInfo); @@ -233,16 +233,20 @@ TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxR Y_VERIFY(!marksGranules.Empty()); auto outNotIndexed = marksGranules.SliceIntoGranules(mergedBatch, IndexInfo()); - GranulesContext->AddNotIndexedBatches(outNotIndexed); + GranulesContext->DrainNotIndexedBatches(&outNotIndexed); Y_VERIFY(outNotIndexed.size() <= 1); if (outNotIndexed.size() == 1) { auto it = outNotIndexed.find(0); Y_VERIFY(it != outNotIndexed.end()); NotIndexedOutscopeBatch = it->second; } + } else { + GranulesContext->DrainNotIndexedBatches(nullptr); } NotIndexed.clear(); ReadyNotIndexed = 0; + } else { + GranulesContext->DrainNotIndexedBatches(nullptr); } // Extract ready to out granules: ready granules that are not blocked by other (not ready) granules @@ -312,17 +316,11 @@ TIndexedReadData::MergeNotIndexed(std::vector<std::shared_ptr<arrow::RecordBatch Y_VERIFY(ReadMetadata->IsSorted()); Y_VERIFY(IndexInfo().GetSortingKey()); - { // remove empty batches - size_t dst = 0; - for (size_t src = 0; src < batches.size(); ++src) { - if (batches[src] && batches[src]->num_rows()) { - if (dst != src) { - batches[dst] = batches[src]; - } - ++dst; - } - } - batches.resize(dst); + { + const auto pred = [](const std::shared_ptr<arrow::RecordBatch>& b) { + return !b || !b->num_rows(); + }; + batches.erase(std::remove_if(batches.begin(), batches.end(), pred), batches.end()); } if (batches.empty()) { diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp index ea1c4ee2f1..a8f6be4a73 100644 --- a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp @@ -55,22 +55,20 @@ NKikimr::NColumnShard::TDataTasksProcessorContainer TGranulesFillingContext::Get return Owner.GetTasksProcessor(); } -void TGranulesFillingContext::AddNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>& batches) { - std::shared_ptr<arrow::RecordBatch> externalBatch; - for (auto it = batches.begin(); it != batches.end(); ++it) { - if (!it->first) { - externalBatch = it->second; - continue; +void TGranulesFillingContext::DrainNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>* batches) { + for (auto&& [_, g] : Granules) { + if (!batches) { + g.AddNotIndexedBatch(nullptr); + } else { + auto it = batches->find(g.GetGranuleId()); + if (it == batches->end()) { + g.AddNotIndexedBatch(nullptr); + } else { + g.AddNotIndexedBatch(it->second); + } + batches->erase(it); } - auto itGranule = Granules.find(it->first); - Y_VERIFY(itGranule != Granules.end()); - itGranule->second.AddNotIndexedBatch(it->second); } - THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> resultLocal; - if (externalBatch) { - resultLocal.emplace(0, externalBatch); - } - std::swap(batches, resultLocal); } } diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h index cab84fc290..96c9cb40e8 100644 --- a/ydb/core/tx/columnshard/engines/reader/filling_context.h +++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h @@ -34,7 +34,7 @@ public: NColumnShard::TDataTasksProcessorContainer GetTasksProcessor() const; - void AddNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>& batches); + void DrainNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>* batches); TBatch& GetBatchInfo(const ui32 batchNo); void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch); @@ -85,6 +85,10 @@ public: Y_VERIFY(ReadyGranulesAccumulator.emplace(granule.GetGranuleId()).second || AbortedFlag); } + void Wakeup(TGranule& granule) { + SortingPolicy->Wakeup(granule, *this); + } + void PrepareForStart() { SortingPolicy->Fill(*this); } diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp index 3de3003cbe..bd633044fc 100644 --- a/ydb/core/tx/columnshard/engines/reader/granule.cpp +++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp @@ -12,6 +12,7 @@ void TGranule::OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::Reco return; } } + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_batch")("granule_id", GranuleId)("batch_no", batchInfo.GetBatchNo())("count", WaitBatches.size()); Y_VERIFY(!ReadyFlag); Y_VERIFY(WaitBatches.erase(batchInfo.GetBatchNo())); if (batch && batch->num_rows()) { @@ -31,10 +32,7 @@ void TGranule::OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::Reco } } Owner->OnBatchReady(batchInfo, batch); - if (WaitBatches.empty()) { - ReadyFlag = true; - Owner->OnGranuleReady(*this); - } + CheckReady(); } NKikimr::NOlap::NIndexedReader::TBatch& TGranule::AddBatch(const ui32 batchNo, const TPortionInfo& portionInfo) { @@ -95,18 +93,30 @@ std::deque<TBatch*> TGranule::SortBatchesByPK(const bool reverse, TReadMetadata: } void TGranule::AddNotIndexedBatch(std::shared_ptr<arrow::RecordBatch> batch) { - if (!batch || !batch->num_rows()) { + Y_VERIFY(!NotIndexedBatchReadyFlag || !batch); + if (!NotIndexedBatchReadyFlag) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_batch")("granule_id", GranuleId)("batch_no", "add_not_indexed_batch")("count", WaitBatches.size()); + } else { return; } - AFL_ERROR(NKikimrServices::KQP_COMPUTE)("event", "add_not_indexed_batch"); - Y_VERIFY(NonSortableBatches.empty()); - Y_VERIFY(SortableBatches.empty()); - Y_VERIFY(!NotIndexedBatch); - NotIndexedBatch = batch; - if (Owner->GetReadMetadata()->Program) { - NotIndexedBatchFutureFilter = std::make_shared<NArrow::TColumnFilter>(NOlap::EarlyFilter(batch, Owner->GetReadMetadata()->Program)); + NotIndexedBatchReadyFlag = true; + if (batch && batch->num_rows()) { + Y_VERIFY(!NotIndexedBatch); + NotIndexedBatch = batch; + if (Owner->GetReadMetadata()->Program) { + NotIndexedBatchFutureFilter = std::make_shared<NArrow::TColumnFilter>(NOlap::EarlyFilter(batch, Owner->GetReadMetadata()->Program)); + } + DuplicationsAvailableFlag = true; + } + CheckReady(); + Owner->Wakeup(*this); +} + +void TGranule::CheckReady() { + if (WaitBatches.empty() && NotIndexedBatchReadyFlag) { + ReadyFlag = true; + Owner->OnGranuleReady(*this); } - DuplicationsAvailableFlag = true; } } diff --git a/ydb/core/tx/columnshard/engines/reader/granule.h b/ydb/core/tx/columnshard/engines/reader/granule.h index cc6763a364..49543f6948 100644 --- a/ydb/core/tx/columnshard/engines/reader/granule.h +++ b/ydb/core/tx/columnshard/engines/reader/granule.h @@ -14,10 +14,13 @@ class TGranulesFillingContext; class TGranule { private: YDB_READONLY(ui64, GranuleId, 0); - std::vector<std::shared_ptr<arrow::RecordBatch>> NonSortableBatches; - std::vector<std::shared_ptr<arrow::RecordBatch>> SortableBatches; + + YDB_READONLY_FLAG(NotIndexedBatchReady, false); YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, NotIndexedBatch); YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, NotIndexedBatchFutureFilter); + + std::vector<std::shared_ptr<arrow::RecordBatch>> NonSortableBatches; + std::vector<std::shared_ptr<arrow::RecordBatch>> SortableBatches; YDB_FLAG_ACCESSOR(DuplicationsAvailable, false); YDB_READONLY_FLAG(Ready, false); std::deque<TBatch> Batches; @@ -25,6 +28,8 @@ private: std::set<ui32> GranuleBatchNumbers; TGranulesFillingContext* Owner = nullptr; YDB_READONLY_DEF(THashSet<const void*>, BatchesToDedup); + + void CheckReady(); public: TGranule(const ui64 granuleId, TGranulesFillingContext& owner) : GranuleId(granuleId) diff --git a/ydb/core/tx/columnshard/engines/reader/order_controller.cpp b/ydb/core/tx/columnshard/engines/reader/order_controller.cpp index 0c82375122..3875f9b635 100644 --- a/ydb/core/tx/columnshard/engines/reader/order_controller.cpp +++ b/ydb/core/tx/columnshard/engines/reader/order_controller.cpp @@ -35,24 +35,27 @@ std::vector<TGranule*> TNonSorting::DoDetachReadyGranules(THashMap<ui64, NIndexe return result; } -bool TPKSortingWithLimit::DoOnFilterReady(TBatch& /*batchInfo*/, const TGranule& granule, TGranulesFillingContext& context) { +bool TPKSortingWithLimit::DoWakeup(const TGranule& granule, TGranulesFillingContext& context) { Y_VERIFY(ReadMetadata->Limit); if (!CurrentItemsLimit) { return false; } Y_VERIFY(GranulesOutOrderForPortions.size()); - if (granule.GetGranuleId() != GranulesOutOrderForPortions.front()->GetGranuleId()) { + if (GranulesOutOrderForPortions.front().GetGranule()->GetGranuleId() != granule.GetGranuleId()) { return false; } while (GranulesOutOrderForPortions.size()) { - auto it = OrderedBatches.find(GranulesOutOrderForPortions.front()->GetGranuleId()); - auto g = GranulesOutOrderForPortions.front(); - Y_VERIFY(it != OrderedBatches.end()); - if (!it->second.GetStarted()) { - MergeStream.AddIndependentSource(g->GetNotIndexedBatch(), g->GetNotIndexedBatchFutureFilter()); - it->second.SetStarted(true); + auto& g = GranulesOutOrderForPortions.front(); + // granule have to wait NotIndexedBatch initialization, at first (StartableFlag initialization). + // other batches will be delivered in OrderedBatches[granuleId] order + if (!g.GetGranule()->IsNotIndexedBatchReady()) { + break; + } + if (g.Start()) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "granule_started")("granule_id", g.GetGranule()->GetGranuleId())("count", GranulesOutOrderForPortions.size()); + MergeStream.AddIndependentSource(g.GetGranule()->GetNotIndexedBatch(), g.GetGranule()->GetNotIndexedBatchFutureFilter()); } - auto& batches = it->second.MutableBatches(); + auto& batches = g.MutableBatches(); while (batches.size() && batches.front()->IsFiltered() && CurrentItemsLimit) { auto b = batches.front(); if (b->IsSortableInGranule()) { @@ -73,13 +76,16 @@ bool TPKSortingWithLimit::DoOnFilterReady(TBatch& /*batchInfo*/, const TGranule& b->InitBatch(nullptr); batches.pop_front(); } - OrderedBatches.erase(it); GranulesOutOrderForPortions.pop_front(); } else { break; } } - return false; + return true; +} + +bool TPKSortingWithLimit::DoOnFilterReady(TBatch& /*batchInfo*/, const TGranule& granule, TGranulesFillingContext& context) { + return Wakeup(granule, context); } void TPKSortingWithLimit::DoFill(TGranulesFillingContext& context) { @@ -87,9 +93,8 @@ void TPKSortingWithLimit::DoFill(TGranulesFillingContext& context) { for (ui64 granule : granulesOrder) { TGranule& g = context.GetGranuleVerified(granule); GranulesOutOrder.emplace_back(&g); - Y_VERIFY(OrderedBatches.emplace(granule, TGranuleScanInfo(g.SortBatchesByPK(ReadMetadata->IsDescSorted(), ReadMetadata))).second); + GranulesOutOrderForPortions.emplace_back(g.SortBatchesByPK(ReadMetadata->IsDescSorted(), ReadMetadata), &g); } - GranulesOutOrderForPortions = GranulesOutOrder; } std::vector<TGranule*> TPKSortingWithLimit::DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) { diff --git a/ydb/core/tx/columnshard/engines/reader/order_controller.h b/ydb/core/tx/columnshard/engines/reader/order_controller.h index 2f697219b6..969c9cd0e3 100644 --- a/ydb/core/tx/columnshard/engines/reader/order_controller.h +++ b/ydb/core/tx/columnshard/engines/reader/order_controller.h @@ -11,6 +11,9 @@ class IOrderPolicy { protected: TReadMetadata::TConstPtr ReadMetadata; virtual void DoFill(TGranulesFillingContext& context) = 0; + virtual bool DoWakeup(const TGranule& /*granule*/, TGranulesFillingContext& /*context*/) { + return true; + } virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) = 0; virtual bool DoOnFilterReady(TBatch& batchInfo, const TGranule& /*granule*/, TGranulesFillingContext& context) { OnBatchFilterInitialized(batchInfo, context); @@ -50,6 +53,10 @@ public: void Fill(TGranulesFillingContext& context) { DoFill(context); } + + bool Wakeup(const TGranule& granule, TGranulesFillingContext& context) { + return DoWakeup(granule, context); + } }; class TNonSorting: public IOrderPolicy { @@ -89,13 +96,25 @@ public: } }; -class TGranuleScanInfo { +class TGranuleOrdered { private: - YDB_ACCESSOR(bool, Started, false); + bool StartedFlag = false; YDB_ACCESSOR_DEF(std::deque<TBatch*>, Batches); + YDB_READONLY(const TGranule*, Granule, nullptr); public: - TGranuleScanInfo(std::deque<TBatch*>&& batches) + bool Start() { + if (!StartedFlag) { + StartedFlag = true; + return true; + } else { + return false; + } + + } + + TGranuleOrdered(std::deque<TBatch*>&& batches, TGranule* granule) : Batches(std::move(batches)) + , Granule(granule) { } @@ -105,11 +124,11 @@ class TPKSortingWithLimit: public IOrderPolicy { private: using TBase = IOrderPolicy; std::deque<TGranule*> GranulesOutOrder; - std::deque<TGranule*> GranulesOutOrderForPortions; - THashMap<ui64, TGranuleScanInfo> OrderedBatches; + std::deque<TGranuleOrdered> GranulesOutOrderForPortions; ui32 CurrentItemsLimit = 0; TMergePartialStream MergeStream; protected: + virtual bool DoWakeup(const TGranule& granule, TGranulesFillingContext& context) override; virtual void DoFill(TGranulesFillingContext& context) override; virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) override; virtual bool DoOnFilterReady(TBatch& batchInfo, const TGranule& granule, TGranulesFillingContext& context) override; |