diff options
author | gvit <gvit@ydb.tech> | 2023-01-09 21:24:24 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-01-09 21:24:24 +0300 |
commit | 18590c1c3b06ab6b03c4bcdc6dbc645bab1f6616 (patch) | |
tree | 3f6b3122a4b194d3c235002e075d0c6f1a058b8b | |
parent | d4e68a99e3c7f1c1f686fbe4d0cd2c277312f66a (diff) | |
download | ydb-18590c1c3b06ab6b03c4bcdc6dbc645bab1f6616.tar.gz |
merge literal executer into session actor
-rw-r--r-- | ydb/core/kqp/common/kqp_lwtrace_probes.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.cpp | 21 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_literal_executer.cpp | 88 | ||||
-rw-r--r-- | ydb/core/kqp/gateway/kqp_ic_gateway.cpp | 71 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 25 |
6 files changed, 77 insertions, 133 deletions
diff --git a/ydb/core/kqp/common/kqp_lwtrace_probes.h b/ydb/core/kqp/common/kqp_lwtrace_probes.h index 16c80c9843..752dc3a781 100644 --- a/ydb/core/kqp/common/kqp_lwtrace_probes.h +++ b/ydb/core/kqp/common/kqp_lwtrace_probes.h @@ -98,7 +98,7 @@ struct TQueryAction { PROBE(KqpScanExecuterFinalize, GROUPS("KQP"), \ TYPES(ui64, ui64, TString, ui64), \ NAMES("TxId", "lastCompletedTask", "lastCompletedComputeActor", "ResultsSize")) \ - PROBE(KqpLiteralExecuterReplyErrorAndDie, GROUPS("KQP"), \ + PROBE(KqpLiteralExecuterCreateErrorResponse, GROUPS("KQP"), \ TYPES(ui64), \ NAMES("TxId")) \ PROBE(KqpLiteralExecuterFinalize, GROUPS("KQP"), \ diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index 0cab182400..4a143a3b08 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -85,5 +85,8 @@ struct TEvKqpExecuter { IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters); +std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ExecutePure( + IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner); + } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp index 4ee793167b..46af7edc44 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp @@ -167,28 +167,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt return CreateKqpDataExecuter(std::move(request), database, userToken, counters); } - if (request.Locks.empty()) { - bool literal = true; - for (const auto& tx : request.Transactions) { - if (tx.Body->GetType() != NKqpProto::TKqpPhyTx::TYPE_COMPUTE) { - literal = false; - break; - } - for (const auto& stage : tx.Body->GetStages()) { - if (stage.InputsSize() != 0) { - literal = false; // allow only independent stages - break; - } - } - } - - if (literal) { - return CreateKqpLiteralExecuter(std::move(request), counters); - } - } - bool data = true; // `false` stands for Scan - TMaybe<NKqpProto::TKqpPhyTx::EType> txsType; for (auto& tx : request.Transactions) { if (txsType) { diff --git a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp index 0d3a6d7e98..5df271b219 100644 --- a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp @@ -56,27 +56,18 @@ TDqTaskRunnerExecutionContext CreateTaskRunnerExecutionContext() { return {}; } -class TKqpLiteralExecuter : public TActorBootstrapped<TKqpLiteralExecuter> { - using TBase = TActorBootstrapped<TKqpLiteralExecuter>; - +class TKqpLiteralExecuter { public: - static constexpr NKikimrServices::TActivity::EType ActorActivityType() { - return NKikimrServices::TActivity::KQP_LITERAL_EXECUTER_ACTOR; - } - -public: - TKqpLiteralExecuter(IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters) + TKqpLiteralExecuter(IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner) : Request(std::move(request)) , Counters(counters) + , OwnerActor(owner) , LiteralExecuterSpan(TWilsonKqp::LiteralExecuter, std::move(Request.TraceId), "LiteralExecuter") { ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(Request.TxAlloc); ResponseEv->Orbit = std::move(Request.Orbit); Stats = std::make_unique<TQueryExecutionStats>(Request.StatsMode, &TasksGraph, ResponseEv->Record.MutableResponse()->MutableResult()->MutableStats()); - } - - void Bootstrap() { StartTime = TAppData::TimeProvider->Now(); if (Request.Timeout) { Deadline = StartTime + Request.Timeout; @@ -86,49 +77,31 @@ public: } LOG_D("Begin literal execution. Operation timeout: " << Request.Timeout << ", cancelAfter: " << Request.CancelAfter); - - Become(&TKqpLiteralExecuter::WorkState); } -private: - STATEFN(WorkState) { + std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ExecutePure() { try { - switch (ev->GetTypeRewrite()) { - hFunc(TEvKqpExecuter::TEvTxRequest, Handle); - default: { - LOG_C("TKqpLiteralExecuter, unexpected event: " << ev->GetTypeRewrite() << ", selfID: " << SelfId()); - InternalError("Unexpected event"); - } - } + ExecutePureImpl(); } catch (const TMemoryLimitExceededException&) { LOG_W("TKqpLiteralExecuter, memory limit exceeded."); - ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED, + CreateErrorResponse(Ydb::StatusIds::PRECONDITION_FAILED, YqlIssue({}, TIssuesIds::KIKIMR_PRECONDITION_FAILED, "Memory limit exceeded")); } catch (...) { auto msg = CurrentExceptionMessage(); LOG_C("TKqpLiteralExecuter, unexpected exception caught: " << msg); InternalError(TStringBuilder() << "Unexpected exception: " << msg); } + + return std::move(ResponseEv); } - void Handle(TEvKqpExecuter::TEvTxRequest::TPtr& ev) { + void ExecutePureImpl() { NWilson::TSpan prepareTasksSpan(TWilsonKqp::LiteralExecuterPrepareTasks, LiteralExecuterSpan.GetTraceId(), "PrepareTasks", NWilson::EFlags::AUTO_END); if (Stats) { Stats->StartTs = TInstant::Now(); } - TxId = ev->Get()->Record.GetRequest().GetTxId(); - Target = ActorIdFromProto(ev->Get()->Record.GetTarget()); - - { - LOG_D("Report self actorId " << SelfId() << " to " << Target); - auto progressEv = MakeHolder<TEvKqpExecuter::TEvExecuterProgress>(); - ActorIdToProto(SelfId(), progressEv->Record.MutableExecuterActorId()); - Send(Target, progressEv.Release()); - } - LOG_D("Begin literal execution, txs: " << Request.Transactions.size()); - auto& transactions = Request.Transactions; FillKqpTasksGraphStages(TasksGraph, transactions); @@ -196,7 +169,7 @@ private: } Finalize(); - PassAway(); + UpdateCounters(); } void RunTask(TTask& task, const TDqTaskRunnerContext& context, const TDqTaskRunnerSettings& settings) { @@ -299,7 +272,7 @@ private: fakeComputeActorStats.SetDurationUs(elapsedMicros); - Stats->AddComputeActorStats(SelfId().NodeId(), std::move(fakeComputeActorStats)); + Stats->AddComputeActorStats(OwnerActor.NodeId(), std::move(fakeComputeActorStats)); Stats->ExecuterCpuTime = executerCpuTime; Stats->FinishTs = Stats->StartTs + TDuration::MicroSeconds(elapsedMicros); @@ -322,8 +295,7 @@ private: LiteralExecuterSpan.EndOk(); } CleanupCtx(); - LOG_D("Sending response to: " << Target << ", results: " << ResponseEv->ResultsSize()); - Send(Target, ResponseEv.release()); + LOG_D("Execution is complete, results: " << ResponseEv->ResultsSize()); } private: @@ -340,17 +312,17 @@ private: auto now = AppData()->TimeProvider->Now(); if (Deadline && *Deadline <= now) { - LOG_I("Timeout exceeded. Send timeout event to the rpc actor " << Target); + LOG_I("Timeout exceeded."); - ReplyErrorAndDie(Ydb::StatusIds::TIMEOUT, + CreateErrorResponse(Ydb::StatusIds::TIMEOUT, YqlIssue({}, TIssuesIds::KIKIMR_TIMEOUT, "Request timeout exceeded.")); return true; } if (CancelAt && *CancelAt <= now) { - LOG_I("CancelAt exceeded. Send cancel event to the rpc actor " << Target); + LOG_I("CancelAt exceeded."); - ReplyErrorAndDie(Ydb::StatusIds::CANCELLED, + CreateErrorResponse(Ydb::StatusIds::CANCELLED, YqlIssue({}, TIssuesIds::KIKIMR_OPERATION_CANCELLED, "Request timeout exceeded.")); return true; } @@ -363,16 +335,16 @@ private: LOG_E(message); auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::UNEXPECTED, "Internal error while executing transaction."); issue.AddSubIssue(MakeIntrusive<TIssue>(message)); - ReplyErrorAndDie(Ydb::StatusIds::INTERNAL_ERROR, issue); + CreateErrorResponse(Ydb::StatusIds::INTERNAL_ERROR, issue); } - void ReplyErrorAndDie(Ydb::StatusIds::StatusCode status, const TIssue& issue) { + void CreateErrorResponse(Ydb::StatusIds::StatusCode status, const TIssue& issue) { google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> issues; IssueToMessage(issue, issues.Add()); - ReplyErrorAndDie(status, &issues); + CreateErrorResponse(status, &issues); } - void ReplyErrorAndDie(Ydb::StatusIds::StatusCode status, + void CreateErrorResponse(Ydb::StatusIds::StatusCode status, google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>* issues) { if (status != Ydb::StatusIds::SUCCESS) { @@ -393,22 +365,19 @@ private: response.SetStatus(status); response.MutableIssues()->Swap(issues); - LWTRACK(KqpLiteralExecuterReplyErrorAndDie, ResponseEv->Orbit, TxId); + LWTRACK(KqpLiteralExecuterCreateErrorResponse, ResponseEv->Orbit, TxId); if (LiteralExecuterSpan) { LiteralExecuterSpan.EndError(response.DebugString()); } CleanupCtx(); - Send(Target, ResponseEv.release()); - PassAway(); + UpdateCounters(); } - void PassAway() override { + void UpdateCounters() { auto totalTime = TInstant::Now() - StartTime; Counters->Counters->LiteralTxTotalTimeHistogram->Collect(totalTime.MilliSeconds()); - - TBase::PassAway(); } private: @@ -418,7 +387,7 @@ private: std::unique_ptr<TQueryExecutionStats> Stats; TMaybe<TInstant> Deadline; TMaybe<TInstant> CancelAt; - TActorId Target; + TActorId OwnerActor; ui64 TxId = 0; TKqpTasksGraph TasksGraph; std::unordered_map<ui64, ui32> TaskId2StageId; @@ -432,8 +401,13 @@ private: } // anonymous namespace -IActor* CreateKqpLiteralExecuter(IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters) { - return new TKqpLiteralExecuter(std::move(request), counters); +std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ExecutePure( + IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner) +{ + std::unique_ptr<TKqpLiteralExecuter> executer = std::make_unique<TKqpLiteralExecuter>( + std::move(request), counters, owner); + + return executer->ExecutePure(); } } // namespace NKqp diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index 004b3fc7dd..ae39f77476 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -475,6 +475,7 @@ private: TVector<NYql::NDqProto::TDqExecutionStats> Executions; }; + class TKqpExecPureRequestHandler: public TActorBootstrapped<TKqpExecPureRequestHandler> { public: using TResult = IKqpGateway::TExecPhysicalResult; @@ -483,24 +484,31 @@ public: return NKikimrServices::TActivity::KQP_EXEC_PHYSICAL_REQUEST_HANDLER; } - TKqpExecPureRequestHandler(const TActorId& executerId, TPromise<TResult> promise, TQueryData::TPtr params) - : ExecuterId(executerId) + TKqpExecPureRequestHandler(IKqpGateway::TExecPhysicalRequest&& request, + TKqpRequestCounters::TPtr counters, TPromise<TResult> promise, TQueryData::TPtr params) + : Request(std::move(request)) , Parameters(params) + , Counters(counters) , Promise(promise) {} - void Bootstrap(const TActorContext& ctx) { - auto executerEv = MakeHolder<NKqp::TEvKqpExecuter::TEvTxRequest>(); - ActorIdToProto(ctx.SelfID, executerEv->Record.MutableTarget()); - executerEv->Record.MutableRequest()->SetTxId(0); - ctx.Send(ExecuterId, executerEv.Release()); - - Become(&TKqpExecPureRequestHandler::ProcessState); + void Bootstrap() { + auto result = ::NKikimr::NKqp::ExecutePure(std::move(Request), Counters, SelfId()); + ProcessPureExecution(result); + Become(&TThis::DieState); + Send(SelfId(), new TEvents::TEvPoisonPill()); } private: - void Handle(TEvKqpExecuter::TEvTxResponse::TPtr &ev, const TActorContext &) { - auto* response = ev->Get()->Record.MutableResponse(); + + STATEFN(DieState) { + switch (ev->GetTypeRewrite()) { + cFunc(TEvents::TEvPoisonPill::EventType, PassAway); + } + } + + void ProcessPureExecution(std::unique_ptr<TEvKqpExecuter::TEvTxResponse>& ev) { + auto* response = ev->Record.MutableResponse(); TResult result; if (response->GetStatus() == Ydb::StatusIds::SUCCESS) { @@ -513,7 +521,7 @@ private: result.ExecuterResult.Swap(response->MutableResult()); { auto g = Parameters->TypeEnv().BindAllocator(); - auto& txResults = ev->Get()->GetTxResults(); + auto& txResults = ev->GetTxResults(); result.Results.reserve(txResults.size()); for(auto& tx : txResults) { result.Results.emplace_back(std::move(tx.GetMkql())); @@ -524,36 +532,14 @@ private: this->PassAway(); } - void Handle(TEvKqpExecuter::TEvExecuterProgress::TPtr& ev, const TActorContext& ctx) { - Y_UNUSED(ev); - Y_UNUSED(ctx); - } - - void HandleUnexpectedEvent(ui32 eventType, const TActorContext &ctx) { - LOG_CRIT_S(ctx, NKikimrServices::KQP_GATEWAY, - "TKqpExecPureRequestHandler, unexpected event, type: " << eventType); - - Promise.SetValue(ResultFromError<TResult>(YqlIssue({}, TIssuesIds::UNEXPECTED, TStringBuilder() - << "TKqpExecPureRequestHandler, unexpected event, type: " << eventType))); - - this->PassAway(); - } - - STFUNC(ProcessState) { - switch (ev->GetTypeRewrite()) { - HFunc(TEvKqpExecuter::TEvTxResponse, Handle); - HFunc(TEvKqpExecuter::TEvExecuterProgress, Handle); - default: - HandleUnexpectedEvent(ev->GetTypeRewrite(), ctx); - } - } - private: - TActorId ExecuterId; + IKqpGateway::TExecPhysicalRequest Request; TQueryData::TPtr Parameters; + TKqpRequestCounters::TPtr Counters; TPromise<TResult> Promise; }; + class TSchemeOpRequestHandler: public TRequestHandlerBase< TSchemeOpRequestHandler, TEvTxUserProxy::TEvProposeTransaction, @@ -1702,18 +1688,9 @@ public: }; YQL_ENSURE(containOnlyPureStages(request)); - - auto executerActor = CreateKqpExecuter(std::move(request), Database, - UserToken ? TMaybe<TString>(UserToken->Serialized) : Nothing(), Counters); - auto executerId = RegisterActor(executerActor); - - LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_GATEWAY, "Created new KQP executer: " << executerId); - auto promise = NewPromise<TExecPhysicalResult>(); - - IActor* requestHandler = new TKqpExecPureRequestHandler(executerId, promise, params); + IActor* requestHandler = new TKqpExecPureRequestHandler(std::move(request), Counters, promise, params); RegisterActor(requestHandler); - return promise.GetFuture(); } diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 0f1f5861b5..f581c949e5 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1267,7 +1267,14 @@ public: } if (pure) { - ; + if (QueryState) { + request.Orbit = std::move(QueryState->Orbit); + } + request.TraceId = QueryState ? QueryState->KqpSessionSpan.GetTraceId() : NWilson::TTraceId(); + auto response = ExecutePure(std::move(request), RequestCounters, SelfId()); + ++QueryState->CurrentTx; + ProcessExecuterResult(response.get()); + return true; } else if (commit) { QueryState->Commited = true; @@ -1394,9 +1401,13 @@ public: void HandleExecute(TEvKqpExecuter::TEvTxResponse::TPtr& ev) { TTimerGuard timer(this); - QueryState->Orbit = std::move(ev->Get()->Orbit); + ProcessExecuterResult(ev->Get()); + } + + void ProcessExecuterResult(TEvKqpExecuter::TEvTxResponse* ev) { + QueryState->Orbit = std::move(ev->Orbit); - auto* response = ev->Get()->Record.MutableResponse(); + auto* response = ev->Record.MutableResponse(); LOG_D("TEvTxResponse, CurrentTx: " << QueryState->CurrentTx << "/" << (QueryState->PreparedQuery ? QueryState->PreparedQuery->GetPhysicalQuery().TransactionsSize() : 0) @@ -1436,16 +1447,16 @@ public: } YQL_ENSURE(QueryState); - LWTRACK(KqpSessionPhyQueryTxResponse, QueryState->Orbit, QueryState->CurrentTx, ev->Get()->ResultRowsCount); + LWTRACK(KqpSessionPhyQueryTxResponse, QueryState->Orbit, QueryState->CurrentTx, ev->ResultRowsCount); auto& executerResults = *response->MutableResult(); { auto g = QueryState->QueryData->TypeEnv().BindAllocator(); - QueryState->QueryData->AddTxResults(std::move(ev->Get()->GetTxResults())); + QueryState->QueryData->AddTxResults(std::move(ev->GetTxResults())); } - if (ev->Get()->LockHandle) { - QueryState->TxCtx->Locks.LockHandle = std::move(ev->Get()->LockHandle); + if (ev->LockHandle) { + QueryState->TxCtx->Locks.LockHandle = std::move(ev->LockHandle); } if (!MergeLocksWithTxResult(executerResults)) { |