aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniil Cherednik <dcherednik@yandex-team.ru>2022-03-24 14:13:35 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-03-24 14:13:35 +0300
commitb9fd46f9ff1a358339996e962e96b658677983c7 (patch)
tree23325230d5d56d2e976596d26471cb17e3694e57
parent271ac8fd7cc6c125b34b2bce83590db2b2c7e2d7 (diff)
downloadydb-b9fd46f9ff1a358339996e962e96b658677983c7.tar.gz
KIKIMR-13646 Invert grpc_request_proxy dependence for long tx service
ref:04448f39ddc9e973c6db8ab118d319008c6e9646
-rw-r--r--ydb/core/grpc_services/grpc_request_proxy.cpp5
-rw-r--r--ydb/core/grpc_services/grpc_request_proxy.h5
-rw-r--r--ydb/core/grpc_services/rpc_calls.h6
-rw-r--r--ydb/core/grpc_services/rpc_long_tx.cpp83
-rw-r--r--ydb/core/grpc_services/service_longtx.h17
-rw-r--r--ydb/core/tx/tx_proxy/upload_rows_common_impl.h1
-rw-r--r--ydb/services/ydb/ydb_long_tx.cpp43
7 files changed, 84 insertions, 76 deletions
diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp
index 00b8369643..deda24cdf0 100644
--- a/ydb/core/grpc_services/grpc_request_proxy.cpp
+++ b/ydb/core/grpc_services/grpc_request_proxy.cpp
@@ -602,11 +602,6 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev, const TActorCo
HFunc(TEvKikhouseRefreshSnapshotRequest, PreHandle);
HFunc(TEvKikhouseDiscardSnapshotRequest, PreHandle);
HFunc(TEvCoordinationSessionRequest, PreHandle);
- HFunc(TEvLongTxBeginRequest, PreHandle);
- HFunc(TEvLongTxCommitRequest, PreHandle);
- HFunc(TEvLongTxRollbackRequest, PreHandle);
- HFunc(TEvLongTxWriteRequest, PreHandle);
- HFunc(TEvLongTxReadRequest, PreHandle);
HFunc(TEvProxyRuntimeEvent, PreHandle);
diff --git a/ydb/core/grpc_services/grpc_request_proxy.h b/ydb/core/grpc_services/grpc_request_proxy.h
index a6e5174c35..adeabb56a2 100644
--- a/ydb/core/grpc_services/grpc_request_proxy.h
+++ b/ydb/core/grpc_services/grpc_request_proxy.h
@@ -85,11 +85,6 @@ protected:
void Handle(TEvKikhouseDiscardSnapshotRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvLoginRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvCoordinationSessionRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvLongTxBeginRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvLongTxCommitRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvLongTxRollbackRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvLongTxWriteRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvLongTxReadRequest::TPtr& ev, const TActorContext& ctx);
TActorId DiscoveryCacheActorID;
};
diff --git a/ydb/core/grpc_services/rpc_calls.h b/ydb/core/grpc_services/rpc_calls.h
index 7fce2bb27c..9224eed750 100644
--- a/ydb/core/grpc_services/rpc_calls.h
+++ b/ydb/core/grpc_services/rpc_calls.h
@@ -23,7 +23,6 @@
#include <ydb/public/api/protos/yq.pb.h>
#include <ydb/public/api/grpc/draft/dummy.pb.h>
-#include <ydb/public/api/grpc/draft/ydb_long_tx_v1.pb.h>
#include <ydb/public/lib/operation_id/operation_id.h>
@@ -82,11 +81,6 @@ using TEvKikhouseDiscardSnapshotRequest = TGRpcRequestWrapper<TRpcServices::EvKi
using TEvLoginRequest = TGRpcRequestWrapperNoAuth<TRpcServices::EvLogin, Ydb::Auth::LoginRequest, Ydb::Auth::LoginResponse>;
using TEvCoordinationSessionRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvCoordinationSession, Ydb::Coordination::SessionRequest, Ydb::Coordination::SessionResponse>;
-using TEvLongTxBeginRequest = TGRpcRequestWrapper<TRpcServices::EvLongTxBegin, Ydb::LongTx::BeginTransactionRequest, Ydb::LongTx::BeginTransactionResponse, true>;
-using TEvLongTxCommitRequest = TGRpcRequestWrapper<TRpcServices::EvLongTxCommit, Ydb::LongTx::CommitTransactionRequest, Ydb::LongTx::CommitTransactionResponse, true>;
-using TEvLongTxRollbackRequest = TGRpcRequestWrapper<TRpcServices::EvLongTxRollback, Ydb::LongTx::RollbackTransactionRequest, Ydb::LongTx::RollbackTransactionResponse, true>;
-using TEvLongTxWriteRequest = TGRpcRequestWrapper<TRpcServices::EvLongTxWrite, Ydb::LongTx::WriteRequest, Ydb::LongTx::WriteResponse, true>;
-using TEvLongTxReadRequest = TGRpcRequestWrapper<TRpcServices::EvLongTxRead, Ydb::LongTx::ReadRequest, Ydb::LongTx::ReadResponse, true>;
} // namespace NGRpcService
} // namespace NKikimr
diff --git a/ydb/core/grpc_services/rpc_long_tx.cpp b/ydb/core/grpc_services/rpc_long_tx.cpp
index 442972f993..4da9d39957 100644
--- a/ydb/core/grpc_services/rpc_long_tx.cpp
+++ b/ydb/core/grpc_services/rpc_long_tx.cpp
@@ -1,8 +1,10 @@
-#include "rpc_calls.h"
#include "rpc_common.h"
#include "rpc_deferrable.h"
-#include "grpc_request_proxy.h"
+#include "service_longtx.h"
+#include <ydb/public/api/grpc/draft/ydb_long_tx_v1.pb.h>
+
+#include <ydb/core/grpc_services/base/base.h>
#include <ydb/core/base/tablet_pipe.h>
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/tablet/tablet_pipe_client_cache.h>
@@ -18,6 +20,17 @@ namespace NKikimr {
namespace {
+using TEvLongTxBeginRequest = NGRpcService::TGrpcRequestOperationCall<Ydb::LongTx::BeginTransactionRequest,
+ Ydb::LongTx::BeginTransactionResponse>;
+using TEvLongTxCommitRequest = NGRpcService::TGrpcRequestOperationCall<Ydb::LongTx::CommitTransactionRequest,
+ Ydb::LongTx::CommitTransactionResponse>;
+using TEvLongTxRollbackRequest = NGRpcService::TGrpcRequestOperationCall<Ydb::LongTx::RollbackTransactionRequest,
+ Ydb::LongTx::RollbackTransactionResponse>;
+using TEvLongTxWriteRequest = NGRpcService::TGrpcRequestOperationCall<Ydb::LongTx::WriteRequest,
+ Ydb::LongTx::WriteResponse>;
+using TEvLongTxReadRequest = NGRpcService::TGrpcRequestOperationCall<Ydb::LongTx::ReadRequest,
+ Ydb::LongTx::ReadResponse>;
+
std::shared_ptr<arrow::Schema> ExtractArrowSchema(const NKikimrSchemeOp::TColumnTableSchema& schema) {
TVector<std::pair<TString, NScheme::TTypeId>> columns;
for (auto& col : schema.GetColumns()) {
@@ -93,7 +106,7 @@ THashMap<ui64, TString> SplitData(const TString& data, const NKikimrSchemeOp::TC
}
namespace NGRpcService {
-
+using namespace NActors;
using namespace NLongTxService;
class TLongTxBeginRPC : public TActorBootstrapped<TLongTxBeginRPC> {
@@ -104,14 +117,14 @@ public:
return NKikimrServices::TActivity::GRPC_REQ;
}
- explicit TLongTxBeginRPC(TAutoPtr<TEvLongTxBeginRequest> request)
+ explicit TLongTxBeginRPC(std::unique_ptr<IRequestOpCtx> request)
: TBase()
- , Request(request.Release())
+ , Request(std::move(request))
, DatabaseName(Request->GetDatabaseName().GetOrElse(DatabaseFromDomain(AppData())))
{}
void Bootstrap() {
- const auto* req = Request->GetProtoRequest();
+ const auto* req = TEvLongTxBeginRequest::GetProtoRequest(Request);
NKikimrLongTxService::TEvBeginTx::EMode mode = {};
switch (req->tx_type()) {
@@ -162,7 +175,7 @@ private:
}
private:
- std::unique_ptr<TEvLongTxBeginRequest> Request;
+ std::unique_ptr<IRequestOpCtx> Request;
TString DatabaseName;
};
@@ -175,14 +188,14 @@ public:
return NKikimrServices::TActivity::GRPC_REQ;
}
- explicit TLongTxCommitRPC(TAutoPtr<TEvLongTxCommitRequest> request)
+ explicit TLongTxCommitRPC(std::unique_ptr<IRequestOpCtx> request)
: TBase()
- , Request(request.Release())
+ , Request(std::move(request))
{
}
void Bootstrap() {
- const auto* req = Request->GetProtoRequest();
+ const auto* req = TEvLongTxCommitRequest::GetProtoRequest(Request);
TString errMsg;
if (!LongTxId.ParseString(req->tx_id(), &errMsg)) {
@@ -215,7 +228,7 @@ private:
}
Ydb::LongTx::CommitTransactionResult result;
- const auto* req = Request->GetProtoRequest();
+ const auto* req = TEvLongTxCommitRequest::GetProtoRequest(Request);
result.set_tx_id(req->tx_id());
ReplySuccess(result);
}
@@ -234,7 +247,7 @@ private:
}
private:
- std::unique_ptr<TEvLongTxCommitRequest> Request;
+ std::unique_ptr<IRequestOpCtx> Request;
TLongTxId LongTxId;
};
@@ -247,14 +260,14 @@ public:
return NKikimrServices::TActivity::GRPC_REQ;
}
- explicit TLongTxRollbackRPC(TAutoPtr<TEvLongTxRollbackRequest> request)
+ explicit TLongTxRollbackRPC(std::unique_ptr<IRequestOpCtx> request)
: TBase()
- , Request(request.Release())
+ , Request(std::move(request))
{
}
void Bootstrap() {
- const auto* req = Request->GetProtoRequest();
+ const auto* req = TEvLongTxRollbackRequest::GetProtoRequest(Request);
TString errMsg;
if (!LongTxId.ParseString(req->tx_id(), &errMsg)) {
@@ -287,7 +300,7 @@ private:
}
Ydb::LongTx::RollbackTransactionResult result;
- const auto* req = Request->GetProtoRequest();
+ const auto* req = TEvLongTxRollbackRequest::GetProtoRequest(Request);
result.set_tx_id(req->tx_id());
ReplySuccess(result);
}
@@ -306,7 +319,7 @@ private:
}
private:
- std::unique_ptr<TEvLongTxRollbackRequest> Request;
+ std::unique_ptr<IRequestOpCtx> Request;
TLongTxId LongTxId;
};
@@ -562,13 +575,13 @@ public:
return NKikimrServices::TActivity::GRPC_REQ;
}
- explicit TLongTxWriteRPC(TAutoPtr<IRequestOpCtx> request)
+ explicit TLongTxWriteRPC(std::unique_ptr<IRequestOpCtx> request)
: TBase(request->GetDatabaseName().GetOrElse(DatabaseFromDomain(AppData())),
TEvLongTxWriteRequest::GetProtoRequest(request)->path(),
request->GetInternalToken(),
TLongTxId(),
TEvLongTxWriteRequest::GetProtoRequest(request)->dedup_id())
- , Request(request.Release())
+ , Request(std::move(request))
, SchemeCache(MakeSchemeCacheID())
{
}
@@ -652,7 +665,7 @@ private:
template<>
IActor* TEvLongTxWriteRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) {
- return new TLongTxWriteRPC(msg);
+ return new TLongTxWriteRPC(std::unique_ptr<NKikimr::NGRpcService::IRequestOpCtx>(msg));
}
// LongTx Write implementation called from the inside of YDB (e.g. as a part of BulkUpsert call)
@@ -739,9 +752,9 @@ public:
return NKikimrServices::TActivity::GRPC_REQ;
}
- explicit TLongTxReadRPC(TAutoPtr<TEvLongTxReadRequest> request)
+ explicit TLongTxReadRPC(std::unique_ptr<IRequestOpCtx> request)
: TBase()
- , Request(request.Release())
+ , Request(std::move(request))
, DatabaseName(Request->GetDatabaseName().GetOrElse(DatabaseFromDomain(AppData())))
, SchemeCache(MakeSchemeCacheID())
, LeaderPipeCache(MakePipePeNodeCacheID(false))
@@ -751,7 +764,7 @@ public:
}
void Bootstrap() {
- const auto* req = Request->GetProtoRequest();
+ const auto* req = TEvLongTxReadRequest::GetProtoRequest(Request);
if (const TString& internalToken = Request->GetInternalToken()) {
UserToken.emplace(internalToken);
@@ -929,7 +942,7 @@ private:
Ydb::LongTx::ReadResult* MakeResult(ui64 outChunk, bool finished) const {
auto result = TEvLongTxReadRequest::AllocateResult<Ydb::LongTx::ReadResult>(Request);
- const auto* req = Request->GetProtoRequest();
+ const auto* req = TEvLongTxReadRequest::GetProtoRequest(Request);
result->set_tx_id(req->tx_id());
result->set_path(req->path());
result->set_chunk(outChunk);
@@ -957,7 +970,7 @@ private:
}
private:
- std::unique_ptr<TEvLongTxReadRequest> Request;
+ std::unique_ptr<IRequestOpCtx> Request;
TString DatabaseName;
TActorId SchemeCache;
TActorId LeaderPipeCache;
@@ -974,24 +987,24 @@ private:
//
-void TGRpcRequestProxy::Handle(TEvLongTxBeginRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TLongTxBeginRPC(ev->Release().Release()));
+void DoLongTxBeginRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TLongTxBeginRPC(std::move(p)));
}
-void TGRpcRequestProxy::Handle(TEvLongTxCommitRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TLongTxCommitRPC(ev->Release().Release()));
+void DoLongTxCommitRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TLongTxCommitRPC(std::move(p)));
}
-void TGRpcRequestProxy::Handle(TEvLongTxRollbackRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TLongTxRollbackRPC(ev->Release().Release()));
+void DoLongTxRollbackRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TLongTxRollbackRPC(std::move(p)));
}
-void TGRpcRequestProxy::Handle(TEvLongTxWriteRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TLongTxWriteRPC(ev->Release().Release()));
+void DoLongTxWriteRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TLongTxWriteRPC(std::move(p)));
}
-void TGRpcRequestProxy::Handle(TEvLongTxReadRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TLongTxReadRPC(ev->Release().Release()));
+void DoLongTxReadRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TLongTxReadRPC(std::move(p)));
}
}
diff --git a/ydb/core/grpc_services/service_longtx.h b/ydb/core/grpc_services/service_longtx.h
new file mode 100644
index 0000000000..58a3c8f269
--- /dev/null
+++ b/ydb/core/grpc_services/service_longtx.h
@@ -0,0 +1,17 @@
+#pragma once
+#include <memory>
+
+namespace NKikimr {
+namespace NGRpcService {
+
+class IRequestOpCtx;
+class IFacilityProvider;
+
+void DoLongTxBeginRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoLongTxCommitRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoLongTxRollbackRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoLongTxWriteRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoLongTxReadRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+
+}
+}
diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
index 0aaee7e7f5..6971f78e62 100644
--- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
+++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
@@ -17,6 +17,7 @@
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
#include <ydb/public/api/protos/ydb_value.pb.h>
+#include <ydb/public/api/grpc/draft/ydb_long_tx_v1.pb.h>
#define INCLUDE_YDB_INTERNAL_H
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/make_request/make.h>
diff --git a/ydb/services/ydb/ydb_long_tx.cpp b/ydb/services/ydb/ydb_long_tx.cpp
index 908c0fd332..0ae6bcfb74 100644
--- a/ydb/services/ydb/ydb_long_tx.cpp
+++ b/ydb/services/ydb/ydb_long_tx.cpp
@@ -1,8 +1,8 @@
#include "ydb_long_tx.h"
+#include <ydb/core/grpc_services/service_longtx.h>
#include <ydb/core/grpc_services/grpc_helper.h>
-#include <ydb/core/grpc_services/grpc_request_proxy.h>
-#include <ydb/core/grpc_services/rpc_calls.h>
+#include <ydb/core/grpc_services/base/base.h>
namespace NKikimr {
namespace NGRpcService {
@@ -40,29 +40,22 @@ void TGRpcYdbLongTxService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
#ifdef ADD_REQUEST
#error ADD_REQUEST macro already defined
#endif
-#define ADD_REQUEST(NAME, IN, OUT, ACTION) \
- MakeIntrusive<TGRpcRequest<Ydb::LongTx::IN, Ydb::LongTx::OUT, TGRpcYdbLongTxService>>(this, &Service_, CQ_, \
- [this](NGrpc::IRequestContextBase *ctx) { \
- NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
- ACTION; \
- }, &Ydb::LongTx::V1::LongTxService::AsyncService::Request ## NAME, \
- #NAME, logger, getCounterBlock("long_tx", #NAME))->Run();
-
- ADD_REQUEST(BeginTx, BeginTransactionRequest, BeginTransactionResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvLongTxBeginRequest(ctx));
- })
- ADD_REQUEST(CommitTx, CommitTransactionRequest, CommitTransactionResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvLongTxCommitRequest(ctx));
- })
- ADD_REQUEST(RollbackTx, RollbackTransactionRequest, RollbackTransactionResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvLongTxRollbackRequest(ctx));
- })
- ADD_REQUEST(Write, WriteRequest, WriteResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvLongTxWriteRequest(ctx));
- })
- ADD_REQUEST(Read, ReadRequest, ReadResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvLongTxReadRequest(ctx));
- })
+#define ADD_REQUEST(NAME, REQ, CB) \
+ MakeIntrusive<TGRpcRequest<Ydb::LongTx::REQ##Request, Ydb::LongTx::REQ##Response, TGRpcYdbLongTxService>> \
+ (this, &Service_, CQ_, \
+ [this](NGrpc::IRequestContextBase *ctx) { \
+ NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
+ ActorSystem_->Send(GRpcRequestProxyId_, \
+ new TGrpcRequestOperationCall<Ydb::LongTx::REQ##Request, Ydb::LongTx::REQ##Response> \
+ (ctx, &CB, TRequestAuxSettings{TRateLimiterMode::Off, nullptr})); \
+ }, &Ydb::LongTx::V1::LongTxService::AsyncService::Request ## NAME, \
+ #NAME, logger, getCounterBlock("long_tx", #NAME))->Run();
+
+ ADD_REQUEST(BeginTx, BeginTransaction, DoLongTxBeginRPC)
+ ADD_REQUEST(CommitTx, CommitTransaction, DoLongTxCommitRPC)
+ ADD_REQUEST(RollbackTx, RollbackTransaction, DoLongTxRollbackRPC)
+ ADD_REQUEST(Write, Write, DoLongTxWriteRPC)
+ ADD_REQUEST(Read, Read, DoLongTxReadRPC)
#undef ADD_REQUEST
}