diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-30 09:24:13 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-30 09:24:13 +0300 |
commit | 46510d6c88912219669dfe229e9f6b55603d97ca (patch) | |
tree | b41c88538f080b125a2792d087414ba0bde0ec0b | |
parent | 4151a49ff0f7a9c3eab84ccaf38d2a0ab64c189f (diff) | |
download | ydb-46510d6c88912219669dfe229e9f6b55603d97ca.tar.gz |
move fetching check usage inside
5 files changed, 26 insertions, 18 deletions
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index 90758cc318..770d55539f 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -171,11 +171,6 @@ void TIndexedReadData::AddIndexed(const TBlobRange& blobRange, const TString& da if (!portionBatch->AddIndexedReady(blobRange, data)) { return; } - if (portionBatch->IsFetchingReady()) { - if (auto batch = portionBatch->AssembleTask(TasksProcessor.GetObject(), ReadMetadata)) { - TasksProcessor.Add(*GranulesContext, batch); - } - } } std::shared_ptr<arrow::RecordBatch> diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp index 42e8d8694c..b05c74a9a9 100644 --- a/ydb/core/tx/columnshard/engines/reader/batch.cpp +++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp @@ -12,14 +12,15 @@ TBatch::TBatch(const TBatchAddress& address, TGranule& owner, const TPortionInfo , Portion(portionInfo.Records[0].Portion) , Granule(owner.GetGranuleId()) , Owner(&owner) - , PortionInfo(&portionInfo) { + , PortionInfo(&portionInfo) +{ Y_VERIFY(portionInfo.Records.size()); if (portionInfo.CanIntersectOthers()) { - AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "intersect_portion"); + ACFL_TRACE("event", "intersect_portion"); Owner->SetDuplicationsAvailable(true); if (portionInfo.CanHaveDups()) { - AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "dup_portion"); + ACFL_TRACE("event", "dup_portion"); DuplicationsAvailableFlag = true; } } @@ -143,6 +144,7 @@ void TBatch::ResetWithFilter(const std::set<ui32>& columnIds) { } } } + CheckReadyForAssemble(); } bool TBatch::InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch, @@ -166,6 +168,7 @@ bool TBatch::AddIndexedReady(const TBlobRange& bRange, const TString& blobData) FetchedBytes += bRange.Size; Data.emplace(bRange, TPortionInfo::TAssembleBlobInfo(blobData)); Owner->OnBlobReady(bRange); + CheckReadyForAssemble(); return true; } @@ -212,4 +215,16 @@ void TBatch::GetPKBorders(const bool reverse, const TIndexInfo& indexInfo, std:: } } +bool TBatch::CheckReadyForAssemble() { + if (IsFetchingReady()) { + auto& context = Owner->GetOwner(); + auto processor = context.GetTasksProcessor(); + if (auto assembleBatchTask = AssembleTask(processor.GetObject(), context.GetReadMetadata())) { + processor.Add(context, assembleBatchTask); + } + return true; + } + return false; +} + } diff --git a/ydb/core/tx/columnshard/engines/reader/batch.h b/ydb/core/tx/columnshard/engines/reader/batch.h index 86b767c88a..40969f6e91 100644 --- a/ydb/core/tx/columnshard/engines/reader/batch.h +++ b/ydb/core/tx/columnshard/engines/reader/batch.h @@ -93,6 +93,10 @@ private: std::set<ui32> AskedColumnIds; void ResetCommon(const std::set<ui32>& columnIds); ui64 GetUsefulBytes(const ui64 bytes) const; + bool CheckReadyForAssemble(); + bool IsFetchingReady() const { + return WaitIndexed.empty(); + } public: std::shared_ptr<TSortableBatchPosition> GetFirstPK(const bool reverse, const TIndexInfo& indexInfo) const; @@ -136,10 +140,6 @@ public: return *Owner; } - bool IsFetchingReady() const { - return WaitIndexed.empty(); - } - const TPortionInfo& GetPortionInfo() const { return *PortionInfo; } diff --git a/ydb/core/tx/columnshard/engines/reader/granule.h b/ydb/core/tx/columnshard/engines/reader/granule.h index f2318c4c9f..8fe3ca8adf 100644 --- a/ydb/core/tx/columnshard/engines/reader/granule.h +++ b/ydb/core/tx/columnshard/engines/reader/granule.h @@ -88,6 +88,10 @@ public: return *Owner; } + TGranulesFillingContext& GetOwner() { + return *Owner; + } + class TBatchForMerge { private: TBatch* Batch = nullptr; diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp index b565d57306..63e96f8527 100644 --- a/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp +++ b/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp @@ -25,12 +25,6 @@ void IOrderPolicy::OnBatchFilterInitialized(TBatch& batchOriginal, TGranulesFill context.GetCounters().TwoPhasesFilterUsefulBytes->Add(batchOriginal.GetUsefulFetchedBytes()); batchOriginal.ResetWithFilter(context.GetPostFilterColumns()); - if (batchOriginal.IsFetchingReady()) { - auto processor = context.GetTasksProcessor(); - if (auto assembleBatchTask = batchOriginal.AssembleTask(processor.GetObject(), context.GetReadMetadata())) { - processor.Add(context, assembleBatchTask); - } - } context.GetCounters().TwoPhasesCount->Add(1); context.GetCounters().TwoPhasesPostFilterFetchedBytes->Add(batchOriginal.GetWaitingBytes()); |