aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorspuchin <spuchin@ydb.tech>2023-04-10 11:07:00 +0300
committerspuchin <spuchin@ydb.tech>2023-04-10 11:07:00 +0300
commitde6ebe03178907936cd22e28f9dc4acf7f189d0d (patch)
tree7ff57ddf4680408eb53c4a58298cc752ae48af60
parentb38e48c6cd4b18e00bce01b943325dc772ec941c (diff)
downloadydb-de6ebe03178907936cd22e28f9dc4acf7f189d0d.tar.gz
Avoid CnValue in query results. ()
-rw-r--r--ydb/core/kqp/opt/kqp_opt_phy_check.cpp14
-rw-r--r--ydb/core/kqp/opt/kqp_opt_phy_finalize.cpp32
-rw-r--r--ydb/core/kqp/query_compiler/kqp_query_compiler.cpp9
-rw-r--r--ydb/core/kqp/ut/service/kqp_query_service_ut.cpp12
4 files changed, 60 insertions, 7 deletions
diff --git a/ydb/core/kqp/opt/kqp_opt_phy_check.cpp b/ydb/core/kqp/opt/kqp_opt_phy_check.cpp
index 6b5c08e81c..90ab8da99a 100644
--- a/ydb/core/kqp/opt/kqp_opt_phy_check.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_phy_check.cpp
@@ -20,6 +20,20 @@ TAutoPtr<IGraphTransformer> CreateKqpCheckPhysicalQueryTransformer() {
auto query = TKqlQuery(input);
YQL_ENSURE(query.Ref().GetTypeAnn());
+ for (const auto& result : query.Results()) {
+ if (!result.Value().Maybe<TDqConnection>()) {
+ ctx.AddError(TIssue(ctx.GetPosition(result.Pos()), "Failed to build query results."));
+ return TStatus::Error;
+ }
+
+ if (!result.Value().Maybe<TDqCnUnionAll>()) {
+ ctx.AddError(TIssue(ctx.GetPosition(result.Pos()), TStringBuilder()
+ << "Unexpected query result connection: "
+ << result.Value().Cast<TDqConnection>().CallableName()));
+ return TStatus::Error;
+ }
+ }
+
for (const auto& effect : query.Effects()) {
if (!effect.Maybe<TDqOutput>()) {
ctx.AddError(TIssue(ctx.GetPosition(effect.Pos()), "Failed to build query effects."));
diff --git a/ydb/core/kqp/opt/kqp_opt_phy_finalize.cpp b/ydb/core/kqp/opt/kqp_opt_phy_finalize.cpp
index d596b66759..8255e13914 100644
--- a/ydb/core/kqp/opt/kqp_opt_phy_finalize.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_phy_finalize.cpp
@@ -14,6 +14,20 @@ using TStatus = IGraphTransformer::TStatus;
namespace {
+TExprBase BuildValueResult(const TDqCnValue& cn, TExprContext& ctx) {
+ YQL_ENSURE(cn.Ref().GetTypeAnn()->GetKind() == ETypeAnnotationKind::List);
+
+ return Build<TCoFlatMap>(ctx, cn.Pos())
+ .Input<TDqCnUnionAll>()
+ .Output(cn.Output())
+ .Build()
+ .Lambda()
+ .Args({"list"})
+ .Body("list")
+ .Build()
+ .Done();
+}
+
TStatus KqpBuildPureExprStagesResult(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx)
{
@@ -68,7 +82,15 @@ TStatus KqpBuildPureExprStagesResult(const TExprNode::TPtr& input, TExprNode::TP
// TODO: Missing support for DqCnValue results in scan queries
if (node.Maybe<TDqPhyPrecompute>() && omitResultPrecomputes && !kqpCtx.IsScanQuery()) {
- replaces[node.Raw()] = node.Cast<TDqPhyPrecompute>().Connection().Ptr();
+ YQL_CLOG(DEBUG, ProviderKqp) << "Building precompute result #" << node.Raw()->UniqueId();
+
+ auto connection = node.Cast<TDqPhyPrecompute>().Connection();
+ if (connection.Maybe<TDqCnValue>()) {
+ replaces[node.Raw()] = BuildValueResult(connection.Cast<TDqCnValue>(), ctx).Ptr();
+ } else {
+ YQL_ENSURE(connection.Maybe<TDqCnUnionAll>());
+ replaces[node.Raw()] = connection.Ptr();
+ }
} else {
auto result = DqBuildPureExprStage(node, ctx);
if (result.Raw() != node.Raw()) {
@@ -77,9 +99,13 @@ TStatus KqpBuildPureExprStagesResult(const TExprNode::TPtr& input, TExprNode::TP
}
}
}
- output = ctx.ReplaceNodes(TExprNode::TPtr(input), replaces);
- return TStatus::Ok;
+ if (replaces.empty()) {
+ return TStatus::Ok;
+ }
+
+ output = ctx.ReplaceNodes(TExprNode::TPtr(input), replaces);
+ return TStatus(TStatus::Repeat, true);
}
TStatus KqpBuildUnionResult(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
index 1e16ec91c5..da719d8f2b 100644
--- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
@@ -491,11 +491,12 @@ public:
YQL_ENSURE(txIndex < queryProto.TransactionsSize());
YQL_ENSURE(resultIndex < queryProto.GetTransactions(txIndex).ResultsSize());
+ YQL_ENSURE(queryProto.GetTransactions(txIndex).GetResults(resultIndex).GetIsStream());
- auto& bindingProto = *queryProto.AddResultBindings();
- auto& txResultProto = *bindingProto.MutableTxResultBinding();
- txResultProto.SetTxIndex(txIndex);
- txResultProto.SetResultIndex(resultIndex);
+ auto& queryBindingProto = *queryProto.AddResultBindings();
+ auto& txBindingProto = *queryBindingProto.MutableTxResultBinding();
+ txBindingProto.SetTxIndex(txIndex);
+ txBindingProto.SetResultIndex(resultIndex);
}
return true;
diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
index 5db75604d8..011dfbac23 100644
--- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
@@ -86,6 +86,18 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
])", FormatResultSetYson(result.GetResultSet(0)));
}
+ Y_UNIT_TEST(ExecuteQueryScalar) {
+ auto kikimr = DefaultKikimrRunner();
+ auto db = kikimr.GetQueryClient();
+
+ auto result = db.ExecuteQuery(R"(
+ SELECT COUNT(*) FROM EightShard;
+ )").ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ CompareYson(R"([[24u]])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+
Y_UNIT_TEST(ExecuteScript) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetQueryClient();