diff options
| -rw-r--r-- | ydb/core/driver_lib/run/run.cpp | 2 | ||||
| -rw-r--r-- | ydb/core/testlib/test_client.cpp | 2 | ||||
| -rw-r--r-- | ydb/services/ydb/ydb_query.cpp | 104 | ||||
| -rw-r--r-- | ydb/services/ydb/ydb_query.h | 17 |
4 files changed, 63 insertions, 62 deletions
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index cc600d850e1..e670fe9a0a6 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -832,7 +832,7 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { if (hasQueryService) { server.AddService(new NGRpcService::TGRpcYdbQueryService(ActorSystem.Get(), Counters, - grpcRequestProxies[0], hasDataStreams.IsRlAllowed())); + grpcRequestProxies, hasDataStreams.IsRlAllowed(), grpcConfig.GetHandlersPerCompletionQueue())); } if (hasLogStore) { diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 07f49a1c515..1535dd9b72c 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -388,7 +388,7 @@ namespace Tests { GRpcServer->AddService(new NQuoter::TRateLimiterGRpcService(system, counters, grpcRequestProxies[0])); GRpcServer->AddService(new NGRpcService::TGRpcDataStreamsService(system, counters, grpcRequestProxies[0], true)); GRpcServer->AddService(new NGRpcService::TGRpcMonitoringService(system, counters, grpcRequestProxies[0], true)); - GRpcServer->AddService(new NGRpcService::TGRpcYdbQueryService(system, counters, grpcRequestProxies[0], true)); + GRpcServer->AddService(new NGRpcService::TGRpcYdbQueryService(system, counters, grpcRequestProxies, true, 1)); if (Settings->EnableYq) { GRpcServer->AddService(new NGRpcService::TGRpcFederatedQueryService(system, counters, grpcRequestProxies[0])); GRpcServer->AddService(new NGRpcService::TGRpcFqPrivateTaskService(system, counters, grpcRequestProxies[0])); diff --git a/ydb/services/ydb/ydb_query.cpp b/ydb/services/ydb/ydb_query.cpp index 03f031430f1..760599e68cc 100644 --- a/ydb/services/ydb/ydb_query.cpp +++ b/ydb/services/ydb/ydb_query.cpp @@ -7,76 +7,60 @@ namespace NKikimr::NGRpcService { +TGRpcYdbQueryService::TGRpcYdbQueryService(NActors::TActorSystem *system, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const NActors::TActorId& proxyId, + bool rlAllowed, + size_t handlersPerCompletionQueue) + : TGrpcServiceBase(system, counters, proxyId, rlAllowed) + , HandlersPerCompletionQueue(Max(size_t{1}, handlersPerCompletionQueue)) +{ +} + +TGRpcYdbQueryService::TGRpcYdbQueryService(NActors::TActorSystem *system, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const TVector<NActors::TActorId>& proxies, + bool rlAllowed, + size_t handlersPerCompletionQueue) + : TGrpcServiceBase(system, counters, proxies, rlAllowed) + , HandlersPerCompletionQueue(Max(size_t{1}, handlersPerCompletionQueue)) +{ +} + void TGRpcYdbQueryService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { using namespace Ydb::Query; using namespace NQuery; auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); + size_t proxyCounter = 0; #ifdef ADD_REQUEST #error ADD_REQUEST macro already defined #endif -#define ADD_REQUEST(NAME, IN, OUT, ACTION) \ - MakeIntrusive<TGRpcRequest<IN, OUT, TGRpcYdbQueryService>>(this, &Service_, CQ_, \ - [this](NYdbGrpc::IRequestContextBase* ctx) { \ - NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ - ACTION; \ - }, &Ydb::Query::V1::QueryService::AsyncService::Request ## NAME, \ - #NAME, logger, getCounterBlock("query", #NAME))->Run(); - - ADD_REQUEST(ExecuteQuery, ExecuteQueryRequest, ExecuteQueryResponsePart, { - ActorSystem_->Send(GRpcRequestProxyId_, - new TGrpcRequestNoOperationCall<ExecuteQueryRequest, ExecuteQueryResponsePart> - (ctx, &DoExecuteQuery, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Auditable})); - }) - - ADD_REQUEST(ExecuteScript, ExecuteScriptRequest, Ydb::Operations::Operation, { - ActorSystem_->Send(GRpcRequestProxyId_, - new TGrpcRequestNoOperationCall<ExecuteScriptRequest, Ydb::Operations::Operation> - (ctx, &DoExecuteScript, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Auditable})); - }) - - ADD_REQUEST(FetchScriptResults, FetchScriptResultsRequest, FetchScriptResultsResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, - new TGrpcRequestNoOperationCall<FetchScriptResultsRequest, FetchScriptResultsResponse> - (ctx, &DoFetchScriptResults, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); - }) - - ADD_REQUEST(CreateSession, CreateSessionRequest, CreateSessionResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, - new TGrpcRequestNoOperationCall<CreateSessionRequest, CreateSessionResponse> - (ctx, &DoCreateSession, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); - }) - - ADD_REQUEST(DeleteSession, DeleteSessionRequest, DeleteSessionResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, - new TGrpcRequestNoOperationCall<DeleteSessionRequest, DeleteSessionResponse> - (ctx, &DoDeleteSession, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); - }) - - ADD_REQUEST(AttachSession, AttachSessionRequest, SessionState, { - ActorSystem_->Send(GRpcRequestProxyId_, - new TGrpcRequestNoOperationCall<AttachSessionRequest, SessionState> - (ctx, &DoAttachSession, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); - }) - - ADD_REQUEST(BeginTransaction, BeginTransactionRequest, BeginTransactionResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, - new TGrpcRequestNoOperationCall<BeginTransactionRequest, BeginTransactionResponse> - (ctx, &DoBeginTransaction, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); - }) - - ADD_REQUEST(CommitTransaction, CommitTransactionRequest, CommitTransactionResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, - new TGrpcRequestNoOperationCall<CommitTransactionRequest, CommitTransactionResponse> - (ctx, &DoCommitTransaction, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); - }) +#define ADD_REQUEST(NAME, IN, OUT, CB, ...) \ + for (size_t i = 0; i < HandlersPerCompletionQueue; ++i) { \ + for (auto* cq: CQS) { \ + MakeIntrusive<TGRpcRequest<IN, OUT, TGRpcYdbQueryService>>(this, &Service_, cq, \ + [this, proxyCounter](NYdbGrpc::IRequestContextBase* ctx) { \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ + ActorSystem_->Send(GRpcProxies_[proxyCounter % GRpcProxies_.size()], \ + new TGrpcRequestNoOperationCall<IN, OUT> \ + (ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr __VA_OPT__(, TAuditMode::__VA_ARGS__)})); \ + }, &Ydb::Query::V1::QueryService::AsyncService::Request ## NAME, \ + #NAME, logger, getCounterBlock("query", #NAME))->Run(); \ + ++proxyCounter; \ + } \ + } - ADD_REQUEST(RollbackTransaction, RollbackTransactionRequest, RollbackTransactionResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, - new TGrpcRequestNoOperationCall<RollbackTransactionRequest, RollbackTransactionResponse> - (ctx, &DoRollbackTransaction, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); - }) + ADD_REQUEST(ExecuteQuery, ExecuteQueryRequest, ExecuteQueryResponsePart, DoExecuteQuery, Auditable); + ADD_REQUEST(ExecuteScript, ExecuteScriptRequest, Ydb::Operations::Operation, DoExecuteScript, Auditable); + ADD_REQUEST(FetchScriptResults, FetchScriptResultsRequest, FetchScriptResultsResponse, DoFetchScriptResults); + ADD_REQUEST(CreateSession, CreateSessionRequest, CreateSessionResponse, DoCreateSession); + ADD_REQUEST(DeleteSession, DeleteSessionRequest, DeleteSessionResponse, DoDeleteSession); + ADD_REQUEST(AttachSession, AttachSessionRequest, SessionState, DoAttachSession); + ADD_REQUEST(BeginTransaction, BeginTransactionRequest, BeginTransactionResponse, DoBeginTransaction); + ADD_REQUEST(CommitTransaction, CommitTransactionRequest, CommitTransactionResponse, DoCommitTransaction); + ADD_REQUEST(RollbackTransaction, RollbackTransactionRequest, RollbackTransactionResponse, DoRollbackTransaction); #undef ADD_REQUEST } diff --git a/ydb/services/ydb/ydb_query.h b/ydb/services/ydb/ydb_query.h index 8dec5cc4fa7..04e8ab915ee 100644 --- a/ydb/services/ydb/ydb_query.h +++ b/ydb/services/ydb/ydb_query.h @@ -11,8 +11,25 @@ class TGRpcYdbQueryService public: using TGrpcServiceBase<Ydb::Query::V1::QueryService>::TGrpcServiceBase; + TGRpcYdbQueryService( + NActors::TActorSystem *system, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const NActors::TActorId& proxyId, + bool rlAllowed, + size_t handlersPerCompletionQueue = 1); + + TGRpcYdbQueryService( + NActors::TActorSystem *system, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const TVector<NActors::TActorId>& proxies, + bool rlAllowed, + size_t handlersPerCompletionQueue); + private: void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger); + +private: + const size_t HandlersPerCompletionQueue; }; } // namespace NKikimr::NGRpcService |
