aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgalaxycrab <UgnineSirdis@ydb.tech>2023-10-14 12:13:59 +0300
committergalaxycrab <UgnineSirdis@ydb.tech>2023-10-14 12:29:41 +0300
commitecb10029a72bb21f92858ff3c0d76c8f255cf4ba (patch)
tree8dd17d82f0545180ef8a5da41e678bbdf76e21a1
parent4042ff031c8d4338d6da58c9cdc79ff06485becd (diff)
downloadydb-ecb10029a72bb21f92858ff3c0d76c8f255cf4ba.tar.gz
Fix crash in case of two selects in one query
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_opt_build.cpp117
-rw-r--r--ydb/core/kqp/ut/federated_query/generic/kqp_generic_provider_ut.cpp16
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp1
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp1
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp1
-rw-r--r--ydb/library/yql/providers/ydb/provider/yql_ydb_dq_integration.cpp3
6 files changed, 76 insertions, 63 deletions
diff --git a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
index 017049d602..ed03c03037 100644
--- a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
@@ -280,6 +280,52 @@ struct TKiExploreTxResults {
: HasExecute(false) {}
};
+bool IsDqRead(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& types, bool estimateReadSize = true) {
+ if (node.Ref().ChildrenSize() <= 1) {
+ return false;
+ }
+
+ TExprBase providerArg(node.Ref().Child(1));
+ if (auto maybeDataSource = providerArg.Maybe<TCoDataSource>()) {
+ TStringBuf dataSourceCategory = maybeDataSource.Cast().Category();
+ auto dataSourceProviderIt = types.DataSourceMap.find(dataSourceCategory);
+ if (dataSourceProviderIt != types.DataSourceMap.end()) {
+ if (auto* dqIntegration = dataSourceProviderIt->second->GetDqIntegration()) {
+ if (dqIntegration->CanRead(*node.Ptr(), ctx) &&
+ (!estimateReadSize || dqIntegration->EstimateReadSize(
+ TDqSettings::TDefault::DataSizePerJob,
+ TDqSettings::TDefault::MaxTasksPerStage,
+ {node.Raw()},
+ ctx))) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+}
+
+bool IsDqWrite(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& types) {
+ if (node.Ref().ChildrenSize() <= 1) {
+ return false;
+ }
+
+ TExprBase providerArg(node.Ref().Child(1));
+ if (auto maybeDataSink = providerArg.Maybe<TCoDataSink>()) {
+ TStringBuf dataSinkCategory = maybeDataSink.Cast().Category();
+ auto dataSinkProviderIt = types.DataSinkMap.find(dataSinkCategory);
+ if (dataSinkProviderIt != types.DataSinkMap.end()) {
+ if (auto* dqIntegration = dataSinkProviderIt->second->GetDqIntegration()) {
+ if (auto canWrite = dqIntegration->CanWrite(*node.Ptr(), ctx)) {
+ YQL_ENSURE(*canWrite, "Errors handling write");
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+}
+
bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, TKiExploreTxResults& txRes,
TIntrusivePtr<TKikimrTablesData> tablesData, TTypeAnnotationContext& types) {
@@ -329,34 +375,10 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
return result;
}
- if (node.Ref().ChildrenSize() > 1) {
- TExprBase dataSourceArg(node.Ref().Child(1));
- if (auto maybeDataSource = dataSourceArg.Maybe<TCoDataSource>()) {
- TStringBuf dataSourceCategory = maybeDataSource.Cast().Category();
- auto dataSourceProviderIt = types.DataSourceMap.find(dataSourceCategory);
- if (dataSourceProviderIt != types.DataSourceMap.end()) {
- if (auto* dqIntegration = dataSourceProviderIt->second->GetDqIntegration()) {
- if (dqIntegration->CanRead(*node.Ptr(), ctx)
- && dqIntegration->EstimateReadSize(
- TDqSettings::TDefault::DataSizePerJob,
- TDqSettings::TDefault::MaxTasksPerStage,
- {node.Raw()},
- ctx))
- {
- txRes.Ops.insert(node.Raw());
- for (size_t i = 0, childrenSize = node.Raw()->ChildrenSize(); i < childrenSize; ++i) {
- if (TExprNode::TPtr child = node.Raw()->ChildPtr(i)) {
- auto* typeAnn = child->GetTypeAnn();
- if (typeAnn && typeAnn->GetKind() == ETypeAnnotationKind::World) {
- return ExploreTx(TExprBase(child), ctx, dataSink, txRes, tablesData, types);
- }
- }
- }
- YQL_ENSURE(false, "Node \"" << node.Ref().Content() << "\" is expected to contain world child");
- }
- }
- }
- }
+ if (IsDqRead(node, ctx, types)) {
+ txRes.Ops.insert(node.Raw());
+ TExprNode::TPtr worldChild = node.Raw()->ChildPtr(0);
+ return ExploreTx(TExprBase(worldChild), ctx, dataSink, txRes, tablesData, types);
}
if (auto maybeWrite = node.Maybe<TKiWriteTable>()) {
@@ -391,30 +413,11 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
return result;
}
- if (node.Ref().ChildrenSize() > 1) {
- TExprBase dataSinkArg(node.Ref().Child(1));
- if (auto maybeDataSink = dataSinkArg.Maybe<TCoDataSink>()) {
- TStringBuf dataSinkCategory = maybeDataSink.Cast().Category();
- auto dataSinkProviderIt = types.DataSinkMap.find(dataSinkCategory);
- if (dataSinkProviderIt != types.DataSinkMap.end()) {
- if (auto* dqIntegration = dataSinkProviderIt->second->GetDqIntegration()) {
- if (auto canWrite = dqIntegration->CanWrite(node.Ref(), ctx)) {
- YQL_ENSURE(*canWrite, "Errors handling write");
- txRes.Ops.insert(node.Raw());
- txRes.AddEffect(node, THashMap<TString, TPrimitiveYdbOperations>{});
- for (size_t i = 0, childrenSize = node.Raw()->ChildrenSize(); i < childrenSize; ++i) {
- if (TExprNode::TPtr child = node.Raw()->ChildPtr(i)) {
- auto* typeAnn = child->GetTypeAnn();
- if (typeAnn && typeAnn->GetKind() == ETypeAnnotationKind::World) {
- return ExploreTx(TExprBase(child), ctx, dataSink, txRes, tablesData, types);
- }
- }
- }
- YQL_ENSURE(false, "Node \"" << node.Ref().Content() << "\" is expected to contain world child");
- }
- }
- }
- }
+ if (IsDqWrite(node, ctx, types)) {
+ txRes.Ops.insert(node.Raw());
+ txRes.AddEffect(node, THashMap<TString, TPrimitiveYdbOperations>{});
+ TExprNode::TPtr worldChild = node.Raw()->ChildPtr(0);
+ return ExploreTx(TExprBase(worldChild), ctx, dataSink, txRes, tablesData, types);
}
if (auto maybeUpdate = node.Maybe<TKiUpdateTable>()) {
@@ -689,7 +692,7 @@ TExprNode::TPtr MakeSchemeTx(TCoCommit commit, TExprContext& ctx) {
.Ptr();
}
-TVector<TKiDataQueryBlock> MakeKiDataQueryBlocks(TExprBase node, const TKiExploreTxResults& txExplore, TExprContext& ctx) {
+TVector<TKiDataQueryBlock> MakeKiDataQueryBlocks(TExprBase node, const TKiExploreTxResults& txExplore, TExprContext& ctx, TTypeAnnotationContext& types) {
TVector<TKiDataQueryBlock> queryBlocks;
queryBlocks.reserve(txExplore.QueryBlocks.size());
@@ -731,7 +734,7 @@ TVector<TKiDataQueryBlock> MakeKiDataQueryBlocks(TExprBase node, const TKiExplor
TOptimizeExprSettings optSettings(nullptr);
optSettings.VisitChanges = true;
auto status = OptimizeExpr(queryBlock.Ptr(), optResult,
- [world, &txSyncSet](const TExprNode::TPtr& input, TExprContext &ctx) {
+ [world, &txSyncSet, &types](const TExprNode::TPtr& input, TExprContext& ctx) {
auto node = TExprBase(input);
if (txSyncSet.contains(node.Raw())) {
@@ -741,7 +744,9 @@ TVector<TKiDataQueryBlock> MakeKiDataQueryBlocks(TExprBase node, const TKiExplor
if (node.Maybe<TKiReadTable>() ||
node.Maybe<TKiWriteTable>() ||
node.Maybe<TKiUpdateTable>() ||
- node.Maybe<TKiDeleteTable>())
+ node.Maybe<TKiDeleteTable>() ||
+ IsDqRead(node, ctx, types, false) ||
+ IsDqWrite(node, ctx, types))
{
return ctx.ChangeChild(node.Ref(), 0, world.Ptr());
}
@@ -800,7 +805,7 @@ TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx, TIntrusivePtr<TK
return MakeSchemeTx(commit, ctx);
}
- auto dataQueryBlocks = MakeKiDataQueryBlocks(commit.World(), txExplore, ctx);
+ auto dataQueryBlocks = MakeKiDataQueryBlocks(commit.World(), txExplore, ctx, types);
TKiExecDataQuerySettings execSettings;
if (settings.Mode) {
diff --git a/ydb/core/kqp/ut/federated_query/generic/kqp_generic_provider_ut.cpp b/ydb/core/kqp/ut/federated_query/generic/kqp_generic_provider_ut.cpp
index 910194df9f..dce8a7b0c1 100644
--- a/ydb/core/kqp/ut/federated_query/generic/kqp_generic_provider_ut.cpp
+++ b/ydb/core/kqp/ut/federated_query/generic/kqp_generic_provider_ut.cpp
@@ -201,6 +201,7 @@ namespace NKikimr::NKqp {
const TString query = fmt::format(
R"(
SELECT 42 FROM {data_source_name}.`{database_name}.{table_name}`;
+ SELECT 42 FROM {data_source_name}.`{database_name}.{table_name}`;
)",
"data_source_name"_a = DEFAULT_DATA_SOURCE_NAME,
"database_name"_a = DEFAULT_DATABASE,
@@ -210,13 +211,16 @@ namespace NKikimr::NKqp {
auto queryResult = db.ExecuteQuery(query, TTxControl::BeginTx().CommitTx(), TExecuteQuerySettings()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(queryResult.GetStatus(), EStatus::SUCCESS, queryResult.GetIssues().ToString());
- TResultSetParser resultSet(queryResult.GetResultSetParser(0));
- UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1);
- UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), ROWS_COUNT);
-
- // check every row
std::vector<i32> constants(ROWS_COUNT, 42);
- MATCH_RESULT_WITH_INPUT(constants, resultSet, GetInt32);
+
+ for (size_t i = 0; i < 2; ++i) {
+ TResultSetParser resultSet(queryResult.GetResultSetParser(i));
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), ROWS_COUNT);
+
+ // check every row
+ MATCH_RESULT_WITH_INPUT(constants, resultSet, GetInt32);
+ }
}
Y_UNIT_TEST(PostgreSQLSelectConstant) {
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp
index a73e527176..30a10b7895 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp
@@ -41,6 +41,7 @@ namespace NYql {
TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) override {
if (const auto maybeGenReadTable = TMaybeNode<TGenReadTable>(read)) {
const auto genReadTable = maybeGenReadTable.Cast();
+ YQL_ENSURE(genReadTable.Ref().GetTypeAnn(), "No type annotation for node " << genReadTable.Ref().Content());
const auto token = TString("cluster:default_") += genReadTable.DataSource().Cluster().StringValue();
const auto rowType = genReadTable.Ref()
.GetTypeAnn()
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp
index e47fd3b2b8..f8b58da698 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp
@@ -72,6 +72,7 @@ public:
TExprNode::TPtr WrapRead(const TDqSettings& dqSettings, const TExprNode::TPtr& read, TExprContext& ctx) override {
if (const auto& maybePqReadTopic = TMaybeNode<TPqReadTopic>(read)) {
const auto& pqReadTopic = maybePqReadTopic.Cast();
+ YQL_ENSURE(pqReadTopic.Ref().GetTypeAnn(), "No type annotation for node " << pqReadTopic.Ref().Content());
const auto rowType = pqReadTopic.Ref().GetTypeAnn()
->Cast<TTupleExprType>()->GetItems().back()->Cast<TListExprType>()
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
index a008d96d1a..03b76d30b8 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
@@ -163,6 +163,7 @@ public:
TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) override {
if (const auto& maybeS3ReadObject = TMaybeNode<TS3ReadObject>(read)) {
const auto& s3ReadObject = maybeS3ReadObject.Cast();
+ YQL_ENSURE(s3ReadObject.Ref().GetTypeAnn(), "No type annotation for node " << s3ReadObject.Ref().Content());
const auto rowType = s3ReadObject.Ref().GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back()->Cast<TListExprType>()->GetItemType();
const auto& clusterName = s3ReadObject.DataSource().Cluster().StringValue();
diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_dq_integration.cpp b/ydb/library/yql/providers/ydb/provider/yql_ydb_dq_integration.cpp
index cea1b0aaac..ab320337bf 100644
--- a/ydb/library/yql/providers/ydb/provider/yql_ydb_dq_integration.cpp
+++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_dq_integration.cpp
@@ -72,7 +72,7 @@ public:
bool CanRead(const TExprNode& read, TExprContext&, bool ) override {
return TYdbReadTable::Match(&read);
}
-
+
TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TVector<const TExprNode*>& read, TExprContext&) override {
if (AllOf(read, [](const auto val) { return TYdbReadTable::Match(val); })) {
return 0ul; // TODO: return real size
@@ -83,6 +83,7 @@ public:
TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) override {
if (const auto& maybeYdbReadTable = TMaybeNode<TYdbReadTable>(read)) {
const auto& ydbReadTable = maybeYdbReadTable.Cast();
+ YQL_ENSURE(ydbReadTable.Ref().GetTypeAnn(), "No type annotation for node " << ydbReadTable.Ref().Content());
const auto& clusterName = ydbReadTable.DataSource().Cluster().Value();
const auto token = "cluster:default_" + TString(clusterName);
YQL_CLOG(INFO, ProviderYdb) << "Wrap " << read->Content() << " with token: " << token;