diff options
author | alexvru <alexvru@ydb.tech> | 2023-02-22 15:57:30 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-02-22 15:57:30 +0300 |
commit | 91daac67540e7734dc2e7c8f198eba48d7ea890e (patch) | |
tree | 8061819a1aa16eab0e0b938af1a9ef3a79be7a4f | |
parent | 3d9a59583ae857ecc45c709288c5d0e9972fe029 (diff) | |
download | ydb-91daac67540e7734dc2e7c8f198eba48d7ea890e.tar.gz |
Report read/write throughputs
-rw-r--r-- | ydb/core/blob_depot/agent/CMakeLists.darwin.txt | 1 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/CMakeLists.linux.txt | 1 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/agent.cpp | 1 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/agent_impl.h | 13 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/metrics.cpp | 16 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/read.cpp | 1 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_put.cpp | 1 | ||||
-rw-r--r-- | ydb/core/blob_depot/blob_depot.cpp | 3 | ||||
-rw-r--r-- | ydb/core/blob_depot/blob_depot_tablet.h | 8 | ||||
-rw-r--r-- | ydb/core/blob_depot/events.h | 2 | ||||
-rw-r--r-- | ydb/core/blob_depot/group_metrics_exchange.cpp | 51 | ||||
-rw-r--r-- | ydb/core/protos/blob_depot.proto | 5 |
13 files changed, 102 insertions, 2 deletions
diff --git a/ydb/core/blob_depot/agent/CMakeLists.darwin.txt b/ydb/core/blob_depot/agent/CMakeLists.darwin.txt index c24e4325cf..c18d69d6cf 100644 --- a/ydb/core/blob_depot/agent/CMakeLists.darwin.txt +++ b/ydb/core/blob_depot/agent/CMakeLists.darwin.txt @@ -22,6 +22,7 @@ target_sources(core-blob_depot-agent PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/channel_kind.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/comm.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/garbage.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/metrics.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/proxy.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/query.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/read.cpp diff --git a/ydb/core/blob_depot/agent/CMakeLists.linux-aarch64.txt b/ydb/core/blob_depot/agent/CMakeLists.linux-aarch64.txt index ef14494980..37da1726ef 100644 --- a/ydb/core/blob_depot/agent/CMakeLists.linux-aarch64.txt +++ b/ydb/core/blob_depot/agent/CMakeLists.linux-aarch64.txt @@ -23,6 +23,7 @@ target_sources(core-blob_depot-agent PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/channel_kind.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/comm.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/garbage.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/metrics.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/proxy.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/query.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/read.cpp diff --git a/ydb/core/blob_depot/agent/CMakeLists.linux.txt b/ydb/core/blob_depot/agent/CMakeLists.linux.txt index ef14494980..37da1726ef 100644 --- a/ydb/core/blob_depot/agent/CMakeLists.linux.txt +++ b/ydb/core/blob_depot/agent/CMakeLists.linux.txt @@ -23,6 +23,7 @@ target_sources(core-blob_depot-agent PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/channel_kind.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/comm.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/garbage.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/metrics.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/proxy.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/query.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/read.cpp diff --git a/ydb/core/blob_depot/agent/agent.cpp b/ydb/core/blob_depot/agent/agent.cpp index 3f02eda753..ebf4fc6472 100644 --- a/ydb/core/blob_depot/agent/agent.cpp +++ b/ydb/core/blob_depot/agent/agent.cpp @@ -37,6 +37,7 @@ namespace NKikimr::NBlobDepot { HandleQueryWatchdog(); HandlePendingEventQueueWatchdog(); + HandlePushMetrics(); } IActor *CreateBlobDepotAgent(ui32 virtualGroupId, TIntrusivePtr<TBlobStorageGroupInfo> info, TActorId proxyId) { diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h index 7b9c9c8b98..c998e1dfb5 100644 --- a/ydb/core/blob_depot/agent/agent_impl.h +++ b/ydb/core/blob_depot/agent/agent_impl.h @@ -167,6 +167,7 @@ namespace NKikimr::NBlobDepot { EvQueryWatchdog = EventSpaceBegin(TEvents::ES_PRIVATE), EvProcessPendingEvent, EvPendingEventQueueWatchdog, + EvPushMetrics, }; }; @@ -212,6 +213,8 @@ namespace NKikimr::NBlobDepot { cFunc(TEvPrivate::EvPendingEventQueueWatchdog, HandlePendingEventQueueWatchdog); cFunc(TEvPrivate::EvQueryWatchdog, HandleQueryWatchdog); + + cFunc(TEvPrivate::EvPushMetrics, HandlePushMetrics); ) DeletePendingQueries.Clear(); @@ -481,6 +484,16 @@ namespace NKikimr::NBlobDepot { return EscapeC(key); } } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Metrics + + ui64 BytesRead = 0; + ui64 BytesWritten = 0; + ui64 LastBytesRead = 0; + ui64 LastBytesWritten = 0; + + void HandlePushMetrics(); }; #define BDEV_QUERY(MARKER, TEXT, ...) BDEV(MARKER, TEXT, (VG, Agent.VirtualGroupId), (BDT, Agent.TabletId), \ diff --git a/ydb/core/blob_depot/agent/metrics.cpp b/ydb/core/blob_depot/agent/metrics.cpp new file mode 100644 index 0000000000..30ffff5330 --- /dev/null +++ b/ydb/core/blob_depot/agent/metrics.cpp @@ -0,0 +1,16 @@ +#include "agent_impl.h" + +namespace NKikimr::NBlobDepot { + + void TBlobDepotAgent::HandlePushMetrics() { + if (IsConnected) { + const ui64 bytesRead = BytesRead - std::exchange(LastBytesRead, BytesRead); + const ui64 bytesWritten = BytesWritten - std::exchange(LastBytesWritten, BytesWritten); + NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvPushMetrics(bytesRead, bytesWritten)); + } + + TActivationContext::Schedule(TDuration::MilliSeconds(2500), new IEventHandle(TEvPrivate::EvPushMetrics, 0, SelfId(), + {}, nullptr, 0)); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/read.cpp b/ydb/core/blob_depot/agent/read.cpp index 0c682e105b..f577a74bfb 100644 --- a/ydb/core/blob_depot/agent/read.cpp +++ b/ydb/core/blob_depot/agent/read.cpp @@ -133,6 +133,7 @@ namespace NKikimr::NBlobDepot { TEvBlobStorage::TEvGet::TQuery query; query.Set(item.Id, item.Offset, item.Size); queriesPerGroup[item.GroupId].emplace_back(item.OutputOffset, query); + Agent.BytesRead += item.Size; } for (const auto& [groupId, queries] : queriesPerGroup) { diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp index 8185b5d568..9efff1c64f 100644 --- a/ydb/core/blob_depot/agent/storage_put.cpp +++ b/ydb/core/blob_depot/agent/storage_put.cpp @@ -124,6 +124,7 @@ namespace NKikimr::NBlobDepot { ev->ExtraBlockChecks.emplace_back(Request.Id.TabletID(), Request.Id.Generation()); BDEV_QUERY(BDEV10, "TEvPut_sendToProxy", (BlobSeqId, BlobSeqId), (GroupId, groupId), (BlobId, id)); Agent.SendToProxy(groupId, std::move(ev), this, nullptr); + Agent.BytesWritten += id.BlobSize(); ++PutsInFlight; }; diff --git a/ydb/core/blob_depot/blob_depot.cpp b/ydb/core/blob_depot/blob_depot.cpp index 9a464fc5c1..aa81538680 100644 --- a/ydb/core/blob_depot/blob_depot.cpp +++ b/ydb/core/blob_depot/blob_depot.cpp @@ -86,6 +86,8 @@ namespace NKikimr::NBlobDepot { fFunc(TEvBlobDepot::EvPushNotifyResult, handleFromAgentPipe); fFunc(TEvBlobDepot::EvCollectGarbage, handleFromAgentPipe); + hFunc(TEvBlobDepot::TEvPushMetrics, Handle); + cFunc(TEvPrivate::EvProcessRegisterAgentQ, ProcessRegisterAgentQ); hFunc(TEvBlobStorage::TEvCollectGarbageResult, Data->Handle); @@ -100,6 +102,7 @@ namespace NKikimr::NBlobDepot { cFunc(TEvPrivate::EvCommitCertainKeys, Data->HandleCommitCertainKeys); cFunc(TEvPrivate::EvDoGroupMetricsExchange, DoGroupMetricsExchange); hFunc(TEvBlobStorage::TEvControllerGroupMetricsExchange, Handle); + cFunc(TEvPrivate::EvUpdateThroughputs, UpdateThroughputs); default: if (!HandleDefaultEvents(ev, ctx)) { diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h index c13e342ab2..fc0ab0ea7f 100644 --- a/ydb/core/blob_depot/blob_depot_tablet.h +++ b/ydb/core/blob_depot/blob_depot_tablet.h @@ -28,6 +28,7 @@ namespace NKikimr::NBlobDepot { EvDoGroupMetricsExchange, EvKickSpaceMonitor, EvProcessRegisterAgentQ, + EvUpdateThroughputs, }; }; @@ -177,6 +178,7 @@ namespace NKikimr::NBlobDepot { ProcessRegisterAgentQ(); KickSpaceMonitor(); StartDataLoad(); + UpdateThroughputs(); } void StartDataLoad(); @@ -308,8 +310,14 @@ namespace NKikimr::NBlobDepot { //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Group metrics exchange + ui64 BytesRead = 0; + ui64 BytesWritten = 0; + std::deque<std::tuple<TMonotonic, ui64, ui64>> MetricsQ; + void DoGroupMetricsExchange(); void Handle(TEvBlobStorage::TEvControllerGroupMetricsExchange::TPtr ev); + void Handle(TEvBlobDepot::TEvPushMetrics::TPtr ev); + void UpdateThroughputs(); //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Validation diff --git a/ydb/core/blob_depot/events.h b/ydb/core/blob_depot/events.h index 501d78ae95..5f19c5dac0 100644 --- a/ydb/core/blob_depot/events.h +++ b/ydb/core/blob_depot/events.h @@ -27,6 +27,7 @@ namespace NKikimr { EvResolve, EvResolveResult, EvDiscardSpoiledBlobSeq, + EvPushMetrics, }; #define BLOBDEPOT_PARAM_ARG(ARG) std::optional<std::decay_t<decltype(Record.Get##ARG())>> param##ARG, @@ -71,6 +72,7 @@ namespace NKikimr { BLOBDEPOT_EVENT_PB_NO_ARGS(EvResolve); BLOBDEPOT_EVENT_PB(EvResolveResult, Status, ErrorReason); BLOBDEPOT_EVENT_PB_NO_ARGS(EvDiscardSpoiledBlobSeq); + BLOBDEPOT_EVENT_PB(EvPushMetrics, BytesRead, BytesWritten); template<typename TEvent> struct TResponseFor {}; diff --git a/ydb/core/blob_depot/group_metrics_exchange.cpp b/ydb/core/blob_depot/group_metrics_exchange.cpp index 2b86d5edcf..f10696815a 100644 --- a/ydb/core/blob_depot/group_metrics_exchange.cpp +++ b/ydb/core/blob_depot/group_metrics_exchange.cpp @@ -86,8 +86,6 @@ namespace NKikimr::NBlobDepot { wb.SetGroupID(Config.GetVirtualGroupId()); wb.SetAllocatedSize(Data->GetTotalStoredDataSize()); wb.SetAvailableSize(params->GetAvailableSize()); - wb.SetReadThroughput(0); - wb.SetWriteThroughput(0); Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(SelfId().NodeId()), ev.release()); params->SetAllocatedSize(Data->GetTotalStoredDataSize()); @@ -101,4 +99,53 @@ namespace NKikimr::NBlobDepot { } } + void TBlobDepot::Handle(TEvBlobDepot::TEvPushMetrics::TPtr ev) { + const auto& record = ev->Get()->Record; + BytesRead += record.GetBytesRead(); + BytesWritten += record.GetBytesWritten(); + MetricsQ.emplace_back(TActivationContext::Monotonic(), BytesRead, BytesWritten); + } + + void TBlobDepot::UpdateThroughputs() { + static constexpr TDuration Window = TDuration::Seconds(3); + + if (Config.HasVirtualGroupId() && !MetricsQ.empty()) { + const TMonotonic now = TActivationContext::Monotonic(); + const TMonotonic left = now - Window; + const auto comp = [](TMonotonic x, const auto& y) { return x < std::get<0>(y); }; + const auto it = std::upper_bound(MetricsQ.begin(), MetricsQ.end(), left, comp); + if (it != MetricsQ.begin()) { // interpolate + MetricsQ.erase(MetricsQ.begin(), std::prev(it)); // remove all obsolete entries + Y_VERIFY(MetricsQ.size() >= 2); + const auto& [xTimestamp, xRead, xWritten] = MetricsQ[0]; + auto& [yTimestamp, yRead, yWritten] = MetricsQ[1]; + Y_VERIFY(xTimestamp <= left && left < yTimestamp); + const ui64 scale = 1'000'000; + const ui64 factor = (left - xTimestamp).MicroSeconds() * scale / (yTimestamp - xTimestamp).MicroSeconds(); + yTimestamp = left; + yRead = xRead + (yRead - xRead) * factor / scale; + yWritten = xWritten + (yWritten - xWritten) * factor / scale; + MetricsQ.pop_front(); + } + + ui64 readThroughput = 0; + ui64 writeThroughput = 0; + const auto& [ts, read, written] = MetricsQ.front(); + if (ts + TDuration::Seconds(1) < now) { + readThroughput = (BytesRead - read) * 1'000'000 / (now - ts).MicroSeconds(); + writeThroughput = (BytesWritten - written) * 1'000'000 / (now - ts).MicroSeconds(); + } + + auto ev = std::make_unique<NNodeWhiteboard::TEvWhiteboard::TEvBSGroupStateUpdate>(); + auto& wb = ev->Record; + wb.SetGroupID(Config.GetVirtualGroupId()); + wb.SetReadThroughput(readThroughput); + wb.SetWriteThroughput(writeThroughput); + Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(SelfId().NodeId()), ev.release()); + } + + TActivationContext::Schedule(TDuration::Seconds(1), new IEventHandle(TEvPrivate::EvUpdateThroughputs, 0, + SelfId(), {}, nullptr, 0)); + } + } // NKikimr::NBlobDepot diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto index 2efe99467e..c7fe27a8ad 100644 --- a/ydb/core/protos/blob_depot.proto +++ b/ydb/core/protos/blob_depot.proto @@ -228,3 +228,8 @@ message TEvResolveResult { optional string ErrorReason = 2; repeated TResolvedKey ResolvedKeys = 3; } + +message TEvPushMetrics { + optional uint64 BytesRead = 1; // since last update + optional uint64 BytesWritten = 2; // since last update +} |