diff options
author | galaxycrab <UgnineSirdis@ydb.tech> | 2023-10-14 12:13:59 +0300 |
---|---|---|
committer | galaxycrab <UgnineSirdis@ydb.tech> | 2023-10-14 12:29:41 +0300 |
commit | ecb10029a72bb21f92858ff3c0d76c8f255cf4ba (patch) | |
tree | 8dd17d82f0545180ef8a5da41e678bbdf76e21a1 | |
parent | 4042ff031c8d4338d6da58c9cdc79ff06485becd (diff) | |
download | ydb-ecb10029a72bb21f92858ff3c0d76c8f255cf4ba.tar.gz |
Fix crash in case of two selects in one query
6 files changed, 76 insertions, 63 deletions
diff --git a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp index 017049d602..ed03c03037 100644 --- a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp @@ -280,6 +280,52 @@ struct TKiExploreTxResults { : HasExecute(false) {} }; +bool IsDqRead(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& types, bool estimateReadSize = true) { + if (node.Ref().ChildrenSize() <= 1) { + return false; + } + + TExprBase providerArg(node.Ref().Child(1)); + if (auto maybeDataSource = providerArg.Maybe<TCoDataSource>()) { + TStringBuf dataSourceCategory = maybeDataSource.Cast().Category(); + auto dataSourceProviderIt = types.DataSourceMap.find(dataSourceCategory); + if (dataSourceProviderIt != types.DataSourceMap.end()) { + if (auto* dqIntegration = dataSourceProviderIt->second->GetDqIntegration()) { + if (dqIntegration->CanRead(*node.Ptr(), ctx) && + (!estimateReadSize || dqIntegration->EstimateReadSize( + TDqSettings::TDefault::DataSizePerJob, + TDqSettings::TDefault::MaxTasksPerStage, + {node.Raw()}, + ctx))) { + return true; + } + } + } + } + return false; +} + +bool IsDqWrite(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& types) { + if (node.Ref().ChildrenSize() <= 1) { + return false; + } + + TExprBase providerArg(node.Ref().Child(1)); + if (auto maybeDataSink = providerArg.Maybe<TCoDataSink>()) { + TStringBuf dataSinkCategory = maybeDataSink.Cast().Category(); + auto dataSinkProviderIt = types.DataSinkMap.find(dataSinkCategory); + if (dataSinkProviderIt != types.DataSinkMap.end()) { + if (auto* dqIntegration = dataSinkProviderIt->second->GetDqIntegration()) { + if (auto canWrite = dqIntegration->CanWrite(*node.Ptr(), ctx)) { + YQL_ENSURE(*canWrite, "Errors handling write"); + return true; + } + } + } + } + return false; +} + bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, TKiExploreTxResults& txRes, TIntrusivePtr<TKikimrTablesData> tablesData, TTypeAnnotationContext& types) { @@ -329,34 +375,10 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T return result; } - if (node.Ref().ChildrenSize() > 1) { - TExprBase dataSourceArg(node.Ref().Child(1)); - if (auto maybeDataSource = dataSourceArg.Maybe<TCoDataSource>()) { - TStringBuf dataSourceCategory = maybeDataSource.Cast().Category(); - auto dataSourceProviderIt = types.DataSourceMap.find(dataSourceCategory); - if (dataSourceProviderIt != types.DataSourceMap.end()) { - if (auto* dqIntegration = dataSourceProviderIt->second->GetDqIntegration()) { - if (dqIntegration->CanRead(*node.Ptr(), ctx) - && dqIntegration->EstimateReadSize( - TDqSettings::TDefault::DataSizePerJob, - TDqSettings::TDefault::MaxTasksPerStage, - {node.Raw()}, - ctx)) - { - txRes.Ops.insert(node.Raw()); - for (size_t i = 0, childrenSize = node.Raw()->ChildrenSize(); i < childrenSize; ++i) { - if (TExprNode::TPtr child = node.Raw()->ChildPtr(i)) { - auto* typeAnn = child->GetTypeAnn(); - if (typeAnn && typeAnn->GetKind() == ETypeAnnotationKind::World) { - return ExploreTx(TExprBase(child), ctx, dataSink, txRes, tablesData, types); - } - } - } - YQL_ENSURE(false, "Node \"" << node.Ref().Content() << "\" is expected to contain world child"); - } - } - } - } + if (IsDqRead(node, ctx, types)) { + txRes.Ops.insert(node.Raw()); + TExprNode::TPtr worldChild = node.Raw()->ChildPtr(0); + return ExploreTx(TExprBase(worldChild), ctx, dataSink, txRes, tablesData, types); } if (auto maybeWrite = node.Maybe<TKiWriteTable>()) { @@ -391,30 +413,11 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T return result; } - if (node.Ref().ChildrenSize() > 1) { - TExprBase dataSinkArg(node.Ref().Child(1)); - if (auto maybeDataSink = dataSinkArg.Maybe<TCoDataSink>()) { - TStringBuf dataSinkCategory = maybeDataSink.Cast().Category(); - auto dataSinkProviderIt = types.DataSinkMap.find(dataSinkCategory); - if (dataSinkProviderIt != types.DataSinkMap.end()) { - if (auto* dqIntegration = dataSinkProviderIt->second->GetDqIntegration()) { - if (auto canWrite = dqIntegration->CanWrite(node.Ref(), ctx)) { - YQL_ENSURE(*canWrite, "Errors handling write"); - txRes.Ops.insert(node.Raw()); - txRes.AddEffect(node, THashMap<TString, TPrimitiveYdbOperations>{}); - for (size_t i = 0, childrenSize = node.Raw()->ChildrenSize(); i < childrenSize; ++i) { - if (TExprNode::TPtr child = node.Raw()->ChildPtr(i)) { - auto* typeAnn = child->GetTypeAnn(); - if (typeAnn && typeAnn->GetKind() == ETypeAnnotationKind::World) { - return ExploreTx(TExprBase(child), ctx, dataSink, txRes, tablesData, types); - } - } - } - YQL_ENSURE(false, "Node \"" << node.Ref().Content() << "\" is expected to contain world child"); - } - } - } - } + if (IsDqWrite(node, ctx, types)) { + txRes.Ops.insert(node.Raw()); + txRes.AddEffect(node, THashMap<TString, TPrimitiveYdbOperations>{}); + TExprNode::TPtr worldChild = node.Raw()->ChildPtr(0); + return ExploreTx(TExprBase(worldChild), ctx, dataSink, txRes, tablesData, types); } if (auto maybeUpdate = node.Maybe<TKiUpdateTable>()) { @@ -689,7 +692,7 @@ TExprNode::TPtr MakeSchemeTx(TCoCommit commit, TExprContext& ctx) { .Ptr(); } -TVector<TKiDataQueryBlock> MakeKiDataQueryBlocks(TExprBase node, const TKiExploreTxResults& txExplore, TExprContext& ctx) { +TVector<TKiDataQueryBlock> MakeKiDataQueryBlocks(TExprBase node, const TKiExploreTxResults& txExplore, TExprContext& ctx, TTypeAnnotationContext& types) { TVector<TKiDataQueryBlock> queryBlocks; queryBlocks.reserve(txExplore.QueryBlocks.size()); @@ -731,7 +734,7 @@ TVector<TKiDataQueryBlock> MakeKiDataQueryBlocks(TExprBase node, const TKiExplor TOptimizeExprSettings optSettings(nullptr); optSettings.VisitChanges = true; auto status = OptimizeExpr(queryBlock.Ptr(), optResult, - [world, &txSyncSet](const TExprNode::TPtr& input, TExprContext &ctx) { + [world, &txSyncSet, &types](const TExprNode::TPtr& input, TExprContext& ctx) { auto node = TExprBase(input); if (txSyncSet.contains(node.Raw())) { @@ -741,7 +744,9 @@ TVector<TKiDataQueryBlock> MakeKiDataQueryBlocks(TExprBase node, const TKiExplor if (node.Maybe<TKiReadTable>() || node.Maybe<TKiWriteTable>() || node.Maybe<TKiUpdateTable>() || - node.Maybe<TKiDeleteTable>()) + node.Maybe<TKiDeleteTable>() || + IsDqRead(node, ctx, types, false) || + IsDqWrite(node, ctx, types)) { return ctx.ChangeChild(node.Ref(), 0, world.Ptr()); } @@ -800,7 +805,7 @@ TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx, TIntrusivePtr<TK return MakeSchemeTx(commit, ctx); } - auto dataQueryBlocks = MakeKiDataQueryBlocks(commit.World(), txExplore, ctx); + auto dataQueryBlocks = MakeKiDataQueryBlocks(commit.World(), txExplore, ctx, types); TKiExecDataQuerySettings execSettings; if (settings.Mode) { diff --git a/ydb/core/kqp/ut/federated_query/generic/kqp_generic_provider_ut.cpp b/ydb/core/kqp/ut/federated_query/generic/kqp_generic_provider_ut.cpp index 910194df9f..dce8a7b0c1 100644 --- a/ydb/core/kqp/ut/federated_query/generic/kqp_generic_provider_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/generic/kqp_generic_provider_ut.cpp @@ -201,6 +201,7 @@ namespace NKikimr::NKqp { const TString query = fmt::format( R"( SELECT 42 FROM {data_source_name}.`{database_name}.{table_name}`; + SELECT 42 FROM {data_source_name}.`{database_name}.{table_name}`; )", "data_source_name"_a = DEFAULT_DATA_SOURCE_NAME, "database_name"_a = DEFAULT_DATABASE, @@ -210,13 +211,16 @@ namespace NKikimr::NKqp { auto queryResult = db.ExecuteQuery(query, TTxControl::BeginTx().CommitTx(), TExecuteQuerySettings()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(queryResult.GetStatus(), EStatus::SUCCESS, queryResult.GetIssues().ToString()); - TResultSetParser resultSet(queryResult.GetResultSetParser(0)); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1); - UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), ROWS_COUNT); - - // check every row std::vector<i32> constants(ROWS_COUNT, 42); - MATCH_RESULT_WITH_INPUT(constants, resultSet, GetInt32); + + for (size_t i = 0; i < 2; ++i) { + TResultSetParser resultSet(queryResult.GetResultSetParser(i)); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), ROWS_COUNT); + + // check every row + MATCH_RESULT_WITH_INPUT(constants, resultSet, GetInt32); + } } Y_UNIT_TEST(PostgreSQLSelectConstant) { diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp index a73e527176..30a10b7895 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp @@ -41,6 +41,7 @@ namespace NYql { TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) override { if (const auto maybeGenReadTable = TMaybeNode<TGenReadTable>(read)) { const auto genReadTable = maybeGenReadTable.Cast(); + YQL_ENSURE(genReadTable.Ref().GetTypeAnn(), "No type annotation for node " << genReadTable.Ref().Content()); const auto token = TString("cluster:default_") += genReadTable.DataSource().Cluster().StringValue(); const auto rowType = genReadTable.Ref() .GetTypeAnn() diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp index e47fd3b2b8..f8b58da698 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp @@ -72,6 +72,7 @@ public: TExprNode::TPtr WrapRead(const TDqSettings& dqSettings, const TExprNode::TPtr& read, TExprContext& ctx) override { if (const auto& maybePqReadTopic = TMaybeNode<TPqReadTopic>(read)) { const auto& pqReadTopic = maybePqReadTopic.Cast(); + YQL_ENSURE(pqReadTopic.Ref().GetTypeAnn(), "No type annotation for node " << pqReadTopic.Ref().Content()); const auto rowType = pqReadTopic.Ref().GetTypeAnn() ->Cast<TTupleExprType>()->GetItems().back()->Cast<TListExprType>() diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index a008d96d1a..03b76d30b8 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -163,6 +163,7 @@ public: TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) override { if (const auto& maybeS3ReadObject = TMaybeNode<TS3ReadObject>(read)) { const auto& s3ReadObject = maybeS3ReadObject.Cast(); + YQL_ENSURE(s3ReadObject.Ref().GetTypeAnn(), "No type annotation for node " << s3ReadObject.Ref().Content()); const auto rowType = s3ReadObject.Ref().GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back()->Cast<TListExprType>()->GetItemType(); const auto& clusterName = s3ReadObject.DataSource().Cluster().StringValue(); diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_dq_integration.cpp b/ydb/library/yql/providers/ydb/provider/yql_ydb_dq_integration.cpp index cea1b0aaac..ab320337bf 100644 --- a/ydb/library/yql/providers/ydb/provider/yql_ydb_dq_integration.cpp +++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_dq_integration.cpp @@ -72,7 +72,7 @@ public: bool CanRead(const TExprNode& read, TExprContext&, bool ) override { return TYdbReadTable::Match(&read); } - + TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TVector<const TExprNode*>& read, TExprContext&) override { if (AllOf(read, [](const auto val) { return TYdbReadTable::Match(val); })) { return 0ul; // TODO: return real size @@ -83,6 +83,7 @@ public: TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) override { if (const auto& maybeYdbReadTable = TMaybeNode<TYdbReadTable>(read)) { const auto& ydbReadTable = maybeYdbReadTable.Cast(); + YQL_ENSURE(ydbReadTable.Ref().GetTypeAnn(), "No type annotation for node " << ydbReadTable.Ref().Content()); const auto& clusterName = ydbReadTable.DataSource().Cluster().Value(); const auto token = "cluster:default_" + TString(clusterName); YQL_CLOG(INFO, ProviderYdb) << "Wrap " << read->Content() << " with token: " << token; |