diff options
author | spuchin <spuchin@ydb.tech> | 2023-04-10 11:07:00 +0300 |
---|---|---|
committer | spuchin <spuchin@ydb.tech> | 2023-04-10 11:07:00 +0300 |
commit | de6ebe03178907936cd22e28f9dc4acf7f189d0d (patch) | |
tree | 7ff57ddf4680408eb53c4a58298cc752ae48af60 | |
parent | b38e48c6cd4b18e00bce01b943325dc772ec941c (diff) | |
download | ydb-de6ebe03178907936cd22e28f9dc4acf7f189d0d.tar.gz |
Avoid CnValue in query results. ()
-rw-r--r-- | ydb/core/kqp/opt/kqp_opt_phy_check.cpp | 14 | ||||
-rw-r--r-- | ydb/core/kqp/opt/kqp_opt_phy_finalize.cpp | 32 | ||||
-rw-r--r-- | ydb/core/kqp/query_compiler/kqp_query_compiler.cpp | 9 | ||||
-rw-r--r-- | ydb/core/kqp/ut/service/kqp_query_service_ut.cpp | 12 |
4 files changed, 60 insertions, 7 deletions
diff --git a/ydb/core/kqp/opt/kqp_opt_phy_check.cpp b/ydb/core/kqp/opt/kqp_opt_phy_check.cpp index 6b5c08e81c..90ab8da99a 100644 --- a/ydb/core/kqp/opt/kqp_opt_phy_check.cpp +++ b/ydb/core/kqp/opt/kqp_opt_phy_check.cpp @@ -20,6 +20,20 @@ TAutoPtr<IGraphTransformer> CreateKqpCheckPhysicalQueryTransformer() { auto query = TKqlQuery(input); YQL_ENSURE(query.Ref().GetTypeAnn()); + for (const auto& result : query.Results()) { + if (!result.Value().Maybe<TDqConnection>()) { + ctx.AddError(TIssue(ctx.GetPosition(result.Pos()), "Failed to build query results.")); + return TStatus::Error; + } + + if (!result.Value().Maybe<TDqCnUnionAll>()) { + ctx.AddError(TIssue(ctx.GetPosition(result.Pos()), TStringBuilder() + << "Unexpected query result connection: " + << result.Value().Cast<TDqConnection>().CallableName())); + return TStatus::Error; + } + } + for (const auto& effect : query.Effects()) { if (!effect.Maybe<TDqOutput>()) { ctx.AddError(TIssue(ctx.GetPosition(effect.Pos()), "Failed to build query effects.")); diff --git a/ydb/core/kqp/opt/kqp_opt_phy_finalize.cpp b/ydb/core/kqp/opt/kqp_opt_phy_finalize.cpp index d596b66759..8255e13914 100644 --- a/ydb/core/kqp/opt/kqp_opt_phy_finalize.cpp +++ b/ydb/core/kqp/opt/kqp_opt_phy_finalize.cpp @@ -14,6 +14,20 @@ using TStatus = IGraphTransformer::TStatus; namespace { +TExprBase BuildValueResult(const TDqCnValue& cn, TExprContext& ctx) { + YQL_ENSURE(cn.Ref().GetTypeAnn()->GetKind() == ETypeAnnotationKind::List); + + return Build<TCoFlatMap>(ctx, cn.Pos()) + .Input<TDqCnUnionAll>() + .Output(cn.Output()) + .Build() + .Lambda() + .Args({"list"}) + .Body("list") + .Build() + .Done(); +} + TStatus KqpBuildPureExprStagesResult(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { @@ -68,7 +82,15 @@ TStatus KqpBuildPureExprStagesResult(const TExprNode::TPtr& input, TExprNode::TP // TODO: Missing support for DqCnValue results in scan queries if (node.Maybe<TDqPhyPrecompute>() && omitResultPrecomputes && !kqpCtx.IsScanQuery()) { - replaces[node.Raw()] = node.Cast<TDqPhyPrecompute>().Connection().Ptr(); + YQL_CLOG(DEBUG, ProviderKqp) << "Building precompute result #" << node.Raw()->UniqueId(); + + auto connection = node.Cast<TDqPhyPrecompute>().Connection(); + if (connection.Maybe<TDqCnValue>()) { + replaces[node.Raw()] = BuildValueResult(connection.Cast<TDqCnValue>(), ctx).Ptr(); + } else { + YQL_ENSURE(connection.Maybe<TDqCnUnionAll>()); + replaces[node.Raw()] = connection.Ptr(); + } } else { auto result = DqBuildPureExprStage(node, ctx); if (result.Raw() != node.Raw()) { @@ -77,9 +99,13 @@ TStatus KqpBuildPureExprStagesResult(const TExprNode::TPtr& input, TExprNode::TP } } } - output = ctx.ReplaceNodes(TExprNode::TPtr(input), replaces); - return TStatus::Ok; + if (replaces.empty()) { + return TStatus::Ok; + } + + output = ctx.ReplaceNodes(TExprNode::TPtr(input), replaces); + return TStatus(TStatus::Repeat, true); } TStatus KqpBuildUnionResult(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index 1e16ec91c5..da719d8f2b 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -491,11 +491,12 @@ public: YQL_ENSURE(txIndex < queryProto.TransactionsSize()); YQL_ENSURE(resultIndex < queryProto.GetTransactions(txIndex).ResultsSize()); + YQL_ENSURE(queryProto.GetTransactions(txIndex).GetResults(resultIndex).GetIsStream()); - auto& bindingProto = *queryProto.AddResultBindings(); - auto& txResultProto = *bindingProto.MutableTxResultBinding(); - txResultProto.SetTxIndex(txIndex); - txResultProto.SetResultIndex(resultIndex); + auto& queryBindingProto = *queryProto.AddResultBindings(); + auto& txBindingProto = *queryBindingProto.MutableTxResultBinding(); + txBindingProto.SetTxIndex(txIndex); + txBindingProto.SetResultIndex(resultIndex); } return true; diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp index 5db75604d8..011dfbac23 100644 --- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp @@ -86,6 +86,18 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { ])", FormatResultSetYson(result.GetResultSet(0))); } + Y_UNIT_TEST(ExecuteQueryScalar) { + auto kikimr = DefaultKikimrRunner(); + auto db = kikimr.GetQueryClient(); + + auto result = db.ExecuteQuery(R"( + SELECT COUNT(*) FROM EightShard; + )").ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + CompareYson(R"([[24u]])", FormatResultSetYson(result.GetResultSet(0))); + } + Y_UNIT_TEST(ExecuteScript) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetQueryClient(); |