aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2023-01-09 21:24:24 +0300
committergvit <gvit@ydb.tech>2023-01-09 21:24:24 +0300
commit18590c1c3b06ab6b03c4bcdc6dbc645bab1f6616 (patch)
tree3f6b3122a4b194d3c235002e075d0c6f1a058b8b
parentd4e68a99e3c7f1c1f686fbe4d0cd2c277312f66a (diff)
downloadydb-18590c1c3b06ab6b03c4bcdc6dbc645bab1f6616.tar.gz
merge literal executer into session actor
-rw-r--r--ydb/core/kqp/common/kqp_lwtrace_probes.h2
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer.h3
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.cpp21
-rw-r--r--ydb/core/kqp/executer_actor/kqp_literal_executer.cpp88
-rw-r--r--ydb/core/kqp/gateway/kqp_ic_gateway.cpp71
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp25
6 files changed, 77 insertions, 133 deletions
diff --git a/ydb/core/kqp/common/kqp_lwtrace_probes.h b/ydb/core/kqp/common/kqp_lwtrace_probes.h
index 16c80c9843..752dc3a781 100644
--- a/ydb/core/kqp/common/kqp_lwtrace_probes.h
+++ b/ydb/core/kqp/common/kqp_lwtrace_probes.h
@@ -98,7 +98,7 @@ struct TQueryAction {
PROBE(KqpScanExecuterFinalize, GROUPS("KQP"), \
TYPES(ui64, ui64, TString, ui64), \
NAMES("TxId", "lastCompletedTask", "lastCompletedComputeActor", "ResultsSize")) \
- PROBE(KqpLiteralExecuterReplyErrorAndDie, GROUPS("KQP"), \
+ PROBE(KqpLiteralExecuterCreateErrorResponse, GROUPS("KQP"), \
TYPES(ui64), \
NAMES("TxId")) \
PROBE(KqpLiteralExecuterFinalize, GROUPS("KQP"), \
diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h
index 0cab182400..4a143a3b08 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer.h
@@ -85,5 +85,8 @@ struct TEvKqpExecuter {
IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters);
+std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ExecutePure(
+ IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner);
+
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
index 4ee793167b..46af7edc44 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
@@ -167,28 +167,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
return CreateKqpDataExecuter(std::move(request), database, userToken, counters);
}
- if (request.Locks.empty()) {
- bool literal = true;
- for (const auto& tx : request.Transactions) {
- if (tx.Body->GetType() != NKqpProto::TKqpPhyTx::TYPE_COMPUTE) {
- literal = false;
- break;
- }
- for (const auto& stage : tx.Body->GetStages()) {
- if (stage.InputsSize() != 0) {
- literal = false; // allow only independent stages
- break;
- }
- }
- }
-
- if (literal) {
- return CreateKqpLiteralExecuter(std::move(request), counters);
- }
- }
-
bool data = true; // `false` stands for Scan
-
TMaybe<NKqpProto::TKqpPhyTx::EType> txsType;
for (auto& tx : request.Transactions) {
if (txsType) {
diff --git a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
index 0d3a6d7e98..5df271b219 100644
--- a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
@@ -56,27 +56,18 @@ TDqTaskRunnerExecutionContext CreateTaskRunnerExecutionContext() {
return {};
}
-class TKqpLiteralExecuter : public TActorBootstrapped<TKqpLiteralExecuter> {
- using TBase = TActorBootstrapped<TKqpLiteralExecuter>;
-
+class TKqpLiteralExecuter {
public:
- static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
- return NKikimrServices::TActivity::KQP_LITERAL_EXECUTER_ACTOR;
- }
-
-public:
- TKqpLiteralExecuter(IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters)
+ TKqpLiteralExecuter(IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner)
: Request(std::move(request))
, Counters(counters)
+ , OwnerActor(owner)
, LiteralExecuterSpan(TWilsonKqp::LiteralExecuter, std::move(Request.TraceId), "LiteralExecuter")
{
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(Request.TxAlloc);
ResponseEv->Orbit = std::move(Request.Orbit);
Stats = std::make_unique<TQueryExecutionStats>(Request.StatsMode, &TasksGraph,
ResponseEv->Record.MutableResponse()->MutableResult()->MutableStats());
- }
-
- void Bootstrap() {
StartTime = TAppData::TimeProvider->Now();
if (Request.Timeout) {
Deadline = StartTime + Request.Timeout;
@@ -86,49 +77,31 @@ public:
}
LOG_D("Begin literal execution. Operation timeout: " << Request.Timeout << ", cancelAfter: " << Request.CancelAfter);
-
- Become(&TKqpLiteralExecuter::WorkState);
}
-private:
- STATEFN(WorkState) {
+ std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ExecutePure() {
try {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvKqpExecuter::TEvTxRequest, Handle);
- default: {
- LOG_C("TKqpLiteralExecuter, unexpected event: " << ev->GetTypeRewrite() << ", selfID: " << SelfId());
- InternalError("Unexpected event");
- }
- }
+ ExecutePureImpl();
} catch (const TMemoryLimitExceededException&) {
LOG_W("TKqpLiteralExecuter, memory limit exceeded.");
- ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED,
+ CreateErrorResponse(Ydb::StatusIds::PRECONDITION_FAILED,
YqlIssue({}, TIssuesIds::KIKIMR_PRECONDITION_FAILED, "Memory limit exceeded"));
} catch (...) {
auto msg = CurrentExceptionMessage();
LOG_C("TKqpLiteralExecuter, unexpected exception caught: " << msg);
InternalError(TStringBuilder() << "Unexpected exception: " << msg);
}
+
+ return std::move(ResponseEv);
}
- void Handle(TEvKqpExecuter::TEvTxRequest::TPtr& ev) {
+ void ExecutePureImpl() {
NWilson::TSpan prepareTasksSpan(TWilsonKqp::LiteralExecuterPrepareTasks, LiteralExecuterSpan.GetTraceId(), "PrepareTasks", NWilson::EFlags::AUTO_END);
if (Stats) {
Stats->StartTs = TInstant::Now();
}
- TxId = ev->Get()->Record.GetRequest().GetTxId();
- Target = ActorIdFromProto(ev->Get()->Record.GetTarget());
-
- {
- LOG_D("Report self actorId " << SelfId() << " to " << Target);
- auto progressEv = MakeHolder<TEvKqpExecuter::TEvExecuterProgress>();
- ActorIdToProto(SelfId(), progressEv->Record.MutableExecuterActorId());
- Send(Target, progressEv.Release());
- }
-
LOG_D("Begin literal execution, txs: " << Request.Transactions.size());
-
auto& transactions = Request.Transactions;
FillKqpTasksGraphStages(TasksGraph, transactions);
@@ -196,7 +169,7 @@ private:
}
Finalize();
- PassAway();
+ UpdateCounters();
}
void RunTask(TTask& task, const TDqTaskRunnerContext& context, const TDqTaskRunnerSettings& settings) {
@@ -299,7 +272,7 @@ private:
fakeComputeActorStats.SetDurationUs(elapsedMicros);
- Stats->AddComputeActorStats(SelfId().NodeId(), std::move(fakeComputeActorStats));
+ Stats->AddComputeActorStats(OwnerActor.NodeId(), std::move(fakeComputeActorStats));
Stats->ExecuterCpuTime = executerCpuTime;
Stats->FinishTs = Stats->StartTs + TDuration::MicroSeconds(elapsedMicros);
@@ -322,8 +295,7 @@ private:
LiteralExecuterSpan.EndOk();
}
CleanupCtx();
- LOG_D("Sending response to: " << Target << ", results: " << ResponseEv->ResultsSize());
- Send(Target, ResponseEv.release());
+ LOG_D("Execution is complete, results: " << ResponseEv->ResultsSize());
}
private:
@@ -340,17 +312,17 @@ private:
auto now = AppData()->TimeProvider->Now();
if (Deadline && *Deadline <= now) {
- LOG_I("Timeout exceeded. Send timeout event to the rpc actor " << Target);
+ LOG_I("Timeout exceeded.");
- ReplyErrorAndDie(Ydb::StatusIds::TIMEOUT,
+ CreateErrorResponse(Ydb::StatusIds::TIMEOUT,
YqlIssue({}, TIssuesIds::KIKIMR_TIMEOUT, "Request timeout exceeded."));
return true;
}
if (CancelAt && *CancelAt <= now) {
- LOG_I("CancelAt exceeded. Send cancel event to the rpc actor " << Target);
+ LOG_I("CancelAt exceeded.");
- ReplyErrorAndDie(Ydb::StatusIds::CANCELLED,
+ CreateErrorResponse(Ydb::StatusIds::CANCELLED,
YqlIssue({}, TIssuesIds::KIKIMR_OPERATION_CANCELLED, "Request timeout exceeded."));
return true;
}
@@ -363,16 +335,16 @@ private:
LOG_E(message);
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::UNEXPECTED, "Internal error while executing transaction.");
issue.AddSubIssue(MakeIntrusive<TIssue>(message));
- ReplyErrorAndDie(Ydb::StatusIds::INTERNAL_ERROR, issue);
+ CreateErrorResponse(Ydb::StatusIds::INTERNAL_ERROR, issue);
}
- void ReplyErrorAndDie(Ydb::StatusIds::StatusCode status, const TIssue& issue) {
+ void CreateErrorResponse(Ydb::StatusIds::StatusCode status, const TIssue& issue) {
google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> issues;
IssueToMessage(issue, issues.Add());
- ReplyErrorAndDie(status, &issues);
+ CreateErrorResponse(status, &issues);
}
- void ReplyErrorAndDie(Ydb::StatusIds::StatusCode status,
+ void CreateErrorResponse(Ydb::StatusIds::StatusCode status,
google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>* issues)
{
if (status != Ydb::StatusIds::SUCCESS) {
@@ -393,22 +365,19 @@ private:
response.SetStatus(status);
response.MutableIssues()->Swap(issues);
- LWTRACK(KqpLiteralExecuterReplyErrorAndDie, ResponseEv->Orbit, TxId);
+ LWTRACK(KqpLiteralExecuterCreateErrorResponse, ResponseEv->Orbit, TxId);
if (LiteralExecuterSpan) {
LiteralExecuterSpan.EndError(response.DebugString());
}
CleanupCtx();
- Send(Target, ResponseEv.release());
- PassAway();
+ UpdateCounters();
}
- void PassAway() override {
+ void UpdateCounters() {
auto totalTime = TInstant::Now() - StartTime;
Counters->Counters->LiteralTxTotalTimeHistogram->Collect(totalTime.MilliSeconds());
-
- TBase::PassAway();
}
private:
@@ -418,7 +387,7 @@ private:
std::unique_ptr<TQueryExecutionStats> Stats;
TMaybe<TInstant> Deadline;
TMaybe<TInstant> CancelAt;
- TActorId Target;
+ TActorId OwnerActor;
ui64 TxId = 0;
TKqpTasksGraph TasksGraph;
std::unordered_map<ui64, ui32> TaskId2StageId;
@@ -432,8 +401,13 @@ private:
} // anonymous namespace
-IActor* CreateKqpLiteralExecuter(IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters) {
- return new TKqpLiteralExecuter(std::move(request), counters);
+std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ExecutePure(
+ IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner)
+{
+ std::unique_ptr<TKqpLiteralExecuter> executer = std::make_unique<TKqpLiteralExecuter>(
+ std::move(request), counters, owner);
+
+ return executer->ExecutePure();
}
} // namespace NKqp
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
index 004b3fc7dd..ae39f77476 100644
--- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
@@ -475,6 +475,7 @@ private:
TVector<NYql::NDqProto::TDqExecutionStats> Executions;
};
+
class TKqpExecPureRequestHandler: public TActorBootstrapped<TKqpExecPureRequestHandler> {
public:
using TResult = IKqpGateway::TExecPhysicalResult;
@@ -483,24 +484,31 @@ public:
return NKikimrServices::TActivity::KQP_EXEC_PHYSICAL_REQUEST_HANDLER;
}
- TKqpExecPureRequestHandler(const TActorId& executerId, TPromise<TResult> promise, TQueryData::TPtr params)
- : ExecuterId(executerId)
+ TKqpExecPureRequestHandler(IKqpGateway::TExecPhysicalRequest&& request,
+ TKqpRequestCounters::TPtr counters, TPromise<TResult> promise, TQueryData::TPtr params)
+ : Request(std::move(request))
, Parameters(params)
+ , Counters(counters)
, Promise(promise)
{}
- void Bootstrap(const TActorContext& ctx) {
- auto executerEv = MakeHolder<NKqp::TEvKqpExecuter::TEvTxRequest>();
- ActorIdToProto(ctx.SelfID, executerEv->Record.MutableTarget());
- executerEv->Record.MutableRequest()->SetTxId(0);
- ctx.Send(ExecuterId, executerEv.Release());
-
- Become(&TKqpExecPureRequestHandler::ProcessState);
+ void Bootstrap() {
+ auto result = ::NKikimr::NKqp::ExecutePure(std::move(Request), Counters, SelfId());
+ ProcessPureExecution(result);
+ Become(&TThis::DieState);
+ Send(SelfId(), new TEvents::TEvPoisonPill());
}
private:
- void Handle(TEvKqpExecuter::TEvTxResponse::TPtr &ev, const TActorContext &) {
- auto* response = ev->Get()->Record.MutableResponse();
+
+ STATEFN(DieState) {
+ switch (ev->GetTypeRewrite()) {
+ cFunc(TEvents::TEvPoisonPill::EventType, PassAway);
+ }
+ }
+
+ void ProcessPureExecution(std::unique_ptr<TEvKqpExecuter::TEvTxResponse>& ev) {
+ auto* response = ev->Record.MutableResponse();
TResult result;
if (response->GetStatus() == Ydb::StatusIds::SUCCESS) {
@@ -513,7 +521,7 @@ private:
result.ExecuterResult.Swap(response->MutableResult());
{
auto g = Parameters->TypeEnv().BindAllocator();
- auto& txResults = ev->Get()->GetTxResults();
+ auto& txResults = ev->GetTxResults();
result.Results.reserve(txResults.size());
for(auto& tx : txResults) {
result.Results.emplace_back(std::move(tx.GetMkql()));
@@ -524,36 +532,14 @@ private:
this->PassAway();
}
- void Handle(TEvKqpExecuter::TEvExecuterProgress::TPtr& ev, const TActorContext& ctx) {
- Y_UNUSED(ev);
- Y_UNUSED(ctx);
- }
-
- void HandleUnexpectedEvent(ui32 eventType, const TActorContext &ctx) {
- LOG_CRIT_S(ctx, NKikimrServices::KQP_GATEWAY,
- "TKqpExecPureRequestHandler, unexpected event, type: " << eventType);
-
- Promise.SetValue(ResultFromError<TResult>(YqlIssue({}, TIssuesIds::UNEXPECTED, TStringBuilder()
- << "TKqpExecPureRequestHandler, unexpected event, type: " << eventType)));
-
- this->PassAway();
- }
-
- STFUNC(ProcessState) {
- switch (ev->GetTypeRewrite()) {
- HFunc(TEvKqpExecuter::TEvTxResponse, Handle);
- HFunc(TEvKqpExecuter::TEvExecuterProgress, Handle);
- default:
- HandleUnexpectedEvent(ev->GetTypeRewrite(), ctx);
- }
- }
-
private:
- TActorId ExecuterId;
+ IKqpGateway::TExecPhysicalRequest Request;
TQueryData::TPtr Parameters;
+ TKqpRequestCounters::TPtr Counters;
TPromise<TResult> Promise;
};
+
class TSchemeOpRequestHandler: public TRequestHandlerBase<
TSchemeOpRequestHandler,
TEvTxUserProxy::TEvProposeTransaction,
@@ -1702,18 +1688,9 @@ public:
};
YQL_ENSURE(containOnlyPureStages(request));
-
- auto executerActor = CreateKqpExecuter(std::move(request), Database,
- UserToken ? TMaybe<TString>(UserToken->Serialized) : Nothing(), Counters);
- auto executerId = RegisterActor(executerActor);
-
- LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_GATEWAY, "Created new KQP executer: " << executerId);
-
auto promise = NewPromise<TExecPhysicalResult>();
-
- IActor* requestHandler = new TKqpExecPureRequestHandler(executerId, promise, params);
+ IActor* requestHandler = new TKqpExecPureRequestHandler(std::move(request), Counters, promise, params);
RegisterActor(requestHandler);
-
return promise.GetFuture();
}
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 0f1f5861b5..f581c949e5 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -1267,7 +1267,14 @@ public:
}
if (pure) {
- ;
+ if (QueryState) {
+ request.Orbit = std::move(QueryState->Orbit);
+ }
+ request.TraceId = QueryState ? QueryState->KqpSessionSpan.GetTraceId() : NWilson::TTraceId();
+ auto response = ExecutePure(std::move(request), RequestCounters, SelfId());
+ ++QueryState->CurrentTx;
+ ProcessExecuterResult(response.get());
+ return true;
} else if (commit) {
QueryState->Commited = true;
@@ -1394,9 +1401,13 @@ public:
void HandleExecute(TEvKqpExecuter::TEvTxResponse::TPtr& ev) {
TTimerGuard timer(this);
- QueryState->Orbit = std::move(ev->Get()->Orbit);
+ ProcessExecuterResult(ev->Get());
+ }
+
+ void ProcessExecuterResult(TEvKqpExecuter::TEvTxResponse* ev) {
+ QueryState->Orbit = std::move(ev->Orbit);
- auto* response = ev->Get()->Record.MutableResponse();
+ auto* response = ev->Record.MutableResponse();
LOG_D("TEvTxResponse, CurrentTx: " << QueryState->CurrentTx
<< "/" << (QueryState->PreparedQuery ? QueryState->PreparedQuery->GetPhysicalQuery().TransactionsSize() : 0)
@@ -1436,16 +1447,16 @@ public:
}
YQL_ENSURE(QueryState);
- LWTRACK(KqpSessionPhyQueryTxResponse, QueryState->Orbit, QueryState->CurrentTx, ev->Get()->ResultRowsCount);
+ LWTRACK(KqpSessionPhyQueryTxResponse, QueryState->Orbit, QueryState->CurrentTx, ev->ResultRowsCount);
auto& executerResults = *response->MutableResult();
{
auto g = QueryState->QueryData->TypeEnv().BindAllocator();
- QueryState->QueryData->AddTxResults(std::move(ev->Get()->GetTxResults()));
+ QueryState->QueryData->AddTxResults(std::move(ev->GetTxResults()));
}
- if (ev->Get()->LockHandle) {
- QueryState->TxCtx->Locks.LockHandle = std::move(ev->Get()->LockHandle);
+ if (ev->LockHandle) {
+ QueryState->TxCtx->Locks.LockHandle = std::move(ev->LockHandle);
}
if (!MergeLocksWithTxResult(executerResults)) {