diff options
author | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-15 19:45:03 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-15 19:45:03 +0300 |
commit | 3ca0e4d20361f13ade902c78a87faa4da73f44c0 (patch) | |
tree | 189a5c1686ab70d58511d050f177e7c652627778 | |
parent | 477261f1113065d578e21c21933da7ae432d0afd (diff) | |
download | ydb-3ca0e4d20361f13ade902c78a87faa4da73f44c0.tar.gz |
Invert grpc_request_proxy dependence for table service. KIKIMR-13646
ref:0b126dafc321557e232e9131321cc8ffa4790f9a
31 files changed, 334 insertions, 254 deletions
diff --git a/ydb/core/blobstorage/testload/test_load_kqp.cpp b/ydb/core/blobstorage/testload/test_load_kqp.cpp index fec4fd7b2c0..89e04ed68de 100644 --- a/ydb/core/blobstorage/testload/test_load_kqp.cpp +++ b/ydb/core/blobstorage/testload/test_load_kqp.cpp @@ -223,7 +223,8 @@ private: } }; using namespace NGRpcService; - using TEvCreateSessionRequest = TGRpcRequestWrapper<TRpcServices::EvCreateSession, Ydb::Table::CreateSessionRequest, Ydb::Table::CreateSessionResponse, true, TRateLimiterMode::Rps>; + using TEvCreateSessionRequest = TGrpcRequestOperationCall<Ydb::Table::CreateSessionRequest, + Ydb::Table::CreateSessionResponse>; NKikimr::NRpcService::DoLocalRpcSameMailbox<TEvCreateSessionRequest>( std::move(request), std::move(cb), WorkingDir, TString(), ctx ); @@ -247,7 +248,8 @@ private: } }; using namespace NGRpcService; - using TEvExecuteSchemeQueryRequest = TGRpcRequestWrapper<TRpcServices::EvExecuteSchemeQuery, Ydb::Table::ExecuteSchemeQueryRequest, Ydb::Table::ExecuteSchemeQueryResponse, true, TRateLimiterMode::Rps>; + using TEvExecuteSchemeQueryRequest = TGrpcRequestOperationCall<Ydb::Table::ExecuteSchemeQueryRequest, + Ydb::Table::ExecuteSchemeQueryResponse>; NKikimr::NRpcService::DoLocalRpcSameMailbox<TEvExecuteSchemeQueryRequest>( std::move(request), std::move(cb), WorkingDir, TString(), ctx ); diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h index 44b25c4a5f9..38380586218 100644 --- a/ydb/core/grpc_services/base/base.h +++ b/ydb/core/grpc_services/base/base.h @@ -360,6 +360,11 @@ public: virtual void AddServerHint(const TString& hint) = 0; virtual void SetCostInfo(float consumed_units) = 0; + virtual void SetStreamingNotify(NGrpc::IRequestContextBase::TOnNextReply&& cb) = 0; + virtual void FinishStream() = 0; + + virtual void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status) = 0; + private: virtual void Reply(NProtoBuf::Message* resp, ui32 status = 0) = 0; }; @@ -378,6 +383,7 @@ public: }; class IRequestNoOpCtx : public IRequestCtx { + }; struct TCommonResponseFillerImpl { @@ -914,7 +920,7 @@ public: return GetPeerMetaValues(NYdb::YDB_REQUEST_TYPE_HEADER); } - void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status) { + void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status) override { // res->data() pointer is used inside grpc code. // So this object should be destroyed during grpc_slice destroying routine auto res = new TString; @@ -954,9 +960,8 @@ public: return google::protobuf::Arena::CreateMessage<TResult>(ctx->GetArena()); } - template<typename Tcb> - void SetStreamingNotify(Tcb&& cb) { - Ctx_->SetNextReplyCallback(cb); + void SetStreamingNotify(NGrpc::IRequestContextBase::TOnNextReply&& cb) override { + Ctx_->SetNextReplyCallback(std::move(cb)); } void SetClientLostAction(std::function<void()>&& cb) override { @@ -969,7 +974,7 @@ public: Ctx_->GetFinishFuture().Subscribe(std::move(shutdown)); } - void FinishStream() { + void FinishStream() override { Ctx_->FinishStreamingOk(); } @@ -1033,6 +1038,8 @@ class TGrpcRequestCall typedef typename std::conditional<IsOperation, IRequestOpCtx, IRequestNoOpCtx>::type TRequestIface; public: static constexpr bool IsOp = IsOperation; + static IActor* CreateRpcActor(typename std::conditional<IsOperation, IRequestOpCtx, IRequestNoOpCtx>::type* msg); + TGrpcRequestCall(NGrpc::IRequestContextBase* ctx, void (*cb)(std::unique_ptr<TRequestIface>, const IFacilityProvider&), TRequestAuxSettings auxSettings = {}) : TGRpcRequestWrapperImpl< diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp index a1e7accb877..358570e4e76 100644 --- a/ydb/core/grpc_services/grpc_request_proxy.cpp +++ b/ydb/core/grpc_services/grpc_request_proxy.cpp @@ -566,36 +566,17 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev, const TActorCo switch (ev->GetTypeRewrite()) { HFunc(TRefreshTokenImpl, PreHandle); HFunc(TEvLoginRequest, PreHandle); - HFunc(TEvAlterTableRequest, PreHandle); - HFunc(TEvCreateTableRequest, PreHandle); - HFunc(TEvDropTableRequest, PreHandle); HFunc(TEvGetOperationRequest, PreHandle); HFunc(TEvCancelOperationRequest, PreHandle); HFunc(TEvForgetOperationRequest, PreHandle); HFunc(TEvListOperationsRequest, PreHandle); - HFunc(TEvCreateSessionRequest, PreHandle); - HFunc(TEvKeepAliveRequest, PreHandle); - HFunc(TEvDeleteSessionRequest, PreHandle); - HFunc(TEvCopyTableRequest, PreHandle); - HFunc(TEvCopyTablesRequest, PreHandle); - HFunc(TEvRenameTablesRequest, PreHandle); - HFunc(TEvDescribeTableRequest, PreHandle); - HFunc(TEvReadTableRequest, PreHandle); - HFunc(TEvExplainDataQueryRequest, PreHandle); - HFunc(TEvPrepareDataQueryRequest, PreHandle); - HFunc(TEvExecuteDataQueryRequest, PreHandle); - HFunc(TEvExecuteSchemeQueryRequest, PreHandle); HFunc(TEvCreateTenantRequest, PreHandle); HFunc(TEvAlterTenantRequest, PreHandle); HFunc(TEvGetTenantStatusRequest, PreHandle); HFunc(TEvListTenantsRequest, PreHandle); HFunc(TEvRemoveTenantRequest, PreHandle); - HFunc(TEvBeginTransactionRequest, PreHandle); - HFunc(TEvCommitTransactionRequest, PreHandle); - HFunc(TEvRollbackTransactionRequest, PreHandle); HFunc(TEvListEndpointsRequest, PreHandle); HFunc(TEvDescribeTenantOptionsRequest, PreHandle); - HFunc(TEvDescribeTableOptionsRequest, PreHandle); HFunc(TEvCreateCoordinationNode, PreHandle); HFunc(TEvAlterCoordinationNode, PreHandle); HFunc(TEvDropCoordinationNode, PreHandle); @@ -620,7 +601,6 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev, const TActorCo HFunc(TEvImportFromS3Request, PreHandle); HFunc(TEvImportDataRequest, PreHandle); HFunc(TEvDiscoverPQClustersRequest, PreHandle); - HFunc(TEvBulkUpsertRequest, PreHandle); HFunc(TEvWhoAmIRequest, PreHandle); HFunc(TEvCreateRateLimiterResource, PreHandle); HFunc(TEvAlterRateLimiterResource, PreHandle); @@ -632,7 +612,6 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev, const TActorCo HFunc(TEvKikhouseRefreshSnapshotRequest, PreHandle); HFunc(TEvKikhouseDiscardSnapshotRequest, PreHandle); HFunc(TEvSelfCheckRequest, PreHandle); - HFunc(TEvStreamExecuteScanQueryRequest, PreHandle); HFunc(TEvCoordinationSessionRequest, PreHandle); HFunc(TEvLongTxBeginRequest, PreHandle); HFunc(TEvLongTxCommitRequest, PreHandle); diff --git a/ydb/core/grpc_services/grpc_request_proxy.h b/ydb/core/grpc_services/grpc_request_proxy.h index ac65990f67f..508f678330f 100644 --- a/ydb/core/grpc_services/grpc_request_proxy.h +++ b/ydb/core/grpc_services/grpc_request_proxy.h @@ -48,36 +48,17 @@ public: }; protected: - void Handle(TEvAlterTableRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvCreateTableRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDropTableRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvCopyTableRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvCopyTablesRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvRenameTablesRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDescribeTableRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvGetOperationRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvCancelOperationRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvForgetOperationRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvListOperationsRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvCreateSessionRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvKeepAliveRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDeleteSessionRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvReadTableRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvExplainDataQueryRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvPrepareDataQueryRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvExecuteDataQueryRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvExecuteSchemeQueryRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvCreateTenantRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvAlterTenantRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvGetTenantStatusRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvListTenantsRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvRemoveTenantRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvBeginTransactionRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvCommitTransactionRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvRollbackTransactionRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvListEndpointsRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvDescribeTenantOptionsRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDescribeTableOptionsRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvCreateCoordinationNode::TPtr& ev, const TActorContext& ctx); void Handle(TEvAlterCoordinationNode::TPtr& ev, const TActorContext& ctx); void Handle(TEvDropCoordinationNode::TPtr& ev, const TActorContext& ctx); @@ -102,7 +83,6 @@ protected: void Handle(TEvImportFromS3Request::TPtr& ev, const TActorContext& ctx); void Handle(TEvImportDataRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvDiscoverPQClustersRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvBulkUpsertRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvWhoAmIRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvCreateRateLimiterResource::TPtr& ev, const TActorContext& ctx); void Handle(TEvAlterRateLimiterResource::TPtr& ev, const TActorContext& ctx); @@ -115,7 +95,6 @@ protected: void Handle(TEvKikhouseDiscardSnapshotRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvSelfCheckRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvLoginRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvStreamExecuteScanQueryRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvCoordinationSessionRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvLongTxBeginRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvLongTxCommitRequest::TPtr& ev, const TActorContext& ctx); diff --git a/ydb/core/grpc_services/local_rpc/local_rpc.h b/ydb/core/grpc_services/local_rpc/local_rpc.h index 23c3b6c8035..4f95c495409 100644 --- a/ydb/core/grpc_services/local_rpc/local_rpc.h +++ b/ydb/core/grpc_services/local_rpc/local_rpc.h @@ -145,6 +145,21 @@ public: Y_FAIL("Unimplemented for local rpc"); } + void SetStreamingNotify(NGrpc::IRequestContextBase::TOnNextReply&& cb) override { + Y_UNUSED(cb); + Y_FAIL("Unimplemented for local rpc"); + } + + void FinishStream() override { + Y_FAIL("Unimplemented for local rpc"); + } + + virtual void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status) override { + Y_UNUSED(in); + Y_UNUSED(status); + Y_FAIL("Unimplemented for local rpc"); + } + TMaybe<TString> GetTraceId() const override { return Nothing(); } diff --git a/ydb/core/grpc_services/rpc_alter_table.cpp b/ydb/core/grpc_services/rpc_alter_table.cpp index d1f2755b66d..6d403250733 100644 --- a/ydb/core/grpc_services/rpc_alter_table.cpp +++ b/ydb/core/grpc_services/rpc_alter_table.cpp @@ -1,9 +1,11 @@ -#include "grpc_request_proxy.h" +#include "service_table.h" +#include <ydb/core/grpc_services/base/base.h> #include "rpc_scheme_base.h" #include "rpc_common.h" #include "operation_helpers.h" #include "table_settings.h" +#include "service_table.h" #include <ydb/core/base/tablet_pipe.h> #include <ydb/core/cms/console/configs_dispatcher.h> @@ -77,6 +79,9 @@ static std::pair<StatusIds::StatusCode, TString> CheckAddIndexDesc(const Ydb::Ta return {StatusIds::SUCCESS, ""}; } +using TEvAlterTableRequest = TGrpcRequestOperationCall<Ydb::Table::AlterTableRequest, + Ydb::Table::AlterTableResponse>; + class TAlterTableRPC : public TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTableRequest> { using TBase = TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTableRequest>; @@ -537,8 +542,8 @@ private: TTableProfiles Profiles; }; -void TGRpcRequestProxy::Handle(TEvAlterTableRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TAlterTableRPC(ev->Release().Release())); +void DoAlterTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) { + TActivationContext::AsActorContext().Register(new TAlterTableRPC(p.release())); } template<> @@ -546,6 +551,5 @@ IActor* TEvAlterTableRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCt return new TAlterTableRPC(msg); } - } // namespace NKikimr } // namespace NGRpcService diff --git a/ydb/core/grpc_services/rpc_begin_transaction.cpp b/ydb/core/grpc_services/rpc_begin_transaction.cpp index 9e77fafdd3e..d042e9c3eee 100644 --- a/ydb/core/grpc_services/rpc_begin_transaction.cpp +++ b/ydb/core/grpc_services/rpc_begin_transaction.cpp @@ -1,8 +1,10 @@ -#include "grpc_request_proxy.h" +#include "service_table.h" +#include <ydb/core/grpc_services/base/base.h> #include "rpc_calls.h" #include "rpc_kqp_base.h" #include "rpc_common.h" +#include "service_table.h" #include <ydb/library/yql/public/issue/yql_issue_message.h> #include <ydb/library/yql/public/issue/yql_issue.h> @@ -14,13 +16,16 @@ using namespace NActors; using namespace Ydb; using namespace NKqp; +using TEvBeginTransactionRequest = TGrpcRequestOperationCall<Ydb::Table::BeginTransactionRequest, + Ydb::Table::BeginTransactionResponse>; + class TBeginTransactionRPC : public TRpcKqpRequestActor<TBeginTransactionRPC, TEvBeginTransactionRequest> { using TBase = TRpcKqpRequestActor<TBeginTransactionRPC, TEvBeginTransactionRequest>; public: using TResult = Ydb::Table::BeginTransactionResult; - TBeginTransactionRPC(TEvBeginTransactionRequest* msg) + TBeginTransactionRPC(IRequestOpCtx* msg) : TBase(msg) {} void Bootstrap(const TActorContext& ctx) { @@ -104,8 +109,8 @@ private: } }; -void TGRpcRequestProxy::Handle(TEvBeginTransactionRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TBeginTransactionRPC(ev->Release().Release())); +void DoBeginTransactionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) { + TActivationContext::AsActorContext().Register(new TBeginTransactionRPC(p.release())); } } // namespace NGRpcService diff --git a/ydb/core/grpc_services/rpc_calls.h b/ydb/core/grpc_services/rpc_calls.h index 43136c2f4ad..4fa6b5b38ac 100644 --- a/ydb/core/grpc_services/rpc_calls.h +++ b/ydb/core/grpc_services/rpc_calls.h @@ -45,28 +45,10 @@ void FillYdbStatus(Draft::Dummy::PingResponse& resp, const NYql::TIssues& issues template <> void FillYdbStatus(Ydb::Coordination::SessionResponse& resp, const NYql::TIssues& issues, Ydb::StatusIds::StatusCode status); -using TEvAlterTableRequest = TGRpcRequestValidationWrapper<TRpcServices::EvAlterTable, Ydb::Table::AlterTableRequest, Ydb::Table::AlterTableResponse, true, TRateLimiterMode::Rps>; -using TEvCreateTableRequest = TGRpcRequestValidationWrapper<TRpcServices::EvCreateTable, Ydb::Table::CreateTableRequest, Ydb::Table::CreateTableResponse, true, TRateLimiterMode::Rps>; -using TEvDropTableRequest = TGRpcRequestWrapper<TRpcServices::EvDropTable, Ydb::Table::DropTableRequest, Ydb::Table::DropTableResponse, true, TRateLimiterMode::Rps>; -using TEvCopyTableRequest = TGRpcRequestWrapper<TRpcServices::EvCopyTable, Ydb::Table::CopyTableRequest, Ydb::Table::CopyTableResponse, true, TRateLimiterMode::Rps>; -using TEvCopyTablesRequest = TGRpcRequestWrapper<TRpcServices::EvCopyTables, Ydb::Table::CopyTablesRequest, Ydb::Table::CopyTablesResponse, true, TRateLimiterMode::Rps>; -using TEvRenameTablesRequest = TGRpcRequestWrapper<TRpcServices::EvRenameTables, Ydb::Table::RenameTablesRequest, Ydb::Table::RenameTablesResponse, true, TRateLimiterMode::Rps>; -using TEvDescribeTableRequest = TGRpcRequestWrapper<TRpcServices::EvDescribeTable, Ydb::Table::DescribeTableRequest, Ydb::Table::DescribeTableResponse, true, TRateLimiterMode::Rps>; using TEvGetOperationRequest = TGRpcRequestValidationWrapper<TRpcServices::EvGetOperation, Ydb::Operations::GetOperationRequest, Ydb::Operations::GetOperationResponse, true, TRateLimiterMode::Rps>; using TEvCancelOperationRequest = TGRpcRequestValidationWrapper<TRpcServices::EvCancelOperation, Ydb::Operations::CancelOperationRequest, Ydb::Operations::CancelOperationResponse, false, TRateLimiterMode::Rps>; using TEvForgetOperationRequest = TGRpcRequestValidationWrapper<TRpcServices::EvForgetOperation, Ydb::Operations::ForgetOperationRequest, Ydb::Operations::ForgetOperationResponse, false, TRateLimiterMode::Rps>; using TEvListOperationsRequest = TGRpcRequestValidationWrapper<TRpcServices::EvListOperations, Ydb::Operations::ListOperationsRequest, Ydb::Operations::ListOperationsResponse, false, TRateLimiterMode::Rps>; -using TEvCreateSessionRequest = TGRpcRequestWrapper<TRpcServices::EvCreateSession, Ydb::Table::CreateSessionRequest, Ydb::Table::CreateSessionResponse, true, TRateLimiterMode::Rps>; -using TEvDeleteSessionRequest = TGRpcRequestWrapper<TRpcServices::EvDeleteSession, Ydb::Table::DeleteSessionRequest, Ydb::Table::DeleteSessionResponse, true>; -using TEvKeepAliveRequest = TGRpcRequestWrapper<TRpcServices::EvKeepAlive, Ydb::Table::KeepAliveRequest, Ydb::Table::KeepAliveResponse, true, TRateLimiterMode::Rps>; -using TEvReadTableRequest = TGRpcRequestWrapper<TRpcServices::EvReadTable, Ydb::Table::ReadTableRequest, Ydb::Table::ReadTableResponse, false, TRateLimiterMode::RuOnProgress>; -using TEvExplainDataQueryRequest = TGRpcRequestWrapper<TRpcServices::EvExplainDataQuery, Ydb::Table::ExplainDataQueryRequest, Ydb::Table::ExplainDataQueryResponse, true, TRateLimiterMode::Rps>; -using TEvPrepareDataQueryRequest = TGRpcRequestWrapper<TRpcServices::EvPrepareDataQuery, Ydb::Table::PrepareDataQueryRequest, Ydb::Table::PrepareDataQueryResponse, true, TRateLimiterMode::Ru>; -using TEvExecuteDataQueryRequest = TGRpcRequestWrapper<TRpcServices::EvExecuteDataQuery, Ydb::Table::ExecuteDataQueryRequest, Ydb::Table::ExecuteDataQueryResponse, true, TRateLimiterMode::Ru>; -using TEvExecuteSchemeQueryRequest = TGRpcRequestWrapper<TRpcServices::EvExecuteSchemeQuery, Ydb::Table::ExecuteSchemeQueryRequest, Ydb::Table::ExecuteSchemeQueryResponse, true, TRateLimiterMode::Rps>; -using TEvBeginTransactionRequest = TGRpcRequestWrapper<TRpcServices::EvBeginTransaction, Ydb::Table::BeginTransactionRequest, Ydb::Table::BeginTransactionResponse, true, TRateLimiterMode::Rps>; -using TEvCommitTransactionRequest = TGRpcRequestWrapper<TRpcServices::EvCommitTransaction, Ydb::Table::CommitTransactionRequest, Ydb::Table::CommitTransactionResponse, true>; -using TEvRollbackTransactionRequest = TGRpcRequestWrapper<TRpcServices::EvRollbackTransaction, Ydb::Table::RollbackTransactionRequest, Ydb::Table::RollbackTransactionResponse, true>; using TEvCreateTenantRequest = TGRpcRequestWrapper<TRpcServices::EvCreateTenant, Ydb::Cms::CreateDatabaseRequest, Ydb::Cms::CreateDatabaseResponse, true>; using TEvAlterTenantRequest = TGRpcRequestWrapper<TRpcServices::EvAlterTenant, Ydb::Cms::AlterDatabaseRequest, Ydb::Cms::AlterDatabaseResponse, true>; using TEvGetTenantStatusRequest = TGRpcRequestWrapper<TRpcServices::EvGetTenantStatus, Ydb::Cms::GetDatabaseStatusRequest, Ydb::Cms::GetDatabaseStatusResponse, true>; @@ -74,7 +56,6 @@ using TEvListTenantsRequest = TGRpcRequestWrapper<TRpcServices::EvListTenants, Y using TEvRemoveTenantRequest = TGRpcRequestWrapper<TRpcServices::EvRemoveTenant, Ydb::Cms::RemoveDatabaseRequest, Ydb::Cms::RemoveDatabaseResponse, true>; using TEvListEndpointsRequest = TGRpcRequestWrapper<TRpcServices::EvListEndpoints, Ydb::Discovery::ListEndpointsRequest, Ydb::Discovery::ListEndpointsResponse, true>; using TEvDescribeTenantOptionsRequest = TGRpcRequestWrapper<TRpcServices::EvDescribeTenantOptions, Ydb::Cms::DescribeDatabaseOptionsRequest, Ydb::Cms::DescribeDatabaseOptionsResponse, true>; -using TEvDescribeTableOptionsRequest = TGRpcRequestWrapper<TRpcServices::EvDescribeTableOptions, Ydb::Table::DescribeTableOptionsRequest, Ydb::Table::DescribeTableOptionsResponse, true, TRateLimiterMode::Rps>; using TEvCreateCoordinationNode = TGRpcRequestWrapper<TRpcServices::EvCreateCoordinationNode, Ydb::Coordination::CreateNodeRequest, Ydb::Coordination::CreateNodeResponse, true, TRateLimiterMode::Rps>; using TEvAlterCoordinationNode = TGRpcRequestWrapper<TRpcServices::EvAlterCoordinationNode, Ydb::Coordination::AlterNodeRequest, Ydb::Coordination::AlterNodeResponse, true, TRateLimiterMode::Rps>; using TEvDropCoordinationNode = TGRpcRequestWrapper<TRpcServices::EvDropCoordinationNode, Ydb::Coordination::DropNodeRequest, Ydb::Coordination::DropNodeResponse, true, TRateLimiterMode::Rps>; @@ -99,7 +80,6 @@ using TEvExportToS3Request = TGRpcRequestValidationWrapper<TRpcServices::EvExpor using TEvImportFromS3Request = TGRpcRequestValidationWrapper<TRpcServices::EvImportFromS3, Ydb::Import::ImportFromS3Request, Ydb::Import::ImportFromS3Response, true>; using TEvImportDataRequest = TGRpcRequestValidationWrapper<TRpcServices::EvImportData, Ydb::Import::ImportDataRequest, Ydb::Import::ImportDataResponse, true>; using TEvDiscoverPQClustersRequest = TGRpcRequestWrapper<TRpcServices::EvDiscoverPQClusters, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResponse, true>; -using TEvBulkUpsertRequest = TGRpcRequestWrapper<TRpcServices::EvBulkUpsert, Ydb::Table::BulkUpsertRequest, Ydb::Table::BulkUpsertResponse, true, TRateLimiterMode::Ru>; using TEvWhoAmIRequest = TGRpcRequestWrapper<TRpcServices::EvWhoAmI, Ydb::Discovery::WhoAmIRequest, Ydb::Discovery::WhoAmIResponse, true, TRateLimiterMode::Rps>; using TEvCreateRateLimiterResource = TGRpcRequestWrapper<TRpcServices::EvCreateRateLimiterResource, Ydb::RateLimiter::CreateResourceRequest, Ydb::RateLimiter::CreateResourceResponse, true, TRateLimiterMode::Rps>; using TEvAlterRateLimiterResource = TGRpcRequestWrapper<TRpcServices::EvAlterRateLimiterResource, Ydb::RateLimiter::AlterResourceRequest, Ydb::RateLimiter::AlterResourceResponse, true, TRateLimiterMode::Rps>; @@ -112,7 +92,6 @@ using TEvKikhouseRefreshSnapshotRequest = TGRpcRequestWrapper<TRpcServices::EvKi using TEvKikhouseDiscardSnapshotRequest = TGRpcRequestWrapper<TRpcServices::EvKikhouseDiscardSnapshot, Ydb::ClickhouseInternal::DiscardSnapshotRequest, Ydb::ClickhouseInternal::DiscardSnapshotResponse, true>; using TEvSelfCheckRequest = TGRpcRequestWrapper<TRpcServices::EvSelfCheck, Ydb::Monitoring::SelfCheckRequest, Ydb::Monitoring::SelfCheckResponse, true>; using TEvLoginRequest = TGRpcRequestWrapperNoAuth<TRpcServices::EvLogin, Ydb::Auth::LoginRequest, Ydb::Auth::LoginResponse>; -using TEvStreamExecuteScanQueryRequest = TGRpcRequestWrapper<TRpcServices::EvStreamExecuteScanQuery, Ydb::Table::ExecuteScanQueryRequest, Ydb::Table::ExecuteScanQueryPartialResponse, false, TRateLimiterMode::RuOnProgress>; using TEvCoordinationSessionRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvCoordinationSession, Ydb::Coordination::SessionRequest, Ydb::Coordination::SessionResponse>; using TEvLongTxBeginRequest = TGRpcRequestWrapper<TRpcServices::EvLongTxBegin, Ydb::LongTx::BeginTransactionRequest, Ydb::LongTx::BeginTransactionResponse, true>; using TEvLongTxCommitRequest = TGRpcRequestWrapper<TRpcServices::EvLongTxCommit, Ydb::LongTx::CommitTransactionRequest, Ydb::LongTx::CommitTransactionResponse, true>; diff --git a/ydb/core/grpc_services/rpc_commit_transaction.cpp b/ydb/core/grpc_services/rpc_commit_transaction.cpp index 3b85b5ef62d..561bf1e2b1e 100644 --- a/ydb/core/grpc_services/rpc_commit_transaction.cpp +++ b/ydb/core/grpc_services/rpc_commit_transaction.cpp @@ -1,8 +1,10 @@ -#include "grpc_request_proxy.h" +#include "service_table.h" +#include <ydb/core/grpc_services/base/base.h> #include "rpc_calls.h" #include "rpc_kqp_base.h" #include "rpc_common.h" +#include "service_table.h" #include <ydb/library/yql/public/issue/yql_issue_message.h> #include <ydb/library/yql/public/issue/yql_issue.h> @@ -14,11 +16,14 @@ using namespace NActors; using namespace Ydb; using namespace NKqp; +using TEvCommitTransactionRequest = TGrpcRequestOperationCall<Ydb::Table::CommitTransactionRequest, +Ydb::Table::CommitTransactionResponse>; + class TCommitTransactionRPC : public TRpcKqpRequestActor<TCommitTransactionRPC, TEvCommitTransactionRequest> { using TBase = TRpcKqpRequestActor<TCommitTransactionRPC, TEvCommitTransactionRequest>; public: - TCommitTransactionRPC(TEvCommitTransactionRequest* msg) + TCommitTransactionRPC(IRequestOpCtx* msg) : TBase(msg) {} void Bootstrap(const TActorContext& ctx) { @@ -92,8 +97,8 @@ private: } }; -void TGRpcRequestProxy::Handle(TEvCommitTransactionRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TCommitTransactionRPC(ev->Release().Release())); +void DoCommitTransactionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) { + TActivationContext::AsActorContext().Register(new TCommitTransactionRPC(p.release())); } } // namespace NGRpcService diff --git a/ydb/core/grpc_services/rpc_common.h b/ydb/core/grpc_services/rpc_common.h index 075318037c9..dbcd5f9e1b3 100644 --- a/ydb/core/grpc_services/rpc_common.h +++ b/ydb/core/grpc_services/rpc_common.h @@ -8,6 +8,7 @@ namespace NKikimr { namespace NGRpcService { +class IRequestCtx; template<typename TEv> inline void SetRlPath(TEv& ev, const IRequestCtx& ctx) { diff --git a/ydb/core/grpc_services/rpc_copy_table.cpp b/ydb/core/grpc_services/rpc_copy_table.cpp index 74a0c9d9a81..125e4e4377e 100644 --- a/ydb/core/grpc_services/rpc_copy_table.cpp +++ b/ydb/core/grpc_services/rpc_copy_table.cpp @@ -1,4 +1,5 @@ -#include "grpc_request_proxy.h" +#include "service_table.h" +#include <ydb/core/grpc_services/base/base.h> #include "rpc_calls.h" #include "rpc_scheme_base.h" @@ -10,11 +11,14 @@ namespace NGRpcService { using namespace NActors; using namespace Ydb; +using TEvCopyTableRequest = TGrpcRequestOperationCall<Ydb::Table::CopyTableRequest, + Ydb::Table::CopyTableResponse>; + class TCopyTableRPC : public TRpcSchemeRequestActor<TCopyTableRPC, TEvCopyTableRequest> { using TBase = TRpcSchemeRequestActor<TCopyTableRPC, TEvCopyTableRequest>; public: - TCopyTableRPC(TEvCopyTableRequest* msg) + TCopyTableRPC(IRequestOpCtx* msg) : TBase(msg) {} void Bootstrap(const TActorContext &ctx) { @@ -50,8 +54,8 @@ private: } }; -void TGRpcRequestProxy::Handle(TEvCopyTableRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TCopyTableRPC(ev->Release().Release())); +void DoCopyTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) { + TActivationContext::AsActorContext().Register(new TCopyTableRPC(p.release())); } } // namespace NKikimr diff --git a/ydb/core/grpc_services/rpc_copy_tables.cpp b/ydb/core/grpc_services/rpc_copy_tables.cpp index 1f5fd281a51..10f2fe0cbe6 100644 --- a/ydb/core/grpc_services/rpc_copy_tables.cpp +++ b/ydb/core/grpc_services/rpc_copy_tables.cpp @@ -1,4 +1,5 @@ -#include "grpc_request_proxy.h" +#include "service_table.h" +#include <ydb/core/grpc_services/base/base.h> #include "rpc_calls.h" #include "rpc_scheme_base.h" @@ -10,11 +11,14 @@ namespace NGRpcService { using namespace NActors; using namespace Ydb; +using TEvCopyTablesRequest = TGrpcRequestOperationCall<Ydb::Table::CopyTablesRequest, + Ydb::Table::CopyTablesResponse>; + class TCopyTablesRPC : public TRpcSchemeRequestActor<TCopyTablesRPC, TEvCopyTablesRequest> { using TBase = TRpcSchemeRequestActor<TCopyTablesRPC, TEvCopyTablesRequest>; public: - TCopyTablesRPC(TEvCopyTablesRequest* msg) + TCopyTablesRPC(IRequestOpCtx* msg) : TBase(msg) {} void Bootstrap(const TActorContext &ctx) { @@ -44,8 +48,8 @@ private: } }; -void TGRpcRequestProxy::Handle(TEvCopyTablesRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TCopyTablesRPC(ev->Release().Release())); +void DoCopyTablesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) { + TActivationContext::AsActorContext().Register(new TCopyTablesRPC(p.release())); } } // namespace NKikimr diff --git a/ydb/core/grpc_services/rpc_create_session.cpp b/ydb/core/grpc_services/rpc_create_session.cpp index 2f6e9fae266..c4e6c6f9ff5 100644 --- a/ydb/core/grpc_services/rpc_create_session.cpp +++ b/ydb/core/grpc_services/rpc_create_session.cpp @@ -1,8 +1,9 @@ -#include "grpc_request_proxy.h" - +#include "service_table.h" +#include <ydb/core/grpc_services/base/base.h> #include "rpc_calls.h" #include "rpc_common.h" #include "rpc_kqp_base.h" +#include "service_table.h" #include <ydb/core/grpc_services/local_rpc/local_rpc.h> @@ -16,6 +17,9 @@ using namespace NActors; using namespace Ydb; using namespace NKqp; +using TEvCreateSessionRequest = TGrpcRequestOperationCall<Ydb::Table::CreateSessionRequest, + Ydb::Table::CreateSessionResponse>; + class TCreateSessionRPC : public TRpcKqpRequestActor<TCreateSessionRPC, TEvCreateSessionRequest> { using TBase = TRpcKqpRequestActor<TCreateSessionRPC, TEvCreateSessionRequest>; @@ -98,7 +102,10 @@ private: auto database = Request_->GetDatabaseName().GetOrElse(""); - auto actorId = NRpcService::DoLocalRpcSameMailbox<NKikimr::NGRpcService::TEvDeleteSessionRequest>( + using TEvDeleteSessionRequest = TGrpcRequestOperationCall<Ydb::Table::DeleteSessionRequest, + Ydb::Table::DeleteSessionResponse>; + + auto actorId = NRpcService::DoLocalRpcSameMailbox<TEvDeleteSessionRequest>( std::move(request), std::move(cb), database, Request_->GetInternalToken(), ctx); LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::GRPC_PROXY, @@ -134,8 +141,8 @@ private: }; -void TGRpcRequestProxy::Handle(TEvCreateSessionRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TCreateSessionRPC(ev->Release().Release())); +void DoCreateSessionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) { + TActivationContext::AsActorContext().Register(new TCreateSessionRPC(p.release())); } template<> diff --git a/ydb/core/grpc_services/rpc_create_table.cpp b/ydb/core/grpc_services/rpc_create_table.cpp index 302dfde57a8..209b7483682 100644 --- a/ydb/core/grpc_services/rpc_create_table.cpp +++ b/ydb/core/grpc_services/rpc_create_table.cpp @@ -1,5 +1,5 @@ -#include "grpc_request_proxy.h" - +#include "service_table.h" +#include <ydb/core/grpc_services/base/base.h> #include "rpc_calls.h" #include "rpc_scheme_base.h" #include "rpc_common.h" @@ -21,11 +21,14 @@ using namespace NConsole; using namespace Ydb; using namespace Ydb::Table; +using TEvCreateTableRequest = TGrpcRequestOperationCall<Ydb::Table::CreateTableRequest, + Ydb::Table::CreateTableResponse>; + class TCreateTableRPC : public TRpcSchemeRequestActor<TCreateTableRPC, TEvCreateTableRequest> { using TBase = TRpcSchemeRequestActor<TCreateTableRPC, TEvCreateTableRequest>; public: - TCreateTableRPC(TEvCreateTableRequest* msg) + TCreateTableRPC(IRequestOpCtx* msg) : TBase(msg) {} void Bootstrap(const TActorContext &ctx) { @@ -226,8 +229,8 @@ private: TTableProfiles Profiles; }; -void TGRpcRequestProxy::Handle(TEvCreateTableRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TCreateTableRPC(ev->Release().Release())); +void DoCreateTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) { + TActivationContext::AsActorContext().Register(new TCreateTableRPC(p.release())); } } // namespace NGRpcService diff --git a/ydb/core/grpc_services/rpc_delete_session.cpp b/ydb/core/grpc_services/rpc_delete_session.cpp index a2245af2362..10d1683c7ea 100644 --- a/ydb/core/grpc_services/rpc_delete_session.cpp +++ b/ydb/core/grpc_services/rpc_delete_session.cpp @@ -1,7 +1,9 @@ -#include "grpc_request_proxy.h" +#include "service_table.h" +#include <ydb/core/grpc_services/base/base.h> #include "rpc_calls.h" #include "rpc_kqp_base.h" +#include "service_table.h" #include <ydb/library/yql/public/issue/yql_issue_message.h> #include <ydb/library/yql/public/issue/yql_issue.h> @@ -13,6 +15,9 @@ using namespace NActors; using namespace Ydb; using namespace NKqp; +using TEvDeleteSessionRequest = TGrpcRequestOperationCall<Ydb::Table::DeleteSessionRequest, + Ydb::Table::DeleteSessionResponse>; + class TDeleteSessionRPC : public TRpcKqpRequestActor<TDeleteSessionRPC, TEvDeleteSessionRequest> { using TBase = TRpcKqpRequestActor<TDeleteSessionRPC, TEvDeleteSessionRequest>; @@ -46,8 +51,8 @@ private: } }; -void TGRpcRequestProxy::Handle(TEvDeleteSessionRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TDeleteSessionRPC(ev->Release().Release())); +void DoDeleteSessionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) { + TActivationContext::AsActorContext().Register(new TDeleteSessionRPC(p.release())); } template<> diff --git a/ydb/core/grpc_services/rpc_describe_table.cpp b/ydb/core/grpc_services/rpc_describe_table.cpp index 261f10d4676..df8dbdfb9b7 100644 --- a/ydb/core/grpc_services/rpc_describe_table.cpp +++ b/ydb/core/grpc_services/rpc_describe_table.cpp @@ -1,8 +1,10 @@ -#include "grpc_request_proxy.h" +#include "service_table.h" +#include <ydb/core/grpc_services/base/base.h> #include "rpc_calls.h" #include "rpc_scheme_base.h" +#include "service_table.h" #include "rpc_common.h" #include <ydb/core/tx/schemeshard/schemeshard.h> #include <ydb/core/ydb_convert/table_description.h> @@ -14,11 +16,14 @@ namespace NGRpcService { using namespace NActors; using namespace Ydb; +using TEvDescribeTableRequest = TGrpcRequestOperationCall<Ydb::Table::DescribeTableRequest, + Ydb::Table::DescribeTableResponse>; + class TDescribeTableRPC : public TRpcSchemeRequestActor<TDescribeTableRPC, TEvDescribeTableRequest> { using TBase = TRpcSchemeRequestActor<TDescribeTableRPC, TEvDescribeTableRequest>; public: - TDescribeTableRPC(TEvDescribeTableRequest* msg) + TDescribeTableRPC(IRequestOpCtx* msg) : TBase(msg) {} void Bootstrap(const TActorContext &ctx) { @@ -133,8 +138,8 @@ private: } }; -void TGRpcRequestProxy::Handle(TEvDescribeTableRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TDescribeTableRPC(ev->Release().Release())); +void DoDescribeTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) { + TActivationContext::AsActorContext().Register(new TDescribeTableRPC(p.release())); } } // namespace NKikimr diff --git a/ydb/core/grpc_services/rpc_describe_table_options.cpp b/ydb/core/grpc_services/rpc_describe_table_options.cpp index 6d77e3cff71..057ffc8ccd7 100644 --- a/ydb/core/grpc_services/rpc_describe_table_options.cpp +++ b/ydb/core/grpc_services/rpc_describe_table_options.cpp @@ -1,9 +1,11 @@ -#include "grpc_request_proxy.h" +#include "service_table.h" +#include <ydb/core/grpc_services/base/base.h> #include "rpc_calls.h" #include "rpc_scheme_base.h" #include "rpc_common.h" #include "table_profiles.h" +#include "service_table.h" #include <ydb/core/cms/console/configs_dispatcher.h> #include <ydb/core/protos/console_config.pb.h> @@ -17,11 +19,14 @@ using namespace NConsole; using namespace Ydb; using namespace Ydb::Table; +using TEvDescribeTableOptionsRequest = TGrpcRequestOperationCall<Ydb::Table::DescribeTableOptionsRequest, + Ydb::Table::DescribeTableOptionsResponse>; + class TDescribeTableOptionsRPC : public TRpcSchemeRequestActor<TDescribeTableOptionsRPC, TEvDescribeTableOptionsRequest> { using TBase = TRpcSchemeRequestActor<TDescribeTableOptionsRPC, TEvDescribeTableOptionsRequest>; public: - TDescribeTableOptionsRPC(TEvDescribeTableOptionsRequest* msg) + TDescribeTableOptionsRPC(IRequestOpCtx* msg) : TBase(msg) {} void Bootstrap(const TActorContext &ctx) { @@ -202,8 +207,8 @@ private: TTableProfiles Profiles; }; -void TGRpcRequestProxy::Handle(TEvDescribeTableOptionsRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TDescribeTableOptionsRPC(ev->Release().Release())); +void DoDescribeTableOptionsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) { + TActivationContext::AsActorContext().Register(new TDescribeTableOptionsRPC(p.release())); } } // namespace NGRpcService diff --git a/ydb/core/grpc_services/rpc_drop_table.cpp b/ydb/core/grpc_services/rpc_drop_table.cpp index 27899a9d773..d1c70f2dce3 100644 --- a/ydb/core/grpc_services/rpc_drop_table.cpp +++ b/ydb/core/grpc_services/rpc_drop_table.cpp @@ -1,5 +1,7 @@ -#include "grpc_request_proxy.h" +#include "service_table.h" +#include <ydb/core/grpc_services/base/base.h> +#include "service_table.h" #include "rpc_calls.h" #include "rpc_scheme_base.h" #include "rpc_common.h" @@ -10,6 +12,9 @@ namespace NGRpcService { using namespace NActors; using namespace Ydb; +using TEvDropTableRequest = TGrpcRequestOperationCall<Ydb::Table::DropTableRequest, + Ydb::Table::DropTableResponse>; + class TDropTableRPC : public TRpcSchemeRequestActor<TDropTableRPC, TEvDropTableRequest> { using TBase = TRpcSchemeRequestActor<TDropTableRPC, TEvDropTableRequest>; @@ -54,8 +59,8 @@ private: } }; -void TGRpcRequestProxy::Handle(TEvDropTableRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TDropTableRPC(ev->Release().Release())); +void DoDropTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) { + TActivationContext::AsActorContext().Register(new TDropTableRPC(p.release())); } template<> diff --git a/ydb/core/grpc_services/rpc_execute_data_query.cpp b/ydb/core/grpc_services/rpc_execute_data_query.cpp index 317a3488ce0..0f4a7072578 100644 --- a/ydb/core/grpc_services/rpc_execute_data_query.cpp +++ b/ydb/core/grpc_services/rpc_execute_data_query.cpp @@ -1,9 +1,11 @@ -#include "grpc_request_proxy.h" - -#include "rpc_calls.h" +#include "service_table.h" +#include <ydb/core/grpc_services/base/base.h> #include "rpc_kqp_base.h" #include "rpc_common.h" +#include "service_table.h" +#include <ydb/core/grpc_services/base/base.h> +#include <ydb/public/api/protos/ydb_scheme.pb.h> #include <ydb/core/base/counters.h> #include <ydb/core/protos/console_config.pb.h> #include <ydb/core/ydb_convert/ydb_convert.h> @@ -20,13 +22,16 @@ using namespace Ydb; using namespace Ydb::Table; using namespace NKqp; +using TEvExecuteDataQueryRequest = TGrpcRequestOperationCall<Ydb::Table::ExecuteDataQueryRequest, + Ydb::Table::ExecuteDataQueryResponse>; + class TExecuteDataQueryRPC : public TRpcKqpRequestActor<TExecuteDataQueryRPC, TEvExecuteDataQueryRequest> { using TBase = TRpcKqpRequestActor<TExecuteDataQueryRPC, TEvExecuteDataQueryRequest>; public: using TResult = Ydb::Table::ExecuteQueryResult; - TExecuteDataQueryRPC(TEvExecuteDataQueryRequest* msg) + TExecuteDataQueryRPC(IRequestOpCtx* msg) : TBase(msg) {} void Bootstrap(const TActorContext &ctx) { @@ -243,8 +248,8 @@ public: } }; -void TGRpcRequestProxy::Handle(TEvExecuteDataQueryRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TExecuteDataQueryRPC(ev->Release().Release())); +void DoExecuteDataQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) { + TActivationContext::AsActorContext().Register(new TExecuteDataQueryRPC(p.release())); } } // namespace NGRpcService diff --git a/ydb/core/grpc_services/rpc_execute_scheme_query.cpp b/ydb/core/grpc_services/rpc_execute_scheme_query.cpp index 6239bd7798a..dd75c0851ac 100644 --- a/ydb/core/grpc_services/rpc_execute_scheme_query.cpp +++ b/ydb/core/grpc_services/rpc_execute_scheme_query.cpp @@ -1,8 +1,10 @@ -#include "grpc_request_proxy.h" +#include "service_table.h" +#include <ydb/core/grpc_services/base/base.h> #include "rpc_calls.h" #include "rpc_kqp_base.h" #include "rpc_common.h" +#include "service_table.h" #include <ydb/core/protos/console_config.pb.h> @@ -16,6 +18,9 @@ using namespace NActors; using namespace Ydb; using namespace NKqp; +using TEvExecuteSchemeQueryRequest = TGrpcRequestOperationCall<Ydb::Table::ExecuteSchemeQueryRequest, + Ydb::Table::ExecuteSchemeQueryResponse>; + class TExecuteSchemeQueryRPC : public TRpcKqpRequestActor<TExecuteSchemeQueryRPC, TEvExecuteSchemeQueryRequest> { using TBase = TRpcKqpRequestActor<TExecuteSchemeQueryRPC, TEvExecuteSchemeQueryRequest>; @@ -87,8 +92,8 @@ public: } }; -void TGRpcRequestProxy::Handle(TEvExecuteSchemeQueryRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TExecuteSchemeQueryRPC(ev->Release().Release())); +void DoExecuteSchemeQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) { + TActivationContext::AsActorContext().Register(new TExecuteSchemeQueryRPC(p.release())); } template<> diff --git a/ydb/core/grpc_services/rpc_explain_data_query.cpp b/ydb/core/grpc_services/rpc_explain_data_query.cpp index cc42fb92358..eafa612e2d6 100644 --- a/ydb/core/grpc_services/rpc_explain_data_query.cpp +++ b/ydb/core/grpc_services/rpc_explain_data_query.cpp @@ -1,8 +1,10 @@ -#include "grpc_request_proxy.h" +#include "service_table.h" +#include <ydb/core/grpc_services/base/base.h> #include "rpc_calls.h" #include "rpc_kqp_base.h" #include "rpc_common.h" +#include "service_table.h" #include <ydb/core/protos/console_config.pb.h> @@ -18,11 +20,14 @@ using namespace NActors; using namespace Ydb; using namespace NKqp; +using TEvExplainDataQueryRequest = TGrpcRequestOperationCall<Ydb::Table::ExplainDataQueryRequest, + Ydb::Table::ExplainDataQueryResponse>; + class TExplainDataQueryRPC : public TRpcKqpRequestActor<TExplainDataQueryRPC, TEvExplainDataQueryRequest> { using TBase = TRpcKqpRequestActor<TExplainDataQueryRPC, TEvExplainDataQueryRequest>; public: - TExplainDataQueryRPC(TEvExplainDataQueryRequest* msg) + TExplainDataQueryRPC(IRequestOpCtx* msg) : TBase(msg) {} void Bootstrap(const TActorContext &ctx) { @@ -87,8 +92,8 @@ public: } }; -void TGRpcRequestProxy::Handle(TEvExplainDataQueryRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TExplainDataQueryRPC(ev->Release().Release())); +void DoExplainDataQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) { + TActivationContext::AsActorContext().Register(new TExplainDataQueryRPC(p.release())); } } // namespace NGRpcService diff --git a/ydb/core/grpc_services/rpc_keep_alive.cpp b/ydb/core/grpc_services/rpc_keep_alive.cpp index e4188c8ab91..9a38d0a3a97 100644 --- a/ydb/core/grpc_services/rpc_keep_alive.cpp +++ b/ydb/core/grpc_services/rpc_keep_alive.cpp @@ -1,9 +1,12 @@ -#include "grpc_request_proxy.h" +#include "service_table.h" +#include <ydb/core/grpc_services/base/base.h> #include "rpc_calls.h" #include "rpc_kqp_base.h" #include "rpc_common.h" +#include "service_table.h" + #include <ydb/library/yql/public/issue/yql_issue_message.h> #include <ydb/library/yql/public/issue/yql_issue.h> @@ -14,11 +17,14 @@ using namespace NActors; using namespace Ydb; using namespace NKqp; +using TEvKeepAliveRequest = TGrpcRequestOperationCall<Ydb::Table::KeepAliveRequest, + Ydb::Table::KeepAliveResponse>; + class TKeepAliveRPC : public TRpcKqpRequestActor<TKeepAliveRPC, TEvKeepAliveRequest> { using TBase = TRpcKqpRequestActor<TKeepAliveRPC, TEvKeepAliveRequest>; public: - TKeepAliveRPC(TEvKeepAliveRequest* msg) + TKeepAliveRPC(IRequestOpCtx* msg) : TBase(msg) {} void Bootstrap(const TActorContext& ctx) { @@ -87,8 +93,8 @@ private: } }; -void TGRpcRequestProxy::Handle(TEvKeepAliveRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TKeepAliveRPC(ev->Release().Release())); +void DoKeepAliveRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) { + TActivationContext::AsActorContext().Register(new TKeepAliveRPC(p.release())); } } // namespace NGRpcService diff --git a/ydb/core/grpc_services/rpc_load_rows.cpp b/ydb/core/grpc_services/rpc_load_rows.cpp index 127347e067e..a4936f1c350 100644 --- a/ydb/core/grpc_services/rpc_load_rows.cpp +++ b/ydb/core/grpc_services/rpc_load_rows.cpp @@ -1,7 +1,8 @@ -#include "grpc_request_proxy.h" +#include "service_table.h" +#include <ydb/core/grpc_services/base/base.h> -#include "rpc_calls.h" #include "rpc_common.h" +#include "service_table.h" #include <ydb/core/tx/tx_proxy/upload_rows_common_impl.h> #include <ydb/library/yql/public/udf/udf_types.h> @@ -173,10 +174,13 @@ bool ConvertArrowToYdbPrimitive(const arrow::DataType& type, Ydb::Type& toType) } +using TEvBulkUpsertRequest = TGrpcRequestOperationCall<Ydb::Table::BulkUpsertRequest, + Ydb::Table::BulkUpsertResponse>; + class TUploadRowsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ> { using TBase = NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ>; public: - explicit TUploadRowsRPCPublic(TAutoPtr<TEvBulkUpsertRequest> request) + explicit TUploadRowsRPCPublic(TEvBulkUpsertRequest* request) : TBase(GetDuration(request->GetProtoRequest()->operation_params().operation_timeout())) , Request(request) {} @@ -432,14 +436,14 @@ private: } private: - TAutoPtr<TEvBulkUpsertRequest> Request; + std::unique_ptr<TEvBulkUpsertRequest> Request; TVector<std::pair<TSerializedCellVec, TString>> AllRows; }; class TUploadColumnsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ> { using TBase = NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ>; public: - explicit TUploadColumnsRPCPublic(TAutoPtr<TEvBulkUpsertRequest> request) + explicit TUploadColumnsRPCPublic(TEvBulkUpsertRequest* request) : TBase(GetDuration(request->GetProtoRequest()->operation_params().operation_timeout())) , Request(request) {} @@ -646,7 +650,7 @@ private: } private: - TAutoPtr<TEvBulkUpsertRequest> Request; + std::unique_ptr<TEvBulkUpsertRequest> Request; TVector<std::pair<TSerializedCellVec, TString>> Rows; const Ydb::Formats::CsvSettings& GetCsvSettings() const { @@ -654,14 +658,16 @@ private: } }; -void TGRpcRequestProxy::Handle(TEvBulkUpsertRequest::TPtr& ev, const TActorContext& ctx) { - auto* req = ev->Get()->GetProtoRequest(); - if (req->has_arrow_batch_settings()) { - ctx.Register(new TUploadColumnsRPCPublic(ev->Release().Release())); - } else if (req->has_csv_settings()) { - ctx.Register(new TUploadColumnsRPCPublic(ev->Release().Release())); +void DoBulkUpsertRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) { + + auto* req = dynamic_cast<TEvBulkUpsertRequest*>(p.release()); + Y_VERIFY(req != nullptr, "Wrong using of TGRpcRequestWrapper"); + if (req->GetProtoRequest()->has_arrow_batch_settings()) { + TActivationContext::AsActorContext().Register(new TUploadColumnsRPCPublic(req)); + } else if (req->GetProtoRequest()->has_csv_settings()) { + TActivationContext::AsActorContext().Register(new TUploadColumnsRPCPublic(req)); } else { - ctx.Register(new TUploadRowsRPCPublic(ev->Release().Release())); + TActivationContext::AsActorContext().Register(new TUploadRowsRPCPublic(req)); } } diff --git a/ydb/core/grpc_services/rpc_prepare_data_query.cpp b/ydb/core/grpc_services/rpc_prepare_data_query.cpp index eb4c5718774..7c2adce96cc 100644 --- a/ydb/core/grpc_services/rpc_prepare_data_query.cpp +++ b/ydb/core/grpc_services/rpc_prepare_data_query.cpp @@ -1,8 +1,10 @@ -#include "grpc_request_proxy.h" +#include "service_table.h" +#include <ydb/core/grpc_services/base/base.h> #include "rpc_calls.h" #include "rpc_kqp_base.h" #include "rpc_common.h" +#include "service_table.h" #include <ydb/core/protos/console_config.pb.h> #include <ydb/core/ydb_convert/ydb_convert.h> @@ -19,11 +21,15 @@ using namespace NOperationId; using namespace Ydb; using namespace NKqp; +using TEvPrepareDataQueryRequest = TGrpcRequestOperationCall<Ydb::Table::PrepareDataQueryRequest, + Ydb::Table::PrepareDataQueryResponse>; + + class TPrepareDataQueryRPC : public TRpcKqpRequestActor<TPrepareDataQueryRPC, TEvPrepareDataQueryRequest> { using TBase = TRpcKqpRequestActor<TPrepareDataQueryRPC, TEvPrepareDataQueryRequest>; public: - TPrepareDataQueryRPC(TEvPrepareDataQueryRequest* msg) + TPrepareDataQueryRPC(IRequestOpCtx* msg) : TBase(msg) {} void Bootstrap(const TActorContext &ctx) { @@ -104,8 +110,8 @@ public: } }; -void TGRpcRequestProxy::Handle(TEvPrepareDataQueryRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TPrepareDataQueryRPC(ev->Release().Release())); +void DoPrepareDataQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) { + TActivationContext::AsActorContext().Register(new TPrepareDataQueryRPC(p.release())); } } // namespace NGRpcService diff --git a/ydb/core/grpc_services/rpc_read_table.cpp b/ydb/core/grpc_services/rpc_read_table.cpp index bbb87907289..5b6b139d864 100644 --- a/ydb/core/grpc_services/rpc_read_table.cpp +++ b/ydb/core/grpc_services/rpc_read_table.cpp @@ -1,9 +1,11 @@ -#include "grpc_request_proxy.h" +#include "service_table.h" +#include <ydb/core/grpc_services/base/base.h> #include "rpc_common.h" #include "rpc_calls.h" #include "rpc_kqp_base.h" #include "local_rate_limiter.h" +#include "service_table.h" #include <ydb/library/yql/public/issue/yql_issue_message.h> #include <ydb/library/yql/public/issue/yql_issue.h> @@ -27,6 +29,9 @@ using namespace NActors; using namespace Ydb; using namespace NKqp; +using TEvReadTableRequest = TGrpcRequestNoOperationCall<Ydb::Table::ReadTableRequest, + Ydb::Table::ReadTableResponse>; + static const TDuration RlMaxDuration = TDuration::Minutes(1); static ui64 CalcRuConsumption(const TString& data) { @@ -107,7 +112,7 @@ public: return NKikimrServices::TActivity::GRPC_STREAM_REQ; } - TReadTableRPC(TEvReadTableRequest* msg) + TReadTableRPC(IRequestNoOpCtx* msg) : Request_(msg) , QuotaLimit_(10) , QuotaReserved_(0) @@ -431,7 +436,7 @@ private: } void SendProposeRequest(const TActorContext &ctx) { - const auto req = Request_->GetProtoRequest(); + const auto req = TEvReadTableRequest::GetProtoRequest(Request_.get()); auto actorId = SelfId(); const TActorSystem* const as = ctx.ExecutorThread.ActorSystem; auto cb = [actorId, as](size_t left) { @@ -721,7 +726,7 @@ private: TryToAllocateQuota(); } - std::unique_ptr<TEvReadTableRequest> Request_; + std::unique_ptr<IRequestNoOpCtx> Request_; TActorId ReadTableActor; @@ -754,8 +759,8 @@ private: bool HasPendingSuccess = false; }; -void TGRpcRequestProxy::Handle(TEvReadTableRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TReadTableRPC(ev->Release().Release())); +void DoReadTableRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider &) { + TActivationContext::AsActorContext().Register(new TReadTableRPC(p.release())); } } // namespace NGRpcService diff --git a/ydb/core/grpc_services/rpc_rename_tables.cpp b/ydb/core/grpc_services/rpc_rename_tables.cpp index 69684e889b1..015f55e9957 100644 --- a/ydb/core/grpc_services/rpc_rename_tables.cpp +++ b/ydb/core/grpc_services/rpc_rename_tables.cpp @@ -1,5 +1,7 @@ -#include "grpc_request_proxy.h" +#include "service_table.h" +#include <ydb/core/grpc_services/base/base.h> +#include "service_table.h" #include "rpc_calls.h" #include "rpc_scheme_base.h" #include "rpc_common.h" @@ -10,11 +12,14 @@ namespace NGRpcService { using namespace NActors; using namespace Ydb; +using TEvRenameTablesRequest = TGrpcRequestOperationCall<Ydb::Table::RenameTablesRequest, + Ydb::Table::RenameTablesResponse>; + class TRenameTablesRPC : public TRpcSchemeRequestActor<TRenameTablesRPC, TEvRenameTablesRequest> { using TBase = TRpcSchemeRequestActor<TRenameTablesRPC, TEvRenameTablesRequest>; public: - TRenameTablesRPC(TEvRenameTablesRequest* msg) + TRenameTablesRPC(IRequestOpCtx* msg) : TBase(msg) {} void Bootstrap(const TActorContext &ctx) { @@ -63,8 +68,8 @@ private: } }; -void TGRpcRequestProxy::Handle(TEvRenameTablesRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TRenameTablesRPC(ev->Release().Release())); +void DoRenameTablesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) { + TActivationContext::AsActorContext().Register(new TRenameTablesRPC(p.release())); } } // namespace NKikimr diff --git a/ydb/core/grpc_services/rpc_rollback_transaction.cpp b/ydb/core/grpc_services/rpc_rollback_transaction.cpp index b6cdfe28177..12c102d4839 100644 --- a/ydb/core/grpc_services/rpc_rollback_transaction.cpp +++ b/ydb/core/grpc_services/rpc_rollback_transaction.cpp @@ -1,8 +1,10 @@ -#include "grpc_request_proxy.h" +#include "service_table.h" +#include <ydb/core/grpc_services/base/base.h> #include "rpc_calls.h" #include "rpc_kqp_base.h" #include "rpc_common.h" +#include "service_table.h" #include <ydb/library/yql/public/issue/yql_issue_message.h> #include <ydb/library/yql/public/issue/yql_issue.h> @@ -14,11 +16,14 @@ using namespace NActors; using namespace Ydb; using namespace NKqp; +using TEvRollbackTransactionRequest = TGrpcRequestOperationCall<Ydb::Table::RollbackTransactionRequest, + Ydb::Table::RollbackTransactionResponse>; + class TRollbackTransactionRPC : public TRpcKqpRequestActor<TRollbackTransactionRPC, TEvRollbackTransactionRequest> { using TBase = TRpcKqpRequestActor<TRollbackTransactionRPC, TEvRollbackTransactionRequest>; public: - TRollbackTransactionRPC(TEvRollbackTransactionRequest* msg) + TRollbackTransactionRPC(IRequestOpCtx* msg) : TBase(msg) {} void Bootstrap(const TActorContext& ctx) { @@ -90,8 +95,8 @@ private: } }; -void TGRpcRequestProxy::Handle(TEvRollbackTransactionRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TRollbackTransactionRPC(ev->Release().Release())); +void DoRollbackTransactionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) { + TActivationContext::AsActorContext().Register(new TRollbackTransactionRPC(p.release())); } } // namespace NGRpcService diff --git a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp index 69d14f9ad74..f50883e7993 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp @@ -1,7 +1,9 @@ -#include "grpc_request_proxy.h" +#include "service_table.h" +#include <ydb/core/grpc_services/base/base.h> #include "rpc_common.h" #include "rpc_kqp_base.h" +#include "service_table.h" #include <ydb/core/actorlib_impl/long_timer.h> #include <ydb/core/base/appdata.h> @@ -240,6 +242,9 @@ bool FillProfile(Ydb::Table::ExecuteScanQueryPartialResponse& response, return false; } +using TEvStreamExecuteScanQueryRequest = TGrpcRequestNoOperationCall<Ydb::Table::ExecuteScanQueryRequest, + Ydb::Table::ExecuteScanQueryPartialResponse>; + template<typename TRequestEv, typename TResponse> class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQueryRPC<TRequestEv, TResponse>> { private: @@ -629,10 +634,12 @@ void TGRpcRequestProxy::Handle(TEvExperimentalStreamQueryRequest::TPtr& ev, cons Ydb::Experimental::ExecuteStreamQueryResponse>(ev->Release().Release(), rpcBufferSize)); } -void TGRpcRequestProxy::Handle(TEvStreamExecuteScanQueryRequest::TPtr& ev, const TActorContext& ctx) { - ui64 rpcBufferSize = GetAppConfig().GetTableServiceConfig().GetResourceManager().GetChannelBufferSize(); - ctx.Register(new TStreamExecuteScanQueryRPC<TEvStreamExecuteScanQueryRequest, - Ydb::Table::ExecuteScanQueryPartialResponse>(ev->Release().Release(), rpcBufferSize)); +void DoExecuteScanQueryRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) { + ui64 rpcBufferSize = f.GetAppConfig().GetTableServiceConfig().GetResourceManager().GetChannelBufferSize(); + auto* req = dynamic_cast<TEvStreamExecuteScanQueryRequest*>(p.release()); + Y_VERIFY(req != nullptr, "Wrong using of TGRpcRequestWrapper"); + TActivationContext::AsActorContext().Register(new TStreamExecuteScanQueryRPC<TEvStreamExecuteScanQueryRequest, + Ydb::Table::ExecuteScanQueryPartialResponse>(req, rpcBufferSize)); } } // namespace NGRpcService diff --git a/ydb/core/grpc_services/service_table.h b/ydb/core/grpc_services/service_table.h new file mode 100644 index 00000000000..adf8493daac --- /dev/null +++ b/ydb/core/grpc_services/service_table.h @@ -0,0 +1,35 @@ +#pragma once + +#include <memory> + +namespace NKikimr { +namespace NGRpcService { + +class IRequestOpCtx; +class IRequestNoOpCtx; +class IFacilityProvider; + +void DoAlterTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoCreateTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDropTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoCopyTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoCopyTablesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoRenameTablesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDescribeTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoCreateSessionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDeleteSessionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoKeepAliveRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoReadTableRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider&); +void DoExplainDataQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoPrepareDataQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoExecuteDataQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoExecuteSchemeQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoBeginTransactionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoCommitTransactionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoRollbackTransactionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDescribeTableOptionsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoBulkUpsertRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoExecuteScanQueryRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider&); + +} +} diff --git a/ydb/core/kqp/kqp_ic_gateway.cpp b/ydb/core/kqp/kqp_ic_gateway.cpp index 196da496c99..305b165b7ed 100644 --- a/ydb/core/kqp/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/kqp_ic_gateway.cpp @@ -1229,7 +1229,8 @@ public: // FIXME: should be defined in grpc_services/rpc_calls.h, but cause cyclic dependency using namespace NGRpcService; - using TEvAlterTableRequest = TGRpcRequestValidationWrapper<TRpcServices::EvAlterTable, Ydb::Table::AlterTableRequest, Ydb::Table::AlterTableResponse, true, TRateLimiterMode::Rps>; + using TEvAlterTableRequest = TGrpcRequestOperationCall<Ydb::Table::AlterTableRequest, + Ydb::Table::AlterTableResponse>; return SendLocalRpcRequestNoResult<TEvAlterTableRequest>(std::move(req), Database, GetTokenCompat()); } @@ -1283,7 +1284,8 @@ public: // FIXME: should be defined in grpc_services/rpc_calls.h, but cause cyclic dependency using namespace NGRpcService; - using TEvDropTableRequest = TGRpcRequestWrapper<TRpcServices::EvDropTable, Ydb::Table::DropTableRequest, Ydb::Table::DropTableResponse, true, TRateLimiterMode::Rps>; + using TEvDropTableRequest = TGrpcRequestOperationCall<Ydb::Table::DropTableRequest, + Ydb::Table::DropTableResponse>; return SendLocalRpcRequestNoResult<TEvDropTableRequest>(std::move(dropTable), Database, GetTokenCompat()); } diff --git a/ydb/services/ydb/ydb_table.cpp b/ydb/services/ydb/ydb_table.cpp index 7189614b8e8..cf136b19bc6 100644 --- a/ydb/services/ydb/ydb_table.cpp +++ b/ydb/services/ydb/ydb_table.cpp @@ -1,8 +1,8 @@ #include "ydb_table.h" +#include <ydb/core/grpc_services/service_table.h> #include <ydb/core/grpc_services/grpc_helper.h> -#include <ydb/core/grpc_services/grpc_request_proxy.h> -#include <ydb/core/grpc_services/rpc_calls.h> +#include <ydb/core/grpc_services/base/base.h> namespace NKikimr { namespace NGRpcService { @@ -33,90 +33,64 @@ void TGRpcYdbTableService::DecRequest() { void TGRpcYdbTableService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); -#ifdef ADD_REQUEST -#error ADD_REQUEST macro already defined +#ifdef ADD_REQUEST_LIMIT +#error ADD_REQUEST_LIMIT macro already defined #endif -#define ADD_REQUEST(NAME, IN, OUT, ACTION) \ - MakeIntrusive<TGRpcRequest<Ydb::Table::IN, Ydb::Table::OUT, TGRpcYdbTableService>>(this, &Service_, CQ_, \ - [this](NGrpc::IRequestContextBase *ctx) { \ - NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ - ACTION; \ - }, &Ydb::Table::V1::TableService::AsyncService::Request ## NAME, \ - #NAME, logger, getCounterBlock("table", #NAME))->Run(); - -#define ADD_BYTES_REQUEST(NAME, IN, OUT, ACTION) \ - MakeIntrusive<TGRpcRequest<Ydb::Table::IN, Ydb::Table::OUT, TGRpcYdbTableService>>(this, &Service_, CQ_, \ - [this](NGrpc::IRequestContextBase *ctx) { \ - NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ - ACTION; \ - }, &Ydb::Table::V1::TableService::AsyncService::Request ## NAME, \ - #NAME, logger, getCounterBlock("table", #NAME))->Run(); - - ADD_REQUEST(CreateSession, CreateSessionRequest, CreateSessionResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvCreateSessionRequest(ctx)); - }) - ADD_REQUEST(DeleteSession, DeleteSessionRequest, DeleteSessionResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDeleteSessionRequest(ctx)); - }) - ADD_REQUEST(KeepAlive, KeepAliveRequest, KeepAliveResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvKeepAliveRequest(ctx)); - }) - ADD_REQUEST(AlterTable, AlterTableRequest, AlterTableResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvAlterTableRequest(ctx)); - }) - ADD_REQUEST(CreateTable, CreateTableRequest, CreateTableResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvCreateTableRequest(ctx)); - }) - ADD_REQUEST(DropTable, DropTableRequest, DropTableResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDropTableRequest(ctx)); - }) - ADD_BYTES_REQUEST(StreamReadTable, ReadTableRequest, ReadTableResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvReadTableRequest(ctx)); - }) - ADD_REQUEST(DescribeTable, DescribeTableRequest, DescribeTableResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDescribeTableRequest(ctx)); - }) - ADD_REQUEST(CopyTable, CopyTableRequest, CopyTableResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvCopyTableRequest(ctx)); - }) - ADD_REQUEST(CopyTables, CopyTablesRequest, CopyTablesResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvCopyTablesRequest(ctx)); - }) - ADD_REQUEST(RenameTables, RenameTablesRequest, RenameTablesResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvRenameTablesRequest(ctx)); - }) - ADD_REQUEST(ExplainDataQuery, ExplainDataQueryRequest, ExplainDataQueryResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvExplainDataQueryRequest(ctx)); - }) - ADD_REQUEST(PrepareDataQuery, PrepareDataQueryRequest, PrepareDataQueryResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvPrepareDataQueryRequest(ctx)); - }) - ADD_REQUEST(ExecuteDataQuery, ExecuteDataQueryRequest, ExecuteDataQueryResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvExecuteDataQueryRequest(ctx)); - }) - ADD_REQUEST(ExecuteSchemeQuery, ExecuteSchemeQueryRequest, ExecuteSchemeQueryResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvExecuteSchemeQueryRequest(ctx)); - }) - ADD_REQUEST(BeginTransaction, BeginTransactionRequest, BeginTransactionResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvBeginTransactionRequest(ctx)); - }) - ADD_REQUEST(CommitTransaction, CommitTransactionRequest, CommitTransactionResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvCommitTransactionRequest(ctx)); - }) - ADD_REQUEST(RollbackTransaction, RollbackTransactionRequest, RollbackTransactionResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvRollbackTransactionRequest(ctx)); - }) - ADD_REQUEST(DescribeTableOptions, DescribeTableOptionsRequest, DescribeTableOptionsResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDescribeTableOptionsRequest(ctx)); - }) - ADD_REQUEST(BulkUpsert, BulkUpsertRequest, BulkUpsertResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvBulkUpsertRequest(ctx)); - }) - ADD_REQUEST(StreamExecuteScanQuery, ExecuteScanQueryRequest, ExecuteScanQueryPartialResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvStreamExecuteScanQueryRequest(ctx)); - }) - -#undef ADD_REQUEST + +#ifdef ADD_STREAM_REQUEST_LIMIT +#error ADD_STREAM_REQUEST_LIMIT macro already defined +#endif + +#define ADD_REQUEST_LIMIT(NAME, CB, LIMIT_TYPE) \ + MakeIntrusive<TGRpcRequest<Ydb::Table::NAME##Request, Ydb::Table::NAME##Response, TGRpcYdbTableService>> \ + (this, &Service_, CQ_, \ + [this](NGrpc::IRequestContextBase *ctx) { \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ + ActorSystem_->Send(GRpcRequestProxyId_, \ + new TGrpcRequestOperationCall<Ydb::Table::NAME##Request, Ydb::Table::NAME##Response> \ + (ctx, &CB, TRequestAuxSettings{TRateLimiterMode::LIMIT_TYPE, nullptr})); \ + }, &Ydb::Table::V1::TableService::AsyncService::Request ## NAME, \ + #NAME, logger, getCounterBlock("scheme", #NAME))->Run(); + +#define ADD_STREAM_REQUEST_LIMIT(NAME, IN, OUT, CB, LIMIT_TYPE) \ + MakeIntrusive<TGRpcRequest<Ydb::Table::IN, Ydb::Table::OUT, TGRpcYdbTableService>> \ + (this, &Service_, CQ_, \ + [this](NGrpc::IRequestContextBase *ctx) { \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ + ActorSystem_->Send(GRpcRequestProxyId_, \ + new TGrpcRequestNoOperationCall<Ydb::Table::IN, Ydb::Table::OUT> \ + (ctx, &CB, TRequestAuxSettings{TRateLimiterMode::LIMIT_TYPE, nullptr})); \ + }, &Ydb::Table::V1::TableService::AsyncService::Request ## NAME, \ + #NAME, logger, getCounterBlock("scheme", #NAME))->Run(); + + ADD_REQUEST_LIMIT(CreateSession, DoCreateSessionRequest, Rps) + ADD_REQUEST_LIMIT(KeepAlive, DoKeepAliveRequest, Rps) + ADD_REQUEST_LIMIT(AlterTable, DoAlterTableRequest, Rps) + ADD_REQUEST_LIMIT(CreateTable, DoCreateTableRequest, Rps) + ADD_REQUEST_LIMIT(DropTable, DoDropTableRequest, Rps) + ADD_REQUEST_LIMIT(DescribeTable, DoDescribeTableRequest, Rps) + ADD_REQUEST_LIMIT(CopyTable, DoCopyTableRequest, Rps) + ADD_REQUEST_LIMIT(CopyTables, DoCopyTablesRequest, Rps) + ADD_REQUEST_LIMIT(RenameTables, DoRenameTablesRequest, Rps) + ADD_REQUEST_LIMIT(ExplainDataQuery, DoExplainDataQueryRequest, Rps) + ADD_REQUEST_LIMIT(ExecuteSchemeQuery, DoExecuteSchemeQueryRequest, Rps) + ADD_REQUEST_LIMIT(BeginTransaction, DoBeginTransactionRequest, Rps) + ADD_REQUEST_LIMIT(DescribeTableOptions, DoDescribeTableOptionsRequest, Rps) + + ADD_REQUEST_LIMIT(DeleteSession, DoDeleteSessionRequest, Off) + ADD_REQUEST_LIMIT(CommitTransaction, DoCommitTransactionRequest, Off) + ADD_REQUEST_LIMIT(RollbackTransaction, DoRollbackTransactionRequest, Off) + + + ADD_REQUEST_LIMIT(PrepareDataQuery, DoPrepareDataQueryRequest, Ru) + ADD_REQUEST_LIMIT(ExecuteDataQuery, DoExecuteDataQueryRequest, Ru) + ADD_REQUEST_LIMIT(BulkUpsert, DoBulkUpsertRequest, Ru) + + ADD_STREAM_REQUEST_LIMIT(StreamExecuteScanQuery, ExecuteScanQueryRequest, ExecuteScanQueryPartialResponse, DoExecuteScanQueryRequest, RuOnProgress) + ADD_STREAM_REQUEST_LIMIT(StreamReadTable, ReadTableRequest, ReadTableResponse, DoReadTableRequest, RuOnProgress) + +#undef ADD_REQUEST_LIMIT +#undef ADD_STREAM_REQUEST_LIMIT } } // namespace NGRpcService |