aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2023-01-17 12:31:38 +0300
committersnaury <snaury@ydb.tech>2023-01-17 12:31:38 +0300
commit851f4bde27aba22a2461a1c23a8960d1cee78069 (patch)
treefbf37122c731afe076ccc927f1d107662978aa4c
parent0306d61aa24ae38e3ae8c16dabb3c71b258d7833 (diff)
downloadydb-851f4bde27aba22a2461a1c23a8960d1cee78069.tar.gz
Process boot and commit io stats without sending extra messages
-rw-r--r--ydb/core/base/tablet.h9
-rw-r--r--ydb/core/tablet/tablet_sys.cpp4
-rw-r--r--ydb/core/tablet_flat/flat_executor.cpp139
-rw-r--r--ydb/core/tablet_flat/flat_executor.h11
4 files changed, 109 insertions, 54 deletions
diff --git a/ydb/core/base/tablet.h b/ydb/core/base/tablet.h
index 945d636088..e93a3c4454 100644
--- a/ydb/core/base/tablet.h
+++ b/ydb/core/base/tablet.h
@@ -656,6 +656,9 @@ struct TEvTablet {
TResourceProfilesPtr ResourceProfiles;
TSharedQuotaPtr TxCacheQuota;
+ NMetrics::TTabletThroughputRawValue GroupReadBytes;
+ NMetrics::TTabletIopsRawValue GroupReadOps;
+
TEvFBoot(ui64 tabletID, ui32 followerID, ui32 generation, TActorId launcher, const TEvFollowerUpdate &upd,
TIntrusivePtr<TTabletStorageInfo> info, TResourceProfilesPtr profiles = nullptr,
TSharedQuotaPtr txCacheQuota = nullptr)
@@ -671,7 +674,9 @@ struct TEvTablet {
TEvFBoot(ui64 tabletID, ui32 followerID, ui32 generation, TActorId launcher, TDependencyGraph *dependencyGraph,
TIntrusivePtr<TTabletStorageInfo> info, TResourceProfilesPtr profiles = nullptr,
- TSharedQuotaPtr txCacheQuota = nullptr)
+ TSharedQuotaPtr txCacheQuota = nullptr,
+ NMetrics::TTabletThroughputRawValue&& read = NMetrics::TTabletThroughputRawValue(),
+ NMetrics::TTabletIopsRawValue&& readOps = NMetrics::TTabletIopsRawValue())
: TabletID(tabletID)
, FollowerID(followerID)
, Generation(generation)
@@ -680,6 +685,8 @@ struct TEvTablet {
, TabletStorageInfo(info)
, ResourceProfiles(profiles)
, TxCacheQuota(txCacheQuota)
+ , GroupReadBytes(std::move(read))
+ , GroupReadOps(std::move(readOps))
{}
};
diff --git a/ydb/core/tablet/tablet_sys.cpp b/ydb/core/tablet/tablet_sys.cpp
index ad2b25b372..3093463924 100644
--- a/ydb/core/tablet/tablet_sys.cpp
+++ b/ydb/core/tablet/tablet_sys.cpp
@@ -367,7 +367,9 @@ void TTablet::HandleByFollower(TEvTabletBase::TEvRebuildGraphResult::TPtr &ev) {
Send(UserTablet,
new TEvTablet::TEvFBoot(TabletID(), FollowerId, 0,
Launcher, msg->DependencyGraph.Get(), Info,
- ResourceProfiles, TxCacheQuota));
+ ResourceProfiles, TxCacheQuota,
+ std::move(msg->GroupReadBytes),
+ std::move(msg->GroupReadOps)));
Send(Launcher, new TEvTablet::TEvRestored(TabletID(), StateStorageInfo.KnownGeneration, UserTablet, true));
BLOG_TRACE("SBoot with rebuilt graph", "TSYS05");
diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp
index d88c2c9b9c..5923c7c264 100644
--- a/ydb/core/tablet_flat/flat_executor.cpp
+++ b/ydb/core/tablet_flat/flat_executor.cpp
@@ -313,6 +313,8 @@ void TExecutor::ActivateFollower(const TActorContext &ctx) {
auto loadedState = BootLogic->ExtractState();
BootLogic.Destroy();
+ Y_VERIFY(Counters, "Expected to have Counters initialized during Boot processing");
+
Y_VERIFY(!GcLogic);
Y_VERIFY(!LogicRedo);
Y_VERIFY(!LogicAlter);
@@ -321,11 +323,6 @@ void TExecutor::ActivateFollower(const TActorContext &ctx) {
BorrowLogic = loadedState->Loans;
Y_VERIFY(!CompactionLogic);
- if (!Counters) {
- Counters = MakeHolder<TExecutorCounters>();
- CountersBaseline = MakeHolder<TExecutorCounters>();
- Counters->RememberCurrentStateAsBaseline(*CountersBaseline);
- }
CounterCacheFresh = new NMonitoring::TCounterForPtr;
CounterCacheStaging = new NMonitoring::TCounterForPtr;
@@ -361,7 +358,7 @@ void TExecutor::Active(const TActorContext &ctx) {
auto loadedState = BootLogic->ExtractState();
BootLogic.Destroy();
- Counters = MakeHolder<TExecutorCounters>();
+ Y_VERIFY(Counters, "Expected to have Counters initialized during Boot processing");
CommitManager = loadedState->CommitManager;
Database = loadedState->Database;
@@ -377,8 +374,6 @@ void TExecutor::Active(const TActorContext &ctx) {
CompactionLogic = THolder<TCompactionLogic>(new TCompactionLogic(Logger.Get(), Broker.Get(), this, loadedState->Comp,
Sprintf("tablet-%" PRIu64, Owner->TabletID())));
- CountersBaseline = MakeHolder<TExecutorCounters>();
- Counters->RememberCurrentStateAsBaseline(*CountersBaseline);
LogicRedo->InstallCounters(Counters.Get(), nullptr);
CounterCacheFresh = new NMonitoring::TCounterForPtr;
@@ -631,6 +626,12 @@ void TExecutor::Boot(TEvTablet::TEvBoot::TPtr &ev, const TActorContext &ctx) {
Owner->Info()->TenantPathId, Stats->IsFollower, SelfId());
}
+ if (!Counters) {
+ Counters = MakeHolder<TExecutorCounters>();
+ CountersBaseline = MakeHolder<TExecutorCounters>();
+ Counters->RememberCurrentStateAsBaseline(*CountersBaseline);
+ }
+
RegisterTabletFlatProbes();
Become(&TThis::StateBoot);
@@ -649,20 +650,11 @@ void TExecutor::Boot(TEvTablet::TEvBoot::TPtr &ev, const TActorContext &ctx) {
BootLogic.Reset(new TExecutorBootLogic(this, SelfId(), Owner->Info(), maxBootBytesInFly));
- ui64 totalBytes = 0;
- for (auto& kv : msg->GroupReadBytes) {
- totalBytes += kv.second;
- }
-
- ui64 totalOps = 0;
- for (auto& kv : msg->GroupReadOps) {
- totalOps += kv.second;
- }
-
- Send(SelfId(), new NBlockIO::TEvStat(NBlockIO::EDir::Read, NBlockIO::EPriority::Fast,
- totalBytes, totalOps,
+ ProcessIoStats(
+ NBlockIO::EDir::Read, NBlockIO::EPriority::Fast,
std::move(msg->GroupReadBytes),
- std::move(msg->GroupReadOps)));
+ std::move(msg->GroupReadOps),
+ ctx);
const auto res = BootLogic->ReceiveBoot(ev, std::move(executorCaches));
return TranscriptBootOpResult(res, ctx);
@@ -673,6 +665,12 @@ void TExecutor::FollowerBoot(TEvTablet::TEvFBoot::TPtr &ev, const TActorContext
|| CurrentStateFunc() == &TThis::StateFollowerBoot
|| CurrentStateFunc() == &TThis::StateFollower);
+ if (!Counters) {
+ Counters = MakeHolder<TExecutorCounters>();
+ CountersBaseline = MakeHolder<TExecutorCounters>();
+ Counters->RememberCurrentStateAsBaseline(*CountersBaseline);
+ }
+
RegisterTabletFlatProbes();
Become(&TThis::StateFollowerBoot);
@@ -691,6 +689,13 @@ void TExecutor::FollowerBoot(TEvTablet::TEvFBoot::TPtr &ev, const TActorContext
auto executorCaches = CleanupState();
BootLogic.Reset(new TExecutorBootLogic(this, SelfId(), Owner->Info(), maxBootBytesInFly));
+
+ ProcessIoStats(
+ NBlockIO::EDir::Read, NBlockIO::EPriority::Fast,
+ std::move(msg->GroupReadBytes),
+ std::move(msg->GroupReadOps),
+ ctx);
+
const auto res = BootLogic->ReceiveFollowerBoot(ev, std::move(executorCaches));
return TranscriptFollowerBootOpResult(res, ctx);
}
@@ -2884,20 +2889,11 @@ void TExecutor::Handle(TEvTablet::TEvCommitResult::TPtr &ev, const TActorContext
CheckYellow(std::move(msg->YellowMoveChannels), std::move(msg->YellowStopChannels));
- ui64 totalBytes = 0;
- for (auto& kv : msg->GroupWrittenBytes) {
- totalBytes += kv.second;
- }
-
- ui64 totalOps = 0;
- for (auto& kv : msg->GroupWrittenOps) {
- totalOps += kv.second;
- }
-
- Send(SelfId(), new NBlockIO::TEvStat(NBlockIO::EDir::Write, NBlockIO::EPriority::Fast,
- totalBytes, totalOps,
+ ProcessIoStats(
+ NBlockIO::EDir::Write, NBlockIO::EPriority::Fast,
std::move(msg->GroupWrittenBytes),
- std::move(msg->GroupWrittenOps)));
+ std::move(msg->GroupWrittenOps),
+ ctx);
ActiveTransaction = false;
PlanTransactionActivation();
@@ -3006,48 +3002,87 @@ void TExecutor::StartScan(ui64 task, TResource *cookie) noexcept
}
}
-void TExecutor::Handle(NBlockIO::TEvStat::TPtr &ev, const TActorContext &ctx) {
- auto *msg = ev->Get();
-
+void TExecutor::ProcessIoStats(
+ NBlockIO::EDir dir, NBlockIO::EPriority priority,
+ ui64 bytes, ui64 ops,
+ NBlockIO::TEvStat::TByCnGr&& groupBytes,
+ NBlockIO::TEvStat::TByCnGr&& groupOps,
+ const TActorContext& ctx)
+{
if (auto *metrics = ResourceMetrics.Get()) {
- auto &bandBytes = msg->Dir == NBlockIO::EDir::Read ? metrics->ReadThroughput : metrics->WriteThroughput;
+ auto &bandBytes = dir == NBlockIO::EDir::Read ? metrics->ReadThroughput : metrics->WriteThroughput;
- for (auto &it: msg->GroupBytes)
+ for (auto &it: groupBytes)
bandBytes[it.first].Increment(it.second, Time->Now());
- auto &bandOps = msg->Dir == NBlockIO::EDir::Read ? metrics->ReadIops : metrics->WriteIops;
+ auto &bandOps = dir == NBlockIO::EDir::Read ? metrics->ReadIops : metrics->WriteIops;
- for (auto &it: msg->GroupOps)
+ for (auto &it: groupOps)
bandOps[it.first].Increment(it.second, Time->Now());
metrics->TryUpdate(ctx);
}
- if (msg->Priority == NBlockIO::EPriority::Bulk) {
- switch (msg->Dir) {
+ if (priority == NBlockIO::EPriority::Bulk) {
+ switch (dir) {
case NBlockIO::EDir::Read:
- Counters->Cumulative()[TExecutorCounters::COMP_BYTES_READ].Increment(msg->Bytes);
- Counters->Cumulative()[TExecutorCounters::COMP_BLOBS_READ].Increment(msg->Ops);
+ Counters->Cumulative()[TExecutorCounters::COMP_BYTES_READ].Increment(bytes);
+ Counters->Cumulative()[TExecutorCounters::COMP_BLOBS_READ].Increment(ops);
break;
case NBlockIO::EDir::Write:
- Counters->Cumulative()[TExecutorCounters::COMP_BYTES_WRITTEN].Increment(msg->Bytes);
- Counters->Cumulative()[TExecutorCounters::COMP_BLOBS_WRITTEN].Increment(msg->Ops);
+ Counters->Cumulative()[TExecutorCounters::COMP_BYTES_WRITTEN].Increment(bytes);
+ Counters->Cumulative()[TExecutorCounters::COMP_BLOBS_WRITTEN].Increment(ops);
break;
}
} else {
- switch (msg->Dir) {
+ switch (dir) {
case NBlockIO::EDir::Read:
- Counters->Cumulative()[TExecutorCounters::TABLET_BYTES_READ].Increment(msg->Bytes);
- Counters->Cumulative()[TExecutorCounters::TABLET_BLOBS_READ].Increment(msg->Ops);
+ Counters->Cumulative()[TExecutorCounters::TABLET_BYTES_READ].Increment(bytes);
+ Counters->Cumulative()[TExecutorCounters::TABLET_BLOBS_READ].Increment(ops);
break;
case NBlockIO::EDir::Write:
- Counters->Cumulative()[TExecutorCounters::TABLET_BYTES_WRITTEN].Increment(msg->Bytes);
- Counters->Cumulative()[TExecutorCounters::TABLET_BLOBS_WRITTEN].Increment(msg->Ops);
+ Counters->Cumulative()[TExecutorCounters::TABLET_BYTES_WRITTEN].Increment(bytes);
+ Counters->Cumulative()[TExecutorCounters::TABLET_BLOBS_WRITTEN].Increment(ops);
break;
}
}
}
+void TExecutor::ProcessIoStats(
+ NBlockIO::EDir dir, NBlockIO::EPriority priority,
+ NBlockIO::TEvStat::TByCnGr&& groupBytes,
+ NBlockIO::TEvStat::TByCnGr&& groupOps,
+ const TActorContext& ctx)
+{
+ ui64 totalBytes = 0;
+ for (auto& kv : groupBytes) {
+ totalBytes += kv.second;
+ }
+
+ ui64 totalOps = 0;
+ for (auto& kv : groupOps) {
+ totalOps += kv.second;
+ }
+
+ ProcessIoStats(
+ dir, priority,
+ totalBytes, totalOps,
+ std::move(groupBytes),
+ std::move(groupOps),
+ ctx);
+}
+
+void TExecutor::Handle(NBlockIO::TEvStat::TPtr &ev, const TActorContext &ctx) {
+ auto *msg = ev->Get();
+
+ ProcessIoStats(
+ msg->Dir, msg->Priority,
+ msg->Bytes, msg->Ops,
+ std::move(msg->GroupBytes),
+ std::move(msg->GroupOps),
+ ctx);
+}
+
void TExecutor::UtilizeSubset(const NTable::TSubset &subset,
const NTable::NFwd::TSeen &seen,
THashSet<TLogoBlobID> reusedBundles,
diff --git a/ydb/core/tablet_flat/flat_executor.h b/ydb/core/tablet_flat/flat_executor.h
index a2426e6c4b..0dfe303ce4 100644
--- a/ydb/core/tablet_flat/flat_executor.h
+++ b/ydb/core/tablet_flat/flat_executor.h
@@ -555,6 +555,17 @@ class TExecutor
void Handle(NResourceBroker::TEvResourceBroker::TEvResourceAllocated::TPtr&);
void Handle(NOps::TEvScanStat::TPtr &ev, const TActorContext &ctx);
void Handle(NOps::TEvResult::TPtr &ev);
+ void ProcessIoStats(
+ NBlockIO::EDir dir, NBlockIO::EPriority priority,
+ ui64 bytes, ui64 ops,
+ NBlockIO::TEvStat::TByCnGr&& groupBytes,
+ NBlockIO::TEvStat::TByCnGr&& groupOps,
+ const TActorContext& ctx);
+ void ProcessIoStats(
+ NBlockIO::EDir dir, NBlockIO::EPriority priority,
+ NBlockIO::TEvStat::TByCnGr&& groupBytes,
+ NBlockIO::TEvStat::TByCnGr&& groupOps,
+ const TActorContext& ctx);
void Handle(NBlockIO::TEvStat::TPtr &ev, const TActorContext &ctx);
void Handle(NOps::TEvResult *ops, TProdCompact *msg, bool cancelled);
void Handle(TEvBlobStorage::TEvGetResult::TPtr&, const TActorContext&);