aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author galaxycrab <UgnineSirdis@ydb.tech> 2023-10-24 17:49:51 +0300
committer galaxycrab <UgnineSirdis@ydb.tech> 2023-10-24 18:56:05 +0300
commit 1ed4742ff7b185c61308ebfa67df74425bec4016 (patch)
tree a60787bb6f25591fe092d2bedc81aaf3274ec60f
parent 670e2deb259d1316f5faa7add5888e8ae4a079aa (diff)
download ydb-1ed4742ff7b185c61308ebfa67df74425bec4016.tar.gz
YQ Connector: Pushdown of simple filters. First version with support for several predicates/operations
-rw-r--r--.mapping.json5
-rw-r--r--ydb/core/kqp/opt/physical/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/core/kqp/opt/physical/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/kqp/opt/physical/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/core/kqp/opt/physical/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp28
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter_collection.h55
-rw-r--r--ydb/core/kqp/opt/physical/ya.make2
-rw-r--r--ydb/core/kqp/ut/federated_query/generic/kqp_generic_provider_ut.cpp115
-rw-r--r--ydb/library/yql/providers/common/CMakeLists.txt1
-rw-r--r--ydb/library/yql/providers/common/proto/gateways_config.proto2
-rw-r--r--ydb/library/yql/providers/common/pushdown/CMakeLists.darwin-x86_64.txt27
-rw-r--r--ydb/library/yql/providers/common/pushdown/CMakeLists.linux-aarch64.txt28
-rw-r--r--ydb/library/yql/providers/common/pushdown/CMakeLists.linux-x86_64.txt28
-rw-r--r--ydb/library/yql/providers/common/pushdown/CMakeLists.txt17
-rw-r--r--ydb/library/yql/providers/common/pushdown/CMakeLists.windows-x86_64.txt27
-rw-r--r--ydb/library/yql/providers/common/pushdown/collection.cpp (renamed from ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter_collection.cpp)105
-rw-r--r--ydb/library/yql/providers/common/pushdown/collection.h14
-rw-r--r--ydb/library/yql/providers/common/pushdown/predicate_node.cpp56
-rw-r--r--ydb/library/yql/providers/common/pushdown/predicate_node.h34
-rw-r--r--ydb/library/yql/providers/common/pushdown/settings.cpp17
-rw-r--r--ydb/library/yql/providers/common/pushdown/settings.h26
-rw-r--r--ydb/library/yql/providers/common/pushdown/ya.make19
-rw-r--r--ydb/library/yql/providers/common/ya.make1
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go10
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/rdbms/predicate_builder.go86
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/rdbms/ya.make3
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/utils/errors.go27
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/client.h3
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp11
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h268
-rw-r--r--ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json6
-rw-r--r--ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/library/yql/providers/generic/provider/ya.make2
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_datasource.cpp9
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp62
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp9
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp11
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp158
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp92
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.h14
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp10
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_settings.h8
46 files changed, 1227 insertions, 185 deletions
diff --git a/.mapping.json b/.mapping.json
index b0c5ba1797..8ea27f5354 100644
--- a/.mapping.json
+++ b/.mapping.json
@@ -7426,6 +7426,11 @@
"ydb/library/yql/providers/common/provider/CMakeLists.linux-x86_64.txt":"",
"ydb/library/yql/providers/common/provider/CMakeLists.txt":"",
"ydb/library/yql/providers/common/provider/CMakeLists.windows-x86_64.txt":"",
+ "ydb/library/yql/providers/common/pushdown/CMakeLists.darwin-x86_64.txt":"",
+ "ydb/library/yql/providers/common/pushdown/CMakeLists.linux-aarch64.txt":"",
+ "ydb/library/yql/providers/common/pushdown/CMakeLists.linux-x86_64.txt":"",
+ "ydb/library/yql/providers/common/pushdown/CMakeLists.txt":"",
+ "ydb/library/yql/providers/common/pushdown/CMakeLists.windows-x86_64.txt":"",
"ydb/library/yql/providers/common/schema/CMakeLists.darwin-x86_64.txt":"",
"ydb/library/yql/providers/common/schema/CMakeLists.linux-aarch64.txt":"",
"ydb/library/yql/providers/common/schema/CMakeLists.linux-x86_64.txt":"",
diff --git a/ydb/core/kqp/opt/physical/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/opt/physical/CMakeLists.darwin-x86_64.txt
index 852ad9e1d5..f78e4a6016 100644
--- a/ydb/core/kqp/opt/physical/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/opt/physical/CMakeLists.darwin-x86_64.txt
@@ -19,12 +19,12 @@ target_link_libraries(kqp-opt-physical PUBLIC
opt-physical-effects
yql-dq-common
yql-dq-opt
+ providers-common-pushdown
)
target_sources(kqp-opt-physical PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter_collection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_precompute.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp
diff --git a/ydb/core/kqp/opt/physical/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/opt/physical/CMakeLists.linux-aarch64.txt
index 7204a81d07..cdcab0fa74 100644
--- a/ydb/core/kqp/opt/physical/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/opt/physical/CMakeLists.linux-aarch64.txt
@@ -20,12 +20,12 @@ target_link_libraries(kqp-opt-physical PUBLIC
opt-physical-effects
yql-dq-common
yql-dq-opt
+ providers-common-pushdown
)
target_sources(kqp-opt-physical PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter_collection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_precompute.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp
diff --git a/ydb/core/kqp/opt/physical/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/opt/physical/CMakeLists.linux-x86_64.txt
index 7204a81d07..cdcab0fa74 100644
--- a/ydb/core/kqp/opt/physical/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/opt/physical/CMakeLists.linux-x86_64.txt
@@ -20,12 +20,12 @@ target_link_libraries(kqp-opt-physical PUBLIC
opt-physical-effects
yql-dq-common
yql-dq-opt
+ providers-common-pushdown
)
target_sources(kqp-opt-physical PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter_collection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_precompute.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp
diff --git a/ydb/core/kqp/opt/physical/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/opt/physical/CMakeLists.windows-x86_64.txt
index 852ad9e1d5..f78e4a6016 100644
--- a/ydb/core/kqp/opt/physical/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/opt/physical/CMakeLists.windows-x86_64.txt
@@ -19,12 +19,12 @@ target_link_libraries(kqp-opt-physical PUBLIC
opt-physical-effects
yql-dq-common
yql-dq-opt
+ providers-common-pushdown
)
target_sources(kqp-opt-physical PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_agg.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter_collection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_precompute.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp
index 57090bfc1e..fc6bb36a28 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp
@@ -1,10 +1,11 @@
#include "kqp_opt_phy_rules.h"
-#include "kqp_opt_phy_olap_filter_collection.h"
#include <ydb/core/formats/arrow/ssa_runtime_version.h>
#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/library/yql/core/extract_predicate/extract_predicate.h>
#include <ydb/library/yql/core/yql_opt_utils.h>
+#include <ydb/library/yql/providers/common/pushdown/collection.h>
+#include <ydb/library/yql/providers/common/pushdown/predicate_node.h>
#include <unordered_set>
@@ -23,6 +24,15 @@ static const std::unordered_set<std::string> SecondLevelFilters = {
"ends_with"
};
+struct TPushdownSettings : public NPushdown::TSettings {
+ TPushdownSettings() {
+ using EFlag = NPushdown::TSettings::EFeatureFlag;
+ Enable(EFlag::LikeOperator, NSsa::RuntimeVersion >= 2U);
+ Enable(EFlag::LikeOperatorOnlyForUtf8, NSsa::RuntimeVersion < 3U);
+ Enable(EFlag::JsonQueryOperators | EFlag::JsonExistsOperator, NSsa::RuntimeVersion >= 3U);
+ }
+};
+
struct TFilterOpsLevels {
TFilterOpsLevels(const TMaybeNode<TExprBase>& firstLevel, const TMaybeNode<TExprBase>& secondLevel)
: FirstLevelOps(firstLevel)
@@ -511,7 +521,7 @@ TFilterOpsLevels PredicatePushdown(const TExprBase& predicate, TExprContext& ctx
return TFilterOpsLevels(ops, NullNode);
}
-void SplitForPartialPushdown(const TPredicateNode& predicateTree, TPredicateNode& predicatesToPush, TPredicateNode& remainingPredicates,
+void SplitForPartialPushdown(const NPushdown::TPredicateNode& predicateTree, NPushdown::TPredicateNode& predicatesToPush, NPushdown::TPredicateNode& remainingPredicates,
TExprContext& ctx, TPositionHandle pos)
{
if (predicateTree.CanBePushed) {
@@ -520,7 +530,7 @@ void SplitForPartialPushdown(const TPredicateNode& predicateTree, TPredicateNode
return;
}
- if (predicateTree.Op != EBoolOp::And) {
+ if (predicateTree.Op != NPushdown::EBoolOp::And) {
// We can partially pushdown predicates from AND operator only.
// For OR operator we would need to have several read operators which is not acceptable.
// TODO: Add support for NOT(op1 OR op2), because it expands to (!op1 AND !op2).
@@ -529,8 +539,8 @@ void SplitForPartialPushdown(const TPredicateNode& predicateTree, TPredicateNode
}
bool isFoundNotStrictOp = false;
- std::vector<TPredicateNode> pushable;
- std::vector<TPredicateNode> remaining;
+ std::vector<NPushdown::TPredicateNode> pushable;
+ std::vector<NPushdown::TPredicateNode> remaining;
for (auto& predicate : predicateTree.Children) {
if (predicate.CanBePushed && !isFoundNotStrictOp) {
pushable.emplace_back(predicate);
@@ -578,12 +588,12 @@ TExprBase KqpPushOlapFilter(TExprBase node, TExprContext& ctx, const TKqpOptimiz
}
auto optionalIf = maybeOptionalIf.Cast();
- TPredicateNode predicateTree(optionalIf.Predicate());
- CollectPredicates(optionalIf.Predicate(), predicateTree, lambdaArg, read.Process().Body());
+ NPushdown::TPredicateNode predicateTree(optionalIf.Predicate());
+ CollectPredicates(optionalIf.Predicate(), predicateTree, lambdaArg, read.Process().Body(), TPushdownSettings());
YQL_ENSURE(predicateTree.IsValid(), "Collected OLAP predicates are invalid");
- TPredicateNode predicatesToPush;
- TPredicateNode remainingPredicates;
+ NPushdown::TPredicateNode predicatesToPush;
+ NPushdown::TPredicateNode remainingPredicates;
SplitForPartialPushdown(predicateTree, predicatesToPush, remainingPredicates, ctx, node.Pos());
if (!predicatesToPush.IsValid()) {
return node;
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter_collection.h b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter_collection.h
deleted file mode 100644
index cfec72ac53..0000000000
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter_collection.h
+++ /dev/null
@@ -1,55 +0,0 @@
-#pragma once
-
-#include <ydb/core/kqp/common/kqp_yql.h>
-
-namespace NKikimr::NKqp::NOpt {
-
-enum class EBoolOp {
- Undefined = 0,
- And,
- Or,
- Xor,
- Not
-};
-
-struct TPredicateNode {
- TPredicateNode()
- : ExprNode(nullptr)
- , Op(EBoolOp::Undefined)
- , CanBePushed(false)
- {}
-
- TPredicateNode(NYql::TExprNode::TPtr nodePtr)
- : ExprNode(nodePtr)
- , Op(EBoolOp::Undefined)
- , CanBePushed(false)
- {}
-
- TPredicateNode(NYql::NNodes::TExprBase node)
- : ExprNode(node)
- , Op(EBoolOp::Undefined)
- , CanBePushed(false)
- {}
-
- TPredicateNode(const TPredicateNode& predNode)
- : ExprNode(predNode.ExprNode)
- , Children(predNode.Children)
- , Op(predNode.Op)
- , CanBePushed(predNode.CanBePushed)
- {}
-
- ~TPredicateNode() {}
-
- bool IsValid() const;
- void SetPredicates(const std::vector<TPredicateNode>& predicates, NYql::TExprContext& ctx, NYql::TPositionHandle pos);
-
- NYql::NNodes::TMaybeNode<NYql::NNodes::TExprBase> ExprNode;
- std::vector<TPredicateNode> Children;
- EBoolOp Op;
- bool CanBePushed;
-};
-
-void CollectPredicates(const NYql::NNodes::TExprBase& predicate, TPredicateNode& predicateTree,
- const NYql::TExprNode* lambdaArg, const NYql::NNodes::TExprBase& lambdaBody);
-
-} // namespace NKikimr::NKqp::NOpt \ No newline at end of file
diff --git a/ydb/core/kqp/opt/physical/ya.make b/ydb/core/kqp/opt/physical/ya.make
index ad82055d6e..07b0cdf0e0 100644
--- a/ydb/core/kqp/opt/physical/ya.make
+++ b/ydb/core/kqp/opt/physical/ya.make
@@ -4,7 +4,6 @@ SRCS(
kqp_opt_phy_build_stage.cpp
kqp_opt_phy_limit.cpp
kqp_opt_phy_olap_agg.cpp
- kqp_opt_phy_olap_filter_collection.cpp
kqp_opt_phy_olap_filter.cpp
kqp_opt_phy_precompute.cpp
kqp_opt_phy_sort.cpp
@@ -19,6 +18,7 @@ PEERDIR(
ydb/core/kqp/opt/physical/effects
ydb/library/yql/dq/common
ydb/library/yql/dq/opt
+ ydb/library/yql/providers/common/pushdown
)
YQL_LAST_ABI_VERSION()
diff --git a/ydb/core/kqp/ut/federated_query/generic/kqp_generic_provider_ut.cpp b/ydb/core/kqp/ut/federated_query/generic/kqp_generic_provider_ut.cpp
index 9029a7477d..54ac7e9335 100644
--- a/ydb/core/kqp/ut/federated_query/generic/kqp_generic_provider_ut.cpp
+++ b/ydb/core/kqp/ut/federated_query/generic/kqp_generic_provider_ut.cpp
@@ -114,10 +114,9 @@ namespace NKikimr::NKqp {
const TString query = fmt::format(
R"(
- SELECT * FROM {data_source_name}.`{database_name}.{table_name}`;
+ SELECT * FROM {data_source_name}.{table_name};
)",
"data_source_name"_a = DEFAULT_DATA_SOURCE_NAME,
- "database_name"_a = DEFAULT_DATABASE,
"table_name"_a = DEFAULT_TABLE);
auto db = kikimr->GetQueryClient();
@@ -206,11 +205,10 @@ namespace NKikimr::NKqp {
const TString query = fmt::format(
R"(
- SELECT 42 FROM {data_source_name}.`{database_name}.{table_name}`;
- SELECT 42 FROM {data_source_name}.`{database_name}.{table_name}`;
+ SELECT 42 FROM {data_source_name}.{table_name};
+ SELECT 42 FROM {data_source_name}.{table_name};
)",
"data_source_name"_a = DEFAULT_DATA_SOURCE_NAME,
- "database_name"_a = DEFAULT_DATABASE,
"table_name"_a = DEFAULT_TABLE);
auto db = kikimr->GetQueryClient();
@@ -297,10 +295,9 @@ namespace NKikimr::NKqp {
const TString query = fmt::format(
R"(
- SELECT COUNT(*) FROM {data_source_name}.`{database_name}.{table_name}`;
+ SELECT COUNT(*) FROM {data_source_name}.{table_name};
)",
"data_source_name"_a = DEFAULT_DATA_SOURCE_NAME,
- "database_name"_a = DEFAULT_DATABASE,
"table_name"_a = DEFAULT_TABLE);
auto db = kikimr->GetQueryClient();
@@ -323,5 +320,109 @@ namespace NKikimr::NKqp {
Y_UNIT_TEST(ClickHouseSelectCount) {
TestSelectCount(EProviderType::ClickHouse);
}
+
+ void TestFilterPushdown(EProviderType providerType) {
+ // prepare mock
+ auto clientMock = std::make_shared<TConnectorClientMock>();
+
+ const NApi::TDataSourceInstance dataSourceInstance = MakeDataSourceInstance(providerType);
+ // clang-format off
+ const NApi::TSelect select = TConnectorClientMock::TSelectBuilder<>()
+ .DataSourceInstance(dataSourceInstance)
+ .What()
+ .NullableColumn("data_column", Ydb::Type::STRING)
+ .NullableColumn("filtered_column", Ydb::Type::INT32)
+ .Done()
+ .Where()
+ .Filter()
+ .Equal()
+ .Column("filtered_column")
+ .Value<i32>(42)
+ .Done()
+ .Done()
+ .Done()
+ .GetResult();
+ // clang-format on
+
+ // step 1: DescribeTable
+ // clang-format off
+ clientMock->ExpectDescribeTable()
+ .DataSourceInstance(dataSourceInstance)
+ .Response()
+ .NullableColumn("filtered_column", Ydb::Type::INT32)
+ .NullableColumn("data_column", Ydb::Type::STRING);
+ // clang-format on
+
+ // step 2: ListSplits
+ // clang-format off
+ clientMock->ExpectListSplits()
+ .Select(select)
+ .Result()
+ .AddResponse(NewSuccess())
+ .Description("some binary description")
+ .Select(select);
+ // clang-format on
+
+ // step 3: ReadSplits
+ // Return data that includes rows which do not satisfy the filter condition.
+ // Then check that, even though the connector returns these extra rows,
+ // our generic provider filters them out.
+ std::vector<std::string> colData = {"Filtered text", "Text"};
+ std::vector<i32> filterColumnData = {42, 24};
+ // clang-format off
+ clientMock->ExpectReadSplits()
+ .DataSourceInstance(dataSourceInstance)
+ .Split()
+ .Description("some binary description")
+ .Select(select)
+ .Done()
+ .Result()
+ .AddResponse(MakeRecordBatch(
+ MakeArray<arrow::StringBuilder>("data_column", colData, arrow::utf8()),
+ MakeArray<arrow::Int32Builder>("filtered_column", filterColumnData, arrow::int32())),
+ NewSuccess());
+ // clang-format on
+
+ // prepare database resolver mock
+ std::shared_ptr<TDatabaseAsyncResolverMock> databaseAsyncResolverMock;
+ if (providerType == EProviderType::ClickHouse) {
+ databaseAsyncResolverMock = std::make_shared<TDatabaseAsyncResolverMock>();
+ databaseAsyncResolverMock->AddClickHouseCluster();
+ }
+
+ // run test
+ auto kikimr = MakeKikimrRunner(nullptr, clientMock, databaseAsyncResolverMock);
+
+ CreateExternalDataSource(providerType, kikimr);
+
+ const TString query = fmt::format(
+ R"(
+ PRAGMA generic.UsePredicatePushdown="true";
+ SELECT data_column FROM {data_source_name}.{table_name} WHERE filtered_column = 42;
+ )",
+ "data_source_name"_a = DEFAULT_DATA_SOURCE_NAME,
+ "table_name"_a = DEFAULT_TABLE);
+
+ auto db = kikimr->GetQueryClient();
+ auto queryResult = db.ExecuteQuery(query, TTxControl::BeginTx().CommitTx(), TExecuteQuerySettings()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(queryResult.GetStatus(), EStatus::SUCCESS, queryResult.GetIssues().ToString());
+
+ TResultSetParser resultSet(queryResult.GetResultSetParser(0));
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1);
+
+ // check every row
+ // Check that the response is correct even though the connector returned unfiltered data
+ std::vector<TMaybe<TString>> result = {"Filtered text"}; // Only data satisfying filter conditions
+ MATCH_RESULT_WITH_INPUT(result, resultSet, GetOptionalString);
+ }
+
+ Y_UNIT_TEST(PostgreSQLFilterPushdown) {
+ TestFilterPushdown(EProviderType::PostgreSQL);
+ }
+
+ Y_UNIT_TEST(ClickHouseFilterPushdown) {
+ TestFilterPushdown(EProviderType::ClickHouse);
+ }
}
}
diff --git a/ydb/library/yql/providers/common/CMakeLists.txt b/ydb/library/yql/providers/common/CMakeLists.txt
index b227fb9348..298141a06a 100644
--- a/ydb/library/yql/providers/common/CMakeLists.txt
+++ b/ydb/library/yql/providers/common/CMakeLists.txt
@@ -20,6 +20,7 @@ add_subdirectory(metrics)
add_subdirectory(mkql)
add_subdirectory(proto)
add_subdirectory(provider)
+add_subdirectory(pushdown)
add_subdirectory(schema)
add_subdirectory(structured_token)
add_subdirectory(token_accessor)
diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto
index a785aeae29..44294909f4 100644
--- a/ydb/library/yql/providers/common/proto/gateways_config.proto
+++ b/ydb/library/yql/providers/common/proto/gateways_config.proto
@@ -610,6 +610,8 @@ message TGenericGatewayConfig {
// MDB API endpoint (do not fill in case of on-prem deployment)
optional string MdbGateway = 4;
+ repeated TAttr DefaultSettings = 6;
+
reserved 1, 2;
}
diff --git a/ydb/library/yql/providers/common/pushdown/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/common/pushdown/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..e1a6b69143
--- /dev/null
+++ b/ydb/library/yql/providers/common/pushdown/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,27 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(providers-common-pushdown)
+target_compile_options(providers-common-pushdown PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(providers-common-pushdown PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-yql-ast
+ library-yql-core
+ yql-core-expr_nodes
+ yql-core-expr_nodes_gen
+ yql-utils-log
+)
+target_sources(providers-common-pushdown PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/pushdown/collection.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/pushdown/predicate_node.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/pushdown/settings.cpp
+)
diff --git a/ydb/library/yql/providers/common/pushdown/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/common/pushdown/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..f1022891b5
--- /dev/null
+++ b/ydb/library/yql/providers/common/pushdown/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,28 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(providers-common-pushdown)
+target_compile_options(providers-common-pushdown PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(providers-common-pushdown PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-yql-ast
+ library-yql-core
+ yql-core-expr_nodes
+ yql-core-expr_nodes_gen
+ yql-utils-log
+)
+target_sources(providers-common-pushdown PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/pushdown/collection.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/pushdown/predicate_node.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/pushdown/settings.cpp
+)
diff --git a/ydb/library/yql/providers/common/pushdown/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/common/pushdown/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..f1022891b5
--- /dev/null
+++ b/ydb/library/yql/providers/common/pushdown/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,28 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(providers-common-pushdown)
+target_compile_options(providers-common-pushdown PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(providers-common-pushdown PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-yql-ast
+ library-yql-core
+ yql-core-expr_nodes
+ yql-core-expr_nodes_gen
+ yql-utils-log
+)
+target_sources(providers-common-pushdown PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/pushdown/collection.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/pushdown/predicate_node.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/pushdown/settings.cpp
+)
diff --git a/ydb/library/yql/providers/common/pushdown/CMakeLists.txt b/ydb/library/yql/providers/common/pushdown/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/library/yql/providers/common/pushdown/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/library/yql/providers/common/pushdown/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/common/pushdown/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..e1a6b69143
--- /dev/null
+++ b/ydb/library/yql/providers/common/pushdown/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,27 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(providers-common-pushdown)
+target_compile_options(providers-common-pushdown PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(providers-common-pushdown PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-yql-ast
+ library-yql-core
+ yql-core-expr_nodes
+ yql-core-expr_nodes_gen
+ yql-utils-log
+)
+target_sources(providers-common-pushdown PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/pushdown/collection.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/pushdown/predicate_node.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/pushdown/settings.cpp
+)
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter_collection.cpp b/ydb/library/yql/providers/common/pushdown/collection.cpp
index 826afbe7d4..253ff2cc32 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter_collection.cpp
+++ b/ydb/library/yql/providers/common/pushdown/collection.cpp
@@ -1,15 +1,13 @@
-#include "kqp_opt_phy_olap_filter_collection.h"
+#include "collection.h"
-#include <ydb/core/formats/arrow/ssa_runtime_version.h>
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
#include <ydb/library/yql/utils/log/log.h>
#include <vector>
-namespace NKikimr::NKqp::NOpt {
+namespace NYql::NPushdown {
-using namespace NYql;
-using namespace NYql::NNodes;
+using namespace NNodes;
namespace {
@@ -32,7 +30,7 @@ bool IsLikeOperator(const TCoCompare& predicate) {
|| predicate.Maybe<TCoCmpEndsWith>();
}
-bool IsSupportedLike(const TExprBase& left, const TExprBase& right) {
+bool IsSupportedLikeForUtf8(const TExprBase& left, const TExprBase& right) {
if ((left.Maybe<TCoMember>() && ExprHasUtf8Type(left))
|| (right.Maybe<TCoMember>() && ExprHasUtf8Type(right)))
{
@@ -41,7 +39,7 @@ bool IsSupportedLike(const TExprBase& left, const TExprBase& right) {
return false;
}
-bool IsSupportedPredicate(const TCoCompare& predicate) {
+bool IsSupportedPredicate(const TCoCompare& predicate, const TSettings& settings) {
if (predicate.Maybe<TCoCmpEqual>()) {
return true;
} else if (predicate.Maybe<TCoCmpLess>()) {
@@ -56,7 +54,7 @@ bool IsSupportedPredicate(const TCoCompare& predicate) {
return true;
} else if (predicate.Maybe<TCoCmpLessOrEqual>()) {
return true;
- } else if (NKikimr::NSsa::RuntimeVersion >= 2U && IsLikeOperator(predicate)) {
+ } else if (settings.IsEnabled(TSettings::EFeatureFlag::LikeOperator) && IsLikeOperator(predicate)) {
return true;
}
@@ -241,7 +239,7 @@ std::vector<TExprBase> GetComparisonNodes(const TExprBase& node) {
return res;
}
-bool CheckComparisonNodeForPushdown(const TExprBase& node, const TExprNode* lambdaArg) {
+bool CheckComparisonNodeForPushdown(const TExprBase& node, const TExprNode* lambdaArg, const TSettings& settings) {
if (auto maybeSafeCast = node.Maybe<TCoSafeCast>()) {
if (!IsSupportedCast(maybeSafeCast.Cast())) {
return false;
@@ -254,7 +252,7 @@ bool CheckComparisonNodeForPushdown(const TExprBase& node, const TExprNode* lamb
if (maybeMember.Cast().Struct().Raw() != lambdaArg) {
return false;
}
- } else if (NKikimr::NSsa::RuntimeVersion >= 3U && node.Maybe<TCoJsonQueryBase>()) {
+ } else if (settings.IsEnabled(TSettings::EFeatureFlag::JsonQueryOperators) && node.Maybe<TCoJsonQueryBase>()) {
if (!node.Maybe<TCoJsonValue>()) {
return false;
}
@@ -270,7 +268,7 @@ bool CheckComparisonNodeForPushdown(const TExprBase& node, const TExprNode* lamb
return true;
}
-bool CheckComparisonParametersForPushdown(const TCoCompare& compare, const TExprNode* lambdaArg, const TExprBase& input) {
+bool CheckComparisonParametersForPushdown(const TCoCompare& compare, const TExprNode* lambdaArg, const TExprBase& input, const TSettings& settings) {
const TTypeAnnotationNode* inputType = input.Ptr()->GetTypeAnn();
switch (inputType->GetKind()) {
case ETypeAnnotationKind::Flow:
@@ -279,6 +277,8 @@ bool CheckComparisonParametersForPushdown(const TCoCompare& compare, const TExpr
case ETypeAnnotationKind::Stream:
inputType = inputType->Cast<TStreamExprType>()->GetItemType();
break;
+ case ETypeAnnotationKind::Struct:
+ break;
default:
YQL_ENSURE(false, "Unsupported type of incoming data: " << (ui32)inputType->GetKind());
// We do not know how process input that is not a sequence of elements
@@ -297,14 +297,14 @@ bool CheckComparisonParametersForPushdown(const TCoCompare& compare, const TExpr
YQL_ENSURE(leftList.size() == rightList.size(), "Different sizes of lists in comparison!");
for (size_t i = 0; i < leftList.size(); ++i) {
- if (!CheckComparisonNodeForPushdown(leftList[i], lambdaArg) || !CheckComparisonNodeForPushdown(rightList[i], lambdaArg)) {
+ if (!CheckComparisonNodeForPushdown(leftList[i], lambdaArg, settings) || !CheckComparisonNodeForPushdown(rightList[i], lambdaArg, settings)) {
return false;
}
if (!IsComparableTypes(leftList[i], rightList[i], equality, inputType)) {
return false;
}
- if (IsLikeOperator(compare) && NKikimr::NSsa::RuntimeVersion < 3U && !IsSupportedLike(leftList[i], rightList[i])) {
- // In SSA_RUNTIME_VERSION == 2 Column Shard doesn't have LIKE kernel for binary strings
+ if (IsLikeOperator(compare) && settings.IsEnabled(TSettings::EFeatureFlag::LikeOperatorOnlyForUtf8) && !IsSupportedLikeForUtf8(leftList[i], rightList[i])) {
+ // (KQP OLAP) When SSA_RUNTIME_VERSION == 2, Column Shard has no LIKE kernel for binary strings
return false;
}
}
@@ -312,19 +312,19 @@ bool CheckComparisonParametersForPushdown(const TCoCompare& compare, const TExpr
return true;
}
-bool CompareCanBePushed(const TCoCompare& compare, const TExprNode* lambdaArg, const TExprBase& lambdaBody) {
- if (!IsSupportedPredicate(compare)) {
+bool CompareCanBePushed(const TCoCompare& compare, const TExprNode* lambdaArg, const TExprBase& lambdaBody, const TSettings& settings) {
+ if (!IsSupportedPredicate(compare, settings)) {
return false;
}
- if (!CheckComparisonParametersForPushdown(compare, lambdaArg, lambdaBody)) {
+ if (!CheckComparisonParametersForPushdown(compare, lambdaArg, lambdaBody, settings)) {
return false;
}
return true;
}
-bool SafeCastCanBePushed(const TCoFlatMap& flatmap, const TExprNode* lambdaArg) {
+bool SafeCastCanBePushed(const TCoFlatMap& flatmap, const TExprNode* lambdaArg, const TSettings& settings) {
/*
* There are three ways of comparison in following format:
*
@@ -345,7 +345,7 @@ bool SafeCastCanBePushed(const TCoFlatMap& flatmap, const TExprNode* lambdaArg)
YQL_ENSURE(leftList.size() == rightList.size(), "Different sizes of lists in comparison!");
for (size_t i = 0; i < leftList.size(); ++i) {
- if (!CheckComparisonNodeForPushdown(leftList[i], lambdaArg) || !CheckComparisonNodeForPushdown(rightList[i], lambdaArg)) {
+ if (!CheckComparisonNodeForPushdown(leftList[i], lambdaArg, settings) || !CheckComparisonNodeForPushdown(rightList[i], lambdaArg, settings)) {
return false;
}
}
@@ -361,7 +361,7 @@ bool SafeCastCanBePushed(const TCoFlatMap& flatmap, const TExprNode* lambdaArg)
}
auto predicate = maybePredicate.Cast();
- if (!IsSupportedPredicate(predicate)) {
+ if (!IsSupportedPredicate(predicate, settings)) {
return false;
}
@@ -380,17 +380,17 @@ bool JsonExistsCanBePushed(const TCoJsonExists& jsonExists, const TExprNode* lam
return true;
}
-bool CoalesceCanBePushed(const TCoCoalesce& coalesce, const TExprNode* lambdaArg, const TExprBase& lambdaBody) {
+bool CoalesceCanBePushed(const TCoCoalesce& coalesce, const TExprNode* lambdaArg, const TExprBase& lambdaBody, const TSettings& settings) {
if (!coalesce.Value().Maybe<TCoBool>()) {
return false;
}
auto predicate = coalesce.Predicate();
if (auto maybeCompare = predicate.Maybe<TCoCompare>()) {
- return CompareCanBePushed(maybeCompare.Cast(), lambdaArg, lambdaBody);
+ return CompareCanBePushed(maybeCompare.Cast(), lambdaArg, lambdaBody, settings);
} else if (auto maybeFlatmap = predicate.Maybe<TCoFlatMap>()) {
- return SafeCastCanBePushed(maybeFlatmap.Cast(), lambdaArg);
- } else if (NKikimr::NSsa::RuntimeVersion >= 3U && predicate.Maybe<TCoJsonExists>()) {
+ return SafeCastCanBePushed(maybeFlatmap.Cast(), lambdaArg, settings);
+ } else if (settings.IsEnabled(TSettings::EFeatureFlag::JsonExistsOperator) && predicate.Maybe<TCoJsonExists>()) {
auto jsonExists = predicate.Cast<TCoJsonExists>();
return JsonExistsCanBePushed(jsonExists, lambdaArg);
}
@@ -409,7 +409,7 @@ bool ExistsCanBePushed(const TCoExists& exists, const TExprNode* lambdaArg) {
return true;
}
-void CollectPredicatesForBinaryBoolOperators(const TExprBase& opNode, TPredicateNode& predicateTree, const TExprNode* lambdaArg, const TExprBase& lambdaBody) {
+void CollectPredicatesForBinaryBoolOperators(const TExprBase& opNode, TPredicateNode& predicateTree, const TExprNode* lambdaArg, const TExprBase& lambdaBody, const TSettings& settings) {
// Only AND, OR and XOR nodes are processed here; any other node leaves predicateTree untouched.
if (!opNode.Maybe<TCoAnd>() && !opNode.Maybe<TCoOr>() && !opNode.Maybe<TCoXor>()) {
    return;
}
@@ -417,7 +417,7 @@ void CollectPredicatesForBinaryBoolOperators(const TExprBase& opNode, TPredicate
predicateTree.CanBePushed = true;
for (auto& childNodePtr: opNode.Ptr()->Children()) {
TPredicateNode child(childNodePtr);
- CollectPredicates(TExprBase(childNodePtr), child, lambdaArg, lambdaBody);
+ CollectPredicates(TExprBase(childNodePtr), child, lambdaArg, lambdaBody, settings);
predicateTree.Children.emplace_back(child);
predicateTree.CanBePushed &= child.CanBePushed;
}
@@ -425,48 +425,13 @@ void CollectPredicatesForBinaryBoolOperators(const TExprBase& opNode, TPredicate
} // anonymous namespace end
-bool TPredicateNode::IsValid() const {
- bool res = true;
- if (Op != EBoolOp::Undefined) {
- res &= !Children.empty();
- for (auto& child : Children) {
- res &= child.IsValid();
- }
- }
-
- return res && ExprNode.IsValid();
-}
-
-void TPredicateNode::SetPredicates(const std::vector<TPredicateNode>& predicates, TExprContext& ctx, TPositionHandle pos) {
- auto predicatesSize = predicates.size();
- if (predicatesSize == 0) {
- return;
- } else if (predicatesSize == 1) {
- *this = predicates[0];
- } else {
- Op = EBoolOp::And;
- Children = predicates;
- CanBePushed = true;
-
- TVector<TExprBase> exprNodes;
- exprNodes.reserve(predicatesSize);
- for (auto& pred : predicates) {
- exprNodes.emplace_back(pred.ExprNode.Cast());
- CanBePushed &= pred.CanBePushed;
- }
- ExprNode = Build<TCoAnd>(ctx, pos)
- .Add(exprNodes)
- .Done();
- }
-}
-
-void CollectPredicates(const TExprBase& predicate, TPredicateNode& predicateTree, const TExprNode* lambdaArg, const TExprBase& lambdaBody) {
+void CollectPredicates(const TExprBase& predicate, TPredicateNode& predicateTree, const TExprNode* lambdaArg, const TExprBase& lambdaBody, const TSettings& settings) {
if (predicate.Maybe<TCoCoalesce>()) {
auto coalesce = predicate.Cast<TCoCoalesce>();
- predicateTree.CanBePushed = CoalesceCanBePushed(coalesce, lambdaArg, lambdaBody);
+ predicateTree.CanBePushed = CoalesceCanBePushed(coalesce, lambdaArg, lambdaBody, settings);
} else if (predicate.Maybe<TCoCompare>()) {
auto compare = predicate.Cast<TCoCompare>();
- predicateTree.CanBePushed = CompareCanBePushed(compare, lambdaArg, lambdaBody);
+ predicateTree.CanBePushed = CompareCanBePushed(compare, lambdaArg, lambdaBody, settings);
} else if (predicate.Maybe<TCoExists>()) {
auto exists = predicate.Cast<TCoExists>();
predicateTree.CanBePushed = ExistsCanBePushed(exists, lambdaArg);
@@ -474,19 +439,19 @@ void CollectPredicates(const TExprBase& predicate, TPredicateNode& predicateTree
predicateTree.Op = EBoolOp::Not;
auto notOp = predicate.Cast<TCoNot>();
TPredicateNode child(notOp.Value());
- CollectPredicates(notOp.Value(), child, lambdaArg, lambdaBody);
+ CollectPredicates(notOp.Value(), child, lambdaArg, lambdaBody, settings);
predicateTree.CanBePushed = child.CanBePushed;
predicateTree.Children.emplace_back(child);
} else if (predicate.Maybe<TCoAnd>()) {
predicateTree.Op = EBoolOp::And;
- CollectPredicatesForBinaryBoolOperators(predicate.Cast<TCoAnd>(), predicateTree, lambdaArg, lambdaBody);
+ CollectPredicatesForBinaryBoolOperators(predicate.Cast<TCoAnd>(), predicateTree, lambdaArg, lambdaBody, settings);
} else if (predicate.Maybe<TCoOr>()) {
predicateTree.Op = EBoolOp::Or;
- CollectPredicatesForBinaryBoolOperators(predicate.Cast<TCoOr>(), predicateTree, lambdaArg, lambdaBody);
+ CollectPredicatesForBinaryBoolOperators(predicate.Cast<TCoOr>(), predicateTree, lambdaArg, lambdaBody, settings);
} else if (predicate.Maybe<TCoXor>()) {
predicateTree.Op = EBoolOp::Xor;
- CollectPredicatesForBinaryBoolOperators(predicate.Cast<TCoXor>(), predicateTree, lambdaArg, lambdaBody);
- } else if (NKikimr::NSsa::RuntimeVersion >= 3U && predicate.Maybe<TCoJsonExists>()) {
+ CollectPredicatesForBinaryBoolOperators(predicate.Cast<TCoXor>(), predicateTree, lambdaArg, lambdaBody, settings);
+ } else if (settings.IsEnabled(TSettings::EFeatureFlag::JsonExistsOperator) && predicate.Maybe<TCoJsonExists>()) {
auto jsonExists = predicate.Cast<TCoJsonExists>();
predicateTree.CanBePushed = JsonExistsCanBePushed(jsonExists, lambdaArg);
} else {
@@ -494,4 +459,4 @@ void CollectPredicates(const TExprBase& predicate, TPredicateNode& predicateTree
}
}
-} // namespace NKikimr::NKqp::NOpt \ No newline at end of file
+} // namespace NYql::NPushdown
diff --git a/ydb/library/yql/providers/common/pushdown/collection.h b/ydb/library/yql/providers/common/pushdown/collection.h
new file mode 100644
index 0000000000..05373efc65
--- /dev/null
+++ b/ydb/library/yql/providers/common/pushdown/collection.h
@@ -0,0 +1,14 @@
+#pragma once
+#include <ydb/library/yql/providers/common/pushdown/predicate_node.h>
+#include <ydb/library/yql/providers/common/pushdown/settings.h>
+
+#include <ydb/library/yql/ast/yql_expr.h>
+#include <ydb/library/yql/core/expr_nodes_gen/yql_expr_nodes_gen.h>
+
+namespace NYql::NPushdown {
+
+void CollectPredicates(const NNodes::TExprBase& predicate, TPredicateNode& predicateTree,
+ const TExprNode* lambdaArg, const NNodes::TExprBase& lambdaBody,
+ const TSettings& settings); // fills predicateTree (Op, Children, CanBePushed) for `predicate`; `settings` gates optional operators
+
+} // namespace NYql::NPushdown
diff --git a/ydb/library/yql/providers/common/pushdown/predicate_node.cpp b/ydb/library/yql/providers/common/pushdown/predicate_node.cpp
new file mode 100644
index 0000000000..68bd089b81
--- /dev/null
+++ b/ydb/library/yql/providers/common/pushdown/predicate_node.cpp
@@ -0,0 +1,56 @@
+#include "predicate_node.h"
+
+#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
+
+namespace NYql::NPushdown {
+
+TPredicateNode::TPredicateNode() = default; // node without an expression; members keep their in-class defaults
+
+TPredicateNode::TPredicateNode(const TExprNode::TPtr& nodePtr)
+ : ExprNode(nodePtr) // wrap a raw expression node pointer
+{}
+
+TPredicateNode::TPredicateNode(const NNodes::TExprBase& node)
+ : ExprNode(node) // wrap an already typed expression node
+{}
+
+TPredicateNode::TPredicateNode(const TPredicateNode& predNode) = default; // member-wise copy, children included
+
+TPredicateNode::~TPredicateNode() = default;
+
+bool TPredicateNode::IsValid() const {
+    // A node without a boolean operator only needs its own expression to be set.
+    if (Op == EBoolOp::Undefined) {
+        return ExprNode.IsValid();
+    }
+
+    // An operator node needs at least one operand, and every operand must be valid as well.
+    bool operandsOk = !Children.empty();
+    for (const auto& operand : Children) {
+        operandsOk &= operand.IsValid();
+    }
+
+    return operandsOk && ExprNode.IsValid();
+}
+
+void TPredicateNode::SetPredicates(const std::vector<TPredicateNode>& predicates, TExprContext& ctx, TPositionHandle pos) {
+    const auto count = predicates.size();
+    if (count == 0) {
+        // Nothing to set: this node is left as it is.
+        return;
+    }
+    if (count == 1) {
+        // A single predicate replaces this node entirely.
+        *this = predicates[0];
+        return;
+    }
+
+    // Several predicates are combined into one AND node built at position `pos`.
+    Op = EBoolOp::And;
+    Children = predicates;
+    CanBePushed = true;
+
+    TVector<NNodes::TExprBase> operands;
+    operands.reserve(count);
+    for (const auto& item : predicates) {
+        operands.emplace_back(item.ExprNode.Cast());
+        // The conjunction can be pushed only if every operand can.
+        CanBePushed &= item.CanBePushed;
+    }
+    ExprNode = NNodes::Build<NNodes::TCoAnd>(ctx, pos)
+        .Add(operands)
+        .Done();
+}
+
+} // namespace NYql::NPushdown
diff --git a/ydb/library/yql/providers/common/pushdown/predicate_node.h b/ydb/library/yql/providers/common/pushdown/predicate_node.h
new file mode 100644
index 0000000000..06a167b60b
--- /dev/null
+++ b/ydb/library/yql/providers/common/pushdown/predicate_node.h
@@ -0,0 +1,34 @@
+#pragma once
+
+#include <ydb/library/yql/ast/yql_expr.h>
+#include <ydb/library/yql/ast/yql_pos_handle.h>
+#include <ydb/library/yql/core/expr_nodes_gen/yql_expr_nodes_gen.h>
+
+namespace NYql::NPushdown {
+
+enum class EBoolOp {
+ Undefined = 0,
+ And,
+ Or,
+ Xor,
+ Not
+};
+
+struct TPredicateNode { // one node of a predicate tree: an expression plus an optional boolean operator over child nodes
+ TPredicateNode();
+ TPredicateNode(const TExprNode::TPtr& nodePtr);
+ TPredicateNode(const NNodes::TExprBase& node);
+ TPredicateNode(const TPredicateNode& predNode);
+
+ ~TPredicateNode();
+
+ bool IsValid() const; // true when ExprNode is set and, for operator nodes, all (non-empty) children are valid
+ void SetPredicates(const std::vector<TPredicateNode>& predicates, TExprContext& ctx, TPositionHandle pos); // no-op for empty input, copy for one predicate, AND node for several
+
+ NNodes::TMaybeNode<NNodes::TExprBase> ExprNode; // expression this node stands for; may be unset
+ std::vector<TPredicateNode> Children; // operands when Op is not Undefined
+ EBoolOp Op = EBoolOp::Undefined;
+ bool CanBePushed = false; // set by the collection pass
+};
+
+} // namespace NYql::NPushdown
diff --git a/ydb/library/yql/providers/common/pushdown/settings.cpp b/ydb/library/yql/providers/common/pushdown/settings.cpp
new file mode 100644
index 0000000000..7917740ddb
--- /dev/null
+++ b/ydb/library/yql/providers/common/pushdown/settings.cpp
@@ -0,0 +1,17 @@
+#include "settings.h"
+
+namespace NYql::NPushdown {
+
+void TSettings::Enable(ui64 flagsMask, bool set) {
+    // Setting ORs the mask into the flags; clearing ANDs the flags with the mask's complement.
+    FeatureFlags = set ? (FeatureFlags | flagsMask) : (FeatureFlags & ~flagsMask);
+}
+
+bool TSettings::IsEnabled(EFeatureFlag flagMask) const {
+    // Enabled means at least one bit of the requested flag is present in the stored mask.
+    const ui64 matched = FeatureFlags & flagMask;
+    return matched != 0;
+}
+
+} // namespace NYql::NPushdown
diff --git a/ydb/library/yql/providers/common/pushdown/settings.h b/ydb/library/yql/providers/common/pushdown/settings.h
new file mode 100644
index 0000000000..c53e1bf9fe
--- /dev/null
+++ b/ydb/library/yql/providers/common/pushdown/settings.h
@@ -0,0 +1,26 @@
+#pragma once
+
+#include <util/system/types.h>
+
+namespace NYql::NPushdown {
+
+struct TSettings {
+    // Bit flags consulted by the pushdown checks; each flag occupies its own bit.
+    enum EFeatureFlag : ui64 {
+        LikeOperator = 0x1,
+        LikeOperatorOnlyForUtf8 = 0x2,
+        JsonQueryOperators = 0x4,
+        JsonExistsOperator = 0x8,
+    };
+
+    TSettings() = default;
+    TSettings(const TSettings&) = default;
+
+    // Sets (set == true) or clears (set == false) every bit of flagsMask.
+    void Enable(ui64 flagsMask, bool set = true);
+
+    // Tells whether the given flag is currently set.
+    bool IsEnabled(EFeatureFlag flagMask) const;
+
+private:
+    // No feature is enabled by default.
+    ui64 FeatureFlags = 0;
+};
+
+} // namespace NYql::NPushdown
diff --git a/ydb/library/yql/providers/common/pushdown/ya.make b/ydb/library/yql/providers/common/pushdown/ya.make
new file mode 100644
index 0000000000..f488c383a9
--- /dev/null
+++ b/ydb/library/yql/providers/common/pushdown/ya.make
@@ -0,0 +1,19 @@
+LIBRARY()
+
+SRCS(
+ collection.cpp
+ predicate_node.cpp
+ settings.cpp
+)
+
+PEERDIR(
+ ydb/library/yql/ast
+ ydb/library/yql/core
+ ydb/library/yql/core/expr_nodes
+ ydb/library/yql/core/expr_nodes_gen
+ ydb/library/yql/utils/log
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/library/yql/providers/common/ya.make b/ydb/library/yql/providers/common/ya.make
index 7bef7eaedd..be609dbd0d 100644
--- a/ydb/library/yql/providers/common/ya.make
+++ b/ydb/library/yql/providers/common/ya.make
@@ -14,6 +14,7 @@ RECURSE(
proto
proto/python
provider
+ pushdown
schema
structured_token
token_accessor
diff --git a/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go b/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go
index 3a5e81102d..33e63b6e2e 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go
@@ -157,6 +157,16 @@ func (h *handlerImpl[CONN]) ReadSplit(
sb.WriteString(" FROM ")
sb.WriteString(tableName)
+ if split.Select.Where != nil {
+ statement, err := FormatWhereStatement(split.Select.Where)
+ if err != nil {
+ logger.Error("Failed to format WHERE statement", log.Error(err), log.String("where", split.Select.Where.String()))
+ } else {
+ sb.WriteString(" ")
+ sb.WriteString(statement)
+ }
+ }
+
// execute query
query := sb.String()
diff --git a/ydb/library/yql/providers/generic/connector/app/server/rdbms/predicate_builder.go b/ydb/library/yql/providers/generic/connector/app/server/rdbms/predicate_builder.go
new file mode 100644
index 0000000000..876c0025a6
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/app/server/rdbms/predicate_builder.go
@@ -0,0 +1,86 @@
+package rdbms
+
+import (
+ "fmt"
+
+ "github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
+ "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/utils"
+ api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos"
+)
+
+// FormatValue renders a typed literal as text for the generated WHERE clause.
+// Only Bool, Int32 and Uint32 values are supported; any other value, or a typed
+// value that carries no value at all, yields an error wrapping
+// utils.ErrUnimplementedTypedValue.
+func FormatValue(value *Ydb.TypedValue) (string, error) {
+	// The generated getters are nil-safe, so a missing value lands in the default
+	// branch instead of dereferencing a nil pointer.
+	switch v := value.GetValue().GetValue().(type) {
+	case *Ydb.Value_BoolValue:
+		return fmt.Sprintf("%t", v.BoolValue), nil
+	case *Ydb.Value_Int32Value:
+		return fmt.Sprintf("%d", v.Int32Value), nil
+	case *Ydb.Value_Uint32Value:
+		return fmt.Sprintf("%d", v.Uint32Value), nil
+	default:
+		return "", fmt.Errorf("%w, type: %T", utils.ErrUnimplementedTypedValue, v)
+	}
+}
+
+// FormatExpression renders one operand of a predicate: a column reference is
+// emitted as its name, a typed literal is delegated to FormatValue. Any other
+// payload, or a missing expression, yields an error wrapping
+// utils.ErrUnimplementedExpression.
+func FormatExpression(expression *api_service_protos.TExpression) (string, error) {
+	// GetPayload is nil-safe: a comparison whose operand was left unset reports
+	// an error instead of panicking on a nil expression.
+	switch e := expression.GetPayload().(type) {
+	case *api_service_protos.TExpression_Column:
+		return e.Column, nil
+	case *api_service_protos.TExpression_TypedValue:
+		return FormatValue(e.TypedValue)
+	default:
+		return "", fmt.Errorf("%w, type: %T", utils.ErrUnimplementedExpression, e)
+	}
+}
+
+// FormatComparison renders a binary comparison as "(left op right)".
+// Equality is the only operation handled; any other operation yields an error
+// wrapping utils.ErrUnimplementedOperation before the operands are formatted.
+func FormatComparison(comparison *api_service_protos.TPredicate_TComparison) (string, error) {
+	if comparison.Operation != api_service_protos.TPredicate_TComparison_EQ {
+		return "", fmt.Errorf("%w, op: %d", utils.ErrUnimplementedOperation, comparison.Operation)
+	}
+
+	const operation = " = "
+
+	lhs, err := FormatExpression(comparison.LeftValue)
+	if err != nil {
+		return "", fmt.Errorf("failed to format left argument: %w", err)
+	}
+
+	rhs, err := FormatExpression(comparison.RightValue)
+	if err != nil {
+		return "", fmt.Errorf("failed to format right argument: %w", err)
+	}
+
+	return "(" + lhs + operation + rhs + ")", nil
+}
+
+func FormatPredicate(predicate *api_service_protos.TPredicate) (string, error) {
+ switch p := predicate.Payload.(type) {
+ case *api_service_protos.TPredicate_Comparison:
+ return FormatComparison(p.Comparison)
+ default:
+ return "", fmt.Errorf("%w, type: %T", utils.ErrUnimplementedPredicateType, p)
+ }
+}
+
+// FormatWhereStatement builds the "WHERE ..." clause from the typed filter.
+// A clause without a typed filter yields utils.ErrUnimplemented; formatting
+// errors of the predicate are returned unchanged.
+func FormatWhereStatement(where *api_service_protos.TSelect_TWhere) (string, error) {
+	filter := where.FilterTyped
+	if filter == nil {
+		return "", utils.ErrUnimplemented
+	}
+
+	body, err := FormatPredicate(filter)
+	if err != nil {
+		return "", err
+	}
+
+	return "WHERE " + body, nil
+}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/rdbms/ya.make b/ydb/library/yql/providers/generic/connector/app/server/rdbms/ya.make
index e9a5b03e24..5bc3bfc6ac 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/rdbms/ya.make
+++ b/ydb/library/yql/providers/generic/connector/app/server/rdbms/ya.make
@@ -4,6 +4,7 @@ SRCS(
doc.go
handler.go
handler_factory.go
+ predicate_builder.go
schema_builder.go
)
@@ -13,4 +14,4 @@ GO_TEST_SRCS(
END()
-RECURSE_FOR_TESTS(ut) \ No newline at end of file
+RECURSE_FOR_TESTS(ut)
diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/errors.go b/ydb/library/yql/providers/generic/connector/app/server/utils/errors.go
index a9093be650..08d114ebaf 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/utils/errors.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/utils/errors.go
@@ -10,12 +10,17 @@ import (
)
var (
- ErrTableDoesNotExist = fmt.Errorf("table does not exist")
- ErrDataSourceNotSupported = fmt.Errorf("data source not supported")
- ErrDataTypeNotSupported = fmt.Errorf("data type not supported")
- ErrReadLimitExceeded = fmt.Errorf("read limit exceeded")
- ErrInvalidRequest = fmt.Errorf("invalid request")
- ErrValueOutOfTypeBounds = fmt.Errorf("value is out of possible range of values for the type")
+ ErrTableDoesNotExist = fmt.Errorf("table does not exist")
+ ErrDataSourceNotSupported = fmt.Errorf("data source not supported")
+ ErrDataTypeNotSupported = fmt.Errorf("data type not supported")
+ ErrReadLimitExceeded = fmt.Errorf("read limit exceeded")
+ ErrInvalidRequest = fmt.Errorf("invalid request")
+ ErrValueOutOfTypeBounds = fmt.Errorf("value is out of possible range of values for the type")
+ ErrUnimplemented = fmt.Errorf("unimplemented")
+ ErrUnimplementedTypedValue = fmt.Errorf("unimplemented typed value")
+ ErrUnimplementedExpression = fmt.Errorf("unimplemented expression")
+ ErrUnimplementedOperation = fmt.Errorf("unimplemented operation")
+ ErrUnimplementedPredicateType = fmt.Errorf("unimplemented predicate type")
)
func NewSuccess() *api_service_protos.TError {
@@ -50,6 +55,16 @@ func NewAPIErrorFromStdError(err error) *api_service_protos.TError {
status = Ydb.StatusIds_UNSUPPORTED
case errors.Is(err, ErrValueOutOfTypeBounds):
status = Ydb.StatusIds_UNSUPPORTED
+ case errors.Is(err, ErrUnimplementedTypedValue):
+ status = Ydb.StatusIds_UNSUPPORTED
+ case errors.Is(err, ErrUnimplementedExpression):
+ status = Ydb.StatusIds_UNSUPPORTED
+ case errors.Is(err, ErrUnimplementedOperation):
+ status = Ydb.StatusIds_UNSUPPORTED
+ case errors.Is(err, ErrUnimplementedPredicateType):
+ status = Ydb.StatusIds_UNSUPPORTED
+ case errors.Is(err, ErrUnimplemented):
+ status = Ydb.StatusIds_UNSUPPORTED
default:
status = Ydb.StatusIds_INTERNAL_ERROR
}
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/client.h b/ydb/library/yql/providers/generic/connector/libcpp/client.h
index 0dc5536695..963964efbc 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/client.h
+++ b/ydb/library/yql/providers/generic/connector/libcpp/client.h
@@ -94,8 +94,7 @@ namespace NYql::NConnector {
virtual TDescribeTableAsyncResult DescribeTable(const NApi::TDescribeTableRequest& request) = 0;
virtual TListSplitsStreamIteratorAsyncResult ListSplits(const NApi::TListSplitsRequest& request) = 0;
virtual TReadSplitsStreamIteratorAsyncResult ReadSplits(const NApi::TReadSplitsRequest& request) = 0;
- virtual ~IClient() {
- }
+ virtual ~IClient() = default;
};
IClient::TPtr MakeClientGRPC(const NYql::TGenericConnectorConfig& cfg);
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp
index 326b438e52..172194397a 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp
+++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp
@@ -6,6 +6,17 @@ namespace NYql::NConnector::NTest {
using namespace fmt::literals;
+#define DEFINE_SIMPLE_TYPE_SETTER(T, primitiveTypeId, value_name) \
+ template <> \
+ void SetSimpleValue(const T& value, Ydb::TypedValue* proto) { \
+ proto->mutable_type()->set_type_id(::Ydb::Type::primitiveTypeId); \
+ proto->mutable_value()->Y_CAT(set_, value_name)(value); \
+ } // each use specializes SetSimpleValue for T: stores the primitive type id and the value in the matching proto field
+
+ DEFINE_SIMPLE_TYPE_SETTER(bool, BOOL, bool_value); // bool -> BOOL / bool_value
+ DEFINE_SIMPLE_TYPE_SETTER(i32, INT32, int32_value); // i32 -> INT32 / int32_value
+ DEFINE_SIMPLE_TYPE_SETTER(ui32, UINT32, uint32_value); // ui32 -> UINT32 / uint32_value
+
void CreatePostgreSQLExternalDataSource(
const std::shared_ptr<NKikimr::NKqp::TKikimrRunner>& kikimr,
const TString& dataSourceName,
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h
index 61cb85dc90..19e9fa06fb 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h
+++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h
@@ -18,6 +18,7 @@
#include <google/protobuf/util/message_differencer.h>
#include <memory>
+#include <type_traits>
namespace NYql::NConnector::NTest {
using namespace testing;
@@ -67,23 +68,54 @@ namespace NYql::NConnector::NTest {
} \
}
+ // Make arrow array for one column.
+ // Returns field for schema and array with data.
+ // Designed for call with MakeRecordBatch function.
template <class TArrowBuilderType, class TColDataType>
- std::shared_ptr<arrow::RecordBatch> MakeRecordBatch(
- const TString& columnName,
- const std::vector<TColDataType>& input,
- std::shared_ptr<arrow::DataType> dataType) {
+ std::tuple<std::shared_ptr<arrow::Field>, std::shared_ptr<arrow::Array>>
+ MakeArray(const TString& columnName,
+ const std::vector<TColDataType>& input,
+ std::shared_ptr<arrow::DataType> dataType) {
TArrowBuilderType builder;
UNIT_ASSERT_EQUAL(builder.AppendValues(input), arrow::Status::OK());
std::shared_ptr<arrow::Array> columnData;
UNIT_ASSERT_EQUAL(builder.Finish(&columnData), arrow::Status::OK());
auto field = arrow::field(columnName, std::move(dataType));
+ return {std::move(field), std::move(columnData)};
+ }
+
+ // Make record batch with the only column.
+ template <class TArrowBuilderType, class TColDataType>
+ std::shared_ptr<arrow::RecordBatch> MakeRecordBatch(
+ const TString& columnName,
+ const std::vector<TColDataType>& input,
+ std::shared_ptr<arrow::DataType> dataType) {
+ auto [field, columnData] = MakeArray<TArrowBuilderType, TColDataType>(columnName, input, dataType);
auto schema = arrow::schema({field});
return arrow::RecordBatch::Make(schema, columnData->length(), {columnData});
}
+ template <class T>
+ concept TFieldAndArray = std::is_same_v<T, std::tuple<std::shared_ptr<arrow::Field>, std::shared_ptr<arrow::Array>>>; // satisfied only by the exact (field, array) tuple type
+
+ // Make record batch from several results of MakeArray calls with different params.
+ // At least one (field, array) pair is required and all arrays must have the same length.
+ template <TFieldAndArray... TArrayFieldTuple>
+ std::shared_ptr<arrow::RecordBatch> MakeRecordBatch(const TArrayFieldTuple&... fields) {
+     auto schema = arrow::schema({std::get<0>(fields)...});
+     std::vector<std::shared_ptr<arrow::Array>> data = {std::get<1>(fields)...};
+     UNIT_ASSERT(data.size());
+     // Read the row count before `data` is handed over: function arguments are evaluated in
+     // unspecified order, so `data[0]` must not share a call expression with `std::move(data)`.
+     const auto rowsCount = data[0]->length();
+     for (const auto& d : data) {
+         UNIT_ASSERT_VALUES_EQUAL(d->length(), rowsCount);
+     }
+     return arrow::RecordBatch::Make(schema, rowsCount, std::move(data));
+ }
+
// Make record batch with schema with no columns
std::shared_ptr<arrow::RecordBatch> MakeEmptyRecordBatch(size_t rowsCount);
+ template <class T>
+ void SetSimpleValue(const T& value, Ydb::TypedValue* proto); // declaration only; the body comes from an explicit specialization per value type
+
template <class TParent>
struct TWithParentBuilder {
explicit TWithParentBuilder(TParent* parent)
@@ -170,9 +202,9 @@ namespace NYql::NConnector::NTest {
class TConnectorClientMock: public NYql::NConnector::IClient {
public:
- MOCK_METHOD(TDescribeTableAsyncResult, DescribeTable, (const NApi::TDescribeTableRequest& request), (override));
- MOCK_METHOD(TIteratorAsyncResult<IListSplitsStreamIterator>, ListSplits, (const NApi::TListSplitsRequest& request), (override));
- MOCK_METHOD(TIteratorAsyncResult<IReadSplitsStreamIterator>, ReadSplits, (const NApi::TReadSplitsRequest& request), (override));
+ MOCK_METHOD(TResult<NApi::TDescribeTableResponse>, DescribeTableImpl, (const NApi::TDescribeTableRequest& request));
+ MOCK_METHOD(TIteratorResult<IListSplitsStreamIterator>, ListSplitsImpl, (const NApi::TListSplitsRequest& request));
+ MOCK_METHOD(TIteratorResult<IReadSplitsStreamIterator>, ReadSplitsImpl, (const NApi::TReadSplitsRequest& request));
//
// Expectation helpers
@@ -260,7 +292,7 @@ namespace NYql::NConnector::NTest {
TBuilder& Status(const Ydb::StatusIds_StatusCode value) {
this->Result_->mutable_error()->set_status(value);
return static_cast<TBuilder&>(*this);
- };
+ }
// TODO: add nonprimitive types
TBuilder& Column(const TString& name, Ydb::Type::PrimitiveTypeId typeId) {
@@ -270,6 +302,13 @@ namespace NYql::NConnector::NTest {
return *this;
}
+ // Appends a schema column named `name` whose type is an optional wrapping the primitive `typeId`.
+ TBuilder& NullableColumn(const TString& name, Ydb::Type::PrimitiveTypeId typeId) {
+     auto& column = *this->Result_->mutable_schema()->add_columns();
+     column.set_name(name);
+     auto* itemType = column.mutable_type()->mutable_optional_type()->mutable_item();
+     itemType->set_type_id(typeId);
+     return *this;
+ }
+
void FillWithDefaults() {
Status(Ydb::StatusIds::SUCCESS);
}
@@ -308,10 +347,11 @@ namespace NYql::NConnector::NTest {
private:
void SetExpectation() {
- auto future = NThreading::MakeFuture<TResult<NApi::TDescribeTableResponse>>({NGrpc::TGrpcStatus(), *ResponseResult_});
-
- EXPECT_CALL(*Mock_, DescribeTable(ProtobufRequestMatcher(*Result_)))
- .WillOnce(Return(future));
+ EXPECT_CALL(*Mock_, DescribeTableImpl(ProtobufRequestMatcher(*Result_)))
+ .WillOnce(Return(
+ TResult<NApi::TDescribeTableResponse>(
+ {NGrpc::TGrpcStatus(),
+ *ResponseResult_})));
}
private:
@@ -320,6 +360,140 @@ namespace NYql::NConnector::NTest {
};
template <class TParent = void /* no parent by default */>
+ struct TExpressionBuilder: public TProtoBuilder<TParent, NApi::TExpression> { // builder filling an NApi::TExpression
+ using TBuilder = TExpressionBuilder<TParent>;
+
+ TExpressionBuilder(NApi::TExpression* result = nullptr, TParent* parent = nullptr)
+ : TProtoBuilder<TParent, NApi::TExpression>(result, parent)
+ {
+ FillWithDefaults();
+ }
+
+ SETTER(Column, column); // presumably generates a Column(...) setter for the `column` field; SETTER is defined outside this view
+
+ template <class T>
+ TBuilder& Value(const T& value) {
+ SetSimpleValue(value, this->Result_->mutable_typed_value()); // stores the literal in the typed_value field
+ return *this;
+ }
+
+ void FillWithDefaults() { // an expression has no defaults
+ }
+ };
+
+ template <class TParent = void /* no parent by default */>
+ struct TComparisonBaseBuilder: public TProtoBuilder<TParent, NApi::TPredicate::TComparison> { // builder filling the two operands of a comparison
+ using TBuilder = TComparisonBaseBuilder<TParent>;
+
+ TComparisonBaseBuilder(NApi::TPredicate::TComparison* result = nullptr, TParent* parent = nullptr)
+ : TProtoBuilder<TParent, NApi::TPredicate::TComparison>(result, parent)
+ {
+ FillWithDefaults();
+ }
+
+ TBuilder& Arg(const NApi::TExpression& expr) { // copies expr into the next free operand
+ MutableArg()->CopyFrom(expr);
+ return *this;
+ }
+ TExpressionBuilder<TBuilder> Arg() { // nested builder for the next free operand
+ return TExpressionBuilder<TBuilder>(MutableArg(), this);
+ }
+
+ TBuilder& Column(const TString& name) { // shortcut: next operand is a column reference
+ return Arg().Column(name).Done();
+ }
+
+ TBuilder& Value(const auto& value) { // shortcut: next operand is a literal value
+ return Arg().Value(value).Done();
+ }
+
+ void FillWithDefaults() {
+ }
+
+ protected:
+ SETTER(Operation, operation); // used by derived builders to fix the operation
+
+ private:
+ NApi::TExpression* MutableArg() { // left operand first, then right; a third request fails the assertion
+ if (!this->Result_->has_left_value()) {
+ return this->Result_->mutable_left_value();
+ }
+ UNIT_ASSERT(!this->Result_->has_right_value());
+ return this->Result_->mutable_right_value();
+ }
+ };
+
+ template <NApi::TPredicate::TComparison::EOperation operation, class TParent = void /* no parent by default */>
+ struct TComparisonBuilder: public TComparisonBaseBuilder<TParent> { // comparison builder with the operation fixed by the template argument
+ using TBuilder = TComparisonBuilder<operation, TParent>;
+
+ TComparisonBuilder(NApi::TPredicate::TComparison* result = nullptr, TParent* parent = nullptr)
+ : TComparisonBaseBuilder<TParent>(result, parent)
+ {
+ FillWithDefaults();
+ }
+
+ void FillWithDefaults() {
+ this->Operation(operation); // record the operation in the proto
+ }
+ };
+
+ template <class TParent>
+ using TEqualBuilder = TComparisonBuilder<NApi::TPredicate::TComparison::EQ, TParent>; // aliases below: one comparison builder per operation
+
+ template <class TParent>
+ using TNotEqualBuilder = TComparisonBuilder<NApi::TPredicate::TComparison::NE, TParent>;
+
+ template <class TParent>
+ using TLessBuilder = TComparisonBuilder<NApi::TPredicate::TComparison::L, TParent>;
+
+ template <class TParent>
+ using TLessOrEqualBuilder = TComparisonBuilder<NApi::TPredicate::TComparison::LE, TParent>;
+
+ template <class TParent>
+ using TGreaterBuilder = TComparisonBuilder<NApi::TPredicate::TComparison::G, TParent>;
+
+ template <class TParent>
+ using TGreaterOrEqualBuilder = TComparisonBuilder<NApi::TPredicate::TComparison::GE, TParent>;
+
+ template <class TParent = void /* no parent by default */>
+ struct TPredicateBuilder: public TProtoBuilder<TParent, NApi::TPredicate> { // builder filling an NApi::TPredicate
+ using TBuilder = TPredicateBuilder<TParent>;
+
+ explicit TPredicateBuilder(NApi::TPredicate* result = nullptr, TParent* parent = nullptr)
+ : TProtoBuilder<TParent, NApi::TPredicate>(result, parent)
+ {
+ FillWithDefaults();
+ }
+
+ SUBPROTO_BUILDER(Equal, mutable_comparison, NApi::TPredicate::TComparison, TEqualBuilder<TBuilder>); // every entry targets the comparison field; only the builder type differs (SUBPROTO_BUILDER is defined outside this view)
+ SUBPROTO_BUILDER(NotEqual, mutable_comparison, NApi::TPredicate::TComparison, TNotEqualBuilder<TBuilder>);
+ SUBPROTO_BUILDER(Less, mutable_comparison, NApi::TPredicate::TComparison, TLessBuilder<TBuilder>);
+ SUBPROTO_BUILDER(LessOrEqual, mutable_comparison, NApi::TPredicate::TComparison, TLessOrEqualBuilder<TBuilder>);
+ SUBPROTO_BUILDER(Greater, mutable_comparison, NApi::TPredicate::TComparison, TGreaterBuilder<TBuilder>);
+ SUBPROTO_BUILDER(GreaterOrEqual, mutable_comparison, NApi::TPredicate::TComparison, TGreaterOrEqualBuilder<TBuilder>);
+
+ void FillWithDefaults() {
+ }
+ };
+
+ template <class TParent = void /* no parent by default */>
+ struct TWhereBuilder: public TProtoBuilder<TParent, NApi::TSelect::TWhere> { // builder filling an NApi::TSelect::TWhere
+ using TBuilder = TWhereBuilder<TParent>;
+
+ explicit TWhereBuilder(NApi::TSelect::TWhere* result = nullptr, TParent* parent = nullptr)
+ : TProtoBuilder<TParent, NApi::TSelect::TWhere>(result, parent)
+ {
+ FillWithDefaults();
+ }
+
+ SUBPROTO_BUILDER(Filter, mutable_filter_typed, NApi::TPredicate, TPredicateBuilder<TBuilder>); // targets the filter_typed field through a TPredicateBuilder
+
+ void FillWithDefaults() {
+ }
+ };
+
+ template <class TParent = void /* no parent by default */>
struct TWhatBuilder: public TProtoBuilder<TParent, NApi::TSelect::TWhat> {
using TBuilder = TWhatBuilder<TParent>;
@@ -337,6 +511,13 @@ namespace NYql::NConnector::NTest {
return *this;
}
+ // Appends a selected item that is a column named `name` of type optional<`typeId`>.
+ TBuilder& NullableColumn(const TString& name, Ydb::Type::PrimitiveTypeId typeId) {
+     auto& column = *this->Result_->add_items()->mutable_column();
+     column.set_name(name);
+     auto* itemType = column.mutable_type()->mutable_optional_type()->mutable_item();
+     itemType->set_type_id(typeId);
+     return *this;
+ }
+
void FillWithDefaults() {
}
};
@@ -354,6 +535,7 @@ namespace NYql::NConnector::NTest {
EXPR_SETTER(Table, mutable_from()->set_table);
DATA_SOURCE_INSTANCE_SUBBUILDER();
SUBPROTO_BUILDER(What, mutable_what, NApi::TSelect::TWhat, TWhatBuilder<TBuilder>);
+ SUBPROTO_BUILDER(Where, mutable_where, NApi::TSelect::TWhere, TWhereBuilder<TBuilder>);
void FillWithDefaults() {
Table(DEFAULT_TABLE);
@@ -433,10 +615,8 @@ namespace NYql::NConnector::NTest {
private:
void SetExpectation() {
- auto future = NThreading::MakeFuture<TIteratorResult<IListSplitsStreamIterator>>(
- TIteratorResult<IListSplitsStreamIterator>{NGrpc::TGrpcStatus(), ResponseResult_});
- EXPECT_CALL(*Mock_, ListSplits(ProtobufRequestMatcher(*Result_)))
- .WillOnce(Return(future));
+ EXPECT_CALL(*Mock_, ListSplitsImpl(ProtobufRequestMatcher(*Result_)))
+ .WillOnce(Return(TIteratorResult<IListSplitsStreamIterator>{NGrpc::TGrpcStatus(), ResponseResult_}));
}
private:
@@ -464,7 +644,7 @@ namespace NYql::NConnector::NTest {
response.mutable_error()->CopyFrom(error);
response.set_arrow_ipc_streaming(ser.Serialize(recordBatch));
return static_cast<TBuilder&>(*this);
- };
+ }
void FillWithDefaults() {
}
@@ -504,10 +684,8 @@ namespace NYql::NConnector::NTest {
private:
void SetExpectation() {
- auto future = NThreading::MakeFuture<TIteratorResult<IReadSplitsStreamIterator>>(
- TIteratorResult<IReadSplitsStreamIterator>{NGrpc::TGrpcStatus(), ResponseResult_});
- EXPECT_CALL(*Mock_, ReadSplits(ProtobufRequestMatcher(*Result_)))
- .WillOnce(Return(future));
+ EXPECT_CALL(*Mock_, ReadSplitsImpl(ProtobufRequestMatcher(*Result_)))
+ .WillOnce(Return(TIteratorResult<IReadSplitsStreamIterator>{NGrpc::TGrpcStatus(), ResponseResult_}));
}
private:
@@ -526,5 +704,53 @@ namespace NYql::NConnector::NTest {
TReadSplitsExpectationBuilder ExpectReadSplits() {
return TReadSplitsExpectationBuilder(this);
}
+
+ // Logs the request to Cerr, delegates to DescribeTableImpl, logs the status (and the
+ // response, when there is one), and returns the result wrapped into a ready future.
+ TDescribeTableAsyncResult DescribeTable(const NApi::TDescribeTableRequest& request) override {
+     Cerr << "Call DescribeTable.\n" << request.Utf8DebugString() << Endl;
+
+     auto outcome = DescribeTableImpl(request);
+
+     TStringBuilder report;
+     report << "DescribeTable result.\n" << StatusToDebugString(outcome.Status);
+     if (outcome.Response) {
+         report << '\n' << outcome.Response->Utf8DebugString();
+     }
+     Cerr << report << Endl;
+
+     return NThreading::MakeFuture(std::move(outcome));
+ }
+
+ // Logs the request to Cerr, delegates to ListSplitsImpl, logs the resulting status,
+ // and returns the result wrapped into a ready future.
+ TListSplitsStreamIteratorAsyncResult ListSplits(const NApi::TListSplitsRequest& request) override {
+     Cerr << "Call ListSplits.\n" << request.Utf8DebugString() << Endl;
+
+     auto outcome = ListSplitsImpl(request);
+     const TString statusText = StatusToDebugString(outcome.Status);
+     Cerr << "ListSplits result.\n" << statusText << Endl;
+
+     return NThreading::MakeFuture(std::move(outcome));
+ }
+
+ // Logs the request to Cerr, delegates to ReadSplitsImpl, logs the resulting status,
+ // and returns the result wrapped into a ready future.
+ TReadSplitsStreamIteratorAsyncResult ReadSplits(const NApi::TReadSplitsRequest& request) override {
+     Cerr << "Call ReadSplits.\n" << request.Utf8DebugString() << Endl;
+
+     auto outcome = ReadSplitsImpl(request);
+     const TString statusText = StatusToDebugString(outcome.Status);
+     Cerr << "ReadSplits result.\n" << statusText << Endl;
+
+     return NThreading::MakeFuture(std::move(outcome));
+ }
+
+ protected:
+ // Renders a gRPC status for logging: the status code on its own line, followed by the
+ // message, the details in parentheses and an "(internal error)" marker, each only when set.
+ static TString StatusToDebugString(const NGrpc::TGrpcStatus& status) {
+     TStringBuilder text;
+     text << "GRpcStatusCode: " << status.GRpcStatusCode << '\n';
+
+     if (status.Msg) {
+         text << status.Msg;
+     }
+
+     if (status.Details) {
+         text << " (" << status.Details << ')';
+     }
+
+     if (status.InternalError) {
+         text << " (internal error)";
+     }
+
+     // The builder is moved explicitly into the TString return value.
+     return std::move(text);
+ }
};
} // namespace NYql::NConnector::NTest
diff --git a/ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json b/ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json
index a36acc049e..88e63d1583 100644
--- a/ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json
+++ b/ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json
@@ -32,7 +32,8 @@
{"Index": 0, "Name": "World", "Type": "TExprBase"},
{"Index": 1, "Name": "DataSource", "Type": "TGenDataSource"},
{"Index": 2, "Name": "Table", "Type": "TCoAtom"},
- {"Index": 3, "Name": "Columns", "Type": "TExprBase"}
+ {"Index": 3, "Name": "Columns", "Type": "TExprBase"},
+ {"Index": 4, "Name": "FilterPredicate", "Type": "TCoLambda"}
]
},
{
@@ -43,7 +44,8 @@
{"Index": 0, "Name": "Cluster", "Type": "TCoAtom"},
{"Index": 1, "Name": "Table", "Type": "TCoAtom"},
{"Index": 2, "Name": "Token", "Type": "TCoSecureParam"},
- {"Index": 3, "Name": "Columns", "Type": "TCoAtomList"}
+ {"Index": 3, "Name": "Columns", "Type": "TCoAtomList"},
+ {"Index": 4, "Name": "FilterPredicate", "Type": "TCoLambda"}
]
}
]
diff --git a/ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt
index 971f4dac73..e85bf56b3b 100644
--- a/ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt
@@ -31,6 +31,7 @@ target_link_libraries(providers-generic-provider PUBLIC
providers-common-mkql
providers-common-proto
providers-common-provider
+ providers-common-pushdown
providers-common-structured_token
providers-common-transform
providers-dq-common
@@ -52,6 +53,7 @@ target_sources(providers-generic-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_logical_opt.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_mkql_compiler.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp
diff --git a/ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt
index 7e3022b156..3e80e407cd 100644
--- a/ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt
@@ -32,6 +32,7 @@ target_link_libraries(providers-generic-provider PUBLIC
providers-common-mkql
providers-common-proto
providers-common-provider
+ providers-common-pushdown
providers-common-structured_token
providers-common-transform
providers-dq-common
@@ -53,6 +54,7 @@ target_sources(providers-generic-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_logical_opt.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_mkql_compiler.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp
diff --git a/ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt
index 7e3022b156..3e80e407cd 100644
--- a/ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt
@@ -32,6 +32,7 @@ target_link_libraries(providers-generic-provider PUBLIC
providers-common-mkql
providers-common-proto
providers-common-provider
+ providers-common-pushdown
providers-common-structured_token
providers-common-transform
providers-dq-common
@@ -53,6 +54,7 @@ target_sources(providers-generic-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_logical_opt.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_mkql_compiler.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp
diff --git a/ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt
index 971f4dac73..e85bf56b3b 100644
--- a/ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt
@@ -31,6 +31,7 @@ target_link_libraries(providers-generic-provider PUBLIC
providers-common-mkql
providers-common-proto
providers-common-provider
+ providers-common-pushdown
providers-common-structured_token
providers-common-transform
providers-dq-common
@@ -52,6 +53,7 @@ target_sources(providers-generic-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_logical_opt.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_mkql_compiler.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp
diff --git a/ydb/library/yql/providers/generic/provider/ya.make b/ydb/library/yql/providers/generic/provider/ya.make
index 06a700d75d..21c6fd2b18 100644
--- a/ydb/library/yql/providers/generic/provider/ya.make
+++ b/ydb/library/yql/providers/generic/provider/ya.make
@@ -13,6 +13,7 @@ SRCS(
yql_generic_logical_opt.cpp
yql_generic_mkql_compiler.cpp
yql_generic_physical_opt.cpp
+ yql_generic_predicate_pushdown.cpp
yql_generic_provider.cpp
yql_generic_provider.h
yql_generic_provider_impl.h
@@ -42,6 +43,7 @@ PEERDIR(
ydb/library/yql/providers/common/mkql
ydb/library/yql/providers/common/proto
ydb/library/yql/providers/common/provider
+ ydb/library/yql/providers/common/pushdown
ydb/library/yql/providers/common/structured_token
ydb/library/yql/providers/common/transform
ydb/library/yql/providers/dq/common
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_datasource.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_datasource.cpp
index 8a6ece89e0..2c3122cc27 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_datasource.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_datasource.cpp
@@ -3,6 +3,7 @@
#include "yql_generic_provider_impl.h"
#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
+#include <ydb/library/yql/providers/common/config/yql_configuration_transformer.h>
#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h>
#include <ydb/library/yql/providers/common/provider/yql_data_provider_impl.h>
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
@@ -21,6 +22,7 @@ namespace NYql {
public:
TGenericDataSource(TGenericState::TPtr state)
: State_(state)
+ , ConfigurationTransformer_(MakeHolder<NCommon::TProviderConfigurationTransformer>(State_->Configuration, *State_->Types, TString{GenericProviderName}))
, IODiscoveryTransformer_(CreateGenericIODiscoveryTransformer(State_))
, LoadMetaDataTransformer_(CreateGenericLoadTableMetadataTransformer(State_))
, TypeAnnotationTransformer_(CreateGenericDataSourceTypeAnnotationTransformer(State_))
@@ -36,7 +38,7 @@ namespace NYql {
if (node.IsCallable(TCoDataSource::CallableName())) {
if (node.Child(0)->Content() == GenericProviderName) {
auto clusterName = node.Child(1)->Content();
- if (!State_->Configuration->HasCluster(clusterName)) {
+ if (clusterName != NCommon::ALL_CLUSTERS && !State_->Configuration->HasCluster(clusterName)) {
ctx.AddError(TIssue(ctx.GetPosition(node.Child(1)->Pos()),
TStringBuilder() << "Unknown cluster name: " << clusterName));
return false;
@@ -56,6 +58,10 @@ namespace NYql {
return TypeAnnotationTransformer_->CanParse(node);
}
+ // Returns the configuration transformer owned by this data source (created in the constructor).
+ IGraphTransformer& GetConfigurationTransformer() override {
+ return *ConfigurationTransformer_;
+ }
+
IGraphTransformer& GetIODiscoveryTransformer() override {
return *IODiscoveryTransformer_;
}
@@ -140,6 +146,7 @@ namespace NYql {
private:
const TGenericState::TPtr State_;
+ const THolder<IGraphTransformer> ConfigurationTransformer_;
const THolder<IGraphTransformer> IODiscoveryTransformer_;
const THolder<IGraphTransformer> LoadMetaDataTransformer_;
const THolder<TVisitorTransformerBase> TypeAnnotationTransformer_;
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp
index c36aaa1eca..a7b14b26cf 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp
@@ -25,12 +25,58 @@ namespace NYql {
, State_(state)
{
using TSelf = TGenericDataSourceTypeAnnotationTransformer;
+ AddHandler({TCoConfigure::CallableName()}, Hndl(&TSelf::HandleConfig));
AddHandler({TGenReadTable::CallableName()}, Hndl(&TSelf::HandleReadTable));
AddHandler({TGenSourceSettings::CallableName()}, Hndl(&TSelf::HandleSourceSettings));
}
+ // Type annotation for Configure! nodes of the generic provider: requires at least two
+ // arguments, a world as the first one and a generic data source as the second one.
+ // On success the node gets the same type annotation as its world argument.
+ TStatus HandleConfig(const TExprNode::TPtr& input, TExprContext& ctx) {
+     // Checks run in order and stop at the first failure, which reports its own issue.
+     const bool wellFormed =
+         EnsureMinArgsCount(*input, 2, ctx) &&
+         EnsureWorldType(*input->Child(TCoConfigure::idx_World), ctx) &&
+         EnsureSpecificDataSource(*input->Child(TCoConfigure::idx_DataSource), GenericProviderName, ctx);
+     if (!wellFormed) {
+         return TStatus::Error;
+     }
+
+     const auto* worldType = input->Child(TCoConfigure::idx_World)->GetTypeAnn();
+     input->SetTypeAnn(worldType);
+     return TStatus::Ok;
+ }
+
+ // Type-annotates the filter lambda stored in child `childIndex` of `input`.
+ //
+ // The lambda's argument is given the type `itemType` and its body must have type Bool.
+ // Returns:
+ //   Ok     - the lambda is annotated and returns Bool;
+ //   Repeat - the lambda has no type annotation yet after its arguments were updated;
+ //   Error  - the child is missing, is not a lambda, or does not return Bool.
+ // Every Error path leaves an issue in `ctx`, so a failure is never silent.
+ TStatus AnnotateFilterPredicate(const TExprNode::TPtr& input, size_t childIndex, const TStructExprType* itemType, TExprContext& ctx) {
+     if (childIndex >= input->ChildrenSize()) {
+         ctx.AddError(TIssue(ctx.GetPosition(input->Pos()),
+                             TStringBuilder() << "Filter predicate is missing: expected child #" << childIndex
+                                              << ", but the node has only " << input->ChildrenSize() << " children"));
+         return TStatus::Error;
+     }
+
+     auto& filterLambda = input->ChildRef(childIndex);
+     if (!EnsureLambda(*filterLambda, ctx)) {
+         return TStatus::Error;
+     }
+
+     if (!UpdateLambdaAllArgumentsTypes(filterLambda, {itemType}, ctx)) {
+         return TStatus::Error;
+     }
+
+     const auto* filterLambdaType = filterLambda->GetTypeAnn();
+     if (!filterLambdaType) {
+         // The body has not been annotated yet; ask for another pass.
+         return TStatus::Repeat;
+     }
+
+     const bool returnsBool =
+         filterLambdaType->GetKind() == ETypeAnnotationKind::Data &&
+         static_cast<const TDataExprType*>(filterLambdaType)->GetSlot() == EDataSlot::Bool;
+     if (!returnsBool) {
+         ctx.AddError(TIssue(ctx.GetPosition(filterLambda->Pos()),
+                             TStringBuilder() << "Filter predicate must return Bool, but got: " << *filterLambdaType));
+         return TStatus::Error;
+     }
+
+     return TStatus::Ok;
+ }
+
TStatus HandleSourceSettings(const TExprNode::TPtr& input, TExprContext& ctx) {
- if (!EnsureArgsCount(*input, 4, ctx)) {
+ if (!EnsureArgsCount(*input, 5, ctx)) {
return TStatus::Error;
}
@@ -76,6 +122,12 @@ namespace NYql {
}
}
+ // Filter
+ const TStatus filterAnnotationStatus = AnnotateFilterPredicate(input, TGenSourceSettings::idx_FilterPredicate, structExprType, ctx);
+ if (filterAnnotationStatus != TStatus::Ok) {
+ return filterAnnotationStatus;
+ }
+
blockRowTypeItems.push_back(ctx.MakeType<TItemExprType>(
BlockLengthColumnName, ctx.MakeType<TScalarExprType>(ctx.MakeType<TDataExprType>(EDataSlot::Uint64))));
const TTypeAnnotationNode* typeAnnotationNode = ctx.MakeType<TStructExprType>(blockRowTypeItems);
@@ -90,7 +142,7 @@ namespace NYql {
TStatus HandleReadTable(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
Y_UNUSED(output);
- if (!EnsureArgsCount(*input, 4, ctx)) {
+ if (!EnsureArgsCount(*input, 5, ctx)) {
return TStatus::Error;
}
@@ -151,6 +203,12 @@ namespace NYql {
YQL_CLOG(DEBUG, ProviderGeneric) << "struct column order" << (static_cast<const TStructExprType*>(itemType))->ToString();
}
+ // Filter
+ const TStatus filterAnnotationStatus = AnnotateFilterPredicate(input, TGenReadTable::idx_FilterPredicate, itemType, ctx);
+ if (filterAnnotationStatus != TStatus::Ok) {
+ return filterAnnotationStatus;
+ }
+
input->SetTypeAnn(ctx.MakeType<TTupleExprType>(TTypeAnnotationNode::TListType{
input->Child(TGenReadTable::idx_World)->GetTypeAnn(), ctx.MakeType<TListExprType>(itemType)}));
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp
index 30a10b7895..39adc3d0e8 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp
@@ -1,6 +1,7 @@
#include "yql_generic_dq_integration.h"
#include "yql_generic_mkql_compiler.h"
+#include "yql_generic_predicate_pushdown.h"
#include <ydb/library/yql/ast/yql_expr.h>
#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h>
@@ -70,6 +71,7 @@ namespace NYql {
.Name().Build(token)
.Build()
.Columns(std::move(columns))
+ .FilterPredicate(genReadTable.FilterPredicate())
.Build()
.RowType(ExpandType(genReadTable.Pos(), *rowType, ctx))
.DataSource(genReadTable.DataSource().Cast<TCoDataSource>())
@@ -139,6 +141,13 @@ namespace NYql {
column->mutable_type()->CopyFrom(type);
}
+ if (auto predicate = settings.FilterPredicate(); !IsEmptyFilterPredicate(predicate)) {
+ TStringBuilder err;
+ if (!SerializeFilterPredicate(predicate, select->mutable_where()->mutable_filter_typed(), err)) {
+ ythrow yexception() << "Failed to serialize filter predicate for source: " << err;
+ }
+ }
+
// store data source instance
srcDesc.mutable_data_source_instance()->CopyFrom(tableMeta.value()->DataSourceInstance);
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp
index b6c41cd3c5..3baedc42cd 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp
@@ -231,11 +231,22 @@ namespace NYql {
tableMeta.ItemType = parse;
if (const auto ins = replaces.emplace(read.Raw(), TExprNode::TPtr()); ins.second) {
// clang-format off
+ auto row = Build<TCoArgument>(ctx, read.Pos())
+ .Name("row")
+ .Done();
+ auto emptyPredicate = Build<TCoLambda>(ctx, read.Pos())
+ .Args({row})
+ .Body<TCoBool>()
+ .Literal().Build("true")
+ .Build()
+ .Done().Ptr();
+
ins.first->second = Build<TGenReadTable>(ctx, read.Pos())
.World(read.World())
.DataSource(read.DataSource())
.Table().Value(tableName).Build()
.Columns<TCoVoid>().Build()
+ .FilterPredicate(emptyPredicate)
.Done().Ptr();
// clang-format on
}
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp
index be30204547..3620d5e14e 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp
@@ -1,14 +1,18 @@
#include "yql_generic_provider_impl.h"
+#include "yql_generic_predicate_pushdown.h"
#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
#include <ydb/library/yql/core/yql_opt_utils.h>
#include <ydb/library/yql/providers/common/provider/yql_data_provider_impl.h>
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
+#include <ydb/library/yql/providers/common/pushdown/collection.h>
+#include <ydb/library/yql/providers/common/pushdown/predicate_node.h>
#include <ydb/library/yql/providers/common/transform/yql_optimize.h>
#include <ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.h>
#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
#include <ydb/library/yql/utils/log/log.h>
+#include <ydb/library/yql/providers/common/provider/yql_provider.h>
namespace NYql {
@@ -16,6 +20,13 @@ namespace NYql {
namespace {
+ // Pushdown feature set used by the generic provider when collecting filter predicates.
+ // NOTE(review): LIKE and JSON operators are enabled here, while the serializer in
+ // yql_generic_predicate_pushdown.cpp only handles `==` over a column and an Int32
+ // literal - confirm that this flag set is intended.
+ struct TPushdownSettings: public NPushdown::TSettings {
+     TPushdownSettings() {
+         using EFlag = NPushdown::TSettings::EFeatureFlag;
+         const auto enabledFeatures = EFlag::LikeOperator | EFlag::JsonQueryOperators | EFlag::JsonExistsOperator;
+         Enable(enabledFeatures);
+     }
+ };
+
class TGenericPhysicalOptProposalTransformer: public TOptimizeTransformerBase {
public:
TGenericPhysicalOptProposalTransformer(TGenericState::TPtr state)
@@ -25,6 +36,8 @@ namespace NYql {
#define HNDL(name) "PhysicalOptimizer-" #name, Hndl(&TGenericPhysicalOptProposalTransformer::name)
AddHandler(0, &TCoLeft::Match, HNDL(TrimReadWorld));
AddHandler(0, &TCoNarrowMap::Match, HNDL(ReadZeroColumns));
+ AddHandler(0, &TCoFlatMap::Match, HNDL(PushFilterToReadTable));
+ AddHandler(0, &TCoFlatMap::Match, HNDL(PushFilterToDqSourceWrap));
#undef HNDL
}
@@ -89,10 +102,153 @@ namespace NYql {
return node;
}
+ // Makes a predicate node whose expression is the Bool literal "true",
+ // i.e. the same shape that IsEmptyFilterPredicate treats as "no filter".
+ static NPushdown::TPredicateNode BuildEmptyPredicate(TExprContext& ctx, TPositionHandle pos) {
+     NPushdown::TPredicateNode result;
+     result.ExprNode = Build<TCoBool>(ctx, pos).Literal().Build("true").Done();
+     return result;
+ }
+
+ // Selects the part of `predicateTree` that can be pushed down:
+ //  - a fully pushable tree is returned as is;
+ //  - a non-pushable tree that is not an AND yields the empty ("true") predicate;
+ //  - a non-pushable AND yields a predicate built from its pushable direct children.
+ static NPushdown::TPredicateNode SplitForPartialPushdown(const NPushdown::TPredicateNode& predicateTree,
+                                                         TExprContext& ctx, TPositionHandle pos)
+ {
+     if (predicateTree.CanBePushed) {
+         return predicateTree;
+     }
+
+     const bool isConjunction = predicateTree.Op == NPushdown::EBoolOp::And;
+     if (!isConjunction) {
+         return BuildEmptyPredicate(ctx, pos);
+     }
+
+     std::vector<NPushdown::TPredicateNode> accepted;
+     for (const auto& child : predicateTree.Children) {
+         if (!child.CanBePushed) {
+             continue;
+         }
+         accepted.push_back(child);
+     }
+
+     NPushdown::TPredicateNode result;
+     result.SetPredicates(accepted, ctx, pos);
+     return result;
+ }
+
+ // Builds the filter lambda to push down from a FlatMap `lambda`.
+ // Returns an empty TMaybeNode when the lambda body is not an OptionalIf, or when the
+ // predicate selected by SplitForPartialPushdown is not valid.
+ TMaybeNode<TCoLambda> MakePushdownPredicate(const TCoLambda& lambda, TExprContext& ctx, const TPositionHandle& pos) const {
+ auto lambdaArg = lambda.Args().Arg(0).Ptr();
+
+ YQL_CLOG(TRACE, ProviderGeneric) << "Push filter. Initial filter lambda: " << NCommon::ExprToPrettyString(ctx, lambda.Ref());
+
+ // Only lambdas whose body is OptionalIf(predicate, ...) are handled.
+ auto maybeOptionalIf = lambda.Body().Maybe<TCoOptionalIf>();
+ if (!maybeOptionalIf.IsValid()) {
+ return {};
+ }
+
+ TCoOptionalIf optionalIf = maybeOptionalIf.Cast();
+ NPushdown::TPredicateNode predicateTree(optionalIf.Predicate());
+ NPushdown::CollectPredicates(optionalIf.Predicate(), predicateTree, lambdaArg.Get(), TExprBase(lambdaArg), TPushdownSettings());
+ YQL_ENSURE(predicateTree.IsValid(), "Collected filter predicates are invalid");
+
+ // Keep only the pushable part of the collected tree.
+ NPushdown::TPredicateNode predicateToPush = SplitForPartialPushdown(predicateTree, ctx, pos);
+ if (!predicateToPush.IsValid()) {
+ return {};
+ }
+
+ // Wrap the chosen predicate into a new lambda, replacing the original lambda
+ // argument with the new "filter_row" argument.
+ // clang-format off
+ auto newFilterLambda = Build<TCoLambda>(ctx, pos)
+ .Args({"filter_row"})
+ .Body<TExprApplier>()
+ .Apply(predicateToPush.ExprNode.Cast())
+ .With(TExprBase(lambdaArg), "filter_row")
+ .Build()
+ .Done();
+ // clang-format on
+
+ YQL_CLOG(INFO, ProviderGeneric) << "Push filter lambda: " << NCommon::ExprToPrettyString(ctx, *newFilterLambda.Ptr());
+ return newFilterLambda;
+ }
+
+ // Optimizer handler for FlatMap(Right(GenReadTable)): stores the pushable part of the
+ // FlatMap filter in the read table's FilterPredicate.
+ // Returns `node` unchanged when the UsePredicatePushdown setting is off, the pattern does
+ // not match, the read already has a non-empty predicate, or no predicate can be built.
+ TMaybeNode<TExprBase> PushFilterToReadTable(TExprBase node, TExprContext& ctx) const {
+ if (!State_->Configuration->UsePredicatePushdown.Get().GetOrElse(TGenericSettings::TDefault::UsePredicatePushdown)) {
+ return node;
+ }
+
+ auto flatmap = node.Cast<TCoFlatMap>();
+ auto maybeRight = flatmap.Input().Maybe<TCoRight>();
+ if (!maybeRight) {
+ return node;
+ }
+ auto maybeGenericRead = maybeRight.Cast().Input().Maybe<TGenReadTable>();
+ if (!maybeGenericRead) {
+ return node;
+ }
+
+ TGenReadTable genericRead = maybeGenericRead.Cast();
+ // A predicate other than the literal "true" means a filter was already pushed.
+ if (!IsEmptyFilterPredicate(genericRead.FilterPredicate())) {
+ YQL_CLOG(TRACE, ProviderGeneric) << "Push filter. Lambda is already not empty";
+ return node;
+ }
+
+ auto newFilterLambda = MakePushdownPredicate(flatmap.Lambda(), ctx, node.Pos());
+ if (!newFilterLambda) {
+ return node;
+ }
+
+ // clang-format off
+ return Build<TCoFlatMap>(ctx, flatmap.Pos())
+ .InitFrom(flatmap) // Leave existing filter in flatmap for the case of not applying predicate in connector
+ .Input<TCoRight>()
+ .Input<TGenReadTable>()
+ .InitFrom(genericRead)
+ .FilterPredicate(newFilterLambda.Cast())
+ .Build()
+ .Build()
+ .Done();
+ // clang-format on
+ }
+
+ // Optimizer handler for FlatMap(DqSourceWrap(GenSourceSettings)): stores the pushable part
+ // of the FlatMap filter in the source settings' FilterPredicate.
+ // Returns `node` unchanged when the UsePredicatePushdown setting is off, the pattern does
+ // not match, the settings already have a non-empty predicate, or no predicate can be built.
+ TMaybeNode<TExprBase> PushFilterToDqSourceWrap(TExprBase node, TExprContext& ctx) const {
+ if (!State_->Configuration->UsePredicatePushdown.Get().GetOrElse(TGenericSettings::TDefault::UsePredicatePushdown)) {
+ return node;
+ }
+
+ auto flatmap = node.Cast<TCoFlatMap>();
+ auto maybeSourceWrap = flatmap.Input().Maybe<TDqSourceWrap>();
+ if (!maybeSourceWrap) {
+ return node;
+ }
+
+ TDqSourceWrap sourceWrap = maybeSourceWrap.Cast();
+ auto maybeGenericSourceSettings = sourceWrap.Input().Maybe<TGenSourceSettings>();
+ if (!maybeGenericSourceSettings) {
+ return node;
+ }
+
+ TGenSourceSettings genericSourceSettings = maybeGenericSourceSettings.Cast();
+ // A predicate other than the literal "true" means a filter was already pushed.
+ if (!IsEmptyFilterPredicate(genericSourceSettings.FilterPredicate())) {
+ YQL_CLOG(TRACE, ProviderGeneric) << "Push filter. Lambda is already not empty";
+ return node;
+ }
+
+ auto newFilterLambda = MakePushdownPredicate(flatmap.Lambda(), ctx, node.Pos());
+ if (!newFilterLambda) {
+ return node;
+ }
+
+ // clang-format off
+ return Build<TCoFlatMap>(ctx, flatmap.Pos())
+ .InitFrom(flatmap) // Leave existing filter in flatmap for the case of not applying predicate in connector
+ .Input<TDqSourceWrap>()
+ .InitFrom(sourceWrap)
+ .Input<TGenSourceSettings>()
+ .InitFrom(genericSourceSettings)
+ .FilterPredicate(newFilterLambda.Cast())
+ .Build()
+ .Build()
+ .Done();
+ // clang-format on
+ }
+
private:
const TGenericState::TPtr State_;
};
-
}
THolder<IGraphTransformer> CreateGenericPhysicalOptProposalTransformer(TGenericState::TPtr state) {
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp
new file mode 100644
index 0000000000..8f24d0f58a
--- /dev/null
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp
@@ -0,0 +1,92 @@
+#include "yql_generic_predicate_pushdown.h"
+
+#include <ydb/library/yql/providers/generic/connector/api/service/protos/connector.pb.h>
+
+namespace NYql {
+
+ using namespace NNodes;
+ using namespace NConnector::NApi;
+
+ namespace {
+
+ // Writes a Member(row, name) access into `proto` as a column reference.
+ // Fails (with a message in `err`) when the member is taken from anything other than
+ // the lambda argument `arg`.
+ bool SerializeMember(const TCoMember& member, TExpression* proto, const TCoArgument& arg, TStringBuilder& err) {
+     const bool takenFromLambdaArg = member.Struct().Raw() == arg.Raw();
+     if (takenFromLambdaArg) {
+         proto->set_column(member.Name().StringValue());
+         return true;
+     }
+
+     err << "member callable called not for lambda argument";
+     return false;
+ }
+
+ // Writes an Int32 literal into `proto` as a typed value (type INT32 plus the parsed number).
+ // Always returns true; the error builder is unused.
+ bool SerializeInt32(const TCoInt32& constant, TExpression* proto, TStringBuilder&) {
+     auto* typedValue = proto->mutable_typed_value();
+     typedValue->mutable_type()->set_type_id(Ydb::Type::INT32);
+     typedValue->mutable_value()->set_int32_value(FromString<i32>(constant.Literal()));
+     return true;
+ }
+
+ // Serializes one operand of a comparison. Supported forms: a Member access (column)
+ // and an Int32 literal; anything else is reported through `err`.
+ bool SerializeExpression(const TExprBase& expression, TExpression* proto, const TCoArgument& arg, TStringBuilder& err) {
+     // column reference
+     auto asMember = expression.Maybe<TCoMember>();
+     if (asMember) {
+         return SerializeMember(asMember.Cast(), proto, arg, err);
+     }
+
+     // literal data
+     auto asInt32 = expression.Maybe<TCoInt32>();
+     if (asInt32) {
+         return SerializeInt32(asInt32.Cast(), proto, err);
+     }
+
+     err << "unknown expression: " << expression.Raw()->Content();
+     return false;
+ }
+
+ // Serializes a comparison callable into the `comparison` field of `predicateProto`.
+ // Only equality is mapped to an operation (EQ); if the operation is still RESERVED
+ // afterwards, the comparison is reported as unknown. Operands are serialized left to
+ // right, and the right one is skipped when the left one fails.
+ bool SerializeCompare(const TCoCompare& compare, TPredicate* predicateProto, const TCoArgument& arg, TStringBuilder& err) {
+     TPredicate::TComparison* comparison = predicateProto->mutable_comparison();
+
+     if (compare.Maybe<TCoCmpEqual>()) {
+         comparison->set_operation(TPredicate::TComparison::EQ);
+     }
+
+     const bool operationUnknown = comparison->operation() == TPredicate::TComparison::RESERVED;
+     if (operationUnknown) {
+         err << "unknown operation: " << compare.Raw()->Content();
+         return false;
+     }
+
+     if (!SerializeExpression(compare.Left(), comparison->mutable_left_value(), arg, err)) {
+         return false;
+     }
+     return SerializeExpression(compare.Right(), comparison->mutable_right_value(), arg, err);
+ }
+
+ // Serializes Coalesce(predicate, ...) by serializing its inner predicate, which must be
+ // a comparison; other inner predicates are reported through `err`.
+ bool SerializeCoalesce(const TCoCoalesce& coalesce, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err) {
+     auto inner = coalesce.Predicate();
+     auto asCompare = inner.Maybe<TCoCompare>();
+     if (!asCompare) {
+         err << "unknown coalesce predicate: " << inner.Raw()->Content();
+         return false;
+     }
+
+     return SerializeCompare(asCompare.Cast(), proto, arg, err);
+ }
+
+ // Entry point for serializing a predicate body: dispatches to the comparison or the
+ // coalesce serializer (checked in this order) and reports any other callable through `err`.
+ bool SerializePredicate(const TExprBase& predicate, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err) {
+     auto asCompare = predicate.Maybe<TCoCompare>();
+     if (asCompare) {
+         return SerializeCompare(asCompare.Cast(), proto, arg, err);
+     }
+
+     auto asCoalesce = predicate.Maybe<TCoCoalesce>();
+     if (asCoalesce) {
+         return SerializeCoalesce(asCoalesce.Cast(), proto, arg, err);
+     }
+
+     err << "unknown predicate: " << predicate.Raw()->Content();
+     return false;
+ }
+
+ } // namespace
+
+ // A filter lambda is "empty" when its body is a Bool whose literal is exactly "true".
+ bool IsEmptyFilterPredicate(const TCoLambda& lambda) {
+     auto boolBody = lambda.Body().Maybe<TCoBool>();
+     if (boolBody) {
+         return TStringBuf(boolBody.Cast().Literal()) == "true"sv;
+     }
+     return false;
+ }
+
+ // Serializes the body of the filter lambda into `proto`, treating the lambda's first
+ // argument as the row. Returns false and describes the problem in `err` on failure.
+ bool SerializeFilterPredicate(const TCoLambda& predicate, TPredicate* proto, TStringBuilder& err) {
+     const TCoArgument rowArgument = predicate.Args().Arg(0);
+     return SerializePredicate(predicate.Body(), proto, rowArgument, err);
+ }
+
+} // namespace NYql
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.h b/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.h
new file mode 100644
index 0000000000..121ab50527
--- /dev/null
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.h
@@ -0,0 +1,14 @@
+#pragma once
+
+#include <ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.h>
+
+namespace NYql::NConnector::NApi {
+ class TPredicate;
+} // namespace NYql::NConnector::NApi
+
+namespace NYql {
+
+ // True when the lambda body is the Bool literal "true", i.e. no filter has been pushed.
+ bool IsEmptyFilterPredicate(const NNodes::TCoLambda& lambda);
+ // Serializes the filter lambda into `proto`. Returns false and appends a description of
+ // the unsupported construct to `err` when the predicate cannot be serialized.
+ bool SerializeFilterPredicate(const NNodes::TCoLambda& predicate, NConnector::NApi::TPredicate* proto, TStringBuilder& err);
+
+} // namespace NYql
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp
index c050651f63..29dfe55f56 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp
@@ -7,17 +7,23 @@
namespace NYql {
+ // Registers the provider settings with the setting dispatcher.
+ TGenericConfiguration::TGenericConfiguration() {
+ REGISTER_SETTING(*this, UsePredicatePushdown);
+ }
+
void TGenericConfiguration::Init(const NYql::TGenericGatewayConfig& gatewayConfig,
const std::shared_ptr<NYql::IDatabaseAsyncResolver> databaseResolver,
NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseAuth,
const TCredentials::TPtr& credentials)
{
+ Dispatch(gatewayConfig.GetDefaultSettings());
+
for (const auto& cluster : gatewayConfig.GetClusterMapping()) {
AddCluster(cluster, databaseResolver, databaseAuth, credentials);
}
// TODO: check if it's necessary
- this->FreezeDefaults();
+ FreezeDefaults();
}
void TGenericConfiguration::AddCluster(const TGenericClusterConfig& clusterConfig,
@@ -93,4 +99,4 @@ namespace NYql {
return ValidClusters.contains(cluster);
}
-} \ No newline at end of file
+}
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h
index 685f0004ee..639db317d3 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h
@@ -9,12 +9,18 @@ namespace NYql {
struct TGenericSettings {
using TConstPtr = std::shared_ptr<const TGenericSettings>;
+
+ // Switch for pushing filter predicates down to the connector; read by the physical optimizer.
+ NCommon::TConfSetting<bool, false> UsePredicatePushdown;
+
+ // Fallback values that callers pass to GetOrElse when a setting has not been set.
+ struct TDefault {
+ static constexpr bool UsePredicatePushdown = false;
+ };
};
struct TGenericConfiguration: public TGenericSettings, public NCommon::TSettingDispatcher {
using TPtr = TIntrusivePtr<TGenericConfiguration>;
- TGenericConfiguration(){};
+ TGenericConfiguration();
TGenericConfiguration(const TGenericConfiguration&) = delete;
void Init(const NYql::TGenericGatewayConfig& gatewayConfig,