summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorva-kuznecov <[email protected]>2022-11-01 15:51:07 +0300
committerva-kuznecov <[email protected]>2022-11-01 15:51:07 +0300
commit35637e1cb63a2c47420861852eea3e68556c621a (patch)
tree6a0d2c0ac7b53e16b427a977dc9521d7ed0c0bd6
parentee592643d76623dd06501a1b0ae47193f33b090e (diff)
Rework TTimer in SessionActor
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp61
1 files changed, 44 insertions, 17 deletions
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index 4563c09825d..e193c390527 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -104,9 +104,18 @@ struct TKqpQueryState {
NTopic::TTopicOperations TopicOperations;
TDuration CpuTime;
+ std::optional<NCpuTime::TCpuTimer> CurrentTimer;
- std::optional<NCpuTime::TCpuTimer> GetTimer() {
- return {CpuTime};
+ void ResetTimer() {
+ if (CurrentTimer) {
+ CpuTime += CurrentTimer->GetTime();
+ CurrentTimer.reset();
+ }
+ }
+
+ TDuration GetCpuTime() {
+ ResetTimer();
+ return CpuTime;
}
};
@@ -120,6 +129,28 @@ struct TKqpCleanupCtx {
};
class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
+
+class TTimerGuard {
+public:
+ TTimerGuard(TKqpSessionActor* this_)
+ : This(this_)
+ {
+ if (This->QueryState) {
+ YQL_ENSURE(!This->QueryState->CurrentTimer);
+ This->QueryState->CurrentTimer.emplace();
+ }
+ }
+
+ ~TTimerGuard() {
+ if (This->QueryState) {
+ This->QueryState->ResetTimer();
+ }
+ }
+
+private:
+ TKqpSessionActor* This;
+};
+
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::KQP_SESSION_ACTOR;
@@ -263,7 +294,7 @@ public:
return;
}
- bool replied = ExecutePhyTx(/*query*/ nullptr, /*tx*/ nullptr, /*commit*/ true, /*timer*/ {});
+ bool replied = ExecutePhyTx(/*query*/ nullptr, /*tx*/ nullptr, /*commit*/ true);
if (!replied) {
Become(&TKqpSessionActor::ExecuteState);
@@ -306,7 +337,7 @@ public:
}
MakeNewQueryState();
- auto timer = QueryState->GetTimer();
+ TTimerGuard timer(this);
QueryState->Request.Swap(event.MutableRequest());
auto& queryRequest = QueryState->Request;
@@ -515,7 +546,7 @@ public:
void HandleCompile(TEvKqp::TEvCompileResponse::TPtr& ev) {
auto compileResult = ev->Get()->CompileResult;
- auto timer = QueryState->GetTimer();
+ TTimerGuard timer(this);
QueryState->Orbit = std::move(ev->Get()->Orbit);
QueryState->MaxReadType = compileResult->MaxReadType;
@@ -563,7 +594,7 @@ public:
}
// Can reply inside (in case of deferred-only transactions) and become ReadyState
- ExecuteOrDefer(timer);
+ ExecuteOrDefer();
}
void AcquirePersistentSnapshot() {
@@ -607,6 +638,7 @@ public:
}
void HandleExecute(TEvKqpSnapshot::TEvCreateSnapshotResponse::TPtr& ev) {
+ TTimerGuard timer(this);
auto *response = ev->Get();
if (response->Status != NKikimrIssues::TStatusIds::SUCCESS) {
@@ -969,7 +1001,7 @@ public:
return false;
}
- void ExecuteOrDefer(std::optional<NCpuTime::TCpuTimer> timer = {}) {
+ void ExecuteOrDefer() {
auto& txCtx = *QueryState->TxCtx;
bool haveWork = QueryState->PreparedQuery &&
@@ -1011,19 +1043,16 @@ public:
bool commit = QueryState->Commit && QueryState->CurrentTx >= phyQuery.TransactionsSize() - 1;
if (tx || commit) {
- bool replied = ExecutePhyTx(&phyQuery, std::move(tx), commit, timer);
+ bool replied = ExecutePhyTx(&phyQuery, std::move(tx), commit);
if (!replied) {
++QueryState->CurrentTx;
}
} else {
- timer.reset();
ReplySuccess();
}
}
- bool ExecutePhyTx(const NKqpProto::TKqpPhyQuery* query, std::shared_ptr<const NKqpProto::TKqpPhyTx> tx, bool commit,
- std::optional<NCpuTime::TCpuTimer> timer)
- {
+ bool ExecutePhyTx(const NKqpProto::TKqpPhyQuery* query, std::shared_ptr<const NKqpProto::TKqpPhyTx> tx, bool commit) {
auto& txCtx = *QueryState->TxCtx;
auto calcPure = [](const auto& tx) {
@@ -1071,7 +1100,6 @@ public:
YQL_ENSURE(commit);
if (txCtx.DeferredEffects.Empty() && !txCtx.Locks.HasLocks() && !txCtx.TopicOperations.HasOperations()) {
- timer.reset();
ReplySuccess();
return true;
}
@@ -1193,7 +1221,7 @@ public:
}
void HandleExecute(TEvKqpExecuter::TEvTxResponse::TPtr& ev) {
- auto timer = QueryState->GetTimer();
+ TTimerGuard timer(this);
QueryState->Orbit = std::move(ev->Get()->Orbit);
auto* response = ev->Get()->Record.MutableResponse();
@@ -1234,7 +1262,6 @@ public:
break;
}
- timer.reset();
ReplyQueryError(requestInfo, status, "", MessageFromIssues(issues));
return;
}
@@ -1258,7 +1285,7 @@ public:
exec->Swap(txResult.MutableStats());
}
- ExecuteOrDefer(timer);
+ ExecuteOrDefer();
}
void HandleExecute(TEvKqpExecuter::TEvStreamData::TPtr& ev) {
@@ -1324,7 +1351,7 @@ public:
auto* stats = &QueryState->Stats;
stats->SetDurationUs((TInstant::Now() - QueryState->StartTime).MicroSeconds());
- stats->SetWorkerCpuTimeUs(QueryState->CpuTime.MicroSeconds());
+ stats->SetWorkerCpuTimeUs(QueryState->GetCpuTime().MicroSeconds());
if (QueryState->CompileResult) {
stats->MutableCompilation()->Swap(&QueryState->CompileStats);
}