diff options
author | snaury <snaury@ydb.tech> | 2023-01-17 12:31:38 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2023-01-17 12:31:38 +0300 |
commit | 851f4bde27aba22a2461a1c23a8960d1cee78069 (patch) | |
tree | fbf37122c731afe076ccc927f1d107662978aa4c | |
parent | 0306d61aa24ae38e3ae8c16dabb3c71b258d7833 (diff) | |
download | ydb-851f4bde27aba22a2461a1c23a8960d1cee78069.tar.gz |
Process boot and commit io stats without sending extra messages
-rw-r--r-- | ydb/core/base/tablet.h | 9 | ||||
-rw-r--r-- | ydb/core/tablet/tablet_sys.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_executor.cpp | 139 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_executor.h | 11 |
4 files changed, 109 insertions, 54 deletions
diff --git a/ydb/core/base/tablet.h b/ydb/core/base/tablet.h index 945d636088..e93a3c4454 100644 --- a/ydb/core/base/tablet.h +++ b/ydb/core/base/tablet.h @@ -656,6 +656,9 @@ struct TEvTablet { TResourceProfilesPtr ResourceProfiles; TSharedQuotaPtr TxCacheQuota; + NMetrics::TTabletThroughputRawValue GroupReadBytes; + NMetrics::TTabletIopsRawValue GroupReadOps; + TEvFBoot(ui64 tabletID, ui32 followerID, ui32 generation, TActorId launcher, const TEvFollowerUpdate &upd, TIntrusivePtr<TTabletStorageInfo> info, TResourceProfilesPtr profiles = nullptr, TSharedQuotaPtr txCacheQuota = nullptr) @@ -671,7 +674,9 @@ struct TEvTablet { TEvFBoot(ui64 tabletID, ui32 followerID, ui32 generation, TActorId launcher, TDependencyGraph *dependencyGraph, TIntrusivePtr<TTabletStorageInfo> info, TResourceProfilesPtr profiles = nullptr, - TSharedQuotaPtr txCacheQuota = nullptr) + TSharedQuotaPtr txCacheQuota = nullptr, + NMetrics::TTabletThroughputRawValue&& read = NMetrics::TTabletThroughputRawValue(), + NMetrics::TTabletIopsRawValue&& readOps = NMetrics::TTabletIopsRawValue()) : TabletID(tabletID) , FollowerID(followerID) , Generation(generation) @@ -680,6 +685,8 @@ struct TEvTablet { , TabletStorageInfo(info) , ResourceProfiles(profiles) , TxCacheQuota(txCacheQuota) + , GroupReadBytes(std::move(read)) + , GroupReadOps(std::move(readOps)) {} }; diff --git a/ydb/core/tablet/tablet_sys.cpp b/ydb/core/tablet/tablet_sys.cpp index ad2b25b372..3093463924 100644 --- a/ydb/core/tablet/tablet_sys.cpp +++ b/ydb/core/tablet/tablet_sys.cpp @@ -367,7 +367,9 @@ void TTablet::HandleByFollower(TEvTabletBase::TEvRebuildGraphResult::TPtr &ev) { Send(UserTablet, new TEvTablet::TEvFBoot(TabletID(), FollowerId, 0, Launcher, msg->DependencyGraph.Get(), Info, - ResourceProfiles, TxCacheQuota)); + ResourceProfiles, TxCacheQuota, + std::move(msg->GroupReadBytes), + std::move(msg->GroupReadOps))); Send(Launcher, new TEvTablet::TEvRestored(TabletID(), StateStorageInfo.KnownGeneration, UserTablet, true)); BLOG_TRACE("SBoot with rebuilt graph", "TSYS05"); diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp index d88c2c9b9c..5923c7c264 100644 --- a/ydb/core/tablet_flat/flat_executor.cpp +++ b/ydb/core/tablet_flat/flat_executor.cpp @@ -313,6 +313,8 @@ void TExecutor::ActivateFollower(const TActorContext &ctx) { auto loadedState = BootLogic->ExtractState(); BootLogic.Destroy(); + Y_VERIFY(Counters, "Expected to have Counters initialized during Boot processing"); + Y_VERIFY(!GcLogic); Y_VERIFY(!LogicRedo); Y_VERIFY(!LogicAlter); @@ -321,11 +323,6 @@ void TExecutor::ActivateFollower(const TActorContext &ctx) { BorrowLogic = loadedState->Loans; Y_VERIFY(!CompactionLogic); - if (!Counters) { - Counters = MakeHolder<TExecutorCounters>(); - CountersBaseline = MakeHolder<TExecutorCounters>(); - Counters->RememberCurrentStateAsBaseline(*CountersBaseline); - } CounterCacheFresh = new NMonitoring::TCounterForPtr; CounterCacheStaging = new NMonitoring::TCounterForPtr; @@ -361,7 +358,7 @@ void TExecutor::Active(const TActorContext &ctx) { auto loadedState = BootLogic->ExtractState(); BootLogic.Destroy(); - Counters = MakeHolder<TExecutorCounters>(); + Y_VERIFY(Counters, "Expected to have Counters initialized during Boot processing"); CommitManager = loadedState->CommitManager; Database = loadedState->Database; @@ -377,8 +374,6 @@ void TExecutor::Active(const TActorContext &ctx) { CompactionLogic = THolder<TCompactionLogic>(new TCompactionLogic(Logger.Get(), Broker.Get(), this, loadedState->Comp, Sprintf("tablet-%" PRIu64, Owner->TabletID()))); - CountersBaseline = MakeHolder<TExecutorCounters>(); - Counters->RememberCurrentStateAsBaseline(*CountersBaseline); LogicRedo->InstallCounters(Counters.Get(), nullptr); CounterCacheFresh = new NMonitoring::TCounterForPtr; @@ -631,6 +626,12 @@ void TExecutor::Boot(TEvTablet::TEvBoot::TPtr &ev, const TActorContext &ctx) { Owner->Info()->TenantPathId, Stats->IsFollower, SelfId()); } + if (!Counters) { + Counters = MakeHolder<TExecutorCounters>(); + CountersBaseline = MakeHolder<TExecutorCounters>(); + Counters->RememberCurrentStateAsBaseline(*CountersBaseline); + } + RegisterTabletFlatProbes(); Become(&TThis::StateBoot); @@ -649,20 +650,11 @@ void TExecutor::Boot(TEvTablet::TEvBoot::TPtr &ev, const TActorContext &ctx) { BootLogic.Reset(new TExecutorBootLogic(this, SelfId(), Owner->Info(), maxBootBytesInFly)); - ui64 totalBytes = 0; - for (auto& kv : msg->GroupReadBytes) { - totalBytes += kv.second; - } - - ui64 totalOps = 0; - for (auto& kv : msg->GroupReadOps) { - totalOps += kv.second; - } - - Send(SelfId(), new NBlockIO::TEvStat(NBlockIO::EDir::Read, NBlockIO::EPriority::Fast, - totalBytes, totalOps, + ProcessIoStats( + NBlockIO::EDir::Read, NBlockIO::EPriority::Fast, std::move(msg->GroupReadBytes), - std::move(msg->GroupReadOps))); + std::move(msg->GroupReadOps), + ctx); const auto res = BootLogic->ReceiveBoot(ev, std::move(executorCaches)); return TranscriptBootOpResult(res, ctx); @@ -673,6 +665,12 @@ void TExecutor::FollowerBoot(TEvTablet::TEvFBoot::TPtr &ev, const TActorContext || CurrentStateFunc() == &TThis::StateFollowerBoot || CurrentStateFunc() == &TThis::StateFollower); + if (!Counters) { + Counters = MakeHolder<TExecutorCounters>(); + CountersBaseline = MakeHolder<TExecutorCounters>(); + Counters->RememberCurrentStateAsBaseline(*CountersBaseline); + } + RegisterTabletFlatProbes(); Become(&TThis::StateFollowerBoot); @@ -691,6 +689,13 @@ void TExecutor::FollowerBoot(TEvTablet::TEvFBoot::TPtr &ev, const TActorContext auto executorCaches = CleanupState(); BootLogic.Reset(new TExecutorBootLogic(this, SelfId(), Owner->Info(), maxBootBytesInFly)); + + ProcessIoStats( + NBlockIO::EDir::Read, NBlockIO::EPriority::Fast, + std::move(msg->GroupReadBytes), + std::move(msg->GroupReadOps), + ctx); + const auto res = BootLogic->ReceiveFollowerBoot(ev, std::move(executorCaches)); return TranscriptFollowerBootOpResult(res, ctx); } @@ -2884,20 +2889,11 @@ void TExecutor::Handle(TEvTablet::TEvCommitResult::TPtr &ev, const TActorContext CheckYellow(std::move(msg->YellowMoveChannels), std::move(msg->YellowStopChannels)); - ui64 totalBytes = 0; - for (auto& kv : msg->GroupWrittenBytes) { - totalBytes += kv.second; - } - - ui64 totalOps = 0; - for (auto& kv : msg->GroupWrittenOps) { - totalOps += kv.second; - } - - Send(SelfId(), new NBlockIO::TEvStat(NBlockIO::EDir::Write, NBlockIO::EPriority::Fast, - totalBytes, totalOps, + ProcessIoStats( + NBlockIO::EDir::Write, NBlockIO::EPriority::Fast, std::move(msg->GroupWrittenBytes), - std::move(msg->GroupWrittenOps))); + std::move(msg->GroupWrittenOps), + ctx); ActiveTransaction = false; PlanTransactionActivation(); @@ -3006,48 +3002,87 @@ void TExecutor::StartScan(ui64 task, TResource *cookie) noexcept } } -void TExecutor::Handle(NBlockIO::TEvStat::TPtr &ev, const TActorContext &ctx) { - auto *msg = ev->Get(); - +void TExecutor::ProcessIoStats( + NBlockIO::EDir dir, NBlockIO::EPriority priority, + ui64 bytes, ui64 ops, + NBlockIO::TEvStat::TByCnGr&& groupBytes, + NBlockIO::TEvStat::TByCnGr&& groupOps, + const TActorContext& ctx) +{ if (auto *metrics = ResourceMetrics.Get()) { - auto &bandBytes = msg->Dir == NBlockIO::EDir::Read ? metrics->ReadThroughput : metrics->WriteThroughput; + auto &bandBytes = dir == NBlockIO::EDir::Read ? metrics->ReadThroughput : metrics->WriteThroughput; - for (auto &it: msg->GroupBytes) + for (auto &it: groupBytes) bandBytes[it.first].Increment(it.second, Time->Now()); - auto &bandOps = msg->Dir == NBlockIO::EDir::Read ? metrics->ReadIops : metrics->WriteIops; + auto &bandOps = dir == NBlockIO::EDir::Read ? metrics->ReadIops : metrics->WriteIops; - for (auto &it: msg->GroupOps) + for (auto &it: groupOps) bandOps[it.first].Increment(it.second, Time->Now()); metrics->TryUpdate(ctx); } - if (msg->Priority == NBlockIO::EPriority::Bulk) { - switch (msg->Dir) { + if (priority == NBlockIO::EPriority::Bulk) { + switch (dir) { case NBlockIO::EDir::Read: - Counters->Cumulative()[TExecutorCounters::COMP_BYTES_READ].Increment(msg->Bytes); - Counters->Cumulative()[TExecutorCounters::COMP_BLOBS_READ].Increment(msg->Ops); + Counters->Cumulative()[TExecutorCounters::COMP_BYTES_READ].Increment(bytes); + Counters->Cumulative()[TExecutorCounters::COMP_BLOBS_READ].Increment(ops); break; case NBlockIO::EDir::Write: - Counters->Cumulative()[TExecutorCounters::COMP_BYTES_WRITTEN].Increment(msg->Bytes); - Counters->Cumulative()[TExecutorCounters::COMP_BLOBS_WRITTEN].Increment(msg->Ops); + Counters->Cumulative()[TExecutorCounters::COMP_BYTES_WRITTEN].Increment(bytes); + Counters->Cumulative()[TExecutorCounters::COMP_BLOBS_WRITTEN].Increment(ops); break; } } else { - switch (msg->Dir) { + switch (dir) { case NBlockIO::EDir::Read: - Counters->Cumulative()[TExecutorCounters::TABLET_BYTES_READ].Increment(msg->Bytes); - Counters->Cumulative()[TExecutorCounters::TABLET_BLOBS_READ].Increment(msg->Ops); + Counters->Cumulative()[TExecutorCounters::TABLET_BYTES_READ].Increment(bytes); + Counters->Cumulative()[TExecutorCounters::TABLET_BLOBS_READ].Increment(ops); break; case NBlockIO::EDir::Write: - Counters->Cumulative()[TExecutorCounters::TABLET_BYTES_WRITTEN].Increment(msg->Bytes); - Counters->Cumulative()[TExecutorCounters::TABLET_BLOBS_WRITTEN].Increment(msg->Ops); + Counters->Cumulative()[TExecutorCounters::TABLET_BYTES_WRITTEN].Increment(bytes); + Counters->Cumulative()[TExecutorCounters::TABLET_BLOBS_WRITTEN].Increment(ops); break; } } } +void TExecutor::ProcessIoStats( + NBlockIO::EDir dir, NBlockIO::EPriority priority, + NBlockIO::TEvStat::TByCnGr&& groupBytes, + NBlockIO::TEvStat::TByCnGr&& groupOps, + const TActorContext& ctx) +{ + ui64 totalBytes = 0; + for (auto& kv : groupBytes) { + totalBytes += kv.second; + } + + ui64 totalOps = 0; + for (auto& kv : groupOps) { + totalOps += kv.second; + } + + ProcessIoStats( + dir, priority, + totalBytes, totalOps, + std::move(groupBytes), + std::move(groupOps), + ctx); +} + +void TExecutor::Handle(NBlockIO::TEvStat::TPtr &ev, const TActorContext &ctx) { + auto *msg = ev->Get(); + + ProcessIoStats( + msg->Dir, msg->Priority, + msg->Bytes, msg->Ops, + std::move(msg->GroupBytes), + std::move(msg->GroupOps), + ctx); +} + void TExecutor::UtilizeSubset(const NTable::TSubset &subset, const NTable::NFwd::TSeen &seen, THashSet<TLogoBlobID> reusedBundles, diff --git a/ydb/core/tablet_flat/flat_executor.h b/ydb/core/tablet_flat/flat_executor.h index a2426e6c4b..0dfe303ce4 100644 --- a/ydb/core/tablet_flat/flat_executor.h +++ b/ydb/core/tablet_flat/flat_executor.h @@ -555,6 +555,17 @@ class TExecutor void Handle(NResourceBroker::TEvResourceBroker::TEvResourceAllocated::TPtr&); void Handle(NOps::TEvScanStat::TPtr &ev, const TActorContext &ctx); void Handle(NOps::TEvResult::TPtr &ev); + void ProcessIoStats( + NBlockIO::EDir dir, NBlockIO::EPriority priority, + ui64 bytes, ui64 ops, + NBlockIO::TEvStat::TByCnGr&& groupBytes, + NBlockIO::TEvStat::TByCnGr&& groupOps, + const TActorContext& ctx); + void ProcessIoStats( + NBlockIO::EDir dir, NBlockIO::EPriority priority, + NBlockIO::TEvStat::TByCnGr&& groupBytes, + NBlockIO::TEvStat::TByCnGr&& groupOps, + const TActorContext& ctx); void Handle(NBlockIO::TEvStat::TPtr &ev, const TActorContext &ctx); void Handle(NOps::TEvResult *ops, TProdCompact *msg, bool cancelled); void Handle(TEvBlobStorage::TEvGetResult::TPtr&, const TActorContext&); |