aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-05-05 12:28:42 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-05-05 12:28:42 +0300
commita42ea897e1adc1a781252433d414994980d6ca60 (patch)
tree6c6aa39e2ace3ce7052c6d5a5fa7335c30f2b590
parent85069d919915e1f46d718faf0289c3f78ac7bb2b (diff)
downloadydb-a42ea897e1adc1a781252433d414994980d6ca60.tar.gz
split batch fetcher agent and fetched info
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.cpp7
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.h2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt13
-rw-r--r--ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt13
-rw-r--r--ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt13
-rw-r--r--ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt13
-rw-r--r--ydb/core/tx/columnshard/engines/reader/batch.cpp60
-rw-r--r--ydb/core/tx/columnshard/engines/reader/batch.h139
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common.cpp17
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common.h18
-rw-r--r--ydb/core/tx/columnshard/engines/reader/filling_context.cpp14
-rw-r--r--ydb/core/tx/columnshard/engines/reader/filling_context.h32
-rw-r--r--ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp8
-rw-r--r--ydb/core/tx/columnshard/engines/reader/filter_assembler.h12
-rw-r--r--ydb/core/tx/columnshard/engines/reader/granule.cpp14
-rw-r--r--ydb/core/tx/columnshard/engines/reader/granule.h11
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_controller.cpp49
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_controller.h33
-rw-r--r--ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp14
-rw-r--r--ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h3
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_filter_merger.h88
21 files changed, 313 insertions, 260 deletions
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index ba4dd5fcae..b85513d74c 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -116,7 +116,7 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::v
void TIndexedReadData::AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) {
Y_VERIFY(IndexedBlobs.emplace(range).second);
Y_VERIFY(IndexedBlobSubscriber.emplace(range, &batch).second);
- if (batch.GetFilter()) {
+ if (batch.GetFetchedInfo().GetFilter()) {
Counters.PostFilterBytes->Add(range.Size);
ReadMetadata->ReadStats->DataAdditionalBytes += range.Size;
FetchBlobsQueue.emplace_front(range);
@@ -138,16 +138,15 @@ void TIndexedReadData::InitRead(ui32 inputBatch) {
NotIndexed.resize(inputBatch);
- ui32 batchNo = inputBatch;
Y_VERIFY(!GranulesContext);
- GranulesContext = std::make_unique<NIndexedReader::TGranulesFillingContext>(ReadMetadata, *this, OnePhaseReadMode, inputBatch + ReadMetadata->SelectInfo->Portions.size());
+ GranulesContext = std::make_unique<NIndexedReader::TGranulesFillingContext>(ReadMetadata, *this, OnePhaseReadMode);
ui64 portionsBytes = 0;
for (auto& portionInfo : ReadMetadata->SelectInfo->Portions) {
portionsBytes += portionInfo.BlobsBytes();
Y_VERIFY_S(portionInfo.Records.size(), "ReadMeatadata: " << *ReadMetadata);
NIndexedReader::TGranule& granule = GranulesContext->UpsertGranule(portionInfo.Records[0].Granule);
- granule.AddBatch(batchNo++, portionInfo);
+ granule.AddBatch(portionInfo);
}
GranulesContext->PrepareForStart();
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h
index a243682b86..51d24420a3 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.h
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h
@@ -55,7 +55,7 @@ public:
void InitRead(ui32 numNotIndexed);
void Abort() {
Y_VERIFY(GranulesContext);
- return GranulesContext->Abort();
+ return GranulesContext->Abort();
}
bool IsInProgress() const {
Y_VERIFY(GranulesContext);
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
index 02acfdbc02..fca7b1fe26 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
@@ -19,14 +19,15 @@ target_link_libraries(columnshard-engines-reader PUBLIC
core-formats-arrow
)
target_sources(columnshard-engines-reader PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
index 69c7c54139..d2914b8715 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
@@ -20,14 +20,15 @@ target_link_libraries(columnshard-engines-reader PUBLIC
core-formats-arrow
)
target_sources(columnshard-engines-reader PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
index 69c7c54139..d2914b8715 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
@@ -20,14 +20,15 @@ target_link_libraries(columnshard-engines-reader PUBLIC
core-formats-arrow
)
target_sources(columnshard-engines-reader PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
index 02acfdbc02..fca7b1fe26 100644
--- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
@@ -19,14 +19,15 @@ target_link_libraries(columnshard-engines-reader PUBLIC
core-formats-arrow
)
target_sources(columnshard-engines-reader PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp
index 47e00395b6..0cfbcf5d4c 100644
--- a/ydb/core/tx/columnshard/engines/reader/batch.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp
@@ -6,8 +6,8 @@
namespace NKikimr::NOlap::NIndexedReader {
-TBatch::TBatch(const ui32 batchNo, TGranule& owner, const TPortionInfo& portionInfo)
- : BatchNo(batchNo)
+TBatch::TBatch(const TBatchAddress& address, TGranule& owner, const TPortionInfo& portionInfo)
+ : BatchAddress(address)
, Portion(portionInfo.Records[0].Portion)
, Granule(owner.GetGranuleId())
, Owner(&owner)
@@ -19,7 +19,7 @@ TBatch::TBatch(const ui32 batchNo, TGranule& owner, const TPortionInfo& portionI
Owner->SetDuplicationsAvailable(true);
if (portionInfo.CanHaveDups()) {
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "dup_portion");
- DuplicationsAvailable = true;
+ DuplicationsAvailableFlag = true;
}
}
}
@@ -27,13 +27,13 @@ TBatch::TBatch(const ui32 batchNo, TGranule& owner, const TPortionInfo& portionI
NColumnShard::IDataTasksProcessor::ITask::TPtr TBatch::AssembleTask(NColumnShard::IDataTasksProcessor::TPtr processor, NOlap::TReadMetadata::TConstPtr readMetadata) {
Y_VERIFY(WaitIndexed.empty());
Y_VERIFY(PortionInfo->Produced());
- Y_VERIFY(!FilteredBatch);
+ Y_VERIFY(!FetchedInfo.GetFilteredBatch());
auto batchConstructor = PortionInfo->PrepareForAssemble(readMetadata->IndexInfo, readMetadata->LoadSchema, Data, CurrentColumnIds);
Data.clear();
- if (!Filter) {
- return std::make_shared<TAssembleFilter>(std::move(batchConstructor), readMetadata, *this, PortionInfo->AllowEarlyFilter(), Owner->GetEarlyFilterColumns(), processor);
+ if (!FetchedInfo.GetFilter()) {
+ return std::make_shared<TAssembleFilter>(std::move(batchConstructor), readMetadata,
+ *this, Owner->GetEarlyFilterColumns(), processor, Owner->GetOwner().GetSortingPolicy());
} else {
- Y_VERIFY(FilterBatch);
return std::make_shared<TAssembleBatch>(std::move(batchConstructor), *this, readMetadata->GetColumnsOrder(), processor);
}
}
@@ -81,7 +81,7 @@ void TBatch::ResetCommon(const std::set<ui32>& columnIds) {
}
void TBatch::ResetNoFilter(const std::set<ui32>& columnIds) {
- Y_VERIFY(!Filter);
+ Y_VERIFY(!FetchedInfo.GetFilter());
ResetCommon(columnIds);
for (const NOlap::TColumnRecord& rec : PortionInfo->Records) {
if (CurrentColumnIds && !CurrentColumnIds->contains(rec.ColumnId)) {
@@ -97,7 +97,7 @@ void TBatch::ResetNoFilter(const std::set<ui32>& columnIds) {
}
void TBatch::ResetWithFilter(const std::set<ui32>& columnIds) {
- Y_VERIFY(Filter);
+ Y_VERIFY(FetchedInfo.GetFilter());
ResetCommon(columnIds);
std::map<ui32, std::map<ui16, const TColumnRecord*>> orderedObjects;
for (const NOlap::TColumnRecord& rec : PortionInfo->Records) {
@@ -112,7 +112,7 @@ void TBatch::ResetWithFilter(const std::set<ui32>& columnIds) {
for (auto&& columnInfo : orderedObjects) {
ui32 expected = 0;
- auto it = Filter->GetIterator();
+ auto it = FetchedInfo.GetFilter()->GetIterator();
bool undefinedShift = false;
bool itFinished = false;
for (auto&& [chunk, rec] : columnInfo.second) {
@@ -136,20 +136,14 @@ void TBatch::ResetWithFilter(const std::set<ui32>& columnIds) {
}
bool TBatch::InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch,
- const ui32 originalRecordsCount, std::shared_ptr<NArrow::TColumnFilter> futureFilter) {
- Y_VERIFY(filter);
- Y_VERIFY(!Filter);
- Y_VERIFY(!FilterBatch);
- Filter = filter;
- FilterBatch = filterBatch;
- OriginalRecordsCount = originalRecordsCount;
- FutureFilter = futureFilter;
+ const ui32 originalRecordsCount, std::shared_ptr<NArrow::TColumnFilter> notAppliedEarlyFilter)
+{
+ FetchedInfo.InitFilter(filter, filterBatch, originalRecordsCount, notAppliedEarlyFilter);
return Owner->OnFilterReady(*this);
}
void TBatch::InitBatch(std::shared_ptr<arrow::RecordBatch> batch) {
- Y_VERIFY(!FilteredBatch);
- FilteredBatch = batch;
+ FetchedInfo.InitBatch(batch);
Owner->OnBatchReady(*this, batch);
}
@@ -164,28 +158,16 @@ bool TBatch::AddIndexedReady(const TBlobRange& bRange, const TString& blobData)
return true;
}
-bool TBatch::NeedAdditionalData() const {
- if (!Filter) {
- return true;
- }
- if (!FilteredBatch || !FilteredBatch->num_rows()) {
- return false;
- }
- if (AskedColumnsAlready(Owner->GetOwner().GetPostFilterColumns())) {
- return false;
- }
- return true;
+ui64 TBatch::GetUsefulBytes(const ui64 bytes) const {
+ return bytes * FetchedInfo.GetUsefulDataKff();
}
-ui64 TBatch::GetUsefulBytes(const ui64 bytes) const {
- if (!FilteredBatch || !FilteredBatch->num_rows()) {
- return 0;
- }
- Y_VERIFY_DEBUG(OriginalRecordsCount);
- if (!OriginalRecordsCount) {
- return 0;
+std::optional<ui32> TBatch::GetMergePoolId() const {
+ if (IsSortableInGranule()) {
+ return Granule;
+ } else {
+ return {};
}
- return bytes * FilteredBatch->num_rows() / OriginalRecordsCount;
}
}
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.h b/ydb/core/tx/columnshard/engines/reader/batch.h
index ed4d3676fb..05f2e42ae5 100644
--- a/ydb/core/tx/columnshard/engines/reader/batch.h
+++ b/ydb/core/tx/columnshard/engines/reader/batch.h
@@ -1,4 +1,5 @@
#pragma once
+#include "common.h"
#include "conveyor_task.h"
#include <ydb/core/formats/arrow/arrow_filter.h>
@@ -16,81 +17,86 @@ struct TReadMetadata;
namespace NKikimr::NOlap::NIndexedReader {
class TGranule;
+class TBatch;
-class TBatch {
+class TBatchFetchedInfo {
private:
- ui64 BatchNo = 0;
- ui64 Portion = 0;
- ui64 Granule = 0;
- ui64 WaitingBytes = 0;
- ui64 FetchedBytes = 0;
-
- THashSet<TBlobRange> WaitIndexed;
- std::shared_ptr<arrow::RecordBatch> FilteredBatch;
- std::shared_ptr<arrow::RecordBatch> FilterBatch;
- std::shared_ptr<NArrow::TColumnFilter> Filter;
- std::shared_ptr<NArrow::TColumnFilter> FutureFilter;
-
+ YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, FilteredBatch);
+ YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, FilterBatch);
+ YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, Filter);
+ YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, NotAppliedEarlyFilter);
ui32 OriginalRecordsCount = 0;
-
- bool DuplicationsAvailable = false;
- THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> Data;
- TGranule* Owner;
- const TPortionInfo* PortionInfo = nullptr;
-
- std::optional<std::set<ui32>> CurrentColumnIds;
- std::set<ui32> AskedColumnIds;
- void ResetCommon(const std::set<ui32>& columnIds);
- ui64 GetUsefulBytes(const ui64 bytes) const;
-
public:
- bool IsDuplicationsAvailable() const noexcept {
- return DuplicationsAvailable;
- }
-
- void SetDuplicationsAvailable(bool val) noexcept {
- DuplicationsAvailable = val;
- }
-
- ui64 GetBatchNo() const noexcept {
- return BatchNo;
- }
-
- ui64 GetPortion() const noexcept {
- return Portion;
+ void InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch,
+ const ui32 originalRecordsCount, std::shared_ptr<NArrow::TColumnFilter> notAppliedEarlyFilter) {
+ Y_VERIFY(filter);
+ Y_VERIFY(!Filter);
+ Y_VERIFY(!FilterBatch);
+ Filter = filter;
+ FilterBatch = filterBatch;
+ OriginalRecordsCount = originalRecordsCount;
+ NotAppliedEarlyFilter = notAppliedEarlyFilter;
+ }
+
+ double GetUsefulDataKff() const {
+ if (!FilterBatch || !FilterBatch->num_rows()) {
+ return 0;
+ }
+ Y_VERIFY_DEBUG(OriginalRecordsCount);
+ if (!OriginalRecordsCount) {
+ return 0;
+ }
+ return 1.0 * FilterBatch->num_rows() / OriginalRecordsCount;
}
- ui64 GetGranule() const noexcept {
- return Granule;
+ bool IsFiltered() const {
+ return !!Filter;
}
- ui64 GetWaitingBytes() const noexcept {
- return WaitingBytes;
+ void InitBatch(std::shared_ptr<arrow::RecordBatch> fullBatch) {
+ Y_VERIFY(!FilteredBatch);
+ FilteredBatch = fullBatch;
}
- ui64 GetFetchedBytes() const noexcept {
- return FetchedBytes;
+ ui32 GetFilteredRecordsCount() const {
+ Y_VERIFY(IsFiltered());
+ if (!FilterBatch) {
+ return 0;
+ } else {
+ return FilterBatch->num_rows();
+ }
}
+};
- const std::optional<std::set<ui32>>& GetCurrentColumnIds() const noexcept {
- return CurrentColumnIds;
- }
+class TBatch {
+private:
+ const TBatchAddress BatchAddress;
+ YDB_READONLY(ui64, Portion, 0);
+ YDB_READONLY(ui64, Granule, 0);
+ YDB_READONLY(ui64, WaitingBytes, 0);
+ YDB_READONLY(ui64, FetchedBytes, 0);
- const std::shared_ptr<arrow::RecordBatch>& GetFilteredBatch() const noexcept {
- return FilteredBatch;
- }
+ THashSet<TBlobRange> WaitIndexed;
+
+ YDB_READONLY_FLAG(DuplicationsAvailable, false);
+ YDB_READONLY_DEF(TBatchFetchedInfo, FetchedInfo);
+ THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> Data;
+ TGranule* Owner;
+ const TPortionInfo* PortionInfo = nullptr;
- const std::shared_ptr<arrow::RecordBatch>& GetFilterBatch() const noexcept {
- return FilterBatch;
- }
+ YDB_READONLY_DEF(std::optional<std::set<ui32>>, CurrentColumnIds);
+ std::set<ui32> AskedColumnIds;
+ void ResetCommon(const std::set<ui32>& columnIds);
+ ui64 GetUsefulBytes(const ui64 bytes) const;
- const std::shared_ptr<NArrow::TColumnFilter>& GetFilter() const noexcept {
- return Filter;
+public:
+ bool AllowEarlyFilter() const {
+ return PortionInfo->AllowEarlyFilter();
}
-
- const std::shared_ptr<NArrow::TColumnFilter>& GetFutureFilter() const noexcept {
- return FutureFilter;
+ const TBatchAddress& GetBatchAddress() const {
+ return BatchAddress;
}
+ std::optional<ui32> GetMergePoolId() const;
ui64 GetUsefulWaitingBytes() const {
return GetUsefulBytes(WaitingBytes);
@@ -100,11 +106,10 @@ public:
return GetUsefulBytes(FetchedBytes);
}
- bool NeedAdditionalData() const;
bool IsSortableInGranule() const {
return PortionInfo->IsSortableInGranule();
}
- TBatch(const ui32 batchNo, TGranule& owner, const TPortionInfo& portionInfo);
+ TBatch(const TBatchAddress& address, TGranule& owner, const TPortionInfo& portionInfo);
bool AddIndexedReady(const TBlobRange& bRange, const TString& blobData);
bool AskedColumnsAlready(const std::set<ui32>& columnIds) const;
@@ -112,19 +117,9 @@ public:
void ResetWithFilter(const std::set<ui32>& columnIds);
ui64 GetFetchBytes(const std::set<ui32>& columnIds);
- bool IsFiltered() const {
- return !!Filter;
- }
- ui32 GetFilteredRecordsCount() const {
- Y_VERIFY(IsFiltered());
- if (!FilterBatch) {
- return 0;
- } else {
- return FilterBatch->num_rows();
- }
- }
+ ui32 GetFilteredRecordsCount() const;
bool InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch,
- const ui32 originalRecordsCount, std::shared_ptr<NArrow::TColumnFilter> futureFilter);
+ const ui32 originalRecordsCount, std::shared_ptr<NArrow::TColumnFilter> notAppliedEarlyFilter);
void InitBatch(std::shared_ptr<arrow::RecordBatch> batch);
NColumnShard::IDataTasksProcessor::ITask::TPtr AssembleTask(NColumnShard::IDataTasksProcessor::TPtr processor, std::shared_ptr<const NOlap::TReadMetadata> readMetadata);
diff --git a/ydb/core/tx/columnshard/engines/reader/common.cpp b/ydb/core/tx/columnshard/engines/reader/common.cpp
new file mode 100644
index 0000000000..5e29cd8c9c
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/common.cpp
@@ -0,0 +1,17 @@
+#include "common.h"
+#include <util/string/builder.h>
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+TString TBatchAddress::ToString() const {
+ return TStringBuilder() << GranuleIdx << "," << BatchGranuleIdx;
+}
+
+TBatchAddress::TBatchAddress(const ui32 granuleIdx, const ui32 batchGranuleIdx)
+ : GranuleIdx(granuleIdx)
+ , BatchGranuleIdx(batchGranuleIdx)
+{
+
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/common.h b/ydb/core/tx/columnshard/engines/reader/common.h
new file mode 100644
index 0000000000..c1dcc4cec6
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/common.h
@@ -0,0 +1,18 @@
+#pragma once
+#include <ydb/library/accessor/accessor.h>
+#include <util/system/types.h>
+#include <util/generic/string.h>
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+class TBatchAddress {
+private:
+ YDB_READONLY(ui32, GranuleIdx, 0);
+ YDB_READONLY(ui32, BatchGranuleIdx, 0);
+public:
+ TString ToString() const;
+
+ TBatchAddress(const ui32 granuleIdx, const ui32 batchGranuleIdx);
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
index a8f6be4a73..eed248d5c9 100644
--- a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
@@ -4,13 +4,12 @@
namespace NKikimr::NOlap::NIndexedReader {
-TGranulesFillingContext::TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData& owner, const bool internalReading, const ui32 batchesCount)
+TGranulesFillingContext::TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData& owner, const bool internalReading)
: ReadMetadata(readMetadata)
, InternalReading(internalReading)
, Owner(owner)
, Counters(owner.GetCounters())
{
- Batches.resize(batchesCount, nullptr);
SortingPolicy = InternalReading ? std::make_shared<TNonSorting>(ReadMetadata) : ReadMetadata->BuildSortingPolicy();
UsedColumns = ReadMetadata->GetUsedColumnIds();
@@ -44,11 +43,10 @@ void TGranulesFillingContext::OnBatchReady(const NIndexedReader::TBatch& batchIn
return Owner.OnBatchReady(batchInfo, batch);
}
-NKikimr::NOlap::NIndexedReader::TBatch& TGranulesFillingContext::GetBatchInfo(const ui32 batchNo) {
- Y_VERIFY(batchNo < Batches.size());
- auto ptr = Batches[batchNo];
- Y_VERIFY(ptr);
- return *ptr;
+NKikimr::NOlap::NIndexedReader::TBatch& TGranulesFillingContext::GetBatchInfo(const TBatchAddress& address) {
+ Y_VERIFY(address.GetGranuleIdx() < GranulesStorage.size());
+ auto& g = GranulesStorage[address.GetGranuleIdx()];
+ return g.GetBatchInfo(address.GetBatchGranuleIdx());
}
NKikimr::NColumnShard::TDataTasksProcessorContainer TGranulesFillingContext::GetTasksProcessor() const {
@@ -56,7 +54,7 @@ NKikimr::NColumnShard::TDataTasksProcessorContainer TGranulesFillingContext::Get
}
void TGranulesFillingContext::DrainNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>* batches) {
- for (auto&& [_, g] : Granules) {
+ for (auto&& g : GranulesStorage) {
if (!batches) {
g.AddNotIndexedBatch(nullptr);
} else {
diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h
index e9652e0b11..d70752a41e 100644
--- a/ydb/core/tx/columnshard/engines/reader/filling_context.h
+++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h
@@ -18,19 +18,19 @@ private:
TIndexedReadData& Owner;
THashMap<ui64, NIndexedReader::TGranule*> GranulesToOut;
std::set<ui64> ReadyGranulesAccumulator;
- THashMap<ui64, NIndexedReader::TGranule> Granules;
+ std::deque<NIndexedReader::TGranule> GranulesStorage;
+ THashMap<ui64, NIndexedReader::TGranule*> GranulesUpserted;
std::set<ui32> EarlyFilterColumns;
std::set<ui32> PostFilterColumns;
std::set<ui32> FilterStageColumns;
std::set<ui32> UsedColumns;
IOrderPolicy::TPtr SortingPolicy;
NColumnShard::TScanCounters Counters;
- std::vector<NIndexedReader::TBatch*> Batches;
bool PredictEmptyAfterFilter(const TPortionInfo& portionInfo) const;
public:
- TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData& owner, const bool internalReading, const ui32 batchesCount);
+ TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData& owner, const bool internalReading);
TReadMetadata::TConstPtr GetReadMetadata() const noexcept {
return ReadMetadata;
@@ -55,18 +55,18 @@ public:
NColumnShard::TDataTasksProcessorContainer GetTasksProcessor() const;
void DrainNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>* batches);
- TBatch& GetBatchInfo(const ui32 batchNo);
+ TBatch& GetBatchInfo(const TBatchAddress& address);
void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch);
void OnBatchReady(const NIndexedReader::TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch);
NIndexedReader::TGranule& GetGranuleVerified(const ui64 granuleId) {
- auto it = Granules.find(granuleId);
- Y_VERIFY(it != Granules.end());
- return it->second;
+ auto it = GranulesUpserted.find(granuleId);
+ Y_VERIFY(it != GranulesUpserted.end());
+ return *it->second;
}
- bool IsInProgress() const { return Granules.size() > ReadyGranulesAccumulator.size(); }
+ bool IsInProgress() const { return GranulesStorage.size() > ReadyGranulesAccumulator.size(); }
void OnNewBatch(TBatch& batch) {
if (!InternalReading && PredictEmptyAfterFilter(batch.GetPortionInfo())) {
@@ -74,7 +74,6 @@ public:
} else {
batch.ResetNoFilter(UsedColumns);
}
- Batches[batch.GetBatchNo()] = &batch;
}
std::vector<TGranule*> DetachReadyInOrder() {
@@ -84,20 +83,21 @@ public:
void Abort() {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "abort");
- for (auto&& i : Granules) {
- ReadyGranulesAccumulator.emplace(i.first);
+ for (auto&& i : GranulesStorage) {
+ ReadyGranulesAccumulator.emplace(i.GetGranuleId());
}
AbortedFlag = true;
- Y_VERIFY(ReadyGranulesAccumulator.size() == Granules.size());
+ Y_VERIFY(ReadyGranulesAccumulator.size() == GranulesStorage.size());
Y_VERIFY(!IsInProgress());
}
TGranule& UpsertGranule(const ui64 granuleId) {
- auto itGranule = Granules.find(granuleId);
- if (itGranule == Granules.end()) {
- itGranule = Granules.emplace(granuleId, NIndexedReader::TGranule(granuleId, *this)).first;
+ auto itGranule = GranulesUpserted.find(granuleId);
+ if (itGranule == GranulesUpserted.end()) {
+ GranulesStorage.emplace_back(NIndexedReader::TGranule(granuleId, GranulesStorage.size(), *this));
+ itGranule = GranulesUpserted.emplace(granuleId, &GranulesStorage.back()).first;
}
- return itGranule->second;
+ return *itGranule->second;
}
void OnGranuleReady(TGranule& granule) {
diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
index c7f9c5f0ba..8f9079d3c3 100644
--- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
@@ -21,16 +21,16 @@ bool TAssembleFilter::DoExecuteImpl() {
return true;
}
if (ReadMetadata->Program) {
- auto earlyFilter = std::make_shared<NArrow::TColumnFilter>(NOlap::EarlyFilter(batch, ReadMetadata->Program));
if (AllowEarlyFilter) {
+ auto earlyFilter = std::make_shared<NArrow::TColumnFilter>(NOlap::EarlyFilter(batch, ReadMetadata->Program));
Filter->CombineSequential(*earlyFilter);
if (!earlyFilter->Apply(batch)) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_data")("original_count", OriginalCount);
FilteredBatch = nullptr;
return true;
}
- } else {
- EarlyFilter = earlyFilter;
+ } else if (BatchesOrderPolicy->NeedNotAppliedEarlyFilter()) {
+ EarlyFilter = std::make_shared<NArrow::TColumnFilter>(NOlap::EarlyFilter(batch, ReadMetadata->Program));
}
}
@@ -51,7 +51,7 @@ bool TAssembleFilter::DoExecuteImpl() {
}
bool TAssembleFilter::DoApply(TGranulesFillingContext& owner) const {
- TBatch& batch = owner.GetBatchInfo(BatchNo);
+ TBatch& batch = owner.GetBatchInfo(BatchAddress);
Y_VERIFY(OriginalCount);
owner.GetCounters().OriginalRowsCount->Add(OriginalCount);
owner.GetCounters().AssembleFilterCount->Add(1);
diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.h b/ydb/core/tx/columnshard/engines/reader/filter_assembler.h
index 5b32be9363..d8efa4e8e9 100644
--- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.h
+++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.h
@@ -1,4 +1,5 @@
#pragma once
+#include "common.h"
#include "conveyor_task.h"
#include <ydb/core/formats/arrow/arrow_filter.h>
@@ -17,22 +18,25 @@ namespace NKikimr::NOlap::NIndexedReader {
NOlap::TReadMetadata::TConstPtr ReadMetadata;
std::shared_ptr<NArrow::TColumnFilter> Filter;
std::shared_ptr<NArrow::TColumnFilter> EarlyFilter;
- const ui32 BatchNo;
+ const TBatchAddress BatchAddress;
ui32 OriginalCount = 0;
bool AllowEarlyFilter = false;
std::set<ui32> FilterColumnIds;
+ IOrderPolicy::TPtr BatchesOrderPolicy;
protected:
virtual bool DoApply(TGranulesFillingContext& owner) const override;
virtual bool DoExecuteImpl() override;
public:
TAssembleFilter(TPortionInfo::TPreparedBatchData&& batchConstructor, NOlap::TReadMetadata::TConstPtr readMetadata,
- TBatch& batch, const bool allowEarlyFilter, const std::set<ui32>& filterColumnIds, NColumnShard::IDataTasksProcessor::TPtr processor)
+ TBatch& batch, const std::set<ui32>& filterColumnIds, NColumnShard::IDataTasksProcessor::TPtr processor,
+ IOrderPolicy::TPtr batchesOrderPolicy)
: TBase(processor)
, BatchConstructor(batchConstructor)
, ReadMetadata(readMetadata)
- , BatchNo(batch.GetBatchNo())
- , AllowEarlyFilter(allowEarlyFilter)
+ , BatchAddress(batch.GetBatchAddress())
+ , AllowEarlyFilter(batch.AllowEarlyFilter())
, FilterColumnIds(filterColumnIds)
+ , BatchesOrderPolicy(batchesOrderPolicy)
{
TBase::SetPriority(TBase::EPriority::Normal);
}
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp
index bd633044fc..217e46911f 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp
@@ -12,9 +12,10 @@ void TGranule::OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::Reco
return;
}
}
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_batch")("granule_id", GranuleId)("batch_no", batchInfo.GetBatchNo())("count", WaitBatches.size());
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "new_batch")("granule_id", GranuleId)
+ ("batch_address", batchInfo.GetBatchAddress().ToString())("count", WaitBatches.size());
Y_VERIFY(!ReadyFlag);
- Y_VERIFY(WaitBatches.erase(batchInfo.GetBatchNo()));
+ Y_VERIFY(WaitBatches.erase(batchInfo.GetBatchAddress().GetBatchGranuleIdx()));
if (batch && batch->num_rows()) {
if (batchInfo.IsSortableInGranule()) {
SortableBatches.emplace_back(batch);
@@ -35,11 +36,12 @@ void TGranule::OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::Reco
CheckReady();
}
-NKikimr::NOlap::NIndexedReader::TBatch& TGranule::AddBatch(const ui32 batchNo, const TPortionInfo& portionInfo) {
+NKikimr::NOlap::NIndexedReader::TBatch& TGranule::AddBatch(const TPortionInfo& portionInfo) {
Y_VERIFY(!ReadyFlag);
- WaitBatches.emplace(batchNo);
- Batches.emplace_back(TBatch(batchNo, *this, portionInfo));
- Y_VERIFY(GranuleBatchNumbers.emplace(batchNo).second);
+ ui32 batchGranuleIdx = Batches.size();
+ WaitBatches.emplace(batchGranuleIdx);
+ Batches.emplace_back(TBatch(TBatchAddress(GranuleIdx, batchGranuleIdx), *this, portionInfo));
+ Y_VERIFY(GranuleBatchNumbers.emplace(batchGranuleIdx).second);
Owner->OnNewBatch(Batches.back());
return Batches.back();
}
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.h b/ydb/core/tx/columnshard/engines/reader/granule.h
index eddaa66dc1..442af41ed9 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.h
+++ b/ydb/core/tx/columnshard/engines/reader/granule.h
@@ -14,6 +14,7 @@ class TGranulesFillingContext;
class TGranule {
private:
ui64 GranuleId = 0;
+ YDB_READONLY(ui64, GranuleIdx, 0);
bool NotIndexedBatchReadyFlag = false;
std::shared_ptr<arrow::RecordBatch> NotIndexedBatch;
@@ -31,8 +32,9 @@ private:
void CheckReady();
public:
- TGranule(const ui64 granuleId, TGranulesFillingContext& owner)
+ TGranule(const ui64 granuleId, const ui64 granuleIdx, TGranulesFillingContext& owner)
: GranuleId(granuleId)
+ , GranuleIdx(granuleIdx)
, Owner(&owner) {
}
@@ -79,6 +81,11 @@ public:
return result;
}
+ TBatch& GetBatchInfo(const ui32 batchIdx) {
+ Y_VERIFY(batchIdx < Batches.size());
+ return Batches[batchIdx];
+ }
+
void AddNotIndexedBatch(std::shared_ptr<arrow::RecordBatch> batch);
const TGranulesFillingContext& GetOwner() const {
@@ -89,7 +96,7 @@ public:
const std::set<ui32>& GetEarlyFilterColumns() const;
void OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch);
bool OnFilterReady(TBatch& batchInfo);
- TBatch& AddBatch(const ui32 batchNo, const TPortionInfo& portionInfo);
+ TBatch& AddBatch(const TPortionInfo& portionInfo);
void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) const;
};
diff --git a/ydb/core/tx/columnshard/engines/reader/order_controller.cpp b/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
index 8f51265b7b..7ba4e4c250 100644
--- a/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/order_controller.cpp
@@ -53,16 +53,12 @@ bool TPKSortingWithLimit::DoWakeup(const TGranule& granule, TGranulesFillingCont
}
if (g.Start()) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "granule_started")("granule_id", g.GetGranule()->GetGranuleId())("count", GranulesOutOrderForPortions.size());
- MergeStream.AddIndependentSource(g.GetGranule()->GetNotIndexedBatch(), g.GetGranule()->GetNotIndexedBatchFutureFilter());
+ MergeStream.AddPoolSource({}, g.GetGranule()->GetNotIndexedBatch(), g.GetGranule()->GetNotIndexedBatchFutureFilter());
}
auto& batches = g.GetBatches();
- while (batches.size() && batches.front()->IsFiltered() && CurrentItemsLimit) {
+ while (batches.size() && batches.front()->GetFetchedInfo().IsFiltered() && CurrentItemsLimit) {
auto b = batches.front();
- if (b->IsSortableInGranule()) {
- MergeStream.AddPoolSource(0, b->GetFilterBatch());
- } else {
- MergeStream.AddIndependentSource(b->GetFilterBatch(), b->GetFutureFilter());
- }
+ MergeStream.AddPoolSource(b->GetMergePoolId(), b->GetFetchedInfo().GetFilterBatch(), b->GetFetchedInfo().GetNotAppliedEarlyFilter());
OnBatchFilterInitialized(*b, context);
batches.pop_front();
}
@@ -118,42 +114,43 @@ TPKSortingWithLimit::TPKSortingWithLimit(TReadMetadata::TConstPtr readMetadata)
CurrentItemsLimit = ReadMetadata->Limit;
}
-void IOrderPolicy::OnBatchFilterInitialized(TBatch& batch, TGranulesFillingContext& context) {
+void IOrderPolicy::OnBatchFilterInitialized(TBatch& batchOriginal, TGranulesFillingContext& context) {
+ auto& batch = batchOriginal.GetFetchedInfo();
Y_VERIFY(!!batch.GetFilter());
if (!batch.GetFilteredRecordsCount()) {
context.GetCounters().EmptyFilterCount->Add(1);
- context.GetCounters().EmptyFilterFetchedBytes->Add(batch.GetFetchedBytes());
- context.GetCounters().SkippedBytes->Add(batch.GetFetchBytes(context.GetPostFilterColumns()));
- batch.InitBatch(nullptr);
+ context.GetCounters().EmptyFilterFetchedBytes->Add(batchOriginal.GetFetchedBytes());
+ context.GetCounters().SkippedBytes->Add(batchOriginal.GetFetchBytes(context.GetPostFilterColumns()));
+ batchOriginal.InitBatch(nullptr);
} else {
context.GetCounters().FilteredRowsCount->Add(batch.GetFilterBatch()->num_rows());
- if (batch.AskedColumnsAlready(context.GetPostFilterColumns())) {
+ if (batchOriginal.AskedColumnsAlready(context.GetPostFilterColumns())) {
context.GetCounters().FilterOnlyCount->Add(1);
- context.GetCounters().FilterOnlyFetchedBytes->Add(batch.GetFetchedBytes());
- context.GetCounters().FilterOnlyUsefulBytes->Add(batch.GetUsefulFetchedBytes());
- context.GetCounters().SkippedBytes->Add(batch.GetFetchBytes(context.GetPostFilterColumns()));
+ context.GetCounters().FilterOnlyFetchedBytes->Add(batchOriginal.GetFetchedBytes());
+ context.GetCounters().FilterOnlyUsefulBytes->Add(batchOriginal.GetUsefulFetchedBytes());
+ context.GetCounters().SkippedBytes->Add(batchOriginal.GetFetchBytes(context.GetPostFilterColumns()));
- batch.InitBatch(batch.GetFilterBatch());
+ batchOriginal.InitBatch(batch.GetFilterBatch());
} else {
- context.GetCounters().TwoPhasesFilterFetchedBytes->Add(batch.GetFetchedBytes());
- context.GetCounters().TwoPhasesFilterUsefulBytes->Add(batch.GetUsefulFetchedBytes());
+ context.GetCounters().TwoPhasesFilterFetchedBytes->Add(batchOriginal.GetFetchedBytes());
+ context.GetCounters().TwoPhasesFilterUsefulBytes->Add(batchOriginal.GetUsefulFetchedBytes());
- batch.ResetWithFilter(context.GetPostFilterColumns());
- if (batch.IsFetchingReady()) {
+ batchOriginal.ResetWithFilter(context.GetPostFilterColumns());
+ if (batchOriginal.IsFetchingReady()) {
auto processor = context.GetTasksProcessor();
- if (auto assembleBatchTask = batch.AssembleTask(processor.GetObject(), context.GetReadMetadata())) {
+ if (auto assembleBatchTask = batchOriginal.AssembleTask(processor.GetObject(), context.GetReadMetadata())) {
processor.Add(context, assembleBatchTask);
}
}
context.GetCounters().TwoPhasesCount->Add(1);
- context.GetCounters().TwoPhasesPostFilterFetchedBytes->Add(batch.GetWaitingBytes());
- context.GetCounters().TwoPhasesPostFilterUsefulBytes->Add(batch.GetUsefulWaitingBytes());
+ context.GetCounters().TwoPhasesPostFilterFetchedBytes->Add(batchOriginal.GetWaitingBytes());
+ context.GetCounters().TwoPhasesPostFilterUsefulBytes->Add(batchOriginal.GetUsefulWaitingBytes());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "additional_data")
("filtered_count", batch.GetFilterBatch()->num_rows())
- ("blobs_count", batch.GetWaitingBlobs().size())
- ("columns_count", batch.GetCurrentColumnIds()->size())
- ("fetch_size", batch.GetWaitingBytes())
+ ("blobs_count", batchOriginal.GetWaitingBlobs().size())
+ ("columns_count", batchOriginal.GetCurrentColumnIds()->size())
+ ("fetch_size", batchOriginal.GetWaitingBytes())
;
}
}
diff --git a/ydb/core/tx/columnshard/engines/reader/order_controller.h b/ydb/core/tx/columnshard/engines/reader/order_controller.h
index 842488aeb8..66f2fba42b 100644
--- a/ydb/core/tx/columnshard/engines/reader/order_controller.h
+++ b/ydb/core/tx/columnshard/engines/reader/order_controller.h
@@ -8,6 +8,14 @@ namespace NKikimr::NOlap::NIndexedReader {
class TGranulesFillingContext;
class IOrderPolicy {
+public:
+ enum class EFeatures: ui32 {
+ CanInterrupt = 1,
+ NeedNotAppliedEarlyFilter = 1 << 1
+ };
+ using TFeatures = ui32;
+private:
+ mutable std::optional<TFeatures> Features;
protected:
TReadMetadata::TConstPtr ReadMetadata;
virtual void DoFill(TGranulesFillingContext& context) = 0;
@@ -21,6 +29,15 @@ protected:
}
void OnBatchFilterInitialized(TBatch& batch, TGranulesFillingContext& context);
+ virtual TFeatures DoGetFeatures() const {
+ return 0;
+ }
+ TFeatures GetFeatures() const {
+ if (!Features) {
+ Features = DoGetFeatures();
+ }
+ return *Features;
+ }
public:
using TPtr = std::shared_ptr<IOrderPolicy>;
virtual ~IOrderPolicy() = default;
@@ -35,8 +52,12 @@ public:
}
- virtual bool CanInterrupt() const {
- return false;
+ bool CanInterrupt() const {
+ return GetFeatures() & (TFeatures)EFeatures::CanInterrupt;
+ }
+
+ bool NeedNotAppliedEarlyFilter() const {
+ return GetFeatures() & (TFeatures)EFeatures::NeedNotAppliedEarlyFilter;
}
bool OnFilterReady(TBatch& batchInfo, const TGranule& granule, TGranulesFillingContext& context) {
@@ -143,6 +164,10 @@ protected:
virtual void DoFill(TGranulesFillingContext& context) override;
virtual std::vector<TGranule*> DoDetachReadyGranules(THashMap<ui64, NIndexedReader::TGranule*>& granulesToOut) override;
virtual bool DoOnFilterReady(TBatch& batchInfo, const TGranule& granule, TGranulesFillingContext& context) override;
+ virtual TFeatures DoGetFeatures() const override {
+ return (TFeatures)EFeatures::CanInterrupt & (TFeatures)EFeatures::NeedNotAppliedEarlyFilter;
+ }
+
public:
virtual std::set<ui32> GetFilterStageColumns() override {
std::set<ui32> result = ReadMetadata->GetEarlyFilterColumnIds();
@@ -152,10 +177,6 @@ public:
return result;
}
- virtual bool CanInterrupt() const override {
- return true;
- }
-
TPKSortingWithLimit(TReadMetadata::TConstPtr readMetadata);
virtual bool ReadyForAddNotIndexedToEnd() const override {
return ReadMetadata->IsDescSorted() && GranulesOutOrder.empty();
diff --git a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
index e857d671f3..b1c4bbb542 100644
--- a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
@@ -34,7 +34,7 @@ bool TAssembleBatch::DoExecuteImpl() {
}
bool TAssembleBatch::DoApply(TGranulesFillingContext& owner) const {
- TBatch& batch = owner.GetBatchInfo(BatchNo);
+ TBatch& batch = owner.GetBatchInfo(BatchAddress);
batch.InitBatch(FullBatch);
return true;
}
@@ -45,14 +45,14 @@ TAssembleBatch::TAssembleBatch(TPortionInfo::TPreparedBatchData&& batchConstruct
: TBase(processor)
, BatchConstructor(batchConstructor)
, FullColumnsOrder(fullColumnsOrder)
- , Filter(currentBatch.GetFilter())
- , FilterBatch(currentBatch.GetFilterBatch())
- , BatchNo(currentBatch.GetBatchNo())
+ , Filter(currentBatch.GetFetchedInfo().GetFilter())
+ , FilterBatch(currentBatch.GetFetchedInfo().GetFilterBatch())
+ , BatchAddress(currentBatch.GetBatchAddress())
{
TBase::SetPriority(TBase::EPriority::High);
- Y_VERIFY(currentBatch.GetFilter());
- Y_VERIFY(currentBatch.GetFilterBatch());
- Y_VERIFY(!currentBatch.GetFilteredBatch());
+ Y_VERIFY(currentBatch.GetFetchedInfo().GetFilter());
+ Y_VERIFY(currentBatch.GetFetchedInfo().GetFilterBatch());
+ Y_VERIFY(!currentBatch.GetFetchedInfo().GetFilteredBatch());
}
}
diff --git a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h
index 0dad312c34..db09e5ea06 100644
--- a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h
+++ b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h
@@ -1,4 +1,5 @@
#pragma once
+#include "common.h"
#include "conveyor_task.h"
#include <ydb/core/tx/columnshard/engines/portion_info.h>
@@ -18,7 +19,7 @@ private:
std::shared_ptr<NArrow::TColumnFilter> Filter;
std::shared_ptr<arrow::RecordBatch> FilterBatch;
- const ui32 BatchNo;
+ const TBatchAddress BatchAddress;
protected:
virtual bool DoApply(TGranulesFillingContext& owner) const override;
virtual bool DoExecuteImpl() override;
diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
index b25b7a4338..1c481b528c 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
+++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
@@ -13,8 +13,8 @@ class TMergePartialStream {
private:
class TBatchIterator {
private:
- i64 Position = 0;
- std::optional<ui32> PoolId;
+ YDB_ACCESSOR(i64, Position, 0);
+ YDB_OPT(ui32, PoolId);
std::shared_ptr<arrow::RecordBatch> Batch;
std::shared_ptr<NArrow::TColumnFilter> Filter;
@@ -79,17 +79,9 @@ private:
}
}
- bool HasPoolId() const noexcept {
- return PoolId.has_value();
- }
-
- ui32 GetPoolIdUnsafe() const noexcept {
- return *PoolId;
- }
-
bool CheckNextBatch(const TBatchIterator& nextIterator) {
Y_VERIFY_DEBUG(nextIterator.Columns.size() == Columns.size());
- return NArrow::ColumnsCompare(Columns, GetLastPosition(), nextIterator.Columns, 0) * ReverseSortKff < 0;
+ return NArrow::ColumnsCompare(Columns, GetLastPosition(), nextIterator.Columns, nextIterator.GetFirstPosition()) * ReverseSortKff < 0;
}
class TPosition {
@@ -163,6 +155,19 @@ private:
}
};
+ class TIteratorData {
+ private:
+ YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, Batch);
+ YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, Filter);
+ public:
+ TIteratorData(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter)
+ : Batch(batch)
+ , Filter(filter)
+ {
+
+ }
+ };
+
bool NextInHeap(const bool needPop) {
if (SortHeap.empty()) {
return false;
@@ -182,16 +187,17 @@ private:
SortHeap.pop_back();
} else {
it->second.pop_front();
- TBatchIterator newIterator(it->second.front(), nullptr, SortSchema, Reverse, SortHeap.back().GetPoolIdUnsafe());
- SortHeap.back().CheckNextBatch(newIterator);
- std::swap(SortHeap.back(), newIterator);
+ TBatchIterator oldIterator = std::move(SortHeap.back());
+ SortHeap.pop_back();
+ AddToHeap(SortHeap.back().GetPoolIdUnsafe(), it->second.front().GetBatch(), it->second.front().GetFilter(), false);
+ oldIterator.CheckNextBatch(SortHeap.back());
std::push_heap(SortHeap.begin(), SortHeap.end());
}
}
return SortHeap.size();
}
- THashMap<ui32, std::deque<std::shared_ptr<arrow::RecordBatch>>> BatchPools;
+ THashMap<ui32, std::deque<TIteratorData>> BatchPools;
std::vector<std::shared_ptr<arrow::RecordBatch>> IndependentBatches;
std::vector<TBatchIterator> SortHeap;
std::shared_ptr<arrow::Schema> SortSchema;
@@ -210,6 +216,19 @@ private:
}
return position;
}
+
+ void AddToHeap(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter, const bool restoreHeap) {
+ if (!filter || filter->IsTotalAllowFilter()) {
+ SortHeap.emplace_back(TBatchIterator(batch, nullptr, SortSchema, Reverse, poolId));
+ } else if (filter->IsTotalDenyFilter()) {
+ return;
+ } else {
+ SortHeap.emplace_back(TBatchIterator(batch, filter, SortSchema, Reverse, poolId));
+ }
+ if (restoreHeap) {
+ std::push_heap(SortHeap.begin(), SortHeap.end());
+ }
+ }
public:
TMergePartialStream(std::shared_ptr<arrow::Schema> sortSchema, const bool reverse)
: SortSchema(sortSchema)
@@ -221,22 +240,6 @@ public:
return SortHeap.size();
}
- void AddIndependentSource(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter) {
- if (!batch || !batch->num_rows()) {
- return;
- }
- Y_VERIFY_DEBUG(NArrow::IsSorted(batch, SortSchema));
- IndependentBatches.emplace_back(batch);
- if (!filter || filter->IsTotalAllowFilter()) {
- SortHeap.emplace_back(TBatchIterator(batch, nullptr, SortSchema, Reverse, {}));
- } else if (filter->IsTotalDenyFilter()) {
- return;
- } else {
- SortHeap.emplace_back(TBatchIterator(batch, filter, SortSchema, Reverse, {}));
- }
- std::push_heap(SortHeap.begin(), SortHeap.end());
- }
-
bool HasRecordsInPool(const ui32 poolId) const {
auto it = BatchPools.find(poolId);
if (it == BatchPools.end()) {
@@ -245,18 +248,23 @@ public:
return it->second.size();
}
- void AddPoolSource(const ui32 poolId, std::shared_ptr<arrow::RecordBatch> batch) {
+ void AddPoolSource(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter) {
if (!batch || !batch->num_rows()) {
return;
}
- auto it = BatchPools.find(poolId);
- if (it == BatchPools.end()) {
- it = BatchPools.emplace(poolId, std::deque<std::shared_ptr<arrow::RecordBatch>>()).first;
- }
- it->second.emplace_back(batch);
- if (it->second.size() == 1) {
- SortHeap.emplace_back(TBatchIterator(batch, nullptr, SortSchema, Reverse, poolId));
- std::push_heap(SortHeap.begin(), SortHeap.end());
+ Y_VERIFY_DEBUG(NArrow::IsSorted(batch, SortSchema));
+ if (!poolId) {
+ IndependentBatches.emplace_back(batch);
+ AddToHeap(poolId, batch, filter, true);
+ } else {
+ auto it = BatchPools.find(*poolId);
+ if (it == BatchPools.end()) {
+ it = BatchPools.emplace(*poolId, std::deque<TIteratorData>()).first;
+ }
+ it->second.emplace_back(batch, filter);
+ if (it->second.size() == 1) {
+ AddToHeap(poolId, batch, filter, true);
+ }
}
}