diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-04-26 13:58:32 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-04-26 13:58:32 +0300 |
commit | ed058cefb2cdc6ce20f6ef2e91d61e8829715ddc (patch) | |
tree | aca8ca939695b5be2be8d9ed6edac6e9e070d3d3 | |
parent | 7f8bc6caf32ad5a761d92e19f5bbb5e076aece4e (diff) | |
download | ydb-ed058cefb2cdc6ce20f6ef2e91d61e8829715ddc.tar.gz |
additional signals
early stop usage
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__index_scan.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/counters.cpp | 27 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/counters.h | 27 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/reader/batch.cpp | 14 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/reader/batch.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp | 3 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/reader/conveyor_task.h | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp | 23 |
8 files changed, 77 insertions, 26 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp index 065929dec5..d3eaa60dfa 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp @@ -113,10 +113,10 @@ TColumnShardScanIterator::~TColumnShardScanIterator() { } void TColumnShardScanIterator::Apply(IDataTasksProcessor::ITask::TPtr task) { - if (!task->IsDataProcessed()) { + if (!task->IsDataProcessed() || DataTasksProcessor.IsStopped()) { return; } - task->Apply(IndexedData); + Y_VERIFY(task->Apply(IndexedData)); } } diff --git a/ydb/core/tx/columnshard/counters.cpp b/ydb/core/tx/columnshard/counters.cpp index a660582add..5a11fad76e 100644 --- a/ydb/core/tx/columnshard/counters.cpp +++ b/ydb/core/tx/columnshard/counters.cpp @@ -6,18 +6,29 @@ namespace NKikimr::NColumnShard { TScanCounters::TScanCounters(const TString& module) { ::NMonitoring::TDynamicCounterPtr subGroup = GetServiceCounters(AppData()->Counters, "tablets")->GetSubgroup("subsystem", "columnshard"); + PortionBytes = subGroup->GetCounter(module + "/PortionBytes", true); FilterBytes = subGroup->GetCounter(module + "/FilterBytes", true); PostFilterBytes = subGroup->GetCounter(module + "/PostFilterBytes", true); - PostFilterPortionsCount = subGroup->GetCounter(module + "/PostFilterPortionsCount", true); - FilterOnlyPortionsCount = subGroup->GetCounter(module + "/FilterOnlyPortionsCount", true); - FilterOnlyPortionsBytes = subGroup->GetCounter(module + "/FilterOnlyPortionsBytes", true); - EmptyFilterPortionsCount = subGroup->GetCounter(module + "/EmptyFilterPortionsCount", true); - EmptyFilterPortionsBytes = subGroup->GetCounter(module + "/EmptyFilterPortionsBytes", true); - FilteredRowsCount = subGroup->GetCounter(module + "/FilteredRowsCount", true); - UsefulFilterBytes = subGroup->GetCounter(module + "/UsefulFilterBytes", true); - UsefulPostFilterBytes = subGroup->GetCounter(module + "/UsefulPostFilterBytes", true); + + AssembleFilterCount = subGroup->GetCounter(module + "/AssembleFilterCount", true); + + FilterOnlyCount = subGroup->GetCounter(module + "/FilterOnlyCount", true); + FilterOnlyFetchedBytes = subGroup->GetCounter(module + "/FilterOnlyFetchedBytes", true); + FilterOnlyUsefulBytes = subGroup->GetCounter(module + "/FilterOnlyUsefulBytes", true); + + EmptyFilterCount = subGroup->GetCounter(module + "/EmptyFilterCount", true); + EmptyFilterFetchedBytes = subGroup->GetCounter(module + "/EmptyFilterFetchedBytes", true); + OriginalRowsCount = subGroup->GetCounter(module + "/OriginalRowsCount", true); + FilteredRowsCount = subGroup->GetCounter(module + "/FilteredRowsCount", true); + SkippedBytes = subGroup->GetCounter(module + "/SkippedBytes", true); + + TwoPhasesCount = subGroup->GetCounter(module + "/TwoPhasesCount", true); + TwoPhasesFilterFetchedBytes = subGroup->GetCounter(module + "/TwoPhasesFilterFetchedBytes", true); + TwoPhasesFilterUsefulBytes = subGroup->GetCounter(module + "/TwoPhasesFilterUsefulBytes", true); + TwoPhasesPostFilterFetchedBytes = subGroup->GetCounter(module + "/TwoPhasesPostFilterFetchedBytes", true); + TwoPhasesPostFilterUsefulBytes = subGroup->GetCounter(module + "/TwoPhasesPostFilterUsefulBytes", true); } } diff --git a/ydb/core/tx/columnshard/counters.h b/ydb/core/tx/columnshard/counters.h index 1801a18767..f862fb2ccd 100644 --- a/ydb/core/tx/columnshard/counters.h +++ b/ydb/core/tx/columnshard/counters.h @@ -9,16 +9,25 @@ private: YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, PortionBytes); YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, FilterBytes); YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, PostFilterBytes); - YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, PostFilterPortionsCount); - YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, FilterOnlyPortionsCount); - YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, FilterOnlyPortionsBytes); - YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, EmptyFilterPortionsCount); - YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, EmptyFilterPortionsBytes); - YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, FilteredRowsCount); - YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, UsefulFilterBytes); - YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, UsefulPostFilterBytes); + + YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, AssembleFilterCount); + + YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, FilterOnlyCount); + YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, FilterOnlyFetchedBytes); + YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, FilterOnlyUsefulBytes); + + YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, EmptyFilterCount); + YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, EmptyFilterFetchedBytes); + YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, OriginalRowsCount); - + YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, FilteredRowsCount); + YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, SkippedBytes); + + YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, TwoPhasesCount); + YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, TwoPhasesFilterFetchedBytes); + YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, TwoPhasesFilterUsefulBytes); + YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, TwoPhasesPostFilterFetchedBytes); + YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, TwoPhasesPostFilterUsefulBytes); public: TScanCounters(const TString& module = "Scan"); }; diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp index b8aa8a4336..776760b3dd 100644 --- a/ydb/core/tx/columnshard/engines/reader/batch.cpp +++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp @@ -48,6 +48,20 @@ bool TBatch::AskedColumnsAlready(const std::set<ui32>& columnIds) const { return true; } +ui64 TBatch::GetFetchBytes(const std::set<ui32>* columnIds) { + ui64 result = 0; + for (const NOlap::TColumnRecord& rec : PortionInfo->Records) { + if (columnIds && !columnIds->contains(rec.ColumnId)) { + continue; + } + Y_VERIFY(rec.Portion == Portion); + Y_VERIFY(rec.Valid()); + Y_VERIFY(Granule == rec.Granule); + result += rec.BlobRange.Size; + } + return result; +} + void TBatch::Reset(const std::set<ui32>* columnIds) { if (!columnIds) { CurrentColumnIds.reset(); diff --git a/ydb/core/tx/columnshard/engines/reader/batch.h b/ydb/core/tx/columnshard/engines/reader/batch.h index 475ec5323d..d6dd76809c 100644 --- a/ydb/core/tx/columnshard/engines/reader/batch.h +++ b/ydb/core/tx/columnshard/engines/reader/batch.h @@ -44,6 +44,7 @@ public: bool AskedColumnsAlready(const std::set<ui32>& columnIds) const; void Reset(const std::set<ui32>* columnIds); + ui64 GetFetchBytes(const std::set<ui32>* columnIds); void InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch); void InitBatch(std::shared_ptr<arrow::RecordBatch> batch); diff --git a/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp index e41445afab..c013266d6f 100644 --- a/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp +++ b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp @@ -15,6 +15,9 @@ bool IDataTasksProcessor::ITask::DoExecute() { bool IDataTasksProcessor::ITask::Apply(NOlap::TIndexedReadData& indexedDataRead) const { if (OwnerOperator) { OwnerOperator->ReplyReceived(); + if (OwnerOperator->IsStopped()) { + return true; + } } return DoApply(indexedDataRead); } diff --git a/ydb/core/tx/columnshard/engines/reader/conveyor_task.h b/ydb/core/tx/columnshard/engines/reader/conveyor_task.h index f9d7adacca..efb808cdac 100644 --- a/ydb/core/tx/columnshard/engines/reader/conveyor_task.h +++ b/ydb/core/tx/columnshard/engines/reader/conveyor_task.h @@ -79,6 +79,10 @@ public: return Object && Object->InWaiting(); } + bool IsStopped() const { + return Object && Object->IsStopped(); + } + void Add(NOlap::TIndexedReadData& indexedDataRead, IDataTasksProcessor::ITask::TPtr task); }; diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp index f11ddbe863..0354d5d4f2 100644 --- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp +++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp @@ -41,21 +41,30 @@ bool TAssembleFilter::DoApply(TIndexedReadData& owner) const { Y_VERIFY(OriginalCount); owner.GetCounters().GetOriginalRowsCount()->Add(OriginalCount); batch.InitFilter(Filter, FilteredBatch); + owner.GetCounters().GetAssembleFilterCount()->Add(1); if (!FilteredBatch || FilteredBatch->num_rows() == 0) { - owner.GetCounters().GetEmptyFilterPortionsCount()->Add(1); - owner.GetCounters().GetEmptyFilterPortionsBytes()->Add(batch.GetFetchedBytes()); + owner.GetCounters().GetEmptyFilterCount()->Add(1); + owner.GetCounters().GetEmptyFilterFetchedBytes()->Add(batch.GetFetchedBytes()); + owner.GetCounters().GetSkippedBytes()->Add(batch.GetFetchBytes(&owner.GetPostFilterColumns())); batch.InitBatch(FilteredBatch); } else { owner.GetCounters().GetFilteredRowsCount()->Add(FilteredBatch->num_rows()); - owner.GetCounters().GetUsefulFilterBytes()->Add(batch.GetFetchedBytes() * FilteredBatch->num_rows() / OriginalCount); if (batch.AskedColumnsAlready(owner.GetPostFilterColumns())) { - owner.GetCounters().GetFilterOnlyPortionsCount()->Add(1); - owner.GetCounters().GetFilterOnlyPortionsBytes()->Add(batch.GetFetchedBytes()); + owner.GetCounters().GetFilterOnlyCount()->Add(1); + owner.GetCounters().GetFilterOnlyFetchedBytes()->Add(batch.GetFetchedBytes()); + owner.GetCounters().GetFilterOnlyUsefulBytes()->Add(batch.GetFetchedBytes() * FilteredBatch->num_rows() / OriginalCount); + owner.GetCounters().GetSkippedBytes()->Add(batch.GetFetchBytes(&owner.GetPostFilterColumns())); + batch.InitBatch(FilteredBatch); } else { - owner.GetCounters().GetPostFilterPortionsCount()->Add(1); + owner.GetCounters().GetTwoPhasesFilterFetchedBytes()->Add(batch.GetFetchedBytes()); + owner.GetCounters().GetTwoPhasesFilterUsefulBytes()->Add(batch.GetFetchedBytes() * FilteredBatch->num_rows() / OriginalCount); + batch.Reset(&owner.GetPostFilterColumns()); - owner.GetCounters().GetUsefulPostFilterBytes()->Add(batch.GetWaitingBytes() * FilteredBatch->num_rows() / OriginalCount); + + owner.GetCounters().GetTwoPhasesCount()->Add(1); + owner.GetCounters().GetTwoPhasesPostFilterFetchedBytes()->Add(batch.GetWaitingBytes()); + owner.GetCounters().GetTwoPhasesPostFilterUsefulBytes()->Add(batch.GetWaitingBytes() * FilteredBatch->num_rows() / OriginalCount); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "additional_data") ("filtered_count", FilteredBatch->num_rows()) ("blobs_count", batch.GetWaitingBlobs().size()) |