aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormdartemenko <mdartemenko@yandex-team.com>2022-08-10 10:00:56 +0300
committermdartemenko <mdartemenko@yandex-team.com>2022-08-10 10:00:56 +0300
commita1b5c2e7f704c40f99d96b1df8ccdb75b05a40df (patch)
tree414f25ddb305a7e16304a6d444400103b9138c10
parentd14f94dc62d1adb69305de5e1ad7194f19eb09e3 (diff)
downloadydb-a1b5c2e7f704c40f99d96b1df8ccdb75b05a40df.tar.gz
fix loss of orbit due to an error in executer
-rw-r--r--ydb/core/kqp/common/kqp_lwtrace_probes.h36
-rw-r--r--ydb/core/kqp/executer/kqp_executer_impl.h1
-rw-r--r--ydb/core/kqp/executer/kqp_literal_executer.cpp1
-rw-r--r--ydb/core/kqp/kqp_compile_request.cpp8
-rw-r--r--ydb/core/kqp/kqp_compile_service.cpp16
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp8
6 files changed, 43 insertions, 27 deletions
diff --git a/ydb/core/kqp/common/kqp_lwtrace_probes.h b/ydb/core/kqp/common/kqp_lwtrace_probes.h
index e197c32bed..bc257ff1bf 100644
--- a/ydb/core/kqp/common/kqp_lwtrace_probes.h
+++ b/ydb/core/kqp/common/kqp_lwtrace_probes.h
@@ -38,6 +38,9 @@ struct TQueryAction {
PROBE(KqpSessionPhyQueryTxResponse, GROUPS("KQP"), \
TYPES(ui64, ui32), \
NAMES("currentTx", "resultsSize")) \
+ PROBE(KqpSessionSendRollback, GROUPS("KQP"), \
+ TYPES(ui64), \
+ NAMES("currentTx")) \
PROBE(KqpSessionReplySuccess, GROUPS("KQP"), \
TYPES(ui64), \
NAMES("responseArenaSpaceUsed")) \
@@ -45,23 +48,32 @@ struct TQueryAction {
TYPES(TString), \
NAMES("errMsg")) \
PROBE(KqpCompileRequestBootstrap, GROUPS("KQP"), \
- TYPES(TString, ui64), \
- NAMES("userSid", "queryHash")) \
+ TYPES(TString), \
+ NAMES("userSid")) \
PROBE(KqpCompileRequestHandleServiceReply, GROUPS("KQP"), \
- TYPES(TString, ui64), \
- NAMES("userSid", "queryHash")) \
+ TYPES(TString), \
+ NAMES("userSid")) \
PROBE(KqpCompileServiceHandleRequest, GROUPS("KQP"), \
- TYPES(TString, ui64), \
- NAMES("userSid", "queryHash")) \
+ TYPES(TString), \
+ NAMES("userSid")) \
PROBE(KqpCompileServiceEnqueued, GROUPS("KQP"), \
- TYPES(TString, ui64), \
- NAMES("userSid", "queryHash")) \
+ TYPES(TString), \
+ NAMES("userSid")) \
PROBE(KqpCompileServiceGetCompilation, GROUPS("KQP"), \
- TYPES(TString, ui64, TString), \
- NAMES("userSid", "queryHash", "compileActor")) \
+ TYPES(TString, TString), \
+ NAMES("userSid", "compileActor")) \
PROBE(KqpCompileServiceReply, GROUPS("KQP"), \
- TYPES(TString, ui64, TString), \
- NAMES("userSid", "queryHash", "compileResult")) \
+ TYPES(TString, TString), \
+ NAMES("userSid", "compileResult")) \
+ PROBE(KqpCompileServiceReplyFromCache, GROUPS("KQP"), \
+ TYPES(), \
+ NAMES()) \
+ PROBE(KqpCompileServiceReplyError, GROUPS("KQP"), \
+ TYPES(), \
+ NAMES()) \
+ PROBE(KqpCompileServiceReplyInternalError, GROUPS("KQP"), \
+ TYPES(), \
+ NAMES()) \
PROBE(KqpBaseExecuterHandleReady, GROUPS("KQP"), \
TYPES(ui64), \
NAMES("TxId")) \
diff --git a/ydb/core/kqp/executer/kqp_executer_impl.h b/ydb/core/kqp/executer/kqp_executer_impl.h
index 49754c6b8a..e1830b9f5b 100644
--- a/ydb/core/kqp/executer/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer/kqp_executer_impl.h
@@ -604,7 +604,6 @@ protected:
}
LWTRACK(KqpBaseExecuterReplyErrorAndDie, ResponseEv->Orbit, TxId);
- ResponseEv->Orbit = std::move(ResponseEv->Orbit);
this->Send(Target, ResponseEv.release());
this->PassAway();
diff --git a/ydb/core/kqp/executer/kqp_literal_executer.cpp b/ydb/core/kqp/executer/kqp_literal_executer.cpp
index 3a4935f859..5ca5367acc 100644
--- a/ydb/core/kqp/executer/kqp_literal_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_literal_executer.cpp
@@ -412,7 +412,6 @@ private:
response.MutableIssues()->Swap(issues);
LWTRACK(KqpLiteralExecuterReplyErrorAndDie, ResponseEv->Orbit, TxId);
- ResponseEv->Orbit = std::move(ResponseEv->Orbit);
Send(Target, ResponseEv.release());
PassAway();
diff --git a/ydb/core/kqp/kqp_compile_request.cpp b/ydb/core/kqp/kqp_compile_request.cpp
index 3bc7e0fed0..0d5f80ce40 100644
--- a/ydb/core/kqp/kqp_compile_request.cpp
+++ b/ydb/core/kqp/kqp_compile_request.cpp
@@ -41,8 +41,7 @@ public:
void Bootstrap(const TActorContext& ctx) {
LWTRACK(KqpCompileRequestBootstrap,
Orbit,
- Query ? Query->UserSid : 0,
- Query ? Query->GetHash() : 0);
+ Query ? Query->UserSid : 0);
TimeoutTimerId = CreateLongTimer(ctx, Deadline - TInstant::Now(),
new IEventHandle(ctx.SelfID, ctx.SelfID, new TEvents::TEvWakeup()));
@@ -61,8 +60,7 @@ public:
const auto& query = ev->Get()->CompileResult->Query;
LWTRACK(KqpCompileRequestHandleServiceReply,
ev->Get()->Orbit,
- query ? query->UserSid : 0,
- query ? query->GetHash() : 0);
+ query ? query->UserSid : 0);
auto compileResult = ev->Get()->CompileResult;
const auto& stats = ev->Get()->Stats;
@@ -96,7 +94,7 @@ public:
<< ", queryUid: " << compileResult.Uid);
auto recompileEv = MakeHolder<TEvKqp::TEvRecompileRequest>(UserToken, compileResult.Uid, compileResult.Query,
- Deadline, DbCounters);
+ Deadline, DbCounters, std::move(DeferredResponse->Orbit));
ctx.Send(MakeKqpCompileServiceID(ctx.SelfID.NodeId()), recompileEv.Release());
DeferredResponse.Reset();
diff --git a/ydb/core/kqp/kqp_compile_service.cpp b/ydb/core/kqp/kqp_compile_service.cpp
index a992fec7ea..2c1ee8e2f7 100644
--- a/ydb/core/kqp/kqp_compile_service.cpp
+++ b/ydb/core/kqp/kqp_compile_service.cpp
@@ -408,8 +408,7 @@ private:
const auto& query = ev->Get()->Query;
LWTRACK(KqpCompileServiceHandleRequest,
ev->Get()->Orbit,
- query ? query->UserSid : 0,
- query ? query->GetHash() : 0);
+ query ? query->UserSid : 0);
try {
PerformRequest(ev, ctx);
@@ -498,8 +497,7 @@ private:
LWTRACK(KqpCompileServiceEnqueued,
ev->Get()->Orbit,
- ev->Get()->Query ? ev->Get()->Query->UserSid : 0,
- ev->Get()->Query ? ev->Get()->Query->GetHash() : 0);
+ ev->Get()->Query ? ev->Get()->Query->UserSid : 0);
TKqpCompileRequest compileRequest(ev->Sender, CreateGuidAsString(), std::move(*request.Query),
@@ -549,7 +547,7 @@ private:
Counters->ReportCompileRequestCompile(dbCounters);
TKqpCompileRequest compileRequest(ev->Sender, request.Uid, compileResult ? *compileResult->Query : *request.Query,
- true, request.UserToken, request.Deadline, dbCounters);
+ true, request.UserToken, request.Deadline, dbCounters, ev->Get() ? std::move(ev->Get()->Orbit) : NLWTrace::TOrbit());
if (!RequestsQueue.Enqueue(std::move(compileRequest))) {
Counters->ReportCompileRequestRejected(dbCounters);
@@ -612,7 +610,7 @@ private:
auto requests = RequestsQueue.ExtractByQuery(*compileResult->Query);
for (auto& request : requests) {
- LWTRACK(KqpCompileServiceGetCompilation, request.Orbit, request.Query.UserSid, request.Query.GetHash(), compileActorId.ToString());
+ LWTRACK(KqpCompileServiceGetCompilation, request.Orbit, request.Query.UserSid, compileActorId.ToString());
Reply(request.Sender, compileResult, compileStats, ctx, std::move(request.Orbit));
}
} else {
@@ -621,7 +619,7 @@ private:
}
}
- LWTRACK(KqpCompileServiceGetCompilation, compileRequest.Orbit, compileRequest.Query.UserSid, compileRequest.Query.GetHash(), compileActorId.ToString());
+ LWTRACK(KqpCompileServiceGetCompilation, compileRequest.Orbit, compileRequest.Query.UserSid, compileActorId.ToString());
Reply(compileRequest.Sender, compileResult, compileStats, ctx, std::move(compileRequest.Orbit));
}
catch (const std::exception& e) {
@@ -722,7 +720,6 @@ private:
LWTRACK(KqpCompileServiceReply,
orbit,
query ? query->UserSid : 0,
- query ? query->GetHash() : 0,
compileResult->Issues.ToString());
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Send response"
@@ -747,12 +744,14 @@ private:
NKqpProto::TKqpStatsCompile stats;
stats.SetFromCache(true);
+ LWTRACK(KqpCompileServiceReplyFromCache, orbit);
Reply(sender, compileResult, stats, ctx, std::move(orbit));
}
void ReplyError(const TActorId& sender, const TString& uid, Ydb::StatusIds::StatusCode status,
const TIssues& issues, const TActorContext& ctx, NLWTrace::TOrbit orbit)
{
+ LWTRACK(KqpCompileServiceReplyError, orbit);
Reply(sender, TKqpCompileResult::Make(uid, status, issues), NKqpProto::TKqpStatsCompile(), ctx, std::move(orbit));
}
@@ -762,6 +761,7 @@ private:
NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Internal error during query compilation.");
issue.AddSubIssue(MakeIntrusive<TIssue>(NYql::TPosition(), message));
+ LWTRACK(KqpCompileServiceReplyInternalError, orbit);
ReplyError(sender, uid, Ydb::StatusIds::INTERNAL_ERROR, {issue}, ctx, std::move(orbit));
}
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index 46c12550eb..dcdc5a7450 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -1625,6 +1625,10 @@ public:
}
void SendRollbackRequest(TKqpTransactionContext* txCtx) {
+ if (QueryState) {
+ LWTRACK(KqpSessionSendRollback, QueryState->Orbit, QueryState->CurrentTx);
+ }
+
auto request = PreparePhysicalRequest(nullptr);
request.EraseLocks = true;
@@ -1706,6 +1710,10 @@ public:
}
void HandleCleanup(TEvKqpExecuter::TEvTxResponse::TPtr& ev) {
+ if (QueryState) {
+ QueryState->Orbit = std::move(ev->Get()->Orbit);
+ }
+
auto& response = ev->Get()->Record.GetResponse();
if (response.GetStatus() != Ydb::StatusIds::SUCCESS) {
TIssues issues;