aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-06-30 09:24:13 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-06-30 09:24:13 +0300
commit46510d6c88912219669dfe229e9f6b55603d97ca (patch)
treeb41c88538f080b125a2792d087414ba0bde0ec0b
parent4151a49ff0f7a9c3eab84ccaf38d2a0ab64c189f (diff)
downloadydb-46510d6c88912219669dfe229e9f6b55603d97ca.tar.gz
move fetching check usage inside
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.cpp5
-rw-r--r--ydb/core/tx/columnshard/engines/reader/batch.cpp21
-rw-r--r--ydb/core/tx/columnshard/engines/reader/batch.h8
-rw-r--r--ydb/core/tx/columnshard/engines/reader/granule.h4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp6
5 files changed, 26 insertions, 18 deletions
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index 90758cc318..770d55539f 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -171,11 +171,6 @@ void TIndexedReadData::AddIndexed(const TBlobRange& blobRange, const TString& da
if (!portionBatch->AddIndexedReady(blobRange, data)) {
return;
}
- if (portionBatch->IsFetchingReady()) {
- if (auto batch = portionBatch->AssembleTask(TasksProcessor.GetObject(), ReadMetadata)) {
- TasksProcessor.Add(*GranulesContext, batch);
- }
- }
}
std::shared_ptr<arrow::RecordBatch>
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp
index 42e8d8694c..b05c74a9a9 100644
--- a/ydb/core/tx/columnshard/engines/reader/batch.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp
@@ -12,14 +12,15 @@ TBatch::TBatch(const TBatchAddress& address, TGranule& owner, const TPortionInfo
, Portion(portionInfo.Records[0].Portion)
, Granule(owner.GetGranuleId())
, Owner(&owner)
- , PortionInfo(&portionInfo) {
+ , PortionInfo(&portionInfo)
+{
Y_VERIFY(portionInfo.Records.size());
if (portionInfo.CanIntersectOthers()) {
- AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "intersect_portion");
+ ACFL_TRACE("event", "intersect_portion");
Owner->SetDuplicationsAvailable(true);
if (portionInfo.CanHaveDups()) {
- AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "dup_portion");
+ ACFL_TRACE("event", "dup_portion");
DuplicationsAvailableFlag = true;
}
}
@@ -143,6 +144,7 @@ void TBatch::ResetWithFilter(const std::set<ui32>& columnIds) {
}
}
}
+ CheckReadyForAssemble();
}
bool TBatch::InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch,
@@ -166,6 +168,7 @@ bool TBatch::AddIndexedReady(const TBlobRange& bRange, const TString& blobData)
FetchedBytes += bRange.Size;
Data.emplace(bRange, TPortionInfo::TAssembleBlobInfo(blobData));
Owner->OnBlobReady(bRange);
+ CheckReadyForAssemble();
return true;
}
@@ -212,4 +215,16 @@ void TBatch::GetPKBorders(const bool reverse, const TIndexInfo& indexInfo, std::
}
}
+bool TBatch::CheckReadyForAssemble() {
+ if (IsFetchingReady()) {
+ auto& context = Owner->GetOwner();
+ auto processor = context.GetTasksProcessor();
+ if (auto assembleBatchTask = AssembleTask(processor.GetObject(), context.GetReadMetadata())) {
+ processor.Add(context, assembleBatchTask);
+ }
+ return true;
+ }
+ return false;
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.h b/ydb/core/tx/columnshard/engines/reader/batch.h
index 86b767c88a..40969f6e91 100644
--- a/ydb/core/tx/columnshard/engines/reader/batch.h
+++ b/ydb/core/tx/columnshard/engines/reader/batch.h
@@ -93,6 +93,10 @@ private:
std::set<ui32> AskedColumnIds;
void ResetCommon(const std::set<ui32>& columnIds);
ui64 GetUsefulBytes(const ui64 bytes) const;
+ bool CheckReadyForAssemble();
+ bool IsFetchingReady() const {
+ return WaitIndexed.empty();
+ }
public:
std::shared_ptr<TSortableBatchPosition> GetFirstPK(const bool reverse, const TIndexInfo& indexInfo) const;
@@ -136,10 +140,6 @@ public:
return *Owner;
}
- bool IsFetchingReady() const {
- return WaitIndexed.empty();
- }
-
const TPortionInfo& GetPortionInfo() const {
return *PortionInfo;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.h b/ydb/core/tx/columnshard/engines/reader/granule.h
index f2318c4c9f..8fe3ca8adf 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.h
+++ b/ydb/core/tx/columnshard/engines/reader/granule.h
@@ -88,6 +88,10 @@ public:
return *Owner;
}
+ TGranulesFillingContext& GetOwner() {
+ return *Owner;
+ }
+
class TBatchForMerge {
private:
TBatch* Batch = nullptr;
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp
index b565d57306..63e96f8527 100644
--- a/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/order_control/abstract.cpp
@@ -25,12 +25,6 @@ void IOrderPolicy::OnBatchFilterInitialized(TBatch& batchOriginal, TGranulesFill
context.GetCounters().TwoPhasesFilterUsefulBytes->Add(batchOriginal.GetUsefulFetchedBytes());
batchOriginal.ResetWithFilter(context.GetPostFilterColumns());
- if (batchOriginal.IsFetchingReady()) {
- auto processor = context.GetTasksProcessor();
- if (auto assembleBatchTask = batchOriginal.AssembleTask(processor.GetObject(), context.GetReadMetadata())) {
- processor.Add(context, assembleBatchTask);
- }
- }
context.GetCounters().TwoPhasesCount->Add(1);
context.GetCounters().TwoPhasesPostFilterFetchedBytes->Add(batchOriginal.GetWaitingBytes());