aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Khoroshilov <hor911@gmail.com>2022-07-04 17:36:38 +0300
committerAleksandr Khoroshilov <hor911@gmail.com>2022-07-04 17:36:38 +0300
commit3c9dfb5ccfd56a66319bd512f35a741b62847e3f (patch)
tree325868b5f866ac8edc25bf6ccdeb1a39c9775416
parentd3988e8cf35b73cd6181d95e06d159d8e931acb9 (diff)
downloadydb-3c9dfb5ccfd56a66319bd512f35a741b62847e3f.tar.gz
Async YQL Facade execution and evaluation graph result support
ref:84bdf5f56eeb786255e30a7475a5d85af2e6ac30
-rw-r--r--ydb/core/yq/libs/actors/pending_fetcher.cpp5
-rw-r--r--ydb/core/yq/libs/actors/run_actor.cpp480
-rw-r--r--ydb/core/yq/libs/events/events.h1
-rw-r--r--ydb/core/yq/libs/gateway/empty_gateway.cpp23
-rw-r--r--ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp1
5 files changed, 413 insertions, 97 deletions
diff --git a/ydb/core/yq/libs/actors/pending_fetcher.cpp b/ydb/core/yq/libs/actors/pending_fetcher.cpp
index aec8826f62d..387936ece00 100644
--- a/ydb/core/yq/libs/actors/pending_fetcher.cpp
+++ b/ydb/core/yq/libs/actors/pending_fetcher.cpp
@@ -159,11 +159,8 @@ public:
NActors::IActor::PassAway();
}
- void Bootstrap(const TActorContext& ctx) {
+ void Bootstrap() {
Become(&TPendingFetcher::StateFunc);
-
- Y_UNUSED(ctx);
-
DatabaseResolver = Register(CreateDatabaseResolver(MakeYqlAnalyticsHttpProxyId(), CredentialsFactory));
Send(SelfId(), new NActors::TEvents::TEvWakeup());
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp
index bff5d831bd7..00f7acb5784 100644
--- a/ydb/core/yq/libs/actors/run_actor.cpp
+++ b/ydb/core/yq/libs/actors/run_actor.cpp
@@ -17,6 +17,7 @@
#include <ydb/library/yql/providers/dq/actors/executer_actor.h>
#include <ydb/library/yql/providers/dq/actors/proto_builder.h>
#include <ydb/library/yql/providers/dq/actors/task_controller.h>
+#include <ydb/library/yql/providers/dq/actors/result_receiver.h>
#include <ydb/library/yql/providers/dq/common/yql_dq_common.h>
#include <ydb/library/yql/providers/dq/counters/counters.h>
#include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h>
@@ -83,10 +84,187 @@ using namespace NActors;
using namespace NYql;
using namespace NDqs;
+namespace {
+
+struct TEvPrivate {
+ // Event ids
+ enum EEv : ui32 {
+ EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
+
+ EvProgramFinished = EvBegin,
+
+ EvEnd
+ };
+
+ static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");
+
+ // Events
+ struct TEvProgramFinished : public NActors::TEventLocal<TEvProgramFinished, EvProgramFinished> {
+ TEvProgramFinished(TIssues issues, const TString& plan, const TString& expr, NYql::TProgram::TStatus status, const TString& message)
+ : Issues(issues), Plan(plan), Expr(expr), Status(status), Message(message)
+ {
+ }
+
+ TIssues Issues;
+ TString Plan;
+ TString Expr;
+ NYql::TProgram::TStatus Status;
+ TString Message;
+ };
+};
+
+}
+
+class TProgramRunnerActor : public NActors::TActorBootstrapped<TProgramRunnerActor> {
+public:
+ TProgramRunnerActor(
+ const TActorId& runActorId,
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
+ ui64 nextUniqueId,
+ TVector<TDataProviderInitializer> dataProvidersInit,
+ NYql::IModuleResolver::TPtr& moduleResolver,
+ NYql::TGatewaysConfig gatewaysConfig,
+ const TString& sql,
+ const TString& sessionId,
+ NSQLTranslation::TTranslationSettings sqlSettings,
+ YandexQuery::ExecuteMode executeMode
+ )
+ : RunActorId(runActorId),
+ FunctionRegistry(functionRegistry),
+ NextUniqueId(nextUniqueId),
+ DataProvidersInit(dataProvidersInit),
+ ModuleResolver(moduleResolver),
+ GatewaysConfig(gatewaysConfig),
+ Sql(sql),
+ SessionId(sessionId),
+ SqlSettings(sqlSettings),
+ ExecuteMode(executeMode)
+ {
+ }
+
+ void Bootstrap(const TActorContext& ctx) {
+ TProgramFactory progFactory(false, FunctionRegistry, NextUniqueId, DataProvidersInit, "yq");
+ progFactory.SetModules(ModuleResolver);
+ progFactory.SetUdfResolver(NYql::NCommon::CreateSimpleUdfResolver(FunctionRegistry, nullptr));
+ progFactory.SetGatewaysConfig(&GatewaysConfig);
+
+ Program = progFactory.Create("-stdin-", Sql, SessionId);
+ Program->EnableResultPosition();
+
+ // parse phase
+ {
+ if (!Program->ParseSql(SqlSettings)) {
+ Issues.AddIssues(Program->Issues());
+ SendStatusAndDie(ctx, TProgram::TStatus::Error, "Failed to parse query");
+ return;
+ }
+
+ if (ExecuteMode == YandexQuery::ExecuteMode::PARSE) {
+ SendStatusAndDie(ctx, TProgram::TStatus::Ok);
+ return;
+ }
+ }
+
+ // compile phase
+ {
+ if (!Program->Compile("")) {
+ Issues.AddIssues(Program->Issues());
+ SendStatusAndDie(ctx, TProgram::TStatus::Error, "Failed to compile query");
+ return;
+ }
+
+ if (ExecuteMode == YandexQuery::ExecuteMode::COMPILE) {
+ SendStatusAndDie(ctx, TProgram::TStatus::Ok);
+ return;
+ }
+ }
+
+ Compiled = true;
+
+ // next phases can be async: optimize, validate, run
+ TProgram::TFutureStatus futureStatus;
+ switch (ExecuteMode) {
+ case YandexQuery::ExecuteMode::EXPLAIN:
+ futureStatus = Program->OptimizeAsync("");
+ break;
+ case YandexQuery::ExecuteMode::VALIDATE:
+ futureStatus = Program->ValidateAsync("");
+ break;
+ case YandexQuery::ExecuteMode::RUN:
+ futureStatus = Program->RunAsync("");
+ break;
+ default:
+ SendStatusAndDie(ctx, TProgram::TStatus::Error, TStringBuilder() << "Unexpected execute mode " << static_cast<int>(ExecuteMode));
+ return;
+ }
+
+ futureStatus.Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(), selfId = SelfId()](const TProgram::TFutureStatus& f) {
+ actorSystem->Send(selfId, new TEvents::TEvAsyncContinue(f));
+ });
+
+ Become(&TProgramRunnerActor::StateFunc);
+ }
+
+ void SendStatusAndDie(const TActorContext& ctx, NYql::TProgram::TStatus status, const TString& message = "") {
+ TString expr;
+ TString plan;
+ if (Compiled) {
+ TStringStream exprOut;
+ TStringStream planOut;
+ Program->Print(&exprOut, &planOut);
+ plan = NJson2Yson::ConvertYson2Json(planOut.Str());
+ expr = exprOut.Str();
+ }
+ Issues.AddIssues(Program->Issues());
+ Send(RunActorId, new TEvPrivate::TEvProgramFinished(Issues, plan, expr, status, message));
+ Die(ctx);
+ }
+
+ STRICT_STFUNC(StateFunc,
+ HFunc(TEvents::TEvAsyncContinue, Handle);
+ )
+
+ void Handle(TEvents::TEvAsyncContinue::TPtr& ev, const TActorContext& ctx) {
+ NYql::TProgram::TStatus status = TProgram::TStatus::Error;
+
+ const auto& f = ev->Get()->Future;
+ try {
+ status = f.GetValue();
+ if (status == TProgram::TStatus::Async) {
+ auto futureStatus = Program->ContinueAsync();
+ auto actorSystem = ctx.ActorSystem();
+ auto selfId = ctx.SelfID;
+ futureStatus.Subscribe([actorSystem, selfId](const TProgram::TFutureStatus& f) {
+ actorSystem->Send(selfId, new TEvents::TEvAsyncContinue(f));
+ });
+ return;
+ }
+ } catch (const std::exception& err) {
+ Issues.AddIssue(ExceptionToIssue(err));
+ }
+ SendStatusAndDie(ctx, status);
+ }
+
+private:
+ TProgramPtr Program;
+ TIssues Issues;
+ TActorId RunActorId;
+ const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry;
+ ui64 NextUniqueId;
+ TVector<TDataProviderInitializer> DataProvidersInit;
+ NYql::IModuleResolver::TPtr ModuleResolver;
+ NYql::TGatewaysConfig GatewaysConfig;
+ const TString Sql;
+ const TString SessionId;
+ NSQLTranslation::TTranslationSettings SqlSettings;
+ YandexQuery::ExecuteMode ExecuteMode;
+ bool Compiled = false;
+};
+
class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
public:
explicit TRunActor(
- const NActors::TActorId& fetcherId
+ const TActorId& fetcherId
, const ::NYql::NCommon::TServiceCounters& queryCounters
, TRunActorParams&& params)
: FetcherId(fetcherId)
@@ -137,6 +315,7 @@ private:
}
STRICT_STFUNC(StateFunc,
+ hFunc(TEvPrivate::TEvProgramFinished, Handle);
HFunc(TEvents::TEvAsyncContinue, Handle);
hFunc(NActors::TEvents::TEvUndelivered, Handle);
hFunc(TEvents::TEvGraphParams, Handle);
@@ -241,13 +420,6 @@ private:
}
}
- void RunProgram() {
- LOG_D("RunProgram");
- if (!CompileQuery()) {
- Abort("Failed to compile query", YandexQuery::QueryMeta::FAILED);
- }
- }
-
void FailOnException() {
Fail(CurrentExceptionMessage());
}
@@ -453,8 +625,32 @@ private:
}
void Handle(TEvents::TEvGraphParams::TPtr& ev) {
- LOG_D("Graph params with tasks: " << ev->Get()->GraphParams.TasksSize());
- DqGraphParams.push_back(ev->Get()->GraphParams);
+ LOG_D("Graph (" << (ev->Get()->IsEvaluation ? "evaluation" : "execution") << ") with tasks: " << ev->Get()->GraphParams.TasksSize());
+
+ if (ev->Get()->IsEvaluation) {
+ Y_ASSERT(!EvaluationInProgress);
+ EvaluationInProgress = true;
+ EvaluationResult = ev->Get()->Result;
+ RunEvalDqGraph(ev->Get()->GraphParams);
+ } else {
+ DqGraphParams.push_back(ev->Get()->GraphParams);
+
+ NYql::IDqGateway::TResult gatewayResult;
+ // fake it till you make it
+ // generate dummy result for YQL facade now, remove this gateway completely
+ // when top-level YQL facade call like Preprocess() is implemented
+ if (ev->Get()->GraphParams.GetResultType()) {
+ // for resultable graphs return dummy "select 1" result (it is not used and is required to satisfy YQL facade only)
+ gatewayResult.SetSuccess();
+ gatewayResult.Data = "[[\001\0021]]";
+ gatewayResult.Truncated = true;
+ gatewayResult.RowsCount = 0;
+ } else {
+ // for resultless results expect infinite INSERT FROM SELECT and fail YQL facade (with well known secret code?)
+ gatewayResult.Issues.AddIssues({NYql::TIssue("MAGIC BREAK").SetCode(555, NYql::TSeverityIds::S_ERROR)});
+ }
+ ev->Get()->Result.SetValue(gatewayResult);
+ }
}
void Handle(TEvCheckpointCoordinator::TEvZeroCheckpointDone::TPtr&) {
@@ -479,16 +675,10 @@ private:
return count;
}
- void UpdateAstAndPlan() {
+ void UpdateAstAndPlan(const TString& plan, const TString& expr) {
Yq::Private::PingTaskRequest request;
-
- TStringStream exprOut;
- TStringStream planOut;
- Program->Print(&exprOut, &planOut);
- const auto planStr = NJson2Yson::ConvertYson2Json(planOut.Str());
- request.set_ast(exprOut.Str());
- request.set_plan(planStr);
-
+ request.set_ast(expr);
+ request.set_plan(plan);
Send(Pinger, new TEvents::TEvForwardPingRequest(request));
}
@@ -670,6 +860,40 @@ private:
}
void Handle(NYql::NDqs::TEvQueryResponse::TPtr& ev) {
+
+ if (EvaluationInProgress) {
+
+ IDqGateway::TResult QueryResult;
+
+ auto& result = ev->Get()->Record;
+
+ LOG_D("Query evaluation response. Issues count: " << result.IssuesSize()
+ << ". Rows count: " << result.GetRowsCount());
+
+ QueryResult.Data = result.yson();
+
+ TIssues issues;
+ IssuesFromMessage(result.GetIssues(), issues);
+ bool error = false;
+ for (const auto& issue : issues) {
+ if (issue.GetSeverity() <= TSeverityIds::S_ERROR) {
+ error = true;
+ }
+ }
+
+ if (!error) {
+ QueryResult.SetSuccess();
+ }
+
+ QueryResult.AddIssues(issues);
+ QueryResult.Truncated = result.GetTruncated();
+ QueryResult.RowsCount = result.GetRowsCount();
+ EvaluationResult.SetValue(QueryResult);
+
+ EvaluationInProgress = false;
+ return;
+ }
+
SaveQueryResponse(ev);
const bool failure = Issues.Size() > 0;
@@ -801,6 +1025,50 @@ private:
RunNextDqGraph();
}
+ void RunEvalDqGraph(NYq::NProto::TGraphParams& dqGraphParams) {
+
+ LOG_D("RunEvalDqGraph");
+
+ TDqConfiguration::TPtr dqConfiguration = MakeIntrusive<TDqConfiguration>();
+ dqConfiguration->Dispatch(dqGraphParams.GetSettings());
+ dqConfiguration->FreezeDefaults();
+ dqConfiguration->FallbackPolicy = "never";
+
+ ExecuterId = NActors::TActivationContext::Register(NYql::NDq::MakeDqExecuter(MakeNodesManagerId(), SelfId(), Params.QueryId, "", dqConfiguration, QueryCounters.Counters, TInstant::Now(), false));
+
+ NActors::TActorId resultId;
+ if (dqGraphParams.GetResultType()) {
+ TVector<TString> columns;
+ for (const auto& column : dqGraphParams.GetColumns()) {
+ columns.emplace_back(column);
+ }
+
+ NActors::TActorId empty = {};
+ THashMap<TString, TString> emptySecureParams; // NOT USED in RR
+ resultId = NActors::TActivationContext::Register(
+ MakeResultReceiver(
+ columns, ExecuterId, dqGraphParams.GetSession(), dqConfiguration, emptySecureParams,
+ dqGraphParams.GetResultType(), empty, false).Release());
+
+ } else {
+ LOG_D("ResultReceiver was NOT CREATED since ResultType is empty");
+ resultId = ExecuterId;
+ }
+
+ ControlId = NActors::TActivationContext::Register(NYql::MakeTaskController(SessionId, ExecuterId, resultId, dqConfiguration, QueryCounters, TDuration::Seconds(3)).Release());
+
+ Yql::DqsProto::ExecuteGraphRequest request;
+ request.SetSourceId(dqGraphParams.GetSourceId());
+ request.SetResultType(dqGraphParams.GetResultType());
+ request.SetSession(dqGraphParams.GetSession());
+ *request.MutableSettings() = dqGraphParams.GetSettings();
+ *request.MutableSecureParams() = dqGraphParams.GetSecureParams();
+ *request.MutableColumns() = dqGraphParams.GetColumns();
+ NTasksPacker::UnPack(*request.MutableTask(), dqGraphParams.GetTasks(), dqGraphParams.GetStageProgram());
+ NActors::TActivationContext::Send(new IEventHandle(ExecuterId, SelfId(), new NYql::NDqs::TEvGraphRequest(request, ControlId, resultId, CheckpointCoordinatorId)));
+ LOG_D("Evaluation Executer: " << ExecuterId << ", Controller: " << ControlId << ", ResultIdActor: " << resultId);
+ }
+
void RunNextDqGraph() {
auto& dqGraphParams = DqGraphParams.at(DqGraphIndex);
TDqConfiguration::TPtr dqConfiguration = MakeIntrusive<TDqConfiguration>();
@@ -998,6 +1266,7 @@ private:
LOG_D("Is about to finish query with status " << YandexQuery::QueryMeta::ComputeStatus_Name(status));
Finishing = true;
FinalQueryStatus = status;
+
QueryStateUpdateRequest.set_status(FinalQueryStatus); // Can be changed later.
QueryStateUpdateRequest.set_status_code(NYql::NDqProto::StatusIds::SUCCESS);
*QueryStateUpdateRequest.mutable_finished_at() = google::protobuf::util::TimeUtil::MillisecondsToTimestamp(TInstant::Now().MilliSeconds());
@@ -1068,7 +1337,75 @@ private:
RunNextDqGraph();
}
- bool CompileQuery() {
+ bool RunProgram(
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
+ ui64 nextUniqueId,
+ TVector<TDataProviderInitializer> dataProvidersInit,
+ NYql::IModuleResolver::TPtr& moduleResolver,
+ NYql::TGatewaysConfig gatewaysConfig,
+ const TString& sql,
+ const TString& sessionId,
+ NSQLTranslation::TTranslationSettings sqlSettings,
+ YandexQuery::ExecuteMode executeMode
+ ) {
+ TProgramFactory progFactory(false, functionRegistry, nextUniqueId, dataProvidersInit, "yq");
+ progFactory.SetModules(moduleResolver);
+ progFactory.SetUdfResolver(NYql::NCommon::CreateSimpleUdfResolver(functionRegistry, nullptr));
+ progFactory.SetGatewaysConfig(&gatewaysConfig);
+
+ Program = progFactory.Create("-stdin-", sql, sessionId);
+ Program->EnableResultPosition();
+
+ // parse phase
+ {
+ if (!Program->ParseSql(sqlSettings)) {
+ Issues.AddIssues(Program->Issues());
+ return false;
+
+ }
+
+ if (executeMode == YandexQuery::ExecuteMode::PARSE) {
+ return true;
+ }
+ }
+
+ // compile phase
+ {
+ if (!Program->Compile("")) {
+ Issues.AddIssues(Program->Issues());
+ return false;
+ }
+
+ if (executeMode == YandexQuery::ExecuteMode::COMPILE) {
+ return true;
+ }
+ }
+
+ // next phases can be async: optimize, validate, run
+ TProgram::TFutureStatus futureStatus;
+ switch (executeMode) {
+ case YandexQuery::ExecuteMode::EXPLAIN:
+ futureStatus = Program->OptimizeAsync("");
+ break;
+ case YandexQuery::ExecuteMode::VALIDATE:
+ futureStatus = Program->ValidateAsync("");
+ break;
+ case YandexQuery::ExecuteMode::RUN:
+ futureStatus = Program->RunAsync("");
+ break;
+ default:
+ Issues.AddIssue(TStringBuilder() << "Unexpected execute mode " << static_cast<int>(Params.ExecuteMode));
+ return false;
+ }
+
+ futureStatus.Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(), selfId = SelfId()](const TProgram::TFutureStatus& f) {
+ actorSystem->Send(selfId, new TEvents::TEvAsyncContinue(f));
+ });
+
+ return true;
+ }
+
+ void RunProgram() {
LOG_D("Compiling query ...");
NYql::TGatewaysConfig gatewaysConfig;
SetupDqSettings(*gatewaysConfig.MutableDq());
@@ -1132,11 +1469,6 @@ private:
dataProvidersInit.push_back(GetSolomonDataProviderInitializer(solomonGateway, false));
}
- TProgramFactory progFactory(false, Params.FunctionRegistry, Params.NextUniqueId, dataProvidersInit, "yq");
- progFactory.SetModules(Params.ModuleResolver);
- progFactory.SetUdfResolver(NYql::NCommon::CreateSimpleUdfResolver(Params.FunctionRegistry, nullptr));
- progFactory.SetGatewaysConfig(&gatewaysConfig);
-
SessionId = TStringBuilder()
<< Params.QueryId << '#'
<< Params.ResultId << '#'
@@ -1144,9 +1476,6 @@ private:
<< Params.Owner << '#'
<< Params.CloudId;
- Program = progFactory.Create("-stdin-", Params.Sql, SessionId);
- Program->EnableResultPosition();
-
NSQLTranslation::TTranslationSettings sqlSettings;
sqlSettings.ClusterMapping = clusters;
sqlSettings.SyntaxVersion = 1;
@@ -1156,55 +1485,42 @@ private:
AddTableBindingsFromBindings(Params.Bindings, YqConnections, sqlSettings);
} catch (const std::exception& e) {
Issues.AddIssue(ExceptionToIssue(e));
- return false;
- }
-
- // parse phase
- {
- if (!Program->ParseSql(sqlSettings)) {
- Issues.AddIssues(Program->Issues());
- return false;
-
- }
-
- if (Params.ExecuteMode == YandexQuery::ExecuteMode::PARSE) {
- return true;
- }
+ FinishProgram(TProgram::TStatus::Error);
+ return;
}
- // compile phase
- {
- if (!Program->Compile("")) {
- Issues.AddIssues(Program->Issues());
- return false;
- }
-
- if (Params.ExecuteMode == YandexQuery::ExecuteMode::COMPILE) {
- return true;
- }
- }
+/*
+ return RunProgram(
+ Params.FunctionRegistry,
+ Params.NextUniqueId,
+ dataProvidersInit,
+ Params.ModuleResolver,
+ gatewaysConfig,
+ Params.Sql,
+ SessionId,
+ sqlSettings,
+ Params.ExecuteMode
+ );
+*/
+ ProgramRunnerId = Register(new TProgramRunnerActor(
+ SelfId(),
+ Params.FunctionRegistry,
+ Params.NextUniqueId,
+ dataProvidersInit,
+ Params.ModuleResolver,
+ gatewaysConfig,
+ Params.Sql,
+ SessionId,
+ sqlSettings,
+ Params.ExecuteMode
+ ));
+ }
- // next phases can be async: optimize, validate, run
- TProgram::TFutureStatus futureStatus;
- switch (Params.ExecuteMode) {
- case YandexQuery::ExecuteMode::EXPLAIN:
- futureStatus = Program->OptimizeAsync("");
- break;
- case YandexQuery::ExecuteMode::VALIDATE:
- futureStatus = Program->ValidateAsync("");
- break;
- case YandexQuery::ExecuteMode::RUN:
- futureStatus = Program->RunAsync("");
- break;
- default:
- Issues.AddIssue(TStringBuilder() << "Unexpected execute mode " << static_cast<int>(Params.ExecuteMode));
- return false;
+ void Handle(TEvPrivate::TEvProgramFinished::TPtr& ev) {
+ if (!Finishing) {
+ UpdateAstAndPlan(ev->Get()->Plan, ev->Get()->Expr);
+ FinishProgram(ev->Get()->Status, ev->Get()->Message, ev->Get()->Issues);
}
-
- futureStatus.Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(), selfId = SelfId()](const TProgram::TFutureStatus& f) {
- actorSystem->Send(selfId, new TEvents::TEvAsyncContinue(f));
- });
- return true;
}
void Handle(TEvents::TEvAsyncContinue::TPtr& ev, const TActorContext& ctx) {
@@ -1227,12 +1543,23 @@ private:
Issues.AddIssue(ExceptionToIssue(err));
}
- UpdateAstAndPlan();
+ TStringStream exprOut;
+ TStringStream planOut;
+ Program->Print(&exprOut, &planOut);
+
+ UpdateAstAndPlan(NJson2Yson::ConvertYson2Json(planOut.Str()), exprOut.Str());
+ FinishProgram(status);
+ }
+ void FinishProgram(NYql::TProgram::TStatus status, const TString& message = "", const NYql::TIssues& issues = {}) {
if (status == TProgram::TStatus::Ok || (DqGraphParams.size() > 0 && !DqGraphParams[0].GetResultType())) {
PrepareGraphs();
} else {
- Abort(TStringBuilder() << "Run query failed: " << ToString(status), YandexQuery::QueryMeta::FAILED, Program->Issues());
+ TString abortMessage = message;
+ if (abortMessage == "") {
+ abortMessage = TStringBuilder() << "Run query failed: " << ToString(status);
+ }
+ Abort(abortMessage, YandexQuery::QueryMeta::FAILED, issues);
}
}
@@ -1282,6 +1609,7 @@ private:
private:
TActorId FetcherId;
+ TActorId ProgramRunnerId;
TRunActorParams Params;
THashMap<TString, YandexQuery::Connection> YqConnections;
@@ -1293,6 +1621,8 @@ private:
TActorId Pinger;
TInstant CreatedAt;
YandexQuery::QueryAction Action = YandexQuery::QueryAction::QUERY_ACTION_UNSPECIFIED;
+ bool EvaluationInProgress = false;
+ NThreading::TPromise<NYql::IDqGateway::TResult> EvaluationResult;
std::vector<NYq::NProto::TGraphParams> DqGraphParams;
std::vector<i32> DqGrapResultIndices;
i32 DqGraphIndex = 0;
diff --git a/ydb/core/yq/libs/events/events.h b/ydb/core/yq/libs/events/events.h
index 7cdff0c5906..4179ea23049 100644
--- a/ydb/core/yq/libs/events/events.h
+++ b/ydb/core/yq/libs/events/events.h
@@ -209,6 +209,7 @@ struct TEvents {
{ }
NProto::TGraphParams GraphParams;
+ bool IsEvaluation = false;
NThreading::TPromise<NYql::IDqGateway::TResult> Result;
};
diff --git a/ydb/core/yq/libs/gateway/empty_gateway.cpp b/ydb/core/yq/libs/gateway/empty_gateway.cpp
index e544edf42f0..305852e663c 100644
--- a/ydb/core/yq/libs/gateway/empty_gateway.cpp
+++ b/ydb/core/yq/libs/gateway/empty_gateway.cpp
@@ -39,7 +39,6 @@ public:
Y_UNUSED(progressWriter);
Y_UNUSED(modulesMapping); // TODO: support.
Y_UNUSED(discard);
- Y_UNUSED(queryParams);
NProto::TGraphParams params;
THashMap<i64, TString> stagePrograms;
@@ -63,24 +62,12 @@ public:
}
params.SetSession(sessionId);
- NActors::TActivationContext::Send(new NActors::IEventHandle(RunActorId, {}, new TEvents::TEvGraphParams(params)));
-
auto result = NThreading::NewPromise<NYql::IDqGateway::TResult>();
- NYql::IDqGateway::TResult gatewayResult;
- // fake it till you make it
- // generate dummy result for YQL facade now, remove this gateway completely
- // when top-level YQL facade call like Preprocess() is implemented
- if (plan.ResultType) {
- // for resultable graphs return dummy "select 1" result (it is not used and is required to satisfy YQL facade only)
- gatewayResult.SetSuccess();
- gatewayResult.Data = "[[\001\0021]]";
- gatewayResult.Truncated = true;
- gatewayResult.RowsCount = 0;
- } else {
- // for resultless results expect infinite INSERT FROM SELECT and fail YQL facade (with well known secret code?)
- gatewayResult.Issues.AddIssues({NYql::TIssue("MAGIC BREAK").SetCode(555, NYql::TSeverityIds::S_ERROR)});
- }
- result.SetValue(gatewayResult);
+ auto event = MakeHolder<TEvents::TEvGraphParams>(params);
+ event->IsEvaluation = queryParams.Value("Evaluation", "") == "true";
+ event->Result = result;
+ NActors::TActivationContext::Send(new NActors::IEventHandle(RunActorId, {}, event.Release()));
+
return result;
}
diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
index 8d1aaa87478..2b51232416f 100644
--- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
+++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
@@ -789,6 +789,7 @@ private:
future = NThreading::MakeFuture<IDqGateway::TResult>(std::move(result));
}
} else {
+ graphParams["Evaluation"] = ctx.Step.IsDone(TExprStep::ExprEval) ? "false" : "true";
future = State->DqGateway->ExecutePlan(
State->SessionId, executionPlanner->GetPlan(), columns, secureParams, graphParams,
settings, progressWriter, ModulesMapping, fillSettings.Discard);