diff options
author | ulya-sidorina <ulya-sidorina@yandex-team.ru> | 2022-03-14 15:53:23 +0300 |
---|---|---|
committer | ulya-sidorina <ulya-sidorina@yandex-team.ru> | 2022-03-14 15:53:23 +0300 |
commit | 09d5855347aeb13217d4bbdd9671630522173b64 (patch) | |
tree | cd7e7b9940a3204ee41482db4e18a9622d6cfe65 | |
parent | 6fa2b9df455f828d333446424ecf0bf29960e90a (diff) | |
download | ydb-09d5855347aeb13217d4bbdd9671630522173b64.tar.gz |
KIKIMR-14457: send request directly to executer if TxId is not needed
ref:a31dd64aca3dfacb2914c4707eeee42c1212bf6b
-rw-r--r-- | ydb/core/kqp/common/kqp_gateway.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_explain_prepared.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_ic_gateway.cpp | 18 |
3 files changed, 16 insertions, 4 deletions
diff --git a/ydb/core/kqp/common/kqp_gateway.h b/ydb/core/kqp/common/kqp_gateway.h index ee72430d763..c3b167f5155 100644 --- a/ydb/core/kqp/common/kqp_gateway.h +++ b/ydb/core/kqp/common/kqp_gateway.h @@ -123,6 +123,7 @@ public: TKqpSnapshot Snapshot = TKqpSnapshot(); NKikimrKqp::EIsolationLevel IsolationLevel = NKikimrKqp::ISOLATION_LEVEL_UNDEFINED; TMaybe<NKikimrKqp::TRlPath> RlPath; + bool NeedTxId = true; }; struct TExecPhysicalResult : public TGenericResult { diff --git a/ydb/core/kqp/host/kqp_explain_prepared.cpp b/ydb/core/kqp/host/kqp_explain_prepared.cpp index 47db128b829..175aff12958 100644 --- a/ydb/core/kqp/host/kqp_explain_prepared.cpp +++ b/ydb/core/kqp/host/kqp_explain_prepared.cpp @@ -49,6 +49,7 @@ public: if (pure(*tx) && params) { IKqpGateway::TExecPhysicalRequest request; request.Transactions.emplace_back(std::move(tx), std::move(*params)); + request.NeedTxId = false; ExecuteFuture = Gateway->ExecutePure(std::move(request), {}); diff --git a/ydb/core/kqp/kqp_ic_gateway.cpp b/ydb/core/kqp/kqp_ic_gateway.cpp index 6ca1cf13e96..ba524a3cd82 100644 --- a/ydb/core/kqp/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/kqp_ic_gateway.cpp @@ -831,15 +831,23 @@ public: return NKikimrServices::TActivity::KQP_EXEC_PHYSICAL_REQUEST_HANDLER; } - TKqpExecPhysicalRequestHandler(TRequest* request, bool streaming, const TActorId& target, TPromise<TResult> promise) + TKqpExecPhysicalRequestHandler(TRequest* request, bool streaming, const TActorId& target, TPromise<TResult> promise, bool needTxId) : Request(request) , Streaming(streaming) , Executer(request->ExecuterId) , Target(target) - , Promise(promise) {} + , Promise(promise) + , NeedTxId(needTxId) {} void Bootstrap(const TActorContext& ctx) { - ctx.Send(MakeTxProxyID(), this->Request.Release()); + if (NeedTxId) { + ctx.Send(MakeTxProxyID(), this->Request.Release()); + } else { + auto executerEv = MakeHolder<NKqp::TEvKqpExecuter::TEvTxRequest>(); + ActorIdToProto(ctx.SelfID, executerEv->Record.MutableTarget()); + executerEv->Record.MutableRequest()->SetTxId(0); + ctx.Send(Request->ExecuterId, executerEv.Release()); + } Become(&TKqpExecPhysicalRequestHandler::ProcessState); } @@ -939,6 +947,7 @@ private: TActorId Executer; TActorId Target; TPromise<TResult> Promise; + bool NeedTxId; }; template<typename TResult> @@ -2135,6 +2144,7 @@ private: TFuture<TExecPhysicalResult> ExecutePhysicalQueryInternal(TExecPhysicalRequest&& request, const TActorId& target, bool streaming) { + const bool needTxId = request.NeedTxId; auto executerActor = CreateKqpExecuter(std::move(request), Database, UserToken ? TMaybe<TString>(UserToken->Serialized) : Nothing(), Counters); auto executerId = RegisterActor(executerActor); @@ -2144,7 +2154,7 @@ private: auto promise = NewPromise<TExecPhysicalResult>(); auto ev = MakeHolder<TEvTxUserProxy::TEvProposeKqpTransaction>(executerId); - IActor* requestHandler = new TKqpExecPhysicalRequestHandler(ev.Release(), streaming, target, promise); + IActor* requestHandler = new TKqpExecPhysicalRequestHandler(ev.Release(), streaming, target, promise, needTxId); RegisterActor(requestHandler); return promise.GetFuture(); |