diff options
author | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-03-24 14:13:35 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-03-24 14:13:35 +0300 |
commit | b9fd46f9ff1a358339996e962e96b658677983c7 (patch) | |
tree | 23325230d5d56d2e976596d26471cb17e3694e57 | |
parent | 271ac8fd7cc6c125b34b2bce83590db2b2c7e2d7 (diff) | |
download | ydb-b9fd46f9ff1a358339996e962e96b658677983c7.tar.gz |
KIKIMR-13646 Invert grpc_request_proxy dependence for long tx service
ref:04448f39ddc9e973c6db8ab118d319008c6e9646
-rw-r--r-- | ydb/core/grpc_services/grpc_request_proxy.cpp | 5 | ||||
-rw-r--r-- | ydb/core/grpc_services/grpc_request_proxy.h | 5 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_calls.h | 6 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_long_tx.cpp | 83 | ||||
-rw-r--r-- | ydb/core/grpc_services/service_longtx.h | 17 | ||||
-rw-r--r-- | ydb/core/tx/tx_proxy/upload_rows_common_impl.h | 1 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_long_tx.cpp | 43 |
7 files changed, 84 insertions, 76 deletions
diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp index 00b8369643..deda24cdf0 100644 --- a/ydb/core/grpc_services/grpc_request_proxy.cpp +++ b/ydb/core/grpc_services/grpc_request_proxy.cpp @@ -602,11 +602,6 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev, const TActorCo HFunc(TEvKikhouseRefreshSnapshotRequest, PreHandle); HFunc(TEvKikhouseDiscardSnapshotRequest, PreHandle); HFunc(TEvCoordinationSessionRequest, PreHandle); - HFunc(TEvLongTxBeginRequest, PreHandle); - HFunc(TEvLongTxCommitRequest, PreHandle); - HFunc(TEvLongTxRollbackRequest, PreHandle); - HFunc(TEvLongTxWriteRequest, PreHandle); - HFunc(TEvLongTxReadRequest, PreHandle); HFunc(TEvProxyRuntimeEvent, PreHandle); diff --git a/ydb/core/grpc_services/grpc_request_proxy.h b/ydb/core/grpc_services/grpc_request_proxy.h index a6e5174c35..adeabb56a2 100644 --- a/ydb/core/grpc_services/grpc_request_proxy.h +++ b/ydb/core/grpc_services/grpc_request_proxy.h @@ -85,11 +85,6 @@ protected: void Handle(TEvKikhouseDiscardSnapshotRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvLoginRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvCoordinationSessionRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvLongTxBeginRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvLongTxCommitRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvLongTxRollbackRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvLongTxWriteRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvLongTxReadRequest::TPtr& ev, const TActorContext& ctx); TActorId DiscoveryCacheActorID; }; diff --git a/ydb/core/grpc_services/rpc_calls.h b/ydb/core/grpc_services/rpc_calls.h index 7fce2bb27c..9224eed750 100644 --- a/ydb/core/grpc_services/rpc_calls.h +++ b/ydb/core/grpc_services/rpc_calls.h @@ -23,7 +23,6 @@ #include <ydb/public/api/protos/yq.pb.h> #include <ydb/public/api/grpc/draft/dummy.pb.h> -#include <ydb/public/api/grpc/draft/ydb_long_tx_v1.pb.h> #include <ydb/public/lib/operation_id/operation_id.h> @@ -82,11 +81,6 @@ using TEvKikhouseDiscardSnapshotRequest = TGRpcRequestWrapper<TRpcServices::EvKi using TEvLoginRequest = TGRpcRequestWrapperNoAuth<TRpcServices::EvLogin, Ydb::Auth::LoginRequest, Ydb::Auth::LoginResponse>; using TEvCoordinationSessionRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvCoordinationSession, Ydb::Coordination::SessionRequest, Ydb::Coordination::SessionResponse>; -using TEvLongTxBeginRequest = TGRpcRequestWrapper<TRpcServices::EvLongTxBegin, Ydb::LongTx::BeginTransactionRequest, Ydb::LongTx::BeginTransactionResponse, true>; -using TEvLongTxCommitRequest = TGRpcRequestWrapper<TRpcServices::EvLongTxCommit, Ydb::LongTx::CommitTransactionRequest, Ydb::LongTx::CommitTransactionResponse, true>; -using TEvLongTxRollbackRequest = TGRpcRequestWrapper<TRpcServices::EvLongTxRollback, Ydb::LongTx::RollbackTransactionRequest, Ydb::LongTx::RollbackTransactionResponse, true>; -using TEvLongTxWriteRequest = TGRpcRequestWrapper<TRpcServices::EvLongTxWrite, Ydb::LongTx::WriteRequest, Ydb::LongTx::WriteResponse, true>; -using TEvLongTxReadRequest = TGRpcRequestWrapper<TRpcServices::EvLongTxRead, Ydb::LongTx::ReadRequest, Ydb::LongTx::ReadResponse, true>; } // namespace NGRpcService } // namespace NKikimr diff --git a/ydb/core/grpc_services/rpc_long_tx.cpp b/ydb/core/grpc_services/rpc_long_tx.cpp index 442972f993..4da9d39957 100644 --- a/ydb/core/grpc_services/rpc_long_tx.cpp +++ b/ydb/core/grpc_services/rpc_long_tx.cpp @@ -1,8 +1,10 @@ -#include "rpc_calls.h" #include "rpc_common.h" #include "rpc_deferrable.h" -#include "grpc_request_proxy.h" +#include "service_longtx.h" +#include <ydb/public/api/grpc/draft/ydb_long_tx_v1.pb.h> + +#include <ydb/core/grpc_services/base/base.h> #include <ydb/core/base/tablet_pipe.h> #include <ydb/core/base/tablet_pipecache.h> #include <ydb/core/tablet/tablet_pipe_client_cache.h> @@ -18,6 +20,17 @@ namespace NKikimr { namespace { +using TEvLongTxBeginRequest = NGRpcService::TGrpcRequestOperationCall<Ydb::LongTx::BeginTransactionRequest, + Ydb::LongTx::BeginTransactionResponse>; +using TEvLongTxCommitRequest = NGRpcService::TGrpcRequestOperationCall<Ydb::LongTx::CommitTransactionRequest, + Ydb::LongTx::CommitTransactionResponse>; +using TEvLongTxRollbackRequest = NGRpcService::TGrpcRequestOperationCall<Ydb::LongTx::RollbackTransactionRequest, + Ydb::LongTx::RollbackTransactionResponse>; +using TEvLongTxWriteRequest = NGRpcService::TGrpcRequestOperationCall<Ydb::LongTx::WriteRequest, + Ydb::LongTx::WriteResponse>; +using TEvLongTxReadRequest = NGRpcService::TGrpcRequestOperationCall<Ydb::LongTx::ReadRequest, + Ydb::LongTx::ReadResponse>; + std::shared_ptr<arrow::Schema> ExtractArrowSchema(const NKikimrSchemeOp::TColumnTableSchema& schema) { TVector<std::pair<TString, NScheme::TTypeId>> columns; for (auto& col : schema.GetColumns()) { @@ -93,7 +106,7 @@ THashMap<ui64, TString> SplitData(const TString& data, const NKikimrSchemeOp::TC } namespace NGRpcService { - +using namespace NActors; using namespace NLongTxService; class TLongTxBeginRPC : public TActorBootstrapped<TLongTxBeginRPC> { @@ -104,14 +117,14 @@ public: return NKikimrServices::TActivity::GRPC_REQ; } - explicit TLongTxBeginRPC(TAutoPtr<TEvLongTxBeginRequest> request) + explicit TLongTxBeginRPC(std::unique_ptr<IRequestOpCtx> request) : TBase() - , Request(request.Release()) + , Request(std::move(request)) , DatabaseName(Request->GetDatabaseName().GetOrElse(DatabaseFromDomain(AppData()))) {} void Bootstrap() { - const auto* req = Request->GetProtoRequest(); + const auto* req = TEvLongTxBeginRequest::GetProtoRequest(Request); NKikimrLongTxService::TEvBeginTx::EMode mode = {}; switch (req->tx_type()) { @@ -162,7 +175,7 @@ private: } private: - std::unique_ptr<TEvLongTxBeginRequest> Request; + std::unique_ptr<IRequestOpCtx> Request; TString DatabaseName; }; @@ -175,14 +188,14 @@ public: return NKikimrServices::TActivity::GRPC_REQ; } - explicit TLongTxCommitRPC(TAutoPtr<TEvLongTxCommitRequest> request) + explicit TLongTxCommitRPC(std::unique_ptr<IRequestOpCtx> request) : TBase() - , Request(request.Release()) + , Request(std::move(request)) { } void Bootstrap() { - const auto* req = Request->GetProtoRequest(); + const auto* req = TEvLongTxCommitRequest::GetProtoRequest(Request); TString errMsg; if (!LongTxId.ParseString(req->tx_id(), &errMsg)) { @@ -215,7 +228,7 @@ private: } Ydb::LongTx::CommitTransactionResult result; - const auto* req = Request->GetProtoRequest(); + const auto* req = TEvLongTxCommitRequest::GetProtoRequest(Request); result.set_tx_id(req->tx_id()); ReplySuccess(result); } @@ -234,7 +247,7 @@ private: } private: - std::unique_ptr<TEvLongTxCommitRequest> Request; + std::unique_ptr<IRequestOpCtx> Request; TLongTxId LongTxId; }; @@ -247,14 +260,14 @@ public: return NKikimrServices::TActivity::GRPC_REQ; } - explicit TLongTxRollbackRPC(TAutoPtr<TEvLongTxRollbackRequest> request) + explicit TLongTxRollbackRPC(std::unique_ptr<IRequestOpCtx> request) : TBase() - , Request(request.Release()) + , Request(std::move(request)) { } void Bootstrap() { - const auto* req = Request->GetProtoRequest(); + const auto* req = TEvLongTxRollbackRequest::GetProtoRequest(Request); TString errMsg; if (!LongTxId.ParseString(req->tx_id(), &errMsg)) { @@ -287,7 +300,7 @@ private: } Ydb::LongTx::RollbackTransactionResult result; - const auto* req = Request->GetProtoRequest(); + const auto* req = TEvLongTxRollbackRequest::GetProtoRequest(Request); result.set_tx_id(req->tx_id()); ReplySuccess(result); } @@ -306,7 +319,7 @@ private: } private: - std::unique_ptr<TEvLongTxRollbackRequest> Request; + std::unique_ptr<IRequestOpCtx> Request; TLongTxId LongTxId; }; @@ -562,13 +575,13 @@ public: return NKikimrServices::TActivity::GRPC_REQ; } - explicit TLongTxWriteRPC(TAutoPtr<IRequestOpCtx> request) + explicit TLongTxWriteRPC(std::unique_ptr<IRequestOpCtx> request) : TBase(request->GetDatabaseName().GetOrElse(DatabaseFromDomain(AppData())), TEvLongTxWriteRequest::GetProtoRequest(request)->path(), request->GetInternalToken(), TLongTxId(), TEvLongTxWriteRequest::GetProtoRequest(request)->dedup_id()) - , Request(request.Release()) + , Request(std::move(request)) , SchemeCache(MakeSchemeCacheID()) { } @@ -652,7 +665,7 @@ private: template<> IActor* TEvLongTxWriteRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) { - return new TLongTxWriteRPC(msg); + return new TLongTxWriteRPC(std::unique_ptr<NKikimr::NGRpcService::IRequestOpCtx>(msg)); } // LongTx Write implementation called from the inside of YDB (e.g. as a part of BulkUpsert call) @@ -739,9 +752,9 @@ public: return NKikimrServices::TActivity::GRPC_REQ; } - explicit TLongTxReadRPC(TAutoPtr<TEvLongTxReadRequest> request) + explicit TLongTxReadRPC(std::unique_ptr<IRequestOpCtx> request) : TBase() - , Request(request.Release()) + , Request(std::move(request)) , DatabaseName(Request->GetDatabaseName().GetOrElse(DatabaseFromDomain(AppData()))) , SchemeCache(MakeSchemeCacheID()) , LeaderPipeCache(MakePipePeNodeCacheID(false)) @@ -751,7 +764,7 @@ public: } void Bootstrap() { - const auto* req = Request->GetProtoRequest(); + const auto* req = TEvLongTxReadRequest::GetProtoRequest(Request); if (const TString& internalToken = Request->GetInternalToken()) { UserToken.emplace(internalToken); @@ -929,7 +942,7 @@ private: Ydb::LongTx::ReadResult* MakeResult(ui64 outChunk, bool finished) const { auto result = TEvLongTxReadRequest::AllocateResult<Ydb::LongTx::ReadResult>(Request); - const auto* req = Request->GetProtoRequest(); + const auto* req = TEvLongTxReadRequest::GetProtoRequest(Request); result->set_tx_id(req->tx_id()); result->set_path(req->path()); result->set_chunk(outChunk); @@ -957,7 +970,7 @@ private: } private: - std::unique_ptr<TEvLongTxReadRequest> Request; + std::unique_ptr<IRequestOpCtx> Request; TString DatabaseName; TActorId SchemeCache; TActorId LeaderPipeCache; @@ -974,24 +987,24 @@ private: // -void TGRpcRequestProxy::Handle(TEvLongTxBeginRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TLongTxBeginRPC(ev->Release().Release())); +void DoLongTxBeginRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TLongTxBeginRPC(std::move(p))); } -void TGRpcRequestProxy::Handle(TEvLongTxCommitRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TLongTxCommitRPC(ev->Release().Release())); +void DoLongTxCommitRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TLongTxCommitRPC(std::move(p))); } -void TGRpcRequestProxy::Handle(TEvLongTxRollbackRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TLongTxRollbackRPC(ev->Release().Release())); +void DoLongTxRollbackRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TLongTxRollbackRPC(std::move(p))); } -void TGRpcRequestProxy::Handle(TEvLongTxWriteRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TLongTxWriteRPC(ev->Release().Release())); +void DoLongTxWriteRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TLongTxWriteRPC(std::move(p))); } -void TGRpcRequestProxy::Handle(TEvLongTxReadRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TLongTxReadRPC(ev->Release().Release())); +void DoLongTxReadRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TLongTxReadRPC(std::move(p))); } } diff --git a/ydb/core/grpc_services/service_longtx.h b/ydb/core/grpc_services/service_longtx.h new file mode 100644 index 0000000000..58a3c8f269 --- /dev/null +++ b/ydb/core/grpc_services/service_longtx.h @@ -0,0 +1,17 @@ +#pragma once +#include <memory> + +namespace NKikimr { +namespace NGRpcService { + +class IRequestOpCtx; +class IFacilityProvider; + +void DoLongTxBeginRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoLongTxCommitRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoLongTxRollbackRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoLongTxWriteRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoLongTxReadRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); + +} +} diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h index 0aaee7e7f5..6971f78e62 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h +++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h @@ -17,6 +17,7 @@ #include <ydb/public/api/protos/ydb_status_codes.pb.h> #include <ydb/public/api/protos/ydb_value.pb.h> +#include <ydb/public/api/grpc/draft/ydb_long_tx_v1.pb.h> #define INCLUDE_YDB_INTERNAL_H #include <ydb/public/sdk/cpp/client/impl/ydb_internal/make_request/make.h> diff --git a/ydb/services/ydb/ydb_long_tx.cpp b/ydb/services/ydb/ydb_long_tx.cpp index 908c0fd332..0ae6bcfb74 100644 --- a/ydb/services/ydb/ydb_long_tx.cpp +++ b/ydb/services/ydb/ydb_long_tx.cpp @@ -1,8 +1,8 @@ #include "ydb_long_tx.h" +#include <ydb/core/grpc_services/service_longtx.h> #include <ydb/core/grpc_services/grpc_helper.h> -#include <ydb/core/grpc_services/grpc_request_proxy.h> -#include <ydb/core/grpc_services/rpc_calls.h> +#include <ydb/core/grpc_services/base/base.h> namespace NKikimr { namespace NGRpcService { @@ -40,29 +40,22 @@ void TGRpcYdbLongTxService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { #ifdef ADD_REQUEST #error ADD_REQUEST macro already defined #endif -#define ADD_REQUEST(NAME, IN, OUT, ACTION) \ - MakeIntrusive<TGRpcRequest<Ydb::LongTx::IN, Ydb::LongTx::OUT, TGRpcYdbLongTxService>>(this, &Service_, CQ_, \ - [this](NGrpc::IRequestContextBase *ctx) { \ - NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ - ACTION; \ - }, &Ydb::LongTx::V1::LongTxService::AsyncService::Request ## NAME, \ - #NAME, logger, getCounterBlock("long_tx", #NAME))->Run(); - - ADD_REQUEST(BeginTx, BeginTransactionRequest, BeginTransactionResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvLongTxBeginRequest(ctx)); - }) - ADD_REQUEST(CommitTx, CommitTransactionRequest, CommitTransactionResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvLongTxCommitRequest(ctx)); - }) - ADD_REQUEST(RollbackTx, RollbackTransactionRequest, RollbackTransactionResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvLongTxRollbackRequest(ctx)); - }) - ADD_REQUEST(Write, WriteRequest, WriteResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvLongTxWriteRequest(ctx)); - }) - ADD_REQUEST(Read, ReadRequest, ReadResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvLongTxReadRequest(ctx)); - }) +#define ADD_REQUEST(NAME, REQ, CB) \ + MakeIntrusive<TGRpcRequest<Ydb::LongTx::REQ##Request, Ydb::LongTx::REQ##Response, TGRpcYdbLongTxService>> \ + (this, &Service_, CQ_, \ + [this](NGrpc::IRequestContextBase *ctx) { \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ + ActorSystem_->Send(GRpcRequestProxyId_, \ + new TGrpcRequestOperationCall<Ydb::LongTx::REQ##Request, Ydb::LongTx::REQ##Response> \ + (ctx, &CB, TRequestAuxSettings{TRateLimiterMode::Off, nullptr})); \ + }, &Ydb::LongTx::V1::LongTxService::AsyncService::Request ## NAME, \ + #NAME, logger, getCounterBlock("long_tx", #NAME))->Run(); + + ADD_REQUEST(BeginTx, BeginTransaction, DoLongTxBeginRPC) + ADD_REQUEST(CommitTx, CommitTransaction, DoLongTxCommitRPC) + ADD_REQUEST(RollbackTx, RollbackTransaction, DoLongTxRollbackRPC) + ADD_REQUEST(Write, Write, DoLongTxWriteRPC) + ADD_REQUEST(Read, Read, DoLongTxReadRPC) #undef ADD_REQUEST } |