aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-06-06 21:22:09 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-06-06 21:22:09 +0300
commitf18d76db543cf768f466baf6e44549a6556daec6 (patch)
tree5d2c4f51facd39d595c180aa946e434655a9816c
parentd0919e63bf76e5117a2544b75b9ea3ed3dac7b77 (diff)
downloadydb-f18d76db543cf768f466baf6e44549a6556daec6.tar.gz
correct signals
-rw-r--r--ydb/core/tx/columnshard/blob_manager.cpp58
-rw-r--r--ydb/core/tx/columnshard/blob_manager.h7
-rw-r--r--ydb/core/tx/columnshard/counters/common/agent.cpp6
-rw-r--r--ydb/core/tx/columnshard/counters/common/agent.h9
-rw-r--r--ydb/core/tx/columnshard/counters/common/owner.h8
-rw-r--r--ydb/core/tx/columnshard/counters/indexation.cpp1
-rw-r--r--ydb/core/tx/columnshard/counters/indexation.h9
-rw-r--r--ydb/core/tx/columnshard/engines/index_logic_logs.h2
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h3
9 files changed, 66 insertions, 37 deletions
diff --git a/ydb/core/tx/columnshard/blob_manager.cpp b/ydb/core/tx/columnshard/blob_manager.cpp
index b653f1b8e2..f1e7819a06 100644
--- a/ydb/core/tx/columnshard/blob_manager.cpp
+++ b/ydb/core/tx/columnshard/blob_manager.cpp
@@ -20,6 +20,7 @@ TLogoBlobID ParseLogoBlobId(TString blobId) {
struct TBlobBatch::TBatchInfo : TNonCopyable {
TIntrusivePtr<TTabletStorageInfo> TabletInfo;
TAllocatedGenStepConstPtr GenStepRef;
+ const TBlobsManagerCounters Counters;
const ui32 Gen;
const ui32 Step;
const ui32 Channel;
@@ -30,15 +31,16 @@ struct TBlobBatch::TBatchInfo : TNonCopyable {
ui64 TotalSizeBytes;
std::vector<TString> SmallBlobs;
- TBatchInfo(TIntrusivePtr<TTabletStorageInfo> tabletInfo, TAllocatedGenStepConstPtr genStep, ui32 channel)
+ TBatchInfo(TIntrusivePtr<TTabletStorageInfo> tabletInfo, TAllocatedGenStepConstPtr genStep, ui32 channel, const TBlobsManagerCounters& counters)
: TabletInfo(tabletInfo)
, GenStepRef(genStep)
+ , Counters(counters)
, Gen(std::get<0>(GenStepRef->GenStep))
, Step(std::get<1>(GenStepRef->GenStep))
, Channel(channel)
, InFlightCount(0)
- , TotalSizeBytes(0)
- {}
+ , TotalSizeBytes(0) {
+ }
TUnifiedBlobId NextBlobId(ui32 blobSize) {
BlobSizes.push_back(blobSize);
@@ -56,8 +58,9 @@ struct TBlobBatch::TBatchInfo : TNonCopyable {
TUnifiedBlobId AddSmallBlob(const TString& data) {
// NOTE: small blobs are not included into TotalSizeBytes
+ Counters.OnAddSmallBlob(data.size());
SmallBlobs.push_back(data);
- return MakeSmallBlobId(SmallBlobs.size()-1);
+ return MakeSmallBlobId(SmallBlobs.size() - 1);
}
TUnifiedBlobId MakeSmallBlobId(ui32 i) const {
@@ -102,9 +105,11 @@ TUnifiedBlobId TBlobBatch::SendWriteBlobRequest(const TString& blobData, TInstan
void TBlobBatch::OnBlobWriteResult(TEvBlobStorage::TEvPutResult::TPtr& ev) {
TLogoBlobID blobId = ev->Get()->Id;
+ BatchInfo->Counters.OnPutResult(blobId.BlobSize());
Y_VERIFY(ev->Get()->Status == NKikimrProto::OK, "The caller must handle unsuccessful status");
Y_VERIFY(BatchInfo);
Y_VERIFY(BatchInfo->InFlight[blobId.Cookie()], "Blob %s is already acked!", blobId.ToString().c_str());
+
BatchInfo->InFlight[blobId.Cookie()] = false;
--BatchInfo->InFlightCount;
Y_VERIFY(BatchInfo->InFlightCount >= 0);
@@ -164,13 +169,16 @@ bool TBlobManager::LoadState(IBlobManagerDb& db) {
for (const auto& unifiedBlobId : blobsToDelete) {
if (unifiedBlobId.IsSmallBlob()) {
+ BlobsManagerCounters.OnDeleteSmallBlob(unifiedBlobId.BlobSize());
SmallBlobsToDelete.insert(unifiedBlobId);
} else if (unifiedBlobId.IsDsBlob()) {
BlobsToDelete.insert(unifiedBlobId.GetLogoBlobId());
+ BlobsManagerCounters.OnDeleteBlobMarker(unifiedBlobId.BlobSize());
} else {
Y_FAIL("Unexpected blob id: %s", unifiedBlobId.ToStringNew().c_str());
}
}
+ BlobsManagerCounters.OnBlobsDelete(BlobsToDelete);
// Build the list of steps that cannot be garbage collected before Keep flag is set on the blobs
THashSet<TGenStep> genStepsWithBlobsToKeep;
@@ -183,11 +191,13 @@ bool TBlobManager::LoadState(IBlobManagerDb& db) {
LOG_S_WARN("BlobManager at tablet " << TabletInfo->TabletID
<< " Load not keeped blob " << unifiedBlobId.ToStringNew() << " collected by GenStep: "
<< std::get<0>(LastCollectedGenStep) << ":" << std::get<1>(LastCollectedGenStep));
+ BlobsManagerCounters.OnBrokenKeep(unifiedBlobId.BlobSize());
KeepsToErase.emplace_back(unifiedBlobId);
continue;
}
BlobsToKeep.insert(blobId);
+ BlobsManagerCounters.OnKeepMarker(blobId.BlobSize());
// Keep + DontKeep (probably in different gen:steps)
// GC could go through it to a greater LastCollectedGenStep
@@ -197,6 +207,7 @@ bool TBlobManager::LoadState(IBlobManagerDb& db) {
genStepsWithBlobsToKeep.insert(genStep);
}
+ BlobsManagerCounters.OnBlobsKeep(BlobsToKeep);
AllocatedGenSteps.clear();
for (const auto& gs : genStepsWithBlobsToKeep) {
@@ -227,7 +238,7 @@ bool TBlobManager::NeedStorageCG() const {
return false;
}
- if (BlobsToKeep.empty() && BlobsToDelete.empty() && LastCollectedGenStep == TGenStep{CurrentGen, CurrentStep}) {
+ if (BlobsToKeep.empty() && BlobsToDelete.empty() && LastCollectedGenStep == TGenStep{ CurrentGen, CurrentStep }) {
return false;
}
@@ -276,7 +287,8 @@ THashMap<ui32, std::unique_ptr<TEvBlobStorage::TEvCollectGarbage>> TBlobManager:
}
PreviousGCTime = AppData()->TimeProvider->Now();
-
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "PreparePerGroupGCRequests")("gen", std::get<0>(newCollectGenStep))("step", std::get<1>(newCollectGenStep));
+ BlobsManagerCounters.OnNewCollectStep(std::get<0>(newCollectGenStep), std::get<1>(newCollectGenStep));
const ui32 channelIdx = BLOB_CHANNEL;
Y_VERIFY(PerGroupGCListsInFlight.empty());
@@ -305,9 +317,8 @@ THashMap<ui32, std::unique_ptr<TEvBlobStorage::TEvCollectGarbage>> TBlobManager:
ui32 blobGroup = TabletInfo->GroupFor(keepBlobIt->Channel(), keepBlobIt->Generation());
PerGroupGCListsInFlight[blobGroup].KeepList.insert(*keepBlobIt);
}
- if (BlobsToKeep.begin() != keepBlobIt) {
- BlobsToKeep.erase(BlobsToKeep.begin(), keepBlobIt);
- }
+ BlobsToKeep.erase(BlobsToKeep.begin(), keepBlobIt);
+ BlobsManagerCounters.OnBlobsKeep(BlobsToKeep);
// Add all blobs to delete
auto blobIt = BlobsToDelete.begin();
@@ -319,9 +330,7 @@ THashMap<ui32, std::unique_ptr<TEvBlobStorage::TEvCollectGarbage>> TBlobManager:
ui32 blobGroup = TabletInfo->GroupFor(blobIt->Channel(), blobIt->Generation());
TGCLists& gl = PerGroupGCListsInFlight[blobGroup];
bool skipDontKeep = false;
- if (gl.KeepList.contains(*blobIt)) {
- // Remove the blob from keep list if its also in the delete list
- gl.KeepList.erase(*blobIt);
+ if (gl.KeepList.erase(*blobIt)) {
// Skipped blobs still need to be deleted from BlobsToKeep table
KeepsToErase.emplace_back(TUnifiedBlobId(blobGroup, *blobIt));
@@ -337,12 +346,14 @@ THashMap<ui32, std::unique_ptr<TEvBlobStorage::TEvCollectGarbage>> TBlobManager:
}
}
if (!skipDontKeep) {
+ BlobsManagerCounters.OnCollectDropExplicit(blobIt->BlobSize());
gl.DontKeepList.insert(*blobIt);
+ } else {
+ BlobsManagerCounters.OnCollectDropImplicit(blobIt->BlobSize());
}
}
- if (BlobsToDelete.begin() != blobIt) {
- BlobsToDelete.erase(BlobsToDelete.begin(), blobIt);
- }
+ BlobsToDelete.erase(BlobsToDelete.begin(), blobIt);
+ BlobsManagerCounters.OnBlobsDelete(BlobsToDelete);
}
CollectGenStepInFlight = newCollectGenStep;
@@ -352,6 +363,9 @@ THashMap<ui32, std::unique_ptr<TEvBlobStorage::TEvCollectGarbage>> TBlobManager:
{
for (const auto& gl : PerGroupGCListsInFlight) {
ui32 group = gl.first;
+ for (auto&& i : gl.second.KeepList) {
+ BlobsManagerCounters.OnCollectKeep(i.BlobSize());
+ }
requests[group] = std::make_unique<TEvBlobStorage::TEvCollectGarbage>(
TabletInfo->TabletID, CurrentGen, PerGenerationCounter,
channelIdx, true,
@@ -449,7 +463,7 @@ TBlobBatch TBlobManager::StartBlobBatch(ui32 channel) {
++CurrentStep;
TAllocatedGenStepConstPtr genStepRef = new TAllocatedGenStep({CurrentGen, CurrentStep});
AllocatedGenSteps.push_back(genStepRef);
- auto batchInfo = std::make_unique<TBlobBatch::TBatchInfo>(TabletInfo, genStepRef, channel);
+ auto batchInfo = std::make_unique<TBlobBatch::TBatchInfo>(TabletInfo, genStepRef, channel, BlobsManagerCounters);
return TBlobBatch(std::move(batchInfo));
}
@@ -475,9 +489,11 @@ void TBlobManager::SaveBlobBatch(TBlobBatch&& blobBatch, IBlobManagerDb& db) {
"Trying to keep blob %s that could be already collected by edge barrier (%" PRIu32 ":%" PRIu32 ")",
blobId.ToStringNew().c_str(), std::get<0>(edgeGenStep), std::get<1>(edgeGenStep));
+ BlobsManagerCounters.OnKeepMarker(logoblobId.BlobSize());
BlobsToKeep.insert(std::move(logoblobId));
db.AddBlobToKeep(blobId);
}
+ BlobsManagerCounters.OnBlobsKeep(BlobsToKeep);
// Save all small blobs
for (ui32 i = 0; i < blobBatch.BatchInfo->SmallBlobs.size(); ++i) {
@@ -497,6 +513,7 @@ void TBlobManager::DeleteBlob(const TUnifiedBlobId& blobId, IBlobManagerDb& db)
++CountersUpdate.BlobsDeleted;
if (blobId.IsSmallBlob()) {
+ BlobsManagerCounters.OnDeleteSmallBlob(blobId.BlobSize());
if (BlobsUseCount.contains(blobId) == 0) {
DeleteSmallBlob(blobId, db);
} else {
@@ -515,9 +532,13 @@ void TBlobManager::DeleteBlob(const TUnifiedBlobId& blobId, IBlobManagerDb& db)
if (BlobsUseCount.contains(blobId) == 0) {
LOG_S_DEBUG("BlobManager at tablet " << TabletInfo->TabletID << " Delete Blob " << blobId);
TLogoBlobID logoBlobId = blobId.GetLogoBlobId();
- BlobsToDelete.insert(logoBlobId);
+ if (BlobsToDelete.emplace(logoBlobId).second) {
+ BlobsManagerCounters.OnDeleteBlobMarker(blobId.BlobSize());
+ BlobsManagerCounters.OnBlobsDelete(BlobsToDelete);
+ }
NBlobCache::ForgetBlob(blobId);
} else {
+ BlobsManagerCounters.OnDeleteBlobDelayedMarker(blobId.BlobSize());
LOG_S_DEBUG("BlobManager at tablet " << TabletInfo->TabletID << " Delay Delete Blob " << blobId);
BlobsToDeleteDelayed.insert(blobId.GetLogoBlobId());
}
@@ -712,12 +733,15 @@ void TBlobManager::SetBlobInUse(const TUnifiedBlobId& blobId, bool inUse) {
LOG_S_DEBUG("BlobManager at tablet " << TabletInfo->TabletID << " Delayed Small Blob " << blobId
<< " is no longer in use" );
SmallBlobsToDelete.insert(blobId);
+ BlobsManagerCounters.OnDeleteSmallBlob(blobId.BlobSize());
}
} else {
TLogoBlobID logoBlobId = blobId.GetLogoBlobId();
if (BlobsToDeleteDelayed.erase(logoBlobId)) {
LOG_S_DEBUG("BlobManager at tablet " << TabletInfo->TabletID << " Delete Delayed Blob " << blobId);
BlobsToDelete.insert(logoBlobId);
+ BlobsManagerCounters.OnBlobsDelete(BlobsToDelete);
+ BlobsManagerCounters.OnDeleteBlobMarker(logoBlobId.BlobSize());
NBlobCache::ForgetBlob(blobId);
}
}
diff --git a/ydb/core/tx/columnshard/blob_manager.h b/ydb/core/tx/columnshard/blob_manager.h
index f9922a928d..c780077f37 100644
--- a/ydb/core/tx/columnshard/blob_manager.h
+++ b/ydb/core/tx/columnshard/blob_manager.h
@@ -1,6 +1,7 @@
#pragma once
#include "blob.h"
+#include "counters/blobs_manager.h"
#include <ydb/core/tx/columnshard/inflight_request_tracker.h>
#include <ydb/core/tablet_flat/flat_executor.h>
@@ -192,6 +193,8 @@ private:
TGenStep CollectGenStepInFlight = {0, 0};
bool FirstGC = true;
+ const TBlobsManagerCounters BlobsManagerCounters = TBlobsManagerCounters("BlobsManager");
+
// Stores counter updates since last call to GetCountersUpdate()
// Then the counters are reset and start accumulating new delta
TBlobManagerCounters CountersUpdate;
@@ -205,6 +208,10 @@ private:
public:
TBlobManager(TIntrusivePtr<TTabletStorageInfo> tabletInfo, ui32 gen);
+ const TBlobsManagerCounters& GetCounters() const {
+ return BlobsManagerCounters;
+ }
+
void RegisterControls(NKikimr::TControlBoard& icb);
// Loads the state at startup
diff --git a/ydb/core/tx/columnshard/counters/common/agent.cpp b/ydb/core/tx/columnshard/counters/common/agent.cpp
index 558934e8f9..b86c4d92fc 100644
--- a/ydb/core/tx/columnshard/counters/common/agent.cpp
+++ b/ydb/core/tx/columnshard/counters/common/agent.cpp
@@ -52,4 +52,10 @@ std::shared_ptr<NKikimr::NColumnShard::TValueAggregationClient> TValueAggregatio
return std::make_shared<TValueAggregationClient>(selfPtr);
}
+i64* TValueAggregationAgent::RegisterValue(const i64 zeroValue /*= 0*/) {
+ TGuard<TMutex> g(Mutex);
+ Values.emplace_back(zeroValue);
+ return &Values.back();
+}
+
}
diff --git a/ydb/core/tx/columnshard/counters/common/agent.h b/ydb/core/tx/columnshard/counters/common/agent.h
index 306cc4a421..9aa2b908eb 100644
--- a/ydb/core/tx/columnshard/counters/common/agent.h
+++ b/ydb/core/tx/columnshard/counters/common/agent.h
@@ -22,8 +22,6 @@ public:
class TValueAggregationAgent: TNonCopyable {
private:
- friend class TRegularSignalBuilderActor;
- friend class TValueAggregationClient;
::NMonitoring::TDynamicCounters::TCounterPtr ValueSignalSum;
::NMonitoring::TDynamicCounters::TCounterPtr ValueSignalMin;
::NMonitoring::TDynamicCounters::TCounterPtr ValueSignalMax;
@@ -34,15 +32,10 @@ private:
bool CalcAggregations(i64& minValue, i64& maxValue) const;
std::optional<TSignalAggregations> GetAggregations() const;
- i64* RegisterValue(const i64 zeroValue = 0) {
- TGuard<TMutex> g(Mutex);
- Values.emplace_back(zeroValue);
- return &Values.back();
- }
-
public:
TValueAggregationAgent(const TString& signalName, const TCommonCountersOwner& signalsOwner);
+ i64* RegisterValue(const i64 zeroValue = 0);
void ResendStatus() const;
std::shared_ptr<TValueAggregationClient> GetClient(std::shared_ptr<TValueAggregationAgent> selfPtr);
diff --git a/ydb/core/tx/columnshard/counters/common/owner.h b/ydb/core/tx/columnshard/counters/common/owner.h
index 172e94860b..22dd7d11ca 100644
--- a/ydb/core/tx/columnshard/counters/common/owner.h
+++ b/ydb/core/tx/columnshard/counters/common/owner.h
@@ -10,17 +10,17 @@ namespace NKikimr::NColumnShard {
class TCommonCountersOwner {
private:
- friend class TValueAggregationAgent;
::NMonitoring::TDynamicCounterPtr SubGroup;
const TString ModuleId;
TString NormalizeSignalName(const TString& name) const;
protected:
- NMonitoring::TDynamicCounters::TCounterPtr GetDeriviative(const TString& name) const;
- NMonitoring::THistogramPtr GetHistogram(const TString& name, NMonitoring::IHistogramCollectorPtr&& hCollector) const;
- NMonitoring::TDynamicCounters::TCounterPtr GetValue(const TString& name) const;
std::shared_ptr<TValueAggregationAgent> GetValueAutoAggregations(const TString& name) const;
std::shared_ptr<TValueAggregationClient> GetValueAutoAggregationsClient(const TString& name) const;
public:
+ NMonitoring::TDynamicCounters::TCounterPtr GetValue(const TString& name) const;
+ NMonitoring::TDynamicCounters::TCounterPtr GetDeriviative(const TString& name) const;
+ NMonitoring::THistogramPtr GetHistogram(const TString& name, NMonitoring::IHistogramCollectorPtr&& hCollector) const;
+
TCommonCountersOwner(const TString& module);
};
diff --git a/ydb/core/tx/columnshard/counters/indexation.cpp b/ydb/core/tx/columnshard/counters/indexation.cpp
index 705c0a26ba..194bdf0bb5 100644
--- a/ydb/core/tx/columnshard/counters/indexation.cpp
+++ b/ydb/core/tx/columnshard/counters/indexation.cpp
@@ -30,7 +30,6 @@ TIndexationCounters::TIndexationCounters(const TString& module)
CompactionDuration = TBase::GetHistogram("CompactionDuration", NMonitoring::ExponentialHistogram(18, 2, 20));
HistogramCompactionInputBytes = TBase::GetHistogram("CompactionInput/Bytes", NMonitoring::ExponentialHistogram(18, 2, 1024));
CompactionInputBytes = TBase::GetDeriviative("CompactionInput/Bytes");
- CompactionInputSize = TBase::GetHistogram("CompactionInput/Bytes", NMonitoring::ExponentialHistogram(18, 2, 1024));
CompactionExceptions = TBase::GetDeriviative("Exceptions/Count");
CompactionFails = TBase::GetDeriviative("CompactionFails/Count");
diff --git a/ydb/core/tx/columnshard/counters/indexation.h b/ydb/core/tx/columnshard/counters/indexation.h
index 6a1c1e8842..ac449a93b2 100644
--- a/ydb/core/tx/columnshard/counters/indexation.h
+++ b/ydb/core/tx/columnshard/counters/indexation.h
@@ -37,17 +37,16 @@ public:
NMonitoring::TDynamicCounters::TCounterPtr TooSmallBlobFinish;
NMonitoring::TDynamicCounters::TCounterPtr TooSmallBlobStart;
- NMonitoring::THistogramPtr CompactionInputSize;
NMonitoring::THistogramPtr CompactionDuration;
NMonitoring::TDynamicCounters::TCounterPtr CompactionExceptions;
NMonitoring::TDynamicCounters::TCounterPtr CompactionFails;
TIndexationCounters(const TString& module);
-// void CompactionInputSize(const ui64 size) const {
-// HistogramCompactionInputBytes->Collect(size);
-// CompactionInputBytes->Add(size);
-// }
+ void CompactionInputSize(const ui64 size) const {
+ HistogramCompactionInputBytes->Collect(size);
+ CompactionInputBytes->Add(size);
+ }
};
}
diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.h b/ydb/core/tx/columnshard/engines/index_logic_logs.h
index f517a12387..2ccb7067a3 100644
--- a/ydb/core/tx/columnshard/engines/index_logic_logs.h
+++ b/ydb/core/tx/columnshard/engines/index_logic_logs.h
@@ -37,7 +37,7 @@ public:
for (auto&& i : indexChanges->Blobs) {
readBytes += i.first.Size;
}
- Counters.CompactionInputSize->Collect(readBytes);
+ Counters.CompactionInputSize(readBytes);
}
const TInstant start = TInstant::Now();
TConclusion<std::vector<TString>> result = DoApply(indexChanges);
diff --git a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h
index 63118535b5..56f8c4bc66 100644
--- a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h
+++ b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h
@@ -7,12 +7,13 @@ namespace NKikimr::NOlap {
class TInsertionSummary {
private:
+ friend class TPathInfo;
+
const NColumnShard::TInsertTableCounters Counters;
YDB_READONLY(i64, CommittedSize, 0);
YDB_READONLY(i64, InsertedSize, 0);
std::map<ui64, std::set<const TPathInfo*>> Priorities;
THashMap<ui64, TPathInfo> PathInfo;
- friend class TPathInfo;
void RemovePriority(const TPathInfo& pathInfo) noexcept;
void AddPriority(const TPathInfo& pathInfo) noexcept;