diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-20 12:37:43 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-20 12:37:43 +0300 |
commit | 6d5b731fb288d85199d5d3fe2055a29b06c832ca (patch) | |
tree | b5cc2175d11aa4d676bda9e38165f74817ae7234 | |
parent | 89c20775700cc1ef3f2ea9c9854fa37942ee5fdd (diff) | |
download | ydb-6d5b731fb288d85199d5d3fe2055a29b06c832ca.tar.gz |
additional signals
26 files changed, 267 insertions, 107 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp index 230533e6fb..46fc3fc0b4 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp @@ -5,7 +5,7 @@ namespace NKikimr::NColumnShard { TColumnShardScanIterator::TColumnShardScanIterator(NOlap::TReadMetadata::TConstPtr readMetadata, - NColumnShard::TDataTasksProcessorContainer processor, const NColumnShard::TScanCounters& scanCounters) + NColumnShard::TDataTasksProcessorContainer processor, const NColumnShard::TConcreteScanCounters& scanCounters) : ReadMetadata(readMetadata) , IndexedData(ReadMetadata, false, scanCounters, processor) , DataTasksProcessor(processor) diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.h b/ydb/core/tx/columnshard/columnshard__index_scan.h index 1cd7117daf..26ebae045e 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.h +++ b/ydb/core/tx/columnshard/columnshard__index_scan.h @@ -36,9 +36,9 @@ private: ui64 ItemsRead = 0; const i64 MaxRowsInBatch = 5000; NColumnShard::TDataTasksProcessorContainer DataTasksProcessor; - NColumnShard::TScanCounters ScanCounters; + NColumnShard::TConcreteScanCounters ScanCounters; public: - TColumnShardScanIterator(NOlap::TReadMetadata::TConstPtr readMetadata, NColumnShard::TDataTasksProcessorContainer processor, const NColumnShard::TScanCounters& scanCounters); + TColumnShardScanIterator(NOlap::TReadMetadata::TConstPtr readMetadata, NColumnShard::TDataTasksProcessorContainer processor, const NColumnShard::TConcreteScanCounters& scanCounters); ~TColumnShardScanIterator(); virtual void Apply(IDataTasksProcessor::ITask::TPtr task) override; diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index 92a188a5a8..2ae0630759 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -46,7 +46,7 @@ private: constexpr ui64 INIT_BATCH_ROWS = 1000; -constexpr i64 DEFAULT_READ_AHEAD_BYTES = 100 * 1024 * 1024; +constexpr i64 DEFAULT_READ_AHEAD_BYTES = 200 * 1024 * 1024; constexpr TDuration SCAN_HARD_TIMEOUT = TDuration::Minutes(10); constexpr TDuration SCAN_HARD_TIMEOUT_GAP = TDuration::Seconds(5); @@ -87,8 +87,7 @@ public: , ReadMetadataRanges(std::move(readMetadataList)) , ReadMetadataIndex(0) , Deadline(TInstant::Now() + (timeout ? timeout + SCAN_HARD_TIMEOUT_GAP : SCAN_HARD_TIMEOUT)) - , ScanCountersPool(scanCountersPool) - { + , ScanCountersPool(scanCountersPool) { KeyYqlSchema = ReadMetadataRanges[ReadMetadataIndex]->GetKeyYqlSchema(); } @@ -138,10 +137,15 @@ private: if (!blobRange.BlobId.IsValid()) { break; } + ScanCountersPool.Aggregations->AddFlightReadInfo(blobRange.Size); ++InFlightReads; InFlightReadBytes += blobRange.Size; ranges[blobRange.BlobId].emplace_back(blobRange); } + Y_UNUSED(MaxReadAheadBytes); + if (InFlightReadBytes >= MaxReadAheadBytes) { + ScanCountersPool.OnReadingOverloaded(); + } if (!ranges.size()) { return true; } @@ -210,6 +214,7 @@ private: auto& event = *ev->Get(); const auto& blobRange = event.BlobRange; + ScanCountersPool.Aggregations->RemoveFlightReadInfo(blobRange.Size); Stats.BlobReceived(blobRange, event.FromCache, event.ConstructTime); if (event.Status != NKikimrProto::EReplyStatus::OK) { @@ -587,7 +592,7 @@ private: const TSerializedTableRange TableRange; const TSmallVec<bool> SkipNullKeys; const TInstant Deadline; - TScanCounters ScanCountersPool; + TConcreteScanCounters ScanCountersPool; TActorId TimeoutActorId; TMaybe<TString> AbortReason; diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index ece547bca1..b9772b1633 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -39,7 +39,7 @@ IActor* CreateReadActor(ui64 tabletId, NOlap::TReadMetadata::TConstPtr readMetadata, const TInstant& deadline, const TActorId& columnShardActorId, - ui64 requestCookie, const TScanCounters& counters); + ui64 requestCookie, const TConcreteScanCounters& counters); IActor* CreateColumnShardScan(const TActorId& scanComputeActor, ui32 scanId, ui64 txId); IActor* CreateExportActor(const ui64 tabletId, const TActorId& dstActor, TAutoPtr<TEvPrivate::TEvExport> ev); @@ -121,6 +121,7 @@ class TColumnShard friend class TTxNotifyTxCompletion; friend class TTxPlanStep; friend class TTxWrite; + friend class TTxCleanInserted; friend class TTxReadBase; friend class TTxRead; friend class TTxScan; diff --git a/ydb/core/tx/columnshard/counters/common/agent.cpp b/ydb/core/tx/columnshard/counters/common/agent.cpp index ea431b5943..c451356cfe 100644 --- a/ydb/core/tx/columnshard/counters/common/agent.cpp +++ b/ydb/core/tx/columnshard/counters/common/agent.cpp @@ -4,28 +4,33 @@ namespace NKikimr::NColumnShard { TValueAggregationAgent::TValueAggregationAgent(const TString& signalName, const TCommonCountersOwner& signalsOwner) - : ValueSignalSum(signalsOwner.GetValue("SUM/" + signalName)) - , ValueSignalMin(signalsOwner.GetValue("MIN/" + signalName)) - , ValueSignalMax(signalsOwner.GetValue("MAX/" + signalName)) + : ValueSignalSum(signalsOwner.GetAggregationValue("SUM/" + signalName)) + , ValueSignalMin(signalsOwner.GetAggregationValue("MIN/" + signalName)) + , ValueSignalMax(signalsOwner.GetAggregationValue("MAX/" + signalName)) { } -bool TValueAggregationAgent::CalcAggregations(i64& sum, i64& minValue, i64& maxValue) const { +bool TValueAggregationAgent::CalcAggregationsAndClean(i64& sum, i64& minValue, i64& maxValue) const { if (Values.empty()) { return false; } sum = 0; minValue = Values.front()->GetValue(); maxValue = Values.front()->GetValue(); - for (auto&& i : Values) { - const i64 v = i->GetValue(); - sum += v; - if (minValue > v) { - minValue = v; - } - if (maxValue < v) { - maxValue = v; + for (auto it = Values.begin(); it != Values.end();) { + if (it->use_count() == 1) { + it = Values.erase(it); + } else { + const i64 v = (*it)->GetValue(); + sum += v; + if (minValue > v) { + minValue = v; + } + if (maxValue < v) { + maxValue = v; + } + ++it; } } return true; @@ -35,7 +40,7 @@ std::optional<NKikimr::NColumnShard::TSignalAggregations> TValueAggregationAgent i64 sum; i64 min; i64 max; - if (!CalcAggregations(sum, min, max)) { + if (!CalcAggregationsAndClean(sum, min, max)) { return {}; } return TSignalAggregations(sum, min, max); @@ -48,20 +53,16 @@ void TValueAggregationAgent::ResendStatus() const { ValueSignalMin->Set(aggr->Min); ValueSignalMax->Set(aggr->Max); ValueSignalSum->Set(aggr->Sum); + } else { + ValueSignalMin->Set(0); + ValueSignalMax->Set(0); + ValueSignalSum->Set(0); } } -std::shared_ptr<NKikimr::NColumnShard::TValueAggregationClient> TValueAggregationAgent::GetClient(std::shared_ptr<TValueAggregationAgent> selfPtr) { - TGuard<TMutex> g(Mutex); - auto it = Values.emplace(Values.end(), nullptr); - auto result = std::make_shared<TValueAggregationClient>(selfPtr, it); - *it = result.get(); - return result; -} - -void TValueAggregationAgent::UnregisterClient(std::list<TValueAggregationClient*>::iterator it) { +std::shared_ptr<NKikimr::NColumnShard::TValueAggregationClient> TValueAggregationAgent::GetClient() { TGuard<TMutex> g(Mutex); - Values.erase(it); + return *Values.emplace(Values.end(), std::make_shared<TValueAggregationClient>()); } } diff --git a/ydb/core/tx/columnshard/counters/common/agent.h b/ydb/core/tx/columnshard/counters/common/agent.h index 7446b4fdff..cd21058e0d 100644 --- a/ydb/core/tx/columnshard/counters/common/agent.h +++ b/ydb/core/tx/columnshard/counters/common/agent.h @@ -25,10 +25,10 @@ private: ::NMonitoring::TDynamicCounters::TCounterPtr ValueSignalSum; ::NMonitoring::TDynamicCounters::TCounterPtr ValueSignalMin; ::NMonitoring::TDynamicCounters::TCounterPtr ValueSignalMax; - std::list<TValueAggregationClient*> Values; + mutable std::list<std::shared_ptr<TValueAggregationClient>> Values; TMutex Mutex; - bool CalcAggregations(i64& sum, i64& minValue, i64& maxValue) const; + bool CalcAggregationsAndClean(i64& sum, i64& minValue, i64& maxValue) const; std::optional<TSignalAggregations> GetAggregations() const; public: @@ -36,7 +36,7 @@ public: void ResendStatus() const; void UnregisterClient(std::list<TValueAggregationClient*>::iterator it); - std::shared_ptr<TValueAggregationClient> GetClient(std::shared_ptr<TValueAggregationAgent> selfPtr); + std::shared_ptr<TValueAggregationClient> GetClient(); }; } diff --git a/ydb/core/tx/columnshard/counters/common/client.cpp b/ydb/core/tx/columnshard/counters/common/client.cpp index 76718b96a7..54d3b42d30 100644 --- a/ydb/core/tx/columnshard/counters/common/client.cpp +++ b/ydb/core/tx/columnshard/counters/common/client.cpp @@ -1,17 +1,5 @@ #include "client.h" -#include "agent.h" namespace NKikimr::NColumnShard { -TValueAggregationClient::TValueAggregationClient(std::shared_ptr<TValueAggregationAgent> owner, std::list<TValueAggregationClient*>::iterator it) - : Owner(owner) - , PositionIterator(it) -{ - -} - -TValueAggregationClient::~TValueAggregationClient() { - Owner->UnregisterClient(PositionIterator); -} - } diff --git a/ydb/core/tx/columnshard/counters/common/client.h b/ydb/core/tx/columnshard/counters/common/client.h index f935af1ddf..632aa24817 100644 --- a/ydb/core/tx/columnshard/counters/common/client.h +++ b/ydb/core/tx/columnshard/counters/common/client.h @@ -10,12 +10,14 @@ class TValueAggregationAgent; class TValueAggregationClient: TNonCopyable { private: - std::shared_ptr<TValueAggregationAgent> Owner; - std::list<TValueAggregationClient*>::iterator PositionIterator; YDB_ACCESSOR(i64, Value, 0); public: - TValueAggregationClient(std::shared_ptr<TValueAggregationAgent> owner, std::list<TValueAggregationClient*>::iterator it); - ~TValueAggregationClient(); + void Add(const i64 v) { + Value += v; + } + void Remove(const i64 v) { + Value -= v; + } }; } diff --git a/ydb/core/tx/columnshard/counters/common/owner.cpp b/ydb/core/tx/columnshard/counters/common/owner.cpp index 03b50ecd1c..7e98e08417 100644 --- a/ydb/core/tx/columnshard/counters/common/owner.cpp +++ b/ydb/core/tx/columnshard/counters/common/owner.cpp @@ -6,11 +6,15 @@ namespace NKikimr::NColumnShard { NMonitoring::TDynamicCounters::TCounterPtr TCommonCountersOwner::GetDeriviative(const TString& name) const { - return SubGroup->GetCounter(NormalizeSignalName(ModuleId + "/Deriviative/" + name), true); + return SubGroup->GetCounter(NormalizeSignalName("Deriviative/" + name), true); } NMonitoring::TDynamicCounters::TCounterPtr TCommonCountersOwner::GetValue(const TString& name) const { - return SubGroup->GetCounter(NormalizeSignalName(ModuleId + "/Value/" + name), false); + return SubGroup->GetCounter(NormalizeSignalName("Value/" + name), false); +} + +NMonitoring::TDynamicCounters::TCounterPtr TCommonCountersOwner::GetAggregationValue(const TString& name) const { + return SubGroup->GetCounter(NormalizeSignalName("Aggregation/" + name), false); } TString TCommonCountersOwner::NormalizeSignalName(const TString& name) const { @@ -21,16 +25,16 @@ TCommonCountersOwner::TCommonCountersOwner(const TString& module, TIntrusivePtr< : ModuleId(module) { if (baseSignals) { - SubGroup = baseSignals->GetSubgroup("common_module", module); + SubGroup = baseSignals->GetSubgroup("module_id", module); } else if (NActors::TlsActivationContext) { - SubGroup = GetServiceCounters(AppData()->Counters, "tablets")->GetSubgroup("subsystem", "columnshard")->GetSubgroup("common_module", module); + SubGroup = GetServiceCounters(AppData()->Counters, "tablets")->GetSubgroup("subsystem", "columnshard")->GetSubgroup("module_id", module); } else { SubGroup = new NMonitoring::TDynamicCounters(); } } NMonitoring::THistogramPtr TCommonCountersOwner::GetHistogram(const TString& name, NMonitoring::IHistogramCollectorPtr&& hCollector) const { - return SubGroup->GetHistogram(NormalizeSignalName(ModuleId + "/Histogram/" + name), std::move(hCollector)); + return SubGroup->GetHistogram(NormalizeSignalName("Histogram/" + name), std::move(hCollector)); } std::shared_ptr<TValueAggregationAgent> TCommonCountersOwner::GetValueAutoAggregations(const TString& name) const { @@ -39,7 +43,7 @@ std::shared_ptr<TValueAggregationAgent> TCommonCountersOwner::GetValueAutoAggreg std::shared_ptr<TValueAggregationClient> TCommonCountersOwner::GetValueAutoAggregationsClient(const TString& name) const { std::shared_ptr<TValueAggregationAgent> agent = NPrivate::TAggregationsController::GetAggregation(name, *this); - return agent->GetClient(agent); + return agent->GetClient(); } } diff --git a/ydb/core/tx/columnshard/counters/common/owner.h b/ydb/core/tx/columnshard/counters/common/owner.h index 85921b960e..a39b4b5005 100644 --- a/ydb/core/tx/columnshard/counters/common/owner.h +++ b/ydb/core/tx/columnshard/counters/common/owner.h @@ -17,6 +17,11 @@ protected: std::shared_ptr<TValueAggregationAgent> GetValueAutoAggregations(const TString& name) const; std::shared_ptr<TValueAggregationClient> GetValueAutoAggregationsClient(const TString& name) const; public: + const TString& GetModuleId() const { + return ModuleId; + } + + NMonitoring::TDynamicCounters::TCounterPtr GetAggregationValue(const TString& name) const; NMonitoring::TDynamicCounters::TCounterPtr GetValue(const TString& name) const; NMonitoring::TDynamicCounters::TCounterPtr GetDeriviative(const TString& name) const; NMonitoring::THistogramPtr GetHistogram(const TString& name, NMonitoring::IHistogramCollectorPtr&& hCollector) const; diff --git a/ydb/core/tx/columnshard/counters/common/private.cpp b/ydb/core/tx/columnshard/counters/common/private.cpp index 4fc7cc4062..00010b6b91 100644 --- a/ydb/core/tx/columnshard/counters/common/private.cpp +++ b/ydb/core/tx/columnshard/counters/common/private.cpp @@ -2,6 +2,7 @@ #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/events.h> #include <library/cpp/actors/core/hfunc.h> +#include <util/folder/path.h> namespace NKikimr::NColumnShard::NPrivate { namespace { @@ -44,9 +45,10 @@ private: public: std::shared_ptr<TValueAggregationAgent> GetAggregation(const TString& signalName, const TCommonCountersOwner& signalsOwner) { TGuard<TMutex> g(Mutex); - auto it = Agents.find(signalName); + const TString agentId = TFsPath(signalsOwner.GetModuleId() + "/" + signalName).Fix().GetPath(); + auto it = Agents.find(agentId); if (it == Agents.end()) { - it = Agents.emplace(signalName, std::make_shared<TValueAggregationAgent>(signalName, signalsOwner)).first; + it = Agents.emplace(agentId, std::make_shared<TValueAggregationAgent>(signalName, signalsOwner)).first; if (NActors::TlsActivationContext) { NActors::TActivationContext::Register(new TRegularSignalBuilderActor(it->second)); } diff --git a/ydb/core/tx/columnshard/counters/engine_logs.h b/ydb/core/tx/columnshard/counters/engine_logs.h index 6bed61e0f1..f689e9a460 100644 --- a/ydb/core/tx/columnshard/counters/engine_logs.h +++ b/ydb/core/tx/columnshard/counters/engine_logs.h @@ -57,7 +57,7 @@ public: } TDataClassCounters RegisterClient() const { - return TDataClassCounters(PortionsSize->GetClient(PortionsSize), PortionsCount->GetClient(PortionsCount)); + return TDataClassCounters(PortionsSize->GetClient(), PortionsCount->GetClient()); } }; diff --git a/ydb/core/tx/columnshard/counters/insert_table.h b/ydb/core/tx/columnshard/counters/insert_table.h index 92a31cb9ea..52996e1901 100644 --- a/ydb/core/tx/columnshard/counters/insert_table.h +++ b/ydb/core/tx/columnshard/counters/insert_table.h @@ -36,7 +36,7 @@ public: } TPathIdClientCounters GetClient() const { - return TPathIdClientCounters(PathIdBytes->GetClient(PathIdBytes), PathIdChunks->GetClient(PathIdChunks)); + return TPathIdClientCounters(PathIdBytes->GetClient(), PathIdChunks->GetClient()); } }; diff --git a/ydb/core/tx/columnshard/counters/scan.cpp b/ydb/core/tx/columnshard/counters/scan.cpp index 4274e871bd..43331d18b3 100644 --- a/ydb/core/tx/columnshard/counters/scan.cpp +++ b/ydb/core/tx/columnshard/counters/scan.cpp @@ -4,31 +4,37 @@ namespace NKikimr::NColumnShard { -TScanCounters::TScanCounters(const TString& module) { - ::NMonitoring::TDynamicCounterPtr subGroup = GetServiceCounters(AppData()->Counters, "tablets")->GetSubgroup("subsystem", "columnshard"); - - PortionBytes = subGroup->GetCounter(module + "/PortionBytes", true); - FilterBytes = subGroup->GetCounter(module + "/FilterBytes", true); - PostFilterBytes = subGroup->GetCounter(module + "/PostFilterBytes", true); - - AssembleFilterCount = subGroup->GetCounter(module + "/AssembleFilterCount", true); - - FilterOnlyCount = subGroup->GetCounter(module + "/FilterOnlyCount", true); - FilterOnlyFetchedBytes = subGroup->GetCounter(module + "/FilterOnlyFetchedBytes", true); - FilterOnlyUsefulBytes = subGroup->GetCounter(module + "/FilterOnlyUsefulBytes", true); - - EmptyFilterCount = subGroup->GetCounter(module + "/EmptyFilterCount", true); - EmptyFilterFetchedBytes = subGroup->GetCounter(module + "/EmptyFilterFetchedBytes", true); - - OriginalRowsCount = subGroup->GetCounter(module + "/OriginalRowsCount", true); - FilteredRowsCount = subGroup->GetCounter(module + "/FilteredRowsCount", true); - SkippedBytes = subGroup->GetCounter(module + "/SkippedBytes", true); +TScanCounters::TScanCounters(const TString& module) + : TBase(module) + , ProcessingOverload(TBase::GetDeriviative("ProcessingOverload")) + , ReadingOverload(TBase::GetDeriviative("ReadingOverload")) + , PortionBytes(TBase::GetDeriviative("PortionBytes")) + , FilterBytes(TBase::GetDeriviative("FilterBytes")) + , PostFilterBytes(TBase::GetDeriviative("PostFilterBytes")) + + , AssembleFilterCount(TBase::GetDeriviative("AssembleFilterCount")) + + , FilterOnlyCount(TBase::GetDeriviative("FilterOnlyCount")) + , FilterOnlyFetchedBytes(TBase::GetDeriviative("FilterOnlyFetchedBytes")) + , FilterOnlyUsefulBytes(TBase::GetDeriviative("FilterOnlyUsefulBytes")) + + , EmptyFilterCount(TBase::GetDeriviative("EmptyFilterCount")) + , EmptyFilterFetchedBytes(TBase::GetDeriviative("EmptyFilterFetchedBytes")) + + , OriginalRowsCount(TBase::GetDeriviative("OriginalRowsCount")) + , FilteredRowsCount(TBase::GetDeriviative("FilteredRowsCount")) + , SkippedBytes(TBase::GetDeriviative("SkippedBytes")) + + , TwoPhasesCount(TBase::GetDeriviative("TwoPhasesCount")) + , TwoPhasesFilterFetchedBytes(TBase::GetDeriviative("TwoPhasesFilterFetchedBytes")) + , TwoPhasesFilterUsefulBytes(TBase::GetDeriviative("TwoPhasesFilterUsefulBytes")) + , TwoPhasesPostFilterFetchedBytes(TBase::GetDeriviative("TwoPhasesPostFilterFetchedBytes")) + , TwoPhasesPostFilterUsefulBytes(TBase::GetDeriviative("TwoPhasesPostFilterUsefulBytes")) +{ +} - TwoPhasesCount = subGroup->GetCounter(module + "/TwoPhasesCount", true); - TwoPhasesFilterFetchedBytes = subGroup->GetCounter(module + "/TwoPhasesFilterFetchedBytes", true); - TwoPhasesFilterUsefulBytes = subGroup->GetCounter(module + "/TwoPhasesFilterUsefulBytes", true); - TwoPhasesPostFilterFetchedBytes = subGroup->GetCounter(module + "/TwoPhasesPostFilterFetchedBytes", true); - TwoPhasesPostFilterUsefulBytes = subGroup->GetCounter(module + "/TwoPhasesPostFilterUsefulBytes", true); +std::shared_ptr<NKikimr::NColumnShard::TScanAggregations> TScanCounters::BuildAggregations() { + return std::make_shared<TScanAggregations>(GetModuleId()); } } diff --git a/ydb/core/tx/columnshard/counters/scan.h b/ydb/core/tx/columnshard/counters/scan.h index 1a204a21b9..60f0365cb3 100644 --- a/ydb/core/tx/columnshard/counters/scan.h +++ b/ydb/core/tx/columnshard/counters/scan.h @@ -1,9 +1,98 @@ #pragma once +#include "common/owner.h" #include <library/cpp/monlib/dynamic_counters/counters.h> namespace NKikimr::NColumnShard { -struct TScanCounters { +class TScanDataClassAggregation: public TCommonCountersOwner { +private: + using TBase = TCommonCountersOwner; + NMonitoring::TDynamicCounters::TCounterPtr DeriviativeInFlightBytes; + NMonitoring::TDynamicCounters::TCounterPtr DeriviativeInFlightCount; + std::shared_ptr<TValueAggregationClient> InFlightBytes; + std::shared_ptr<TValueAggregationClient> InFlightCount; +public: + TScanDataClassAggregation(const TString& moduleId, const TString& signalId) + : TBase(moduleId) + { + DeriviativeInFlightCount = TBase::GetDeriviative(signalId + "/Count"); + DeriviativeInFlightBytes = TBase::GetDeriviative(signalId + "/Bytes"); + InFlightCount = TBase::GetValueAutoAggregationsClient(signalId + "/Count"); + InFlightBytes = TBase::GetValueAutoAggregationsClient(signalId + "/Bytes"); + } + + void AddFullInfo(const ui64 size) { + DeriviativeInFlightCount->Add(1); + DeriviativeInFlightBytes->Add(size); + InFlightCount->Add(1); + InFlightBytes->Add(size); + } + + void RemoveFullInfo(const ui64 size) { + InFlightCount->Remove(1); + InFlightBytes->Remove(size); + } + + void AddCount() { + DeriviativeInFlightCount->Add(1); + InFlightCount->Add(1); + } + + void AddBytes(const ui64 size) { + DeriviativeInFlightBytes->Add(size); + InFlightBytes->Add(size); + } +}; + +class TScanAggregations { +private: + using TBase = TCommonCountersOwner; + TScanDataClassAggregation ReadBlobs; + TScanDataClassAggregation GranulesProcessing; + TScanDataClassAggregation GranulesReady; +public: + TScanAggregations(const TString& moduleId) + : ReadBlobs(moduleId, "InFlight/Blobs/Read") + , GranulesProcessing(moduleId, "InFlight/Granules/Processing") + , GranulesReady(moduleId, "InFlight/Granules/Ready") + { + } + + void AddFlightReadInfo(const ui64 size) { + ReadBlobs.AddFullInfo(size); + } + + void RemoveFlightReadInfo(const ui64 size) { + ReadBlobs.RemoveFullInfo(size); + } + + void AddGranuleProcessing() { + GranulesProcessing.AddCount(); + } + + void AddGranuleProcessingBytes(const ui64 size) { + GranulesProcessing.AddBytes(size); + } + + void RemoveGranuleProcessingInfo(const ui64 size) { + GranulesProcessing.RemoveFullInfo(size); + } + + void AddGranuleReady(const ui64 size) { + GranulesReady.AddFullInfo(size); + } + + void RemoveGranuleReady(const ui64 size) { + GranulesReady.RemoveFullInfo(size); + } +}; + +class TScanCounters: public TCommonCountersOwner { +private: + using TBase = TCommonCountersOwner; + NMonitoring::TDynamicCounters::TCounterPtr ProcessingOverload; + NMonitoring::TDynamicCounters::TCounterPtr ReadingOverload; +public: NMonitoring::TDynamicCounters::TCounterPtr PortionBytes; NMonitoring::TDynamicCounters::TCounterPtr FilterBytes; NMonitoring::TDynamicCounters::TCounterPtr PostFilterBytes; @@ -28,6 +117,29 @@ struct TScanCounters { NMonitoring::TDynamicCounters::TCounterPtr TwoPhasesPostFilterUsefulBytes; TScanCounters(const TString& module = "Scan"); + + void OnProcessingOverloaded() { + ProcessingOverload->Add(1); + } + void OnReadingOverloaded() { + ReadingOverload->Add(1); + } + + std::shared_ptr<TScanAggregations> BuildAggregations(); +}; + +class TConcreteScanCounters: public TScanCounters { +private: + using TBase = TScanCounters; +public: + std::shared_ptr<TScanAggregations> Aggregations; + + TConcreteScanCounters(const TScanCounters& counters) + : TBase(counters) + , Aggregations(TBase::BuildAggregations()) + { + + } }; } diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index e4f30779a3..49d7454ce7 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -444,7 +444,7 @@ TIndexedReadData::MakeResult(std::vector<std::vector<std::shared_ptr<arrow::Reco } TIndexedReadData::TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, - const bool internalRead, const NColumnShard::TScanCounters& counters, NColumnShard::TDataTasksProcessorContainer tasksProcessor) + const bool internalRead, const NColumnShard::TConcreteScanCounters& counters, NColumnShard::TDataTasksProcessorContainer tasksProcessor) : Counters(counters) , TasksProcessor(tasksProcessor) , ReadMetadata(readMetadata) diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h index 4d2f03e39b..44bc085053 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.h +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h @@ -21,7 +21,7 @@ class TIndexedReadData { private: std::unique_ptr<NIndexedReader::TGranulesFillingContext> GranulesContext; - NColumnShard::TScanCounters Counters; + NColumnShard::TConcreteScanCounters Counters; NColumnShard::TDataTasksProcessorContainer TasksProcessor; TFetchBlobsQueue FetchBlobsQueue; NOlap::TReadMetadata::TConstPtr ReadMetadata; @@ -36,9 +36,9 @@ private: public: TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, - const bool internalRead, const NColumnShard::TScanCounters& counters, NColumnShard::TDataTasksProcessorContainer tasksProcessor); + const bool internalRead, const NColumnShard::TConcreteScanCounters& counters, NColumnShard::TDataTasksProcessorContainer tasksProcessor); - const NColumnShard::TScanCounters& GetCounters() const noexcept { + const NColumnShard::TConcreteScanCounters& GetCounters() const noexcept { return Counters; } @@ -97,9 +97,13 @@ public: TBlobRange NextBlob() { Y_VERIFY(GranulesContext); auto* f = FetchBlobsQueue.front(); - if (f && GranulesContext->TryStartProcessGranule(f->GetGranuleId(), f->GetRange())) { + if (!f) { + return TBlobRange(); + } + if (GranulesContext->TryStartProcessGranule(f->GetGranuleId(), f->GetRange())) { return FetchBlobsQueue.pop_front(); } else { + Counters.OnProcessingOverloaded(); return TBlobRange(); } } diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp index 6b1aa735c1..a93ecf5e35 100644 --- a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp @@ -8,6 +8,8 @@ namespace NKikimr::NOlap::NIndexedReader { TGranulesFillingContext::TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData& owner, const bool internalReading) : ReadMetadata(readMetadata) , InternalReading(internalReading) + , Processing(owner.GetCounters()) + , Result(owner.GetCounters()) , Owner(owner) , Counters(owner.GetCounters()) { diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h index 18c13f7e10..b599068bc8 100644 --- a/ydb/core/tx/columnshard/engines/reader/filling_context.h +++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h @@ -25,7 +25,7 @@ private: std::set<ui32> FilterStageColumns; std::set<ui32> UsedColumns; IOrderPolicy::TPtr SortingPolicy; - NColumnShard::TScanCounters Counters; + NColumnShard::TConcreteScanCounters Counters; bool PredictEmptyAfterFilter(const TPortionInfo& portionInfo) const; static constexpr ui32 GranulesCountProcessingLimit = 16; diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/result.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/result.cpp index 7bdbd3033e..552ec7c9b0 100644 --- a/ydb/core/tx/columnshard/engines/reader/order_control/result.cpp +++ b/ydb/core/tx/columnshard/engines/reader/order_control/result.cpp @@ -6,6 +6,9 @@ TGranule::TPtr TResultController::ExtractFirst() { TGranule::TPtr result; if (GranulesToOut.size()) { result = GranulesToOut.begin()->second; + Counters.Aggregations->RemoveGranuleReady(result->GetBlobsDataSize()); + BlobsSize -= result->GetBlobsDataSize(); + Y_VERIFY(BlobsSize >= 0); GranulesToOut.erase(GranulesToOut.begin()); } return result; @@ -15,6 +18,7 @@ void TResultController::AddResult(TGranule::TPtr granule) { Y_VERIFY(GranulesToOut.emplace(granule->GetGranuleId(), granule).second); Y_VERIFY(ReadyGranulesAccumulator.emplace(granule->GetGranuleId()).second); BlobsSize += granule->GetBlobsDataSize(); + Counters.Aggregations->AddGranuleReady(granule->GetBlobsDataSize()); } TGranule::TPtr TResultController::ExtractResult(const ui64 granuleId) { @@ -24,6 +28,7 @@ TGranule::TPtr TResultController::ExtractResult(const ui64 granuleId) { } TGranule::TPtr result = it->second; GranulesToOut.erase(it); + Counters.Aggregations->RemoveGranuleReady(result->GetBlobsDataSize()); BlobsSize -= result->GetBlobsDataSize(); Y_VERIFY(BlobsSize >= 0); return result; diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/result.h b/ydb/core/tx/columnshard/engines/reader/order_control/result.h index ceb8561646..f9ff56172c 100644 --- a/ydb/core/tx/columnshard/engines/reader/order_control/result.h +++ b/ydb/core/tx/columnshard/engines/reader/order_control/result.h @@ -1,5 +1,6 @@ #pragma once #include <ydb/core/tx/columnshard/engines/reader/granule.h> +#include <ydb/core/tx/columnshard/counters/scan.h> namespace NKikimr::NOlap::NIndexedReader { @@ -8,7 +9,14 @@ protected: THashMap<ui64, TGranule::TPtr> GranulesToOut; std::set<ui64> ReadyGranulesAccumulator; i64 BlobsSize = 0; + const NColumnShard::TConcreteScanCounters Counters; public: + TResultController(const NColumnShard::TConcreteScanCounters& counters) + : Counters(counters) + { + + } + void Clear() { GranulesToOut.clear(); BlobsSize = 0; diff --git a/ydb/core/tx/columnshard/engines/reader/processing_context.cpp b/ydb/core/tx/columnshard/engines/reader/processing_context.cpp index 5ca25b37a2..d843ae7301 100644 --- a/ydb/core/tx/columnshard/engines/reader/processing_context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/processing_context.cpp @@ -42,6 +42,7 @@ TGranule::TPtr TProcessingController::ExtractReadyVerified(const ui64 granuleId) BlobsSize -= result->GetBlobsDataSize(); Y_VERIFY(BlobsSize >= 0); GranulesWaiting.erase(it); + Counters.Aggregations->RemoveGranuleProcessingInfo(result->GetBlobsDataSize()); return result; } @@ -61,7 +62,21 @@ TGranule::TPtr TProcessingController::GetGranule(const ui64 granuleId) { TGranule::TPtr TProcessingController::InsertGranule(TGranule::TPtr g) { Y_VERIFY(GranulesWaiting.emplace(g->GetGranuleId(), g).second); + Counters.Aggregations->AddGranuleProcessing(); return g; } +void TProcessingController::StartBlobProcessing(const ui64 granuleId, const TBlobRange& range) { + Counters.Aggregations->AddGranuleProcessingBytes(range.Size); + GranulesInProcessing.emplace(granuleId); + BlobsSize += range.Size; +} + +void TProcessingController::Abort() { + GranulesWaiting.clear(); + GranulesInProcessing.clear(); + Counters.Aggregations->RemoveGranuleProcessingInfo(BlobsSize); + BlobsSize = 0; +} + } diff --git a/ydb/core/tx/columnshard/engines/reader/processing_context.h b/ydb/core/tx/columnshard/engines/reader/processing_context.h index d6c9c5f213..4514010abc 100644 --- a/ydb/core/tx/columnshard/engines/reader/processing_context.h +++ b/ydb/core/tx/columnshard/engines/reader/processing_context.h @@ -1,5 +1,6 @@ #pragma once #include "granule.h" +#include <ydb/core/tx/columnshard/counters/scan.h> namespace NKikimr::NOlap::NIndexedReader { @@ -9,7 +10,13 @@ private: std::set<ui64> GranulesInProcessing; i64 BlobsSize = 0; bool NotIndexedBatchesInitialized = false; + const NColumnShard::TConcreteScanCounters Counters; public: + TProcessingController(const NColumnShard::TConcreteScanCounters& counters) + : Counters(counters) + { + } + void DrainNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>* batches); NIndexedReader::TBatch* GetBatchInfo(const TBatchAddress& address); @@ -18,11 +25,7 @@ public: return GranulesInProcessing.contains(granuleId); } - void Abort() { - GranulesWaiting.clear(); - GranulesInProcessing.clear(); - BlobsSize = 0; - } + void Abort(); ui64 GetBlobsSize() const { return BlobsSize; @@ -32,10 +35,7 @@ public: return GranulesInProcessing.size(); } - void StartBlobProcessing(const ui64 granuleId, const TBlobRange& range) { - GranulesInProcessing.emplace(granuleId); - BlobsSize += range.Size; - } + void StartBlobProcessing(const ui64 granuleId, const TBlobRange& range); TGranule::TPtr ExtractReadyVerified(const ui64 granuleId); diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp index 4a23fbbdfd..994e5056d3 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp @@ -34,7 +34,7 @@ std::shared_ptr<arrow::RecordBatch> TDataStorageAccessor::GetCachedBatch(const T return BatchCache.Get(blobId); } -std::unique_ptr<NColumnShard::TScanIteratorBase> TReadMetadata::StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const { +std::unique_ptr<NColumnShard::TScanIteratorBase> TReadMetadata::StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TConcreteScanCounters& scanCounters) const { return std::make_unique<NColumnShard::TColumnShardScanIterator>(this->shared_from_this(), tasksProcessor, scanCounters); } @@ -176,7 +176,7 @@ std::vector<std::pair<TString, NScheme::TTypeInfo>> TReadStatsMetadata::GetKeyYq return NOlap::GetColumns(NColumnShard::PrimaryIndexStatsSchema, NColumnShard::PrimaryIndexStatsSchema.KeyColumns); } -std::unique_ptr<NColumnShard::TScanIteratorBase> TReadStatsMetadata::StartScan(NColumnShard::TDataTasksProcessorContainer /*tasksProcessor*/, const NColumnShard::TScanCounters& /*scanCounters*/) const { +std::unique_ptr<NColumnShard::TScanIteratorBase> TReadStatsMetadata::StartScan(NColumnShard::TDataTasksProcessorContainer /*tasksProcessor*/, const NColumnShard::TConcreteScanCounters& /*scanCounters*/) const { return std::make_unique<NColumnShard::TStatsIterator>(this->shared_from_this()); } diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/read_metadata.h index 8d6f843444..6e85b5c871 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.h +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.h @@ -116,7 +116,7 @@ public: virtual std::vector<std::pair<TString, NScheme::TTypeInfo>> GetResultYqlSchema() const = 0; virtual std::vector<std::pair<TString, NScheme::TTypeInfo>> GetKeyYqlSchema() const = 0; - virtual std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const = 0; + virtual std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TConcreteScanCounters& scanCounters) const = 0; // TODO: can this only be done for base class? friend IOutputStream& operator << (IOutputStream& out, const TReadMetadataBase& meta) { @@ -260,7 +260,7 @@ public: return SelectInfo->Stats().Blobs; } - std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const override; + std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TConcreteScanCounters& scanCounters) const override; void Dump(IOutputStream& out) const override { out << "columns: " << GetSchemaColumnsCount() @@ -301,7 +301,7 @@ public: std::vector<std::pair<TString, NScheme::TTypeInfo>> GetKeyYqlSchema() const override; - std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const override; + std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TConcreteScanCounters& scanCounters) const override; }; } diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp index 04e0761279..3b379bb2ad 100644 --- a/ydb/core/tx/columnshard/read_actor.cpp +++ b/ydb/core/tx/columnshard/read_actor.cpp @@ -29,7 +29,7 @@ public: NOlap::TReadMetadata::TConstPtr readMetadata, const TInstant& deadline, const TActorId& columnShardActorId, - ui64 requestCookie, const TScanCounters& counters) + ui64 requestCookie, const TConcreteScanCounters& counters) : TabletId(tabletId) , DstActor(dstActor) , BlobCacheActorId(NBlobCache::MakeBlobCacheServiceId()) @@ -294,7 +294,7 @@ IActor* CreateReadActor(ui64 tabletId, NOlap::TReadMetadata::TConstPtr readMetadata, const TInstant& deadline, const TActorId& columnShardActorId, - ui64 requestCookie, const TScanCounters& counters) + ui64 requestCookie, const TConcreteScanCounters& counters) { return new TReadActor(tabletId, dstActor, std::move(event), readMetadata, deadline, columnShardActorId, requestCookie, counters); |