diff options
author | galaxycrab <UgnineSirdis@ydb.tech> | 2023-10-24 17:49:51 +0300 |
---|---|---|
committer | galaxycrab <UgnineSirdis@ydb.tech> | 2023-10-24 18:56:05 +0300 |
commit | 1ed4742ff7b185c61308ebfa67df74425bec4016 (patch) | |
tree | a60787bb6f25591fe092d2bedc81aaf3274ec60f | |
parent | 670e2deb259d1316f5faa7add5888e8ae4a079aa (diff) | |
download | ydb-1ed4742ff7b185c61308ebfa67df74425bec4016.tar.gz |
YQ Connector:Pushdown of simple filters. First version with support of several predicates/operations
46 files changed, 1227 insertions, 185 deletions
diff --git a/.mapping.json b/.mapping.json index b0c5ba1797..8ea27f5354 100644 --- a/.mapping.json +++ b/.mapping.json @@ -7426,6 +7426,11 @@ "ydb/library/yql/providers/common/provider/CMakeLists.linux-x86_64.txt":"", "ydb/library/yql/providers/common/provider/CMakeLists.txt":"", "ydb/library/yql/providers/common/provider/CMakeLists.windows-x86_64.txt":"", + "ydb/library/yql/providers/common/pushdown/CMakeLists.darwin-x86_64.txt":"", + "ydb/library/yql/providers/common/pushdown/CMakeLists.linux-aarch64.txt":"", + "ydb/library/yql/providers/common/pushdown/CMakeLists.linux-x86_64.txt":"", + "ydb/library/yql/providers/common/pushdown/CMakeLists.txt":"", + "ydb/library/yql/providers/common/pushdown/CMakeLists.windows-x86_64.txt":"", "ydb/library/yql/providers/common/schema/CMakeLists.darwin-x86_64.txt":"", "ydb/library/yql/providers/common/schema/CMakeLists.linux-aarch64.txt":"", "ydb/library/yql/providers/common/schema/CMakeLists.linux-x86_64.txt":"", diff --git a/ydb/core/kqp/opt/physical/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/opt/physical/CMakeLists.darwin-x86_64.txt index 852ad9e1d5..f78e4a6016 100644 --- a/ydb/core/kqp/opt/physical/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/opt/physical/CMakeLists.darwin-x86_64.txt @@ -19,12 +19,12 @@ target_link_libraries(kqp-opt-physical PUBLIC opt-physical-effects yql-dq-common yql-dq-opt + providers-common-pushdown ) target_sources(kqp-opt-physical PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter_collection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_precompute.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp diff --git a/ydb/core/kqp/opt/physical/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/opt/physical/CMakeLists.linux-aarch64.txt index 7204a81d07..cdcab0fa74 100644 --- a/ydb/core/kqp/opt/physical/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/opt/physical/CMakeLists.linux-aarch64.txt @@ -20,12 +20,12 @@ target_link_libraries(kqp-opt-physical PUBLIC opt-physical-effects yql-dq-common yql-dq-opt + providers-common-pushdown ) target_sources(kqp-opt-physical PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter_collection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_precompute.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp diff --git a/ydb/core/kqp/opt/physical/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/opt/physical/CMakeLists.linux-x86_64.txt index 7204a81d07..cdcab0fa74 100644 --- a/ydb/core/kqp/opt/physical/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/opt/physical/CMakeLists.linux-x86_64.txt @@ -20,12 +20,12 @@ target_link_libraries(kqp-opt-physical PUBLIC opt-physical-effects yql-dq-common yql-dq-opt + providers-common-pushdown ) target_sources(kqp-opt-physical PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter_collection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_precompute.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp diff --git a/ydb/core/kqp/opt/physical/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/opt/physical/CMakeLists.windows-x86_64.txt index 852ad9e1d5..f78e4a6016 100644 --- a/ydb/core/kqp/opt/physical/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/opt/physical/CMakeLists.windows-x86_64.txt @@ -19,12 +19,12 @@ target_link_libraries(kqp-opt-physical PUBLIC opt-physical-effects yql-dq-common yql-dq-opt + providers-common-pushdown ) target_sources(kqp-opt-physical PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter_collection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_precompute.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp index 57090bfc1e..fc6bb36a28 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp @@ -1,10 +1,11 @@ #include "kqp_opt_phy_rules.h" -#include "kqp_opt_phy_olap_filter_collection.h" #include <ydb/core/formats/arrow/ssa_runtime_version.h> #include <ydb/core/kqp/common/kqp_yql.h> #include <ydb/library/yql/core/extract_predicate/extract_predicate.h> #include <ydb/library/yql/core/yql_opt_utils.h> +#include <ydb/library/yql/providers/common/pushdown/collection.h> +#include <ydb/library/yql/providers/common/pushdown/predicate_node.h> #include <unordered_set> @@ -23,6 +24,15 @@ static const std::unordered_set<std::string> SecondLevelFilters = { "ends_with" }; +struct TPushdownSettings : public NPushdown::TSettings { + TPushdownSettings() { + using EFlag = NPushdown::TSettings::EFeatureFlag; + Enable(EFlag::LikeOperator, NSsa::RuntimeVersion >= 2U); + Enable(EFlag::LikeOperatorOnlyForUtf8, NSsa::RuntimeVersion < 3U); + Enable(EFlag::JsonQueryOperators | EFlag::JsonExistsOperator, NSsa::RuntimeVersion >= 3U); + } +}; + struct TFilterOpsLevels { TFilterOpsLevels(const TMaybeNode<TExprBase>& firstLevel, const TMaybeNode<TExprBase>& secondLevel) : FirstLevelOps(firstLevel) @@ -511,7 +521,7 @@ TFilterOpsLevels PredicatePushdown(const TExprBase& predicate, TExprContext& ctx return TFilterOpsLevels(ops, NullNode); } -void SplitForPartialPushdown(const TPredicateNode& predicateTree, TPredicateNode& predicatesToPush, TPredicateNode& remainingPredicates, +void SplitForPartialPushdown(const NPushdown::TPredicateNode& predicateTree, NPushdown::TPredicateNode& predicatesToPush, NPushdown::TPredicateNode& remainingPredicates, TExprContext& ctx, TPositionHandle pos) { if (predicateTree.CanBePushed) { @@ -520,7 +530,7 @@ void SplitForPartialPushdown(const TPredicateNode& predicateTree, TPredicateNode return; } - if (predicateTree.Op != EBoolOp::And) { + if (predicateTree.Op != NPushdown::EBoolOp::And) { // We can partially pushdown predicates from AND operator only. // For OR operator we would need to have several read operators which is not acceptable. // TODO: Add support for NOT(op1 OR op2), because it expands to (!op1 AND !op2). @@ -529,8 +539,8 @@ void SplitForPartialPushdown(const TPredicateNode& predicateTree, TPredicateNode } bool isFoundNotStrictOp = false; - std::vector<TPredicateNode> pushable; - std::vector<TPredicateNode> remaining; + std::vector<NPushdown::TPredicateNode> pushable; + std::vector<NPushdown::TPredicateNode> remaining; for (auto& predicate : predicateTree.Children) { if (predicate.CanBePushed && !isFoundNotStrictOp) { pushable.emplace_back(predicate); @@ -578,12 +588,12 @@ TExprBase KqpPushOlapFilter(TExprBase node, TExprContext& ctx, const TKqpOptimiz } auto optionalIf = maybeOptionalIf.Cast(); - TPredicateNode predicateTree(optionalIf.Predicate()); - CollectPredicates(optionalIf.Predicate(), predicateTree, lambdaArg, read.Process().Body()); + NPushdown::TPredicateNode predicateTree(optionalIf.Predicate()); + CollectPredicates(optionalIf.Predicate(), predicateTree, lambdaArg, read.Process().Body(), TPushdownSettings()); YQL_ENSURE(predicateTree.IsValid(), "Collected OLAP predicates are invalid"); - TPredicateNode predicatesToPush; - TPredicateNode remainingPredicates; + NPushdown::TPredicateNode predicatesToPush; + NPushdown::TPredicateNode remainingPredicates; SplitForPartialPushdown(predicateTree, predicatesToPush, remainingPredicates, ctx, node.Pos()); if (!predicatesToPush.IsValid()) { return node; diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter_collection.h b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter_collection.h deleted file mode 100644 index cfec72ac53..0000000000 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter_collection.h +++ /dev/null @@ -1,55 +0,0 @@ -#pragma once - -#include <ydb/core/kqp/common/kqp_yql.h> - -namespace NKikimr::NKqp::NOpt { - -enum class EBoolOp { - Undefined = 0, - And, - Or, - Xor, - Not -}; - -struct TPredicateNode { - TPredicateNode() - : ExprNode(nullptr) - , Op(EBoolOp::Undefined) - , CanBePushed(false) - {} - - TPredicateNode(NYql::TExprNode::TPtr nodePtr) - : ExprNode(nodePtr) - , Op(EBoolOp::Undefined) - , CanBePushed(false) - {} - - TPredicateNode(NYql::NNodes::TExprBase node) - : ExprNode(node) - , Op(EBoolOp::Undefined) - , CanBePushed(false) - {} - - TPredicateNode(const TPredicateNode& predNode) - : ExprNode(predNode.ExprNode) - , Children(predNode.Children) - , Op(predNode.Op) - , CanBePushed(predNode.CanBePushed) - {} - - ~TPredicateNode() {} - - bool IsValid() const; - void SetPredicates(const std::vector<TPredicateNode>& predicates, NYql::TExprContext& ctx, NYql::TPositionHandle pos); - - NYql::NNodes::TMaybeNode<NYql::NNodes::TExprBase> ExprNode; - std::vector<TPredicateNode> Children; - EBoolOp Op; - bool CanBePushed; -}; - -void CollectPredicates(const NYql::NNodes::TExprBase& predicate, TPredicateNode& predicateTree, - const NYql::TExprNode* lambdaArg, const NYql::NNodes::TExprBase& lambdaBody); - -} // namespace NKikimr::NKqp::NOpt
\ No newline at end of file diff --git a/ydb/core/kqp/opt/physical/ya.make b/ydb/core/kqp/opt/physical/ya.make index ad82055d6e..07b0cdf0e0 100644 --- a/ydb/core/kqp/opt/physical/ya.make +++ b/ydb/core/kqp/opt/physical/ya.make @@ -4,7 +4,6 @@ SRCS( kqp_opt_phy_build_stage.cpp kqp_opt_phy_limit.cpp kqp_opt_phy_olap_agg.cpp - kqp_opt_phy_olap_filter_collection.cpp kqp_opt_phy_olap_filter.cpp kqp_opt_phy_precompute.cpp kqp_opt_phy_sort.cpp @@ -19,6 +18,7 @@ PEERDIR( ydb/core/kqp/opt/physical/effects ydb/library/yql/dq/common ydb/library/yql/dq/opt + ydb/library/yql/providers/common/pushdown ) YQL_LAST_ABI_VERSION() diff --git a/ydb/core/kqp/ut/federated_query/generic/kqp_generic_provider_ut.cpp b/ydb/core/kqp/ut/federated_query/generic/kqp_generic_provider_ut.cpp index 9029a7477d..54ac7e9335 100644 --- a/ydb/core/kqp/ut/federated_query/generic/kqp_generic_provider_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/generic/kqp_generic_provider_ut.cpp @@ -114,10 +114,9 @@ namespace NKikimr::NKqp { const TString query = fmt::format( R"( - SELECT * FROM {data_source_name}.`{database_name}.{table_name}`; + SELECT * FROM {data_source_name}.{table_name}; )", "data_source_name"_a = DEFAULT_DATA_SOURCE_NAME, - "database_name"_a = DEFAULT_DATABASE, "table_name"_a = DEFAULT_TABLE); auto db = kikimr->GetQueryClient(); @@ -206,11 +205,10 @@ namespace NKikimr::NKqp { const TString query = fmt::format( R"( - SELECT 42 FROM {data_source_name}.`{database_name}.{table_name}`; - SELECT 42 FROM {data_source_name}.`{database_name}.{table_name}`; + SELECT 42 FROM {data_source_name}.{table_name}; + SELECT 42 FROM {data_source_name}.{table_name}; )", "data_source_name"_a = DEFAULT_DATA_SOURCE_NAME, - "database_name"_a = DEFAULT_DATABASE, "table_name"_a = DEFAULT_TABLE); auto db = kikimr->GetQueryClient(); @@ -297,10 +295,9 @@ namespace NKikimr::NKqp { const TString query = fmt::format( R"( - SELECT COUNT(*) FROM {data_source_name}.`{database_name}.{table_name}`; + SELECT COUNT(*) FROM {data_source_name}.{table_name}; )", "data_source_name"_a = DEFAULT_DATA_SOURCE_NAME, - "database_name"_a = DEFAULT_DATABASE, "table_name"_a = DEFAULT_TABLE); auto db = kikimr->GetQueryClient(); @@ -323,5 +320,109 @@ namespace NKikimr::NKqp { Y_UNIT_TEST(ClickHouseSelectCount) { TestSelectCount(EProviderType::ClickHouse); } + + void TestFilterPushdown(EProviderType providerType) { + // prepare mock + auto clientMock = std::make_shared<TConnectorClientMock>(); + + const NApi::TDataSourceInstance dataSourceInstance = MakeDataSourceInstance(providerType); + // clang-format off + const NApi::TSelect select = TConnectorClientMock::TSelectBuilder<>() + .DataSourceInstance(dataSourceInstance) + .What() + .NullableColumn("data_column", Ydb::Type::STRING) + .NullableColumn("filtered_column", Ydb::Type::INT32) + .Done() + .Where() + .Filter() + .Equal() + .Column("filtered_column") + .Value<i32>(42) + .Done() + .Done() + .Done() + .GetResult(); + // clang-format on + + // step 1: DescribeTable + // clang-format off + clientMock->ExpectDescribeTable() + .DataSourceInstance(dataSourceInstance) + .Response() + .NullableColumn("filtered_column", Ydb::Type::INT32) + .NullableColumn("data_column", Ydb::Type::STRING); + // clang-format on + + // step 2: ListSplits + // clang-format off + clientMock->ExpectListSplits() + .Select(select) + .Result() + .AddResponse(NewSuccess()) + .Description("some binary description") + .Select(select); + // clang-format on + + // step 3: ReadSplits + // Return data such that it contains values not satisfying the filter conditions. + // Then check that, despite that connector reads additional data, + // our generic provider then filters it out. + std::vector<std::string> colData = {"Filtered text", "Text"}; + std::vector<i32> filterColumnData = {42, 24}; + // clang-format off + clientMock->ExpectReadSplits() + .DataSourceInstance(dataSourceInstance) + .Split() + .Description("some binary description") + .Select(select) + .Done() + .Result() + .AddResponse(MakeRecordBatch( + MakeArray<arrow::StringBuilder>("data_column", colData, arrow::utf8()), + MakeArray<arrow::Int32Builder>("filtered_column", filterColumnData, arrow::int32())), + NewSuccess()); + // clang-format on + + // prepare database resolver mock + std::shared_ptr<TDatabaseAsyncResolverMock> databaseAsyncResolverMock; + if (providerType == EProviderType::ClickHouse) { + databaseAsyncResolverMock = std::make_shared<TDatabaseAsyncResolverMock>(); + databaseAsyncResolverMock->AddClickHouseCluster(); + } + + // run test + auto kikimr = MakeKikimrRunner(nullptr, clientMock, databaseAsyncResolverMock); + + CreateExternalDataSource(providerType, kikimr); + + const TString query = fmt::format( + R"( + PRAGMA generic.UsePredicatePushdown="true"; + SELECT data_column FROM {data_source_name}.{table_name} WHERE filtered_column = 42; + )", + "data_source_name"_a = DEFAULT_DATA_SOURCE_NAME, + "table_name"_a = DEFAULT_TABLE); + + auto db = kikimr->GetQueryClient(); + auto queryResult = db.ExecuteQuery(query, TTxControl::BeginTx().CommitTx(), TExecuteQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(queryResult.GetStatus(), EStatus::SUCCESS, queryResult.GetIssues().ToString()); + + TResultSetParser resultSet(queryResult.GetResultSetParser(0)); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1); + + // check every row + // Check that, despite returning nonfiltered data in connector, response will be correct + std::vector<TMaybe<TString>> result = {"Filtered text"}; // Only data satisfying filter conditions + MATCH_RESULT_WITH_INPUT(result, resultSet, GetOptionalString); + } + + Y_UNIT_TEST(PostgreSQLFilterPushdown) { + TestFilterPushdown(EProviderType::PostgreSQL); + } + + Y_UNIT_TEST(ClickHouseFilterPushdown) { + TestFilterPushdown(EProviderType::ClickHouse); + } } } diff --git a/ydb/library/yql/providers/common/CMakeLists.txt b/ydb/library/yql/providers/common/CMakeLists.txt index b227fb9348..298141a06a 100644 --- a/ydb/library/yql/providers/common/CMakeLists.txt +++ b/ydb/library/yql/providers/common/CMakeLists.txt @@ -20,6 +20,7 @@ add_subdirectory(metrics) add_subdirectory(mkql) add_subdirectory(proto) add_subdirectory(provider) +add_subdirectory(pushdown) add_subdirectory(schema) add_subdirectory(structured_token) add_subdirectory(token_accessor) diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto index a785aeae29..44294909f4 100644 --- a/ydb/library/yql/providers/common/proto/gateways_config.proto +++ b/ydb/library/yql/providers/common/proto/gateways_config.proto @@ -610,6 +610,8 @@ message TGenericGatewayConfig { // MDB API endpoint (do not fill in case of on-prem deployment) optional string MdbGateway = 4; + repeated TAttr DefaultSettings = 6; + reserved 1, 2; } diff --git a/ydb/library/yql/providers/common/pushdown/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/common/pushdown/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..e1a6b69143 --- /dev/null +++ b/ydb/library/yql/providers/common/pushdown/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,27 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(providers-common-pushdown) +target_compile_options(providers-common-pushdown PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(providers-common-pushdown PUBLIC + contrib-libs-cxxsupp + yutil + library-yql-ast + library-yql-core + yql-core-expr_nodes + yql-core-expr_nodes_gen + yql-utils-log +) +target_sources(providers-common-pushdown PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/pushdown/collection.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/pushdown/predicate_node.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/pushdown/settings.cpp +) diff --git a/ydb/library/yql/providers/common/pushdown/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/common/pushdown/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..f1022891b5 --- /dev/null +++ b/ydb/library/yql/providers/common/pushdown/CMakeLists.linux-aarch64.txt @@ -0,0 +1,28 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(providers-common-pushdown) +target_compile_options(providers-common-pushdown PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(providers-common-pushdown PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-yql-ast + library-yql-core + yql-core-expr_nodes + yql-core-expr_nodes_gen + yql-utils-log +) +target_sources(providers-common-pushdown PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/pushdown/collection.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/pushdown/predicate_node.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/pushdown/settings.cpp +) diff --git a/ydb/library/yql/providers/common/pushdown/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/common/pushdown/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..f1022891b5 --- /dev/null +++ b/ydb/library/yql/providers/common/pushdown/CMakeLists.linux-x86_64.txt @@ -0,0 +1,28 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(providers-common-pushdown) +target_compile_options(providers-common-pushdown PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(providers-common-pushdown PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-yql-ast + library-yql-core + yql-core-expr_nodes + yql-core-expr_nodes_gen + yql-utils-log +) +target_sources(providers-common-pushdown PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/pushdown/collection.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/pushdown/predicate_node.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/pushdown/settings.cpp +) diff --git a/ydb/library/yql/providers/common/pushdown/CMakeLists.txt b/ydb/library/yql/providers/common/pushdown/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/library/yql/providers/common/pushdown/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/library/yql/providers/common/pushdown/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/common/pushdown/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..e1a6b69143 --- /dev/null +++ b/ydb/library/yql/providers/common/pushdown/CMakeLists.windows-x86_64.txt @@ -0,0 +1,27 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(providers-common-pushdown) +target_compile_options(providers-common-pushdown PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(providers-common-pushdown PUBLIC + contrib-libs-cxxsupp + yutil + library-yql-ast + library-yql-core + yql-core-expr_nodes + yql-core-expr_nodes_gen + yql-utils-log +) +target_sources(providers-common-pushdown PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/pushdown/collection.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/pushdown/predicate_node.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/pushdown/settings.cpp +) diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter_collection.cpp b/ydb/library/yql/providers/common/pushdown/collection.cpp index 826afbe7d4..253ff2cc32 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter_collection.cpp +++ b/ydb/library/yql/providers/common/pushdown/collection.cpp @@ -1,15 +1,13 @@ -#include "kqp_opt_phy_olap_filter_collection.h" +#include "collection.h" -#include <ydb/core/formats/arrow/ssa_runtime_version.h> #include <ydb/library/yql/core/yql_expr_type_annotation.h> #include <ydb/library/yql/utils/log/log.h> #include <vector> -namespace NKikimr::NKqp::NOpt { +namespace NYql::NPushdown { -using namespace NYql; -using namespace NYql::NNodes; +using namespace NNodes; namespace { @@ -32,7 +30,7 @@ bool IsLikeOperator(const TCoCompare& predicate) { || predicate.Maybe<TCoCmpEndsWith>(); } -bool IsSupportedLike(const TExprBase& left, const TExprBase& right) { +bool IsSupportedLikeForUtf8(const TExprBase& left, const TExprBase& right) { if ((left.Maybe<TCoMember>() && ExprHasUtf8Type(left)) || (right.Maybe<TCoMember>() && ExprHasUtf8Type(right))) { @@ -41,7 +39,7 @@ bool IsSupportedLike(const TExprBase& left, const TExprBase& right) { return false; } -bool IsSupportedPredicate(const TCoCompare& predicate) { +bool IsSupportedPredicate(const TCoCompare& predicate, const TSettings& settings) { if (predicate.Maybe<TCoCmpEqual>()) { return true; } else if (predicate.Maybe<TCoCmpLess>()) { @@ -56,7 +54,7 @@ bool IsSupportedPredicate(const TCoCompare& predicate) { return true; } else if (predicate.Maybe<TCoCmpLessOrEqual>()) { return true; - } else if (NKikimr::NSsa::RuntimeVersion >= 2U && IsLikeOperator(predicate)) { + } else if (settings.IsEnabled(TSettings::EFeatureFlag::LikeOperator) && IsLikeOperator(predicate)) { return true; } @@ -241,7 +239,7 @@ std::vector<TExprBase> GetComparisonNodes(const TExprBase& node) { return res; } -bool CheckComparisonNodeForPushdown(const TExprBase& node, const TExprNode* lambdaArg) { +bool CheckComparisonNodeForPushdown(const TExprBase& node, const TExprNode* lambdaArg, const TSettings& settings) { if (auto maybeSafeCast = node.Maybe<TCoSafeCast>()) { if (!IsSupportedCast(maybeSafeCast.Cast())) { return false; @@ -254,7 +252,7 @@ bool CheckComparisonNodeForPushdown(const TExprBase& node, const TExprNode* lamb if (maybeMember.Cast().Struct().Raw() != lambdaArg) { return false; } - } else if (NKikimr::NSsa::RuntimeVersion >= 3U && node.Maybe<TCoJsonQueryBase>()) { + } else if (settings.IsEnabled(TSettings::EFeatureFlag::JsonQueryOperators) && node.Maybe<TCoJsonQueryBase>()) { if (!node.Maybe<TCoJsonValue>()) { return false; } @@ -270,7 +268,7 @@ bool CheckComparisonNodeForPushdown(const TExprBase& node, const TExprNode* lamb return true; } -bool CheckComparisonParametersForPushdown(const TCoCompare& compare, const TExprNode* lambdaArg, const TExprBase& input) { +bool CheckComparisonParametersForPushdown(const TCoCompare& compare, const TExprNode* lambdaArg, const TExprBase& input, const TSettings& settings) { const TTypeAnnotationNode* inputType = input.Ptr()->GetTypeAnn(); switch (inputType->GetKind()) { case ETypeAnnotationKind::Flow: @@ -279,6 +277,8 @@ bool CheckComparisonParametersForPushdown(const TCoCompare& compare, const TExpr case ETypeAnnotationKind::Stream: inputType = inputType->Cast<TStreamExprType>()->GetItemType(); break; + case ETypeAnnotationKind::Struct: + break; default: YQL_ENSURE(false, "Unsupported type of incoming data: " << (ui32)inputType->GetKind()); // We do not know how process input that is not a sequence of elements @@ -297,14 +297,14 @@ bool CheckComparisonParametersForPushdown(const TCoCompare& compare, const TExpr YQL_ENSURE(leftList.size() == rightList.size(), "Different sizes of lists in comparison!"); for (size_t i = 0; i < leftList.size(); ++i) { - if (!CheckComparisonNodeForPushdown(leftList[i], lambdaArg) || !CheckComparisonNodeForPushdown(rightList[i], lambdaArg)) { + if (!CheckComparisonNodeForPushdown(leftList[i], lambdaArg, settings) || !CheckComparisonNodeForPushdown(rightList[i], lambdaArg, settings)) { return false; } if (!IsComparableTypes(leftList[i], rightList[i], equality, inputType)) { return false; } - if (IsLikeOperator(compare) && NKikimr::NSsa::RuntimeVersion < 3U && !IsSupportedLike(leftList[i], rightList[i])) { - // In SSA_RUNTIME_VERSION == 2 Column Shard doesn't have LIKE kernel for binary strings + if (IsLikeOperator(compare) && settings.IsEnabled(TSettings::EFeatureFlag::LikeOperatorOnlyForUtf8) && !IsSupportedLikeForUtf8(leftList[i], rightList[i])) { + // (KQP OLAP) If SSA_RUNTIME_VERSION == 2 Column Shard doesn't have LIKE kernel for binary strings return false; } } @@ -312,19 +312,19 @@ bool CheckComparisonParametersForPushdown(const TCoCompare& compare, const TExpr return true; } -bool CompareCanBePushed(const TCoCompare& compare, const TExprNode* lambdaArg, const TExprBase& lambdaBody) { - if (!IsSupportedPredicate(compare)) { +bool CompareCanBePushed(const TCoCompare& compare, const TExprNode* lambdaArg, const TExprBase& lambdaBody, const TSettings& settings) { + if (!IsSupportedPredicate(compare, settings)) { return false; } - if (!CheckComparisonParametersForPushdown(compare, lambdaArg, lambdaBody)) { + if (!CheckComparisonParametersForPushdown(compare, lambdaArg, lambdaBody, settings)) { return false; } return true; } -bool SafeCastCanBePushed(const TCoFlatMap& flatmap, const TExprNode* lambdaArg) { +bool SafeCastCanBePushed(const TCoFlatMap& flatmap, const TExprNode* lambdaArg, const TSettings& settings) { /* * There are three ways of comparison in following format: * @@ -345,7 +345,7 @@ bool SafeCastCanBePushed(const TCoFlatMap& flatmap, const TExprNode* lambdaArg) YQL_ENSURE(leftList.size() == rightList.size(), "Different sizes of lists in comparison!"); for (size_t i = 0; i < leftList.size(); ++i) { - if (!CheckComparisonNodeForPushdown(leftList[i], lambdaArg) || !CheckComparisonNodeForPushdown(rightList[i], lambdaArg)) { + if (!CheckComparisonNodeForPushdown(leftList[i], lambdaArg, settings) || !CheckComparisonNodeForPushdown(rightList[i], lambdaArg, settings)) { return false; } } @@ -361,7 +361,7 @@ bool SafeCastCanBePushed(const TCoFlatMap& flatmap, const TExprNode* lambdaArg) } auto predicate = maybePredicate.Cast(); - if (!IsSupportedPredicate(predicate)) { + if (!IsSupportedPredicate(predicate, settings)) { return false; } @@ -380,17 +380,17 @@ bool JsonExistsCanBePushed(const TCoJsonExists& jsonExists, const TExprNode* lam return true; } -bool CoalesceCanBePushed(const TCoCoalesce& coalesce, const TExprNode* lambdaArg, const TExprBase& lambdaBody) { +bool CoalesceCanBePushed(const TCoCoalesce& coalesce, const TExprNode* lambdaArg, const TExprBase& lambdaBody, const TSettings& settings) { if (!coalesce.Value().Maybe<TCoBool>()) { return false; } auto predicate = coalesce.Predicate(); if (auto maybeCompare = predicate.Maybe<TCoCompare>()) { - return CompareCanBePushed(maybeCompare.Cast(), lambdaArg, lambdaBody); + return CompareCanBePushed(maybeCompare.Cast(), lambdaArg, lambdaBody, settings); } else if (auto maybeFlatmap = predicate.Maybe<TCoFlatMap>()) { - return SafeCastCanBePushed(maybeFlatmap.Cast(), lambdaArg); - } else if (NKikimr::NSsa::RuntimeVersion >= 3U && predicate.Maybe<TCoJsonExists>()) { + return SafeCastCanBePushed(maybeFlatmap.Cast(), lambdaArg, settings); + } else if (settings.IsEnabled(TSettings::EFeatureFlag::JsonExistsOperator) && predicate.Maybe<TCoJsonExists>()) { auto jsonExists = predicate.Cast<TCoJsonExists>(); return JsonExistsCanBePushed(jsonExists, lambdaArg); } @@ -409,7 +409,7 @@ bool ExistsCanBePushed(const TCoExists& exists, const TExprNode* lambdaArg) { return true; } -void CollectPredicatesForBinaryBoolOperators(const TExprBase& opNode, TPredicateNode& predicateTree, const TExprNode* lambdaArg, const TExprBase& lambdaBody) { +void CollectPredicatesForBinaryBoolOperators(const TExprBase& opNode, TPredicateNode& predicateTree, const TExprNode* lambdaArg, const TExprBase& lambdaBody, const TSettings& settings) { if (!opNode.Maybe<TCoAnd>() && !opNode.Maybe<TCoOr>() && !opNode.Maybe<TCoOr>()) { return; } @@ -417,7 +417,7 @@ void CollectPredicatesForBinaryBoolOperators(const TExprBase& opNode, TPredicate predicateTree.CanBePushed = true; for (auto& childNodePtr: opNode.Ptr()->Children()) { TPredicateNode child(childNodePtr); - CollectPredicates(TExprBase(childNodePtr), child, lambdaArg, lambdaBody); + CollectPredicates(TExprBase(childNodePtr), child, lambdaArg, lambdaBody, settings); predicateTree.Children.emplace_back(child); predicateTree.CanBePushed &= child.CanBePushed; } @@ -425,48 +425,13 @@ void CollectPredicatesForBinaryBoolOperators(const TExprBase& opNode, TPredicate } // anonymous namespace end -bool TPredicateNode::IsValid() const { - bool res = true; - if (Op != EBoolOp::Undefined) { - res &= !Children.empty(); - for (auto& child : Children) { - res &= child.IsValid(); - } - } - - return res && ExprNode.IsValid(); -} - -void TPredicateNode::SetPredicates(const std::vector<TPredicateNode>& predicates, TExprContext& ctx, TPositionHandle pos) { - auto predicatesSize = predicates.size(); - if (predicatesSize == 0) { - return; - } else if (predicatesSize == 1) { - *this = predicates[0]; - } else { - Op = EBoolOp::And; - Children = predicates; - CanBePushed = true; - - TVector<TExprBase> exprNodes; - exprNodes.reserve(predicatesSize); - for (auto& pred : predicates) { - exprNodes.emplace_back(pred.ExprNode.Cast()); - CanBePushed &= pred.CanBePushed; - } - ExprNode = Build<TCoAnd>(ctx, pos) - .Add(exprNodes) - .Done(); - } -} - -void CollectPredicates(const TExprBase& predicate, TPredicateNode& predicateTree, const TExprNode* lambdaArg, const TExprBase& lambdaBody) { +void CollectPredicates(const TExprBase& predicate, TPredicateNode& predicateTree, const TExprNode* lambdaArg, const TExprBase& lambdaBody, const TSettings& settings) { if (predicate.Maybe<TCoCoalesce>()) { auto coalesce = predicate.Cast<TCoCoalesce>(); - predicateTree.CanBePushed = CoalesceCanBePushed(coalesce, lambdaArg, lambdaBody); + predicateTree.CanBePushed = CoalesceCanBePushed(coalesce, lambdaArg, lambdaBody, settings); } else if (predicate.Maybe<TCoCompare>()) { auto compare = predicate.Cast<TCoCompare>(); - predicateTree.CanBePushed = CompareCanBePushed(compare, lambdaArg, lambdaBody); + predicateTree.CanBePushed = CompareCanBePushed(compare, lambdaArg, lambdaBody, settings); } else if (predicate.Maybe<TCoExists>()) { auto exists = predicate.Cast<TCoExists>(); predicateTree.CanBePushed = ExistsCanBePushed(exists, lambdaArg); @@ -474,19 +439,19 @@ void CollectPredicates(const TExprBase& predicate, TPredicateNode& predicateTree predicateTree.Op = EBoolOp::Not; auto notOp = predicate.Cast<TCoNot>(); TPredicateNode child(notOp.Value()); - CollectPredicates(notOp.Value(), child, lambdaArg, lambdaBody); + CollectPredicates(notOp.Value(), child, lambdaArg, lambdaBody, settings); predicateTree.CanBePushed = child.CanBePushed; predicateTree.Children.emplace_back(child); } else if (predicate.Maybe<TCoAnd>()) { predicateTree.Op = EBoolOp::And; - CollectPredicatesForBinaryBoolOperators(predicate.Cast<TCoAnd>(), predicateTree, lambdaArg, lambdaBody); + CollectPredicatesForBinaryBoolOperators(predicate.Cast<TCoAnd>(), predicateTree, lambdaArg, lambdaBody, settings); } else if (predicate.Maybe<TCoOr>()) { predicateTree.Op = EBoolOp::Or; - CollectPredicatesForBinaryBoolOperators(predicate.Cast<TCoOr>(), predicateTree, lambdaArg, lambdaBody); + CollectPredicatesForBinaryBoolOperators(predicate.Cast<TCoOr>(), predicateTree, lambdaArg, lambdaBody, settings); } else if (predicate.Maybe<TCoXor>()) { predicateTree.Op = EBoolOp::Xor; - CollectPredicatesForBinaryBoolOperators(predicate.Cast<TCoXor>(), predicateTree, lambdaArg, lambdaBody); - } else if (NKikimr::NSsa::RuntimeVersion >= 3U && predicate.Maybe<TCoJsonExists>()) { + CollectPredicatesForBinaryBoolOperators(predicate.Cast<TCoXor>(), predicateTree, lambdaArg, lambdaBody, settings); + } else if (settings.IsEnabled(TSettings::EFeatureFlag::JsonExistsOperator) && predicate.Maybe<TCoJsonExists>()) { auto jsonExists = predicate.Cast<TCoJsonExists>(); predicateTree.CanBePushed = JsonExistsCanBePushed(jsonExists, lambdaArg); } else { @@ -494,4 +459,4 @@ void CollectPredicates(const TExprBase& predicate, TPredicateNode& predicateTree } } -} // namespace NKikimr::NKqp::NOpt
\ No newline at end of file +} // namespace NYql::NPushdown diff --git a/ydb/library/yql/providers/common/pushdown/collection.h b/ydb/library/yql/providers/common/pushdown/collection.h new file mode 100644 index 0000000000..05373efc65 --- /dev/null +++ b/ydb/library/yql/providers/common/pushdown/collection.h @@ -0,0 +1,14 @@ +#pragma once +#include <ydb/library/yql/providers/common/pushdown/predicate_node.h> +#include <ydb/library/yql/providers/common/pushdown/settings.h> + +#include <ydb/library/yql/ast/yql_expr.h> +#include <ydb/library/yql/core/expr_nodes_gen/yql_expr_nodes_gen.h> + +namespace NYql::NPushdown { + +void CollectPredicates(const NNodes::TExprBase& predicate, TPredicateNode& predicateTree, + const TExprNode* lambdaArg, const NNodes::TExprBase& lambdaBody, + const TSettings& settings); + +} // namespace NYql::NPushdown diff --git a/ydb/library/yql/providers/common/pushdown/predicate_node.cpp b/ydb/library/yql/providers/common/pushdown/predicate_node.cpp new file mode 100644 index 0000000000..68bd089b81 --- /dev/null +++ b/ydb/library/yql/providers/common/pushdown/predicate_node.cpp @@ -0,0 +1,56 @@ +#include "predicate_node.h" + +#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> + +namespace NYql::NPushdown { + +TPredicateNode::TPredicateNode() = default; + +TPredicateNode::TPredicateNode(const TExprNode::TPtr& nodePtr) + : ExprNode(nodePtr) +{} + +TPredicateNode::TPredicateNode(const NNodes::TExprBase& node) + : ExprNode(node) +{} + +TPredicateNode::TPredicateNode(const TPredicateNode& predNode) = default; + +TPredicateNode::~TPredicateNode() = default; + +bool TPredicateNode::IsValid() const { + bool res = true; + if (Op != EBoolOp::Undefined) { + res &= !Children.empty(); + for (auto& child : Children) { + res &= child.IsValid(); + } + } + + return res && ExprNode.IsValid(); +} + +void TPredicateNode::SetPredicates(const std::vector<TPredicateNode>& predicates, TExprContext& ctx, TPositionHandle pos) { + auto predicatesSize = predicates.size(); + if (predicatesSize == 0) { + return; + } else if (predicatesSize == 1) { + *this = predicates[0]; + } else { + Op = EBoolOp::And; + Children = predicates; + CanBePushed = true; + + TVector<NNodes::TExprBase> exprNodes; + exprNodes.reserve(predicatesSize); + for (auto& pred : predicates) { + exprNodes.emplace_back(pred.ExprNode.Cast()); + CanBePushed &= pred.CanBePushed; + } + ExprNode = NNodes::Build<NNodes::TCoAnd>(ctx, pos) + .Add(exprNodes) + .Done(); + } +} + +} // namespace NYql::NPushdown diff --git a/ydb/library/yql/providers/common/pushdown/predicate_node.h b/ydb/library/yql/providers/common/pushdown/predicate_node.h new file mode 100644 index 0000000000..06a167b60b --- /dev/null +++ b/ydb/library/yql/providers/common/pushdown/predicate_node.h @@ -0,0 +1,34 @@ +#pragma once + +#include <ydb/library/yql/ast/yql_expr.h> +#include <ydb/library/yql/ast/yql_pos_handle.h> +#include <ydb/library/yql/core/expr_nodes_gen/yql_expr_nodes_gen.h> + +namespace NYql::NPushdown { + +enum class EBoolOp { + Undefined = 0, + And, + Or, + Xor, + Not +}; + +struct TPredicateNode { + TPredicateNode(); + TPredicateNode(const TExprNode::TPtr& nodePtr); + TPredicateNode(const NNodes::TExprBase& node); + TPredicateNode(const TPredicateNode& predNode); + + ~TPredicateNode(); + + bool IsValid() const; + void SetPredicates(const std::vector<TPredicateNode>& predicates, TExprContext& ctx, TPositionHandle pos); + + NNodes::TMaybeNode<NNodes::TExprBase> ExprNode; + std::vector<TPredicateNode> Children; + EBoolOp Op = EBoolOp::Undefined; + bool CanBePushed = false; +}; + +} // namespace NYql::NPushdown diff --git a/ydb/library/yql/providers/common/pushdown/settings.cpp b/ydb/library/yql/providers/common/pushdown/settings.cpp new file mode 100644 index 0000000000..7917740ddb --- /dev/null +++ b/ydb/library/yql/providers/common/pushdown/settings.cpp @@ -0,0 +1,17 @@ +#include "settings.h" + +namespace NYql::NPushdown { + +void TSettings::Enable(ui64 flagsMask, bool set) { + if (set) { + FeatureFlags |= flagsMask; + } else { + FeatureFlags &= ~flagsMask; + } +} + +bool TSettings::IsEnabled(EFeatureFlag flagMask) const { + return (FeatureFlags & flagMask) != 0; +} + +} // namespace NYql::NPushdown diff --git a/ydb/library/yql/providers/common/pushdown/settings.h b/ydb/library/yql/providers/common/pushdown/settings.h new file mode 100644 index 0000000000..c53e1bf9fe --- /dev/null +++ b/ydb/library/yql/providers/common/pushdown/settings.h @@ -0,0 +1,26 @@ +#pragma once + +#include <util/system/types.h> + +namespace NYql::NPushdown { + +struct TSettings { + enum EFeatureFlag : ui64 { + LikeOperator = 1, + LikeOperatorOnlyForUtf8 = 1 << 1, + JsonQueryOperators = 1 << 2, + JsonExistsOperator = 1 << 3, + }; + + TSettings() = default; + TSettings(const TSettings&) = default; + + void Enable(ui64 flagsMask, bool set = true); + + bool IsEnabled(EFeatureFlag flagMask) const; + +private: + ui64 FeatureFlags = 0; +}; + +} // namespace NYql::NPushdown diff --git a/ydb/library/yql/providers/common/pushdown/ya.make b/ydb/library/yql/providers/common/pushdown/ya.make new file mode 100644 index 0000000000..f488c383a9 --- /dev/null +++ b/ydb/library/yql/providers/common/pushdown/ya.make @@ -0,0 +1,19 @@ +LIBRARY() + +SRCS( + collection.cpp + predicate_node.cpp + settings.cpp +) + +PEERDIR( + ydb/library/yql/ast + ydb/library/yql/core + ydb/library/yql/core/expr_nodes + ydb/library/yql/core/expr_nodes_gen + ydb/library/yql/utils/log +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/library/yql/providers/common/ya.make b/ydb/library/yql/providers/common/ya.make index 7bef7eaedd..be609dbd0d 100644 --- a/ydb/library/yql/providers/common/ya.make +++ b/ydb/library/yql/providers/common/ya.make @@ -14,6 +14,7 @@ RECURSE( proto proto/python provider + pushdown schema structured_token token_accessor diff --git a/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go b/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go index 3a5e81102d..33e63b6e2e 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go +++ b/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go @@ -157,6 +157,16 @@ func (h *handlerImpl[CONN]) ReadSplit( sb.WriteString(" FROM ") sb.WriteString(tableName) + if split.Select.Where != nil { + statement, err := FormatWhereStatement(split.Select.Where) + if err != nil { + logger.Error("Failed to format WHERE statement", log.Error(err), log.String("where", split.Select.Where.String())) + } else { + sb.WriteString(" ") + sb.WriteString(statement) + } + } + // execute query query := sb.String() diff --git a/ydb/library/yql/providers/generic/connector/app/server/rdbms/predicate_builder.go b/ydb/library/yql/providers/generic/connector/app/server/rdbms/predicate_builder.go new file mode 100644 index 0000000000..876c0025a6 --- /dev/null +++ b/ydb/library/yql/providers/generic/connector/app/server/rdbms/predicate_builder.go @@ -0,0 +1,86 @@ +package rdbms + +import ( + "fmt" + + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" + "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/utils" + api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos" +) + +func FormatValue(value *Ydb.TypedValue) (string, error) { + switch v := value.Value.Value.(type) { + case *Ydb.Value_BoolValue: + return fmt.Sprintf("%t", v.BoolValue), nil + case *Ydb.Value_Int32Value: + return fmt.Sprintf("%d", v.Int32Value), nil + case *Ydb.Value_Uint32Value: + return fmt.Sprintf("%d", v.Uint32Value), nil + default: + return "", fmt.Errorf("%w, type: %T", utils.ErrUnimplementedTypedValue, v) + } +} + +func FormatExpression(expression *api_service_protos.TExpression) (string, error) { + switch e := expression.Payload.(type) { + case *api_service_protos.TExpression_Column: + return e.Column, nil + case *api_service_protos.TExpression_TypedValue: + return FormatValue(e.TypedValue) + default: + return "", fmt.Errorf("%w, type: %T", utils.ErrUnimplementedExpression, e) + } +} + +func FormatComparison(comparison *api_service_protos.TPredicate_TComparison) (string, error) { + var operation string + + switch op := comparison.Operation; op { + case api_service_protos.TPredicate_TComparison_EQ: + operation = " = " + default: + return "", fmt.Errorf("%w, op: %d", utils.ErrUnimplementedOperation, op) + } + + var ( + left string + right string + err error + ) + + left, err = FormatExpression(comparison.LeftValue) + if err != nil { + return "", fmt.Errorf("failed to format left argument: %w", err) + } + + right, err = FormatExpression(comparison.RightValue) + if err != nil { + return "", fmt.Errorf("failed to format right argument: %w", err) + } + + return fmt.Sprintf("(%s%s%s)", left, operation, right), nil +} + +func FormatPredicate(predicate *api_service_protos.TPredicate) (string, error) { + switch p := predicate.Payload.(type) { + case *api_service_protos.TPredicate_Comparison: + return FormatComparison(p.Comparison) + default: + return "", fmt.Errorf("%w, type: %T", utils.ErrUnimplementedPredicateType, p) + } +} + +func FormatWhereStatement(where *api_service_protos.TSelect_TWhere) (string, error) { + if where.FilterTyped == nil { + return "", utils.ErrUnimplemented + } + + formatted, err := FormatPredicate(where.FilterTyped) + if err != nil { + return "", err + } + + result := "WHERE " + formatted + + return result, nil +} diff --git a/ydb/library/yql/providers/generic/connector/app/server/rdbms/ya.make b/ydb/library/yql/providers/generic/connector/app/server/rdbms/ya.make index e9a5b03e24..5bc3bfc6ac 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/rdbms/ya.make +++ b/ydb/library/yql/providers/generic/connector/app/server/rdbms/ya.make @@ -4,6 +4,7 @@ SRCS( doc.go handler.go handler_factory.go + predicate_builder.go schema_builder.go ) @@ -13,4 +14,4 @@ GO_TEST_SRCS( END() -RECURSE_FOR_TESTS(ut)
\ No newline at end of file +RECURSE_FOR_TESTS(ut) diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/errors.go b/ydb/library/yql/providers/generic/connector/app/server/utils/errors.go index a9093be650..08d114ebaf 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/utils/errors.go +++ b/ydb/library/yql/providers/generic/connector/app/server/utils/errors.go @@ -10,12 +10,17 @@ import ( ) var ( - ErrTableDoesNotExist = fmt.Errorf("table does not exist") - ErrDataSourceNotSupported = fmt.Errorf("data source not supported") - ErrDataTypeNotSupported = fmt.Errorf("data type not supported") - ErrReadLimitExceeded = fmt.Errorf("read limit exceeded") - ErrInvalidRequest = fmt.Errorf("invalid request") - ErrValueOutOfTypeBounds = fmt.Errorf("value is out of possible range of values for the type") + ErrTableDoesNotExist = fmt.Errorf("table does not exist") + ErrDataSourceNotSupported = fmt.Errorf("data source not supported") + ErrDataTypeNotSupported = fmt.Errorf("data type not supported") + ErrReadLimitExceeded = fmt.Errorf("read limit exceeded") + ErrInvalidRequest = fmt.Errorf("invalid request") + ErrValueOutOfTypeBounds = fmt.Errorf("value is out of possible range of values for the type") + ErrUnimplemented = fmt.Errorf("unimplemented") + ErrUnimplementedTypedValue = fmt.Errorf("unimplemented typed value") + ErrUnimplementedExpression = fmt.Errorf("unimplemented expression") + ErrUnimplementedOperation = fmt.Errorf("unimplemented operation") + ErrUnimplementedPredicateType = fmt.Errorf("unimplemented predicate type") ) func NewSuccess() *api_service_protos.TError { @@ -50,6 +55,16 @@ func NewAPIErrorFromStdError(err error) *api_service_protos.TError { status = Ydb.StatusIds_UNSUPPORTED case errors.Is(err, ErrValueOutOfTypeBounds): status = Ydb.StatusIds_UNSUPPORTED + case errors.Is(err, ErrUnimplementedTypedValue): + status = Ydb.StatusIds_UNSUPPORTED + case errors.Is(err, ErrUnimplementedExpression): + status = Ydb.StatusIds_UNSUPPORTED + case errors.Is(err, ErrUnimplementedOperation): + status = Ydb.StatusIds_UNSUPPORTED + case errors.Is(err, ErrUnimplementedPredicateType): + status = Ydb.StatusIds_UNSUPPORTED + case errors.Is(err, ErrUnimplemented): + status = Ydb.StatusIds_UNSUPPORTED default: status = Ydb.StatusIds_INTERNAL_ERROR } diff --git a/ydb/library/yql/providers/generic/connector/libcpp/client.h b/ydb/library/yql/providers/generic/connector/libcpp/client.h index 0dc5536695..963964efbc 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/client.h +++ b/ydb/library/yql/providers/generic/connector/libcpp/client.h @@ -94,8 +94,7 @@ namespace NYql::NConnector { virtual TDescribeTableAsyncResult DescribeTable(const NApi::TDescribeTableRequest& request) = 0; virtual TListSplitsStreamIteratorAsyncResult ListSplits(const NApi::TListSplitsRequest& request) = 0; virtual TReadSplitsStreamIteratorAsyncResult ReadSplits(const NApi::TReadSplitsRequest& request) = 0; - virtual ~IClient() { - } + virtual ~IClient() = default; }; IClient::TPtr MakeClientGRPC(const NYql::TGenericConnectorConfig& cfg); diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp index 326b438e52..172194397a 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp +++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp @@ -6,6 +6,17 @@ namespace NYql::NConnector::NTest { using namespace fmt::literals; +#define DEFINE_SIMPLE_TYPE_SETTER(T, primitiveTypeId, value_name) \ + template <> \ + void SetSimpleValue(const T& value, Ydb::TypedValue* proto) { \ + proto->mutable_type()->set_type_id(::Ydb::Type::primitiveTypeId); \ + proto->mutable_value()->Y_CAT(set_, value_name)(value); \ + } + + DEFINE_SIMPLE_TYPE_SETTER(bool, BOOL, bool_value); + DEFINE_SIMPLE_TYPE_SETTER(i32, INT32, int32_value); + DEFINE_SIMPLE_TYPE_SETTER(ui32, UINT32, uint32_value); + void CreatePostgreSQLExternalDataSource( const std::shared_ptr<NKikimr::NKqp::TKikimrRunner>& kikimr, const TString& dataSourceName, diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h index 61cb85dc90..19e9fa06fb 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h +++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h @@ -18,6 +18,7 @@ #include <google/protobuf/util/message_differencer.h> #include <memory> +#include <type_traits> namespace NYql::NConnector::NTest { using namespace testing; @@ -67,23 +68,54 @@ namespace NYql::NConnector::NTest { } \ } + // Make arrow array for one column. + // Returns field for schema and array with data. + // Designed for call with MakeRecordBatch function. template <class TArrowBuilderType, class TColDataType> - std::shared_ptr<arrow::RecordBatch> MakeRecordBatch( - const TString& columnName, - const std::vector<TColDataType>& input, - std::shared_ptr<arrow::DataType> dataType) { + std::tuple<std::shared_ptr<arrow::Field>, std::shared_ptr<arrow::Array>> + MakeArray(const TString& columnName, + const std::vector<TColDataType>& input, + std::shared_ptr<arrow::DataType> dataType) { TArrowBuilderType builder; UNIT_ASSERT_EQUAL(builder.AppendValues(input), arrow::Status::OK()); std::shared_ptr<arrow::Array> columnData; UNIT_ASSERT_EQUAL(builder.Finish(&columnData), arrow::Status::OK()); auto field = arrow::field(columnName, std::move(dataType)); + return {std::move(field), std::move(columnData)}; + } + + // Make record batch with the only column. + template <class TArrowBuilderType, class TColDataType> + std::shared_ptr<arrow::RecordBatch> MakeRecordBatch( + const TString& columnName, + const std::vector<TColDataType>& input, + std::shared_ptr<arrow::DataType> dataType) { + auto [field, columnData] = MakeArray<TArrowBuilderType, TColDataType>(columnName, input, dataType); auto schema = arrow::schema({field}); return arrow::RecordBatch::Make(schema, columnData->length(), {columnData}); } + template <class T> + concept TFieldAndArray = std::is_same_v<T, std::tuple<std::shared_ptr<arrow::Field>, std::shared_ptr<arrow::Array>>>; + + // Make record batch from several results of MakeArray calls with different params. + template <TFieldAndArray... TArrayFieldTuple> + std::shared_ptr<arrow::RecordBatch> MakeRecordBatch(const TArrayFieldTuple&... fields) { + auto schema = arrow::schema({std::get<0>(fields)...}); + std::vector<std::shared_ptr<arrow::Array>> data = {std::get<1>(fields)...}; + UNIT_ASSERT(data.size()); + for (const auto& d : data) { + UNIT_ASSERT_VALUES_EQUAL(d->length(), data[0]->length()); + } + return arrow::RecordBatch::Make(schema, data[0]->length(), std::move(data)); + } + // Make record batch with schema with no columns std::shared_ptr<arrow::RecordBatch> MakeEmptyRecordBatch(size_t rowsCount); + template <class T> + void SetSimpleValue(const T& value, Ydb::TypedValue* proto); + template <class TParent> struct TWithParentBuilder { explicit TWithParentBuilder(TParent* parent) @@ -170,9 +202,9 @@ namespace NYql::NConnector::NTest { class TConnectorClientMock: public NYql::NConnector::IClient { public: - MOCK_METHOD(TDescribeTableAsyncResult, DescribeTable, (const NApi::TDescribeTableRequest& request), (override)); - MOCK_METHOD(TIteratorAsyncResult<IListSplitsStreamIterator>, ListSplits, (const NApi::TListSplitsRequest& request), (override)); - MOCK_METHOD(TIteratorAsyncResult<IReadSplitsStreamIterator>, ReadSplits, (const NApi::TReadSplitsRequest& request), (override)); + MOCK_METHOD(TResult<NApi::TDescribeTableResponse>, DescribeTableImpl, (const NApi::TDescribeTableRequest& request)); + MOCK_METHOD(TIteratorResult<IListSplitsStreamIterator>, ListSplitsImpl, (const NApi::TListSplitsRequest& request)); + MOCK_METHOD(TIteratorResult<IReadSplitsStreamIterator>, ReadSplitsImpl, (const NApi::TReadSplitsRequest& request)); // // Expectation helpers @@ -260,7 +292,7 @@ namespace NYql::NConnector::NTest { TBuilder& Status(const Ydb::StatusIds_StatusCode value) { this->Result_->mutable_error()->set_status(value); return static_cast<TBuilder&>(*this); - }; + } // TODO: add nonprimitive types TBuilder& Column(const TString& name, Ydb::Type::PrimitiveTypeId typeId) { @@ -270,6 +302,13 @@ namespace NYql::NConnector::NTest { return *this; } + TBuilder& NullableColumn(const TString& name, Ydb::Type::PrimitiveTypeId typeId) { + auto* col = this->Result_->mutable_schema()->add_columns(); + col->set_name(name); + col->mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(typeId); + return *this; + } + void FillWithDefaults() { Status(Ydb::StatusIds::SUCCESS); } @@ -308,10 +347,11 @@ namespace NYql::NConnector::NTest { private: void SetExpectation() { - auto future = NThreading::MakeFuture<TResult<NApi::TDescribeTableResponse>>({NGrpc::TGrpcStatus(), *ResponseResult_}); - - EXPECT_CALL(*Mock_, DescribeTable(ProtobufRequestMatcher(*Result_))) - .WillOnce(Return(future)); + EXPECT_CALL(*Mock_, DescribeTableImpl(ProtobufRequestMatcher(*Result_))) + .WillOnce(Return( + TResult<NApi::TDescribeTableResponse>( + {NGrpc::TGrpcStatus(), + *ResponseResult_}))); } private: @@ -320,6 +360,140 @@ namespace NYql::NConnector::NTest { }; template <class TParent = void /* no parent by default */> + struct TExpressionBuilder: public TProtoBuilder<TParent, NApi::TExpression> { + using TBuilder = TExpressionBuilder<TParent>; + + TExpressionBuilder(NApi::TExpression* result = nullptr, TParent* parent = nullptr) + : TProtoBuilder<TParent, NApi::TExpression>(result, parent) + { + FillWithDefaults(); + } + + SETTER(Column, column); + + template <class T> + TBuilder& Value(const T& value) { + SetSimpleValue(value, this->Result_->mutable_typed_value()); + return *this; + } + + void FillWithDefaults() { + } + }; + + template <class TParent = void /* no parent by default */> + struct TComparisonBaseBuilder: public TProtoBuilder<TParent, NApi::TPredicate::TComparison> { + using TBuilder = TComparisonBaseBuilder<TParent>; + + TComparisonBaseBuilder(NApi::TPredicate::TComparison* result = nullptr, TParent* parent = nullptr) + : TProtoBuilder<TParent, NApi::TPredicate::TComparison>(result, parent) + { + FillWithDefaults(); + } + + TBuilder& Arg(const NApi::TExpression& expr) { + MutableArg()->CopyFrom(expr); + return *this; + } + TExpressionBuilder<TBuilder> Arg() { + return TExpressionBuilder<TBuilder>(MutableArg(), this); + } + + TBuilder& Column(const TString& name) { + return Arg().Column(name).Done(); + } + + TBuilder& Value(const auto& value) { + return Arg().Value(value).Done(); + } + + void FillWithDefaults() { + } + + protected: + SETTER(Operation, operation); + + private: + NApi::TExpression* MutableArg() { + if (!this->Result_->has_left_value()) { + return this->Result_->mutable_left_value(); + } + UNIT_ASSERT(!this->Result_->has_right_value()); + return this->Result_->mutable_right_value(); + } + }; + + template <NApi::TPredicate::TComparison::EOperation operation, class TParent = void /* no parent by default */> + struct TComparisonBuilder: public TComparisonBaseBuilder<TParent> { + using TBuilder = TComparisonBuilder<operation, TParent>; + + TComparisonBuilder(NApi::TPredicate::TComparison* result = nullptr, TParent* parent = nullptr) + : TComparisonBaseBuilder<TParent>(result, parent) + { + FillWithDefaults(); + } + + void FillWithDefaults() { + this->Operation(operation); + } + }; + + template <class TParent> + using TEqualBuilder = TComparisonBuilder<NApi::TPredicate::TComparison::EQ, TParent>; + + template <class TParent> + using TNotEqualBuilder = TComparisonBuilder<NApi::TPredicate::TComparison::NE, TParent>; + + template <class TParent> + using TLessBuilder = TComparisonBuilder<NApi::TPredicate::TComparison::L, TParent>; + + template <class TParent> + using TLessOrEqualBuilder = TComparisonBuilder<NApi::TPredicate::TComparison::LE, TParent>; + + template <class TParent> + using TGreaterBuilder = TComparisonBuilder<NApi::TPredicate::TComparison::G, TParent>; + + template <class TParent> + using TGreaterOrEqualBuilder = TComparisonBuilder<NApi::TPredicate::TComparison::GE, TParent>; + + template <class TParent = void /* no parent by default */> + struct TPredicateBuilder: public TProtoBuilder<TParent, NApi::TPredicate> { + using TBuilder = TPredicateBuilder<TParent>; + + explicit TPredicateBuilder(NApi::TPredicate* result = nullptr, TParent* parent = nullptr) + : TProtoBuilder<TParent, NApi::TPredicate>(result, parent) + { + FillWithDefaults(); + } + + SUBPROTO_BUILDER(Equal, mutable_comparison, NApi::TPredicate::TComparison, TEqualBuilder<TBuilder>); + SUBPROTO_BUILDER(NotEqual, mutable_comparison, NApi::TPredicate::TComparison, TNotEqualBuilder<TBuilder>); + SUBPROTO_BUILDER(Less, mutable_comparison, NApi::TPredicate::TComparison, TLessBuilder<TBuilder>); + SUBPROTO_BUILDER(LessOrEqual, mutable_comparison, NApi::TPredicate::TComparison, TLessOrEqualBuilder<TBuilder>); + SUBPROTO_BUILDER(Greater, mutable_comparison, NApi::TPredicate::TComparison, TGreaterBuilder<TBuilder>); + SUBPROTO_BUILDER(GreaterOrEqual, mutable_comparison, NApi::TPredicate::TComparison, TGreaterOrEqualBuilder<TBuilder>); + + void FillWithDefaults() { + } + }; + + template <class TParent = void /* no parent by default */> + struct TWhereBuilder: public TProtoBuilder<TParent, NApi::TSelect::TWhere> { + using TBuilder = TWhereBuilder<TParent>; + + explicit TWhereBuilder(NApi::TSelect::TWhere* result = nullptr, TParent* parent = nullptr) + : TProtoBuilder<TParent, NApi::TSelect::TWhere>(result, parent) + { + FillWithDefaults(); + } + + SUBPROTO_BUILDER(Filter, mutable_filter_typed, NApi::TPredicate, TPredicateBuilder<TBuilder>); + + void FillWithDefaults() { + } + }; + + template <class TParent = void /* no parent by default */> struct TWhatBuilder: public TProtoBuilder<TParent, NApi::TSelect::TWhat> { using TBuilder = TWhatBuilder<TParent>; @@ -337,6 +511,13 @@ namespace NYql::NConnector::NTest { return *this; } + TBuilder& NullableColumn(const TString& name, Ydb::Type::PrimitiveTypeId typeId) { + auto* col = this->Result_->add_items()->mutable_column(); + col->set_name(name); + col->mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(typeId); + return *this; + } + void FillWithDefaults() { } }; @@ -354,6 +535,7 @@ namespace NYql::NConnector::NTest { EXPR_SETTER(Table, mutable_from()->set_table); DATA_SOURCE_INSTANCE_SUBBUILDER(); SUBPROTO_BUILDER(What, mutable_what, NApi::TSelect::TWhat, TWhatBuilder<TBuilder>); + SUBPROTO_BUILDER(Where, mutable_where, NApi::TSelect::TWhere, TWhereBuilder<TBuilder>); void FillWithDefaults() { Table(DEFAULT_TABLE); @@ -433,10 +615,8 @@ namespace NYql::NConnector::NTest { private: void SetExpectation() { - auto future = NThreading::MakeFuture<TIteratorResult<IListSplitsStreamIterator>>( - TIteratorResult<IListSplitsStreamIterator>{NGrpc::TGrpcStatus(), ResponseResult_}); - EXPECT_CALL(*Mock_, ListSplits(ProtobufRequestMatcher(*Result_))) - .WillOnce(Return(future)); + EXPECT_CALL(*Mock_, ListSplitsImpl(ProtobufRequestMatcher(*Result_))) + .WillOnce(Return(TIteratorResult<IListSplitsStreamIterator>{NGrpc::TGrpcStatus(), ResponseResult_})); } private: @@ -464,7 +644,7 @@ namespace NYql::NConnector::NTest { response.mutable_error()->CopyFrom(error); response.set_arrow_ipc_streaming(ser.Serialize(recordBatch)); return static_cast<TBuilder&>(*this); - }; + } void FillWithDefaults() { } @@ -504,10 +684,8 @@ namespace NYql::NConnector::NTest { private: void SetExpectation() { - auto future = NThreading::MakeFuture<TIteratorResult<IReadSplitsStreamIterator>>( - TIteratorResult<IReadSplitsStreamIterator>{NGrpc::TGrpcStatus(), ResponseResult_}); - EXPECT_CALL(*Mock_, ReadSplits(ProtobufRequestMatcher(*Result_))) - .WillOnce(Return(future)); + EXPECT_CALL(*Mock_, ReadSplitsImpl(ProtobufRequestMatcher(*Result_))) + .WillOnce(Return(TIteratorResult<IReadSplitsStreamIterator>{NGrpc::TGrpcStatus(), ResponseResult_})); } private: @@ -526,5 +704,53 @@ namespace NYql::NConnector::NTest { TReadSplitsExpectationBuilder ExpectReadSplits() { return TReadSplitsExpectationBuilder(this); } + + TDescribeTableAsyncResult DescribeTable(const NApi::TDescribeTableRequest& request) override { + Cerr << "Call DescribeTable.\n" + << request.Utf8DebugString() << Endl; + auto result = DescribeTableImpl(request); + Cerr << "DescribeTable result.\n" + << StatusToDebugString(result.Status); + if (result.Response) { + Cerr << '\n' + << result.Response->Utf8DebugString(); + } + Cerr << Endl; + return NThreading::MakeFuture(std::move(result)); + } + + TListSplitsStreamIteratorAsyncResult ListSplits(const NApi::TListSplitsRequest& request) override { + Cerr << "Call ListSplits.\n" + << request.Utf8DebugString() << Endl; + auto result = ListSplitsImpl(request); + Cerr << "ListSplits result.\n" + << StatusToDebugString(result.Status) << Endl; + return NThreading::MakeFuture(std::move(result)); + } + + TReadSplitsStreamIteratorAsyncResult ReadSplits(const NApi::TReadSplitsRequest& request) override { + Cerr << "Call ReadSplits.\n" + << request.Utf8DebugString() << Endl; + auto result = ReadSplitsImpl(request); + Cerr << "ReadSplits result.\n" + << StatusToDebugString(result.Status) << Endl; + return NThreading::MakeFuture(std::move(result)); + } + + protected: + static TString StatusToDebugString(const NGrpc::TGrpcStatus& status) { + TStringBuilder s; + s << "GRpcStatusCode: " << status.GRpcStatusCode << '\n'; + if (status.Msg) { + s << status.Msg; + } + if (status.Details) { + s << " (" << status.Details << ')'; + } + if (status.InternalError) { + s << " (internal error)"; + } + return std::move(s); + } }; } // namespace NYql::NConnector::NTest diff --git a/ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json b/ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json index a36acc049e..88e63d1583 100644 --- a/ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json +++ b/ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json @@ -32,7 +32,8 @@ {"Index": 0, "Name": "World", "Type": "TExprBase"}, {"Index": 1, "Name": "DataSource", "Type": "TGenDataSource"}, {"Index": 2, "Name": "Table", "Type": "TCoAtom"}, - {"Index": 3, "Name": "Columns", "Type": "TExprBase"} + {"Index": 3, "Name": "Columns", "Type": "TExprBase"}, + {"Index": 4, "Name": "FilterPredicate", "Type": "TCoLambda"} ] }, { @@ -43,7 +44,8 @@ {"Index": 0, "Name": "Cluster", "Type": "TCoAtom"}, {"Index": 1, "Name": "Table", "Type": "TCoAtom"}, {"Index": 2, "Name": "Token", "Type": "TCoSecureParam"}, - {"Index": 3, "Name": "Columns", "Type": "TCoAtomList"} + {"Index": 3, "Name": "Columns", "Type": "TCoAtomList"}, + {"Index": 4, "Name": "FilterPredicate", "Type": "TCoLambda"} ] } ] diff --git a/ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt index 971f4dac73..e85bf56b3b 100644 --- a/ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt @@ -31,6 +31,7 @@ target_link_libraries(providers-generic-provider PUBLIC providers-common-mkql providers-common-proto providers-common-provider + providers-common-pushdown providers-common-structured_token providers-common-transform providers-dq-common @@ -52,6 +53,7 @@ target_sources(providers-generic-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_logical_opt.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_mkql_compiler.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp diff --git a/ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt index 7e3022b156..3e80e407cd 100644 --- a/ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt @@ -32,6 +32,7 @@ target_link_libraries(providers-generic-provider PUBLIC providers-common-mkql providers-common-proto providers-common-provider + providers-common-pushdown providers-common-structured_token providers-common-transform providers-dq-common @@ -53,6 +54,7 @@ target_sources(providers-generic-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_logical_opt.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_mkql_compiler.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp diff --git a/ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt index 7e3022b156..3e80e407cd 100644 --- a/ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt @@ -32,6 +32,7 @@ target_link_libraries(providers-generic-provider PUBLIC providers-common-mkql providers-common-proto providers-common-provider + providers-common-pushdown providers-common-structured_token providers-common-transform providers-dq-common @@ -53,6 +54,7 @@ target_sources(providers-generic-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_logical_opt.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_mkql_compiler.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp diff --git a/ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt index 971f4dac73..e85bf56b3b 100644 --- a/ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt @@ -31,6 +31,7 @@ target_link_libraries(providers-generic-provider PUBLIC providers-common-mkql providers-common-proto providers-common-provider + providers-common-pushdown providers-common-structured_token providers-common-transform providers-dq-common @@ -52,6 +53,7 @@ target_sources(providers-generic-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_logical_opt.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_mkql_compiler.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp diff --git a/ydb/library/yql/providers/generic/provider/ya.make b/ydb/library/yql/providers/generic/provider/ya.make index 06a700d75d..21c6fd2b18 100644 --- a/ydb/library/yql/providers/generic/provider/ya.make +++ b/ydb/library/yql/providers/generic/provider/ya.make @@ -13,6 +13,7 @@ SRCS( yql_generic_logical_opt.cpp yql_generic_mkql_compiler.cpp yql_generic_physical_opt.cpp + yql_generic_predicate_pushdown.cpp yql_generic_provider.cpp yql_generic_provider.h yql_generic_provider_impl.h @@ -42,6 +43,7 @@ PEERDIR( ydb/library/yql/providers/common/mkql ydb/library/yql/providers/common/proto ydb/library/yql/providers/common/provider + ydb/library/yql/providers/common/pushdown ydb/library/yql/providers/common/structured_token ydb/library/yql/providers/common/transform ydb/library/yql/providers/dq/common diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_datasource.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_datasource.cpp index 8a6ece89e0..2c3122cc27 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_datasource.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_datasource.cpp @@ -3,6 +3,7 @@ #include "yql_generic_provider_impl.h" #include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> +#include <ydb/library/yql/providers/common/config/yql_configuration_transformer.h> #include <ydb/library/yql/providers/common/proto/gateways_config.pb.h> #include <ydb/library/yql/providers/common/provider/yql_data_provider_impl.h> #include <ydb/library/yql/providers/common/provider/yql_provider.h> @@ -21,6 +22,7 @@ namespace NYql { public: TGenericDataSource(TGenericState::TPtr state) : State_(state) + , ConfigurationTransformer_(MakeHolder<NCommon::TProviderConfigurationTransformer>(State_->Configuration, *State_->Types, TString{GenericProviderName})) , IODiscoveryTransformer_(CreateGenericIODiscoveryTransformer(State_)) , LoadMetaDataTransformer_(CreateGenericLoadTableMetadataTransformer(State_)) , TypeAnnotationTransformer_(CreateGenericDataSourceTypeAnnotationTransformer(State_)) @@ -36,7 +38,7 @@ namespace NYql { if (node.IsCallable(TCoDataSource::CallableName())) { if (node.Child(0)->Content() == GenericProviderName) { auto clusterName = node.Child(1)->Content(); - if (!State_->Configuration->HasCluster(clusterName)) { + if (clusterName != NCommon::ALL_CLUSTERS && !State_->Configuration->HasCluster(clusterName)) { ctx.AddError(TIssue(ctx.GetPosition(node.Child(1)->Pos()), TStringBuilder() << "Unknown cluster name: " << clusterName)); return false; @@ -56,6 +58,10 @@ namespace NYql { return TypeAnnotationTransformer_->CanParse(node); } + IGraphTransformer& GetConfigurationTransformer() override { + return *ConfigurationTransformer_; + } + IGraphTransformer& GetIODiscoveryTransformer() override { return *IODiscoveryTransformer_; } @@ -140,6 +146,7 @@ namespace NYql { private: const TGenericState::TPtr State_; + const THolder<IGraphTransformer> ConfigurationTransformer_; const THolder<IGraphTransformer> IODiscoveryTransformer_; const THolder<IGraphTransformer> LoadMetaDataTransformer_; const THolder<TVisitorTransformerBase> TypeAnnotationTransformer_; diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp index c36aaa1eca..a7b14b26cf 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp @@ -25,12 +25,58 @@ namespace NYql { , State_(state) { using TSelf = TGenericDataSourceTypeAnnotationTransformer; + AddHandler({TCoConfigure::CallableName()}, Hndl(&TSelf::HandleConfig)); AddHandler({TGenReadTable::CallableName()}, Hndl(&TSelf::HandleReadTable)); AddHandler({TGenSourceSettings::CallableName()}, Hndl(&TSelf::HandleSourceSettings)); } + TStatus HandleConfig(const TExprNode::TPtr& input, TExprContext& ctx) { + if (!EnsureMinArgsCount(*input, 2, ctx)) { + return TStatus::Error; + } + + if (!EnsureWorldType(*input->Child(TCoConfigure::idx_World), ctx)) { + return TStatus::Error; + } + + if (!EnsureSpecificDataSource(*input->Child(TCoConfigure::idx_DataSource), GenericProviderName, ctx)) { + return TStatus::Error; + } + + input->SetTypeAnn(input->Child(TCoConfigure::idx_World)->GetTypeAnn()); + return TStatus::Ok; + } + + TStatus AnnotateFilterPredicate(const TExprNode::TPtr& input, size_t childIndex, const TStructExprType* itemType, TExprContext& ctx) { + if (childIndex >= input->ChildrenSize()) { + return TStatus::Error; + } + + auto& filterLambda = input->ChildRef(childIndex); + if (!EnsureLambda(*filterLambda, ctx)) { + return TStatus::Error; + } + + if (!UpdateLambdaAllArgumentsTypes(filterLambda, {itemType}, ctx)) { + return IGraphTransformer::TStatus::Error; + } + + if (const auto* filterLambdaType = filterLambda->GetTypeAnn()) { + if (filterLambdaType->GetKind() != ETypeAnnotationKind::Data) { + return IGraphTransformer::TStatus::Error; + } + const TDataExprType* dataExprType = static_cast<const TDataExprType*>(filterLambdaType); + if (dataExprType->GetSlot() != EDataSlot::Bool) { + return IGraphTransformer::TStatus::Error; + } + } else { + return IGraphTransformer::TStatus::Repeat; + } + return TStatus::Ok; + } + TStatus HandleSourceSettings(const TExprNode::TPtr& input, TExprContext& ctx) { - if (!EnsureArgsCount(*input, 4, ctx)) { + if (!EnsureArgsCount(*input, 5, ctx)) { return TStatus::Error; } @@ -76,6 +122,12 @@ namespace NYql { } } + // Filter + const TStatus filterAnnotationStatus = AnnotateFilterPredicate(input, TGenSourceSettings::idx_FilterPredicate, structExprType, ctx); + if (filterAnnotationStatus != TStatus::Ok) { + return filterAnnotationStatus; + } + blockRowTypeItems.push_back(ctx.MakeType<TItemExprType>( BlockLengthColumnName, ctx.MakeType<TScalarExprType>(ctx.MakeType<TDataExprType>(EDataSlot::Uint64)))); const TTypeAnnotationNode* typeAnnotationNode = ctx.MakeType<TStructExprType>(blockRowTypeItems); @@ -90,7 +142,7 @@ namespace NYql { TStatus HandleReadTable(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { Y_UNUSED(output); - if (!EnsureArgsCount(*input, 4, ctx)) { + if (!EnsureArgsCount(*input, 5, ctx)) { return TStatus::Error; } @@ -151,6 +203,12 @@ namespace NYql { YQL_CLOG(DEBUG, ProviderGeneric) << "struct column order" << (static_cast<const TStructExprType*>(itemType))->ToString(); } + // Filter + const TStatus filterAnnotationStatus = AnnotateFilterPredicate(input, TGenReadTable::idx_FilterPredicate, itemType, ctx); + if (filterAnnotationStatus != TStatus::Ok) { + return filterAnnotationStatus; + } + input->SetTypeAnn(ctx.MakeType<TTupleExprType>(TTypeAnnotationNode::TListType{ input->Child(TGenReadTable::idx_World)->GetTypeAnn(), ctx.MakeType<TListExprType>(itemType)})); diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp index 30a10b7895..39adc3d0e8 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp @@ -1,6 +1,7 @@ #include "yql_generic_dq_integration.h" #include "yql_generic_mkql_compiler.h" +#include "yql_generic_predicate_pushdown.h" #include <ydb/library/yql/ast/yql_expr.h> #include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h> @@ -70,6 +71,7 @@ namespace NYql { .Name().Build(token) .Build() .Columns(std::move(columns)) + .FilterPredicate(genReadTable.FilterPredicate()) .Build() .RowType(ExpandType(genReadTable.Pos(), *rowType, ctx)) .DataSource(genReadTable.DataSource().Cast<TCoDataSource>()) @@ -139,6 +141,13 @@ namespace NYql { column->mutable_type()->CopyFrom(type); } + if (auto predicate = settings.FilterPredicate(); !IsEmptyFilterPredicate(predicate)) { + TStringBuilder err; + if (!SerializeFilterPredicate(predicate, select->mutable_where()->mutable_filter_typed(), err)) { + ythrow yexception() << "Failed to serialize filter predicate for source: " << err; + } + } + // store data source instance srcDesc.mutable_data_source_instance()->CopyFrom(tableMeta.value()->DataSourceInstance); diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp index b6c41cd3c5..3baedc42cd 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp @@ -231,11 +231,22 @@ namespace NYql { tableMeta.ItemType = parse; if (const auto ins = replaces.emplace(read.Raw(), TExprNode::TPtr()); ins.second) { // clang-format off + auto row = Build<TCoArgument>(ctx, read.Pos()) + .Name("row") + .Done(); + auto emptyPredicate = Build<TCoLambda>(ctx, read.Pos()) + .Args({row}) + .Body<TCoBool>() + .Literal().Build("true") + .Build() + .Done().Ptr(); + ins.first->second = Build<TGenReadTable>(ctx, read.Pos()) .World(read.World()) .DataSource(read.DataSource()) .Table().Value(tableName).Build() .Columns<TCoVoid>().Build() + .FilterPredicate(emptyPredicate) .Done().Ptr(); // clang-format on } diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp index be30204547..3620d5e14e 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp @@ -1,14 +1,18 @@ #include "yql_generic_provider_impl.h" +#include "yql_generic_predicate_pushdown.h" #include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> #include <ydb/library/yql/core/yql_opt_utils.h> #include <ydb/library/yql/providers/common/provider/yql_data_provider_impl.h> #include <ydb/library/yql/providers/common/provider/yql_provider.h> #include <ydb/library/yql/providers/common/provider/yql_provider_names.h> +#include <ydb/library/yql/providers/common/pushdown/collection.h> +#include <ydb/library/yql/providers/common/pushdown/predicate_node.h> #include <ydb/library/yql/providers/common/transform/yql_optimize.h> #include <ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.h> #include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h> #include <ydb/library/yql/utils/log/log.h> +#include <ydb/library/yql/providers/common/provider/yql_provider.h> namespace NYql { @@ -16,6 +20,13 @@ namespace NYql { namespace { + struct TPushdownSettings: public NPushdown::TSettings { + TPushdownSettings() { + using EFlag = NPushdown::TSettings::EFeatureFlag; + Enable(EFlag::LikeOperator | EFlag::JsonQueryOperators | EFlag::JsonExistsOperator); + } + }; + class TGenericPhysicalOptProposalTransformer: public TOptimizeTransformerBase { public: TGenericPhysicalOptProposalTransformer(TGenericState::TPtr state) @@ -25,6 +36,8 @@ namespace NYql { #define HNDL(name) "PhysicalOptimizer-" #name, Hndl(&TGenericPhysicalOptProposalTransformer::name) AddHandler(0, &TCoLeft::Match, HNDL(TrimReadWorld)); AddHandler(0, &TCoNarrowMap::Match, HNDL(ReadZeroColumns)); + AddHandler(0, &TCoFlatMap::Match, HNDL(PushFilterToReadTable)); + AddHandler(0, &TCoFlatMap::Match, HNDL(PushFilterToDqSourceWrap)); #undef HNDL } @@ -89,10 +102,153 @@ namespace NYql { return node; } + static NPushdown::TPredicateNode BuildEmptyPredicate(TExprContext& ctx, TPositionHandle pos) { + auto emptyPredicate = Build<TCoBool>(ctx, pos).Literal().Build("true").Done(); + NPushdown::TPredicateNode p; + p.ExprNode = emptyPredicate; + return p; + } + + static NPushdown::TPredicateNode SplitForPartialPushdown(const NPushdown::TPredicateNode& predicateTree, + TExprContext& ctx, TPositionHandle pos) + { + if (predicateTree.CanBePushed) { + return predicateTree; + } + + if (predicateTree.Op != NPushdown::EBoolOp::And) { + return BuildEmptyPredicate(ctx, pos); + } + + std::vector<NPushdown::TPredicateNode> pushable; + for (auto& predicate : predicateTree.Children) { + if (predicate.CanBePushed) { + pushable.emplace_back(predicate); + } + } + NPushdown::TPredicateNode predicateToPush; + predicateToPush.SetPredicates(pushable, ctx, pos); + return predicateToPush; + } + + TMaybeNode<TCoLambda> MakePushdownPredicate(const TCoLambda& lambda, TExprContext& ctx, const TPositionHandle& pos) const { + auto lambdaArg = lambda.Args().Arg(0).Ptr(); + + YQL_CLOG(TRACE, ProviderGeneric) << "Push filter. Initial filter lambda: " << NCommon::ExprToPrettyString(ctx, lambda.Ref()); + + auto maybeOptionalIf = lambda.Body().Maybe<TCoOptionalIf>(); + if (!maybeOptionalIf.IsValid()) { + return {}; + } + + TCoOptionalIf optionalIf = maybeOptionalIf.Cast(); + NPushdown::TPredicateNode predicateTree(optionalIf.Predicate()); + NPushdown::CollectPredicates(optionalIf.Predicate(), predicateTree, lambdaArg.Get(), TExprBase(lambdaArg), TPushdownSettings()); + YQL_ENSURE(predicateTree.IsValid(), "Collected filter predicates are invalid"); + + NPushdown::TPredicateNode predicateToPush = SplitForPartialPushdown(predicateTree, ctx, pos); + if (!predicateToPush.IsValid()) { + return {}; + } + + // clang-format off + auto newFilterLambda = Build<TCoLambda>(ctx, pos) + .Args({"filter_row"}) + .Body<TExprApplier>() + .Apply(predicateToPush.ExprNode.Cast()) + .With(TExprBase(lambdaArg), "filter_row") + .Build() + .Done(); + // clang-format on + + YQL_CLOG(INFO, ProviderGeneric) << "Push filter lambda: " << NCommon::ExprToPrettyString(ctx, *newFilterLambda.Ptr()); + return newFilterLambda; + } + + TMaybeNode<TExprBase> PushFilterToReadTable(TExprBase node, TExprContext& ctx) const { + if (!State_->Configuration->UsePredicatePushdown.Get().GetOrElse(TGenericSettings::TDefault::UsePredicatePushdown)) { + return node; + } + + auto flatmap = node.Cast<TCoFlatMap>(); + auto maybeRight = flatmap.Input().Maybe<TCoRight>(); + if (!maybeRight) { + return node; + } + auto maybeGenericRead = maybeRight.Cast().Input().Maybe<TGenReadTable>(); + if (!maybeGenericRead) { + return node; + } + + TGenReadTable genericRead = maybeGenericRead.Cast(); + if (!IsEmptyFilterPredicate(genericRead.FilterPredicate())) { + YQL_CLOG(TRACE, ProviderGeneric) << "Push filter. Lambda is already not empty"; + return node; + } + + auto newFilterLambda = MakePushdownPredicate(flatmap.Lambda(), ctx, node.Pos()); + if (!newFilterLambda) { + return node; + } + + // clang-format off + return Build<TCoFlatMap>(ctx, flatmap.Pos()) + .InitFrom(flatmap) // Leave existing filter in flatmap for the case of not applying predicate in connector + .Input<TCoRight>() + .Input<TGenReadTable>() + .InitFrom(genericRead) + .FilterPredicate(newFilterLambda.Cast()) + .Build() + .Build() + .Done(); + // clang-format on + } + + TMaybeNode<TExprBase> PushFilterToDqSourceWrap(TExprBase node, TExprContext& ctx) const { + if (!State_->Configuration->UsePredicatePushdown.Get().GetOrElse(TGenericSettings::TDefault::UsePredicatePushdown)) { + return node; + } + + auto flatmap = node.Cast<TCoFlatMap>(); + auto maybeSourceWrap = flatmap.Input().Maybe<TDqSourceWrap>(); + if (!maybeSourceWrap) { + return node; + } + + TDqSourceWrap sourceWrap = maybeSourceWrap.Cast(); + auto maybeGenericSourceSettings = sourceWrap.Input().Maybe<TGenSourceSettings>(); + if (!maybeGenericSourceSettings) { + return node; + } + + TGenSourceSettings genericSourceSettings = maybeGenericSourceSettings.Cast(); + if (!IsEmptyFilterPredicate(genericSourceSettings.FilterPredicate())) { + YQL_CLOG(TRACE, ProviderGeneric) << "Push filter. Lambda is already not empty"; + return node; + } + + auto newFilterLambda = MakePushdownPredicate(flatmap.Lambda(), ctx, node.Pos()); + if (!newFilterLambda) { + return node; + } + + // clang-format off + return Build<TCoFlatMap>(ctx, flatmap.Pos()) + .InitFrom(flatmap) // Leave existing filter in flatmap for the case of not applying predicate in connector + .Input<TDqSourceWrap>() + .InitFrom(sourceWrap) + .Input<TGenSourceSettings>() + .InitFrom(genericSourceSettings) + .FilterPredicate(newFilterLambda.Cast()) + .Build() + .Build() + .Done(); + // clang-format on + } + private: const TGenericState::TPtr State_; }; - } THolder<IGraphTransformer> CreateGenericPhysicalOptProposalTransformer(TGenericState::TPtr state) { diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp new file mode 100644 index 0000000000..8f24d0f58a --- /dev/null +++ b/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp @@ -0,0 +1,92 @@ +#include "yql_generic_predicate_pushdown.h" + +#include <ydb/library/yql/providers/generic/connector/api/service/protos/connector.pb.h> + +namespace NYql { + + using namespace NNodes; + using namespace NConnector::NApi; + + namespace { + + bool SerializeMember(const TCoMember& member, TExpression* proto, const TCoArgument& arg, TStringBuilder& err) { + if (member.Struct().Raw() != arg.Raw()) { // member callable called not for lambda argument + err << "member callable called not for lambda argument"; + return false; + } + proto->set_column(member.Name().StringValue()); + return true; + } + + bool SerializeInt32(const TCoInt32& constant, TExpression* proto, TStringBuilder&) { + auto* value = proto->mutable_typed_value(); + auto* t = value->mutable_type(); + t->set_type_id(Ydb::Type::INT32); + auto* v = value->mutable_value(); + v->set_int32_value(FromString<i32>(constant.Literal())); + return true; + } + + bool SerializeExpression(const TExprBase& expression, TExpression* proto, const TCoArgument& arg, TStringBuilder& err) { + if (auto member = expression.Maybe<TCoMember>()) { + return SerializeMember(member.Cast(), proto, arg, err); + } + + // data + if (auto int32Atom = expression.Maybe<TCoInt32>()) { + return SerializeInt32(int32Atom.Cast(), proto, err); + } + + err << "unknown expression: " << expression.Raw()->Content(); + return false; + } + + bool SerializeCompare(const TCoCompare& compare, TPredicate* predicateProto, const TCoArgument& arg, TStringBuilder& err) { + TPredicate::TComparison* proto = predicateProto->mutable_comparison(); + if (compare.Maybe<TCoCmpEqual>()) { + proto->set_operation(TPredicate::TComparison::EQ); + } + if (proto->operation() == TPredicate::TComparison::RESERVED) { // Unknown operation + err << "unknown operation: " << compare.Raw()->Content(); + return false; + } + return SerializeExpression(compare.Left(), proto->mutable_left_value(), arg, err) && SerializeExpression(compare.Right(), proto->mutable_right_value(), arg, err); + } + + bool SerializeCoalesce(const TCoCoalesce& coalesce, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err) { + auto predicate = coalesce.Predicate(); + if (auto compare = predicate.Maybe<TCoCompare>()) { + return SerializeCompare(compare.Cast(), proto, arg, err); + } + + err << "unknown coalesce predicate: " << predicate.Raw()->Content(); + return false; + } + + bool SerializePredicate(const TExprBase& predicate, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err) { + if (auto compare = predicate.Maybe<TCoCompare>()) { + return SerializeCompare(compare.Cast(), proto, arg, err); + } + if (auto coalesce = predicate.Maybe<TCoCoalesce>()) { + return SerializeCoalesce(coalesce.Cast(), proto, arg, err); + } + + err << "unknown predicate: " << predicate.Raw()->Content(); + return false; + } + + } // namespace + + bool IsEmptyFilterPredicate(const TCoLambda& lambda) { + auto maybeBool = lambda.Body().Maybe<TCoBool>(); + if (!maybeBool) { + return false; + } + return TStringBuf(maybeBool.Cast().Literal()) == "true"sv; + } + + bool SerializeFilterPredicate(const TCoLambda& predicate, TPredicate* proto, TStringBuilder& err) { + return SerializePredicate(predicate.Body(), proto, predicate.Args().Arg(0), err); + } + +} // namespace NYql diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.h b/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.h new file mode 100644 index 0000000000..121ab50527 --- /dev/null +++ b/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.h @@ -0,0 +1,14 @@ +#pragma once + +#include <ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.h> + +namespace NYql::NConnector::NApi { + class TPredicate; +} // namespace NYql::NConnector::NApi + +namespace NYql { + + bool IsEmptyFilterPredicate(const NNodes::TCoLambda& lambda); + bool SerializeFilterPredicate(const NNodes::TCoLambda& predicate, NConnector::NApi::TPredicate* proto, TStringBuilder& err); + +} // namespace NYql diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp index c050651f63..29dfe55f56 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp @@ -7,17 +7,23 @@ namespace NYql { + TGenericConfiguration::TGenericConfiguration() { + REGISTER_SETTING(*this, UsePredicatePushdown); + } + void TGenericConfiguration::Init(const NYql::TGenericGatewayConfig& gatewayConfig, const std::shared_ptr<NYql::IDatabaseAsyncResolver> databaseResolver, NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseAuth, const TCredentials::TPtr& credentials) { + Dispatch(gatewayConfig.GetDefaultSettings()); + for (const auto& cluster : gatewayConfig.GetClusterMapping()) { AddCluster(cluster, databaseResolver, databaseAuth, credentials); } // TODO: check if it's necessary - this->FreezeDefaults(); + FreezeDefaults(); } void TGenericConfiguration::AddCluster(const TGenericClusterConfig& clusterConfig, @@ -93,4 +99,4 @@ namespace NYql { return ValidClusters.contains(cluster); } -}
\ No newline at end of file +} diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h index 685f0004ee..639db317d3 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h +++ b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h @@ -9,12 +9,18 @@ namespace NYql { struct TGenericSettings { using TConstPtr = std::shared_ptr<const TGenericSettings>; + + NCommon::TConfSetting<bool, false> UsePredicatePushdown; + + struct TDefault { + static constexpr bool UsePredicatePushdown = false; + }; }; struct TGenericConfiguration: public TGenericSettings, public NCommon::TSettingDispatcher { using TPtr = TIntrusivePtr<TGenericConfiguration>; - TGenericConfiguration(){}; + TGenericConfiguration(); TGenericConfiguration(const TGenericConfiguration&) = delete; void Init(const NYql::TGenericGatewayConfig& gatewayConfig, |