aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-06-20 12:37:43 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-06-20 12:37:43 +0300
commit6d5b731fb288d85199d5d3fe2055a29b06c832ca (patch)
treeb5cc2175d11aa4d676bda9e38165f74817ae7234
parent89c20775700cc1ef3f2ea9c9854fa37942ee5fdd (diff)
downloadydb-6d5b731fb288d85199d5d3fe2055a29b06c832ca.tar.gz
additional signals
-rw-r--r--ydb/core/tx/columnshard/columnshard__index_scan.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard__index_scan.h4
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.cpp13
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h3
-rw-r--r--ydb/core/tx/columnshard/counters/common/agent.cpp47
-rw-r--r--ydb/core/tx/columnshard/counters/common/agent.h6
-rw-r--r--ydb/core/tx/columnshard/counters/common/client.cpp12
-rw-r--r--ydb/core/tx/columnshard/counters/common/client.h10
-rw-r--r--ydb/core/tx/columnshard/counters/common/owner.cpp16
-rw-r--r--ydb/core/tx/columnshard/counters/common/owner.h5
-rw-r--r--ydb/core/tx/columnshard/counters/common/private.cpp6
-rw-r--r--ydb/core/tx/columnshard/counters/engine_logs.h2
-rw-r--r--ydb/core/tx/columnshard/counters/insert_table.h2
-rw-r--r--ydb/core/tx/columnshard/counters/scan.cpp54
-rw-r--r--ydb/core/tx/columnshard/counters/scan.h114
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.h12
-rw-r--r--ydb/core/tx/columnshard/engines/reader/filling_context.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/filling_context.h2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_control/result.cpp5
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_control/result.h8
-rw-r--r--ydb/core/tx/columnshard/engines/reader/processing_context.cpp15
-rw-r--r--ydb/core/tx/columnshard/engines/reader/processing_context.h18
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_metadata.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_metadata.h6
-rw-r--r--ydb/core/tx/columnshard/read_actor.cpp4
26 files changed, 267 insertions, 107 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
index 230533e6fb..46fc3fc0b4 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
@@ -5,7 +5,7 @@
namespace NKikimr::NColumnShard {
TColumnShardScanIterator::TColumnShardScanIterator(NOlap::TReadMetadata::TConstPtr readMetadata,
- NColumnShard::TDataTasksProcessorContainer processor, const NColumnShard::TScanCounters& scanCounters)
+ NColumnShard::TDataTasksProcessorContainer processor, const NColumnShard::TConcreteScanCounters& scanCounters)
: ReadMetadata(readMetadata)
, IndexedData(ReadMetadata, false, scanCounters, processor)
, DataTasksProcessor(processor)
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.h b/ydb/core/tx/columnshard/columnshard__index_scan.h
index 1cd7117daf..26ebae045e 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.h
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.h
@@ -36,9 +36,9 @@ private:
ui64 ItemsRead = 0;
const i64 MaxRowsInBatch = 5000;
NColumnShard::TDataTasksProcessorContainer DataTasksProcessor;
- NColumnShard::TScanCounters ScanCounters;
+ NColumnShard::TConcreteScanCounters ScanCounters;
public:
- TColumnShardScanIterator(NOlap::TReadMetadata::TConstPtr readMetadata, NColumnShard::TDataTasksProcessorContainer processor, const NColumnShard::TScanCounters& scanCounters);
+ TColumnShardScanIterator(NOlap::TReadMetadata::TConstPtr readMetadata, NColumnShard::TDataTasksProcessorContainer processor, const NColumnShard::TConcreteScanCounters& scanCounters);
~TColumnShardScanIterator();
virtual void Apply(IDataTasksProcessor::ITask::TPtr task) override;
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 92a188a5a8..2ae0630759 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -46,7 +46,7 @@ private:
constexpr ui64 INIT_BATCH_ROWS = 1000;
-constexpr i64 DEFAULT_READ_AHEAD_BYTES = 100 * 1024 * 1024;
+constexpr i64 DEFAULT_READ_AHEAD_BYTES = 200 * 1024 * 1024;
constexpr TDuration SCAN_HARD_TIMEOUT = TDuration::Minutes(10);
constexpr TDuration SCAN_HARD_TIMEOUT_GAP = TDuration::Seconds(5);
@@ -87,8 +87,7 @@ public:
, ReadMetadataRanges(std::move(readMetadataList))
, ReadMetadataIndex(0)
, Deadline(TInstant::Now() + (timeout ? timeout + SCAN_HARD_TIMEOUT_GAP : SCAN_HARD_TIMEOUT))
- , ScanCountersPool(scanCountersPool)
- {
+ , ScanCountersPool(scanCountersPool) {
KeyYqlSchema = ReadMetadataRanges[ReadMetadataIndex]->GetKeyYqlSchema();
}
@@ -138,10 +137,15 @@ private:
if (!blobRange.BlobId.IsValid()) {
break;
}
+ ScanCountersPool.Aggregations->AddFlightReadInfo(blobRange.Size);
++InFlightReads;
InFlightReadBytes += blobRange.Size;
ranges[blobRange.BlobId].emplace_back(blobRange);
}
+ Y_UNUSED(MaxReadAheadBytes);
+ if (InFlightReadBytes >= MaxReadAheadBytes) {
+ ScanCountersPool.OnReadingOverloaded();
+ }
if (!ranges.size()) {
return true;
}
@@ -210,6 +214,7 @@ private:
auto& event = *ev->Get();
const auto& blobRange = event.BlobRange;
+ ScanCountersPool.Aggregations->RemoveFlightReadInfo(blobRange.Size);
Stats.BlobReceived(blobRange, event.FromCache, event.ConstructTime);
if (event.Status != NKikimrProto::EReplyStatus::OK) {
@@ -587,7 +592,7 @@ private:
const TSerializedTableRange TableRange;
const TSmallVec<bool> SkipNullKeys;
const TInstant Deadline;
- TScanCounters ScanCountersPool;
+ TConcreteScanCounters ScanCountersPool;
TActorId TimeoutActorId;
TMaybe<TString> AbortReason;
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index ece547bca1..b9772b1633 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -39,7 +39,7 @@ IActor* CreateReadActor(ui64 tabletId,
NOlap::TReadMetadata::TConstPtr readMetadata,
const TInstant& deadline,
const TActorId& columnShardActorId,
- ui64 requestCookie, const TScanCounters& counters);
+ ui64 requestCookie, const TConcreteScanCounters& counters);
IActor* CreateColumnShardScan(const TActorId& scanComputeActor, ui32 scanId, ui64 txId);
IActor* CreateExportActor(const ui64 tabletId, const TActorId& dstActor, TAutoPtr<TEvPrivate::TEvExport> ev);
@@ -121,6 +121,7 @@ class TColumnShard
friend class TTxNotifyTxCompletion;
friend class TTxPlanStep;
friend class TTxWrite;
+ friend class TTxCleanInserted;
friend class TTxReadBase;
friend class TTxRead;
friend class TTxScan;
diff --git a/ydb/core/tx/columnshard/counters/common/agent.cpp b/ydb/core/tx/columnshard/counters/common/agent.cpp
index ea431b5943..c451356cfe 100644
--- a/ydb/core/tx/columnshard/counters/common/agent.cpp
+++ b/ydb/core/tx/columnshard/counters/common/agent.cpp
@@ -4,28 +4,33 @@
namespace NKikimr::NColumnShard {
TValueAggregationAgent::TValueAggregationAgent(const TString& signalName, const TCommonCountersOwner& signalsOwner)
- : ValueSignalSum(signalsOwner.GetValue("SUM/" + signalName))
- , ValueSignalMin(signalsOwner.GetValue("MIN/" + signalName))
- , ValueSignalMax(signalsOwner.GetValue("MAX/" + signalName))
+ : ValueSignalSum(signalsOwner.GetAggregationValue("SUM/" + signalName))
+ , ValueSignalMin(signalsOwner.GetAggregationValue("MIN/" + signalName))
+ , ValueSignalMax(signalsOwner.GetAggregationValue("MAX/" + signalName))
{
}
-bool TValueAggregationAgent::CalcAggregations(i64& sum, i64& minValue, i64& maxValue) const {
+bool TValueAggregationAgent::CalcAggregationsAndClean(i64& sum, i64& minValue, i64& maxValue) const {
if (Values.empty()) {
return false;
}
sum = 0;
minValue = Values.front()->GetValue();
maxValue = Values.front()->GetValue();
- for (auto&& i : Values) {
- const i64 v = i->GetValue();
- sum += v;
- if (minValue > v) {
- minValue = v;
- }
- if (maxValue < v) {
- maxValue = v;
+ for (auto it = Values.begin(); it != Values.end();) {
+ if (it->use_count() == 1) {
+ it = Values.erase(it);
+ } else {
+ const i64 v = (*it)->GetValue();
+ sum += v;
+ if (minValue > v) {
+ minValue = v;
+ }
+ if (maxValue < v) {
+ maxValue = v;
+ }
+ ++it;
}
}
return true;
@@ -35,7 +40,7 @@ std::optional<NKikimr::NColumnShard::TSignalAggregations> TValueAggregationAgent
i64 sum;
i64 min;
i64 max;
- if (!CalcAggregations(sum, min, max)) {
+ if (!CalcAggregationsAndClean(sum, min, max)) {
return {};
}
return TSignalAggregations(sum, min, max);
@@ -48,20 +53,16 @@ void TValueAggregationAgent::ResendStatus() const {
ValueSignalMin->Set(aggr->Min);
ValueSignalMax->Set(aggr->Max);
ValueSignalSum->Set(aggr->Sum);
+ } else {
+ ValueSignalMin->Set(0);
+ ValueSignalMax->Set(0);
+ ValueSignalSum->Set(0);
}
}
-std::shared_ptr<NKikimr::NColumnShard::TValueAggregationClient> TValueAggregationAgent::GetClient(std::shared_ptr<TValueAggregationAgent> selfPtr) {
- TGuard<TMutex> g(Mutex);
- auto it = Values.emplace(Values.end(), nullptr);
- auto result = std::make_shared<TValueAggregationClient>(selfPtr, it);
- *it = result.get();
- return result;
-}
-
-void TValueAggregationAgent::UnregisterClient(std::list<TValueAggregationClient*>::iterator it) {
+std::shared_ptr<NKikimr::NColumnShard::TValueAggregationClient> TValueAggregationAgent::GetClient() {
TGuard<TMutex> g(Mutex);
- Values.erase(it);
+ return *Values.emplace(Values.end(), std::make_shared<TValueAggregationClient>());
}
}
diff --git a/ydb/core/tx/columnshard/counters/common/agent.h b/ydb/core/tx/columnshard/counters/common/agent.h
index 7446b4fdff..cd21058e0d 100644
--- a/ydb/core/tx/columnshard/counters/common/agent.h
+++ b/ydb/core/tx/columnshard/counters/common/agent.h
@@ -25,10 +25,10 @@ private:
::NMonitoring::TDynamicCounters::TCounterPtr ValueSignalSum;
::NMonitoring::TDynamicCounters::TCounterPtr ValueSignalMin;
::NMonitoring::TDynamicCounters::TCounterPtr ValueSignalMax;
- std::list<TValueAggregationClient*> Values;
+ mutable std::list<std::shared_ptr<TValueAggregationClient>> Values;
TMutex Mutex;
- bool CalcAggregations(i64& sum, i64& minValue, i64& maxValue) const;
+ bool CalcAggregationsAndClean(i64& sum, i64& minValue, i64& maxValue) const;
std::optional<TSignalAggregations> GetAggregations() const;
public:
@@ -36,7 +36,7 @@ public:
void ResendStatus() const;
void UnregisterClient(std::list<TValueAggregationClient*>::iterator it);
- std::shared_ptr<TValueAggregationClient> GetClient(std::shared_ptr<TValueAggregationAgent> selfPtr);
+ std::shared_ptr<TValueAggregationClient> GetClient();
};
}
diff --git a/ydb/core/tx/columnshard/counters/common/client.cpp b/ydb/core/tx/columnshard/counters/common/client.cpp
index 76718b96a7..54d3b42d30 100644
--- a/ydb/core/tx/columnshard/counters/common/client.cpp
+++ b/ydb/core/tx/columnshard/counters/common/client.cpp
@@ -1,17 +1,5 @@
#include "client.h"
-#include "agent.h"
namespace NKikimr::NColumnShard {
-TValueAggregationClient::TValueAggregationClient(std::shared_ptr<TValueAggregationAgent> owner, std::list<TValueAggregationClient*>::iterator it)
- : Owner(owner)
- , PositionIterator(it)
-{
-
-}
-
-TValueAggregationClient::~TValueAggregationClient() {
- Owner->UnregisterClient(PositionIterator);
-}
-
}
diff --git a/ydb/core/tx/columnshard/counters/common/client.h b/ydb/core/tx/columnshard/counters/common/client.h
index f935af1ddf..632aa24817 100644
--- a/ydb/core/tx/columnshard/counters/common/client.h
+++ b/ydb/core/tx/columnshard/counters/common/client.h
@@ -10,12 +10,14 @@ class TValueAggregationAgent;
class TValueAggregationClient: TNonCopyable {
private:
- std::shared_ptr<TValueAggregationAgent> Owner;
- std::list<TValueAggregationClient*>::iterator PositionIterator;
YDB_ACCESSOR(i64, Value, 0);
public:
- TValueAggregationClient(std::shared_ptr<TValueAggregationAgent> owner, std::list<TValueAggregationClient*>::iterator it);
- ~TValueAggregationClient();
+ void Add(const i64 v) {
+ Value += v;
+ }
+ void Remove(const i64 v) {
+ Value -= v;
+ }
};
}
diff --git a/ydb/core/tx/columnshard/counters/common/owner.cpp b/ydb/core/tx/columnshard/counters/common/owner.cpp
index 03b50ecd1c..7e98e08417 100644
--- a/ydb/core/tx/columnshard/counters/common/owner.cpp
+++ b/ydb/core/tx/columnshard/counters/common/owner.cpp
@@ -6,11 +6,15 @@
namespace NKikimr::NColumnShard {
NMonitoring::TDynamicCounters::TCounterPtr TCommonCountersOwner::GetDeriviative(const TString& name) const {
- return SubGroup->GetCounter(NormalizeSignalName(ModuleId + "/Deriviative/" + name), true);
+ return SubGroup->GetCounter(NormalizeSignalName("Deriviative/" + name), true);
}
NMonitoring::TDynamicCounters::TCounterPtr TCommonCountersOwner::GetValue(const TString& name) const {
- return SubGroup->GetCounter(NormalizeSignalName(ModuleId + "/Value/" + name), false);
+ return SubGroup->GetCounter(NormalizeSignalName("Value/" + name), false);
+}
+
+NMonitoring::TDynamicCounters::TCounterPtr TCommonCountersOwner::GetAggregationValue(const TString& name) const {
+ return SubGroup->GetCounter(NormalizeSignalName("Aggregation/" + name), false);
}
TString TCommonCountersOwner::NormalizeSignalName(const TString& name) const {
@@ -21,16 +25,16 @@ TCommonCountersOwner::TCommonCountersOwner(const TString& module, TIntrusivePtr<
: ModuleId(module)
{
if (baseSignals) {
- SubGroup = baseSignals->GetSubgroup("common_module", module);
+ SubGroup = baseSignals->GetSubgroup("module_id", module);
} else if (NActors::TlsActivationContext) {
- SubGroup = GetServiceCounters(AppData()->Counters, "tablets")->GetSubgroup("subsystem", "columnshard")->GetSubgroup("common_module", module);
+ SubGroup = GetServiceCounters(AppData()->Counters, "tablets")->GetSubgroup("subsystem", "columnshard")->GetSubgroup("module_id", module);
} else {
SubGroup = new NMonitoring::TDynamicCounters();
}
}
NMonitoring::THistogramPtr TCommonCountersOwner::GetHistogram(const TString& name, NMonitoring::IHistogramCollectorPtr&& hCollector) const {
- return SubGroup->GetHistogram(NormalizeSignalName(ModuleId + "/Histogram/" + name), std::move(hCollector));
+ return SubGroup->GetHistogram(NormalizeSignalName("Histogram/" + name), std::move(hCollector));
}
std::shared_ptr<TValueAggregationAgent> TCommonCountersOwner::GetValueAutoAggregations(const TString& name) const {
@@ -39,7 +43,7 @@ std::shared_ptr<TValueAggregationAgent> TCommonCountersOwner::GetValueAutoAggreg
std::shared_ptr<TValueAggregationClient> TCommonCountersOwner::GetValueAutoAggregationsClient(const TString& name) const {
std::shared_ptr<TValueAggregationAgent> agent = NPrivate::TAggregationsController::GetAggregation(name, *this);
- return agent->GetClient(agent);
+ return agent->GetClient();
}
}
diff --git a/ydb/core/tx/columnshard/counters/common/owner.h b/ydb/core/tx/columnshard/counters/common/owner.h
index 85921b960e..a39b4b5005 100644
--- a/ydb/core/tx/columnshard/counters/common/owner.h
+++ b/ydb/core/tx/columnshard/counters/common/owner.h
@@ -17,6 +17,11 @@ protected:
std::shared_ptr<TValueAggregationAgent> GetValueAutoAggregations(const TString& name) const;
std::shared_ptr<TValueAggregationClient> GetValueAutoAggregationsClient(const TString& name) const;
public:
+ const TString& GetModuleId() const {
+ return ModuleId;
+ }
+
+ NMonitoring::TDynamicCounters::TCounterPtr GetAggregationValue(const TString& name) const;
NMonitoring::TDynamicCounters::TCounterPtr GetValue(const TString& name) const;
NMonitoring::TDynamicCounters::TCounterPtr GetDeriviative(const TString& name) const;
NMonitoring::THistogramPtr GetHistogram(const TString& name, NMonitoring::IHistogramCollectorPtr&& hCollector) const;
diff --git a/ydb/core/tx/columnshard/counters/common/private.cpp b/ydb/core/tx/columnshard/counters/common/private.cpp
index 4fc7cc4062..00010b6b91 100644
--- a/ydb/core/tx/columnshard/counters/common/private.cpp
+++ b/ydb/core/tx/columnshard/counters/common/private.cpp
@@ -2,6 +2,7 @@
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/events.h>
#include <library/cpp/actors/core/hfunc.h>
+#include <util/folder/path.h>
namespace NKikimr::NColumnShard::NPrivate {
namespace {
@@ -44,9 +45,10 @@ private:
public:
std::shared_ptr<TValueAggregationAgent> GetAggregation(const TString& signalName, const TCommonCountersOwner& signalsOwner) {
TGuard<TMutex> g(Mutex);
- auto it = Agents.find(signalName);
+ const TString agentId = TFsPath(signalsOwner.GetModuleId() + "/" + signalName).Fix().GetPath();
+ auto it = Agents.find(agentId);
if (it == Agents.end()) {
- it = Agents.emplace(signalName, std::make_shared<TValueAggregationAgent>(signalName, signalsOwner)).first;
+ it = Agents.emplace(agentId, std::make_shared<TValueAggregationAgent>(signalName, signalsOwner)).first;
if (NActors::TlsActivationContext) {
NActors::TActivationContext::Register(new TRegularSignalBuilderActor(it->second));
}
diff --git a/ydb/core/tx/columnshard/counters/engine_logs.h b/ydb/core/tx/columnshard/counters/engine_logs.h
index 6bed61e0f1..f689e9a460 100644
--- a/ydb/core/tx/columnshard/counters/engine_logs.h
+++ b/ydb/core/tx/columnshard/counters/engine_logs.h
@@ -57,7 +57,7 @@ public:
}
TDataClassCounters RegisterClient() const {
- return TDataClassCounters(PortionsSize->GetClient(PortionsSize), PortionsCount->GetClient(PortionsCount));
+ return TDataClassCounters(PortionsSize->GetClient(), PortionsCount->GetClient());
}
};
diff --git a/ydb/core/tx/columnshard/counters/insert_table.h b/ydb/core/tx/columnshard/counters/insert_table.h
index 92a31cb9ea..52996e1901 100644
--- a/ydb/core/tx/columnshard/counters/insert_table.h
+++ b/ydb/core/tx/columnshard/counters/insert_table.h
@@ -36,7 +36,7 @@ public:
}
TPathIdClientCounters GetClient() const {
- return TPathIdClientCounters(PathIdBytes->GetClient(PathIdBytes), PathIdChunks->GetClient(PathIdChunks));
+ return TPathIdClientCounters(PathIdBytes->GetClient(), PathIdChunks->GetClient());
}
};
diff --git a/ydb/core/tx/columnshard/counters/scan.cpp b/ydb/core/tx/columnshard/counters/scan.cpp
index 4274e871bd..43331d18b3 100644
--- a/ydb/core/tx/columnshard/counters/scan.cpp
+++ b/ydb/core/tx/columnshard/counters/scan.cpp
@@ -4,31 +4,37 @@
namespace NKikimr::NColumnShard {
-TScanCounters::TScanCounters(const TString& module) {
- ::NMonitoring::TDynamicCounterPtr subGroup = GetServiceCounters(AppData()->Counters, "tablets")->GetSubgroup("subsystem", "columnshard");
-
- PortionBytes = subGroup->GetCounter(module + "/PortionBytes", true);
- FilterBytes = subGroup->GetCounter(module + "/FilterBytes", true);
- PostFilterBytes = subGroup->GetCounter(module + "/PostFilterBytes", true);
-
- AssembleFilterCount = subGroup->GetCounter(module + "/AssembleFilterCount", true);
-
- FilterOnlyCount = subGroup->GetCounter(module + "/FilterOnlyCount", true);
- FilterOnlyFetchedBytes = subGroup->GetCounter(module + "/FilterOnlyFetchedBytes", true);
- FilterOnlyUsefulBytes = subGroup->GetCounter(module + "/FilterOnlyUsefulBytes", true);
-
- EmptyFilterCount = subGroup->GetCounter(module + "/EmptyFilterCount", true);
- EmptyFilterFetchedBytes = subGroup->GetCounter(module + "/EmptyFilterFetchedBytes", true);
-
- OriginalRowsCount = subGroup->GetCounter(module + "/OriginalRowsCount", true);
- FilteredRowsCount = subGroup->GetCounter(module + "/FilteredRowsCount", true);
- SkippedBytes = subGroup->GetCounter(module + "/SkippedBytes", true);
+TScanCounters::TScanCounters(const TString& module)
+ : TBase(module)
+ , ProcessingOverload(TBase::GetDeriviative("ProcessingOverload"))
+ , ReadingOverload(TBase::GetDeriviative("ReadingOverload"))
+ , PortionBytes(TBase::GetDeriviative("PortionBytes"))
+ , FilterBytes(TBase::GetDeriviative("FilterBytes"))
+ , PostFilterBytes(TBase::GetDeriviative("PostFilterBytes"))
+
+ , AssembleFilterCount(TBase::GetDeriviative("AssembleFilterCount"))
+
+ , FilterOnlyCount(TBase::GetDeriviative("FilterOnlyCount"))
+ , FilterOnlyFetchedBytes(TBase::GetDeriviative("FilterOnlyFetchedBytes"))
+ , FilterOnlyUsefulBytes(TBase::GetDeriviative("FilterOnlyUsefulBytes"))
+
+ , EmptyFilterCount(TBase::GetDeriviative("EmptyFilterCount"))
+ , EmptyFilterFetchedBytes(TBase::GetDeriviative("EmptyFilterFetchedBytes"))
+
+ , OriginalRowsCount(TBase::GetDeriviative("OriginalRowsCount"))
+ , FilteredRowsCount(TBase::GetDeriviative("FilteredRowsCount"))
+ , SkippedBytes(TBase::GetDeriviative("SkippedBytes"))
+
+ , TwoPhasesCount(TBase::GetDeriviative("TwoPhasesCount"))
+ , TwoPhasesFilterFetchedBytes(TBase::GetDeriviative("TwoPhasesFilterFetchedBytes"))
+ , TwoPhasesFilterUsefulBytes(TBase::GetDeriviative("TwoPhasesFilterUsefulBytes"))
+ , TwoPhasesPostFilterFetchedBytes(TBase::GetDeriviative("TwoPhasesPostFilterFetchedBytes"))
+ , TwoPhasesPostFilterUsefulBytes(TBase::GetDeriviative("TwoPhasesPostFilterUsefulBytes"))
+{
+}
- TwoPhasesCount = subGroup->GetCounter(module + "/TwoPhasesCount", true);
- TwoPhasesFilterFetchedBytes = subGroup->GetCounter(module + "/TwoPhasesFilterFetchedBytes", true);
- TwoPhasesFilterUsefulBytes = subGroup->GetCounter(module + "/TwoPhasesFilterUsefulBytes", true);
- TwoPhasesPostFilterFetchedBytes = subGroup->GetCounter(module + "/TwoPhasesPostFilterFetchedBytes", true);
- TwoPhasesPostFilterUsefulBytes = subGroup->GetCounter(module + "/TwoPhasesPostFilterUsefulBytes", true);
+std::shared_ptr<NKikimr::NColumnShard::TScanAggregations> TScanCounters::BuildAggregations() {
+ return std::make_shared<TScanAggregations>(GetModuleId());
}
}
diff --git a/ydb/core/tx/columnshard/counters/scan.h b/ydb/core/tx/columnshard/counters/scan.h
index 1a204a21b9..60f0365cb3 100644
--- a/ydb/core/tx/columnshard/counters/scan.h
+++ b/ydb/core/tx/columnshard/counters/scan.h
@@ -1,9 +1,98 @@
#pragma once
+#include "common/owner.h"
#include <library/cpp/monlib/dynamic_counters/counters.h>
namespace NKikimr::NColumnShard {
-struct TScanCounters {
+class TScanDataClassAggregation: public TCommonCountersOwner {
+private:
+ using TBase = TCommonCountersOwner;
+ NMonitoring::TDynamicCounters::TCounterPtr DeriviativeInFlightBytes;
+ NMonitoring::TDynamicCounters::TCounterPtr DeriviativeInFlightCount;
+ std::shared_ptr<TValueAggregationClient> InFlightBytes;
+ std::shared_ptr<TValueAggregationClient> InFlightCount;
+public:
+ TScanDataClassAggregation(const TString& moduleId, const TString& signalId)
+ : TBase(moduleId)
+ {
+ DeriviativeInFlightCount = TBase::GetDeriviative(signalId + "/Count");
+ DeriviativeInFlightBytes = TBase::GetDeriviative(signalId + "/Bytes");
+ InFlightCount = TBase::GetValueAutoAggregationsClient(signalId + "/Count");
+ InFlightBytes = TBase::GetValueAutoAggregationsClient(signalId + "/Bytes");
+ }
+
+ void AddFullInfo(const ui64 size) {
+ DeriviativeInFlightCount->Add(1);
+ DeriviativeInFlightBytes->Add(size);
+ InFlightCount->Add(1);
+ InFlightBytes->Add(size);
+ }
+
+ void RemoveFullInfo(const ui64 size) {
+ InFlightCount->Remove(1);
+ InFlightBytes->Remove(size);
+ }
+
+ void AddCount() {
+ DeriviativeInFlightCount->Add(1);
+ InFlightCount->Add(1);
+ }
+
+ void AddBytes(const ui64 size) {
+ DeriviativeInFlightBytes->Add(size);
+ InFlightBytes->Add(size);
+ }
+};
+
+class TScanAggregations {
+private:
+ using TBase = TCommonCountersOwner;
+ TScanDataClassAggregation ReadBlobs;
+ TScanDataClassAggregation GranulesProcessing;
+ TScanDataClassAggregation GranulesReady;
+public:
+ TScanAggregations(const TString& moduleId)
+ : ReadBlobs(moduleId, "InFlight/Blobs/Read")
+ , GranulesProcessing(moduleId, "InFlight/Granules/Processing")
+ , GranulesReady(moduleId, "InFlight/Granules/Ready")
+ {
+ }
+
+ void AddFlightReadInfo(const ui64 size) {
+ ReadBlobs.AddFullInfo(size);
+ }
+
+ void RemoveFlightReadInfo(const ui64 size) {
+ ReadBlobs.RemoveFullInfo(size);
+ }
+
+ void AddGranuleProcessing() {
+ GranulesProcessing.AddCount();
+ }
+
+ void AddGranuleProcessingBytes(const ui64 size) {
+ GranulesProcessing.AddBytes(size);
+ }
+
+ void RemoveGranuleProcessingInfo(const ui64 size) {
+ GranulesProcessing.RemoveFullInfo(size);
+ }
+
+ void AddGranuleReady(const ui64 size) {
+ GranulesReady.AddFullInfo(size);
+ }
+
+ void RemoveGranuleReady(const ui64 size) {
+ GranulesReady.RemoveFullInfo(size);
+ }
+};
+
+class TScanCounters: public TCommonCountersOwner {
+private:
+ using TBase = TCommonCountersOwner;
+ NMonitoring::TDynamicCounters::TCounterPtr ProcessingOverload;
+ NMonitoring::TDynamicCounters::TCounterPtr ReadingOverload;
+public:
NMonitoring::TDynamicCounters::TCounterPtr PortionBytes;
NMonitoring::TDynamicCounters::TCounterPtr FilterBytes;
NMonitoring::TDynamicCounters::TCounterPtr PostFilterBytes;
@@ -28,6 +117,29 @@ struct TScanCounters {
NMonitoring::TDynamicCounters::TCounterPtr TwoPhasesPostFilterUsefulBytes;
TScanCounters(const TString& module = "Scan");
+
+ void OnProcessingOverloaded() {
+ ProcessingOverload->Add(1);
+ }
+ void OnReadingOverloaded() {
+ ReadingOverload->Add(1);
+ }
+
+ std::shared_ptr<TScanAggregations> BuildAggregations();
+};
+
+class TConcreteScanCounters: public TScanCounters {
+private:
+ using TBase = TScanCounters;
+public:
+ std::shared_ptr<TScanAggregations> Aggregations;
+
+ TConcreteScanCounters(const TScanCounters& counters)
+ : TBase(counters)
+ , Aggregations(TBase::BuildAggregations())
+ {
+
+ }
};
}
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index e4f30779a3..49d7454ce7 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -444,7 +444,7 @@ TIndexedReadData::MakeResult(std::vector<std::vector<std::shared_ptr<arrow::Reco
}
TIndexedReadData::TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata,
- const bool internalRead, const NColumnShard::TScanCounters& counters, NColumnShard::TDataTasksProcessorContainer tasksProcessor)
+ const bool internalRead, const NColumnShard::TConcreteScanCounters& counters, NColumnShard::TDataTasksProcessorContainer tasksProcessor)
: Counters(counters)
, TasksProcessor(tasksProcessor)
, ReadMetadata(readMetadata)
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h
index 4d2f03e39b..44bc085053 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.h
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h
@@ -21,7 +21,7 @@ class TIndexedReadData {
private:
std::unique_ptr<NIndexedReader::TGranulesFillingContext> GranulesContext;
- NColumnShard::TScanCounters Counters;
+ NColumnShard::TConcreteScanCounters Counters;
NColumnShard::TDataTasksProcessorContainer TasksProcessor;
TFetchBlobsQueue FetchBlobsQueue;
NOlap::TReadMetadata::TConstPtr ReadMetadata;
@@ -36,9 +36,9 @@ private:
public:
TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata,
- const bool internalRead, const NColumnShard::TScanCounters& counters, NColumnShard::TDataTasksProcessorContainer tasksProcessor);
+ const bool internalRead, const NColumnShard::TConcreteScanCounters& counters, NColumnShard::TDataTasksProcessorContainer tasksProcessor);
- const NColumnShard::TScanCounters& GetCounters() const noexcept {
+ const NColumnShard::TConcreteScanCounters& GetCounters() const noexcept {
return Counters;
}
@@ -97,9 +97,13 @@ public:
TBlobRange NextBlob() {
Y_VERIFY(GranulesContext);
auto* f = FetchBlobsQueue.front();
- if (f && GranulesContext->TryStartProcessGranule(f->GetGranuleId(), f->GetRange())) {
+ if (!f) {
+ return TBlobRange();
+ }
+ if (GranulesContext->TryStartProcessGranule(f->GetGranuleId(), f->GetRange())) {
return FetchBlobsQueue.pop_front();
} else {
+ Counters.OnProcessingOverloaded();
return TBlobRange();
}
}
diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
index 6b1aa735c1..a93ecf5e35 100644
--- a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp
@@ -8,6 +8,8 @@ namespace NKikimr::NOlap::NIndexedReader {
TGranulesFillingContext::TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData& owner, const bool internalReading)
: ReadMetadata(readMetadata)
, InternalReading(internalReading)
+ , Processing(owner.GetCounters())
+ , Result(owner.GetCounters())
, Owner(owner)
, Counters(owner.GetCounters())
{
diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h
index 18c13f7e10..b599068bc8 100644
--- a/ydb/core/tx/columnshard/engines/reader/filling_context.h
+++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h
@@ -25,7 +25,7 @@ private:
std::set<ui32> FilterStageColumns;
std::set<ui32> UsedColumns;
IOrderPolicy::TPtr SortingPolicy;
- NColumnShard::TScanCounters Counters;
+ NColumnShard::TConcreteScanCounters Counters;
bool PredictEmptyAfterFilter(const TPortionInfo& portionInfo) const;
static constexpr ui32 GranulesCountProcessingLimit = 16;
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/result.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/result.cpp
index 7bdbd3033e..552ec7c9b0 100644
--- a/ydb/core/tx/columnshard/engines/reader/order_control/result.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/order_control/result.cpp
@@ -6,6 +6,9 @@ TGranule::TPtr TResultController::ExtractFirst() {
TGranule::TPtr result;
if (GranulesToOut.size()) {
result = GranulesToOut.begin()->second;
+ Counters.Aggregations->RemoveGranuleReady(result->GetBlobsDataSize());
+ BlobsSize -= result->GetBlobsDataSize();
+ Y_VERIFY(BlobsSize >= 0);
GranulesToOut.erase(GranulesToOut.begin());
}
return result;
@@ -15,6 +18,7 @@ void TResultController::AddResult(TGranule::TPtr granule) {
Y_VERIFY(GranulesToOut.emplace(granule->GetGranuleId(), granule).second);
Y_VERIFY(ReadyGranulesAccumulator.emplace(granule->GetGranuleId()).second);
BlobsSize += granule->GetBlobsDataSize();
+ Counters.Aggregations->AddGranuleReady(granule->GetBlobsDataSize());
}
TGranule::TPtr TResultController::ExtractResult(const ui64 granuleId) {
@@ -24,6 +28,7 @@ TGranule::TPtr TResultController::ExtractResult(const ui64 granuleId) {
}
TGranule::TPtr result = it->second;
GranulesToOut.erase(it);
+ Counters.Aggregations->RemoveGranuleReady(result->GetBlobsDataSize());
BlobsSize -= result->GetBlobsDataSize();
Y_VERIFY(BlobsSize >= 0);
return result;
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/result.h b/ydb/core/tx/columnshard/engines/reader/order_control/result.h
index ceb8561646..f9ff56172c 100644
--- a/ydb/core/tx/columnshard/engines/reader/order_control/result.h
+++ b/ydb/core/tx/columnshard/engines/reader/order_control/result.h
@@ -1,5 +1,6 @@
#pragma once
#include <ydb/core/tx/columnshard/engines/reader/granule.h>
+#include <ydb/core/tx/columnshard/counters/scan.h>
namespace NKikimr::NOlap::NIndexedReader {
@@ -8,7 +9,14 @@ protected:
THashMap<ui64, TGranule::TPtr> GranulesToOut;
std::set<ui64> ReadyGranulesAccumulator;
i64 BlobsSize = 0;
+ const NColumnShard::TConcreteScanCounters Counters;
public:
+ TResultController(const NColumnShard::TConcreteScanCounters& counters)
+ : Counters(counters)
+ {
+
+ }
+
void Clear() {
GranulesToOut.clear();
BlobsSize = 0;
diff --git a/ydb/core/tx/columnshard/engines/reader/processing_context.cpp b/ydb/core/tx/columnshard/engines/reader/processing_context.cpp
index 5ca25b37a2..d843ae7301 100644
--- a/ydb/core/tx/columnshard/engines/reader/processing_context.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/processing_context.cpp
@@ -42,6 +42,7 @@ TGranule::TPtr TProcessingController::ExtractReadyVerified(const ui64 granuleId)
BlobsSize -= result->GetBlobsDataSize();
Y_VERIFY(BlobsSize >= 0);
GranulesWaiting.erase(it);
+ Counters.Aggregations->RemoveGranuleProcessingInfo(result->GetBlobsDataSize());
return result;
}
@@ -61,7 +62,21 @@ TGranule::TPtr TProcessingController::GetGranule(const ui64 granuleId) {
TGranule::TPtr TProcessingController::InsertGranule(TGranule::TPtr g) {
Y_VERIFY(GranulesWaiting.emplace(g->GetGranuleId(), g).second);
+ Counters.Aggregations->AddGranuleProcessing();
return g;
}
+void TProcessingController::StartBlobProcessing(const ui64 granuleId, const TBlobRange& range) {
+ Counters.Aggregations->AddGranuleProcessingBytes(range.Size);
+ GranulesInProcessing.emplace(granuleId);
+ BlobsSize += range.Size;
+}
+
+void TProcessingController::Abort() {
+ GranulesWaiting.clear();
+ GranulesInProcessing.clear();
+ Counters.Aggregations->RemoveGranuleProcessingInfo(BlobsSize);
+ BlobsSize = 0;
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/reader/processing_context.h b/ydb/core/tx/columnshard/engines/reader/processing_context.h
index d6c9c5f213..4514010abc 100644
--- a/ydb/core/tx/columnshard/engines/reader/processing_context.h
+++ b/ydb/core/tx/columnshard/engines/reader/processing_context.h
@@ -1,5 +1,6 @@
#pragma once
#include "granule.h"
+#include <ydb/core/tx/columnshard/counters/scan.h>
namespace NKikimr::NOlap::NIndexedReader {
@@ -9,7 +10,13 @@ private:
std::set<ui64> GranulesInProcessing;
i64 BlobsSize = 0;
bool NotIndexedBatchesInitialized = false;
+ const NColumnShard::TConcreteScanCounters Counters;
public:
+ TProcessingController(const NColumnShard::TConcreteScanCounters& counters)
+ : Counters(counters)
+ {
+ }
+
void DrainNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>* batches);
NIndexedReader::TBatch* GetBatchInfo(const TBatchAddress& address);
@@ -18,11 +25,7 @@ public:
return GranulesInProcessing.contains(granuleId);
}
- void Abort() {
- GranulesWaiting.clear();
- GranulesInProcessing.clear();
- BlobsSize = 0;
- }
+ void Abort();
ui64 GetBlobsSize() const {
return BlobsSize;
@@ -32,10 +35,7 @@ public:
return GranulesInProcessing.size();
}
- void StartBlobProcessing(const ui64 granuleId, const TBlobRange& range) {
- GranulesInProcessing.emplace(granuleId);
- BlobsSize += range.Size;
- }
+ void StartBlobProcessing(const ui64 granuleId, const TBlobRange& range);
TGranule::TPtr ExtractReadyVerified(const ui64 granuleId);
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
index 4a23fbbdfd..994e5056d3 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
@@ -34,7 +34,7 @@ std::shared_ptr<arrow::RecordBatch> TDataStorageAccessor::GetCachedBatch(const T
return BatchCache.Get(blobId);
}
-std::unique_ptr<NColumnShard::TScanIteratorBase> TReadMetadata::StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const {
+std::unique_ptr<NColumnShard::TScanIteratorBase> TReadMetadata::StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TConcreteScanCounters& scanCounters) const {
return std::make_unique<NColumnShard::TColumnShardScanIterator>(this->shared_from_this(), tasksProcessor, scanCounters);
}
@@ -176,7 +176,7 @@ std::vector<std::pair<TString, NScheme::TTypeInfo>> TReadStatsMetadata::GetKeyYq
return NOlap::GetColumns(NColumnShard::PrimaryIndexStatsSchema, NColumnShard::PrimaryIndexStatsSchema.KeyColumns);
}
-std::unique_ptr<NColumnShard::TScanIteratorBase> TReadStatsMetadata::StartScan(NColumnShard::TDataTasksProcessorContainer /*tasksProcessor*/, const NColumnShard::TScanCounters& /*scanCounters*/) const {
+std::unique_ptr<NColumnShard::TScanIteratorBase> TReadStatsMetadata::StartScan(NColumnShard::TDataTasksProcessorContainer /*tasksProcessor*/, const NColumnShard::TConcreteScanCounters& /*scanCounters*/) const {
return std::make_unique<NColumnShard::TStatsIterator>(this->shared_from_this());
}
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
index 8d6f843444..6e85b5c871 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_metadata.h
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
@@ -116,7 +116,7 @@ public:
virtual std::vector<std::pair<TString, NScheme::TTypeInfo>> GetResultYqlSchema() const = 0;
virtual std::vector<std::pair<TString, NScheme::TTypeInfo>> GetKeyYqlSchema() const = 0;
- virtual std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const = 0;
+ virtual std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TConcreteScanCounters& scanCounters) const = 0;
// TODO: can this only be done for base class?
friend IOutputStream& operator << (IOutputStream& out, const TReadMetadataBase& meta) {
@@ -260,7 +260,7 @@ public:
return SelectInfo->Stats().Blobs;
}
- std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const override;
+ std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TConcreteScanCounters& scanCounters) const override;
void Dump(IOutputStream& out) const override {
out << "columns: " << GetSchemaColumnsCount()
@@ -301,7 +301,7 @@ public:
std::vector<std::pair<TString, NScheme::TTypeInfo>> GetKeyYqlSchema() const override;
- std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const override;
+ std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TConcreteScanCounters& scanCounters) const override;
};
}
diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp
index 04e0761279..3b379bb2ad 100644
--- a/ydb/core/tx/columnshard/read_actor.cpp
+++ b/ydb/core/tx/columnshard/read_actor.cpp
@@ -29,7 +29,7 @@ public:
NOlap::TReadMetadata::TConstPtr readMetadata,
const TInstant& deadline,
const TActorId& columnShardActorId,
- ui64 requestCookie, const TScanCounters& counters)
+ ui64 requestCookie, const TConcreteScanCounters& counters)
: TabletId(tabletId)
, DstActor(dstActor)
, BlobCacheActorId(NBlobCache::MakeBlobCacheServiceId())
@@ -294,7 +294,7 @@ IActor* CreateReadActor(ui64 tabletId,
NOlap::TReadMetadata::TConstPtr readMetadata,
const TInstant& deadline,
const TActorId& columnShardActorId,
- ui64 requestCookie, const TScanCounters& counters)
+ ui64 requestCookie, const TConcreteScanCounters& counters)
{
return new TReadActor(tabletId, dstActor, std::move(event), readMetadata,
deadline, columnShardActorId, requestCookie, counters);