diff options
author | hor911 <hor911@ydb.tech> | 2023-07-12 22:24:46 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-07-12 22:24:46 +0300 |
commit | d316ccc1ba779ac6d0e017e3d90a753bb3466835 (patch) | |
tree | 1f0b9edd31ff508a6ebe89b26c01f60f12b97cad | |
parent | dba71ef6b2793b48a7a459fc452ac0ecf80babb1 (diff) | |
download | ydb-d316ccc1ba779ac6d0e017e3d90a753bb3466835.tar.gz |
EvaluateExpr
-rw-r--r-- | ydb/core/kqp/gateway/kqp_gateway.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/gateway/kqp_ic_gateway.cpp | 49 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_gateway_proxy.cpp | 7 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_host.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_exec.cpp | 108 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_gateway.h | 7 | ||||
-rw-r--r-- | ydb/core/kqp/ut/yql/kqp_yql_ut.cpp | 45 |
7 files changed, 212 insertions, 7 deletions
diff --git a/ydb/core/kqp/gateway/kqp_gateway.h b/ydb/core/kqp/gateway/kqp_gateway.h index c4be1e7fa2..793cf008fe 100644 --- a/ydb/core/kqp/gateway/kqp_gateway.h +++ b/ydb/core/kqp/gateway/kqp_gateway.h @@ -170,6 +170,7 @@ public: virtual NThreading::TFuture<TGenericResult> ModifyScheme(NKikimrSchemeOp::TModifyScheme&& modifyScheme) = 0; /* Compute */ + using NYql::IKikimrGateway::ExecuteLiteral; virtual NThreading::TFuture<TExecPhysicalResult> ExecuteLiteral(TExecPhysicalRequest&& request, TQueryData::TPtr params, ui32 txIndex) = 0; diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index 5079f336e6..c9d51a9ce6 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -1777,6 +1777,55 @@ public: } } + TFuture<TExecuteLiteralResult> ExecuteLiteral(const TString& program, const NKikimrMiniKQL::TType& resultType, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc) override { + auto preparedQuery = std::make_unique<NKikimrKqp::TPreparedQuery>(); + auto& phyQuery = *preparedQuery->MutablePhysicalQuery(); + NKikimr::NKqp::IKqpGateway::TExecPhysicalRequest literalRequest(txAlloc); + + literalRequest.NeedTxId = false; + literalRequest.MaxAffectedShards = 0; + literalRequest.TotalReadSizeLimitBytes = 0; + literalRequest.MkqlMemoryLimit = 100_MB; + + auto& transaction = *phyQuery.AddTransactions(); + transaction.SetType(NKqpProto::TKqpPhyTx::TYPE_COMPUTE); + + auto& stage = *transaction.AddStages(); + auto& stageProgram = *stage.MutableProgram(); + stageProgram.SetRuntimeVersion(NYql::NDqProto::RUNTIME_VERSION_YQL_1_0); + stageProgram.SetRaw(program); + stage.SetOutputsCount(1); + + auto& taskResult = *transaction.AddResults(); + *taskResult.MutableItemType() = resultType; + auto& taskConnection = *taskResult.MutableConnection(); + taskConnection.SetStageIndex(0); + + NKikimr::NKqp::TPreparedQueryHolder queryHolder(preparedQuery.release(), txAlloc->HolderFactory.GetFunctionRegistry()); + + NKikimr::NKqp::TQueryData::TPtr params = std::make_shared<NKikimr::NKqp::TQueryData>(txAlloc); + + literalRequest.Transactions.emplace_back(queryHolder.GetPhyTx(0), params); + + return ExecuteLiteral(std::move(literalRequest), params, 0).Apply([](const auto& future) { + const auto& result = future.GetValue(); + + TExecuteLiteralResult literalResult; + + if (result.Success()) { + YQL_ENSURE(result.Results.size() == 1); + literalResult.SetSuccess(); + literalResult.Result = result.Results[0]; + } else { + literalResult.SetStatus(result.Status()); + literalResult.AddIssues(result.Issues()); + } + + return literalResult; + }); + } + + TFuture<TExecPhysicalResult> ExecuteLiteral(TExecPhysicalRequest&& request, TQueryData::TPtr params, ui32 txIndex) override { YQL_ENSURE(!request.Transactions.empty()); YQL_ENSURE(request.DataShardLocks.empty()); diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp index 70cd35bb36..71d75c0181 100644 --- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp +++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp @@ -630,6 +630,13 @@ public: return Gateway->GetCollectedSchemeData(); } + TFuture<TExecuteLiteralResult> ExecuteLiteral(const TString& program, + const NKikimrMiniKQL::TType& resultType, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc) override + { + return Gateway->ExecuteLiteral(program, resultType, txAlloc); + } + + private: bool IsPrepare() const { if (!SessionCtx) { diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index 8738792815..d1e06b6948 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -1574,7 +1574,7 @@ private: .Add(TLogExprTransformer::Sync("YqlTransformer", NYql::NLog::EComponent::ProviderKqp, NYql::NLog::ELevel::TRACE), "LogYqlTransform") .AddPreTypeAnnotation() - // TODO: .AddExpressionEvaluation(*FuncRegistry) + .AddExpressionEvaluation(*FuncRegistry) .Add(new TFailExpressionEvaluation(), "FailExpressionEvaluation") .AddIOAnnotation() .AddTypeAnnotation() diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index 3ac9147363..e659c7610c 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -9,6 +9,8 @@ #include <ydb/library/yql/core/type_ann/type_ann_expr.h> #include <ydb/library/yql/core/type_ann/type_ann_core.h> #include <ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.h> +#include <ydb/library/yql/providers/common/mkql/yql_provider_mkql.h> +#include <ydb/library/yql/providers/common/mkql/yql_type_mkql.h> #include <ydb/library/yql/providers/result/expr_nodes/yql_res_expr_nodes.h> #include <ydb/core/ydb_convert/ydb_convert.h> @@ -16,6 +18,12 @@ #include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> #include <ydb/public/api/protos/ydb_topic.pb.h> +#include <ydb/library/yql/dq/tasks/dq_task_program.h> + +#include <ydb/library/yql/minikql/mkql_program_builder.h> + +#include <ydb/core/kqp/provider/yql_kikimr_results.h> + namespace NYql { namespace { @@ -521,6 +529,68 @@ private: }; class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<TKiSourceCallableExecutionTransformer> { + +private: + + TString GetLambdaBody(TExprNode::TPtr resInput, NKikimrMiniKQL::TType& resultType, TExprContext& ctx) { + + auto pos = resInput->Pos(); + auto typeAnn = resInput->GetTypeAnn(); + + const auto kind = resInput->GetTypeAnn()->GetKind(); + const bool data = kind != ETypeAnnotationKind::Flow && kind != ETypeAnnotationKind::List && kind != ETypeAnnotationKind::Stream && kind != ETypeAnnotationKind::Optional; + auto node = ctx.WrapByCallableIf(kind != ETypeAnnotationKind::Stream, "ToStream", + ctx.WrapByCallableIf(data, "Just", std::move(resInput))); + + auto guard = Guard(SessionCtx->Query().QueryData->GetAllocState()->Alloc); + + auto input = Build<TDqPhyStage>(ctx, pos) + .Inputs() + .Build() + .Program<TCoLambda>() + .Args({}) + .Body(node) + .Build() + .Settings().Build() + .Done().Ptr(); + + NCommon::TMkqlCommonCallableCompiler compiler; + + auto programLambda = TDqPhyStage(input).Program(); + + TVector<TExprBase> fakeReads; + auto paramsType = NDq::CollectParameters(programLambda, ctx); + auto lambda = NDq::BuildProgram( + programLambda, *paramsType, compiler, SessionCtx->Query().QueryData->GetAllocState()->TypeEnv, + *SessionCtx->Query().QueryData->GetAllocState()->HolderFactory.GetFunctionRegistry(), + ctx, fakeReads); + + NKikimr::NMiniKQL::TProgramBuilder programBuilder(SessionCtx->Query().QueryData->GetAllocState()->TypeEnv, + *SessionCtx->Query().QueryData->GetAllocState()->HolderFactory.GetFunctionRegistry()); + + TStringStream errorStream; + auto type = NYql::NCommon::BuildType(*typeAnn, programBuilder, errorStream); + ExportTypeToProto(type, resultType); + + return lambda; + } + + TString EncodeResultToYson(const NKikimrMiniKQL::TResult& result, bool& truncated) { + TStringStream ysonStream; + NYson::TYsonWriter writer(&ysonStream, NYson::EYsonFormat::Binary); + NYql::IDataProvider::TFillSettings fillSettings; + KikimrResultToYson(ysonStream, writer, result, {}, fillSettings, truncated); + + TStringStream out; + NYson::TYsonWriter writer2((IOutputStream*)&out); + writer2.OnBeginMap(); + writer2.OnKeyedItem("Data"); + writer2.OnRaw(ysonStream.Str()); + writer2.OnEndMap(); + + return out.Str(); + } + public: TKiSourceCallableExecutionTransformer(TIntrusivePtr<IKikimrGateway> gateway, TIntrusivePtr<TKikimrSessionContext> sessionCtx) @@ -546,14 +616,40 @@ public: } if (input->Content() == "Result") { - auto resultInput = TExprBase(input->ChildPtr(0)); - auto exec = resultInput.Maybe<TCoNth>().Tuple().Maybe<TCoRight>().Input(); - YQL_ENSURE(exec.Maybe<TKiExecDataQuery>()); + auto result = TMaybeNode<TResult>(input).Cast(); + NKikimrMiniKQL::TType resultType; + auto program = GetLambdaBody(result.Input().Ptr(), resultType, ctx); + auto asyncResult = Gateway->ExecuteLiteral(program, resultType, SessionCtx->Query().QueryData->GetAllocState()); - ui32 index = FromString<ui32>(resultInput.Cast<TCoNth>().Index().Value()); - YQL_ENSURE(index == 0); + return std::make_pair(IGraphTransformer::TStatus::Async, asyncResult.Apply( + [this](const NThreading::TFuture<IKikimrGateway::TExecuteLiteralResult>& future) { + return TAsyncTransformCallback( + [future, this](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { - return RunResOrPullForExec(TResOrPullBase(input), exec.Cast(), ctx, 0); + const auto& literalResult = future.GetValueSync(); + + if (!literalResult.Success()) { + for (const auto& issue : literalResult.Issues()) { + ctx.AddError(issue); + } + input->SetState(TExprNode::EState::Error); + return IGraphTransformer::TStatus::Error; + } + + bool truncated = false; + auto yson = this->EncodeResultToYson(literalResult.Result, truncated); + if (truncated) { + input->SetState(TExprNode::EState::Error); + ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), "EvaluteExpr result is too big and was truncated")); + return IGraphTransformer::TStatus::Error; + } + + output = input; + input->SetState(TExprNode::EState::ExecutionComplete); + input->SetResult(ctx.NewAtom(input->Pos(), yson)); + return IGraphTransformer::TStatus::Ok; + }); + })); } if (input->Content() == ConfigureName) { diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h index 8c977865f5..60bbcff13e 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway.h +++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h @@ -15,6 +15,7 @@ #include <ydb/services/metadata/abstract/kqp_common.h> #include <ydb/services/metadata/manager/abstract.h> +#include <ydb/core/kqp/query_data/kqp_query_data.h> #include <ydb/core/protos/flat_scheme_op.pb.h> #include <ydb/core/protos/kqp.pb.h> #include <ydb/core/scheme/scheme_types_proto.h> @@ -689,6 +690,10 @@ public: google::protobuf::RepeatedPtrField<NKqpProto::TResultSetMeta> ResultSetsMeta; }; + struct TExecuteLiteralResult : public TGenericResult { + NKikimrMiniKQL::TResult Result; + }; + struct TLoadTableMetadataSettings { TLoadTableMetadataSettings& WithTableStats(bool enable) { RequestStats_ = enable; @@ -794,6 +799,8 @@ public: virtual TVector<TString> GetCollectedSchemeData() = 0; + virtual NThreading::TFuture<TExecuteLiteralResult> ExecuteLiteral(const TString& program, const NKikimrMiniKQL::TType& resultType, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc) = 0; + public: using TCreateDirFunc = std::function<void(const TString&, const TString&, NThreading::TPromise<TGenericResult>)>; diff --git a/ydb/core/kqp/ut/yql/kqp_yql_ut.cpp b/ydb/core/kqp/ut/yql/kqp_yql_ut.cpp index 5b13979d91..0531f598dd 100644 --- a/ydb/core/kqp/ut/yql/kqp_yql_ut.cpp +++ b/ydb/core/kqp/ut/yql/kqp_yql_ut.cpp @@ -416,6 +416,51 @@ Y_UNIT_TEST_SUITE(KqpYql) { # ]])", FormatResultSetYson(result.GetResultSet(0))); } + + Y_UNIT_TEST(EvaluateExpr1) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + auto result = session.ExecuteDataQuery(Q1_(R"( + SELECT EvaluateExpr(2+2); + )"), TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + CompareYson(R"([[ + 4] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + + Y_UNIT_TEST(EvaluateExpr2) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + auto result = session.ExecuteDataQuery(Q1_(R"( + SELECT EvaluateExpr(12+20); + )"), TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + CompareYson(R"([[ + 32] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + + Y_UNIT_TEST(EvaluateExpr3) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + auto result = session.ExecuteDataQuery(Q1_(R"( + SELECT EvaluateExpr( EvaluateExpr(2+2) + EvaluateExpr(5*5) ); + )"), TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + CompareYson(R"([[ + 29] + ])", FormatResultSetYson(result.GetResultSet(0))); + } } } // namespace NKqp |