diff options
author | va-kuznecov <[email protected]> | 2022-11-01 15:51:07 +0300 |
---|---|---|
committer | va-kuznecov <[email protected]> | 2022-11-01 15:51:07 +0300 |
commit | 35637e1cb63a2c47420861852eea3e68556c621a (patch) | |
tree | 6a0d2c0ac7b53e16b427a977dc9521d7ed0c0bd6 | |
parent | ee592643d76623dd06501a1b0ae47193f33b090e (diff) |
Rework TTimer in SessionActor
-rw-r--r-- | ydb/core/kqp/kqp_session_actor.cpp | 61 |
1 files changed, 44 insertions, 17 deletions
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index 4563c09825d..e193c390527 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -104,9 +104,18 @@ struct TKqpQueryState { NTopic::TTopicOperations TopicOperations; TDuration CpuTime; + std::optional<NCpuTime::TCpuTimer> CurrentTimer; - std::optional<NCpuTime::TCpuTimer> GetTimer() { - return {CpuTime}; + void ResetTimer() { + if (CurrentTimer) { + CpuTime += CurrentTimer->GetTime(); + CurrentTimer.reset(); + } + } + + TDuration GetCpuTime() { + ResetTimer(); + return CpuTime; } }; @@ -120,6 +129,28 @@ struct TKqpCleanupCtx { }; class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> { + +class TTimerGuard { +public: + TTimerGuard(TKqpSessionActor* this_) + : This(this_) + { + if (This->QueryState) { + YQL_ENSURE(!This->QueryState->CurrentTimer); + This->QueryState->CurrentTimer.emplace(); + } + } + + ~TTimerGuard() { + if (This->QueryState) { + This->QueryState->ResetTimer(); + } + } + +private: + TKqpSessionActor* This; +}; + public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::KQP_SESSION_ACTOR; @@ -263,7 +294,7 @@ public: return; } - bool replied = ExecutePhyTx(/*query*/ nullptr, /*tx*/ nullptr, /*commit*/ true, /*timer*/ {}); + bool replied = ExecutePhyTx(/*query*/ nullptr, /*tx*/ nullptr, /*commit*/ true); if (!replied) { Become(&TKqpSessionActor::ExecuteState); @@ -306,7 +337,7 @@ public: } MakeNewQueryState(); - auto timer = QueryState->GetTimer(); + TTimerGuard timer(this); QueryState->Request.Swap(event.MutableRequest()); auto& queryRequest = QueryState->Request; @@ -515,7 +546,7 @@ public: void HandleCompile(TEvKqp::TEvCompileResponse::TPtr& ev) { auto compileResult = ev->Get()->CompileResult; - auto timer = QueryState->GetTimer(); + TTimerGuard timer(this); QueryState->Orbit = std::move(ev->Get()->Orbit); QueryState->MaxReadType = compileResult->MaxReadType; @@ -563,7 +594,7 @@ public: } // Can reply inside (in case of deferred-only transactions) and become ReadyState - ExecuteOrDefer(timer); + ExecuteOrDefer(); } void AcquirePersistentSnapshot() { @@ -607,6 +638,7 @@ public: } void HandleExecute(TEvKqpSnapshot::TEvCreateSnapshotResponse::TPtr& ev) { + TTimerGuard timer(this); auto *response = ev->Get(); if (response->Status != NKikimrIssues::TStatusIds::SUCCESS) { @@ -969,7 +1001,7 @@ public: return false; } - void ExecuteOrDefer(std::optional<NCpuTime::TCpuTimer> timer = {}) { + void ExecuteOrDefer() { auto& txCtx = *QueryState->TxCtx; bool haveWork = QueryState->PreparedQuery && @@ -1011,19 +1043,16 @@ public: bool commit = QueryState->Commit && QueryState->CurrentTx >= phyQuery.TransactionsSize() - 1; if (tx || commit) { - bool replied = ExecutePhyTx(&phyQuery, std::move(tx), commit, timer); + bool replied = ExecutePhyTx(&phyQuery, std::move(tx), commit); if (!replied) { ++QueryState->CurrentTx; } } else { - timer.reset(); ReplySuccess(); } } - bool ExecutePhyTx(const NKqpProto::TKqpPhyQuery* query, std::shared_ptr<const NKqpProto::TKqpPhyTx> tx, bool commit, - std::optional<NCpuTime::TCpuTimer> timer) - { + bool ExecutePhyTx(const NKqpProto::TKqpPhyQuery* query, std::shared_ptr<const NKqpProto::TKqpPhyTx> tx, bool commit) { auto& txCtx = *QueryState->TxCtx; auto calcPure = [](const auto& tx) { @@ -1071,7 +1100,6 @@ public: YQL_ENSURE(commit); if (txCtx.DeferredEffects.Empty() && !txCtx.Locks.HasLocks() && !txCtx.TopicOperations.HasOperations()) { - timer.reset(); ReplySuccess(); return true; } @@ -1193,7 +1221,7 @@ public: } void HandleExecute(TEvKqpExecuter::TEvTxResponse::TPtr& ev) { - auto timer = QueryState->GetTimer(); + TTimerGuard timer(this); QueryState->Orbit = std::move(ev->Get()->Orbit); auto* response = ev->Get()->Record.MutableResponse(); @@ -1234,7 +1262,6 @@ public: break; } - timer.reset(); ReplyQueryError(requestInfo, status, "", MessageFromIssues(issues)); return; } @@ -1258,7 +1285,7 @@ public: exec->Swap(txResult.MutableStats()); } - ExecuteOrDefer(timer); + ExecuteOrDefer(); } void HandleExecute(TEvKqpExecuter::TEvStreamData::TPtr& ev) { @@ -1324,7 +1351,7 @@ public: auto* stats = &QueryState->Stats; stats->SetDurationUs((TInstant::Now() - QueryState->StartTime).MicroSeconds()); - stats->SetWorkerCpuTimeUs(QueryState->CpuTime.MicroSeconds()); + stats->SetWorkerCpuTimeUs(QueryState->GetCpuTime().MicroSeconds()); if (QueryState->CompileResult) { stats->MutableCompilation()->Swap(&QueryState->CompileStats); } |