aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-02-22 15:57:30 +0300
committeralexvru <alexvru@ydb.tech>2023-02-22 15:57:30 +0300
commit91daac67540e7734dc2e7c8f198eba48d7ea890e (patch)
tree8061819a1aa16eab0e0b938af1a9ef3a79be7a4f
parent3d9a59583ae857ecc45c709288c5d0e9972fe029 (diff)
downloadydb-91daac67540e7734dc2e7c8f198eba48d7ea890e.tar.gz
Report read/write throughputs
-rw-r--r--ydb/core/blob_depot/agent/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/blob_depot/agent/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/blob_depot/agent/CMakeLists.linux.txt1
-rw-r--r--ydb/core/blob_depot/agent/agent.cpp1
-rw-r--r--ydb/core/blob_depot/agent/agent_impl.h13
-rw-r--r--ydb/core/blob_depot/agent/metrics.cpp16
-rw-r--r--ydb/core/blob_depot/agent/read.cpp1
-rw-r--r--ydb/core/blob_depot/agent/storage_put.cpp1
-rw-r--r--ydb/core/blob_depot/blob_depot.cpp3
-rw-r--r--ydb/core/blob_depot/blob_depot_tablet.h8
-rw-r--r--ydb/core/blob_depot/events.h2
-rw-r--r--ydb/core/blob_depot/group_metrics_exchange.cpp51
-rw-r--r--ydb/core/protos/blob_depot.proto5
13 files changed, 102 insertions, 2 deletions
diff --git a/ydb/core/blob_depot/agent/CMakeLists.darwin.txt b/ydb/core/blob_depot/agent/CMakeLists.darwin.txt
index c24e4325cf..c18d69d6cf 100644
--- a/ydb/core/blob_depot/agent/CMakeLists.darwin.txt
+++ b/ydb/core/blob_depot/agent/CMakeLists.darwin.txt
@@ -22,6 +22,7 @@ target_sources(core-blob_depot-agent PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/channel_kind.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/comm.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/garbage.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/metrics.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/proxy.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/query.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/read.cpp
diff --git a/ydb/core/blob_depot/agent/CMakeLists.linux-aarch64.txt b/ydb/core/blob_depot/agent/CMakeLists.linux-aarch64.txt
index ef14494980..37da1726ef 100644
--- a/ydb/core/blob_depot/agent/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/blob_depot/agent/CMakeLists.linux-aarch64.txt
@@ -23,6 +23,7 @@ target_sources(core-blob_depot-agent PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/channel_kind.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/comm.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/garbage.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/metrics.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/proxy.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/query.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/read.cpp
diff --git a/ydb/core/blob_depot/agent/CMakeLists.linux.txt b/ydb/core/blob_depot/agent/CMakeLists.linux.txt
index ef14494980..37da1726ef 100644
--- a/ydb/core/blob_depot/agent/CMakeLists.linux.txt
+++ b/ydb/core/blob_depot/agent/CMakeLists.linux.txt
@@ -23,6 +23,7 @@ target_sources(core-blob_depot-agent PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/channel_kind.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/comm.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/garbage.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/metrics.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/proxy.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/query.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/read.cpp
diff --git a/ydb/core/blob_depot/agent/agent.cpp b/ydb/core/blob_depot/agent/agent.cpp
index 3f02eda753..ebf4fc6472 100644
--- a/ydb/core/blob_depot/agent/agent.cpp
+++ b/ydb/core/blob_depot/agent/agent.cpp
@@ -37,6 +37,7 @@ namespace NKikimr::NBlobDepot {
HandleQueryWatchdog();
HandlePendingEventQueueWatchdog();
+ HandlePushMetrics();
}
IActor *CreateBlobDepotAgent(ui32 virtualGroupId, TIntrusivePtr<TBlobStorageGroupInfo> info, TActorId proxyId) {
diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h
index 7b9c9c8b98..c998e1dfb5 100644
--- a/ydb/core/blob_depot/agent/agent_impl.h
+++ b/ydb/core/blob_depot/agent/agent_impl.h
@@ -167,6 +167,7 @@ namespace NKikimr::NBlobDepot {
EvQueryWatchdog = EventSpaceBegin(TEvents::ES_PRIVATE),
EvProcessPendingEvent,
EvPendingEventQueueWatchdog,
+ EvPushMetrics,
};
};
@@ -212,6 +213,8 @@ namespace NKikimr::NBlobDepot {
cFunc(TEvPrivate::EvPendingEventQueueWatchdog, HandlePendingEventQueueWatchdog);
cFunc(TEvPrivate::EvQueryWatchdog, HandleQueryWatchdog);
+
+ cFunc(TEvPrivate::EvPushMetrics, HandlePushMetrics);
)
DeletePendingQueries.Clear();
@@ -481,6 +484,16 @@ namespace NKikimr::NBlobDepot {
return EscapeC(key);
}
}
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Metrics
+
+ ui64 BytesRead = 0;
+ ui64 BytesWritten = 0;
+ ui64 LastBytesRead = 0;
+ ui64 LastBytesWritten = 0;
+
+ void HandlePushMetrics();
};
#define BDEV_QUERY(MARKER, TEXT, ...) BDEV(MARKER, TEXT, (VG, Agent.VirtualGroupId), (BDT, Agent.TabletId), \
diff --git a/ydb/core/blob_depot/agent/metrics.cpp b/ydb/core/blob_depot/agent/metrics.cpp
new file mode 100644
index 0000000000..30ffff5330
--- /dev/null
+++ b/ydb/core/blob_depot/agent/metrics.cpp
@@ -0,0 +1,16 @@
+#include "agent_impl.h"
+
+namespace NKikimr::NBlobDepot {
+
+ void TBlobDepotAgent::HandlePushMetrics() {
+ if (IsConnected) {
+ const ui64 bytesRead = BytesRead - std::exchange(LastBytesRead, BytesRead);
+ const ui64 bytesWritten = BytesWritten - std::exchange(LastBytesWritten, BytesWritten);
+ NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvPushMetrics(bytesRead, bytesWritten));
+ }
+
+ TActivationContext::Schedule(TDuration::MilliSeconds(2500), new IEventHandle(TEvPrivate::EvPushMetrics, 0, SelfId(),
+ {}, nullptr, 0));
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/read.cpp b/ydb/core/blob_depot/agent/read.cpp
index 0c682e105b..f577a74bfb 100644
--- a/ydb/core/blob_depot/agent/read.cpp
+++ b/ydb/core/blob_depot/agent/read.cpp
@@ -133,6 +133,7 @@ namespace NKikimr::NBlobDepot {
TEvBlobStorage::TEvGet::TQuery query;
query.Set(item.Id, item.Offset, item.Size);
queriesPerGroup[item.GroupId].emplace_back(item.OutputOffset, query);
+ Agent.BytesRead += item.Size;
}
for (const auto& [groupId, queries] : queriesPerGroup) {
diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp
index 8185b5d568..9efff1c64f 100644
--- a/ydb/core/blob_depot/agent/storage_put.cpp
+++ b/ydb/core/blob_depot/agent/storage_put.cpp
@@ -124,6 +124,7 @@ namespace NKikimr::NBlobDepot {
ev->ExtraBlockChecks.emplace_back(Request.Id.TabletID(), Request.Id.Generation());
BDEV_QUERY(BDEV10, "TEvPut_sendToProxy", (BlobSeqId, BlobSeqId), (GroupId, groupId), (BlobId, id));
Agent.SendToProxy(groupId, std::move(ev), this, nullptr);
+ Agent.BytesWritten += id.BlobSize();
++PutsInFlight;
};
diff --git a/ydb/core/blob_depot/blob_depot.cpp b/ydb/core/blob_depot/blob_depot.cpp
index 9a464fc5c1..aa81538680 100644
--- a/ydb/core/blob_depot/blob_depot.cpp
+++ b/ydb/core/blob_depot/blob_depot.cpp
@@ -86,6 +86,8 @@ namespace NKikimr::NBlobDepot {
fFunc(TEvBlobDepot::EvPushNotifyResult, handleFromAgentPipe);
fFunc(TEvBlobDepot::EvCollectGarbage, handleFromAgentPipe);
+ hFunc(TEvBlobDepot::TEvPushMetrics, Handle);
+
cFunc(TEvPrivate::EvProcessRegisterAgentQ, ProcessRegisterAgentQ);
hFunc(TEvBlobStorage::TEvCollectGarbageResult, Data->Handle);
@@ -100,6 +102,7 @@ namespace NKikimr::NBlobDepot {
cFunc(TEvPrivate::EvCommitCertainKeys, Data->HandleCommitCertainKeys);
cFunc(TEvPrivate::EvDoGroupMetricsExchange, DoGroupMetricsExchange);
hFunc(TEvBlobStorage::TEvControllerGroupMetricsExchange, Handle);
+ cFunc(TEvPrivate::EvUpdateThroughputs, UpdateThroughputs);
default:
if (!HandleDefaultEvents(ev, ctx)) {
diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h
index c13e342ab2..fc0ab0ea7f 100644
--- a/ydb/core/blob_depot/blob_depot_tablet.h
+++ b/ydb/core/blob_depot/blob_depot_tablet.h
@@ -28,6 +28,7 @@ namespace NKikimr::NBlobDepot {
EvDoGroupMetricsExchange,
EvKickSpaceMonitor,
EvProcessRegisterAgentQ,
+ EvUpdateThroughputs,
};
};
@@ -177,6 +178,7 @@ namespace NKikimr::NBlobDepot {
ProcessRegisterAgentQ();
KickSpaceMonitor();
StartDataLoad();
+ UpdateThroughputs();
}
void StartDataLoad();
@@ -308,8 +310,14 @@ namespace NKikimr::NBlobDepot {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Group metrics exchange
+ ui64 BytesRead = 0;
+ ui64 BytesWritten = 0;
+ std::deque<std::tuple<TMonotonic, ui64, ui64>> MetricsQ;
+
void DoGroupMetricsExchange();
void Handle(TEvBlobStorage::TEvControllerGroupMetricsExchange::TPtr ev);
+ void Handle(TEvBlobDepot::TEvPushMetrics::TPtr ev);
+ void UpdateThroughputs();
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Validation
diff --git a/ydb/core/blob_depot/events.h b/ydb/core/blob_depot/events.h
index 501d78ae95..5f19c5dac0 100644
--- a/ydb/core/blob_depot/events.h
+++ b/ydb/core/blob_depot/events.h
@@ -27,6 +27,7 @@ namespace NKikimr {
EvResolve,
EvResolveResult,
EvDiscardSpoiledBlobSeq,
+ EvPushMetrics,
};
#define BLOBDEPOT_PARAM_ARG(ARG) std::optional<std::decay_t<decltype(Record.Get##ARG())>> param##ARG,
@@ -71,6 +72,7 @@ namespace NKikimr {
BLOBDEPOT_EVENT_PB_NO_ARGS(EvResolve);
BLOBDEPOT_EVENT_PB(EvResolveResult, Status, ErrorReason);
BLOBDEPOT_EVENT_PB_NO_ARGS(EvDiscardSpoiledBlobSeq);
+ BLOBDEPOT_EVENT_PB(EvPushMetrics, BytesRead, BytesWritten);
template<typename TEvent>
struct TResponseFor {};
diff --git a/ydb/core/blob_depot/group_metrics_exchange.cpp b/ydb/core/blob_depot/group_metrics_exchange.cpp
index 2b86d5edcf..f10696815a 100644
--- a/ydb/core/blob_depot/group_metrics_exchange.cpp
+++ b/ydb/core/blob_depot/group_metrics_exchange.cpp
@@ -86,8 +86,6 @@ namespace NKikimr::NBlobDepot {
wb.SetGroupID(Config.GetVirtualGroupId());
wb.SetAllocatedSize(Data->GetTotalStoredDataSize());
wb.SetAvailableSize(params->GetAvailableSize());
- wb.SetReadThroughput(0);
- wb.SetWriteThroughput(0);
Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(SelfId().NodeId()), ev.release());
params->SetAllocatedSize(Data->GetTotalStoredDataSize());
@@ -101,4 +99,53 @@ namespace NKikimr::NBlobDepot {
}
}
+ void TBlobDepot::Handle(TEvBlobDepot::TEvPushMetrics::TPtr ev) {
+ const auto& record = ev->Get()->Record;
+ BytesRead += record.GetBytesRead();
+ BytesWritten += record.GetBytesWritten();
+ MetricsQ.emplace_back(TActivationContext::Monotonic(), BytesRead, BytesWritten);
+ }
+
+ void TBlobDepot::UpdateThroughputs() {
+ static constexpr TDuration Window = TDuration::Seconds(3);
+
+ if (Config.HasVirtualGroupId() && !MetricsQ.empty()) {
+ const TMonotonic now = TActivationContext::Monotonic();
+ const TMonotonic left = now - Window;
+ const auto comp = [](TMonotonic x, const auto& y) { return x < std::get<0>(y); };
+ const auto it = std::upper_bound(MetricsQ.begin(), MetricsQ.end(), left, comp);
+ if (it != MetricsQ.begin()) { // interpolate
+ MetricsQ.erase(MetricsQ.begin(), std::prev(it)); // remove all obsolete entries
+ Y_VERIFY(MetricsQ.size() >= 2);
+ const auto& [xTimestamp, xRead, xWritten] = MetricsQ[0];
+ auto& [yTimestamp, yRead, yWritten] = MetricsQ[1];
+ Y_VERIFY(xTimestamp <= left && left < yTimestamp);
+ const ui64 scale = 1'000'000;
+ const ui64 factor = (left - xTimestamp).MicroSeconds() * scale / (yTimestamp - xTimestamp).MicroSeconds();
+ yTimestamp = left;
+ yRead = xRead + (yRead - xRead) * factor / scale;
+ yWritten = xWritten + (yWritten - xWritten) * factor / scale;
+ MetricsQ.pop_front();
+ }
+
+ ui64 readThroughput = 0;
+ ui64 writeThroughput = 0;
+ const auto& [ts, read, written] = MetricsQ.front();
+ if (ts + TDuration::Seconds(1) < now) {
+ readThroughput = (BytesRead - read) * 1'000'000 / (now - ts).MicroSeconds();
+ writeThroughput = (BytesWritten - written) * 1'000'000 / (now - ts).MicroSeconds();
+ }
+
+ auto ev = std::make_unique<NNodeWhiteboard::TEvWhiteboard::TEvBSGroupStateUpdate>();
+ auto& wb = ev->Record;
+ wb.SetGroupID(Config.GetVirtualGroupId());
+ wb.SetReadThroughput(readThroughput);
+ wb.SetWriteThroughput(writeThroughput);
+ Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(SelfId().NodeId()), ev.release());
+ }
+
+ TActivationContext::Schedule(TDuration::Seconds(1), new IEventHandle(TEvPrivate::EvUpdateThroughputs, 0,
+ SelfId(), {}, nullptr, 0));
+ }
+
} // NKikimr::NBlobDepot
diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto
index 2efe99467e..c7fe27a8ad 100644
--- a/ydb/core/protos/blob_depot.proto
+++ b/ydb/core/protos/blob_depot.proto
@@ -228,3 +228,8 @@ message TEvResolveResult {
optional string ErrorReason = 2;
repeated TResolvedKey ResolvedKeys = 3;
}
+
+message TEvPushMetrics {
+ optional uint64 BytesRead = 1; // since last update
+ optional uint64 BytesWritten = 2; // since last update
+}