diff options
author | ivanmorozov <[email protected]> | 2023-10-07 12:47:03 +0300 |
---|---|---|
committer | ivanmorozov <[email protected]> | 2023-10-07 13:08:47 +0300 |
commit | 90a597111f17a27d88c309936517627ea8083acc (patch) | |
tree | 540d1c061d8e624360b34209cf08f66a13ce1130 | |
parent | 1e41b4156c3b2021f9ea315f1b4a5e1660d35f10 (diff) |
KIKIMR-19505: correct signals, logging and control indexation tasks
15 files changed, 132 insertions, 64 deletions
diff --git a/ydb/core/tx/columnshard/background_controller.cpp b/ydb/core/tx/columnshard/background_controller.cpp index 1cdc6ba6e77..62a2b52bc48 100644 --- a/ydb/core/tx/columnshard/background_controller.cpp +++ b/ydb/core/tx/columnshard/background_controller.cpp @@ -4,11 +4,8 @@ namespace NKikimr::NColumnShard { void TBackgroundController::StartTtl(const NOlap::TColumnEngineChanges& changes) { - const NOlap::TTTLColumnEngineChanges* ttlChanges = dynamic_cast<const NOlap::TTTLColumnEngineChanges*>(&changes); - Y_VERIFY(ttlChanges); Y_VERIFY(TtlPortions.empty()); - - TtlPortions = ttlChanges->GetTouchedPortions(); + TtlPortions = changes.GetTouchedPortions(); } bool TBackgroundController::StartCompaction(const NOlap::TPlanCompactionInfo& info, const NOlap::TColumnEngineChanges& changes) { @@ -46,10 +43,34 @@ void TBackgroundController::CheckDeadlines() { } } -void TBackgroundController::StartIndexing(const NOlap::TColumnEngineChanges& /*changes*/) { - ++ActiveIndexing; +void TBackgroundController::CheckDeadlinesIndexation() { + for (auto&& i : ActiveIndexationTasks) { + if (TMonotonic::Now() - i.second > NOlap::TCompactionLimits::CompactionTimeout) { + AFL_EMERG(NKikimrServices::TX_COLUMNSHARD)("event", "deadline_compaction")("task_id", i.first); + Y_VERIFY_DEBUG(false); + } + } +} + +void TBackgroundController::StartIndexing(const NOlap::TColumnEngineChanges& changes) { + Y_VERIFY(ActiveIndexationTasks.emplace(changes.GetTaskIdentifier(), TMonotonic::Now()).second); +} + +void TBackgroundController::FinishIndexing(const NOlap::TColumnEngineChanges& changes) { + Y_VERIFY(ActiveIndexationTasks.erase(changes.GetTaskIdentifier())); } +TString TBackgroundController::DebugStringIndexation() const { + TStringBuilder sb; + sb << "{"; + sb << "task_ids="; + for (auto&& i : ActiveIndexationTasks) { + sb << i.first << ","; + } + sb << ";"; + sb << "}"; + return sb; +} TString TBackgroundActivity::DebugString() const { return TStringBuilder() diff --git a/ydb/core/tx/columnshard/background_controller.h b/ydb/core/tx/columnshard/background_controller.h index 6fba6a4cd5a..94dec1e5db9 100644 --- a/ydb/core/tx/columnshard/background_controller.h +++ b/ydb/core/tx/columnshard/background_controller.h @@ -46,7 +46,7 @@ private: class TBackgroundController { private: - i64 ActiveIndexing = 0; + THashMap<TString, TMonotonic> ActiveIndexationTasks; using TCurrentCompaction = THashMap<ui64, NOlap::TPlanCompactionInfo>; TCurrentCompaction ActiveCompactionInfo; @@ -60,6 +60,7 @@ public: THashSet<NOlap::TPortionAddress> GetConflictCompactionPortions() const; void CheckDeadlines(); + void CheckDeadlinesIndexation(); bool StartCompaction(const NOlap::TPlanCompactionInfo& info, const NOlap::TColumnEngineChanges& changes); void FinishCompaction(const NOlap::TPlanCompactionInfo& info) { @@ -74,15 +75,10 @@ public: } void StartIndexing(const NOlap::TColumnEngineChanges& changes); - void FinishIndexing() { - --ActiveIndexing; - Y_VERIFY(ActiveIndexing >= 0); - } - bool IsIndexingActive() const { - return ActiveIndexing; - } + void FinishIndexing(const NOlap::TColumnEngineChanges& changes); + TString DebugStringIndexation() const; i64 GetIndexingActiveCount() const { - return ActiveIndexing; + return ActiveIndexationTasks.size(); } void StartCleanup() { diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index d8df79027e5..ec75118f3ad 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -742,17 +742,13 @@ public: }; void TColumnShard::StartIndexTask(std::vector<const NOlap::TInsertedData*>&& dataToIndex, const i64 bytesToIndex) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "indexation")("bytes", bytesToIndex)("blobs_count", dataToIndex.size())("max_limit", (i64)Limits.MaxInsertBytes) - ("has_more", bytesToIndex >= Limits.MaxInsertBytes); if (bytesToIndex < Limits.MinInsertBytes && dataToIndex.size() < TLimits::MIN_SMALL_BLOBS_TO_INSERT) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_indexation")("bytes", bytesToIndex)("blobs_count", dataToIndex.size()); - if (!bytesToIndex || SkippedIndexations < TSettings::MAX_INDEXATIONS_TO_SKIP) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_indexation")("bytes", bytesToIndex)("blobs_count", dataToIndex.size()); ++SkippedIndexations; return; } } - CSCounters.IndexationInput(bytesToIndex); SkippedIndexations = 0; @@ -772,15 +768,21 @@ void TColumnShard::StartIndexTask(std::vector<const NOlap::TInsertedData*>&& dat indexChanges->Start(*this); auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, Settings.CacheDataAfterIndexing); - const TString taskName = indexChanges->GetTaskIdentifier(); + const TString externalTaskId = indexChanges->GetTaskIdentifier(); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "indexation")("bytes", bytesToIndex)("blobs_count", dataToIndex.size())("max_limit", (i64)Limits.MaxInsertBytes) + ("has_more", bytesToIndex >= Limits.MaxInsertBytes)("external_task_id", externalTaskId); + NOlap::NResourceBroker::NSubscribe::ITask::Start( ResourceSubscribeActor, std::make_shared<NOlap::NBlobOperations::NRead::ITask::TReadSubscriber>(BlobsReadActor, - std::make_unique<TChangesReadTask>(std::move(ev), SelfId(), TabletID(), IndexationCounters), 0, memoryNeed, taskName, InsertTaskSubscription)); + std::make_unique<TChangesReadTask>(std::move(ev), SelfId(), TabletID(), IndexationCounters), 0, memoryNeed, externalTaskId, InsertTaskSubscription)); } void TColumnShard::SetupIndexation() { - if (BackgroundController.IsIndexingActive()) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_indexation")("reason", "in_progress")("count", BackgroundController.GetIndexingActiveCount())("insert_overload_size", InsertTable->GetCountersCommitted().Bytes); + BackgroundController.CheckDeadlinesIndexation(); + if (BackgroundController.GetIndexingActiveCount()) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_indexation")("reason", "in_progress") + ("count", BackgroundController.GetIndexingActiveCount())("insert_overload_size", InsertTable->GetCountersCommitted().Bytes) + ("indexing_debug", BackgroundController.DebugStringIndexation()); return; } else { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "start_indexation_tasks")("insert_overload_size", InsertTable->GetCountersCommitted().Bytes); @@ -803,7 +805,9 @@ void TColumnShard::SetupIndexation() { } } } - StartIndexTask(std::move(dataToIndex), bytesToIndex); + if (dataToIndex.size()) { + StartIndexTask(std::move(dataToIndex), bytesToIndex); + } } void TColumnShard::SetupCompaction() { @@ -831,11 +835,7 @@ void TColumnShard::SetupCompaction() { bool TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls, const bool force) { CSCounters.OnSetupTtl(); if (BackgroundController.IsTtlActive()) { - LOG_S_DEBUG("TTL already in progress at tablet " << TabletID()); - return false; - } - if (ActiveEvictions) { - LOG_S_DEBUG("Do not start TTL while eviction is in progress at tablet " << TabletID()); + ACFL_DEBUG("background", "ttl")("skip_reason", "in_progress"); return false; } if (force) { @@ -843,20 +843,19 @@ bool TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls, con } THashMap<ui64, NOlap::TTiering> eviction = pathTtls; for (auto&& i : eviction) { - LOG_S_DEBUG("Prepare TTL evicting path " << i.first << " with " << i.second.GetDebugString() - << " at tablet " << TabletID()); + ACFL_DEBUG("background", "ttl")("path", i.first)("info", i.second.GetDebugString()); } auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex(); std::shared_ptr<NOlap::TTTLColumnEngineChanges> indexChanges = TablesManager.MutablePrimaryIndex().StartTtl(eviction, BackgroundController.GetConflictTTLPortions()); if (!indexChanges) { - LOG_S_INFO("Cannot prepare TTL at tablet " << TabletID()); + ACFL_DEBUG("background", "ttl")("skip_reason", "no_changes"); return false; } const bool needWrites = indexChanges->NeedConstruction(); - LOG_S_INFO("TTL" << (needWrites ? " with writes" : "" ) << " prepared at tablet " << TabletID()); + ACFL_DEBUG("background", "ttl")("need_writes", needWrites); indexChanges->Start(*this); auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, false); @@ -873,7 +872,7 @@ bool TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls, con void TColumnShard::SetupCleanup() { CSCounters.OnSetupCleanup(); if (BackgroundController.IsCleanupActive()) { - LOG_S_DEBUG("Cleanup already in progress at tablet " << TabletID()); + ACFL_DEBUG("background", "ttl")("skip_reason", "in_progress"); return; } @@ -882,10 +881,11 @@ void TColumnShard::SetupCleanup() { auto changes = TablesManager.MutablePrimaryIndex().StartCleanup(cleanupSnapshot, TablesManager.MutablePathsToDrop(), TLimits::MAX_TX_RECORDS); if (!changes) { - LOG_S_INFO("Cannot prepare cleanup at tablet " << TabletID()); + ACFL_DEBUG("background", "ttl")("skip_reason", "no_changes"); return; } + ACFL_DEBUG("background", "ttl")("changes_info", changes->DebugString()); auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex(); auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), changes, false); ev->SetPutStatus(NKikimrProto::OK); // No new blobs to write diff --git a/ydb/core/tx/columnshard/counters/common/agent.cpp b/ydb/core/tx/columnshard/counters/common/agent.cpp index c451356cfea..cd4f9549cba 100644 --- a/ydb/core/tx/columnshard/counters/common/agent.cpp +++ b/ydb/core/tx/columnshard/counters/common/agent.cpp @@ -16,23 +16,33 @@ bool TValueAggregationAgent::CalcAggregationsAndClean(i64& sum, i64& minValue, i return false; } sum = 0; - minValue = Values.front()->GetValue(); - maxValue = Values.front()->GetValue(); + const TInstant now = TInstant::Now(); + auto minValueLocal = Values.front()->GetValue(now); + auto maxValueLocal = Values.front()->GetValue(now); for (auto it = Values.begin(); it != Values.end();) { if (it->use_count() == 1) { it = Values.erase(it); } else { - const i64 v = (*it)->GetValue(); - sum += v; - if (minValue > v) { - minValue = v; + const std::optional<i64> v = (*it)->GetValue(now); + if (!v) { + ++it; + continue; } - if (maxValue < v) { - maxValue = v; + sum += *v; + if (!minValueLocal || *minValueLocal > *v) { + minValueLocal = *v; + } + if (!maxValueLocal || *maxValueLocal < *v) { + maxValueLocal = *v; } ++it; } } + if (!maxValueLocal) { + return false; + } + minValue = maxValueLocal.value_or(0); + maxValue = maxValueLocal.value_or(0); return true; } diff --git a/ydb/core/tx/columnshard/counters/common/client.h b/ydb/core/tx/columnshard/counters/common/client.h index 2437c3decbd..4bd2e279e1d 100644 --- a/ydb/core/tx/columnshard/counters/common/client.h +++ b/ydb/core/tx/columnshard/counters/common/client.h @@ -1,10 +1,12 @@ #pragma once #include <ydb/library/accessor/accessor.h> +#include <util/datetime/base.h> #include <util/system/types.h> #include <util/generic/noncopyable.h> #include <list> #include <memory> #include <atomic> +#include <optional> namespace NKikimr::NColumnShard { class TValueAggregationAgent; @@ -12,17 +14,29 @@ class TValueAggregationAgent; class TValueAggregationClient: TNonCopyable { private: std::atomic<i64> Value; + std::optional<TInstant> DeadlineActuality; public: TValueAggregationClient() { Value = 0; } - i64 GetValue() const { + std::optional<i64> GetValue(const TInstant reqInstant) const { + if (!DeadlineActuality || *DeadlineActuality > reqInstant) { + return Value; + } else { + return {}; + } + } + + i64 GetValueSimple() const { return Value; } - void SetValue(const i64 value) { + void SetValue(const i64 value, const std::optional<TInstant> d = {}) { Value = value; + if (d) { + DeadlineActuality = d; + } } void Add(const i64 v) { diff --git a/ydb/core/tx/columnshard/counters/common_data.h b/ydb/core/tx/columnshard/counters/common_data.h index fa00742cd9b..13c793d36d5 100644 --- a/ydb/core/tx/columnshard/counters/common_data.h +++ b/ydb/core/tx/columnshard/counters/common_data.h @@ -22,7 +22,7 @@ public: TDataOwnerSignals(const TString& module, const TString dataName); i64 GetDataSize() const { - return DataSize->GetValue(); + return DataSize->GetValueSimple(); } void Add(const ui64 size, const bool load) const { diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index b280e7a1b5c..19a42befcbd 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -61,7 +61,7 @@ void TInsertColumnEngineChanges::DoWriteIndexComplete(NColumnShard::TColumnShard } void TInsertColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& /*context*/) { - self.BackgroundController.FinishIndexing(); + self.BackgroundController.FinishIndexing(*this); } TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept { diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 2f70988033d..bf5d41474d0 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -403,7 +403,7 @@ TDuration TColumnEngineForLogs::ProcessTiering(const ui64 pathId, const TTiering const TInstant maxTtlPortionInstant = *mpiOpt; const TDuration d = maxTtlPortionInstant - *expireTimestampOpt; keep = !!d; - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "keep_detect")("max", maxTtlPortionInstant.Seconds())("expire", expireTimestampOpt->Seconds()); + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "keep_detect")("max", maxTtlPortionInstant.Seconds())("expire", expireTimestampOpt->Seconds()); if (d && dWaiting > d) { dWaiting = d; } diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/counters.h b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/counters.h index b81871b673b..e43f9acda67 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/counters.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/counters.h @@ -58,7 +58,7 @@ private: public: std::shared_ptr<NColumnShard::TValueAggregationClient> SmallPortionsByGranule; i64 GetSmallCounts() const { - return SmallPortionsByGranule->GetValue(); + return SmallPortionsByGranule->GetValueSimple(); } TCounters() { @@ -84,13 +84,13 @@ public: void OnAddSmallPortion() { SmallPortionsCount->Add(1); SmallPortionsByGranule->Add(1); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("add_opt_count", SmallPortionsByGranule->GetValue())("counter", (ui64)SmallPortionsByGranule.get()); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("add_opt_count", SmallPortionsByGranule->GetValueSimple())("counter", (ui64)SmallPortionsByGranule.get()); } void OnRemoveSmallPortion() { SmallPortionsCount->Sub(1); SmallPortionsByGranule->Remove(1); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("remove_opt_count", SmallPortionsByGranule->GetValue())("counter", (ui64)SmallPortionsByGranule.get()); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("remove_opt_count", SmallPortionsByGranule->GetValueSimple())("counter", (ui64)SmallPortionsByGranule.get()); } }; diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/counters.h b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/counters.h index be808941e87..c0c3e2ca3d2 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/counters.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/counters.h @@ -15,6 +15,8 @@ private: std::shared_ptr<NColumnShard::TValueAggregationAgent> CriticalRecordsCount; std::shared_ptr<NColumnShard::TValueAggregationAgent> NormalRecordsCount; + + std::shared_ptr<NColumnShard::TValueAggregationAgent> OldestCriticalActuality; public: TGlobalCounters() : TBase("LevelsStorageOptimizer") @@ -22,9 +24,14 @@ public: SmallPortionsCount = TBase::GetValue("SmallPortions/Count"); CriticalRecordsCount = TBase::GetValueAutoAggregations("Granule/CriticalRecord/Count"); NormalRecordsCount = TBase::GetValueAutoAggregations("Granule/NormalRecord/Count"); + OldestCriticalActuality = TBase::GetValueAutoAggregations("Granule/ActualityMs"); SmallPortionsCountByGranule = TBase::GetValueAutoAggregations("Granule/SmallPortions/Count"); } + static std::shared_ptr<NColumnShard::TValueAggregationClient> BuildOldestCriticalActualityAggregation() { + return Singleton<TGlobalCounters>()->OldestCriticalActuality->GetClient(); + } + static std::shared_ptr<NColumnShard::TValueAggregationClient> BuildClientSmallPortionsAggregation() { return Singleton<TGlobalCounters>()->SmallPortionsCountByGranule->GetClient(); } @@ -48,11 +55,13 @@ private: std::shared_ptr<NColumnShard::TValueAggregationClient> CriticalRecordsCount; std::shared_ptr<NColumnShard::TValueAggregationClient> NormalRecordsCount; + std::shared_ptr<NColumnShard::TValueAggregationClient> OldestCriticalActuality; + std::shared_ptr<NColumnShard::TValueGuard> SmallPortionsCount; std::shared_ptr<NColumnShard::TValueAggregationClient> SmallPortionsByGranule; public: i64 GetSmallCounts() const { - return SmallPortionsByGranule->GetValue(); + return SmallPortionsByGranule->GetValueSimple(); } TCounters() { @@ -60,6 +69,11 @@ public: NormalRecordsCount = TGlobalCounters::BuildNormalRecordsCountAggregation(); SmallPortionsCount = TGlobalCounters::BuildSmallPortionsGuard(); SmallPortionsByGranule = TGlobalCounters::BuildClientSmallPortionsAggregation(); + OldestCriticalActuality = TGlobalCounters::BuildOldestCriticalActualityAggregation(); + } + + void OnMinProblemSnapshot(const TDuration d) { + OldestCriticalActuality->SetValue(d.MilliSeconds(), TInstant::Now() + TDuration::Seconds(10)); } void OnAddCriticalCount(const ui32 count) { diff --git a/ydb/core/tx/columnshard/resource_subscriber/actor.cpp b/ydb/core/tx/columnshard/resource_subscriber/actor.cpp index afae8e4beb6..ce5897d9098 100644 --- a/ydb/core/tx/columnshard/resource_subscriber/actor.cpp +++ b/ydb/core/tx/columnshard/resource_subscriber/actor.cpp @@ -19,7 +19,7 @@ void TActor::Handle(TEvStartTask::TPtr& ev) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "ask_resources")("task", task->DebugString()); Tasks.emplace(++Counter, task); Send(NKikimr::NResourceBroker::MakeResourceBrokerID(), new NKikimr::NResourceBroker::TEvResourceBroker::TEvSubmitTask( - task->GetName(), + task->GetExternalTaskId(), {{task->GetCPUAllocation(), task->GetMemoryAllocation()}}, task->GetType(), task->GetPriority(), diff --git a/ydb/core/tx/columnshard/resource_subscriber/counters.cpp b/ydb/core/tx/columnshard/resource_subscriber/counters.cpp index be0794b9f69..785ca04ba8f 100644 --- a/ydb/core/tx/columnshard/resource_subscriber/counters.cpp +++ b/ydb/core/tx/columnshard/resource_subscriber/counters.cpp @@ -16,8 +16,11 @@ TSubscriberTypeCounters::TSubscriberTypeCounters(const TSubscriberCounters& owne { DeepSubGroup("ResourceType", resourceType); - RequestsCount = TBase::GetDeriviative("Requests/Count"); RequestBytes = TBase::GetDeriviative("Requests/Bytes"); + RequestsCount = TBase::GetDeriviative("Requests/Count"); + + BytesRequested = TBase::GetValueAutoAggregationsClient("Requested/Bytes"); + CountRequested = TBase::GetValueAutoAggregationsClient("Requested/Count"); RepliesCount = TBase::GetDeriviative("Replies/Count"); ReplyBytes = TBase::GetDeriviative("Replies/Bytes"); diff --git a/ydb/core/tx/columnshard/resource_subscriber/counters.h b/ydb/core/tx/columnshard/resource_subscriber/counters.h index f6fafbeacaf..230222f8ffa 100644 --- a/ydb/core/tx/columnshard/resource_subscriber/counters.h +++ b/ydb/core/tx/columnshard/resource_subscriber/counters.h @@ -12,23 +12,30 @@ private: using TBase = NColumnShard::TCommonCountersOwner; NMonitoring::TDynamicCounters::TCounterPtr RequestsCount; NMonitoring::TDynamicCounters::TCounterPtr RequestBytes; + std::shared_ptr<NColumnShard::TValueAggregationClient> CountRequested; + std::shared_ptr<NColumnShard::TValueAggregationClient> BytesRequested; NMonitoring::TDynamicCounters::TCounterPtr RepliesCount; NMonitoring::TDynamicCounters::TCounterPtr ReplyBytes; YDB_READONLY_DEF(std::shared_ptr<NColumnShard::TValueAggregationClient>, BytesAllocated); YDB_READONLY_DEF(std::shared_ptr<NColumnShard::TValueAggregationClient>, CountAllocated); + public: TSubscriberTypeCounters(const TSubscriberCounters& owner, const TString& resourceType); void OnRequest(const ui64 bytes) const { RequestsCount->Add(1); RequestBytes->Add(bytes); + CountRequested->Add(1); + BytesRequested->Add(bytes); } void OnReply(const ui64 bytes) const { RepliesCount->Add(1); ReplyBytes->Add(bytes); + CountRequested->Remove(1); + BytesRequested->Remove(bytes); } }; diff --git a/ydb/core/tx/columnshard/resource_subscriber/task.cpp b/ydb/core/tx/columnshard/resource_subscriber/task.cpp index df1a7650def..053a4d4431d 100644 --- a/ydb/core/tx/columnshard/resource_subscriber/task.cpp +++ b/ydb/core/tx/columnshard/resource_subscriber/task.cpp @@ -6,7 +6,8 @@ namespace NKikimr::NOlap::NResourceBroker::NSubscribe { void ITask::OnAllocationSuccess(const ui64 taskId, const NActors::TActorId& senderId) { - DoOnAllocationSuccess(std::make_shared<TResourcesGuard>(taskId, *this, senderId, Context)); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "resource_allocated")("external_task_id", ExternalTaskId)("mem", MemoryAllocation)("cpu", CPUAllocation); + DoOnAllocationSuccess(std::make_shared<TResourcesGuard>(taskId, ExternalTaskId, *this, senderId, Context)); } void ITask::Start(const NActors::TActorId& actorId, const std::shared_ptr<ITask>& task) { @@ -17,21 +18,22 @@ TResourcesGuard::~TResourcesGuard() { if (!NActors::TlsActivationContext) { return; } - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "free_resources")("task_id", TaskId)("mem", Memory)("cpu", Cpu); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "free_resources")("task_id", TaskId)("external_task_id", ExternalTaskId)("mem", Memory)("cpu", Cpu); auto ev = std::make_unique<IEventHandle>(NKikimr::NResourceBroker::MakeResourceBrokerID(), Sender, new NKikimr::NResourceBroker::TEvResourceBroker::TEvFinishTask(TaskId)); NActors::TActorContext::AsActorContext().Send(std::move(ev)); Context.GetCounters()->GetBytesAllocated()->Remove(Memory); } -TResourcesGuard::TResourcesGuard(const ui64 taskId, const ITask& task, const NActors::TActorId& sender, const TTaskContext& context) +TResourcesGuard::TResourcesGuard(const ui64 taskId, const TString& externalTaskId, const ITask& task, const NActors::TActorId& sender, const TTaskContext& context) : TaskId(taskId) + , ExternalTaskId(externalTaskId) , Sender(sender) , Memory(task.GetMemoryAllocation()) , Cpu(task.GetCPUAllocation()) , Context(context) { Context.GetCounters()->GetBytesAllocated()->Add(Memory); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "allocate_resources")("task_id", TaskId)("mem", Memory)("cpu", Cpu); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "allocate_resources")("external_task_id", ExternalTaskId)("task_id", TaskId)("mem", Memory)("cpu", Cpu); } } diff --git a/ydb/core/tx/columnshard/resource_subscriber/task.h b/ydb/core/tx/columnshard/resource_subscriber/task.h index 39ef7356f90..850bdb828ae 100644 --- a/ydb/core/tx/columnshard/resource_subscriber/task.h +++ b/ydb/core/tx/columnshard/resource_subscriber/task.h @@ -21,12 +21,13 @@ public: class TResourcesGuard: public NColumnShard::TMonitoringObjectsCounter<TResourcesGuard> { private: const ui64 TaskId; + const TString ExternalTaskId; const NActors::TActorId Sender; const ui64 Memory; const ui32 Cpu; const TTaskContext Context; public: - TResourcesGuard(const ui64 taskId, const ITask& task, const NActors::TActorId& sender, const TTaskContext& context); + TResourcesGuard(const ui64 taskId, const TString& externalTaskId, const ITask& task, const NActors::TActorId& sender, const TTaskContext& context); ~TResourcesGuard(); }; @@ -34,17 +35,17 @@ class ITask: public NColumnShard::TMonitoringObjectsCounter<ITask> { private: YDB_READONLY(ui32, CPUAllocation, 0); YDB_READONLY(ui64, MemoryAllocation, 0); - YDB_READONLY_DEF(TString, Name); + YDB_READONLY_DEF(TString, ExternalTaskId); YDB_READONLY_DEF(TString, Type); YDB_ACCESSOR(ui64, Priority, 0); TTaskContext Context; protected: virtual void DoOnAllocationSuccess(const std::shared_ptr<TResourcesGuard>& guard) = 0; public: - ITask(const ui32 cpu, const ui64 memory, const TString& name, const TTaskContext& context) + ITask(const ui32 cpu, const ui64 memory, const TString& externalTaskId, const TTaskContext& context) : CPUAllocation(cpu) , MemoryAllocation(memory) - , Name(name) + , ExternalTaskId(externalTaskId) , Type(context.GetTypeName()) , Context(context) { @@ -56,7 +57,7 @@ public: } TString DebugString() const { - return TStringBuilder() << "cpu=" << CPUAllocation << ";mem=" << MemoryAllocation << ";name=" << Name << ";type=" << Type << ";priority=" << Priority << ";"; + return TStringBuilder() << "cpu=" << CPUAllocation << ";mem=" << MemoryAllocation << ";external_task_id=" << ExternalTaskId << ";type=" << Type << ";priority=" << Priority << ";"; } virtual ~ITask() = default; |