diff options
author | mdartemenko <mdartemenko@yandex-team.com> | 2022-08-10 10:00:56 +0300 |
---|---|---|
committer | mdartemenko <mdartemenko@yandex-team.com> | 2022-08-10 10:00:56 +0300 |
commit | a1b5c2e7f704c40f99d96b1df8ccdb75b05a40df (patch) | |
tree | 414f25ddb305a7e16304a6d444400103b9138c10 | |
parent | d14f94dc62d1adb69305de5e1ad7194f19eb09e3 (diff) | |
download | ydb-a1b5c2e7f704c40f99d96b1df8ccdb75b05a40df.tar.gz |
fix loss of orbit due to an error in executer
-rw-r--r-- | ydb/core/kqp/common/kqp_lwtrace_probes.h | 36 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_executer_impl.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_literal_executer.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_compile_request.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_compile_service.cpp | 16 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_session_actor.cpp | 8 |
6 files changed, 43 insertions, 27 deletions
diff --git a/ydb/core/kqp/common/kqp_lwtrace_probes.h b/ydb/core/kqp/common/kqp_lwtrace_probes.h index e197c32bed3..bc257ff1bfe 100644 --- a/ydb/core/kqp/common/kqp_lwtrace_probes.h +++ b/ydb/core/kqp/common/kqp_lwtrace_probes.h @@ -38,6 +38,9 @@ struct TQueryAction { PROBE(KqpSessionPhyQueryTxResponse, GROUPS("KQP"), \ TYPES(ui64, ui32), \ NAMES("currentTx", "resultsSize")) \ + PROBE(KqpSessionSendRollback, GROUPS("KQP"), \ + TYPES(ui64), \ + NAMES("currentTx")) \ PROBE(KqpSessionReplySuccess, GROUPS("KQP"), \ TYPES(ui64), \ NAMES("responseArenaSpaceUsed")) \ @@ -45,23 +48,32 @@ struct TQueryAction { TYPES(TString), \ NAMES("errMsg")) \ PROBE(KqpCompileRequestBootstrap, GROUPS("KQP"), \ - TYPES(TString, ui64), \ - NAMES("userSid", "queryHash")) \ + TYPES(TString), \ + NAMES("userSid")) \ PROBE(KqpCompileRequestHandleServiceReply, GROUPS("KQP"), \ - TYPES(TString, ui64), \ - NAMES("userSid", "queryHash")) \ + TYPES(TString), \ + NAMES("userSid")) \ PROBE(KqpCompileServiceHandleRequest, GROUPS("KQP"), \ - TYPES(TString, ui64), \ - NAMES("userSid", "queryHash")) \ + TYPES(TString), \ + NAMES("userSid")) \ PROBE(KqpCompileServiceEnqueued, GROUPS("KQP"), \ - TYPES(TString, ui64), \ - NAMES("userSid", "queryHash")) \ + TYPES(TString), \ + NAMES("userSid")) \ PROBE(KqpCompileServiceGetCompilation, GROUPS("KQP"), \ - TYPES(TString, ui64, TString), \ - NAMES("userSid", "queryHash", "compileActor")) \ + TYPES(TString, TString), \ + NAMES("userSid", "compileActor")) \ PROBE(KqpCompileServiceReply, GROUPS("KQP"), \ - TYPES(TString, ui64, TString), \ - NAMES("userSid", "queryHash", "compileResult")) \ + TYPES(TString, TString), \ + NAMES("userSid", "compileResult")) \ + PROBE(KqpCompileServiceReplyFromCache, GROUPS("KQP"), \ + TYPES(), \ + NAMES()) \ + PROBE(KqpCompileServiceReplyError, GROUPS("KQP"), \ + TYPES(), \ + NAMES()) \ + PROBE(KqpCompileServiceReplyInternalError, GROUPS("KQP"), \ + TYPES(), \ + NAMES()) \ PROBE(KqpBaseExecuterHandleReady, GROUPS("KQP"), \ TYPES(ui64), \ NAMES("TxId")) \ diff --git a/ydb/core/kqp/executer/kqp_executer_impl.h b/ydb/core/kqp/executer/kqp_executer_impl.h index 49754c6b8a5..e1830b9f5b1 100644 --- a/ydb/core/kqp/executer/kqp_executer_impl.h +++ b/ydb/core/kqp/executer/kqp_executer_impl.h @@ -604,7 +604,6 @@ protected: } LWTRACK(KqpBaseExecuterReplyErrorAndDie, ResponseEv->Orbit, TxId); - ResponseEv->Orbit = std::move(ResponseEv->Orbit); this->Send(Target, ResponseEv.release()); this->PassAway(); diff --git a/ydb/core/kqp/executer/kqp_literal_executer.cpp b/ydb/core/kqp/executer/kqp_literal_executer.cpp index 3a4935f859b..5ca5367accd 100644 --- a/ydb/core/kqp/executer/kqp_literal_executer.cpp +++ b/ydb/core/kqp/executer/kqp_literal_executer.cpp @@ -412,7 +412,6 @@ private: response.MutableIssues()->Swap(issues); LWTRACK(KqpLiteralExecuterReplyErrorAndDie, ResponseEv->Orbit, TxId); - ResponseEv->Orbit = std::move(ResponseEv->Orbit); Send(Target, ResponseEv.release()); PassAway(); diff --git a/ydb/core/kqp/kqp_compile_request.cpp b/ydb/core/kqp/kqp_compile_request.cpp index 3bc7e0fed0d..0d5f80ce409 100644 --- a/ydb/core/kqp/kqp_compile_request.cpp +++ b/ydb/core/kqp/kqp_compile_request.cpp @@ -41,8 +41,7 @@ public: void Bootstrap(const TActorContext& ctx) { LWTRACK(KqpCompileRequestBootstrap, Orbit, - Query ? Query->UserSid : 0, - Query ? Query->GetHash() : 0); + Query ? Query->UserSid : 0); TimeoutTimerId = CreateLongTimer(ctx, Deadline - TInstant::Now(), new IEventHandle(ctx.SelfID, ctx.SelfID, new TEvents::TEvWakeup())); @@ -61,8 +60,7 @@ public: const auto& query = ev->Get()->CompileResult->Query; LWTRACK(KqpCompileRequestHandleServiceReply, ev->Get()->Orbit, - query ? query->UserSid : 0, - query ? query->GetHash() : 0); + query ? query->UserSid : 0); auto compileResult = ev->Get()->CompileResult; const auto& stats = ev->Get()->Stats; @@ -96,7 +94,7 @@ public: << ", queryUid: " << compileResult.Uid); auto recompileEv = MakeHolder<TEvKqp::TEvRecompileRequest>(UserToken, compileResult.Uid, compileResult.Query, - Deadline, DbCounters); + Deadline, DbCounters, std::move(DeferredResponse->Orbit)); ctx.Send(MakeKqpCompileServiceID(ctx.SelfID.NodeId()), recompileEv.Release()); DeferredResponse.Reset(); diff --git a/ydb/core/kqp/kqp_compile_service.cpp b/ydb/core/kqp/kqp_compile_service.cpp index a992fec7ea2..2c1ee8e2f78 100644 --- a/ydb/core/kqp/kqp_compile_service.cpp +++ b/ydb/core/kqp/kqp_compile_service.cpp @@ -408,8 +408,7 @@ private: const auto& query = ev->Get()->Query; LWTRACK(KqpCompileServiceHandleRequest, ev->Get()->Orbit, - query ? query->UserSid : 0, - query ? query->GetHash() : 0); + query ? query->UserSid : 0); try { PerformRequest(ev, ctx); @@ -498,8 +497,7 @@ private: LWTRACK(KqpCompileServiceEnqueued, ev->Get()->Orbit, - ev->Get()->Query ? ev->Get()->Query->UserSid : 0, - ev->Get()->Query ? ev->Get()->Query->GetHash() : 0); + ev->Get()->Query ? ev->Get()->Query->UserSid : 0); TKqpCompileRequest compileRequest(ev->Sender, CreateGuidAsString(), std::move(*request.Query), @@ -549,7 +547,7 @@ private: Counters->ReportCompileRequestCompile(dbCounters); TKqpCompileRequest compileRequest(ev->Sender, request.Uid, compileResult ? *compileResult->Query : *request.Query, - true, request.UserToken, request.Deadline, dbCounters); + true, request.UserToken, request.Deadline, dbCounters, ev->Get() ? std::move(ev->Get()->Orbit) : NLWTrace::TOrbit()); if (!RequestsQueue.Enqueue(std::move(compileRequest))) { Counters->ReportCompileRequestRejected(dbCounters); @@ -612,7 +610,7 @@ private: auto requests = RequestsQueue.ExtractByQuery(*compileResult->Query); for (auto& request : requests) { - LWTRACK(KqpCompileServiceGetCompilation, request.Orbit, request.Query.UserSid, request.Query.GetHash(), compileActorId.ToString()); + LWTRACK(KqpCompileServiceGetCompilation, request.Orbit, request.Query.UserSid, compileActorId.ToString()); Reply(request.Sender, compileResult, compileStats, ctx, std::move(request.Orbit)); } } else { @@ -621,7 +619,7 @@ private: } } - LWTRACK(KqpCompileServiceGetCompilation, compileRequest.Orbit, compileRequest.Query.UserSid, compileRequest.Query.GetHash(), compileActorId.ToString()); + LWTRACK(KqpCompileServiceGetCompilation, compileRequest.Orbit, compileRequest.Query.UserSid, compileActorId.ToString()); Reply(compileRequest.Sender, compileResult, compileStats, ctx, std::move(compileRequest.Orbit)); } catch (const std::exception& e) { @@ -722,7 +720,6 @@ private: LWTRACK(KqpCompileServiceReply, orbit, query ? query->UserSid : 0, - query ? query->GetHash() : 0, compileResult->Issues.ToString()); LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Send response" @@ -747,12 +744,14 @@ private: NKqpProto::TKqpStatsCompile stats; stats.SetFromCache(true); + LWTRACK(KqpCompileServiceReplyFromCache, orbit); Reply(sender, compileResult, stats, ctx, std::move(orbit)); } void ReplyError(const TActorId& sender, const TString& uid, Ydb::StatusIds::StatusCode status, const TIssues& issues, const TActorContext& ctx, NLWTrace::TOrbit orbit) { + LWTRACK(KqpCompileServiceReplyError, orbit); Reply(sender, TKqpCompileResult::Make(uid, status, issues), NKqpProto::TKqpStatsCompile(), ctx, std::move(orbit)); } @@ -762,6 +761,7 @@ private: NYql::TIssue issue(NYql::TPosition(), TStringBuilder() << "Internal error during query compilation."); issue.AddSubIssue(MakeIntrusive<TIssue>(NYql::TPosition(), message)); + LWTRACK(KqpCompileServiceReplyInternalError, orbit); ReplyError(sender, uid, Ydb::StatusIds::INTERNAL_ERROR, {issue}, ctx, std::move(orbit)); } diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index 46c12550eb0..dcdc5a7450a 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -1625,6 +1625,10 @@ public: } void SendRollbackRequest(TKqpTransactionContext* txCtx) { + if (QueryState) { + LWTRACK(KqpSessionSendRollback, QueryState->Orbit, QueryState->CurrentTx); + } + auto request = PreparePhysicalRequest(nullptr); request.EraseLocks = true; @@ -1706,6 +1710,10 @@ public: } void HandleCleanup(TEvKqpExecuter::TEvTxResponse::TPtr& ev) { + if (QueryState) { + QueryState->Orbit = std::move(ev->Get()->Orbit); + } + auto& response = ev->Get()->Record.GetResponse(); if (response.GetStatus() != Ydb::StatusIds::SUCCESS) { TIssues issues; |