diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-06 21:22:09 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-06 21:22:09 +0300 |
commit | f18d76db543cf768f466baf6e44549a6556daec6 (patch) | |
tree | 5d2c4f51facd39d595c180aa946e434655a9816c | |
parent | d0919e63bf76e5117a2544b75b9ea3ed3dac7b77 (diff) | |
download | ydb-f18d76db543cf768f466baf6e44549a6556daec6.tar.gz |
correct signals
-rw-r--r-- | ydb/core/tx/columnshard/blob_manager.cpp | 58 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/blob_manager.h | 7 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/counters/common/agent.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/counters/common/agent.h | 9 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/counters/common/owner.h | 8 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/counters/indexation.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/counters/indexation.h | 9 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/index_logic_logs.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h | 3 |
9 files changed, 66 insertions, 37 deletions
diff --git a/ydb/core/tx/columnshard/blob_manager.cpp b/ydb/core/tx/columnshard/blob_manager.cpp index b653f1b8e2..f1e7819a06 100644 --- a/ydb/core/tx/columnshard/blob_manager.cpp +++ b/ydb/core/tx/columnshard/blob_manager.cpp @@ -20,6 +20,7 @@ TLogoBlobID ParseLogoBlobId(TString blobId) { struct TBlobBatch::TBatchInfo : TNonCopyable { TIntrusivePtr<TTabletStorageInfo> TabletInfo; TAllocatedGenStepConstPtr GenStepRef; + const TBlobsManagerCounters Counters; const ui32 Gen; const ui32 Step; const ui32 Channel; @@ -30,15 +31,16 @@ struct TBlobBatch::TBatchInfo : TNonCopyable { ui64 TotalSizeBytes; std::vector<TString> SmallBlobs; - TBatchInfo(TIntrusivePtr<TTabletStorageInfo> tabletInfo, TAllocatedGenStepConstPtr genStep, ui32 channel) + TBatchInfo(TIntrusivePtr<TTabletStorageInfo> tabletInfo, TAllocatedGenStepConstPtr genStep, ui32 channel, const TBlobsManagerCounters& counters) : TabletInfo(tabletInfo) , GenStepRef(genStep) + , Counters(counters) , Gen(std::get<0>(GenStepRef->GenStep)) , Step(std::get<1>(GenStepRef->GenStep)) , Channel(channel) , InFlightCount(0) - , TotalSizeBytes(0) - {} + , TotalSizeBytes(0) { + } TUnifiedBlobId NextBlobId(ui32 blobSize) { BlobSizes.push_back(blobSize); @@ -56,8 +58,9 @@ struct TBlobBatch::TBatchInfo : TNonCopyable { TUnifiedBlobId AddSmallBlob(const TString& data) { // NOTE: small blobs are not included into TotalSizeBytes + Counters.OnAddSmallBlob(data.size()); SmallBlobs.push_back(data); - return MakeSmallBlobId(SmallBlobs.size()-1); + return MakeSmallBlobId(SmallBlobs.size() - 1); } TUnifiedBlobId MakeSmallBlobId(ui32 i) const { @@ -102,9 +105,11 @@ TUnifiedBlobId TBlobBatch::SendWriteBlobRequest(const TString& blobData, TInstan void TBlobBatch::OnBlobWriteResult(TEvBlobStorage::TEvPutResult::TPtr& ev) { TLogoBlobID blobId = ev->Get()->Id; + BatchInfo->Counters.OnPutResult(blobId.BlobSize()); Y_VERIFY(ev->Get()->Status == NKikimrProto::OK, "The caller must handle unsuccessful status"); Y_VERIFY(BatchInfo); Y_VERIFY(BatchInfo->InFlight[blobId.Cookie()], "Blob %s is already acked!", blobId.ToString().c_str()); + BatchInfo->InFlight[blobId.Cookie()] = false; --BatchInfo->InFlightCount; Y_VERIFY(BatchInfo->InFlightCount >= 0); @@ -164,13 +169,16 @@ bool TBlobManager::LoadState(IBlobManagerDb& db) { for (const auto& unifiedBlobId : blobsToDelete) { if (unifiedBlobId.IsSmallBlob()) { + BlobsManagerCounters.OnDeleteSmallBlob(unifiedBlobId.BlobSize()); SmallBlobsToDelete.insert(unifiedBlobId); } else if (unifiedBlobId.IsDsBlob()) { BlobsToDelete.insert(unifiedBlobId.GetLogoBlobId()); + BlobsManagerCounters.OnDeleteBlobMarker(unifiedBlobId.BlobSize()); } else { Y_FAIL("Unexpected blob id: %s", unifiedBlobId.ToStringNew().c_str()); } } + BlobsManagerCounters.OnBlobsDelete(BlobsToDelete); // Build the list of steps that cannot be garbage collected before Keep flag is set on the blobs THashSet<TGenStep> genStepsWithBlobsToKeep; @@ -183,11 +191,13 @@ bool TBlobManager::LoadState(IBlobManagerDb& db) { LOG_S_WARN("BlobManager at tablet " << TabletInfo->TabletID << " Load not keeped blob " << unifiedBlobId.ToStringNew() << " collected by GenStep: " << std::get<0>(LastCollectedGenStep) << ":" << std::get<1>(LastCollectedGenStep)); + BlobsManagerCounters.OnBrokenKeep(unifiedBlobId.BlobSize()); KeepsToErase.emplace_back(unifiedBlobId); continue; } BlobsToKeep.insert(blobId); + BlobsManagerCounters.OnKeepMarker(blobId.BlobSize()); // Keep + DontKeep (probably in different gen:steps) // GC could go through it to a greater LastCollectedGenStep @@ -197,6 +207,7 @@ bool TBlobManager::LoadState(IBlobManagerDb& db) { genStepsWithBlobsToKeep.insert(genStep); } + BlobsManagerCounters.OnBlobsKeep(BlobsToKeep); AllocatedGenSteps.clear(); for (const auto& gs : genStepsWithBlobsToKeep) { @@ -227,7 +238,7 @@ bool TBlobManager::NeedStorageCG() const { return false; } - if (BlobsToKeep.empty() && BlobsToDelete.empty() && LastCollectedGenStep == TGenStep{CurrentGen, CurrentStep}) { + if (BlobsToKeep.empty() && BlobsToDelete.empty() && LastCollectedGenStep == TGenStep{ CurrentGen, CurrentStep }) { return false; } @@ -276,7 +287,8 @@ THashMap<ui32, std::unique_ptr<TEvBlobStorage::TEvCollectGarbage>> TBlobManager: } PreviousGCTime = AppData()->TimeProvider->Now(); - + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "PreparePerGroupGCRequests")("gen", std::get<0>(newCollectGenStep))("step", std::get<1>(newCollectGenStep)); + BlobsManagerCounters.OnNewCollectStep(std::get<0>(newCollectGenStep), std::get<1>(newCollectGenStep)); const ui32 channelIdx = BLOB_CHANNEL; Y_VERIFY(PerGroupGCListsInFlight.empty()); @@ -305,9 +317,8 @@ THashMap<ui32, std::unique_ptr<TEvBlobStorage::TEvCollectGarbage>> TBlobManager: ui32 blobGroup = TabletInfo->GroupFor(keepBlobIt->Channel(), keepBlobIt->Generation()); PerGroupGCListsInFlight[blobGroup].KeepList.insert(*keepBlobIt); } - if (BlobsToKeep.begin() != keepBlobIt) { - BlobsToKeep.erase(BlobsToKeep.begin(), keepBlobIt); - } + BlobsToKeep.erase(BlobsToKeep.begin(), keepBlobIt); + BlobsManagerCounters.OnBlobsKeep(BlobsToKeep); // Add all blobs to delete auto blobIt = BlobsToDelete.begin(); @@ -319,9 +330,7 @@ THashMap<ui32, std::unique_ptr<TEvBlobStorage::TEvCollectGarbage>> TBlobManager: ui32 blobGroup = TabletInfo->GroupFor(blobIt->Channel(), blobIt->Generation()); TGCLists& gl = PerGroupGCListsInFlight[blobGroup]; bool skipDontKeep = false; - if (gl.KeepList.contains(*blobIt)) { - // Remove the blob from keep list if its also in the delete list - gl.KeepList.erase(*blobIt); + if (gl.KeepList.erase(*blobIt)) { // Skipped blobs still need to be deleted from BlobsToKeep table KeepsToErase.emplace_back(TUnifiedBlobId(blobGroup, *blobIt)); @@ -337,12 +346,14 @@ THashMap<ui32, std::unique_ptr<TEvBlobStorage::TEvCollectGarbage>> TBlobManager: } } if (!skipDontKeep) { + BlobsManagerCounters.OnCollectDropExplicit(blobIt->BlobSize()); gl.DontKeepList.insert(*blobIt); + } else { + BlobsManagerCounters.OnCollectDropImplicit(blobIt->BlobSize()); } } - if (BlobsToDelete.begin() != blobIt) { - BlobsToDelete.erase(BlobsToDelete.begin(), blobIt); - } + BlobsToDelete.erase(BlobsToDelete.begin(), blobIt); + BlobsManagerCounters.OnBlobsDelete(BlobsToDelete); } CollectGenStepInFlight = newCollectGenStep; @@ -352,6 +363,9 @@ THashMap<ui32, std::unique_ptr<TEvBlobStorage::TEvCollectGarbage>> TBlobManager: { for (const auto& gl : PerGroupGCListsInFlight) { ui32 group = gl.first; + for (auto&& i : gl.second.KeepList) { + BlobsManagerCounters.OnCollectKeep(i.BlobSize()); + } requests[group] = std::make_unique<TEvBlobStorage::TEvCollectGarbage>( TabletInfo->TabletID, CurrentGen, PerGenerationCounter, channelIdx, true, @@ -449,7 +463,7 @@ TBlobBatch TBlobManager::StartBlobBatch(ui32 channel) { ++CurrentStep; TAllocatedGenStepConstPtr genStepRef = new TAllocatedGenStep({CurrentGen, CurrentStep}); AllocatedGenSteps.push_back(genStepRef); - auto batchInfo = std::make_unique<TBlobBatch::TBatchInfo>(TabletInfo, genStepRef, channel); + auto batchInfo = std::make_unique<TBlobBatch::TBatchInfo>(TabletInfo, genStepRef, channel, BlobsManagerCounters); return TBlobBatch(std::move(batchInfo)); } @@ -475,9 +489,11 @@ void TBlobManager::SaveBlobBatch(TBlobBatch&& blobBatch, IBlobManagerDb& db) { "Trying to keep blob %s that could be already collected by edge barrier (%" PRIu32 ":%" PRIu32 ")", blobId.ToStringNew().c_str(), std::get<0>(edgeGenStep), std::get<1>(edgeGenStep)); + BlobsManagerCounters.OnKeepMarker(logoblobId.BlobSize()); BlobsToKeep.insert(std::move(logoblobId)); db.AddBlobToKeep(blobId); } + BlobsManagerCounters.OnBlobsKeep(BlobsToKeep); // Save all small blobs for (ui32 i = 0; i < blobBatch.BatchInfo->SmallBlobs.size(); ++i) { @@ -497,6 +513,7 @@ void TBlobManager::DeleteBlob(const TUnifiedBlobId& blobId, IBlobManagerDb& db) ++CountersUpdate.BlobsDeleted; if (blobId.IsSmallBlob()) { + BlobsManagerCounters.OnDeleteSmallBlob(blobId.BlobSize()); if (BlobsUseCount.contains(blobId) == 0) { DeleteSmallBlob(blobId, db); } else { @@ -515,9 +532,13 @@ void TBlobManager::DeleteBlob(const TUnifiedBlobId& blobId, IBlobManagerDb& db) if (BlobsUseCount.contains(blobId) == 0) { LOG_S_DEBUG("BlobManager at tablet " << TabletInfo->TabletID << " Delete Blob " << blobId); TLogoBlobID logoBlobId = blobId.GetLogoBlobId(); - BlobsToDelete.insert(logoBlobId); + if (BlobsToDelete.emplace(logoBlobId).second) { + BlobsManagerCounters.OnDeleteBlobMarker(blobId.BlobSize()); + BlobsManagerCounters.OnBlobsDelete(BlobsToDelete); + } NBlobCache::ForgetBlob(blobId); } else { + BlobsManagerCounters.OnDeleteBlobDelayedMarker(blobId.BlobSize()); LOG_S_DEBUG("BlobManager at tablet " << TabletInfo->TabletID << " Delay Delete Blob " << blobId); BlobsToDeleteDelayed.insert(blobId.GetLogoBlobId()); } @@ -712,12 +733,15 @@ void TBlobManager::SetBlobInUse(const TUnifiedBlobId& blobId, bool inUse) { LOG_S_DEBUG("BlobManager at tablet " << TabletInfo->TabletID << " Delayed Small Blob " << blobId << " is no longer in use" ); SmallBlobsToDelete.insert(blobId); + BlobsManagerCounters.OnDeleteSmallBlob(blobId.BlobSize()); } } else { TLogoBlobID logoBlobId = blobId.GetLogoBlobId(); if (BlobsToDeleteDelayed.erase(logoBlobId)) { LOG_S_DEBUG("BlobManager at tablet " << TabletInfo->TabletID << " Delete Delayed Blob " << blobId); BlobsToDelete.insert(logoBlobId); + BlobsManagerCounters.OnBlobsDelete(BlobsToDelete); + BlobsManagerCounters.OnDeleteBlobMarker(logoBlobId.BlobSize()); NBlobCache::ForgetBlob(blobId); } } diff --git a/ydb/core/tx/columnshard/blob_manager.h b/ydb/core/tx/columnshard/blob_manager.h index f9922a928d..c780077f37 100644 --- a/ydb/core/tx/columnshard/blob_manager.h +++ b/ydb/core/tx/columnshard/blob_manager.h @@ -1,6 +1,7 @@ #pragma once #include "blob.h" +#include "counters/blobs_manager.h" #include <ydb/core/tx/columnshard/inflight_request_tracker.h> #include <ydb/core/tablet_flat/flat_executor.h> @@ -192,6 +193,8 @@ private: TGenStep CollectGenStepInFlight = {0, 0}; bool FirstGC = true; + const TBlobsManagerCounters BlobsManagerCounters = TBlobsManagerCounters("BlobsManager"); + // Stores counter updates since last call to GetCountersUpdate() // Then the counters are reset and start accumulating new delta TBlobManagerCounters CountersUpdate; @@ -205,6 +208,10 @@ private: public: TBlobManager(TIntrusivePtr<TTabletStorageInfo> tabletInfo, ui32 gen); + const TBlobsManagerCounters& GetCounters() const { + return BlobsManagerCounters; + } + void RegisterControls(NKikimr::TControlBoard& icb); // Loads the state at startup diff --git a/ydb/core/tx/columnshard/counters/common/agent.cpp b/ydb/core/tx/columnshard/counters/common/agent.cpp index 558934e8f9..b86c4d92fc 100644 --- a/ydb/core/tx/columnshard/counters/common/agent.cpp +++ b/ydb/core/tx/columnshard/counters/common/agent.cpp @@ -52,4 +52,10 @@ std::shared_ptr<NKikimr::NColumnShard::TValueAggregationClient> TValueAggregatio return std::make_shared<TValueAggregationClient>(selfPtr); } +i64* TValueAggregationAgent::RegisterValue(const i64 zeroValue /*= 0*/) { + TGuard<TMutex> g(Mutex); + Values.emplace_back(zeroValue); + return &Values.back(); +} + } diff --git a/ydb/core/tx/columnshard/counters/common/agent.h b/ydb/core/tx/columnshard/counters/common/agent.h index 306cc4a421..9aa2b908eb 100644 --- a/ydb/core/tx/columnshard/counters/common/agent.h +++ b/ydb/core/tx/columnshard/counters/common/agent.h @@ -22,8 +22,6 @@ public: class TValueAggregationAgent: TNonCopyable { private: - friend class TRegularSignalBuilderActor; - friend class TValueAggregationClient; ::NMonitoring::TDynamicCounters::TCounterPtr ValueSignalSum; ::NMonitoring::TDynamicCounters::TCounterPtr ValueSignalMin; ::NMonitoring::TDynamicCounters::TCounterPtr ValueSignalMax; @@ -34,15 +32,10 @@ private: bool CalcAggregations(i64& minValue, i64& maxValue) const; std::optional<TSignalAggregations> GetAggregations() const; - i64* RegisterValue(const i64 zeroValue = 0) { - TGuard<TMutex> g(Mutex); - Values.emplace_back(zeroValue); - return &Values.back(); - } - public: TValueAggregationAgent(const TString& signalName, const TCommonCountersOwner& signalsOwner); + i64* RegisterValue(const i64 zeroValue = 0); void ResendStatus() const; std::shared_ptr<TValueAggregationClient> GetClient(std::shared_ptr<TValueAggregationAgent> selfPtr); diff --git a/ydb/core/tx/columnshard/counters/common/owner.h b/ydb/core/tx/columnshard/counters/common/owner.h index 172e94860b..22dd7d11ca 100644 --- a/ydb/core/tx/columnshard/counters/common/owner.h +++ b/ydb/core/tx/columnshard/counters/common/owner.h @@ -10,17 +10,17 @@ namespace NKikimr::NColumnShard { class TCommonCountersOwner { private: - friend class TValueAggregationAgent; ::NMonitoring::TDynamicCounterPtr SubGroup; const TString ModuleId; TString NormalizeSignalName(const TString& name) const; protected: - NMonitoring::TDynamicCounters::TCounterPtr GetDeriviative(const TString& name) const; - NMonitoring::THistogramPtr GetHistogram(const TString& name, NMonitoring::IHistogramCollectorPtr&& hCollector) const; - NMonitoring::TDynamicCounters::TCounterPtr GetValue(const TString& name) const; std::shared_ptr<TValueAggregationAgent> GetValueAutoAggregations(const TString& name) const; std::shared_ptr<TValueAggregationClient> GetValueAutoAggregationsClient(const TString& name) const; public: + NMonitoring::TDynamicCounters::TCounterPtr GetValue(const TString& name) const; + NMonitoring::TDynamicCounters::TCounterPtr GetDeriviative(const TString& name) const; + NMonitoring::THistogramPtr GetHistogram(const TString& name, NMonitoring::IHistogramCollectorPtr&& hCollector) const; + TCommonCountersOwner(const TString& module); }; diff --git a/ydb/core/tx/columnshard/counters/indexation.cpp b/ydb/core/tx/columnshard/counters/indexation.cpp index 705c0a26ba..194bdf0bb5 100644 --- a/ydb/core/tx/columnshard/counters/indexation.cpp +++ b/ydb/core/tx/columnshard/counters/indexation.cpp @@ -30,7 +30,6 @@ TIndexationCounters::TIndexationCounters(const TString& module) CompactionDuration = TBase::GetHistogram("CompactionDuration", NMonitoring::ExponentialHistogram(18, 2, 20)); HistogramCompactionInputBytes = TBase::GetHistogram("CompactionInput/Bytes", NMonitoring::ExponentialHistogram(18, 2, 1024)); CompactionInputBytes = TBase::GetDeriviative("CompactionInput/Bytes"); - CompactionInputSize = TBase::GetHistogram("CompactionInput/Bytes", NMonitoring::ExponentialHistogram(18, 2, 1024)); CompactionExceptions = TBase::GetDeriviative("Exceptions/Count"); CompactionFails = TBase::GetDeriviative("CompactionFails/Count"); diff --git a/ydb/core/tx/columnshard/counters/indexation.h b/ydb/core/tx/columnshard/counters/indexation.h index 6a1c1e8842..ac449a93b2 100644 --- a/ydb/core/tx/columnshard/counters/indexation.h +++ b/ydb/core/tx/columnshard/counters/indexation.h @@ -37,17 +37,16 @@ public: NMonitoring::TDynamicCounters::TCounterPtr TooSmallBlobFinish; NMonitoring::TDynamicCounters::TCounterPtr TooSmallBlobStart; - NMonitoring::THistogramPtr CompactionInputSize; NMonitoring::THistogramPtr CompactionDuration; NMonitoring::TDynamicCounters::TCounterPtr CompactionExceptions; NMonitoring::TDynamicCounters::TCounterPtr CompactionFails; TIndexationCounters(const TString& module); -// void CompactionInputSize(const ui64 size) const { -// HistogramCompactionInputBytes->Collect(size); -// CompactionInputBytes->Add(size); -// } + void CompactionInputSize(const ui64 size) const { + HistogramCompactionInputBytes->Collect(size); + CompactionInputBytes->Add(size); + } }; } diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.h b/ydb/core/tx/columnshard/engines/index_logic_logs.h index f517a12387..2ccb7067a3 100644 --- a/ydb/core/tx/columnshard/engines/index_logic_logs.h +++ b/ydb/core/tx/columnshard/engines/index_logic_logs.h @@ -37,7 +37,7 @@ public: for (auto&& i : indexChanges->Blobs) { readBytes += i.first.Size; } - Counters.CompactionInputSize->Collect(readBytes); + Counters.CompactionInputSize(readBytes); } const TInstant start = TInstant::Now(); TConclusion<std::vector<TString>> result = DoApply(indexChanges); diff --git a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h index 63118535b5..56f8c4bc66 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h +++ b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h @@ -7,12 +7,13 @@ namespace NKikimr::NOlap { class TInsertionSummary { private: + friend class TPathInfo; + const NColumnShard::TInsertTableCounters Counters; YDB_READONLY(i64, CommittedSize, 0); YDB_READONLY(i64, InsertedSize, 0); std::map<ui64, std::set<const TPathInfo*>> Priorities; THashMap<ui64, TPathInfo> PathInfo; - friend class TPathInfo; void RemovePriority(const TPathInfo& pathInfo) noexcept; void AddPriority(const TPathInfo& pathInfo) noexcept; |