author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-05 12:28:42 +0300
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-05-05 12:28:42 +0300
commit | a42ea897e1adc1a781252433d414994980d6ca60 (patch)
tree | 6c6aa39e2ace3ce7052c6d5a5fa7335c30f2b590
parent | 85069d919915e1f46d718faf0289c3f78ac7bb2b (diff)
download | ydb-a42ea897e1adc1a781252433d414994980d6ca60.tar.gz
split batch fetcher agent and fetched info
21 files changed, 313 insertions, 260 deletions
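
The patch moves the per-batch fetch/filter state out of TBatch into a new TBatchFetchedInfo and replaces the global batchNo with a TBatchAddress (granule index plus batch index inside that granule), which is what lets TGranulesFillingContext drop its pre-sized Batches vector and the batchesCount constructor argument. A minimal self-contained sketch of the two new pieces follows; it is not the verbatim YDB code — the YDB_READONLY accessors are expanded by hand, and arrow::RecordBatch / NArrow::TColumnFilter are reduced to plain row counts (FilterRows, OriginalRecordsCount) so the sketch compiles on its own.

```cpp
// Toy model of the two classes added by this commit (assumed simplifications:
// Arrow batches and column filters are reduced to row counts, and the
// YDB_READONLY macros are expanded into plain getters).
#include <cstdint>
#include <optional>
#include <string>

using ui32 = uint32_t;

// Replaces the flat batchNo: a batch is now addressed by the index of its
// granule plus its position inside that granule.
class TBatchAddress {
    ui32 GranuleIdx;
    ui32 BatchGranuleIdx;
public:
    TBatchAddress(const ui32 granuleIdx, const ui32 batchGranuleIdx)
        : GranuleIdx(granuleIdx)
        , BatchGranuleIdx(batchGranuleIdx) {
    }
    ui32 GetGranuleIdx() const { return GranuleIdx; }
    ui32 GetBatchGranuleIdx() const { return BatchGranuleIdx; }
    std::string ToString() const {
        return std::to_string(GranuleIdx) + "," + std::to_string(BatchGranuleIdx);
    }
};

// Owns the fetch results that used to live directly in TBatch: the filter,
// the filtered batch and the share of data that survived filtering.
class TBatchFetchedInfo {
    std::optional<ui32> FilterRows;      // stands in for FilterBatch->num_rows()
    ui32 OriginalRecordsCount = 0;
    bool HasFilter = false;
public:
    void InitFilter(const ui32 filterRows, const ui32 originalRecordsCount) {
        HasFilter = true;
        FilterRows = filterRows;
        OriginalRecordsCount = originalRecordsCount;
    }
    bool IsFiltered() const {
        return HasFilter;
    }
    // GetUsefulDataKff in the patch: fraction of the fetched data that is still useful.
    double GetUsefulDataKff() const {
        if (!FilterRows || !*FilterRows || !OriginalRecordsCount) {
            return 0;
        }
        return 1.0 * (*FilterRows) / OriginalRecordsCount;
    }
};
```

With this split, TBatch::GetUsefulBytes reduces to bytes * FetchedInfo.GetUsefulDataKff(), and TGranulesFillingContext::GetBatchInfo resolves a TBatchAddress via GranulesStorage[address.GetGranuleIdx()].GetBatchInfo(address.GetBatchGranuleIdx()), as the hunks below show.
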
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index ba4dd5fcae..b85513d74c 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -116,7 +116,7 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::v void TIndexedReadData::AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) { Y_VERIFY(IndexedBlobs.emplace(range).second); Y_VERIFY(IndexedBlobSubscriber.emplace(range, &batch).second); - if (batch.GetFilter()) { + if (batch.GetFetchedInfo().GetFilter()) { Counters.PostFilterBytes->Add(range.Size); ReadMetadata->ReadStats->DataAdditionalBytes += range.Size; FetchBlobsQueue.emplace_front(range); @@ -138,16 +138,15 @@ void TIndexedReadData::InitRead(ui32 inputBatch) { NotIndexed.resize(inputBatch); - ui32 batchNo = inputBatch; Y_VERIFY(!GranulesContext); - GranulesContext = std::make_unique<NIndexedReader::TGranulesFillingContext>(ReadMetadata, *this, OnePhaseReadMode, inputBatch + ReadMetadata->SelectInfo->Portions.size()); + GranulesContext = std::make_unique<NIndexedReader::TGranulesFillingContext>(ReadMetadata, *this, OnePhaseReadMode); ui64 portionsBytes = 0; for (auto& portionInfo : ReadMetadata->SelectInfo->Portions) { portionsBytes += portionInfo.BlobsBytes(); Y_VERIFY_S(portionInfo.Records.size(), "ReadMeatadata: " << *ReadMetadata); NIndexedReader::TGranule& granule = GranulesContext->UpsertGranule(portionInfo.Records[0].Granule); - granule.AddBatch(batchNo++, portionInfo); + granule.AddBatch(portionInfo); } GranulesContext->PrepareForStart(); diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h index a243682b86..51d24420a3 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.h +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h @@ -55,7 +55,7 @@ public: void InitRead(ui32 numNotIndexed); void Abort() { Y_VERIFY(GranulesContext); - return GranulesContext->Abort(); + return GranulesContext->Abort(); } bool IsInProgress() const { Y_VERIFY(GranulesContext); diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt index 02acfdbc02..fca7b1fe26 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt @@ -19,14 +19,15 @@ target_link_libraries(columnshard-engines-reader PUBLIC core-formats-arrow ) target_sources(columnshard-engines-reader PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp + 
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp ) diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt index 69c7c54139..d2914b8715 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt @@ -20,14 +20,15 @@ target_link_libraries(columnshard-engines-reader PUBLIC core-formats-arrow ) target_sources(columnshard-engines-reader PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp ) diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt index 69c7c54139..d2914b8715 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt @@ -20,14 +20,15 @@ target_link_libraries(columnshard-engines-reader PUBLIC core-formats-arrow ) target_sources(columnshard-engines-reader PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp + 
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp ) diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt index 02acfdbc02..fca7b1fe26 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt @@ -19,14 +19,15 @@ target_link_libraries(columnshard-engines-reader PUBLIC core-formats-arrow ) target_sources(columnshard-engines-reader PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp ) diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp index 47e00395b6..0cfbcf5d4c 100644 --- a/ydb/core/tx/columnshard/engines/reader/batch.cpp +++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp @@ -6,8 +6,8 @@ namespace NKikimr::NOlap::NIndexedReader { -TBatch::TBatch(const ui32 batchNo, TGranule& owner, const TPortionInfo& portionInfo) - : BatchNo(batchNo) +TBatch::TBatch(const TBatchAddress& address, TGranule& owner, const TPortionInfo& portionInfo) + : BatchAddress(address) , Portion(portionInfo.Records[0].Portion) , Granule(owner.GetGranuleId()) , Owner(&owner) @@ -19,7 +19,7 @@ TBatch::TBatch(const ui32 batchNo, TGranule& owner, const TPortionInfo& portionI Owner->SetDuplicationsAvailable(true); if (portionInfo.CanHaveDups()) { AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "dup_portion"); - DuplicationsAvailable = true; + DuplicationsAvailableFlag = true; } } } @@ -27,13 +27,13 @@ TBatch::TBatch(const ui32 batchNo, TGranule& owner, const TPortionInfo& portionI NColumnShard::IDataTasksProcessor::ITask::TPtr TBatch::AssembleTask(NColumnShard::IDataTasksProcessor::TPtr processor, 
NOlap::TReadMetadata::TConstPtr readMetadata) { Y_VERIFY(WaitIndexed.empty()); Y_VERIFY(PortionInfo->Produced()); - Y_VERIFY(!FilteredBatch); + Y_VERIFY(!FetchedInfo.GetFilteredBatch()); auto batchConstructor = PortionInfo->PrepareForAssemble(readMetadata->IndexInfo, readMetadata->LoadSchema, Data, CurrentColumnIds); Data.clear(); - if (!Filter) { - return std::make_shared<TAssembleFilter>(std::move(batchConstructor), readMetadata, *this, PortionInfo->AllowEarlyFilter(), Owner->GetEarlyFilterColumns(), processor); + if (!FetchedInfo.GetFilter()) { + return std::make_shared<TAssembleFilter>(std::move(batchConstructor), readMetadata, + *this, Owner->GetEarlyFilterColumns(), processor, Owner->GetOwner().GetSortingPolicy()); } else { - Y_VERIFY(FilterBatch); return std::make_shared<TAssembleBatch>(std::move(batchConstructor), *this, readMetadata->GetColumnsOrder(), processor); } } @@ -81,7 +81,7 @@ void TBatch::ResetCommon(const std::set<ui32>& columnIds) { } void TBatch::ResetNoFilter(const std::set<ui32>& columnIds) { - Y_VERIFY(!Filter); + Y_VERIFY(!FetchedInfo.GetFilter()); ResetCommon(columnIds); for (const NOlap::TColumnRecord& rec : PortionInfo->Records) { if (CurrentColumnIds && !CurrentColumnIds->contains(rec.ColumnId)) { @@ -97,7 +97,7 @@ void TBatch::ResetNoFilter(const std::set<ui32>& columnIds) { } void TBatch::ResetWithFilter(const std::set<ui32>& columnIds) { - Y_VERIFY(Filter); + Y_VERIFY(FetchedInfo.GetFilter()); ResetCommon(columnIds); std::map<ui32, std::map<ui16, const TColumnRecord*>> orderedObjects; for (const NOlap::TColumnRecord& rec : PortionInfo->Records) { @@ -112,7 +112,7 @@ void TBatch::ResetWithFilter(const std::set<ui32>& columnIds) { for (auto&& columnInfo : orderedObjects) { ui32 expected = 0; - auto it = Filter->GetIterator(); + auto it = FetchedInfo.GetFilter()->GetIterator(); bool undefinedShift = false; bool itFinished = false; for (auto&& [chunk, rec] : columnInfo.second) { @@ -136,20 +136,14 @@ void TBatch::ResetWithFilter(const std::set<ui32>& columnIds) { } bool TBatch::InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch, - const ui32 originalRecordsCount, std::shared_ptr<NArrow::TColumnFilter> futureFilter) { - Y_VERIFY(filter); - Y_VERIFY(!Filter); - Y_VERIFY(!FilterBatch); - Filter = filter; - FilterBatch = filterBatch; - OriginalRecordsCount = originalRecordsCount; - FutureFilter = futureFilter; + const ui32 originalRecordsCount, std::shared_ptr<NArrow::TColumnFilter> notAppliedEarlyFilter) +{ + FetchedInfo.InitFilter(filter, filterBatch, originalRecordsCount, notAppliedEarlyFilter); return Owner->OnFilterReady(*this); } void TBatch::InitBatch(std::shared_ptr<arrow::RecordBatch> batch) { - Y_VERIFY(!FilteredBatch); - FilteredBatch = batch; + FetchedInfo.InitBatch(batch); Owner->OnBatchReady(*this, batch); } @@ -164,28 +158,16 @@ bool TBatch::AddIndexedReady(const TBlobRange& bRange, const TString& blobData) return true; } -bool TBatch::NeedAdditionalData() const { - if (!Filter) { - return true; - } - if (!FilteredBatch || !FilteredBatch->num_rows()) { - return false; - } - if (AskedColumnsAlready(Owner->GetOwner().GetPostFilterColumns())) { - return false; - } - return true; +ui64 TBatch::GetUsefulBytes(const ui64 bytes) const { + return bytes * FetchedInfo.GetUsefulDataKff(); } -ui64 TBatch::GetUsefulBytes(const ui64 bytes) const { - if (!FilteredBatch || !FilteredBatch->num_rows()) { - return 0; - } - Y_VERIFY_DEBUG(OriginalRecordsCount); - if (!OriginalRecordsCount) { - return 0; 
+std::optional<ui32> TBatch::GetMergePoolId() const { + if (IsSortableInGranule()) { + return Granule; + } else { + return {}; } - return bytes * FilteredBatch->num_rows() / OriginalRecordsCount; } } diff --git a/ydb/core/tx/columnshard/engines/reader/batch.h b/ydb/core/tx/columnshard/engines/reader/batch.h index ed4d3676fb..05f2e42ae5 100644 --- a/ydb/core/tx/columnshard/engines/reader/batch.h +++ b/ydb/core/tx/columnshard/engines/reader/batch.h @@ -1,4 +1,5 @@ #pragma once +#include "common.h" #include "conveyor_task.h" #include <ydb/core/formats/arrow/arrow_filter.h> @@ -16,81 +17,86 @@ struct TReadMetadata; namespace NKikimr::NOlap::NIndexedReader { class TGranule; +class TBatch; -class TBatch { +class TBatchFetchedInfo { private: - ui64 BatchNo = 0; - ui64 Portion = 0; - ui64 Granule = 0; - ui64 WaitingBytes = 0; - ui64 FetchedBytes = 0; - - THashSet<TBlobRange> WaitIndexed; - std::shared_ptr<arrow::RecordBatch> FilteredBatch; - std::shared_ptr<arrow::RecordBatch> FilterBatch; - std::shared_ptr<NArrow::TColumnFilter> Filter; - std::shared_ptr<NArrow::TColumnFilter> FutureFilter; - + YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, FilteredBatch); + YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, FilterBatch); + YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, Filter); + YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, NotAppliedEarlyFilter); ui32 OriginalRecordsCount = 0; - - bool DuplicationsAvailable = false; - THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> Data; - TGranule* Owner; - const TPortionInfo* PortionInfo = nullptr; - - std::optional<std::set<ui32>> CurrentColumnIds; - std::set<ui32> AskedColumnIds; - void ResetCommon(const std::set<ui32>& columnIds); - ui64 GetUsefulBytes(const ui64 bytes) const; - public: - bool IsDuplicationsAvailable() const noexcept { - return DuplicationsAvailable; - } - - void SetDuplicationsAvailable(bool val) noexcept { - DuplicationsAvailable = val; - } - - ui64 GetBatchNo() const noexcept { - return BatchNo; - } - - ui64 GetPortion() const noexcept { - return Portion; + void InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch, + const ui32 originalRecordsCount, std::shared_ptr<NArrow::TColumnFilter> notAppliedEarlyFilter) { + Y_VERIFY(filter); + Y_VERIFY(!Filter); + Y_VERIFY(!FilterBatch); + Filter = filter; + FilterBatch = filterBatch; + OriginalRecordsCount = originalRecordsCount; + NotAppliedEarlyFilter = notAppliedEarlyFilter; + } + + double GetUsefulDataKff() const { + if (!FilterBatch || !FilterBatch->num_rows()) { + return 0; + } + Y_VERIFY_DEBUG(OriginalRecordsCount); + if (!OriginalRecordsCount) { + return 0; + } + return 1.0 * FilterBatch->num_rows() / OriginalRecordsCount; } - ui64 GetGranule() const noexcept { - return Granule; + bool IsFiltered() const { + return !!Filter; } - ui64 GetWaitingBytes() const noexcept { - return WaitingBytes; + void InitBatch(std::shared_ptr<arrow::RecordBatch> fullBatch) { + Y_VERIFY(!FilteredBatch); + FilteredBatch = fullBatch; } - ui64 GetFetchedBytes() const noexcept { - return FetchedBytes; + ui32 GetFilteredRecordsCount() const { + Y_VERIFY(IsFiltered()); + if (!FilterBatch) { + return 0; + } else { + return FilterBatch->num_rows(); + } } +}; - const std::optional<std::set<ui32>>& GetCurrentColumnIds() const noexcept { - return CurrentColumnIds; - } +class TBatch { +private: + const TBatchAddress BatchAddress; + YDB_READONLY(ui64, Portion, 0); + YDB_READONLY(ui64, Granule, 0); + YDB_READONLY(ui64, WaitingBytes, 
0); + YDB_READONLY(ui64, FetchedBytes, 0); - const std::shared_ptr<arrow::RecordBatch>& GetFilteredBatch() const noexcept { - return FilteredBatch; - } + THashSet<TBlobRange> WaitIndexed; + + YDB_READONLY_FLAG(DuplicationsAvailable, false); + YDB_READONLY_DEF(TBatchFetchedInfo, FetchedInfo); + THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> Data; + TGranule* Owner; + const TPortionInfo* PortionInfo = nullptr; - const std::shared_ptr<arrow::RecordBatch>& GetFilterBatch() const noexcept { - return FilterBatch; - } + YDB_READONLY_DEF(std::optional<std::set<ui32>>, CurrentColumnIds); + std::set<ui32> AskedColumnIds; + void ResetCommon(const std::set<ui32>& columnIds); + ui64 GetUsefulBytes(const ui64 bytes) const; - const std::shared_ptr<NArrow::TColumnFilter>& GetFilter() const noexcept { - return Filter; +public: + bool AllowEarlyFilter() const { + return PortionInfo->AllowEarlyFilter(); } - - const std::shared_ptr<NArrow::TColumnFilter>& GetFutureFilter() const noexcept { - return FutureFilter; + const TBatchAddress& GetBatchAddress() const { + return BatchAddress; } + std::optional<ui32> GetMergePoolId() const; ui64 GetUsefulWaitingBytes() const { return GetUsefulBytes(WaitingBytes); @@ -100,11 +106,10 @@ public: return GetUsefulBytes(FetchedBytes); } - bool NeedAdditionalData() const; bool IsSortableInGranule() const { return PortionInfo->IsSortableInGranule(); } - TBatch(const ui32 batchNo, TGranule& owner, const TPortionInfo& portionInfo); + TBatch(const TBatchAddress& address, TGranule& owner, const TPortionInfo& portionInfo); bool AddIndexedReady(const TBlobRange& bRange, const TString& blobData); bool AskedColumnsAlready(const std::set<ui32>& columnIds) const; @@ -112,19 +117,9 @@ public: void ResetWithFilter(const std::set<ui32>& columnIds); ui64 GetFetchBytes(const std::set<ui32>& columnIds); - bool IsFiltered() const { - return !!Filter; - } - ui32 GetFilteredRecordsCount() const { - Y_VERIFY(IsFiltered()); - if (!FilterBatch) { - return 0; - } else { - return FilterBatch->num_rows(); - } - } + ui32 GetFilteredRecordsCount() const; bool InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch, - const ui32 originalRecordsCount, std::shared_ptr<NArrow::TColumnFilter> futureFilter); + const ui32 originalRecordsCount, std::shared_ptr<NArrow::TColumnFilter> notAppliedEarlyFilter); void InitBatch(std::shared_ptr<arrow::RecordBatch> batch); NColumnShard::IDataTasksProcessor::ITask::TPtr AssembleTask(NColumnShard::IDataTasksProcessor::TPtr processor, std::shared_ptr<const NOlap::TReadMetadata> readMetadata); diff --git a/ydb/core/tx/columnshard/engines/reader/common.cpp b/ydb/core/tx/columnshard/engines/reader/common.cpp new file mode 100644 index 0000000000..5e29cd8c9c --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/common.cpp @@ -0,0 +1,17 @@ +#include "common.h"
+#include <util/string/builder.h>
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+TString TBatchAddress::ToString() const {
+ return TStringBuilder() << GranuleIdx << "," << BatchGranuleIdx;
+}
+
+TBatchAddress::TBatchAddress(const ui32 granuleIdx, const ui32 batchGranuleIdx)
+    : GranuleIdx(granuleIdx)
+    , BatchGranuleIdx(batchGranuleIdx)
+{
+
+}
+
+} diff --git a/ydb/core/tx/columnshard/engines/reader/common.h b/ydb/core/tx/columnshard/engines/reader/common.h new file mode 100644 index 0000000000..c1dcc4cec6 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/common.h @@ -0,0 +1,18 @@ +#pragma once +#include <ydb/library/accessor/accessor.h> +#include <util/system/types.h> +#include <util/generic/string.h> + +namespace NKikimr::NOlap::NIndexedReader { + +class TBatchAddress { +private: + YDB_READONLY(ui32, GranuleIdx, 0); + YDB_READONLY(ui32, BatchGranuleIdx, 0); +public: + TString ToString() const; + + TBatchAddress(const ui32 granuleIdx, const ui32 batchGranuleIdx); +}; + +} diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp index a8f6be4a73..eed248d5c9 100644 --- a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp @@ -4,13 +4,12 @@ namespace NKikimr::NOlap::NIndexedReader { -TGranulesFillingContext::TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData& owner, const bool internalReading, const ui32 batchesCount) +TGranulesFillingContext::TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData& owner, const bool internalReading) : ReadMetadata(readMetadata) , InternalReading(internalReading) , Owner(owner) , Counters(owner.GetCounters()) { - Batches.resize(batchesCount, nullptr); SortingPolicy = InternalReading ? std::make_shared<TNonSorting>(ReadMetadata) : ReadMetadata->BuildSortingPolicy(); UsedColumns = ReadMetadata->GetUsedColumnIds(); @@ -44,11 +43,10 @@ void TGranulesFillingContext::OnBatchReady(const NIndexedReader::TBatch& batchIn return Owner.OnBatchReady(batchInfo, batch); } -NKikimr::NOlap::NIndexedReader::TBatch& TGranulesFillingContext::GetBatchInfo(const ui32 batchNo) { - Y_VERIFY(batchNo < Batches.size()); - auto ptr = Batches[batchNo]; - Y_VERIFY(ptr); - return *ptr; +NKikimr::NOlap::NIndexedReader::TBatch& TGranulesFillingContext::GetBatchInfo(const TBatchAddress& address) { + Y_VERIFY(address.GetGranuleIdx() < GranulesStorage.size()); + auto& g = GranulesStorage[address.GetGranuleIdx()]; + return g.GetBatchInfo(address.GetBatchGranuleIdx()); } NKikimr::NColumnShard::TDataTasksProcessorContainer TGranulesFillingContext::GetTasksProcessor() const { @@ -56,7 +54,7 @@ NKikimr::NColumnShard::TDataTasksProcessorContainer TGranulesFillingContext::Get } void TGranulesFillingContext::DrainNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>* batches) { - for (auto&& [_, g] : Granules) { + for (auto&& g : GranulesStorage) { if (!batches) { g.AddNotIndexedBatch(nullptr); } else { diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h index e9652e0b11..d70752a41e 100644 --- a/ydb/core/tx/columnshard/engines/reader/filling_context.h +++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h @@ -18,19 +18,19 @@ private: TIndexedReadData& Owner; THashMap<ui64, NIndexedReader::TGranule*> GranulesToOut; std::set<ui64> ReadyGranulesAccumulator; - THashMap<ui64, NIndexedReader::TGranule> Granules; + std::deque<NIndexedReader::TGranule> GranulesStorage; + THashMap<ui64, NIndexedReader::TGranule*> GranulesUpserted; std::set<ui32> EarlyFilterColumns; std::set<ui32> PostFilterColumns; std::set<ui32> FilterStageColumns; std::set<ui32> UsedColumns; IOrderPolicy::TPtr SortingPolicy; NColumnShard::TScanCounters Counters; - 
std::vector<NIndexedReader::TBatch*> Batches; bool PredictEmptyAfterFilter(const TPortionInfo& portionInfo) const; public: - TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData& owner, const bool internalReading, const ui32 batchesCount); + TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData& owner, const bool internalReading); TReadMetadata::TConstPtr GetReadMetadata() const noexcept { return ReadMetadata; @@ -55,18 +55,18 @@ public: NColumnShard::TDataTasksProcessorContainer GetTasksProcessor() const; void DrainNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>* batches); - TBatch& GetBatchInfo(const ui32 batchNo); + TBatch& GetBatchInfo(const TBatchAddress& address); void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch); void OnBatchReady(const NIndexedReader::TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch); NIndexedReader::TGranule& GetGranuleVerified(const ui64 granuleId) { - auto it = Granules.find(granuleId); - Y_VERIFY(it != Granules.end()); - return it->second; + auto it = GranulesUpserted.find(granuleId); + Y_VERIFY(it != GranulesUpserted.end()); + return *it->second; } - bool IsInProgress() const { return Granules.size() > ReadyGranulesAccumulator.size(); } + bool IsInProgress() const { return GranulesStorage.size() > ReadyGranulesAccumulator.size(); } void OnNewBatch(TBatch& batch) { if (!InternalReading && PredictEmptyAfterFilter(batch.GetPortionInfo())) { @@ -74,7 +74,6 @@ public: } else { batch.ResetNoFilter(UsedColumns); } - Batches[batch.GetBatchNo()] = &batch; } std::vector<TGranule*> DetachReadyInOrder() { @@ -84,20 +83,21 @@ public: void Abort() { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "abort"); - for (auto&& i : Granules) { - ReadyGranulesAccumulator.emplace(i.first); + for (auto&& i : GranulesStorage) { + ReadyGranulesAccumulator.emplace(i.GetGranuleId()); } AbortedFlag = true; - Y_VERIFY(ReadyGranulesAccumulator.size() == Granules.size()); + Y_VERIFY(ReadyGranulesAccumulator.size() == GranulesStorage.size()); Y_VERIFY(!IsInProgress()); } TGranule& UpsertGranule(const ui64 granuleId) { - auto itGranule = Granules.find(granuleId); - if (itGranule == Granules.end()) { - itGranule = Granules.emplace(granuleId, NIndexedReader::TGranule(granuleId, *this)).first; + auto itGranule = GranulesUpserted.find(granuleId); + if (itGranule == GranulesUpserted.end()) { + GranulesStorage.emplace_back(NIndexedReader::TGranule(granuleId, GranulesStorage.size(), *this)); + itGranule = GranulesUpserted.emplace(granuleId, &GranulesStorage.back()).first; } - return itGranule->second; + return *itGranule->second; } void OnGranuleReady(TGranule& granule) { diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp index c7f9c5f0ba..8f9079d3c3 100644 --- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp +++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp @@ -21,16 +21,16 @@ bool TAssembleFilter::DoExecuteImpl() { return true; } if (ReadMetadata->Program) { - auto earlyFilter = std::make_shared<NArrow::TColumnFilter>(NOlap::EarlyFilter(batch, ReadMetadata->Program)); if (AllowEarlyFilter) { + auto earlyFilter = std::make_shared<NArrow::TColumnFilter>(NOlap::EarlyFilter(batch, ReadMetadata->Program)); Filter->CombineSequential(*earlyFilter); if (!earlyFilter->Apply(batch)) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_data")("original_count", 
OriginalCount); FilteredBatch = nullptr; return true; } - } else { - EarlyFilter = earlyFilter; + } else if (BatchesOrderPolicy->NeedNotAppliedEarlyFilter()) { + EarlyFilter = std::make_shared<NArrow::TColumnFilter>(NOlap::EarlyFilter(batch, ReadMetadata->Program)); } } @@ -51,7 +51,7 @@ bool TAssembleFilter::DoExecuteImpl() { } bool TAssembleFilter::DoApply(TGranulesFillingContext& owner) const { - TBatch& batch = owner.GetBatchInfo(BatchNo); + TBatch& batch = owner.GetBatchInfo(BatchAddress); Y_VERIFY(OriginalCount); owner.GetCounters().OriginalRowsCount->Add(OriginalCount); owner.GetCounters().AssembleFilterCount->Add(1); diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.h b/ydb/core/tx/columnshard/engines/reader/filter_assembler.h index 5b32be9363..d8efa4e8e9 100644 --- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.h +++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.h @@ -1,4 +1,5 @@ #pragma once +#include "common.h" #include "conveyor_task.h" #include <ydb/core/formats/arrow/arrow_filter.h> @@ -17,22 +18,25 @@ namespace NKikimr::NOlap::NIndexedReader { NOlap::TReadMetadata::TConstPtr ReadMetadata; std::shared_ptr<NArrow::TColumnFilter> Filter; std::shared_ptr<NArrow::TColumnFilter> EarlyFilter; - const ui32 BatchNo; + const TBatchAddress BatchAddress; ui32 OriginalCount = 0; bool AllowEarlyFilter = false; std::set<ui32> FilterColumnIds; + IOrderPolicy::TPtr BatchesOrderPolicy; protected: virtual bool DoApply(TGranulesFillingContext& owner) const override; virtual bool DoExecuteImpl() override; public: TAssembleFilter(TPortionInfo::TPreparedBatchData&& batchConstructor, NOlap::TReadMetadata::TConstPtr readMetadata, - TBatch& batch, const bool allowEarlyFilter, const std::set<ui32>& filterColumnIds, NColumnShard::IDataTasksProcessor::TPtr processor) + TBatch& batch, const std::set<ui32>& filterColumnIds, NColumnShard::IDataTasksProcessor::TPtr processor, + IOrderPolicy::TPtr batchesOrderPolicy) : TBase(processor) , BatchConstructor(batchConstructor) , ReadMetadata(readMetadata) - , BatchNo(batch.GetBatchNo()) - , AllowEarlyFilter(allowEarlyFilter) + , BatchAddress(batch.GetBatchAddress()) + , AllowEarlyFilter(batch.AllowEarlyFilter()) , FilterColumnIds(filterColumnIds) + , BatchesOrderPolicy(batchesOrderPolicy) { TBase::SetPriority(TBase::EPriority::Normal); } diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp index bd633044fc..217e46911f 100644 --- a/ydb/core/tx/columnshard/engines/reader/granule.cpp +++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp @@ -12,9 +12,10 @@ void TGranule::OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::Reco return; } } - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_batch")("granule_id", GranuleId)("batch_no", batchInfo.GetBatchNo())("count", WaitBatches.size()); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_batch")("granule_id", GranuleId) + ("batch_address", batchInfo.GetBatchAddress().ToString())("count", WaitBatches.size()); Y_VERIFY(!ReadyFlag); - Y_VERIFY(WaitBatches.erase(batchInfo.GetBatchNo())); + Y_VERIFY(WaitBatches.erase(batchInfo.GetBatchAddress().GetBatchGranuleIdx())); if (batch && batch->num_rows()) { if (batchInfo.IsSortableInGranule()) { SortableBatches.emplace_back(batch); @@ -35,11 +36,12 @@ void TGranule::OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::Reco CheckReady(); } -NKikimr::NOlap::NIndexedReader::TBatch& TGranule::AddBatch(const ui32 batchNo, const 
TPortionInfo& portionInfo) { +NKikimr::NOlap::NIndexedReader::TBatch& TGranule::AddBatch(const TPortionInfo& portionInfo) { Y_VERIFY(!ReadyFlag); - WaitBatches.emplace(batchNo); - Batches.emplace_back(TBatch(batchNo, *this, portionInfo)); - Y_VERIFY(GranuleBatchNumbers.emplace(batchNo).second); + ui32 batchGranuleIdx = Batches.size(); + WaitBatches.emplace(batchGranuleIdx); + Batches.emplace_back(TBatch(TBatchAddress(GranuleIdx, batchGranuleIdx), *this, portionInfo)); + Y_VERIFY(GranuleBatchNumbers.emplace(batchGranuleIdx).second); Owner->OnNewBatch(Batches.back()); return Batches.back(); } diff --git a/ydb/core/tx/columnshard/engines/reader/granule.h b/ydb/core/tx/columnshard/engines/reader/granule.h index eddaa66dc1..442af41ed9 100644 --- a/ydb/core/tx/columnshard/engines/reader/granule.h +++ b/ydb/core/tx/columnshard/engines/reader/granule.h @@ -14,6 +14,7 @@ class TGranulesFillingContext; class TGranule { private: ui64 GranuleId = 0; + YDB_READONLY(ui64, GranuleIdx, 0); bool NotIndexedBatchReadyFlag = false; std::shared_ptr<arrow::RecordBatch> NotIndexedBatch; @@ -31,8 +32,9 @@ private: void CheckReady(); public: - TGranule(const ui64 granuleId, TGranulesFillingContext& owner) + TGranule(const ui64 granuleId, const ui64 granuleIdx, TGranulesFillingContext& owner) : GranuleId(granuleId) + , GranuleIdx(granuleIdx) , Owner(&owner) { } @@ -79,6 +81,11 @@ public: return result; } + TBatch& GetBatchInfo(const ui32 batchIdx) { + Y_VERIFY(batchIdx < Batches.size()); + return Batches[batchIdx]; + } + void AddNotIndexedBatch(std::shared_ptr<arrow::RecordBatch> batch); const TGranulesFillingContext& GetOwner() const { @@ -89,7 +96,7 @@ public: const std::set<ui32>& GetEarlyFilterColumns() const; void OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch); bool OnFilterReady(TBatch& batchInfo); - TBatch& AddBatch(const ui32 batchNo, const TPortionInfo& portionInfo); + TBatch& AddBatch(const TPortionInfo& portionInfo); void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) const; }; diff --git a/ydb/core/tx/columnshard/engines/reader/order_controller.cpp b/ydb/core/tx/columnshard/engines/reader/order_controller.cpp index 8f51265b7b..7ba4e4c250 100644 --- a/ydb/core/tx/columnshard/engines/reader/order_controller.cpp +++ b/ydb/core/tx/columnshard/engines/reader/order_controller.cpp @@ -53,16 +53,12 @@ bool TPKSortingWithLimit::DoWakeup(const TGranule& granule, TGranulesFillingCont } if (g.Start()) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "granule_started")("granule_id", g.GetGranule()->GetGranuleId())("count", GranulesOutOrderForPortions.size()); - MergeStream.AddIndependentSource(g.GetGranule()->GetNotIndexedBatch(), g.GetGranule()->GetNotIndexedBatchFutureFilter()); + MergeStream.AddPoolSource({}, g.GetGranule()->GetNotIndexedBatch(), g.GetGranule()->GetNotIndexedBatchFutureFilter()); } auto& batches = g.GetBatches(); - while (batches.size() && batches.front()->IsFiltered() && CurrentItemsLimit) { + while (batches.size() && batches.front()->GetFetchedInfo().IsFiltered() && CurrentItemsLimit) { auto b = batches.front(); - if (b->IsSortableInGranule()) { - MergeStream.AddPoolSource(0, b->GetFilterBatch()); - } else { - MergeStream.AddIndependentSource(b->GetFilterBatch(), b->GetFutureFilter()); - } + MergeStream.AddPoolSource(b->GetMergePoolId(), b->GetFetchedInfo().GetFilterBatch(), b->GetFetchedInfo().GetNotAppliedEarlyFilter()); OnBatchFilterInitialized(*b, context); batches.pop_front(); } @@ -118,42 +114,43 @@ 
TPKSortingWithLimit::TPKSortingWithLimit(TReadMetadata::TConstPtr readMetadata) CurrentItemsLimit = ReadMetadata->Limit; } -void IOrderPolicy::OnBatchFilterInitialized(TBatch& batch, TGranulesFillingContext& context) { +void IOrderPolicy::OnBatchFilterInitialized(TBatch& batchOriginal, TGranulesFillingContext& context) { + auto& batch = batchOriginal.GetFetchedInfo(); Y_VERIFY(!!batch.GetFilter()); if (!batch.GetFilteredRecordsCount()) { context.GetCounters().EmptyFilterCount->Add(1); - context.GetCounters().EmptyFilterFetchedBytes->Add(batch.GetFetchedBytes()); - context.GetCounters().SkippedBytes->Add(batch.GetFetchBytes(context.GetPostFilterColumns())); - batch.InitBatch(nullptr); + context.GetCounters().EmptyFilterFetchedBytes->Add(batchOriginal.GetFetchedBytes()); + context.GetCounters().SkippedBytes->Add(batchOriginal.GetFetchBytes(context.GetPostFilterColumns())); + batchOriginal.InitBatch(nullptr); } else { context.GetCounters().FilteredRowsCount->Add(batch.GetFilterBatch()->num_rows()); - if (batch.AskedColumnsAlready(context.GetPostFilterColumns())) { + if (batchOriginal.AskedColumnsAlready(context.GetPostFilterColumns())) { context.GetCounters().FilterOnlyCount->Add(1); - context.GetCounters().FilterOnlyFetchedBytes->Add(batch.GetFetchedBytes()); - context.GetCounters().FilterOnlyUsefulBytes->Add(batch.GetUsefulFetchedBytes()); - context.GetCounters().SkippedBytes->Add(batch.GetFetchBytes(context.GetPostFilterColumns())); + context.GetCounters().FilterOnlyFetchedBytes->Add(batchOriginal.GetFetchedBytes()); + context.GetCounters().FilterOnlyUsefulBytes->Add(batchOriginal.GetUsefulFetchedBytes()); + context.GetCounters().SkippedBytes->Add(batchOriginal.GetFetchBytes(context.GetPostFilterColumns())); - batch.InitBatch(batch.GetFilterBatch()); + batchOriginal.InitBatch(batch.GetFilterBatch()); } else { - context.GetCounters().TwoPhasesFilterFetchedBytes->Add(batch.GetFetchedBytes()); - context.GetCounters().TwoPhasesFilterUsefulBytes->Add(batch.GetUsefulFetchedBytes()); + context.GetCounters().TwoPhasesFilterFetchedBytes->Add(batchOriginal.GetFetchedBytes()); + context.GetCounters().TwoPhasesFilterUsefulBytes->Add(batchOriginal.GetUsefulFetchedBytes()); - batch.ResetWithFilter(context.GetPostFilterColumns()); - if (batch.IsFetchingReady()) { + batchOriginal.ResetWithFilter(context.GetPostFilterColumns()); + if (batchOriginal.IsFetchingReady()) { auto processor = context.GetTasksProcessor(); - if (auto assembleBatchTask = batch.AssembleTask(processor.GetObject(), context.GetReadMetadata())) { + if (auto assembleBatchTask = batchOriginal.AssembleTask(processor.GetObject(), context.GetReadMetadata())) { processor.Add(context, assembleBatchTask); } } context.GetCounters().TwoPhasesCount->Add(1); - context.GetCounters().TwoPhasesPostFilterFetchedBytes->Add(batch.GetWaitingBytes()); - context.GetCounters().TwoPhasesPostFilterUsefulBytes->Add(batch.GetUsefulWaitingBytes()); + context.GetCounters().TwoPhasesPostFilterFetchedBytes->Add(batchOriginal.GetWaitingBytes()); + context.GetCounters().TwoPhasesPostFilterUsefulBytes->Add(batchOriginal.GetUsefulWaitingBytes()); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "additional_data") ("filtered_count", batch.GetFilterBatch()->num_rows()) - ("blobs_count", batch.GetWaitingBlobs().size()) - ("columns_count", batch.GetCurrentColumnIds()->size()) - ("fetch_size", batch.GetWaitingBytes()) + ("blobs_count", batchOriginal.GetWaitingBlobs().size()) + ("columns_count", batchOriginal.GetCurrentColumnIds()->size()) + ("fetch_size", 
batchOriginal.GetWaitingBytes()) ; } } diff --git a/ydb/core/tx/columnshard/engines/reader/order_controller.h b/ydb/core/tx/columnshard/engines/reader/order_controller.h index 842488aeb8..66f2fba42b 100644 --- a/ydb/core/tx/columnshard/engines/reader/order_controller.h +++ b/ydb/core/tx/columnshard/engines/reader/order_controller.h @@ -8,6 +8,14 @@ namespace NKikimr::NOlap::NIndexedReader { class TGranulesFillingContext; class IOrderPolicy { +public: + enum class EFeatures: ui32 { + CanInterrupt = 1, + NeedNotAppliedEarlyFilter = 1 << 1 + }; + using TFeatures = ui32; +private: + mutable std::optional<TFeatures> Features; protected: TReadMetadata::TConstPtr ReadMetadata; virtual void DoFill(TGranulesFillingContext& context) = 0; @@ -21,6 +29,15 @@ protected: } void OnBatchFilterInitialized(TBatch& batch, TGranulesFillingContext& context); + virtual TFeatures DoGetFeatures() const { + return 0; + } + TFeatures GetFeatures() const { + if (!Features) { + Features = DoGetFeatures(); + } + return *Features; + } public: using TPtr = std::shared_ptr<IOrderPolicy>; virtual ~IOrderPolicy() = default; @@ -35,8 +52,12 @@ public: } - virtual bool CanInterrupt() const { - return false; + bool CanInterrupt() const { + return GetFeatures() & (TFeatures)EFeatures::CanInterrupt; + } + + bool NeedNotAppliedEarlyFilter() const { + return GetFeatures() & (TFeatures)EFeatures::NeedNotAppliedEarlyFilter; } bool OnFilterReady(TBatch& batchInfo, const TGranule& granule, TGranulesFillingContext& context) { @@ -143,6 +164,10 @@ protected: virtual void DoFill(TGranulesFillingContext& context) override; virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) override; virtual bool DoOnFilterReady(TBatch& batchInfo, const TGranule& granule, TGranulesFillingContext& context) override; + virtual TFeatures DoGetFeatures() const override { + return (TFeatures)EFeatures::CanInterrupt & (TFeatures)EFeatures::NeedNotAppliedEarlyFilter; + } + public: virtual std::set<ui32> GetFilterStageColumns() override { std::set<ui32> result = ReadMetadata->GetEarlyFilterColumnIds(); @@ -152,10 +177,6 @@ public: return result; } - virtual bool CanInterrupt() const override { - return true; - } - TPKSortingWithLimit(TReadMetadata::TConstPtr readMetadata); virtual bool ReadyForAddNotIndexedToEnd() const override { return ReadMetadata->IsDescSorted() && GranulesOutOrder.empty(); diff --git a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp index e857d671f3..b1c4bbb542 100644 --- a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp +++ b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp @@ -34,7 +34,7 @@ bool TAssembleBatch::DoExecuteImpl() { } bool TAssembleBatch::DoApply(TGranulesFillingContext& owner) const { - TBatch& batch = owner.GetBatchInfo(BatchNo); + TBatch& batch = owner.GetBatchInfo(BatchAddress); batch.InitBatch(FullBatch); return true; } @@ -45,14 +45,14 @@ TAssembleBatch::TAssembleBatch(TPortionInfo::TPreparedBatchData&& batchConstruct : TBase(processor) , BatchConstructor(batchConstructor) , FullColumnsOrder(fullColumnsOrder) - , Filter(currentBatch.GetFilter()) - , FilterBatch(currentBatch.GetFilterBatch()) - , BatchNo(currentBatch.GetBatchNo()) + , Filter(currentBatch.GetFetchedInfo().GetFilter()) + , FilterBatch(currentBatch.GetFetchedInfo().GetFilterBatch()) + , BatchAddress(currentBatch.GetBatchAddress()) { TBase::SetPriority(TBase::EPriority::High); - 
Y_VERIFY(currentBatch.GetFilter()); - Y_VERIFY(currentBatch.GetFilterBatch()); - Y_VERIFY(!currentBatch.GetFilteredBatch()); + Y_VERIFY(currentBatch.GetFetchedInfo().GetFilter()); + Y_VERIFY(currentBatch.GetFetchedInfo().GetFilterBatch()); + Y_VERIFY(!currentBatch.GetFetchedInfo().GetFilteredBatch()); } } diff --git a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h index 0dad312c34..db09e5ea06 100644 --- a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h +++ b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h @@ -1,4 +1,5 @@ #pragma once +#include "common.h" #include "conveyor_task.h" #include <ydb/core/tx/columnshard/engines/portion_info.h> @@ -18,7 +19,7 @@ private: std::shared_ptr<NArrow::TColumnFilter> Filter; std::shared_ptr<arrow::RecordBatch> FilterBatch; - const ui32 BatchNo; + const TBatchAddress BatchAddress; protected: virtual bool DoApply(TGranulesFillingContext& owner) const override; virtual bool DoExecuteImpl() override; diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h index b25b7a4338..1c481b528c 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h +++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h @@ -13,8 +13,8 @@ class TMergePartialStream { private: class TBatchIterator { private: - i64 Position = 0; - std::optional<ui32> PoolId; + YDB_ACCESSOR(i64, Position, 0); + YDB_OPT(ui32, PoolId); std::shared_ptr<arrow::RecordBatch> Batch; std::shared_ptr<NArrow::TColumnFilter> Filter; @@ -79,17 +79,9 @@ private: } } - bool HasPoolId() const noexcept { - return PoolId.has_value(); - } - - ui32 GetPoolIdUnsafe() const noexcept { - return *PoolId; - } - bool CheckNextBatch(const TBatchIterator& nextIterator) { Y_VERIFY_DEBUG(nextIterator.Columns.size() == Columns.size()); - return NArrow::ColumnsCompare(Columns, GetLastPosition(), nextIterator.Columns, 0) * ReverseSortKff < 0; + return NArrow::ColumnsCompare(Columns, GetLastPosition(), nextIterator.Columns, nextIterator.GetFirstPosition()) * ReverseSortKff < 0; } class TPosition { @@ -163,6 +155,19 @@ private: } }; + class TIteratorData { + private: + YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, Batch); + YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, Filter); + public: + TIteratorData(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter) + : Batch(batch) + , Filter(filter) + { + + } + }; + bool NextInHeap(const bool needPop) { if (SortHeap.empty()) { return false; @@ -182,16 +187,17 @@ private: SortHeap.pop_back(); } else { it->second.pop_front(); - TBatchIterator newIterator(it->second.front(), nullptr, SortSchema, Reverse, SortHeap.back().GetPoolIdUnsafe()); - SortHeap.back().CheckNextBatch(newIterator); - std::swap(SortHeap.back(), newIterator); + TBatchIterator oldIterator = std::move(SortHeap.back()); + SortHeap.pop_back(); + AddToHeap(SortHeap.back().GetPoolIdUnsafe(), it->second.front().GetBatch(), it->second.front().GetFilter(), false); + oldIterator.CheckNextBatch(SortHeap.back()); std::push_heap(SortHeap.begin(), SortHeap.end()); } } return SortHeap.size(); } - THashMap<ui32, std::deque<std::shared_ptr<arrow::RecordBatch>>> BatchPools; + THashMap<ui32, std::deque<TIteratorData>> BatchPools; std::vector<std::shared_ptr<arrow::RecordBatch>> IndependentBatches; std::vector<TBatchIterator> SortHeap; std::shared_ptr<arrow::Schema> SortSchema; @@ -210,6 
+216,19 @@ private: } return position; } + + void AddToHeap(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter, const bool restoreHeap) { + if (!filter || filter->IsTotalAllowFilter()) { + SortHeap.emplace_back(TBatchIterator(batch, nullptr, SortSchema, Reverse, poolId)); + } else if (filter->IsTotalDenyFilter()) { + return; + } else { + SortHeap.emplace_back(TBatchIterator(batch, filter, SortSchema, Reverse, poolId)); + } + if (restoreHeap) { + std::push_heap(SortHeap.begin(), SortHeap.end()); + } + } public: TMergePartialStream(std::shared_ptr<arrow::Schema> sortSchema, const bool reverse) : SortSchema(sortSchema) @@ -221,22 +240,6 @@ public: return SortHeap.size(); } - void AddIndependentSource(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter) { - if (!batch || !batch->num_rows()) { - return; - } - Y_VERIFY_DEBUG(NArrow::IsSorted(batch, SortSchema)); - IndependentBatches.emplace_back(batch); - if (!filter || filter->IsTotalAllowFilter()) { - SortHeap.emplace_back(TBatchIterator(batch, nullptr, SortSchema, Reverse, {})); - } else if (filter->IsTotalDenyFilter()) { - return; - } else { - SortHeap.emplace_back(TBatchIterator(batch, filter, SortSchema, Reverse, {})); - } - std::push_heap(SortHeap.begin(), SortHeap.end()); - } - bool HasRecordsInPool(const ui32 poolId) const { auto it = BatchPools.find(poolId); if (it == BatchPools.end()) { @@ -245,18 +248,23 @@ public: return it->second.size(); } - void AddPoolSource(const ui32 poolId, std::shared_ptr<arrow::RecordBatch> batch) { + void AddPoolSource(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter) { if (!batch || !batch->num_rows()) { return; } - auto it = BatchPools.find(poolId); - if (it == BatchPools.end()) { - it = BatchPools.emplace(poolId, std::deque<std::shared_ptr<arrow::RecordBatch>>()).first; - } - it->second.emplace_back(batch); - if (it->second.size() == 1) { - SortHeap.emplace_back(TBatchIterator(batch, nullptr, SortSchema, Reverse, poolId)); - std::push_heap(SortHeap.begin(), SortHeap.end()); + Y_VERIFY_DEBUG(NArrow::IsSorted(batch, SortSchema)); + if (!poolId) { + IndependentBatches.emplace_back(batch); + AddToHeap(poolId, batch, filter, true); + } else { + auto it = BatchPools.find(*poolId); + if (it == BatchPools.end()) { + it = BatchPools.emplace(*poolId, std::deque<TIteratorData>()).first; + } + it->second.emplace_back(batch, filter); + if (it->second.size() == 1) { + AddToHeap(poolId, batch, filter, true); + } } } |
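
The read_filter_merger.h hunk above folds AddIndependentSource into a single AddPoolSource that takes an optional pool id; TBatch::GetMergePoolId() supplies the granule id for batches that are sortable inside their granule and an empty optional otherwise, so only same-pool batches are chained behind one heap entry. A rough self-contained sketch of that dispatch follows — TFakeBatch and TToyMergeStream are hypothetical stand-ins for arrow::RecordBatch and TMergePartialStream, and the filter argument is omitted.

```cpp
#include <cstdint>
#include <deque>
#include <memory>
#include <optional>
#include <unordered_map>
#include <utility>
#include <vector>

using ui32 = uint32_t;

struct TFakeBatch {            // stand-in for arrow::RecordBatch
    ui32 Rows = 0;
};

class TToyMergeStream {
    // One heap entry per pool: the rest of a pool waits in its deque until the
    // head batch is exhausted. Pool-less batches are fully independent.
    std::unordered_map<ui32, std::deque<std::shared_ptr<TFakeBatch>>> BatchPools;
    std::vector<std::shared_ptr<TFakeBatch>> IndependentBatches;
    std::vector<std::pair<std::optional<ui32>, std::shared_ptr<TFakeBatch>>> SortHeap;

    void AddToHeap(const std::optional<ui32> poolId, std::shared_ptr<TFakeBatch> batch) {
        SortHeap.emplace_back(poolId, batch);
    }
public:
    // Single entry point after the patch; poolId == nullopt is the old AddIndependentSource.
    void AddPoolSource(const std::optional<ui32> poolId, std::shared_ptr<TFakeBatch> batch) {
        if (!batch || !batch->Rows) {
            return;
        }
        if (!poolId) {
            IndependentBatches.emplace_back(batch);
            AddToHeap(poolId, batch);
            return;
        }
        auto& pool = BatchPools[*poolId];
        pool.emplace_back(batch);
        if (pool.size() == 1) {    // only the head of each pool participates in the sort heap
            AddToHeap(poolId, batch);
        }
    }
};
```

In the patched TPKSortingWithLimit::DoWakeup this is invoked as MergeStream.AddPoolSource(b->GetMergePoolId(), b->GetFetchedInfo().GetFilterBatch(), b->GetFetchedInfo().GetNotAppliedEarlyFilter()).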