summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/driver_lib/run/run.cpp2
-rw-r--r--ydb/core/testlib/test_client.cpp2
-rw-r--r--ydb/services/ydb/ydb_query.cpp104
-rw-r--r--ydb/services/ydb/ydb_query.h17
4 files changed, 63 insertions, 62 deletions
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index cc600d850e1..e670fe9a0a6 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -832,7 +832,7 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
if (hasQueryService) {
server.AddService(new NGRpcService::TGRpcYdbQueryService(ActorSystem.Get(), Counters,
- grpcRequestProxies[0], hasDataStreams.IsRlAllowed()));
+ grpcRequestProxies, hasDataStreams.IsRlAllowed(), grpcConfig.GetHandlersPerCompletionQueue()));
}
if (hasLogStore) {
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index 07f49a1c515..1535dd9b72c 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -388,7 +388,7 @@ namespace Tests {
GRpcServer->AddService(new NQuoter::TRateLimiterGRpcService(system, counters, grpcRequestProxies[0]));
GRpcServer->AddService(new NGRpcService::TGRpcDataStreamsService(system, counters, grpcRequestProxies[0], true));
GRpcServer->AddService(new NGRpcService::TGRpcMonitoringService(system, counters, grpcRequestProxies[0], true));
- GRpcServer->AddService(new NGRpcService::TGRpcYdbQueryService(system, counters, grpcRequestProxies[0], true));
+ GRpcServer->AddService(new NGRpcService::TGRpcYdbQueryService(system, counters, grpcRequestProxies, true, 1));
if (Settings->EnableYq) {
GRpcServer->AddService(new NGRpcService::TGRpcFederatedQueryService(system, counters, grpcRequestProxies[0]));
GRpcServer->AddService(new NGRpcService::TGRpcFqPrivateTaskService(system, counters, grpcRequestProxies[0]));
diff --git a/ydb/services/ydb/ydb_query.cpp b/ydb/services/ydb/ydb_query.cpp
index 03f031430f1..760599e68cc 100644
--- a/ydb/services/ydb/ydb_query.cpp
+++ b/ydb/services/ydb/ydb_query.cpp
@@ -7,76 +7,60 @@
namespace NKikimr::NGRpcService {
+TGRpcYdbQueryService::TGRpcYdbQueryService(NActors::TActorSystem *system,
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
+ const NActors::TActorId& proxyId,
+ bool rlAllowed,
+ size_t handlersPerCompletionQueue)
+ : TGrpcServiceBase(system, counters, proxyId, rlAllowed)
+ , HandlersPerCompletionQueue(Max(size_t{1}, handlersPerCompletionQueue))
+{
+}
+
+TGRpcYdbQueryService::TGRpcYdbQueryService(NActors::TActorSystem *system,
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
+ const TVector<NActors::TActorId>& proxies,
+ bool rlAllowed,
+ size_t handlersPerCompletionQueue)
+ : TGrpcServiceBase(system, counters, proxies, rlAllowed)
+ , HandlersPerCompletionQueue(Max(size_t{1}, handlersPerCompletionQueue))
+{
+}
+
void TGRpcYdbQueryService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) {
using namespace Ydb::Query;
using namespace NQuery;
auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
+ size_t proxyCounter = 0;
#ifdef ADD_REQUEST
#error ADD_REQUEST macro already defined
#endif
-#define ADD_REQUEST(NAME, IN, OUT, ACTION) \
- MakeIntrusive<TGRpcRequest<IN, OUT, TGRpcYdbQueryService>>(this, &Service_, CQ_, \
- [this](NYdbGrpc::IRequestContextBase* ctx) { \
- NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
- ACTION; \
- }, &Ydb::Query::V1::QueryService::AsyncService::Request ## NAME, \
- #NAME, logger, getCounterBlock("query", #NAME))->Run();
-
- ADD_REQUEST(ExecuteQuery, ExecuteQueryRequest, ExecuteQueryResponsePart, {
- ActorSystem_->Send(GRpcRequestProxyId_,
- new TGrpcRequestNoOperationCall<ExecuteQueryRequest, ExecuteQueryResponsePart>
- (ctx, &DoExecuteQuery, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Auditable}));
- })
-
- ADD_REQUEST(ExecuteScript, ExecuteScriptRequest, Ydb::Operations::Operation, {
- ActorSystem_->Send(GRpcRequestProxyId_,
- new TGrpcRequestNoOperationCall<ExecuteScriptRequest, Ydb::Operations::Operation>
- (ctx, &DoExecuteScript, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Auditable}));
- })
-
- ADD_REQUEST(FetchScriptResults, FetchScriptResultsRequest, FetchScriptResultsResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_,
- new TGrpcRequestNoOperationCall<FetchScriptResultsRequest, FetchScriptResultsResponse>
- (ctx, &DoFetchScriptResults, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr}));
- })
-
- ADD_REQUEST(CreateSession, CreateSessionRequest, CreateSessionResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_,
- new TGrpcRequestNoOperationCall<CreateSessionRequest, CreateSessionResponse>
- (ctx, &DoCreateSession, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr}));
- })
-
- ADD_REQUEST(DeleteSession, DeleteSessionRequest, DeleteSessionResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_,
- new TGrpcRequestNoOperationCall<DeleteSessionRequest, DeleteSessionResponse>
- (ctx, &DoDeleteSession, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr}));
- })
-
- ADD_REQUEST(AttachSession, AttachSessionRequest, SessionState, {
- ActorSystem_->Send(GRpcRequestProxyId_,
- new TGrpcRequestNoOperationCall<AttachSessionRequest, SessionState>
- (ctx, &DoAttachSession, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr}));
- })
-
- ADD_REQUEST(BeginTransaction, BeginTransactionRequest, BeginTransactionResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_,
- new TGrpcRequestNoOperationCall<BeginTransactionRequest, BeginTransactionResponse>
- (ctx, &DoBeginTransaction, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr}));
- })
-
- ADD_REQUEST(CommitTransaction, CommitTransactionRequest, CommitTransactionResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_,
- new TGrpcRequestNoOperationCall<CommitTransactionRequest, CommitTransactionResponse>
- (ctx, &DoCommitTransaction, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr}));
- })
+#define ADD_REQUEST(NAME, IN, OUT, CB, ...) \
+ for (size_t i = 0; i < HandlersPerCompletionQueue; ++i) { \
+ for (auto* cq: CQS) { \
+ MakeIntrusive<TGRpcRequest<IN, OUT, TGRpcYdbQueryService>>(this, &Service_, cq, \
+ [this, proxyCounter](NYdbGrpc::IRequestContextBase* ctx) { \
+ NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
+ ActorSystem_->Send(GRpcProxies_[proxyCounter % GRpcProxies_.size()], \
+ new TGrpcRequestNoOperationCall<IN, OUT> \
+ (ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr __VA_OPT__(, TAuditMode::__VA_ARGS__)})); \
+ }, &Ydb::Query::V1::QueryService::AsyncService::Request ## NAME, \
+ #NAME, logger, getCounterBlock("query", #NAME))->Run(); \
+ ++proxyCounter; \
+ } \
+ }
- ADD_REQUEST(RollbackTransaction, RollbackTransactionRequest, RollbackTransactionResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_,
- new TGrpcRequestNoOperationCall<RollbackTransactionRequest, RollbackTransactionResponse>
- (ctx, &DoRollbackTransaction, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr}));
- })
+ ADD_REQUEST(ExecuteQuery, ExecuteQueryRequest, ExecuteQueryResponsePart, DoExecuteQuery, Auditable);
+ ADD_REQUEST(ExecuteScript, ExecuteScriptRequest, Ydb::Operations::Operation, DoExecuteScript, Auditable);
+ ADD_REQUEST(FetchScriptResults, FetchScriptResultsRequest, FetchScriptResultsResponse, DoFetchScriptResults);
+ ADD_REQUEST(CreateSession, CreateSessionRequest, CreateSessionResponse, DoCreateSession);
+ ADD_REQUEST(DeleteSession, DeleteSessionRequest, DeleteSessionResponse, DoDeleteSession);
+ ADD_REQUEST(AttachSession, AttachSessionRequest, SessionState, DoAttachSession);
+ ADD_REQUEST(BeginTransaction, BeginTransactionRequest, BeginTransactionResponse, DoBeginTransaction);
+ ADD_REQUEST(CommitTransaction, CommitTransactionRequest, CommitTransactionResponse, DoCommitTransaction);
+ ADD_REQUEST(RollbackTransaction, RollbackTransactionRequest, RollbackTransactionResponse, DoRollbackTransaction);
#undef ADD_REQUEST
}
diff --git a/ydb/services/ydb/ydb_query.h b/ydb/services/ydb/ydb_query.h
index 8dec5cc4fa7..04e8ab915ee 100644
--- a/ydb/services/ydb/ydb_query.h
+++ b/ydb/services/ydb/ydb_query.h
@@ -11,8 +11,25 @@ class TGRpcYdbQueryService
public:
using TGrpcServiceBase<Ydb::Query::V1::QueryService>::TGrpcServiceBase;
+ TGRpcYdbQueryService(
+ NActors::TActorSystem *system,
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
+ const NActors::TActorId& proxyId,
+ bool rlAllowed,
+ size_t handlersPerCompletionQueue = 1);
+
+ TGRpcYdbQueryService(
+ NActors::TActorSystem *system,
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
+ const TVector<NActors::TActorId>& proxies,
+ bool rlAllowed,
+ size_t handlersPerCompletionQueue);
+
private:
void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger);
+
+private:
+ const size_t HandlersPerCompletionQueue;
};
} // namespace NKikimr::NGRpcService