aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorulya-sidorina <ulya-sidorina@yandex-team.ru>2022-03-14 15:53:23 +0300
committerulya-sidorina <ulya-sidorina@yandex-team.ru>2022-03-14 15:53:23 +0300
commit09d5855347aeb13217d4bbdd9671630522173b64 (patch)
treecd7e7b9940a3204ee41482db4e18a9622d6cfe65
parent6fa2b9df455f828d333446424ecf0bf29960e90a (diff)
downloadydb-09d5855347aeb13217d4bbdd9671630522173b64.tar.gz
KIKIMR-14457: send request directly to executer if TxId is not needed
ref:a31dd64aca3dfacb2914c4707eeee42c1212bf6b
-rw-r--r--ydb/core/kqp/common/kqp_gateway.h1
-rw-r--r--ydb/core/kqp/host/kqp_explain_prepared.cpp1
-rw-r--r--ydb/core/kqp/kqp_ic_gateway.cpp18
3 files changed, 16 insertions, 4 deletions
diff --git a/ydb/core/kqp/common/kqp_gateway.h b/ydb/core/kqp/common/kqp_gateway.h
index ee72430d763..c3b167f5155 100644
--- a/ydb/core/kqp/common/kqp_gateway.h
+++ b/ydb/core/kqp/common/kqp_gateway.h
@@ -123,6 +123,7 @@ public:
TKqpSnapshot Snapshot = TKqpSnapshot();
NKikimrKqp::EIsolationLevel IsolationLevel = NKikimrKqp::ISOLATION_LEVEL_UNDEFINED;
TMaybe<NKikimrKqp::TRlPath> RlPath;
+ bool NeedTxId = true;
};
struct TExecPhysicalResult : public TGenericResult {
diff --git a/ydb/core/kqp/host/kqp_explain_prepared.cpp b/ydb/core/kqp/host/kqp_explain_prepared.cpp
index 47db128b829..175aff12958 100644
--- a/ydb/core/kqp/host/kqp_explain_prepared.cpp
+++ b/ydb/core/kqp/host/kqp_explain_prepared.cpp
@@ -49,6 +49,7 @@ public:
if (pure(*tx) && params) {
IKqpGateway::TExecPhysicalRequest request;
request.Transactions.emplace_back(std::move(tx), std::move(*params));
+ request.NeedTxId = false;
ExecuteFuture = Gateway->ExecutePure(std::move(request), {});
diff --git a/ydb/core/kqp/kqp_ic_gateway.cpp b/ydb/core/kqp/kqp_ic_gateway.cpp
index 6ca1cf13e96..ba524a3cd82 100644
--- a/ydb/core/kqp/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/kqp_ic_gateway.cpp
@@ -831,15 +831,23 @@ public:
return NKikimrServices::TActivity::KQP_EXEC_PHYSICAL_REQUEST_HANDLER;
}
- TKqpExecPhysicalRequestHandler(TRequest* request, bool streaming, const TActorId& target, TPromise<TResult> promise)
+ TKqpExecPhysicalRequestHandler(TRequest* request, bool streaming, const TActorId& target, TPromise<TResult> promise, bool needTxId)
: Request(request)
, Streaming(streaming)
, Executer(request->ExecuterId)
, Target(target)
- , Promise(promise) {}
+ , Promise(promise)
+ , NeedTxId(needTxId) {}
void Bootstrap(const TActorContext& ctx) {
- ctx.Send(MakeTxProxyID(), this->Request.Release());
+ if (NeedTxId) {
+ ctx.Send(MakeTxProxyID(), this->Request.Release());
+ } else {
+ auto executerEv = MakeHolder<NKqp::TEvKqpExecuter::TEvTxRequest>();
+ ActorIdToProto(ctx.SelfID, executerEv->Record.MutableTarget());
+ executerEv->Record.MutableRequest()->SetTxId(0);
+ ctx.Send(Request->ExecuterId, executerEv.Release());
+ }
Become(&TKqpExecPhysicalRequestHandler::ProcessState);
}
@@ -939,6 +947,7 @@ private:
TActorId Executer;
TActorId Target;
TPromise<TResult> Promise;
+ bool NeedTxId;
};
template<typename TResult>
@@ -2135,6 +2144,7 @@ private:
TFuture<TExecPhysicalResult> ExecutePhysicalQueryInternal(TExecPhysicalRequest&& request, const TActorId& target,
bool streaming)
{
+ const bool needTxId = request.NeedTxId;
auto executerActor = CreateKqpExecuter(std::move(request), Database,
UserToken ? TMaybe<TString>(UserToken->Serialized) : Nothing(), Counters);
auto executerId = RegisterActor(executerActor);
@@ -2144,7 +2154,7 @@ private:
auto promise = NewPromise<TExecPhysicalResult>();
auto ev = MakeHolder<TEvTxUserProxy::TEvProposeKqpTransaction>(executerId);
- IActor* requestHandler = new TKqpExecPhysicalRequestHandler(ev.Release(), streaming, target, promise);
+ IActor* requestHandler = new TKqpExecPhysicalRequestHandler(ev.Release(), streaming, target, promise, needTxId);
RegisterActor(requestHandler);
return promise.GetFuture();