summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <[email protected]>2023-10-07 12:47:03 +0300
committerivanmorozov <[email protected]>2023-10-07 13:08:47 +0300
commit90a597111f17a27d88c309936517627ea8083acc (patch)
tree540d1c061d8e624360b34209cf08f66a13ce1130
parent1e41b4156c3b2021f9ea315f1b4a5e1660d35f10 (diff)
KIKIMR-19505: correct signals, logging and control indexation tasks
-rw-r--r--ydb/core/tx/columnshard/background_controller.cpp33
-rw-r--r--ydb/core/tx/columnshard/background_controller.h14
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp42
-rw-r--r--ydb/core/tx/columnshard/counters/common/agent.cpp26
-rw-r--r--ydb/core/tx/columnshard/counters/common/client.h18
-rw-r--r--ydb/core/tx/columnshard/counters/common_data.h2
-rw-r--r--ydb/core/tx/columnshard/engines/changes/indexation.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/intervals/counters.h6
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/levels/counters.h16
-rw-r--r--ydb/core/tx/columnshard/resource_subscriber/actor.cpp2
-rw-r--r--ydb/core/tx/columnshard/resource_subscriber/counters.cpp5
-rw-r--r--ydb/core/tx/columnshard/resource_subscriber/counters.h7
-rw-r--r--ydb/core/tx/columnshard/resource_subscriber/task.cpp10
-rw-r--r--ydb/core/tx/columnshard/resource_subscriber/task.h11
15 files changed, 132 insertions, 64 deletions
diff --git a/ydb/core/tx/columnshard/background_controller.cpp b/ydb/core/tx/columnshard/background_controller.cpp
index 1cdc6ba6e77..62a2b52bc48 100644
--- a/ydb/core/tx/columnshard/background_controller.cpp
+++ b/ydb/core/tx/columnshard/background_controller.cpp
@@ -4,11 +4,8 @@
namespace NKikimr::NColumnShard {
void TBackgroundController::StartTtl(const NOlap::TColumnEngineChanges& changes) {
- const NOlap::TTTLColumnEngineChanges* ttlChanges = dynamic_cast<const NOlap::TTTLColumnEngineChanges*>(&changes);
- Y_VERIFY(ttlChanges);
Y_VERIFY(TtlPortions.empty());
-
- TtlPortions = ttlChanges->GetTouchedPortions();
+ TtlPortions = changes.GetTouchedPortions(); // Touched portions are now read through the base changes interface. NOTE(review): the removed Y_VERIFY was the only check that `changes` is a TTL change set - confirm callers guarantee that.
}
bool TBackgroundController::StartCompaction(const NOlap::TPlanCompactionInfo& info, const NOlap::TColumnEngineChanges& changes) {
@@ -46,10 +43,34 @@ void TBackgroundController::CheckDeadlines() {
}
}
-void TBackgroundController::StartIndexing(const NOlap::TColumnEngineChanges& /*changes*/) {
- ++ActiveIndexing;
+void TBackgroundController::CheckDeadlinesIndexation() { // Reports every tracked indexation task that has been running longer than the timeout.
+ for (auto&& i : ActiveIndexationTasks) { // i.first is the task id, i.second the start instant stored by StartIndexing
+ if (TMonotonic::Now() - i.second > NOlap::TCompactionLimits::CompactionTimeout) { // NOTE(review): reuses the compaction timeout - confirm it is the intended limit for indexation
+ AFL_EMERG(NKikimrServices::TX_COLUMNSHARD)("event", "deadline_indexation")("task_id", i.first); // was "deadline_compaction": copy-paste made indexation timeouts indistinguishable from compaction ones in logs
+ Y_VERIFY_DEBUG(false);
+ }
+ }
+}
+
+void TBackgroundController::StartIndexing(const NOlap::TColumnEngineChanges& changes) { // Registers the task id together with the current monotonic time.
+ Y_VERIFY(ActiveIndexationTasks.emplace(changes.GetTaskIdentifier(), TMonotonic::Now()).second); // .second is false for an already-present id, so starting the same task twice fails the check. NOTE(review): the insertion itself lives inside Y_VERIFY - confirm the macro evaluates its argument in every build.
+}
+
+void TBackgroundController::FinishIndexing(const NOlap::TColumnEngineChanges& changes) { // Unregisters the task id recorded by StartIndexing.
+ Y_VERIFY(ActiveIndexationTasks.erase(changes.GetTaskIdentifier())); // erase() yields the number of removed entries; 0 (unknown id) fails the check
}
+TString TBackgroundController::DebugStringIndexation() const { // Renders the ids of all tracked indexation tasks for logging.
+ TStringBuilder sb;
+ sb << "{";
+ sb << "task_ids=";
+ for (auto&& i : ActiveIndexationTasks) {
+ sb << i.first << ","; // every id is followed by a comma, so a non-empty list ends with a trailing ","
+ }
+ sb << ";";
+ sb << "}";
+ return sb; // resulting shape: {task_ids=<id>,<id>,;}
+}
TString TBackgroundActivity::DebugString() const {
return TStringBuilder()
diff --git a/ydb/core/tx/columnshard/background_controller.h b/ydb/core/tx/columnshard/background_controller.h
index 6fba6a4cd5a..94dec1e5db9 100644
--- a/ydb/core/tx/columnshard/background_controller.h
+++ b/ydb/core/tx/columnshard/background_controller.h
@@ -46,7 +46,7 @@ private:
class TBackgroundController {
private:
- i64 ActiveIndexing = 0;
+ THashMap<TString, TMonotonic> ActiveIndexationTasks;
using TCurrentCompaction = THashMap<ui64, NOlap::TPlanCompactionInfo>;
TCurrentCompaction ActiveCompactionInfo;
@@ -60,6 +60,7 @@ public:
THashSet<NOlap::TPortionAddress> GetConflictCompactionPortions() const;
void CheckDeadlines();
+ void CheckDeadlinesIndexation();
bool StartCompaction(const NOlap::TPlanCompactionInfo& info, const NOlap::TColumnEngineChanges& changes);
void FinishCompaction(const NOlap::TPlanCompactionInfo& info) {
@@ -74,15 +75,10 @@ public:
}
void StartIndexing(const NOlap::TColumnEngineChanges& changes);
- void FinishIndexing() {
- --ActiveIndexing;
- Y_VERIFY(ActiveIndexing >= 0);
- }
- bool IsIndexingActive() const {
- return ActiveIndexing;
- }
+ void FinishIndexing(const NOlap::TColumnEngineChanges& changes);
+ TString DebugStringIndexation() const;
i64 GetIndexingActiveCount() const {
- return ActiveIndexing;
+ return ActiveIndexationTasks.size(); // number of currently tracked indexation tasks; the container size is implicitly converted to the i64 return type
}
void StartCleanup() {
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index d8df79027e5..ec75118f3ad 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -742,17 +742,13 @@ public:
};
void TColumnShard::StartIndexTask(std::vector<const NOlap::TInsertedData*>&& dataToIndex, const i64 bytesToIndex) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "indexation")("bytes", bytesToIndex)("blobs_count", dataToIndex.size())("max_limit", (i64)Limits.MaxInsertBytes)
- ("has_more", bytesToIndex >= Limits.MaxInsertBytes);
if (bytesToIndex < Limits.MinInsertBytes && dataToIndex.size() < TLimits::MIN_SMALL_BLOBS_TO_INSERT) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_indexation")("bytes", bytesToIndex)("blobs_count", dataToIndex.size());
-
if (!bytesToIndex || SkippedIndexations < TSettings::MAX_INDEXATIONS_TO_SKIP) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_indexation")("bytes", bytesToIndex)("blobs_count", dataToIndex.size());
++SkippedIndexations;
return;
}
}
-
CSCounters.IndexationInput(bytesToIndex);
SkippedIndexations = 0;
@@ -772,15 +768,21 @@ void TColumnShard::StartIndexTask(std::vector<const NOlap::TInsertedData*>&& dat
indexChanges->Start(*this);
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, Settings.CacheDataAfterIndexing);
- const TString taskName = indexChanges->GetTaskIdentifier();
+ const TString externalTaskId = indexChanges->GetTaskIdentifier();
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "indexation")("bytes", bytesToIndex)("blobs_count", dataToIndex.size())("max_limit", (i64)Limits.MaxInsertBytes)
+ ("has_more", bytesToIndex >= Limits.MaxInsertBytes)("external_task_id", externalTaskId);
+
NOlap::NResourceBroker::NSubscribe::ITask::Start(
ResourceSubscribeActor, std::make_shared<NOlap::NBlobOperations::NRead::ITask::TReadSubscriber>(BlobsReadActor,
- std::make_unique<TChangesReadTask>(std::move(ev), SelfId(), TabletID(), IndexationCounters), 0, memoryNeed, taskName, InsertTaskSubscription));
+ std::make_unique<TChangesReadTask>(std::move(ev), SelfId(), TabletID(), IndexationCounters), 0, memoryNeed, externalTaskId, InsertTaskSubscription));
}
void TColumnShard::SetupIndexation() {
- if (BackgroundController.IsIndexingActive()) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_indexation")("reason", "in_progress")("count", BackgroundController.GetIndexingActiveCount())("insert_overload_size", InsertTable->GetCountersCommitted().Bytes);
+ BackgroundController.CheckDeadlinesIndexation();
+ if (BackgroundController.GetIndexingActiveCount()) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_indexation")("reason", "in_progress")
+ ("count", BackgroundController.GetIndexingActiveCount())("insert_overload_size", InsertTable->GetCountersCommitted().Bytes)
+ ("indexing_debug", BackgroundController.DebugStringIndexation());
return;
} else {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "start_indexation_tasks")("insert_overload_size", InsertTable->GetCountersCommitted().Bytes);
@@ -803,7 +805,9 @@ void TColumnShard::SetupIndexation() {
}
}
}
- StartIndexTask(std::move(dataToIndex), bytesToIndex);
+ if (dataToIndex.size()) {
+ StartIndexTask(std::move(dataToIndex), bytesToIndex);
+ }
}
void TColumnShard::SetupCompaction() {
@@ -831,11 +835,7 @@ void TColumnShard::SetupCompaction() {
bool TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls, const bool force) {
CSCounters.OnSetupTtl();
if (BackgroundController.IsTtlActive()) {
- LOG_S_DEBUG("TTL already in progress at tablet " << TabletID());
- return false;
- }
- if (ActiveEvictions) {
- LOG_S_DEBUG("Do not start TTL while eviction is in progress at tablet " << TabletID());
+ ACFL_DEBUG("background", "ttl")("skip_reason", "in_progress");
return false;
}
if (force) {
@@ -843,20 +843,19 @@ bool TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls, con
}
THashMap<ui64, NOlap::TTiering> eviction = pathTtls;
for (auto&& i : eviction) {
- LOG_S_DEBUG("Prepare TTL evicting path " << i.first << " with " << i.second.GetDebugString()
- << " at tablet " << TabletID());
+ ACFL_DEBUG("background", "ttl")("path", i.first)("info", i.second.GetDebugString());
}
auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex();
std::shared_ptr<NOlap::TTTLColumnEngineChanges> indexChanges = TablesManager.MutablePrimaryIndex().StartTtl(eviction, BackgroundController.GetConflictTTLPortions());
if (!indexChanges) {
- LOG_S_INFO("Cannot prepare TTL at tablet " << TabletID());
+ ACFL_DEBUG("background", "ttl")("skip_reason", "no_changes");
return false;
}
const bool needWrites = indexChanges->NeedConstruction();
- LOG_S_INFO("TTL" << (needWrites ? " with writes" : "" ) << " prepared at tablet " << TabletID());
+ ACFL_DEBUG("background", "ttl")("need_writes", needWrites);
indexChanges->Start(*this);
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, false);
@@ -873,7 +872,7 @@ bool TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls, con
void TColumnShard::SetupCleanup() {
CSCounters.OnSetupCleanup();
if (BackgroundController.IsCleanupActive()) {
- LOG_S_DEBUG("Cleanup already in progress at tablet " << TabletID());
+ ACFL_DEBUG("background", "cleanup")("skip_reason", "in_progress"); // was tagged "ttl": copy-paste from SetupTtl made cleanup skips look like TTL skips in logs
return;
}
@@ -882,10 +881,11 @@ void TColumnShard::SetupCleanup() {
auto changes =
TablesManager.MutablePrimaryIndex().StartCleanup(cleanupSnapshot, TablesManager.MutablePathsToDrop(), TLimits::MAX_TX_RECORDS);
if (!changes) {
- LOG_S_INFO("Cannot prepare cleanup at tablet " << TabletID());
+ ACFL_DEBUG("background", "cleanup")("skip_reason", "no_changes"); // same "ttl" -> "cleanup" tag correction
return;
}
+ ACFL_DEBUG("background", "cleanup")("changes_info", changes->DebugString()); // same "ttl" -> "cleanup" tag correction
auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex();
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), changes, false);
ev->SetPutStatus(NKikimrProto::OK); // No new blobs to write
diff --git a/ydb/core/tx/columnshard/counters/common/agent.cpp b/ydb/core/tx/columnshard/counters/common/agent.cpp
index c451356cfea..cd4f9549cba 100644
--- a/ydb/core/tx/columnshard/counters/common/agent.cpp
+++ b/ydb/core/tx/columnshard/counters/common/agent.cpp
@@ -16,23 +16,33 @@ bool TValueAggregationAgent::CalcAggregationsAndClean(i64& sum, i64& minValue, i
return false;
}
sum = 0;
- minValue = Values.front()->GetValue();
- maxValue = Values.front()->GetValue();
+ const TInstant now = TInstant::Now(); // one timestamp for the whole pass so every client is judged against the same instant
+ std::optional<i64> minValueLocal; // starts empty: seeding from Values.front() let a front client that is erased below still leak into min/max
+ std::optional<i64> maxValueLocal;
for (auto it = Values.begin(); it != Values.end();) {
if (it->use_count() == 1) {
it = Values.erase(it);
} else {
- const i64 v = (*it)->GetValue();
- sum += v;
- if (minValue > v) {
- minValue = v;
+ const std::optional<i64> v = (*it)->GetValue(now);
+ if (!v) { // client reported no value for `now`; keep it in the list but leave it out of the aggregates
+ ++it;
+ continue;
}
- if (maxValue < v) {
- maxValue = v;
+ sum += *v;
+ if (!minValueLocal || *minValueLocal > *v) {
+ minValueLocal = *v;
+ }
+ if (!maxValueLocal || *maxValueLocal < *v) {
+ maxValueLocal = *v;
}
++it;
}
}
+ if (!maxValueLocal) { // no client contributed a value, so there is nothing to report
+ return false;
+ }
+ minValue = minValueLocal.value_or(0); // was maxValueLocal: the reported minimum was always equal to the maximum
+ maxValue = maxValueLocal.value_or(0);
return true;
}
diff --git a/ydb/core/tx/columnshard/counters/common/client.h b/ydb/core/tx/columnshard/counters/common/client.h
index 2437c3decbd..4bd2e279e1d 100644
--- a/ydb/core/tx/columnshard/counters/common/client.h
+++ b/ydb/core/tx/columnshard/counters/common/client.h
@@ -1,10 +1,12 @@
#pragma once
#include <ydb/library/accessor/accessor.h>
+#include <util/datetime/base.h>
#include <util/system/types.h>
#include <util/generic/noncopyable.h>
#include <list>
#include <memory>
#include <atomic>
+#include <optional>
namespace NKikimr::NColumnShard {
class TValueAggregationAgent;
@@ -12,17 +14,29 @@ class TValueAggregationAgent;
class TValueAggregationClient: TNonCopyable {
private:
std::atomic<i64> Value;
+ std::optional<TInstant> DeadlineActuality;
public:
TValueAggregationClient() {
Value = 0;
}
- i64 GetValue() const {
+ std::optional<i64> GetValue(const TInstant reqInstant) const { // Returns the stored value, or an empty optional once the actuality deadline has been reached.
+ if (!DeadlineActuality || *DeadlineActuality > reqInstant) { // no deadline set, or the deadline is still ahead of reqInstant
+ return Value;
+ } else {
+ return {};
+ }
+ }
+
+ i64 GetValueSimple() const { // Returns the stored value regardless of any actuality deadline.
return Value;
}
- void SetValue(const i64 value) {
+ void SetValue(const i64 value, const std::optional<TInstant> d = {}) { // Stores value; d, when given, becomes the new actuality deadline.
Value = value;
+ if (d) { // a call without d keeps the previously stored deadline. NOTE(review): confirm that a sticky deadline is intended.
+ DeadlineActuality = d; // NOTE(review): DeadlineActuality is a plain std::optional next to the atomic Value - confirm reads and writes happen on one thread.
+ }
}
void Add(const i64 v) {
diff --git a/ydb/core/tx/columnshard/counters/common_data.h b/ydb/core/tx/columnshard/counters/common_data.h
index fa00742cd9b..13c793d36d5 100644
--- a/ydb/core/tx/columnshard/counters/common_data.h
+++ b/ydb/core/tx/columnshard/counters/common_data.h
@@ -22,7 +22,7 @@ public:
TDataOwnerSignals(const TString& module, const TString dataName);
i64 GetDataSize() const {
- return DataSize->GetValue();
+ return DataSize->GetValueSimple(); // raw stored value; GetValue now takes an instant and returns an optional
}
void Add(const ui64 size, const bool load) const {
diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
index b280e7a1b5c..19a42befcbd 100644
--- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
@@ -61,7 +61,7 @@ void TInsertColumnEngineChanges::DoWriteIndexComplete(NColumnShard::TColumnShard
}
void TInsertColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& /*context*/) {
- self.BackgroundController.FinishIndexing();
+ self.BackgroundController.FinishIndexing(*this); // passes this change set so the controller can unregister it by task identifier
}
TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept {
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 2f70988033d..bf5d41474d0 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -403,7 +403,7 @@ TDuration TColumnEngineForLogs::ProcessTiering(const ui64 pathId, const TTiering
const TInstant maxTtlPortionInstant = *mpiOpt;
const TDuration d = maxTtlPortionInstant - *expireTimestampOpt;
keep = !!d;
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "keep_detect")("max", maxTtlPortionInstant.Seconds())("expire", expireTimestampOpt->Seconds());
+ AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "keep_detect")("max", maxTtlPortionInstant.Seconds())("expire", expireTimestampOpt->Seconds());
if (d && dWaiting > d) {
dWaiting = d;
}
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/counters.h b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/counters.h
index b81871b673b..e43f9acda67 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/counters.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/counters.h
@@ -58,7 +58,7 @@ private:
public:
std::shared_ptr<NColumnShard::TValueAggregationClient> SmallPortionsByGranule;
i64 GetSmallCounts() const {
- return SmallPortionsByGranule->GetValue();
+ return SmallPortionsByGranule->GetValueSimple(); // raw stored value, no actuality deadline involved
}
TCounters() {
@@ -84,13 +84,13 @@ public:
void OnAddSmallPortion() {
SmallPortionsCount->Add(1);
SmallPortionsByGranule->Add(1);
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("add_opt_count", SmallPortionsByGranule->GetValue())("counter", (ui64)SmallPortionsByGranule.get());
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("add_opt_count", SmallPortionsByGranule->GetValueSimple())("counter", (ui64)SmallPortionsByGranule.get()); // logs the raw counter value after the increment, together with the client's address
}
void OnRemoveSmallPortion() {
SmallPortionsCount->Sub(1);
SmallPortionsByGranule->Remove(1);
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("remove_opt_count", SmallPortionsByGranule->GetValue())("counter", (ui64)SmallPortionsByGranule.get());
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("remove_opt_count", SmallPortionsByGranule->GetValueSimple())("counter", (ui64)SmallPortionsByGranule.get()); // logs the raw counter value after the removal, together with the client's address
}
};
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/counters.h b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/counters.h
index be808941e87..c0c3e2ca3d2 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/counters.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/counters.h
@@ -15,6 +15,8 @@ private:
std::shared_ptr<NColumnShard::TValueAggregationAgent> CriticalRecordsCount;
std::shared_ptr<NColumnShard::TValueAggregationAgent> NormalRecordsCount;
+
+ std::shared_ptr<NColumnShard::TValueAggregationAgent> OldestCriticalActuality;
public:
TGlobalCounters()
: TBase("LevelsStorageOptimizer")
@@ -22,9 +24,14 @@ public:
SmallPortionsCount = TBase::GetValue("SmallPortions/Count");
CriticalRecordsCount = TBase::GetValueAutoAggregations("Granule/CriticalRecord/Count");
NormalRecordsCount = TBase::GetValueAutoAggregations("Granule/NormalRecord/Count");
+ OldestCriticalActuality = TBase::GetValueAutoAggregations("Granule/ActualityMs");
SmallPortionsCountByGranule = TBase::GetValueAutoAggregations("Granule/SmallPortions/Count");
}
+ static std::shared_ptr<NColumnShard::TValueAggregationClient> BuildOldestCriticalActualityAggregation() { // Hands out a client of the singleton's OldestCriticalActuality agent.
+ return Singleton<TGlobalCounters>()->OldestCriticalActuality->GetClient(); // the agent is the one registered as "Granule/ActualityMs" in the constructor
+ }
+
static std::shared_ptr<NColumnShard::TValueAggregationClient> BuildClientSmallPortionsAggregation() {
return Singleton<TGlobalCounters>()->SmallPortionsCountByGranule->GetClient();
}
@@ -48,11 +55,13 @@ private:
std::shared_ptr<NColumnShard::TValueAggregationClient> CriticalRecordsCount;
std::shared_ptr<NColumnShard::TValueAggregationClient> NormalRecordsCount;
+ std::shared_ptr<NColumnShard::TValueAggregationClient> OldestCriticalActuality;
+
std::shared_ptr<NColumnShard::TValueGuard> SmallPortionsCount;
std::shared_ptr<NColumnShard::TValueAggregationClient> SmallPortionsByGranule;
public:
i64 GetSmallCounts() const {
- return SmallPortionsByGranule->GetValue();
+ return SmallPortionsByGranule->GetValueSimple(); // raw stored value, no actuality deadline involved
}
TCounters() {
@@ -60,6 +69,11 @@ public:
NormalRecordsCount = TGlobalCounters::BuildNormalRecordsCountAggregation();
SmallPortionsCount = TGlobalCounters::BuildSmallPortionsGuard();
SmallPortionsByGranule = TGlobalCounters::BuildClientSmallPortionsAggregation();
+ OldestCriticalActuality = TGlobalCounters::BuildOldestCriticalActualityAggregation();
+ }
+
+ void OnMinProblemSnapshot(const TDuration d) { // Publishes d, in milliseconds, to the OldestCriticalActuality client.
+ OldestCriticalActuality->SetValue(d.MilliSeconds(), TInstant::Now() + TDuration::Seconds(10)); // second argument is the actuality deadline: the value is passed with a deadline 10 seconds from now
+ }
void OnAddCriticalCount(const ui32 count) {
diff --git a/ydb/core/tx/columnshard/resource_subscriber/actor.cpp b/ydb/core/tx/columnshard/resource_subscriber/actor.cpp
index afae8e4beb6..ce5897d9098 100644
--- a/ydb/core/tx/columnshard/resource_subscriber/actor.cpp
+++ b/ydb/core/tx/columnshard/resource_subscriber/actor.cpp
@@ -19,7 +19,7 @@ void TActor::Handle(TEvStartTask::TPtr& ev) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "ask_resources")("task", task->DebugString());
Tasks.emplace(++Counter, task);
Send(NKikimr::NResourceBroker::MakeResourceBrokerID(), new NKikimr::NResourceBroker::TEvResourceBroker::TEvSubmitTask(
- task->GetName(),
+ task->GetExternalTaskId(),
{{task->GetCPUAllocation(), task->GetMemoryAllocation()}},
task->GetType(),
task->GetPriority(),
diff --git a/ydb/core/tx/columnshard/resource_subscriber/counters.cpp b/ydb/core/tx/columnshard/resource_subscriber/counters.cpp
index be0794b9f69..785ca04ba8f 100644
--- a/ydb/core/tx/columnshard/resource_subscriber/counters.cpp
+++ b/ydb/core/tx/columnshard/resource_subscriber/counters.cpp
@@ -16,8 +16,11 @@ TSubscriberTypeCounters::TSubscriberTypeCounters(const TSubscriberCounters& owne
{
DeepSubGroup("ResourceType", resourceType);
- RequestsCount = TBase::GetDeriviative("Requests/Count");
RequestBytes = TBase::GetDeriviative("Requests/Bytes");
+ RequestsCount = TBase::GetDeriviative("Requests/Count");
+
+ BytesRequested = TBase::GetValueAutoAggregationsClient("Requested/Bytes");
+ CountRequested = TBase::GetValueAutoAggregationsClient("Requested/Count");
RepliesCount = TBase::GetDeriviative("Replies/Count");
ReplyBytes = TBase::GetDeriviative("Replies/Bytes");
diff --git a/ydb/core/tx/columnshard/resource_subscriber/counters.h b/ydb/core/tx/columnshard/resource_subscriber/counters.h
index f6fafbeacaf..230222f8ffa 100644
--- a/ydb/core/tx/columnshard/resource_subscriber/counters.h
+++ b/ydb/core/tx/columnshard/resource_subscriber/counters.h
@@ -12,23 +12,30 @@ private:
using TBase = NColumnShard::TCommonCountersOwner;
NMonitoring::TDynamicCounters::TCounterPtr RequestsCount;
NMonitoring::TDynamicCounters::TCounterPtr RequestBytes;
+ std::shared_ptr<NColumnShard::TValueAggregationClient> CountRequested;
+ std::shared_ptr<NColumnShard::TValueAggregationClient> BytesRequested;
NMonitoring::TDynamicCounters::TCounterPtr RepliesCount;
NMonitoring::TDynamicCounters::TCounterPtr ReplyBytes;
YDB_READONLY_DEF(std::shared_ptr<NColumnShard::TValueAggregationClient>, BytesAllocated);
YDB_READONLY_DEF(std::shared_ptr<NColumnShard::TValueAggregationClient>, CountAllocated);
+
public:
TSubscriberTypeCounters(const TSubscriberCounters& owner, const TString& resourceType);
void OnRequest(const ui64 bytes) const {
RequestsCount->Add(1);
RequestBytes->Add(bytes);
+ CountRequested->Add(1); // paired with CountRequested->Remove(1) in OnReply
+ BytesRequested->Add(bytes); // paired with BytesRequested->Remove(bytes) in OnReply
}
void OnReply(const ui64 bytes) const {
RepliesCount->Add(1);
ReplyBytes->Add(bytes);
+ CountRequested->Remove(1); // undoes the Add(1) made in OnRequest
+ BytesRequested->Remove(bytes); // NOTE(review): balances OnRequest only if the reply carries the same byte count as the request - confirm at the call sites
}
};
diff --git a/ydb/core/tx/columnshard/resource_subscriber/task.cpp b/ydb/core/tx/columnshard/resource_subscriber/task.cpp
index df1a7650def..053a4d4431d 100644
--- a/ydb/core/tx/columnshard/resource_subscriber/task.cpp
+++ b/ydb/core/tx/columnshard/resource_subscriber/task.cpp
@@ -6,7 +6,8 @@
namespace NKikimr::NOlap::NResourceBroker::NSubscribe {
void ITask::OnAllocationSuccess(const ui64 taskId, const NActors::TActorId& senderId) {
- DoOnAllocationSuccess(std::make_shared<TResourcesGuard>(taskId, *this, senderId, Context));
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "resource_allocated")("external_task_id", ExternalTaskId)("mem", MemoryAllocation)("cpu", CPUAllocation); // logged before the guard is built, keyed by the external task id
+ DoOnAllocationSuccess(std::make_shared<TResourcesGuard>(taskId, ExternalTaskId, *this, senderId, Context)); // the guard now also receives the external task id for its own log lines
}
void ITask::Start(const NActors::TActorId& actorId, const std::shared_ptr<ITask>& task) {
@@ -17,21 +18,22 @@ TResourcesGuard::~TResourcesGuard() {
if (!NActors::TlsActivationContext) {
return;
}
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "free_resources")("task_id", TaskId)("mem", Memory)("cpu", Cpu);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "free_resources")("task_id", TaskId)("external_task_id", ExternalTaskId)("mem", Memory)("cpu", Cpu);
auto ev = std::make_unique<IEventHandle>(NKikimr::NResourceBroker::MakeResourceBrokerID(), Sender, new NKikimr::NResourceBroker::TEvResourceBroker::TEvFinishTask(TaskId));
NActors::TActorContext::AsActorContext().Send(std::move(ev));
Context.GetCounters()->GetBytesAllocated()->Remove(Memory);
}
-TResourcesGuard::TResourcesGuard(const ui64 taskId, const ITask& task, const NActors::TActorId& sender, const TTaskContext& context)
+TResourcesGuard::TResourcesGuard(const ui64 taskId, const TString& externalTaskId, const ITask& task, const NActors::TActorId& sender, const TTaskContext& context) // NOTE(review): at the visible call site externalTaskId is the same task's ExternalTaskId - it could be read from `task` instead of being passed separately
: TaskId(taskId)
+ , ExternalTaskId(externalTaskId) // kept only for the allocate/free log lines
, Sender(sender)
, Memory(task.GetMemoryAllocation())
, Cpu(task.GetCPUAllocation())
, Context(context)
{
Context.GetCounters()->GetBytesAllocated()->Add(Memory);
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "allocate_resources")("task_id", TaskId)("mem", Memory)("cpu", Cpu);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "allocate_resources")("external_task_id", ExternalTaskId)("task_id", TaskId)("mem", Memory)("cpu", Cpu); // adds the external id next to the broker-side task id
}
}
diff --git a/ydb/core/tx/columnshard/resource_subscriber/task.h b/ydb/core/tx/columnshard/resource_subscriber/task.h
index 39ef7356f90..850bdb828ae 100644
--- a/ydb/core/tx/columnshard/resource_subscriber/task.h
+++ b/ydb/core/tx/columnshard/resource_subscriber/task.h
@@ -21,12 +21,13 @@ public:
class TResourcesGuard: public NColumnShard::TMonitoringObjectsCounter<TResourcesGuard> {
private:
const ui64 TaskId;
+ const TString ExternalTaskId; // caller-supplied task identifier, declared right after TaskId to match the constructor's initializer order
const NActors::TActorId Sender;
const ui64 Memory;
const ui32 Cpu;
const TTaskContext Context;
public:
- TResourcesGuard(const ui64 taskId, const ITask& task, const NActors::TActorId& sender, const TTaskContext& context);
+ TResourcesGuard(const ui64 taskId, const TString& externalTaskId, const ITask& task, const NActors::TActorId& sender, const TTaskContext& context); // externalTaskId is stored in ExternalTaskId
~TResourcesGuard();
};
@@ -34,17 +35,17 @@ class ITask: public NColumnShard::TMonitoringObjectsCounter<ITask> {
private:
YDB_READONLY(ui32, CPUAllocation, 0);
YDB_READONLY(ui64, MemoryAllocation, 0);
- YDB_READONLY_DEF(TString, Name);
+ YDB_READONLY_DEF(TString, ExternalTaskId);
YDB_READONLY_DEF(TString, Type);
YDB_ACCESSOR(ui64, Priority, 0);
TTaskContext Context;
protected:
virtual void DoOnAllocationSuccess(const std::shared_ptr<TResourcesGuard>& guard) = 0;
public:
- ITask(const ui32 cpu, const ui64 memory, const TString& name, const TTaskContext& context)
+ ITask(const ui32 cpu, const ui64 memory, const TString& externalTaskId, const TTaskContext& context)
: CPUAllocation(cpu)
, MemoryAllocation(memory)
- , Name(name)
+ , ExternalTaskId(externalTaskId)
, Type(context.GetTypeName())
, Context(context)
{
@@ -56,7 +57,7 @@ public:
}
TString DebugString() const {
- return TStringBuilder() << "cpu=" << CPUAllocation << ";mem=" << MemoryAllocation << ";name=" << Name << ";type=" << Type << ";priority=" << Priority << ";";
+ return TStringBuilder() << "cpu=" << CPUAllocation << ";mem=" << MemoryAllocation << ";external_task_id=" << ExternalTaskId << ";type=" << Type << ";priority=" << Priority << ";"; // the former "name=" field is now emitted as "external_task_id=", following the Name -> ExternalTaskId rename
}
virtual ~ITask() = default;