aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-04-26 13:58:32 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-04-26 13:58:32 +0300
commited058cefb2cdc6ce20f6ef2e91d61e8829715ddc (patch)
treeaca8ca939695b5be2be8d9ed6edac6e9e070d3d3
parent7f8bc6caf32ad5a761d92e19f5bbb5e076aece4e (diff)
downloadydb-ed058cefb2cdc6ce20f6ef2e91d61e8829715ddc.tar.gz
additional signals
early stop usage
-rw-r--r--ydb/core/tx/columnshard/columnshard__index_scan.cpp4
-rw-r--r--ydb/core/tx/columnshard/counters.cpp27
-rw-r--r--ydb/core/tx/columnshard/counters.h27
-rw-r--r--ydb/core/tx/columnshard/engines/reader/batch.cpp14
-rw-r--r--ydb/core/tx/columnshard/engines/reader/batch.h1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp3
-rw-r--r--ydb/core/tx/columnshard/engines/reader/conveyor_task.h4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp23
8 files changed, 77 insertions, 26 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
index 065929dec5..d3eaa60dfa 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
@@ -113,10 +113,10 @@ TColumnShardScanIterator::~TColumnShardScanIterator() {
}
void TColumnShardScanIterator::Apply(IDataTasksProcessor::ITask::TPtr task) {
- if (!task->IsDataProcessed()) {
+ if (!task->IsDataProcessed() || DataTasksProcessor.IsStopped()) {
return;
}
- task->Apply(IndexedData);
+ Y_VERIFY(task->Apply(IndexedData));
}
}
diff --git a/ydb/core/tx/columnshard/counters.cpp b/ydb/core/tx/columnshard/counters.cpp
index a660582add..5a11fad76e 100644
--- a/ydb/core/tx/columnshard/counters.cpp
+++ b/ydb/core/tx/columnshard/counters.cpp
@@ -6,18 +6,29 @@ namespace NKikimr::NColumnShard {
TScanCounters::TScanCounters(const TString& module) {
::NMonitoring::TDynamicCounterPtr subGroup = GetServiceCounters(AppData()->Counters, "tablets")->GetSubgroup("subsystem", "columnshard");
+
PortionBytes = subGroup->GetCounter(module + "/PortionBytes", true);
FilterBytes = subGroup->GetCounter(module + "/FilterBytes", true);
PostFilterBytes = subGroup->GetCounter(module + "/PostFilterBytes", true);
- PostFilterPortionsCount = subGroup->GetCounter(module + "/PostFilterPortionsCount", true);
- FilterOnlyPortionsCount = subGroup->GetCounter(module + "/FilterOnlyPortionsCount", true);
- FilterOnlyPortionsBytes = subGroup->GetCounter(module + "/FilterOnlyPortionsBytes", true);
- EmptyFilterPortionsCount = subGroup->GetCounter(module + "/EmptyFilterPortionsCount", true);
- EmptyFilterPortionsBytes = subGroup->GetCounter(module + "/EmptyFilterPortionsBytes", true);
- FilteredRowsCount = subGroup->GetCounter(module + "/FilteredRowsCount", true);
- UsefulFilterBytes = subGroup->GetCounter(module + "/UsefulFilterBytes", true);
- UsefulPostFilterBytes = subGroup->GetCounter(module + "/UsefulPostFilterBytes", true);
+
+ AssembleFilterCount = subGroup->GetCounter(module + "/AssembleFilterCount", true);
+
+ FilterOnlyCount = subGroup->GetCounter(module + "/FilterOnlyCount", true);
+ FilterOnlyFetchedBytes = subGroup->GetCounter(module + "/FilterOnlyFetchedBytes", true);
+ FilterOnlyUsefulBytes = subGroup->GetCounter(module + "/FilterOnlyUsefulBytes", true);
+
+ EmptyFilterCount = subGroup->GetCounter(module + "/EmptyFilterCount", true);
+ EmptyFilterFetchedBytes = subGroup->GetCounter(module + "/EmptyFilterFetchedBytes", true);
+
OriginalRowsCount = subGroup->GetCounter(module + "/OriginalRowsCount", true);
+ FilteredRowsCount = subGroup->GetCounter(module + "/FilteredRowsCount", true);
+ SkippedBytes = subGroup->GetCounter(module + "/SkippedBytes", true);
+
+ TwoPhasesCount = subGroup->GetCounter(module + "/TwoPhasesCount", true);
+ TwoPhasesFilterFetchedBytes = subGroup->GetCounter(module + "/TwoPhasesFilterFetchedBytes", true);
+ TwoPhasesFilterUsefulBytes = subGroup->GetCounter(module + "/TwoPhasesFilterUsefulBytes", true);
+ TwoPhasesPostFilterFetchedBytes = subGroup->GetCounter(module + "/TwoPhasesPostFilterFetchedBytes", true);
+ TwoPhasesPostFilterUsefulBytes = subGroup->GetCounter(module + "/TwoPhasesPostFilterUsefulBytes", true);
}
}
diff --git a/ydb/core/tx/columnshard/counters.h b/ydb/core/tx/columnshard/counters.h
index 1801a18767..f862fb2ccd 100644
--- a/ydb/core/tx/columnshard/counters.h
+++ b/ydb/core/tx/columnshard/counters.h
@@ -9,16 +9,25 @@ private:
YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, PortionBytes);
YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, FilterBytes);
YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, PostFilterBytes);
- YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, PostFilterPortionsCount);
- YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, FilterOnlyPortionsCount);
- YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, FilterOnlyPortionsBytes);
- YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, EmptyFilterPortionsCount);
- YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, EmptyFilterPortionsBytes);
- YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, FilteredRowsCount);
- YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, UsefulFilterBytes);
- YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, UsefulPostFilterBytes);
+
+ YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, AssembleFilterCount);
+
+ YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, FilterOnlyCount);
+ YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, FilterOnlyFetchedBytes);
+ YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, FilterOnlyUsefulBytes);
+
+ YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, EmptyFilterCount);
+ YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, EmptyFilterFetchedBytes);
+
YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, OriginalRowsCount);
-
+ YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, FilteredRowsCount);
+ YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, SkippedBytes);
+
+ YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, TwoPhasesCount);
+ YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, TwoPhasesFilterFetchedBytes);
+ YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, TwoPhasesFilterUsefulBytes);
+ YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, TwoPhasesPostFilterFetchedBytes);
+ YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, TwoPhasesPostFilterUsefulBytes);
public:
TScanCounters(const TString& module = "Scan");
};
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp
index b8aa8a4336..776760b3dd 100644
--- a/ydb/core/tx/columnshard/engines/reader/batch.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp
@@ -48,6 +48,20 @@ bool TBatch::AskedColumnsAlready(const std::set<ui32>& columnIds) const {
return true;
}
+ui64 TBatch::GetFetchBytes(const std::set<ui32>* columnIds) {
+ ui64 result = 0;
+ for (const NOlap::TColumnRecord& rec : PortionInfo->Records) {
+ if (columnIds && !columnIds->contains(rec.ColumnId)) {
+ continue;
+ }
+ Y_VERIFY(rec.Portion == Portion);
+ Y_VERIFY(rec.Valid());
+ Y_VERIFY(Granule == rec.Granule);
+ result += rec.BlobRange.Size;
+ }
+ return result;
+}
+
void TBatch::Reset(const std::set<ui32>* columnIds) {
if (!columnIds) {
CurrentColumnIds.reset();
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.h b/ydb/core/tx/columnshard/engines/reader/batch.h
index 475ec5323d..d6dd76809c 100644
--- a/ydb/core/tx/columnshard/engines/reader/batch.h
+++ b/ydb/core/tx/columnshard/engines/reader/batch.h
@@ -44,6 +44,7 @@ public:
bool AskedColumnsAlready(const std::set<ui32>& columnIds) const;
void Reset(const std::set<ui32>* columnIds);
+ ui64 GetFetchBytes(const std::set<ui32>* columnIds);
void InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch);
void InitBatch(std::shared_ptr<arrow::RecordBatch> batch);
diff --git a/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
index e41445afab..c013266d6f 100644
--- a/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
@@ -15,6 +15,9 @@ bool IDataTasksProcessor::ITask::DoExecute() {
bool IDataTasksProcessor::ITask::Apply(NOlap::TIndexedReadData& indexedDataRead) const {
if (OwnerOperator) {
OwnerOperator->ReplyReceived();
+ if (OwnerOperator->IsStopped()) {
+ return true;
+ }
}
return DoApply(indexedDataRead);
}
diff --git a/ydb/core/tx/columnshard/engines/reader/conveyor_task.h b/ydb/core/tx/columnshard/engines/reader/conveyor_task.h
index f9d7adacca..efb808cdac 100644
--- a/ydb/core/tx/columnshard/engines/reader/conveyor_task.h
+++ b/ydb/core/tx/columnshard/engines/reader/conveyor_task.h
@@ -79,6 +79,10 @@ public:
return Object && Object->InWaiting();
}
+ bool IsStopped() const {
+ return Object && Object->IsStopped();
+ }
+
void Add(NOlap::TIndexedReadData& indexedDataRead, IDataTasksProcessor::ITask::TPtr task);
};
diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
index f11ddbe863..0354d5d4f2 100644
--- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
@@ -41,21 +41,30 @@ bool TAssembleFilter::DoApply(TIndexedReadData& owner) const {
Y_VERIFY(OriginalCount);
owner.GetCounters().GetOriginalRowsCount()->Add(OriginalCount);
batch.InitFilter(Filter, FilteredBatch);
+ owner.GetCounters().GetAssembleFilterCount()->Add(1);
if (!FilteredBatch || FilteredBatch->num_rows() == 0) {
- owner.GetCounters().GetEmptyFilterPortionsCount()->Add(1);
- owner.GetCounters().GetEmptyFilterPortionsBytes()->Add(batch.GetFetchedBytes());
+ owner.GetCounters().GetEmptyFilterCount()->Add(1);
+ owner.GetCounters().GetEmptyFilterFetchedBytes()->Add(batch.GetFetchedBytes());
+ owner.GetCounters().GetSkippedBytes()->Add(batch.GetFetchBytes(&owner.GetPostFilterColumns()));
batch.InitBatch(FilteredBatch);
} else {
owner.GetCounters().GetFilteredRowsCount()->Add(FilteredBatch->num_rows());
- owner.GetCounters().GetUsefulFilterBytes()->Add(batch.GetFetchedBytes() * FilteredBatch->num_rows() / OriginalCount);
if (batch.AskedColumnsAlready(owner.GetPostFilterColumns())) {
- owner.GetCounters().GetFilterOnlyPortionsCount()->Add(1);
- owner.GetCounters().GetFilterOnlyPortionsBytes()->Add(batch.GetFetchedBytes());
+ owner.GetCounters().GetFilterOnlyCount()->Add(1);
+ owner.GetCounters().GetFilterOnlyFetchedBytes()->Add(batch.GetFetchedBytes());
+ owner.GetCounters().GetFilterOnlyUsefulBytes()->Add(batch.GetFetchedBytes() * FilteredBatch->num_rows() / OriginalCount);
+ owner.GetCounters().GetSkippedBytes()->Add(batch.GetFetchBytes(&owner.GetPostFilterColumns()));
+
batch.InitBatch(FilteredBatch);
} else {
- owner.GetCounters().GetPostFilterPortionsCount()->Add(1);
+ owner.GetCounters().GetTwoPhasesFilterFetchedBytes()->Add(batch.GetFetchedBytes());
+ owner.GetCounters().GetTwoPhasesFilterUsefulBytes()->Add(batch.GetFetchedBytes() * FilteredBatch->num_rows() / OriginalCount);
+
batch.Reset(&owner.GetPostFilterColumns());
- owner.GetCounters().GetUsefulPostFilterBytes()->Add(batch.GetWaitingBytes() * FilteredBatch->num_rows() / OriginalCount);
+
+ owner.GetCounters().GetTwoPhasesCount()->Add(1);
+ owner.GetCounters().GetTwoPhasesPostFilterFetchedBytes()->Add(batch.GetWaitingBytes());
+ owner.GetCounters().GetTwoPhasesPostFilterUsefulBytes()->Add(batch.GetWaitingBytes() * FilteredBatch->num_rows() / OriginalCount);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "additional_data")
("filtered_count", FilteredBatch->num_rows())
("blobs_count", batch.GetWaitingBlobs().size())