diff options
author | Aleksandr Khoroshilov <hor911@gmail.com> | 2022-07-04 17:36:38 +0300 |
---|---|---|
committer | Aleksandr Khoroshilov <hor911@gmail.com> | 2022-07-04 17:36:38 +0300 |
commit | 3c9dfb5ccfd56a66319bd512f35a741b62847e3f (patch) | |
tree | 325868b5f866ac8edc25bf6ccdeb1a39c9775416 | |
parent | d3988e8cf35b73cd6181d95e06d159d8e931acb9 (diff) | |
download | ydb-3c9dfb5ccfd56a66319bd512f35a741b62847e3f.tar.gz |
Async YQL Facade execution and evaluation graph result support
ref:84bdf5f56eeb786255e30a7475a5d85af2e6ac30
-rw-r--r-- | ydb/core/yq/libs/actors/pending_fetcher.cpp | 5 | ||||
-rw-r--r-- | ydb/core/yq/libs/actors/run_actor.cpp | 480 | ||||
-rw-r--r-- | ydb/core/yq/libs/events/events.h | 1 | ||||
-rw-r--r-- | ydb/core/yq/libs/gateway/empty_gateway.cpp | 23 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp | 1 |
5 files changed, 413 insertions, 97 deletions
diff --git a/ydb/core/yq/libs/actors/pending_fetcher.cpp b/ydb/core/yq/libs/actors/pending_fetcher.cpp index aec8826f62d..387936ece00 100644 --- a/ydb/core/yq/libs/actors/pending_fetcher.cpp +++ b/ydb/core/yq/libs/actors/pending_fetcher.cpp @@ -159,11 +159,8 @@ public: NActors::IActor::PassAway(); } - void Bootstrap(const TActorContext& ctx) { + void Bootstrap() { Become(&TPendingFetcher::StateFunc); - - Y_UNUSED(ctx); - DatabaseResolver = Register(CreateDatabaseResolver(MakeYqlAnalyticsHttpProxyId(), CredentialsFactory)); Send(SelfId(), new NActors::TEvents::TEvWakeup()); diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp index bff5d831bd7..00f7acb5784 100644 --- a/ydb/core/yq/libs/actors/run_actor.cpp +++ b/ydb/core/yq/libs/actors/run_actor.cpp @@ -17,6 +17,7 @@ #include <ydb/library/yql/providers/dq/actors/executer_actor.h> #include <ydb/library/yql/providers/dq/actors/proto_builder.h> #include <ydb/library/yql/providers/dq/actors/task_controller.h> +#include <ydb/library/yql/providers/dq/actors/result_receiver.h> #include <ydb/library/yql/providers/dq/common/yql_dq_common.h> #include <ydb/library/yql/providers/dq/counters/counters.h> #include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h> @@ -83,10 +84,187 @@ using namespace NActors; using namespace NYql; using namespace NDqs; +namespace { + +struct TEvPrivate { + // Event ids + enum EEv : ui32 { + EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + + EvProgramFinished = EvBegin, + + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); + + // Events + struct TEvProgramFinished : public NActors::TEventLocal<TEvProgramFinished, EvProgramFinished> { + TEvProgramFinished(TIssues issues, const TString& plan, const TString& expr, NYql::TProgram::TStatus status, const TString& message) + : Issues(issues), Plan(plan), Expr(expr), Status(status), Message(message) + { + } + + TIssues Issues; + TString Plan; + TString Expr; + NYql::TProgram::TStatus Status; + TString Message; + }; +}; + +} + +class TProgramRunnerActor : public NActors::TActorBootstrapped<TProgramRunnerActor> { +public: + TProgramRunnerActor( + const TActorId& runActorId, + const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, + ui64 nextUniqueId, + TVector<TDataProviderInitializer> dataProvidersInit, + NYql::IModuleResolver::TPtr& moduleResolver, + NYql::TGatewaysConfig gatewaysConfig, + const TString& sql, + const TString& sessionId, + NSQLTranslation::TTranslationSettings sqlSettings, + YandexQuery::ExecuteMode executeMode + ) + : RunActorId(runActorId), + FunctionRegistry(functionRegistry), + NextUniqueId(nextUniqueId), + DataProvidersInit(dataProvidersInit), + ModuleResolver(moduleResolver), + GatewaysConfig(gatewaysConfig), + Sql(sql), + SessionId(sessionId), + SqlSettings(sqlSettings), + ExecuteMode(executeMode) + { + } + + void Bootstrap(const TActorContext& ctx) { + TProgramFactory progFactory(false, FunctionRegistry, NextUniqueId, DataProvidersInit, "yq"); + progFactory.SetModules(ModuleResolver); + progFactory.SetUdfResolver(NYql::NCommon::CreateSimpleUdfResolver(FunctionRegistry, nullptr)); + progFactory.SetGatewaysConfig(&GatewaysConfig); + + Program = progFactory.Create("-stdin-", Sql, SessionId); + Program->EnableResultPosition(); + + // parse phase + { + if (!Program->ParseSql(SqlSettings)) { + Issues.AddIssues(Program->Issues()); + SendStatusAndDie(ctx, TProgram::TStatus::Error, "Failed to parse query"); + return; + } + + if (ExecuteMode == YandexQuery::ExecuteMode::PARSE) { + SendStatusAndDie(ctx, TProgram::TStatus::Ok); + return; + } + } + + // compile phase + { + if (!Program->Compile("")) { + Issues.AddIssues(Program->Issues()); + SendStatusAndDie(ctx, TProgram::TStatus::Error, "Failed to compile query"); + return; + } + + if (ExecuteMode == YandexQuery::ExecuteMode::COMPILE) { + SendStatusAndDie(ctx, TProgram::TStatus::Ok); + return; + } + } + + Compiled = true; + + // next phases can be async: optimize, validate, run + TProgram::TFutureStatus futureStatus; + switch (ExecuteMode) { + case YandexQuery::ExecuteMode::EXPLAIN: + futureStatus = Program->OptimizeAsync(""); + break; + case YandexQuery::ExecuteMode::VALIDATE: + futureStatus = Program->ValidateAsync(""); + break; + case YandexQuery::ExecuteMode::RUN: + futureStatus = Program->RunAsync(""); + break; + default: + SendStatusAndDie(ctx, TProgram::TStatus::Error, TStringBuilder() << "Unexpected execute mode " << static_cast<int>(ExecuteMode)); + return; + } + + futureStatus.Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(), selfId = SelfId()](const TProgram::TFutureStatus& f) { + actorSystem->Send(selfId, new TEvents::TEvAsyncContinue(f)); + }); + + Become(&TProgramRunnerActor::StateFunc); + } + + void SendStatusAndDie(const TActorContext& ctx, NYql::TProgram::TStatus status, const TString& message = "") { + TString expr; + TString plan; + if (Compiled) { + TStringStream exprOut; + TStringStream planOut; + Program->Print(&exprOut, &planOut); + plan = NJson2Yson::ConvertYson2Json(planOut.Str()); + expr = exprOut.Str(); + } + Issues.AddIssues(Program->Issues()); + Send(RunActorId, new TEvPrivate::TEvProgramFinished(Issues, plan, expr, status, message)); + Die(ctx); + } + + STRICT_STFUNC(StateFunc, + HFunc(TEvents::TEvAsyncContinue, Handle); + ) + + void Handle(TEvents::TEvAsyncContinue::TPtr& ev, const TActorContext& ctx) { + NYql::TProgram::TStatus status = TProgram::TStatus::Error; + + const auto& f = ev->Get()->Future; + try { + status = f.GetValue(); + if (status == TProgram::TStatus::Async) { + auto futureStatus = Program->ContinueAsync(); + auto actorSystem = ctx.ActorSystem(); + auto selfId = ctx.SelfID; + futureStatus.Subscribe([actorSystem, selfId](const TProgram::TFutureStatus& f) { + actorSystem->Send(selfId, new TEvents::TEvAsyncContinue(f)); + }); + return; + } + } catch (const std::exception& err) { + Issues.AddIssue(ExceptionToIssue(err)); + } + SendStatusAndDie(ctx, status); + } + +private: + TProgramPtr Program; + TIssues Issues; + TActorId RunActorId; + const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry; + ui64 NextUniqueId; + TVector<TDataProviderInitializer> DataProvidersInit; + NYql::IModuleResolver::TPtr ModuleResolver; + NYql::TGatewaysConfig GatewaysConfig; + const TString Sql; + const TString SessionId; + NSQLTranslation::TTranslationSettings SqlSettings; + YandexQuery::ExecuteMode ExecuteMode; + bool Compiled = false; +}; + class TRunActor : public NActors::TActorBootstrapped<TRunActor> { public: explicit TRunActor( - const NActors::TActorId& fetcherId + const TActorId& fetcherId , const ::NYql::NCommon::TServiceCounters& queryCounters , TRunActorParams&& params) : FetcherId(fetcherId) @@ -137,6 +315,7 @@ private: } STRICT_STFUNC(StateFunc, + hFunc(TEvPrivate::TEvProgramFinished, Handle); HFunc(TEvents::TEvAsyncContinue, Handle); hFunc(NActors::TEvents::TEvUndelivered, Handle); hFunc(TEvents::TEvGraphParams, Handle); @@ -241,13 +420,6 @@ private: } } - void RunProgram() { - LOG_D("RunProgram"); - if (!CompileQuery()) { - Abort("Failed to compile query", YandexQuery::QueryMeta::FAILED); - } - } - void FailOnException() { Fail(CurrentExceptionMessage()); } @@ -453,8 +625,32 @@ private: } void Handle(TEvents::TEvGraphParams::TPtr& ev) { - LOG_D("Graph params with tasks: " << ev->Get()->GraphParams.TasksSize()); - DqGraphParams.push_back(ev->Get()->GraphParams); + LOG_D("Graph (" << (ev->Get()->IsEvaluation ? "evaluation" : "execution") << ") with tasks: " << ev->Get()->GraphParams.TasksSize()); + + if (ev->Get()->IsEvaluation) { + Y_ASSERT(!EvaluationInProgress); + EvaluationInProgress = true; + EvaluationResult = ev->Get()->Result; + RunEvalDqGraph(ev->Get()->GraphParams); + } else { + DqGraphParams.push_back(ev->Get()->GraphParams); + + NYql::IDqGateway::TResult gatewayResult; + // fake it till you make it + // generate dummy result for YQL facade now, remove this gateway completely + // when top-level YQL facade call like Preprocess() is implemented + if (ev->Get()->GraphParams.GetResultType()) { + // for resultable graphs return dummy "select 1" result (it is not used and is required to satisfy YQL facade only) + gatewayResult.SetSuccess(); + gatewayResult.Data = "[[\001\0021]]"; + gatewayResult.Truncated = true; + gatewayResult.RowsCount = 0; + } else { + // for resultless results expect infinite INSERT FROM SELECT and fail YQL facade (with well known secret code?) + gatewayResult.Issues.AddIssues({NYql::TIssue("MAGIC BREAK").SetCode(555, NYql::TSeverityIds::S_ERROR)}); + } + ev->Get()->Result.SetValue(gatewayResult); + } } void Handle(TEvCheckpointCoordinator::TEvZeroCheckpointDone::TPtr&) { @@ -479,16 +675,10 @@ private: return count; } - void UpdateAstAndPlan() { + void UpdateAstAndPlan(const TString& plan, const TString& expr) { Yq::Private::PingTaskRequest request; - - TStringStream exprOut; - TStringStream planOut; - Program->Print(&exprOut, &planOut); - const auto planStr = NJson2Yson::ConvertYson2Json(planOut.Str()); - request.set_ast(exprOut.Str()); - request.set_plan(planStr); - + request.set_ast(expr); + request.set_plan(plan); Send(Pinger, new TEvents::TEvForwardPingRequest(request)); } @@ -670,6 +860,40 @@ private: } void Handle(NYql::NDqs::TEvQueryResponse::TPtr& ev) { + + if (EvaluationInProgress) { + + IDqGateway::TResult QueryResult; + + auto& result = ev->Get()->Record; + + LOG_D("Query evaluation response. Issues count: " << result.IssuesSize() + << ". Rows count: " << result.GetRowsCount()); + + QueryResult.Data = result.yson(); + + TIssues issues; + IssuesFromMessage(result.GetIssues(), issues); + bool error = false; + for (const auto& issue : issues) { + if (issue.GetSeverity() <= TSeverityIds::S_ERROR) { + error = true; + } + } + + if (!error) { + QueryResult.SetSuccess(); + } + + QueryResult.AddIssues(issues); + QueryResult.Truncated = result.GetTruncated(); + QueryResult.RowsCount = result.GetRowsCount(); + EvaluationResult.SetValue(QueryResult); + + EvaluationInProgress = false; + return; + } + SaveQueryResponse(ev); const bool failure = Issues.Size() > 0; @@ -801,6 +1025,50 @@ private: RunNextDqGraph(); } + void RunEvalDqGraph(NYq::NProto::TGraphParams& dqGraphParams) { + + LOG_D("RunEvalDqGraph"); + + TDqConfiguration::TPtr dqConfiguration = MakeIntrusive<TDqConfiguration>(); + dqConfiguration->Dispatch(dqGraphParams.GetSettings()); + dqConfiguration->FreezeDefaults(); + dqConfiguration->FallbackPolicy = "never"; + + ExecuterId = NActors::TActivationContext::Register(NYql::NDq::MakeDqExecuter(MakeNodesManagerId(), SelfId(), Params.QueryId, "", dqConfiguration, QueryCounters.Counters, TInstant::Now(), false)); + + NActors::TActorId resultId; + if (dqGraphParams.GetResultType()) { + TVector<TString> columns; + for (const auto& column : dqGraphParams.GetColumns()) { + columns.emplace_back(column); + } + + NActors::TActorId empty = {}; + THashMap<TString, TString> emptySecureParams; // NOT USED in RR + resultId = NActors::TActivationContext::Register( + MakeResultReceiver( + columns, ExecuterId, dqGraphParams.GetSession(), dqConfiguration, emptySecureParams, + dqGraphParams.GetResultType(), empty, false).Release()); + + } else { + LOG_D("ResultReceiver was NOT CREATED since ResultType is empty"); + resultId = ExecuterId; + } + + ControlId = NActors::TActivationContext::Register(NYql::MakeTaskController(SessionId, ExecuterId, resultId, dqConfiguration, QueryCounters, TDuration::Seconds(3)).Release()); + + Yql::DqsProto::ExecuteGraphRequest request; + request.SetSourceId(dqGraphParams.GetSourceId()); + request.SetResultType(dqGraphParams.GetResultType()); + request.SetSession(dqGraphParams.GetSession()); + *request.MutableSettings() = dqGraphParams.GetSettings(); + *request.MutableSecureParams() = dqGraphParams.GetSecureParams(); + *request.MutableColumns() = dqGraphParams.GetColumns(); + NTasksPacker::UnPack(*request.MutableTask(), dqGraphParams.GetTasks(), dqGraphParams.GetStageProgram()); + NActors::TActivationContext::Send(new IEventHandle(ExecuterId, SelfId(), new NYql::NDqs::TEvGraphRequest(request, ControlId, resultId, CheckpointCoordinatorId))); + LOG_D("Evaluation Executer: " << ExecuterId << ", Controller: " << ControlId << ", ResultIdActor: " << resultId); + } + void RunNextDqGraph() { auto& dqGraphParams = DqGraphParams.at(DqGraphIndex); TDqConfiguration::TPtr dqConfiguration = MakeIntrusive<TDqConfiguration>(); @@ -998,6 +1266,7 @@ private: LOG_D("Is about to finish query with status " << YandexQuery::QueryMeta::ComputeStatus_Name(status)); Finishing = true; FinalQueryStatus = status; + QueryStateUpdateRequest.set_status(FinalQueryStatus); // Can be changed later. QueryStateUpdateRequest.set_status_code(NYql::NDqProto::StatusIds::SUCCESS); *QueryStateUpdateRequest.mutable_finished_at() = google::protobuf::util::TimeUtil::MillisecondsToTimestamp(TInstant::Now().MilliSeconds()); @@ -1068,7 +1337,75 @@ private: RunNextDqGraph(); } - bool CompileQuery() { + bool RunProgram( + const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, + ui64 nextUniqueId, + TVector<TDataProviderInitializer> dataProvidersInit, + NYql::IModuleResolver::TPtr& moduleResolver, + NYql::TGatewaysConfig gatewaysConfig, + const TString& sql, + const TString& sessionId, + NSQLTranslation::TTranslationSettings sqlSettings, + YandexQuery::ExecuteMode executeMode + ) { + TProgramFactory progFactory(false, functionRegistry, nextUniqueId, dataProvidersInit, "yq"); + progFactory.SetModules(moduleResolver); + progFactory.SetUdfResolver(NYql::NCommon::CreateSimpleUdfResolver(functionRegistry, nullptr)); + progFactory.SetGatewaysConfig(&gatewaysConfig); + + Program = progFactory.Create("-stdin-", sql, sessionId); + Program->EnableResultPosition(); + + // parse phase + { + if (!Program->ParseSql(sqlSettings)) { + Issues.AddIssues(Program->Issues()); + return false; + + } + + if (executeMode == YandexQuery::ExecuteMode::PARSE) { + return true; + } + } + + // compile phase + { + if (!Program->Compile("")) { + Issues.AddIssues(Program->Issues()); + return false; + } + + if (executeMode == YandexQuery::ExecuteMode::COMPILE) { + return true; + } + } + + // next phases can be async: optimize, validate, run + TProgram::TFutureStatus futureStatus; + switch (executeMode) { + case YandexQuery::ExecuteMode::EXPLAIN: + futureStatus = Program->OptimizeAsync(""); + break; + case YandexQuery::ExecuteMode::VALIDATE: + futureStatus = Program->ValidateAsync(""); + break; + case YandexQuery::ExecuteMode::RUN: + futureStatus = Program->RunAsync(""); + break; + default: + Issues.AddIssue(TStringBuilder() << "Unexpected execute mode " << static_cast<int>(Params.ExecuteMode)); + return false; + } + + futureStatus.Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(), selfId = SelfId()](const TProgram::TFutureStatus& f) { + actorSystem->Send(selfId, new TEvents::TEvAsyncContinue(f)); + }); + + return true; + } + + void RunProgram() { LOG_D("Compiling query ..."); NYql::TGatewaysConfig gatewaysConfig; SetupDqSettings(*gatewaysConfig.MutableDq()); @@ -1132,11 +1469,6 @@ private: dataProvidersInit.push_back(GetSolomonDataProviderInitializer(solomonGateway, false)); } - TProgramFactory progFactory(false, Params.FunctionRegistry, Params.NextUniqueId, dataProvidersInit, "yq"); - progFactory.SetModules(Params.ModuleResolver); - progFactory.SetUdfResolver(NYql::NCommon::CreateSimpleUdfResolver(Params.FunctionRegistry, nullptr)); - progFactory.SetGatewaysConfig(&gatewaysConfig); - SessionId = TStringBuilder() << Params.QueryId << '#' << Params.ResultId << '#' @@ -1144,9 +1476,6 @@ private: << Params.Owner << '#' << Params.CloudId; - Program = progFactory.Create("-stdin-", Params.Sql, SessionId); - Program->EnableResultPosition(); - NSQLTranslation::TTranslationSettings sqlSettings; sqlSettings.ClusterMapping = clusters; sqlSettings.SyntaxVersion = 1; @@ -1156,55 +1485,42 @@ private: AddTableBindingsFromBindings(Params.Bindings, YqConnections, sqlSettings); } catch (const std::exception& e) { Issues.AddIssue(ExceptionToIssue(e)); - return false; - } - - // parse phase - { - if (!Program->ParseSql(sqlSettings)) { - Issues.AddIssues(Program->Issues()); - return false; - - } - - if (Params.ExecuteMode == YandexQuery::ExecuteMode::PARSE) { - return true; - } + FinishProgram(TProgram::TStatus::Error); + return; } - // compile phase - { - if (!Program->Compile("")) { - Issues.AddIssues(Program->Issues()); - return false; - } - - if (Params.ExecuteMode == YandexQuery::ExecuteMode::COMPILE) { - return true; - } - } +/* + return RunProgram( + Params.FunctionRegistry, + Params.NextUniqueId, + dataProvidersInit, + Params.ModuleResolver, + gatewaysConfig, + Params.Sql, + SessionId, + sqlSettings, + Params.ExecuteMode + ); +*/ + ProgramRunnerId = Register(new TProgramRunnerActor( + SelfId(), + Params.FunctionRegistry, + Params.NextUniqueId, + dataProvidersInit, + Params.ModuleResolver, + gatewaysConfig, + Params.Sql, + SessionId, + sqlSettings, + Params.ExecuteMode + )); + } - // next phases can be async: optimize, validate, run - TProgram::TFutureStatus futureStatus; - switch (Params.ExecuteMode) { - case YandexQuery::ExecuteMode::EXPLAIN: - futureStatus = Program->OptimizeAsync(""); - break; - case YandexQuery::ExecuteMode::VALIDATE: - futureStatus = Program->ValidateAsync(""); - break; - case YandexQuery::ExecuteMode::RUN: - futureStatus = Program->RunAsync(""); - break; - default: - Issues.AddIssue(TStringBuilder() << "Unexpected execute mode " << static_cast<int>(Params.ExecuteMode)); - return false; + void Handle(TEvPrivate::TEvProgramFinished::TPtr& ev) { + if (!Finishing) { + UpdateAstAndPlan(ev->Get()->Plan, ev->Get()->Expr); + FinishProgram(ev->Get()->Status, ev->Get()->Message, ev->Get()->Issues); } - - futureStatus.Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(), selfId = SelfId()](const TProgram::TFutureStatus& f) { - actorSystem->Send(selfId, new TEvents::TEvAsyncContinue(f)); - }); - return true; } void Handle(TEvents::TEvAsyncContinue::TPtr& ev, const TActorContext& ctx) { @@ -1227,12 +1543,23 @@ private: Issues.AddIssue(ExceptionToIssue(err)); } - UpdateAstAndPlan(); + TStringStream exprOut; + TStringStream planOut; + Program->Print(&exprOut, &planOut); + + UpdateAstAndPlan(NJson2Yson::ConvertYson2Json(planOut.Str()), exprOut.Str()); + FinishProgram(status); + } + void FinishProgram(NYql::TProgram::TStatus status, const TString& message = "", const NYql::TIssues& issues = {}) { if (status == TProgram::TStatus::Ok || (DqGraphParams.size() > 0 && !DqGraphParams[0].GetResultType())) { PrepareGraphs(); } else { - Abort(TStringBuilder() << "Run query failed: " << ToString(status), YandexQuery::QueryMeta::FAILED, Program->Issues()); + TString abortMessage = message; + if (abortMessage == "") { + abortMessage = TStringBuilder() << "Run query failed: " << ToString(status); + } + Abort(abortMessage, YandexQuery::QueryMeta::FAILED, issues); } } @@ -1282,6 +1609,7 @@ private: private: TActorId FetcherId; + TActorId ProgramRunnerId; TRunActorParams Params; THashMap<TString, YandexQuery::Connection> YqConnections; @@ -1293,6 +1621,8 @@ private: TActorId Pinger; TInstant CreatedAt; YandexQuery::QueryAction Action = YandexQuery::QueryAction::QUERY_ACTION_UNSPECIFIED; + bool EvaluationInProgress = false; + NThreading::TPromise<NYql::IDqGateway::TResult> EvaluationResult; std::vector<NYq::NProto::TGraphParams> DqGraphParams; std::vector<i32> DqGrapResultIndices; i32 DqGraphIndex = 0; diff --git a/ydb/core/yq/libs/events/events.h b/ydb/core/yq/libs/events/events.h index 7cdff0c5906..4179ea23049 100644 --- a/ydb/core/yq/libs/events/events.h +++ b/ydb/core/yq/libs/events/events.h @@ -209,6 +209,7 @@ struct TEvents { { } NProto::TGraphParams GraphParams; + bool IsEvaluation = false; NThreading::TPromise<NYql::IDqGateway::TResult> Result; }; diff --git a/ydb/core/yq/libs/gateway/empty_gateway.cpp b/ydb/core/yq/libs/gateway/empty_gateway.cpp index e544edf42f0..305852e663c 100644 --- a/ydb/core/yq/libs/gateway/empty_gateway.cpp +++ b/ydb/core/yq/libs/gateway/empty_gateway.cpp @@ -39,7 +39,6 @@ public: Y_UNUSED(progressWriter); Y_UNUSED(modulesMapping); // TODO: support. Y_UNUSED(discard); - Y_UNUSED(queryParams); NProto::TGraphParams params; THashMap<i64, TString> stagePrograms; @@ -63,24 +62,12 @@ public: } params.SetSession(sessionId); - NActors::TActivationContext::Send(new NActors::IEventHandle(RunActorId, {}, new TEvents::TEvGraphParams(params))); - auto result = NThreading::NewPromise<NYql::IDqGateway::TResult>(); - NYql::IDqGateway::TResult gatewayResult; - // fake it till you make it - // generate dummy result for YQL facade now, remove this gateway completely - // when top-level YQL facade call like Preprocess() is implemented - if (plan.ResultType) { - // for resultable graphs return dummy "select 1" result (it is not used and is required to satisfy YQL facade only) - gatewayResult.SetSuccess(); - gatewayResult.Data = "[[\001\0021]]"; - gatewayResult.Truncated = true; - gatewayResult.RowsCount = 0; - } else { - // for resultless results expect infinite INSERT FROM SELECT and fail YQL facade (with well known secret code?) - gatewayResult.Issues.AddIssues({NYql::TIssue("MAGIC BREAK").SetCode(555, NYql::TSeverityIds::S_ERROR)}); - } - result.SetValue(gatewayResult); + auto event = MakeHolder<TEvents::TEvGraphParams>(params); + event->IsEvaluation = queryParams.Value("Evaluation", "") == "true"; + event->Result = result; + NActors::TActivationContext::Send(new NActors::IEventHandle(RunActorId, {}, event.Release())); + return result; } diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index 8d1aaa87478..2b51232416f 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -789,6 +789,7 @@ private: future = NThreading::MakeFuture<IDqGateway::TResult>(std::move(result)); } } else { + graphParams["Evaluation"] = ctx.Step.IsDone(TExprStep::ExprEval) ? "false" : "true"; future = State->DqGateway->ExecutePlan( State->SessionId, executionPlanner->GetPlan(), columns, secureParams, graphParams, settings, progressWriter, ModulesMapping, fillSettings.Discard); |