about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author Evgeniy Ivanov <eivanov89@ydb.tech> 2024-11-25 15:41:30 +0100
committer GitHub <noreply@github.com> 2024-11-25 15:41:30 +0100
commit 1dd2b997111cdadcb91ad57b6ea05bd62ce15902 (patch)
tree 071da29822622c321d2fd7ddf1d361283b7a2b20
parent 4d17f1a714230cd40059dc80edcbc735a553d523 (diff)
download ydb-1dd2b997111cdadcb91ad57b6ea05bd62ce15902.tar.gz
Add GRPC thread CPU time metric (#11772)
-rw-r--r-- ydb/core/driver_lib/run/run.cpp 8
-rw-r--r-- ydb/core/grpc_streaming/grpc_streaming_ut.cpp 3
-rw-r--r-- ydb/core/testlib/test_client.cpp 6
-rw-r--r-- ydb/library/grpc/server/grpc_server.cpp 39
-rw-r--r-- ydb/library/grpc/server/grpc_server.h 10
5 files changed, 49 insertions, 17 deletions
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index 7680f911eb..ac333400fe 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -973,13 +973,13 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
sslData.DoRequestClientCertificate = appConfig.GetClientCertificateAuthorization().GetRequestClientCertificate();
sslOpts.SetSslData(sslData);
- GRpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(sslOpts) });
+ GRpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(sslOpts, Counters) });
fillFn(grpcConfig, *GRpcServers.back().second, sslOpts);
}
if (grpcConfig.GetPort()) {
- GRpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(opts) });
+ GRpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(opts, Counters) });
fillFn(grpcConfig, *GRpcServers.back().second, opts);
}
@@ -996,7 +996,7 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
xopts.SetEndpointId(ex.GetEndpointId());
}
- GRpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(xopts) });
+ GRpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(xopts, Counters) });
fillFn(ex, *GRpcServers.back().second, xopts);
}
@@ -1035,7 +1035,7 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
Y_ABORT_UNLESS(xopts.SslData->Cert, "Cert not set");
Y_ABORT_UNLESS(xopts.SslData->Key, "Key not set");
- GRpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(xopts) });
+ GRpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(xopts, Counters) });
fillFn(ex, *GRpcServers.back().second, xopts);
}
}
diff --git a/ydb/core/grpc_streaming/grpc_streaming_ut.cpp b/ydb/core/grpc_streaming/grpc_streaming_ut.cpp
index d6c723a9b2..e50dc1f5df 100644
--- a/ydb/core/grpc_streaming/grpc_streaming_ut.cpp
+++ b/ydb/core/grpc_streaming/grpc_streaming_ut.cpp
@@ -95,12 +95,13 @@ public:
Server->GetRuntime()->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_DEBUG);
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> counters(MakeIntrusive<::NMonitoring::TDynamicCounters>());
+
NYdbGrpc::TServerOptions options;
options.SetPort(grpc);
GRpcServer.Reset(new NYdbGrpc::TGRpcServer(options));
auto* as = Server->GetRuntime()->GetAnyNodeActorSystem();
- TIntrusivePtr<::NMonitoring::TDynamicCounters> counters(MakeIntrusive<::NMonitoring::TDynamicCounters>());
GRpcServer->AddService(new TStreamingService<TImplActor>(as, counters));
GRpcServer->Start();
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index 1f4f1e8ea0..c8184ec6c4 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -347,6 +347,9 @@ namespace Tests {
}
void TServer::EnableGRpc(const NYdbGrpc::TServerOptions& options, ui32 grpcServiceNodeId) {
+ GRpcServerRootCounters = MakeIntrusive<::NMonitoring::TDynamicCounters>();
+ auto& counters = GRpcServerRootCounters;
+
GRpcServer.reset(new NYdbGrpc::TGRpcServer(options));
auto grpcService = new NGRpcProxy::TGRpcService();
@@ -379,9 +382,6 @@ namespace Tests {
auto grpcMon = system->Register(NGRpcService::CreateGrpcMonService(), TMailboxType::ReadAsFilled, appData.UserPoolId);
system->RegisterLocalService(NGRpcService::GrpcMonServiceId(), grpcMon);
- GRpcServerRootCounters = MakeIntrusive<::NMonitoring::TDynamicCounters>();
- auto& counters = GRpcServerRootCounters;
-
// Setup discovery for typically used services on the node
{
TIntrusivePtr<NGRpcService::TGrpcEndpointDescription> desc = new NGRpcService::TGrpcEndpointDescription();
diff --git a/ydb/library/grpc/server/grpc_server.cpp b/ydb/library/grpc/server/grpc_server.cpp
index cb8432563f..b027d37a1f 100644
--- a/ydb/library/grpc/server/grpc_server.cpp
+++ b/ydb/library/grpc/server/grpc_server.cpp
@@ -1,5 +1,8 @@
#include "grpc_server.h"
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <library/cpp/time_provider/monotonic.h>
+
#include <util/string/join.h>
#include <util/generic/yexception.h>
#include <util/system/thread.h>
@@ -18,19 +21,31 @@
namespace NYdbGrpc {
-using NThreading::TFuture;
-
-static void PullEvents(grpc::ServerCompletionQueue* cq) {
+static void PullEvents(grpc::ServerCompletionQueue* cq, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters) {
TThread::SetCurrentThreadName("grpc_server");
+ auto okCounter = counters->GetCounter("RequestExecuted", true);
+ auto errorCounter = counters->GetCounter("RequestDestroyed", true);
+ auto cpuTime = counters->GetCounter("ThreadCPU", true);
+
+ NMonotonic::TMonotonic lastCpuTimeTs = {};
while (true) {
void* tag; // uniquely identifies a request.
bool ok;
+ auto now = NMonotonic::TMonotonic::Now();
+ if (now - lastCpuTimeTs >= TDuration::Seconds(1)) {
+ lastCpuTimeTs = now;
+ *cpuTime = ThreadCPUTime();
+ }
+
if (cq->Next(&tag, &ok)) {
IQueueEvent* const ev(static_cast<IQueueEvent*>(tag));
- if (!ev->Execute(ok)) {
+ if (ev->Execute(ok)) {
+ okCounter->Inc();
+ } else {
ev->DestroyRequest();
+ errorCounter->Inc();
}
} else {
break;
@@ -103,10 +118,16 @@ void TGrpcServiceProtectiable::DecRequest() {
}
}
-TGRpcServer::TGRpcServer(const TServerOptions& opts)
+TGRpcServer::TGRpcServer(const TServerOptions& opts, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters)
: Options_(opts)
+ , Counters_(std::move(counters))
, Limiter_(Options_.MaxGlobalRequestInFlight)
- {}
+{
+ if (!Counters_) {
+ // make a stub to simplify code
+ Counters_.Reset(new ::NMonitoring::TDynamicCounters());
+ }
+}
TGRpcServer::~TGRpcServer() {
Y_ABORT_UNLESS(Ts.empty());
@@ -237,10 +258,12 @@ void TGRpcServer::Start() {
}
Ts.reserve(Options_.WorkerThreads);
+ auto grpcCounters = Counters_->GetSubgroup("counters", "grpc");
for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
auto* cq = &CQS_[i % CQS_.size()];
- Ts.push_back(SystemThreadFactory()->Run([cq] {
- PullEvents(cq->get());
+ auto workerCounters = grpcCounters->GetSubgroup("worker", ToString(i));
+ Ts.push_back(SystemThreadFactory()->Run([cq, workerCounters] {
+ PullEvents(cq->get(), std::move(workerCounters));
}));
}
diff --git a/ydb/library/grpc/server/grpc_server.h b/ydb/library/grpc/server/grpc_server.h
index 5ab48d2f24..459c94d616 100644
--- a/ydb/library/grpc/server/grpc_server.h
+++ b/ydb/library/grpc/server/grpc_server.h
@@ -18,6 +18,10 @@
#include <grpcpp/grpcpp.h>
+namespace NMonitoring {
+ struct TDynamicCounters;
+} // NMonitoring
+
namespace NYdbGrpc {
struct TSslData {
@@ -349,8 +353,11 @@ protected:
class TGRpcServer {
public:
using IGRpcServicePtr = TIntrusivePtr<IGRpcService>;
- TGRpcServer(const TServerOptions& opts);
+
+ // TODO: remove default nullptr after migration
+ TGRpcServer(const TServerOptions& opts, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters = nullptr);
~TGRpcServer();
+
void AddService(IGRpcServicePtr service);
void Start();
// Send stop to registred services and call Shutdown on grpc server
@@ -365,6 +372,7 @@ private:
using IThreadRef = TAutoPtr<IThreadFactory::IThread>;
const TServerOptions Options_;
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_;
std::unique_ptr<grpc::Server> Server_;
std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> CQS_;
TVector<IThreadRef> Ts;