aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2023-07-12 22:24:46 +0300
committerhor911 <hor911@ydb.tech>2023-07-12 22:24:46 +0300
commitd316ccc1ba779ac6d0e017e3d90a753bb3466835 (patch)
tree1f0b9edd31ff508a6ebe89b26c01f60f12b97cad
parentdba71ef6b2793b48a7a459fc452ac0ecf80babb1 (diff)
downloadydb-d316ccc1ba779ac6d0e017e3d90a753bb3466835.tar.gz
EvaluateExpr
-rw-r--r--ydb/core/kqp/gateway/kqp_gateway.h1
-rw-r--r--ydb/core/kqp/gateway/kqp_ic_gateway.cpp49
-rw-r--r--ydb/core/kqp/host/kqp_gateway_proxy.cpp7
-rw-r--r--ydb/core/kqp/host/kqp_host.cpp2
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_exec.cpp108
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_gateway.h7
-rw-r--r--ydb/core/kqp/ut/yql/kqp_yql_ut.cpp45
7 files changed, 212 insertions, 7 deletions
diff --git a/ydb/core/kqp/gateway/kqp_gateway.h b/ydb/core/kqp/gateway/kqp_gateway.h
index c4be1e7fa2..793cf008fe 100644
--- a/ydb/core/kqp/gateway/kqp_gateway.h
+++ b/ydb/core/kqp/gateway/kqp_gateway.h
@@ -170,6 +170,7 @@ public:
virtual NThreading::TFuture<TGenericResult> ModifyScheme(NKikimrSchemeOp::TModifyScheme&& modifyScheme) = 0;
/* Compute */
+ using NYql::IKikimrGateway::ExecuteLiteral;
virtual NThreading::TFuture<TExecPhysicalResult> ExecuteLiteral(TExecPhysicalRequest&& request,
TQueryData::TPtr params, ui32 txIndex) = 0;
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
index 5079f336e6..c9d51a9ce6 100644
--- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
@@ -1777,6 +1777,55 @@ public:
}
}
+ TFuture<TExecuteLiteralResult> ExecuteLiteral(const TString& program, const NKikimrMiniKQL::TType& resultType, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc) override {
+ auto preparedQuery = std::make_unique<NKikimrKqp::TPreparedQuery>();
+ auto& phyQuery = *preparedQuery->MutablePhysicalQuery();
+ NKikimr::NKqp::IKqpGateway::TExecPhysicalRequest literalRequest(txAlloc);
+
+ literalRequest.NeedTxId = false;
+ literalRequest.MaxAffectedShards = 0;
+ literalRequest.TotalReadSizeLimitBytes = 0;
+ literalRequest.MkqlMemoryLimit = 100_MB;
+
+ auto& transaction = *phyQuery.AddTransactions();
+ transaction.SetType(NKqpProto::TKqpPhyTx::TYPE_COMPUTE);
+
+ auto& stage = *transaction.AddStages();
+ auto& stageProgram = *stage.MutableProgram();
+ stageProgram.SetRuntimeVersion(NYql::NDqProto::RUNTIME_VERSION_YQL_1_0);
+ stageProgram.SetRaw(program);
+ stage.SetOutputsCount(1);
+
+ auto& taskResult = *transaction.AddResults();
+ *taskResult.MutableItemType() = resultType;
+ auto& taskConnection = *taskResult.MutableConnection();
+ taskConnection.SetStageIndex(0);
+
+ NKikimr::NKqp::TPreparedQueryHolder queryHolder(preparedQuery.release(), txAlloc->HolderFactory.GetFunctionRegistry());
+
+ NKikimr::NKqp::TQueryData::TPtr params = std::make_shared<NKikimr::NKqp::TQueryData>(txAlloc);
+
+ literalRequest.Transactions.emplace_back(queryHolder.GetPhyTx(0), params);
+
+ return ExecuteLiteral(std::move(literalRequest), params, 0).Apply([](const auto& future) {
+ const auto& result = future.GetValue();
+
+ TExecuteLiteralResult literalResult;
+
+ if (result.Success()) {
+ YQL_ENSURE(result.Results.size() == 1);
+ literalResult.SetSuccess();
+ literalResult.Result = result.Results[0];
+ } else {
+ literalResult.SetStatus(result.Status());
+ literalResult.AddIssues(result.Issues());
+ }
+
+ return literalResult;
+ });
+ }
+
+
TFuture<TExecPhysicalResult> ExecuteLiteral(TExecPhysicalRequest&& request, TQueryData::TPtr params, ui32 txIndex) override {
YQL_ENSURE(!request.Transactions.empty());
YQL_ENSURE(request.DataShardLocks.empty());
diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
index 70cd35bb36..71d75c0181 100644
--- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp
+++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
@@ -630,6 +630,13 @@ public:
return Gateway->GetCollectedSchemeData();
}
+ TFuture<TExecuteLiteralResult> ExecuteLiteral(const TString& program,
+ const NKikimrMiniKQL::TType& resultType, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc) override
+ {
+ return Gateway->ExecuteLiteral(program, resultType, txAlloc);
+ }
+
+
private:
bool IsPrepare() const {
if (!SessionCtx) {
diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp
index 8738792815..d1e06b6948 100644
--- a/ydb/core/kqp/host/kqp_host.cpp
+++ b/ydb/core/kqp/host/kqp_host.cpp
@@ -1574,7 +1574,7 @@ private:
.Add(TLogExprTransformer::Sync("YqlTransformer", NYql::NLog::EComponent::ProviderKqp,
NYql::NLog::ELevel::TRACE), "LogYqlTransform")
.AddPreTypeAnnotation()
- // TODO: .AddExpressionEvaluation(*FuncRegistry)
+ .AddExpressionEvaluation(*FuncRegistry)
.Add(new TFailExpressionEvaluation(), "FailExpressionEvaluation")
.AddIOAnnotation()
.AddTypeAnnotation()
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
index 3ac9147363..e659c7610c 100644
--- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
@@ -9,6 +9,8 @@
#include <ydb/library/yql/core/type_ann/type_ann_expr.h>
#include <ydb/library/yql/core/type_ann/type_ann_core.h>
#include <ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.h>
+#include <ydb/library/yql/providers/common/mkql/yql_provider_mkql.h>
+#include <ydb/library/yql/providers/common/mkql/yql_type_mkql.h>
#include <ydb/library/yql/providers/result/expr_nodes/yql_res_expr_nodes.h>
#include <ydb/core/ydb_convert/ydb_convert.h>
@@ -16,6 +18,12 @@
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
#include <ydb/public/api/protos/ydb_topic.pb.h>
+#include <ydb/library/yql/dq/tasks/dq_task_program.h>
+
+#include <ydb/library/yql/minikql/mkql_program_builder.h>
+
+#include <ydb/core/kqp/provider/yql_kikimr_results.h>
+
namespace NYql {
namespace {
@@ -521,6 +529,68 @@ private:
};
class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<TKiSourceCallableExecutionTransformer> {
+
+private:
+
+ TString GetLambdaBody(TExprNode::TPtr resInput, NKikimrMiniKQL::TType& resultType, TExprContext& ctx) {
+
+ auto pos = resInput->Pos();
+ auto typeAnn = resInput->GetTypeAnn();
+
+ const auto kind = resInput->GetTypeAnn()->GetKind();
+ const bool data = kind != ETypeAnnotationKind::Flow && kind != ETypeAnnotationKind::List && kind != ETypeAnnotationKind::Stream && kind != ETypeAnnotationKind::Optional;
+ auto node = ctx.WrapByCallableIf(kind != ETypeAnnotationKind::Stream, "ToStream",
+ ctx.WrapByCallableIf(data, "Just", std::move(resInput)));
+
+ auto guard = Guard(SessionCtx->Query().QueryData->GetAllocState()->Alloc);
+
+ auto input = Build<TDqPhyStage>(ctx, pos)
+ .Inputs()
+ .Build()
+ .Program<TCoLambda>()
+ .Args({})
+ .Body(node)
+ .Build()
+ .Settings().Build()
+ .Done().Ptr();
+
+ NCommon::TMkqlCommonCallableCompiler compiler;
+
+ auto programLambda = TDqPhyStage(input).Program();
+
+ TVector<TExprBase> fakeReads;
+ auto paramsType = NDq::CollectParameters(programLambda, ctx);
+ auto lambda = NDq::BuildProgram(
+ programLambda, *paramsType, compiler, SessionCtx->Query().QueryData->GetAllocState()->TypeEnv,
+ *SessionCtx->Query().QueryData->GetAllocState()->HolderFactory.GetFunctionRegistry(),
+ ctx, fakeReads);
+
+ NKikimr::NMiniKQL::TProgramBuilder programBuilder(SessionCtx->Query().QueryData->GetAllocState()->TypeEnv,
+ *SessionCtx->Query().QueryData->GetAllocState()->HolderFactory.GetFunctionRegistry());
+
+ TStringStream errorStream;
+ auto type = NYql::NCommon::BuildType(*typeAnn, programBuilder, errorStream);
+ ExportTypeToProto(type, resultType);
+
+ return lambda;
+ }
+
+ TString EncodeResultToYson(const NKikimrMiniKQL::TResult& result, bool& truncated) {
+ TStringStream ysonStream;
+ NYson::TYsonWriter writer(&ysonStream, NYson::EYsonFormat::Binary);
+ NYql::IDataProvider::TFillSettings fillSettings;
+ KikimrResultToYson(ysonStream, writer, result, {}, fillSettings, truncated);
+
+ TStringStream out;
+ NYson::TYsonWriter writer2((IOutputStream*)&out);
+ writer2.OnBeginMap();
+ writer2.OnKeyedItem("Data");
+ writer2.OnRaw(ysonStream.Str());
+ writer2.OnEndMap();
+
+ return out.Str();
+ }
+
public:
TKiSourceCallableExecutionTransformer(TIntrusivePtr<IKikimrGateway> gateway,
TIntrusivePtr<TKikimrSessionContext> sessionCtx)
@@ -546,14 +616,40 @@ public:
}
if (input->Content() == "Result") {
- auto resultInput = TExprBase(input->ChildPtr(0));
- auto exec = resultInput.Maybe<TCoNth>().Tuple().Maybe<TCoRight>().Input();
- YQL_ENSURE(exec.Maybe<TKiExecDataQuery>());
+ auto result = TMaybeNode<TResult>(input).Cast();
+ NKikimrMiniKQL::TType resultType;
+ auto program = GetLambdaBody(result.Input().Ptr(), resultType, ctx);
+ auto asyncResult = Gateway->ExecuteLiteral(program, resultType, SessionCtx->Query().QueryData->GetAllocState());
- ui32 index = FromString<ui32>(resultInput.Cast<TCoNth>().Index().Value());
- YQL_ENSURE(index == 0);
+ return std::make_pair(IGraphTransformer::TStatus::Async, asyncResult.Apply(
+ [this](const NThreading::TFuture<IKikimrGateway::TExecuteLiteralResult>& future) {
+ return TAsyncTransformCallback(
+ [future, this](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
- return RunResOrPullForExec(TResOrPullBase(input), exec.Cast(), ctx, 0);
+ const auto& literalResult = future.GetValueSync();
+
+ if (!literalResult.Success()) {
+ for (const auto& issue : literalResult.Issues()) {
+ ctx.AddError(issue);
+ }
+ input->SetState(TExprNode::EState::Error);
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ bool truncated = false;
+ auto yson = this->EncodeResultToYson(literalResult.Result, truncated);
+ if (truncated) {
+ input->SetState(TExprNode::EState::Error);
+ ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), "EvaluteExpr result is too big and was truncated"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ output = input;
+ input->SetState(TExprNode::EState::ExecutionComplete);
+ input->SetResult(ctx.NewAtom(input->Pos(), yson));
+ return IGraphTransformer::TStatus::Ok;
+ });
+ }));
}
if (input->Content() == ConfigureName) {
diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h
index 8c977865f5..60bbcff13e 100644
--- a/ydb/core/kqp/provider/yql_kikimr_gateway.h
+++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h
@@ -15,6 +15,7 @@
#include <ydb/services/metadata/abstract/kqp_common.h>
#include <ydb/services/metadata/manager/abstract.h>
+#include <ydb/core/kqp/query_data/kqp_query_data.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/core/protos/kqp.pb.h>
#include <ydb/core/scheme/scheme_types_proto.h>
@@ -689,6 +690,10 @@ public:
google::protobuf::RepeatedPtrField<NKqpProto::TResultSetMeta> ResultSetsMeta;
};
+ struct TExecuteLiteralResult : public TGenericResult {
+ NKikimrMiniKQL::TResult Result;
+ };
+
struct TLoadTableMetadataSettings {
TLoadTableMetadataSettings& WithTableStats(bool enable) {
RequestStats_ = enable;
@@ -794,6 +799,8 @@ public:
virtual TVector<TString> GetCollectedSchemeData() = 0;
+ virtual NThreading::TFuture<TExecuteLiteralResult> ExecuteLiteral(const TString& program, const NKikimrMiniKQL::TType& resultType, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc) = 0;
+
public:
using TCreateDirFunc = std::function<void(const TString&, const TString&, NThreading::TPromise<TGenericResult>)>;
diff --git a/ydb/core/kqp/ut/yql/kqp_yql_ut.cpp b/ydb/core/kqp/ut/yql/kqp_yql_ut.cpp
index 5b13979d91..0531f598dd 100644
--- a/ydb/core/kqp/ut/yql/kqp_yql_ut.cpp
+++ b/ydb/core/kqp/ut/yql/kqp_yql_ut.cpp
@@ -416,6 +416,51 @@ Y_UNIT_TEST_SUITE(KqpYql) {
#
]])", FormatResultSetYson(result.GetResultSet(0)));
}
+
+ Y_UNIT_TEST(EvaluateExpr1) {
+ TKikimrRunner kikimr;
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ auto result = session.ExecuteDataQuery(Q1_(R"(
+ SELECT EvaluateExpr(2+2);
+ )"), TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ CompareYson(R"([[
+ 4]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+
+ Y_UNIT_TEST(EvaluateExpr2) {
+ TKikimrRunner kikimr;
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ auto result = session.ExecuteDataQuery(Q1_(R"(
+ SELECT EvaluateExpr(12+20);
+ )"), TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ CompareYson(R"([[
+ 32]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+
+ Y_UNIT_TEST(EvaluateExpr3) {
+ TKikimrRunner kikimr;
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ auto result = session.ExecuteDataQuery(Q1_(R"(
+ SELECT EvaluateExpr( EvaluateExpr(2+2) + EvaluateExpr(5*5) );
+ )"), TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ CompareYson(R"([[
+ 29]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
}
} // namespace NKqp