aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniil Cherednik <dcherednik@yandex-team.ru>2022-04-06 18:18:25 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-04-06 18:18:25 +0300
commit61b49a878a33977897d44f72118d6791ac3794fa (patch)
tree8a497ee545e4040a625884428e18d9a836639414
parent01fbacb386809436dfa331780875aed72cb76118 (diff)
downloadydb-61b49a878a33977897d44f72118d6791ac3794fa.tar.gz
Invert grpc_request_proxy dependence for export and import services. KIKIMR-13646
ref:54e6a05020d029a1e809bd8d78449ae1f0c617ae
-rw-r--r--ydb/core/grpc_services/grpc_request_proxy.cpp4
-rw-r--r--ydb/core/grpc_services/grpc_request_proxy.h4
-rw-r--r--ydb/core/grpc_services/rpc_calls.h6
-rw-r--r--ydb/core/grpc_services/rpc_export.cpp15
-rw-r--r--ydb/core/grpc_services/rpc_import.cpp10
-rw-r--r--ydb/core/grpc_services/rpc_import_data.cpp9
-rw-r--r--ydb/core/grpc_services/service_export.h15
-rw-r--r--ydb/core/grpc_services/service_import.h15
-rw-r--r--ydb/services/ydb/ydb_export.cpp18
-rw-r--r--ydb/services/ydb/ydb_import.cpp19
10 files changed, 73 insertions, 42 deletions
diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp
index 8ed61dcf41..7cd5e3c9ec 100644
--- a/ydb/core/grpc_services/grpc_request_proxy.cpp
+++ b/ydb/core/grpc_services/grpc_request_proxy.cpp
@@ -580,10 +580,6 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev, const TActorCo
HFunc(TEvPQAddReadRuleRequest, PreHandle);
HFunc(TEvPQRemoveReadRuleRequest, PreHandle);
HFunc(TEvPQDescribeTopicRequest, PreHandle);
- HFunc(TEvExportToYtRequest, PreHandle);
- HFunc(TEvExportToS3Request, PreHandle);
- HFunc(TEvImportFromS3Request, PreHandle);
- HFunc(TEvImportDataRequest, PreHandle);
HFunc(TEvDiscoverPQClustersRequest, PreHandle);
HFunc(TEvCreateRateLimiterResource, PreHandle);
HFunc(TEvAlterRateLimiterResource, PreHandle);
diff --git a/ydb/core/grpc_services/grpc_request_proxy.h b/ydb/core/grpc_services/grpc_request_proxy.h
index 0cbf963cda..500ea9f779 100644
--- a/ydb/core/grpc_services/grpc_request_proxy.h
+++ b/ydb/core/grpc_services/grpc_request_proxy.h
@@ -62,10 +62,6 @@ protected:
void Handle(TEvPQAddReadRuleRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQRemoveReadRuleRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQDescribeTopicRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvExportToYtRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvExportToS3Request::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvImportFromS3Request::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvImportDataRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDiscoverPQClustersRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvCreateRateLimiterResource::TPtr& ev, const TActorContext& ctx);
void Handle(TEvAlterRateLimiterResource::TPtr& ev, const TActorContext& ctx);
diff --git a/ydb/core/grpc_services/rpc_calls.h b/ydb/core/grpc_services/rpc_calls.h
index e092fa7759..949e353343 100644
--- a/ydb/core/grpc_services/rpc_calls.h
+++ b/ydb/core/grpc_services/rpc_calls.h
@@ -11,8 +11,6 @@
#include <ydb/public/api/protos/ydb_coordination.pb.h>
#include <ydb/public/api/protos/ydb_discovery.pb.h>
#include <ydb/public/api/protos/ydb_experimental.pb.h>
-#include <ydb/public/api/protos/ydb_export.pb.h>
-#include <ydb/public/api/protos/ydb_import.pb.h>
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
#include <ydb/public/api/protos/ydb_table.pb.h>
#include <ydb/public/api/protos/ydb_s3_internal.pb.h>
@@ -59,10 +57,6 @@ using TEvPQDescribeTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::Ev
using TEvPQAddReadRuleRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQAddReadRule, Ydb::PersQueue::V1::AddReadRuleRequest, Ydb::PersQueue::V1::AddReadRuleResponse, true>;
using TEvPQRemoveReadRuleRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQRemoveReadRule, Ydb::PersQueue::V1::RemoveReadRuleRequest, Ydb::PersQueue::V1::RemoveReadRuleResponse, true>;
-using TEvExportToYtRequest = TGRpcRequestValidationWrapper<TRpcServices::EvExportToYt, Ydb::Export::ExportToYtRequest, Ydb::Export::ExportToYtResponse, true>;
-using TEvExportToS3Request = TGRpcRequestValidationWrapper<TRpcServices::EvExportToS3, Ydb::Export::ExportToS3Request, Ydb::Export::ExportToS3Response, true>;
-using TEvImportFromS3Request = TGRpcRequestValidationWrapper<TRpcServices::EvImportFromS3, Ydb::Import::ImportFromS3Request, Ydb::Import::ImportFromS3Response, true>;
-using TEvImportDataRequest = TGRpcRequestValidationWrapper<TRpcServices::EvImportData, Ydb::Import::ImportDataRequest, Ydb::Import::ImportDataResponse, true>;
using TEvDiscoverPQClustersRequest = TGRpcRequestWrapper<TRpcServices::EvDiscoverPQClusters, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResponse, true>;
using TEvCreateRateLimiterResource = TGRpcRequestWrapper<TRpcServices::EvCreateRateLimiterResource, Ydb::RateLimiter::CreateResourceRequest, Ydb::RateLimiter::CreateResourceResponse, true, TRateLimiterMode::Rps>;
using TEvAlterRateLimiterResource = TGRpcRequestWrapper<TRpcServices::EvAlterRateLimiterResource, Ydb::RateLimiter::AlterResourceRequest, Ydb::RateLimiter::AlterResourceResponse, true, TRateLimiterMode::Rps>;
diff --git a/ydb/core/grpc_services/rpc_export.cpp b/ydb/core/grpc_services/rpc_export.cpp
index 352d5f6bc7..6b846bea5f 100644
--- a/ydb/core/grpc_services/rpc_export.cpp
+++ b/ydb/core/grpc_services/rpc_export.cpp
@@ -1,8 +1,10 @@
+#include "service_export.h"
#include "grpc_request_proxy.h"
#include "rpc_export_base.h"
#include "rpc_calls.h"
#include "rpc_operation_request_base.h"
+#include <ydb/public/api/protos/ydb_export.pb.h>
#include <ydb/core/tx/schemeshard/schemeshard_export.h>
#include <library/cpp/actors/core/hfunc.h>
@@ -18,6 +20,11 @@ using namespace NSchemeShard;
using namespace NKikimrIssues;
using namespace Ydb;
+using TEvExportToYtRequest = TGrpcRequestOperationCall<Ydb::Export::ExportToYtRequest,
+ Ydb::Export::ExportToYtResponse>;
+using TEvExportToS3Request = TGrpcRequestOperationCall<Ydb::Export::ExportToS3Request,
+ Ydb::Export::ExportToS3Response>;
+
template <typename TDerived, typename TEvRequest>
class TExportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>, public TExportConv {
TStringBuf GetLogPrefix() const override {
@@ -210,12 +217,12 @@ public:
using TExportRPC::TExportRPC;
};
-void TGRpcRequestProxy::Handle(TEvExportToYtRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TExportToYtRPC(ev->Release().Release()));
+void DoExportToYtRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TExportToYtRPC(p.release()));
}
-void TGRpcRequestProxy::Handle(TEvExportToS3Request::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TExportToS3RPC(ev->Release().Release()));
+void DoExportToS3Request(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TExportToS3RPC(p.release()));
}
} // namespace NGRpcService
diff --git a/ydb/core/grpc_services/rpc_import.cpp b/ydb/core/grpc_services/rpc_import.cpp
index 6fc610c5d0..7f95489106 100644
--- a/ydb/core/grpc_services/rpc_import.cpp
+++ b/ydb/core/grpc_services/rpc_import.cpp
@@ -1,8 +1,11 @@
+#include "service_import.h"
#include "grpc_request_proxy.h"
#include "rpc_import_base.h"
#include "rpc_calls.h"
#include "rpc_operation_request_base.h"
+#include <ydb/public/api/protos/ydb_import.pb.h>
+
#include <ydb/core/tx/schemeshard/schemeshard_import.h>
#include <library/cpp/actors/core/hfunc.h>
@@ -18,6 +21,9 @@ using namespace NSchemeShard;
using namespace NKikimrIssues;
using namespace Ydb;
+using TEvImportFromS3Request = TGrpcRequestOperationCall<Ydb::Import::ImportFromS3Request,
+ Ydb::Import::ImportFromS3Response>;
+
template <typename TDerived, typename TEvRequest>
class TImportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>, public TImportConv {
TStringBuf GetLogPrefix() const override {
@@ -81,8 +87,8 @@ public:
using TImportRPC::TImportRPC;
};
-void TGRpcRequestProxy::Handle(TEvImportFromS3Request::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TImportFromS3RPC(ev->Release().Release()));
+void DoImportFromS3Request(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TImportFromS3RPC(p.release()));
}
} // namespace NGRpcService
diff --git a/ydb/core/grpc_services/rpc_import_data.cpp b/ydb/core/grpc_services/rpc_import_data.cpp
index 87c2d39c25..c782b04de7 100644
--- a/ydb/core/grpc_services/rpc_import_data.cpp
+++ b/ydb/core/grpc_services/rpc_import_data.cpp
@@ -3,6 +3,8 @@
#include "rpc_common.h"
#include "rpc_request_base.h"
+#include <ydb/public/api/protos/ydb_import.pb.h>
+
#include <ydb/core/base/appdata.h>
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/scheme/scheme_tabledefs.h>
@@ -29,6 +31,9 @@ using namespace NActors;
using namespace NKikimrIssues;
using namespace Ydb;
+using TEvImportDataRequest = TGrpcRequestOperationCall<Ydb::Import::ImportDataRequest,
+ Ydb::Import::ImportDataResponse>;
+
class TImportDataRPC: public TRpcRequestActor<TImportDataRPC, TEvImportDataRequest, true> {
using TNavigate = NSchemeCache::TSchemeCacheNavigate;
using TEvNavigate = TEvTxProxySchemeCache::TEvNavigateKeySet;
@@ -431,8 +436,8 @@ private:
}; // TImportDataRPC
-void TGRpcRequestProxy::Handle(TEvImportDataRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TImportDataRPC(ev->Release().Release()));
+void DoImportDataRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TImportDataRPC(p.release()));
}
} // namespace NGRpcService
diff --git a/ydb/core/grpc_services/service_export.h b/ydb/core/grpc_services/service_export.h
new file mode 100644
index 0000000000..3a140b5806
--- /dev/null
+++ b/ydb/core/grpc_services/service_export.h
@@ -0,0 +1,15 @@
+#pragma once
+
+#include <memory>
+
+namespace NKikimr {
+namespace NGRpcService {
+
+class IRequestOpCtx;
+class IFacilityProvider;
+
+void DoExportToYtRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoExportToS3Request(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+
+}
+}
diff --git a/ydb/core/grpc_services/service_import.h b/ydb/core/grpc_services/service_import.h
new file mode 100644
index 0000000000..59b234471e
--- /dev/null
+++ b/ydb/core/grpc_services/service_import.h
@@ -0,0 +1,15 @@
+#pragma once
+
+#include <memory>
+
+namespace NKikimr {
+namespace NGRpcService {
+
+class IRequestOpCtx;
+class IFacilityProvider;
+
+void DoImportFromS3Request(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoImportDataRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+
+}
+}
diff --git a/ydb/services/ydb/ydb_export.cpp b/ydb/services/ydb/ydb_export.cpp
index 7a0933ce55..4a180db6be 100644
--- a/ydb/services/ydb/ydb_export.cpp
+++ b/ydb/services/ydb/ydb_export.cpp
@@ -1,8 +1,8 @@
#include "ydb_export.h"
+#include <ydb/core/grpc_services/service_export.h>
#include <ydb/core/grpc_services/grpc_helper.h>
-#include <ydb/core/grpc_services/grpc_request_proxy.h>
-#include <ydb/core/grpc_services/rpc_calls.h>
+#include <ydb/core/grpc_services/base/base.h>
namespace NKikimr {
namespace NGRpcService {
@@ -37,20 +37,18 @@ void TGRpcYdbExportService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
#ifdef ADD_REQUEST
#error ADD_REQUEST macro already defined
#endif
-#define ADD_REQUEST(NAME, IN, OUT, ACTION) \
+#define ADD_REQUEST(NAME, IN, OUT, CB) \
MakeIntrusive<TGRpcRequest<Ydb::Export::IN, Ydb::Export::OUT, TGRpcYdbExportService>>(this, &Service_, CQ_, \
[this](NGrpc::IRequestContextBase *ctx) { \
NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
- ACTION; \
+ ActorSystem_->Send(GRpcRequestProxyId_, \
+ new NGRpcService::TGrpcRequestOperationCall<Ydb::Export::IN, Ydb::Export::OUT> \
+ (ctx, &CB, NGRpcService::TRequestAuxSettings{NGRpcService::TRateLimiterMode::Off, nullptr})); \
}, &Ydb::Export::V1::ExportService::AsyncService::Request ## NAME, \
#NAME, logger, getCounterBlock("export", #NAME))->Run();
- ADD_REQUEST(ExportToYt, ExportToYtRequest, ExportToYtResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvExportToYtRequest(ctx));
- })
- ADD_REQUEST(ExportToS3, ExportToS3Request, ExportToS3Response, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvExportToS3Request(ctx));
- })
+ ADD_REQUEST(ExportToYt, ExportToYtRequest, ExportToYtResponse, DoExportToYtRequest);
+ ADD_REQUEST(ExportToS3, ExportToS3Request, ExportToS3Response, DoExportToS3Request);
#undef ADD_REQUEST
}
diff --git a/ydb/services/ydb/ydb_import.cpp b/ydb/services/ydb/ydb_import.cpp
index c27578329d..ba7f928915 100644
--- a/ydb/services/ydb/ydb_import.cpp
+++ b/ydb/services/ydb/ydb_import.cpp
@@ -1,8 +1,8 @@
#include "ydb_import.h"
+#include <ydb/core/grpc_services/service_import.h>
#include <ydb/core/grpc_services/grpc_helper.h>
-#include <ydb/core/grpc_services/grpc_request_proxy.h>
-#include <ydb/core/grpc_services/rpc_calls.h>
+#include <ydb/core/grpc_services/base/base.h>
namespace NKikimr {
namespace NGRpcService {
@@ -37,20 +37,19 @@ void TGRpcYdbImportService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
#ifdef ADD_REQUEST
#error ADD_REQUEST macro already defined
#endif
-#define ADD_REQUEST(NAME, IN, OUT, ACTION) \
+#define ADD_REQUEST(NAME, IN, OUT, CB) \
MakeIntrusive<TGRpcRequest<Ydb::Import::IN, Ydb::Import::OUT, TGRpcYdbImportService>>(this, &Service_, CQ_, \
[this](NGrpc::IRequestContextBase *ctx) { \
NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
- ACTION; \
+ ActorSystem_->Send(GRpcRequestProxyId_, \
+ new NGRpcService::TGrpcRequestOperationCall<Ydb::Import::IN, Ydb::Import::OUT> \
+ (ctx, &CB, NGRpcService::TRequestAuxSettings{NGRpcService::TRateLimiterMode::Off, nullptr})); \
}, &Ydb::Import::V1::ImportService::AsyncService::Request ## NAME, \
#NAME, logger, getCounterBlock("import", #NAME))->Run();
- ADD_REQUEST(ImportFromS3, ImportFromS3Request, ImportFromS3Response, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvImportFromS3Request(ctx));
- })
- ADD_REQUEST(ImportData, ImportDataRequest, ImportDataResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvImportDataRequest(ctx));
- })
+ ADD_REQUEST(ImportFromS3, ImportFromS3Request, ImportFromS3Response, DoImportFromS3Request);
+ ADD_REQUEST(ImportData, ImportDataRequest, ImportDataResponse, DoImportDataRequest);
+
#undef ADD_REQUEST
}