diff options
author | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-04-06 18:18:25 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-04-06 18:18:25 +0300 |
commit | 61b49a878a33977897d44f72118d6791ac3794fa (patch) | |
tree | 8a497ee545e4040a625884428e18d9a836639414 | |
parent | 01fbacb386809436dfa331780875aed72cb76118 (diff) | |
download | ydb-61b49a878a33977897d44f72118d6791ac3794fa.tar.gz |
Invert grpc_request_proxy dependence for export and import services. KIKIMR-13646
ref:54e6a05020d029a1e809bd8d78449ae1f0c617ae
-rw-r--r-- | ydb/core/grpc_services/grpc_request_proxy.cpp | 4 | ||||
-rw-r--r-- | ydb/core/grpc_services/grpc_request_proxy.h | 4 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_calls.h | 6 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_export.cpp | 15 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_import.cpp | 10 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_import_data.cpp | 9 | ||||
-rw-r--r-- | ydb/core/grpc_services/service_export.h | 15 | ||||
-rw-r--r-- | ydb/core/grpc_services/service_import.h | 15 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_export.cpp | 18 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_import.cpp | 19 |
10 files changed, 73 insertions, 42 deletions
diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp index 8ed61dcf41..7cd5e3c9ec 100644 --- a/ydb/core/grpc_services/grpc_request_proxy.cpp +++ b/ydb/core/grpc_services/grpc_request_proxy.cpp @@ -580,10 +580,6 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev, const TActorCo HFunc(TEvPQAddReadRuleRequest, PreHandle); HFunc(TEvPQRemoveReadRuleRequest, PreHandle); HFunc(TEvPQDescribeTopicRequest, PreHandle); - HFunc(TEvExportToYtRequest, PreHandle); - HFunc(TEvExportToS3Request, PreHandle); - HFunc(TEvImportFromS3Request, PreHandle); - HFunc(TEvImportDataRequest, PreHandle); HFunc(TEvDiscoverPQClustersRequest, PreHandle); HFunc(TEvCreateRateLimiterResource, PreHandle); HFunc(TEvAlterRateLimiterResource, PreHandle); diff --git a/ydb/core/grpc_services/grpc_request_proxy.h b/ydb/core/grpc_services/grpc_request_proxy.h index 0cbf963cda..500ea9f779 100644 --- a/ydb/core/grpc_services/grpc_request_proxy.h +++ b/ydb/core/grpc_services/grpc_request_proxy.h @@ -62,10 +62,6 @@ protected: void Handle(TEvPQAddReadRuleRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvPQRemoveReadRuleRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvPQDescribeTopicRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvExportToYtRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvExportToS3Request::TPtr& ev, const TActorContext& ctx); - void Handle(TEvImportFromS3Request::TPtr& ev, const TActorContext& ctx); - void Handle(TEvImportDataRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvDiscoverPQClustersRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvCreateRateLimiterResource::TPtr& ev, const TActorContext& ctx); void Handle(TEvAlterRateLimiterResource::TPtr& ev, const TActorContext& ctx); diff --git a/ydb/core/grpc_services/rpc_calls.h b/ydb/core/grpc_services/rpc_calls.h index e092fa7759..949e353343 100644 --- a/ydb/core/grpc_services/rpc_calls.h +++ b/ydb/core/grpc_services/rpc_calls.h @@ -11,8 +11,6 @@ #include <ydb/public/api/protos/ydb_coordination.pb.h> #include <ydb/public/api/protos/ydb_discovery.pb.h> #include <ydb/public/api/protos/ydb_experimental.pb.h> -#include <ydb/public/api/protos/ydb_export.pb.h> -#include <ydb/public/api/protos/ydb_import.pb.h> #include <ydb/public/api/protos/ydb_status_codes.pb.h> #include <ydb/public/api/protos/ydb_table.pb.h> #include <ydb/public/api/protos/ydb_s3_internal.pb.h> @@ -59,10 +57,6 @@ using TEvPQDescribeTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::Ev using TEvPQAddReadRuleRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQAddReadRule, Ydb::PersQueue::V1::AddReadRuleRequest, Ydb::PersQueue::V1::AddReadRuleResponse, true>; using TEvPQRemoveReadRuleRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQRemoveReadRule, Ydb::PersQueue::V1::RemoveReadRuleRequest, Ydb::PersQueue::V1::RemoveReadRuleResponse, true>; -using TEvExportToYtRequest = TGRpcRequestValidationWrapper<TRpcServices::EvExportToYt, Ydb::Export::ExportToYtRequest, Ydb::Export::ExportToYtResponse, true>; -using TEvExportToS3Request = TGRpcRequestValidationWrapper<TRpcServices::EvExportToS3, Ydb::Export::ExportToS3Request, Ydb::Export::ExportToS3Response, true>; -using TEvImportFromS3Request = TGRpcRequestValidationWrapper<TRpcServices::EvImportFromS3, Ydb::Import::ImportFromS3Request, Ydb::Import::ImportFromS3Response, true>; -using TEvImportDataRequest = TGRpcRequestValidationWrapper<TRpcServices::EvImportData, Ydb::Import::ImportDataRequest, Ydb::Import::ImportDataResponse, true>; using TEvDiscoverPQClustersRequest = TGRpcRequestWrapper<TRpcServices::EvDiscoverPQClusters, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResponse, true>; using TEvCreateRateLimiterResource = TGRpcRequestWrapper<TRpcServices::EvCreateRateLimiterResource, Ydb::RateLimiter::CreateResourceRequest, Ydb::RateLimiter::CreateResourceResponse, true, TRateLimiterMode::Rps>; using TEvAlterRateLimiterResource = TGRpcRequestWrapper<TRpcServices::EvAlterRateLimiterResource, Ydb::RateLimiter::AlterResourceRequest, Ydb::RateLimiter::AlterResourceResponse, true, TRateLimiterMode::Rps>; diff --git a/ydb/core/grpc_services/rpc_export.cpp b/ydb/core/grpc_services/rpc_export.cpp index 352d5f6bc7..6b846bea5f 100644 --- a/ydb/core/grpc_services/rpc_export.cpp +++ b/ydb/core/grpc_services/rpc_export.cpp @@ -1,8 +1,10 @@ +#include "service_export.h" #include "grpc_request_proxy.h" #include "rpc_export_base.h" #include "rpc_calls.h" #include "rpc_operation_request_base.h" +#include <ydb/public/api/protos/ydb_export.pb.h> #include <ydb/core/tx/schemeshard/schemeshard_export.h> #include <library/cpp/actors/core/hfunc.h> @@ -18,6 +20,11 @@ using namespace NSchemeShard; using namespace NKikimrIssues; using namespace Ydb; +using TEvExportToYtRequest = TGrpcRequestOperationCall<Ydb::Export::ExportToYtRequest, + Ydb::Export::ExportToYtResponse>; +using TEvExportToS3Request = TGrpcRequestOperationCall<Ydb::Export::ExportToS3Request, + Ydb::Export::ExportToS3Response>; + template <typename TDerived, typename TEvRequest> class TExportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>, public TExportConv { TStringBuf GetLogPrefix() const override { @@ -210,12 +217,12 @@ public: using TExportRPC::TExportRPC; }; -void TGRpcRequestProxy::Handle(TEvExportToYtRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TExportToYtRPC(ev->Release().Release())); +void DoExportToYtRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TExportToYtRPC(p.release())); } -void TGRpcRequestProxy::Handle(TEvExportToS3Request::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TExportToS3RPC(ev->Release().Release())); +void DoExportToS3Request(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TExportToS3RPC(p.release())); } } // namespace NGRpcService diff --git a/ydb/core/grpc_services/rpc_import.cpp b/ydb/core/grpc_services/rpc_import.cpp index 6fc610c5d0..7f95489106 100644 --- a/ydb/core/grpc_services/rpc_import.cpp +++ b/ydb/core/grpc_services/rpc_import.cpp @@ -1,8 +1,11 @@ +#include "service_import.h" #include "grpc_request_proxy.h" #include "rpc_import_base.h" #include "rpc_calls.h" #include "rpc_operation_request_base.h" +#include <ydb/public/api/protos/ydb_import.pb.h> + #include <ydb/core/tx/schemeshard/schemeshard_import.h> #include <library/cpp/actors/core/hfunc.h> @@ -18,6 +21,9 @@ using namespace NSchemeShard; using namespace NKikimrIssues; using namespace Ydb; +using TEvImportFromS3Request = TGrpcRequestOperationCall<Ydb::Import::ImportFromS3Request, + Ydb::Import::ImportFromS3Response>; + template <typename TDerived, typename TEvRequest> class TImportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>, public TImportConv { TStringBuf GetLogPrefix() const override { @@ -81,8 +87,8 @@ public: using TImportRPC::TImportRPC; }; -void TGRpcRequestProxy::Handle(TEvImportFromS3Request::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TImportFromS3RPC(ev->Release().Release())); +void DoImportFromS3Request(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TImportFromS3RPC(p.release())); } } // namespace NGRpcService diff --git a/ydb/core/grpc_services/rpc_import_data.cpp b/ydb/core/grpc_services/rpc_import_data.cpp index 87c2d39c25..c782b04de7 100644 --- a/ydb/core/grpc_services/rpc_import_data.cpp +++ b/ydb/core/grpc_services/rpc_import_data.cpp @@ -3,6 +3,8 @@ #include "rpc_common.h" #include "rpc_request_base.h" +#include <ydb/public/api/protos/ydb_import.pb.h> + #include <ydb/core/base/appdata.h> #include <ydb/core/base/tablet_pipecache.h> #include <ydb/core/scheme/scheme_tabledefs.h> @@ -29,6 +31,9 @@ using namespace NActors; using namespace NKikimrIssues; using namespace Ydb; +using TEvImportDataRequest = TGrpcRequestOperationCall<Ydb::Import::ImportDataRequest, + Ydb::Import::ImportDataResponse>; + class TImportDataRPC: public TRpcRequestActor<TImportDataRPC, TEvImportDataRequest, true> { using TNavigate = NSchemeCache::TSchemeCacheNavigate; using TEvNavigate = TEvTxProxySchemeCache::TEvNavigateKeySet; @@ -431,8 +436,8 @@ private: }; // TImportDataRPC -void TGRpcRequestProxy::Handle(TEvImportDataRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TImportDataRPC(ev->Release().Release())); +void DoImportDataRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TImportDataRPC(p.release())); } } // namespace NGRpcService diff --git a/ydb/core/grpc_services/service_export.h b/ydb/core/grpc_services/service_export.h new file mode 100644 index 0000000000..3a140b5806 --- /dev/null +++ b/ydb/core/grpc_services/service_export.h @@ -0,0 +1,15 @@ +#pragma once + +#include <memory> + +namespace NKikimr { +namespace NGRpcService { + +class IRequestOpCtx; +class IFacilityProvider; + +void DoExportToYtRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoExportToS3Request(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); + +} +} diff --git a/ydb/core/grpc_services/service_import.h b/ydb/core/grpc_services/service_import.h new file mode 100644 index 0000000000..59b234471e --- /dev/null +++ b/ydb/core/grpc_services/service_import.h @@ -0,0 +1,15 @@ +#pragma once + +#include <memory> + +namespace NKikimr { +namespace NGRpcService { + +class IRequestOpCtx; +class IFacilityProvider; + +void DoImportFromS3Request(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoImportDataRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); + +} +} diff --git a/ydb/services/ydb/ydb_export.cpp b/ydb/services/ydb/ydb_export.cpp index 7a0933ce55..4a180db6be 100644 --- a/ydb/services/ydb/ydb_export.cpp +++ b/ydb/services/ydb/ydb_export.cpp @@ -1,8 +1,8 @@ #include "ydb_export.h" +#include <ydb/core/grpc_services/service_export.h> #include <ydb/core/grpc_services/grpc_helper.h> -#include <ydb/core/grpc_services/grpc_request_proxy.h> -#include <ydb/core/grpc_services/rpc_calls.h> +#include <ydb/core/grpc_services/base/base.h> namespace NKikimr { namespace NGRpcService { @@ -37,20 +37,18 @@ void TGRpcYdbExportService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { #ifdef ADD_REQUEST #error ADD_REQUEST macro already defined #endif -#define ADD_REQUEST(NAME, IN, OUT, ACTION) \ +#define ADD_REQUEST(NAME, IN, OUT, CB) \ MakeIntrusive<TGRpcRequest<Ydb::Export::IN, Ydb::Export::OUT, TGRpcYdbExportService>>(this, &Service_, CQ_, \ [this](NGrpc::IRequestContextBase *ctx) { \ NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ - ACTION; \ + ActorSystem_->Send(GRpcRequestProxyId_, \ + new NGRpcService::TGrpcRequestOperationCall<Ydb::Export::IN, Ydb::Export::OUT> \ + (ctx, &CB, NGRpcService::TRequestAuxSettings{NGRpcService::TRateLimiterMode::Off, nullptr})); \ }, &Ydb::Export::V1::ExportService::AsyncService::Request ## NAME, \ #NAME, logger, getCounterBlock("export", #NAME))->Run(); - ADD_REQUEST(ExportToYt, ExportToYtRequest, ExportToYtResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvExportToYtRequest(ctx)); - }) - ADD_REQUEST(ExportToS3, ExportToS3Request, ExportToS3Response, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvExportToS3Request(ctx)); - }) + ADD_REQUEST(ExportToYt, ExportToYtRequest, ExportToYtResponse, DoExportToYtRequest); + ADD_REQUEST(ExportToS3, ExportToS3Request, ExportToS3Response, DoExportToS3Request); #undef ADD_REQUEST } diff --git a/ydb/services/ydb/ydb_import.cpp b/ydb/services/ydb/ydb_import.cpp index c27578329d..ba7f928915 100644 --- a/ydb/services/ydb/ydb_import.cpp +++ b/ydb/services/ydb/ydb_import.cpp @@ -1,8 +1,8 @@ #include "ydb_import.h" +#include <ydb/core/grpc_services/service_import.h> #include <ydb/core/grpc_services/grpc_helper.h> -#include <ydb/core/grpc_services/grpc_request_proxy.h> -#include <ydb/core/grpc_services/rpc_calls.h> +#include <ydb/core/grpc_services/base/base.h> namespace NKikimr { namespace NGRpcService { @@ -37,20 +37,19 @@ void TGRpcYdbImportService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { #ifdef ADD_REQUEST #error ADD_REQUEST macro already defined #endif -#define ADD_REQUEST(NAME, IN, OUT, ACTION) \ +#define ADD_REQUEST(NAME, IN, OUT, CB) \ MakeIntrusive<TGRpcRequest<Ydb::Import::IN, Ydb::Import::OUT, TGRpcYdbImportService>>(this, &Service_, CQ_, \ [this](NGrpc::IRequestContextBase *ctx) { \ NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ - ACTION; \ + ActorSystem_->Send(GRpcRequestProxyId_, \ + new NGRpcService::TGrpcRequestOperationCall<Ydb::Import::IN, Ydb::Import::OUT> \ + (ctx, &CB, NGRpcService::TRequestAuxSettings{NGRpcService::TRateLimiterMode::Off, nullptr})); \ }, &Ydb::Import::V1::ImportService::AsyncService::Request ## NAME, \ #NAME, logger, getCounterBlock("import", #NAME))->Run(); - ADD_REQUEST(ImportFromS3, ImportFromS3Request, ImportFromS3Response, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvImportFromS3Request(ctx)); - }) - ADD_REQUEST(ImportData, ImportDataRequest, ImportDataResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvImportDataRequest(ctx)); - }) + ADD_REQUEST(ImportFromS3, ImportFromS3Request, ImportFromS3Response, DoImportFromS3Request); + ADD_REQUEST(ImportData, ImportDataRequest, ImportDataResponse, DoImportDataRequest); + #undef ADD_REQUEST } |