aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgalaxycrab <UgnineSirdis@ydb.tech>2023-11-02 17:34:34 +0300
committergalaxycrab <UgnineSirdis@ydb.tech>2023-11-02 18:54:19 +0300
commitc336c58dfb573341b17f27c7b128e80416d87422 (patch)
tree60de87226a138bafb81744e6266ea14b33017021
parenta91b63ffcedbcc0a38b0907fea8ddf189b498897 (diff)
downloadydb-c336c58dfb573341b17f27c7b128e80416d87422.tar.gz
Support of the first part of features for pushdown. For now without rendering SQL in connector (will be in the next code review)
-rw-r--r--.mapping.json6
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp12
-rw-r--r--ydb/library/yql/providers/common/pushdown/collection.cpp149
-rw-r--r--ydb/library/yql/providers/common/pushdown/settings.h22
-rw-r--r--ydb/library/yql/providers/generic/connector/api/service/protos/connector.proto33
-rw-r--r--ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/generic/provider/ut/CMakeLists.txt9
-rw-r--r--ydb/library/yql/providers/generic/provider/ut/pushdown/CMakeLists.darwin-x86_64.txt83
-rw-r--r--ydb/library/yql/providers/generic/provider/ut/pushdown/CMakeLists.linux-aarch64.txt86
-rw-r--r--ydb/library/yql/providers/generic/provider/ut/pushdown/CMakeLists.linux-x86_64.txt88
-rw-r--r--ydb/library/yql/providers/generic/provider/ut/pushdown/CMakeLists.txt17
-rw-r--r--ydb/library/yql/providers/generic/provider/ut/pushdown/CMakeLists.windows-x86_64.txt76
-rw-r--r--ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp605
-rw-r--r--ydb/library/yql/providers/generic/provider/ut/pushdown/ya.make27
-rw-r--r--ydb/library/yql/providers/generic/provider/ut/ya.make3
-rw-r--r--ydb/library/yql/providers/generic/provider/ya.make2
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp2
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp19
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp138
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp2
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_settings.h4
24 files changed, 1320 insertions, 67 deletions
diff --git a/.mapping.json b/.mapping.json
index 16eed80203..8d3f543c94 100644
--- a/.mapping.json
+++ b/.mapping.json
@@ -7786,6 +7786,12 @@
"ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt":"",
"ydb/library/yql/providers/generic/provider/CMakeLists.txt":"",
"ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt":"",
+ "ydb/library/yql/providers/generic/provider/ut/CMakeLists.txt":"",
+ "ydb/library/yql/providers/generic/provider/ut/pushdown/CMakeLists.darwin-x86_64.txt":"",
+ "ydb/library/yql/providers/generic/provider/ut/pushdown/CMakeLists.linux-aarch64.txt":"",
+ "ydb/library/yql/providers/generic/provider/ut/pushdown/CMakeLists.linux-x86_64.txt":"",
+ "ydb/library/yql/providers/generic/provider/ut/pushdown/CMakeLists.txt":"",
+ "ydb/library/yql/providers/generic/provider/ut/pushdown/CMakeLists.windows-x86_64.txt":"",
"ydb/library/yql/providers/pq/CMakeLists.txt":"",
"ydb/library/yql/providers/pq/async_io/CMakeLists.darwin-x86_64.txt":"",
"ydb/library/yql/providers/pq/async_io/CMakeLists.linux-aarch64.txt":"",
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp
index fc6bb36a28..ef2aa09a30 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp
@@ -25,11 +25,21 @@ static const std::unordered_set<std::string> SecondLevelFilters = {
};
struct TPushdownSettings : public NPushdown::TSettings {
- TPushdownSettings() {
+ TPushdownSettings()
+ : NPushdown::TSettings(NYql::NLog::EComponent::ProviderKqp)
+ {
using EFlag = NPushdown::TSettings::EFeatureFlag;
Enable(EFlag::LikeOperator, NSsa::RuntimeVersion >= 2U);
Enable(EFlag::LikeOperatorOnlyForUtf8, NSsa::RuntimeVersion < 3U);
Enable(EFlag::JsonQueryOperators | EFlag::JsonExistsOperator, NSsa::RuntimeVersion >= 3U);
+ Enable(EFlag::LogicalXorOperator
+ | EFlag::ParameterExpression
+ | EFlag::CastExpression
+ | EFlag::StringTypes
+ | EFlag::DateTimeTypes
+ | EFlag::UuidType
+ | EFlag::DecimalType
+ | EFlag::DyNumberType);
}
};
diff --git a/ydb/library/yql/providers/common/pushdown/collection.cpp b/ydb/library/yql/providers/common/pushdown/collection.cpp
index 253ff2cc32..f6a8c30193 100644
--- a/ydb/library/yql/providers/common/pushdown/collection.cpp
+++ b/ydb/library/yql/providers/common/pushdown/collection.cpp
@@ -61,10 +61,8 @@ bool IsSupportedPredicate(const TCoCompare& predicate, const TSettings& settings
return false;
}
-bool IsSupportedDataType(const TCoDataCtor& node) {
- if (node.Maybe<TCoUtf8>() ||
- node.Maybe<TCoString>() ||
- node.Maybe<TCoBool>() ||
+bool IsSupportedDataType(const TCoDataCtor& node, const TSettings& settings) {
+ if (node.Maybe<TCoBool>() ||
node.Maybe<TCoFloat>() ||
node.Maybe<TCoDouble>() ||
node.Maybe<TCoInt8>() ||
@@ -79,10 +77,20 @@ bool IsSupportedDataType(const TCoDataCtor& node) {
return true;
}
+ if (settings.IsEnabled(TSettings::EFeatureFlag::StringTypes)) {
+ if (node.Maybe<TCoUtf8>() || node.Maybe<TCoString>()) {
+ return true;
+ }
+ }
+
return false;
}
-bool IsSupportedCast(const TCoSafeCast& cast) {
+bool IsSupportedCast(const TCoSafeCast& cast, const TSettings& settings) {
+ if (!settings.IsEnabled(TSettings::EFeatureFlag::CastExpression)) {
+ return false;
+ }
+
auto maybeDataType = cast.Type().Maybe<TCoDataType>();
if (!maybeDataType) {
if (const auto maybeOptionalType = cast.Type().Maybe<TCoOptionalType>()) {
@@ -100,8 +108,42 @@ bool IsSupportedCast(const TCoSafeCast& cast) {
return false;
}
+bool IsStringType(NYql::NUdf::TDataTypeId t) {
+ return t == NYql::NProto::String
+ || t == NYql::NProto::Utf8
+ || t == NYql::NProto::Yson
+ || t == NYql::NProto::Json
+ || t == NYql::NProto::JsonDocument;
+}
+
+bool IsDateTimeType(NYql::NUdf::TDataTypeId t) {
+ return t == NYql::NProto::Date
+ || t == NYql::NProto::Datetime
+ || t == NYql::NProto::Timestamp
+ || t == NYql::NProto::Interval
+ || t == NYql::NProto::TzDate
+ || t == NYql::NProto::TzDatetime
+ || t == NYql::NProto::TzTimestamp
+ || t == NYql::NProto::Date32
+ || t == NYql::NProto::Datetime64
+ || t == NYql::NProto::Timestamp64
+ || t == NYql::NProto::Interval64;
+}
+
+bool IsUuidType(NYql::NUdf::TDataTypeId t) {
+ return t == NYql::NProto::Uuid;
+}
+
+bool IsDecimalType(NYql::NUdf::TDataTypeId t) {
+ return t == NYql::NProto::Decimal;
+}
+
+bool IsDyNumberType(NYql::NUdf::TDataTypeId t) {
+ return t == NYql::NProto::DyNumber;
+}
+
bool IsComparableTypes(const TExprBase& leftNode, const TExprBase& rightNode, bool equality,
- const TTypeAnnotationNode* inputType)
+ const TTypeAnnotationNode* inputType, const TSettings& settings)
{
const TExprNode::TPtr leftPtr = leftNode.Ptr();
const TExprNode::TPtr rightPtr = rightNode.Ptr();
@@ -129,7 +171,7 @@ bool IsComparableTypes(const TExprBase& leftNode, const TExprBase& rightNode, bo
return CanCompare<false>(left, right);
};
- auto canCompare = [&defaultCompare](const TTypeAnnotationNode* left, const TTypeAnnotationNode* right) {
+ auto canCompare = [&defaultCompare, &settings](const TTypeAnnotationNode* left, const TTypeAnnotationNode* right) {
if (left->GetKind() != ETypeAnnotationKind::Data ||
right->GetKind() != ETypeAnnotationKind::Data)
{
@@ -139,6 +181,26 @@ bool IsComparableTypes(const TExprBase& leftNode, const TExprBase& rightNode, bo
auto leftTypeId = GetDataTypeInfo(left->Cast<TDataExprType>()->GetSlot()).TypeId;
auto rightTypeId = GetDataTypeInfo(right->Cast<TDataExprType>()->GetSlot()).TypeId;
+ if (!settings.IsEnabled(TSettings::EFeatureFlag::StringTypes) && (IsStringType(leftTypeId) || IsStringType(rightTypeId))) {
+ return ECompareOptions::Uncomparable;
+ }
+
+ if (!settings.IsEnabled(TSettings::EFeatureFlag::DateTimeTypes) && (IsDateTimeType(leftTypeId) || IsDateTimeType(rightTypeId))) {
+ return ECompareOptions::Uncomparable;
+ }
+
+ if (!settings.IsEnabled(TSettings::EFeatureFlag::UuidType) && (IsUuidType(leftTypeId) || IsUuidType(rightTypeId))) {
+ return ECompareOptions::Uncomparable;
+ }
+
+ if (!settings.IsEnabled(TSettings::EFeatureFlag::DecimalType) && (IsDecimalType(leftTypeId) || IsDecimalType(rightTypeId))) {
+ return ECompareOptions::Uncomparable;
+ }
+
+ if (!settings.IsEnabled(TSettings::EFeatureFlag::DyNumberType) && (IsDyNumberType(leftTypeId) || IsDyNumberType(rightTypeId))) {
+ return ECompareOptions::Uncomparable;
+ }
+
if (leftTypeId == rightTypeId) {
return ECompareOptions::Comparable;
}
@@ -213,7 +275,7 @@ bool IsComparableTypes(const TExprBase& leftNode, const TExprBase& rightNode, bo
auto rightType = getDataType(rightPtr);
if (canCompare(leftType, rightType) == ECompareOptions::Uncomparable) {
- YQL_CLOG(DEBUG, ProviderKqp) << "OLAP Pushdown: "
+ YQL_CVLOG(NLog::ELevel::DEBUG, settings.GetLogComponent()) << "Pushdown: "
<< "Uncompatible types in compare of nodes: "
<< leftPtr->Content() << " of type " << FormatType(leftType)
<< " and "
@@ -239,19 +301,31 @@ std::vector<TExprBase> GetComparisonNodes(const TExprBase& node) {
return res;
}
-bool CheckComparisonNodeForPushdown(const TExprBase& node, const TExprNode* lambdaArg, const TSettings& settings) {
+bool IsMemberColumn(const TCoMember& member, const TExprNode* lambdaArg) {
+ return member.Struct().Raw() == lambdaArg;
+}
+
+bool IsMemberColumn(const TExprBase& node, const TExprNode* lambdaArg) {
+ if (auto member = node.Maybe<TCoMember>()) {
+ return IsMemberColumn(member.Cast(), lambdaArg);
+ }
+ return false;
+}
+
+bool IsSupportedArithmeticalExpression(const TExprBase& node, const TSettings& settings) {
+ if (!settings.IsEnabled(TSettings::EFeatureFlag::ArithmeticalExpressions)) {
+ return false;
+ }
+ return node.Maybe<TCoMul>() || node.Maybe<TCoPlus>() || node.Maybe<TCoMinus>();
+}
+
+bool CheckExpressionNodeForPushdown(const TExprBase& node, const TExprNode* lambdaArg, const TSettings& settings) {
if (auto maybeSafeCast = node.Maybe<TCoSafeCast>()) {
- if (!IsSupportedCast(maybeSafeCast.Cast())) {
- return false;
- }
+ return IsSupportedCast(maybeSafeCast.Cast(), settings);
} else if (auto maybeData = node.Maybe<TCoDataCtor>()) {
- if (!IsSupportedDataType(maybeData.Cast())) {
- return false;
- }
+ return IsSupportedDataType(maybeData.Cast(), settings);
} else if (auto maybeMember = node.Maybe<TCoMember>()) {
- if (maybeMember.Cast().Struct().Raw() != lambdaArg) {
- return false;
- }
+ return IsMemberColumn(maybeMember.Cast(), lambdaArg);
} else if (settings.IsEnabled(TSettings::EFeatureFlag::JsonQueryOperators) && node.Maybe<TCoJsonQueryBase>()) {
if (!node.Maybe<TCoJsonValue>()) {
return false;
@@ -261,11 +335,17 @@ bool CheckComparisonNodeForPushdown(const TExprBase& node, const TExprNode* lamb
// Currently we support only simple columns in pushdown
return false;
}
- } else if (!node.Maybe<TCoNull>() && !node.Maybe<TCoParameter>()) {
- return false;
+ return true;
+ } else if (node.Maybe<TCoNull>()) {
+ return true;
+ } else if (settings.IsEnabled(TSettings::EFeatureFlag::ParameterExpression) && node.Maybe<TCoParameter>()) {
+ return true;
+ } else if (IsSupportedArithmeticalExpression(node, settings)) {
+ TCoBinaryArithmetic op = node.Cast<TCoBinaryArithmetic>();
+ return CheckExpressionNodeForPushdown(op.Left(), lambdaArg, settings) && CheckExpressionNodeForPushdown(op.Right(), lambdaArg, settings);
}
- return true;
+ return false;
}
bool CheckComparisonParametersForPushdown(const TCoCompare& compare, const TExprNode* lambdaArg, const TExprBase& input, const TSettings& settings) {
@@ -297,10 +377,10 @@ bool CheckComparisonParametersForPushdown(const TCoCompare& compare, const TExpr
YQL_ENSURE(leftList.size() == rightList.size(), "Different sizes of lists in comparison!");
for (size_t i = 0; i < leftList.size(); ++i) {
- if (!CheckComparisonNodeForPushdown(leftList[i], lambdaArg, settings) || !CheckComparisonNodeForPushdown(rightList[i], lambdaArg, settings)) {
+ if (!CheckExpressionNodeForPushdown(leftList[i], lambdaArg, settings) || !CheckExpressionNodeForPushdown(rightList[i], lambdaArg, settings)) {
return false;
}
- if (!IsComparableTypes(leftList[i], rightList[i], equality, inputType)) {
+ if (!IsComparableTypes(leftList[i], rightList[i], equality, inputType, settings)) {
return false;
}
if (IsLikeOperator(compare) && settings.IsEnabled(TSettings::EFeatureFlag::LikeOperatorOnlyForUtf8) && !IsSupportedLikeForUtf8(leftList[i], rightList[i])) {
@@ -345,7 +425,7 @@ bool SafeCastCanBePushed(const TCoFlatMap& flatmap, const TExprNode* lambdaArg,
YQL_ENSURE(leftList.size() == rightList.size(), "Different sizes of lists in comparison!");
for (size_t i = 0; i < leftList.size(); ++i) {
- if (!CheckComparisonNodeForPushdown(leftList[i], lambdaArg, settings) || !CheckComparisonNodeForPushdown(rightList[i], lambdaArg, settings)) {
+ if (!CheckExpressionNodeForPushdown(leftList[i], lambdaArg, settings) || !CheckExpressionNodeForPushdown(rightList[i], lambdaArg, settings)) {
return false;
}
}
@@ -374,7 +454,7 @@ bool JsonExistsCanBePushed(const TCoJsonExists& jsonExists, const TExprNode* lam
// Currently we support only simple columns in pushdown
return false;
}
- if (maybeMember.Cast().Struct().Raw() != lambdaArg) {
+ if (!IsMemberColumn(maybeMember.Cast(), lambdaArg)) {
return false;
}
return true;
@@ -399,18 +479,11 @@ bool CoalesceCanBePushed(const TCoCoalesce& coalesce, const TExprNode* lambdaArg
}
bool ExistsCanBePushed(const TCoExists& exists, const TExprNode* lambdaArg) {
- auto maybeMember = exists.Optional().Maybe<TCoMember>();
- if (!maybeMember.IsValid()) {
- return false;
- }
- if (maybeMember.Cast().Struct().Raw() != lambdaArg) {
- return false;
- }
- return true;
+ return IsMemberColumn(exists.Optional(), lambdaArg);
}
void CollectPredicatesForBinaryBoolOperators(const TExprBase& opNode, TPredicateNode& predicateTree, const TExprNode* lambdaArg, const TExprBase& lambdaBody, const TSettings& settings) {
- if (!opNode.Maybe<TCoAnd>() && !opNode.Maybe<TCoOr>() && !opNode.Maybe<TCoOr>()) {
+ if (!opNode.Maybe<TCoAnd>() && !opNode.Maybe<TCoOr>() && !opNode.Maybe<TCoXor>()) {
return;
}
predicateTree.Children.reserve(opNode.Ptr()->ChildrenSize());
@@ -423,6 +496,10 @@ void CollectPredicatesForBinaryBoolOperators(const TExprBase& opNode, TPredicate
}
}
+void CollectExpressionPredicate(TPredicateNode& predicateTree, const TCoMember& member, const TExprNode* lambdaArg) {
+ predicateTree.CanBePushed = IsMemberColumn(member, lambdaArg);
+}
+
} // anonymous namespace end
void CollectPredicates(const TExprBase& predicate, TPredicateNode& predicateTree, const TExprNode* lambdaArg, const TExprBase& lambdaBody, const TSettings& settings) {
@@ -448,12 +525,14 @@ void CollectPredicates(const TExprBase& predicate, TPredicateNode& predicateTree
} else if (predicate.Maybe<TCoOr>()) {
predicateTree.Op = EBoolOp::Or;
CollectPredicatesForBinaryBoolOperators(predicate.Cast<TCoOr>(), predicateTree, lambdaArg, lambdaBody, settings);
- } else if (predicate.Maybe<TCoXor>()) {
+ } else if (settings.IsEnabled(TSettings::EFeatureFlag::LogicalXorOperator) && predicate.Maybe<TCoXor>()) {
predicateTree.Op = EBoolOp::Xor;
CollectPredicatesForBinaryBoolOperators(predicate.Cast<TCoXor>(), predicateTree, lambdaArg, lambdaBody, settings);
} else if (settings.IsEnabled(TSettings::EFeatureFlag::JsonExistsOperator) && predicate.Maybe<TCoJsonExists>()) {
auto jsonExists = predicate.Cast<TCoJsonExists>();
predicateTree.CanBePushed = JsonExistsCanBePushed(jsonExists, lambdaArg);
+ } else if (settings.IsEnabled(TSettings::EFeatureFlag::ExpressionAsPredicate) && predicate.Maybe<TCoMember>()) {
+ CollectExpressionPredicate(predicateTree, predicate.Cast<TCoMember>(), lambdaArg);
} else {
predicateTree.CanBePushed = false;
}
diff --git a/ydb/library/yql/providers/common/pushdown/settings.h b/ydb/library/yql/providers/common/pushdown/settings.h
index c53e1bf9fe..136202fb7b 100644
--- a/ydb/library/yql/providers/common/pushdown/settings.h
+++ b/ydb/library/yql/providers/common/pushdown/settings.h
@@ -1,4 +1,5 @@
#pragma once
+#include <ydb/library/yql/utils/log/log_component.h>
#include <util/system/types.h>
@@ -10,16 +11,35 @@ struct TSettings {
LikeOperatorOnlyForUtf8 = 1 << 1,
JsonQueryOperators = 1 << 2,
JsonExistsOperator = 1 << 3,
+ LogicalXorOperator = 1 << 4,
+ ExpressionAsPredicate = 1 << 5, // Bool columns and bool expressions can take part in AND/OR/NOT statements
+ ArithmeticalExpressions = 1 << 6, // *, +, -
+ ParameterExpression = 1 << 7, // Query parameters
+ CastExpression = 1 << 8, // CAST()
+ StringTypes = 1 << 9, // Support string types
+ DateTimeTypes = 1 << 10, // Date, Datetime, Timestamp, Interval, incl. Tz* and 64-bit variants
+ UuidType = 1 << 11,
+ DecimalType = 1 << 12,
+ DyNumberType = 1 << 13,
};
- TSettings() = default;
+ explicit TSettings(NLog::EComponent logComponent)
+ : LogComponent(logComponent)
+ {
+ }
+
TSettings(const TSettings&) = default;
void Enable(ui64 flagsMask, bool set = true);
bool IsEnabled(EFeatureFlag flagMask) const;
+ NLog::EComponent GetLogComponent() const {
+ return LogComponent;
+ }
+
private:
+ const NLog::EComponent LogComponent;
ui64 FeatureFlags = 0;
};
diff --git a/ydb/library/yql/providers/generic/connector/api/service/protos/connector.proto b/ydb/library/yql/providers/generic/connector/api/service/protos/connector.proto
index a2e8f6daf0..9cd34802fe 100644
--- a/ydb/library/yql/providers/generic/connector/api/service/protos/connector.proto
+++ b/ydb/library/yql/providers/generic/connector/api/service/protos/connector.proto
@@ -320,11 +320,35 @@ message TContinuation {
// Can be a column, a constant or a result of, for example,
// some arithmetical operation
message TExpression {
+ message TArithmeticalExpression {
+ // An operation code.
+ enum EOperation {
+ EXPRESSION_OPERATION_UNSPECIFIED = 0;
+ MUL = 1; // left_value * right_value
+ ADD = 2; // left_value + right_value
+ SUB = 3; // left_value - right_value
+ BIT_AND = 4; // left_value & right_value
+ BIT_OR = 5; // left_value | right_value
+ BIT_XOR = 6; // left_value ^ right_value
+ // TODO: support `/` and `%`
+ }
+ EOperation operation = 1;
+ TExpression left_value = 2;
+ TExpression right_value = 3;
+ }
+
+ message TNull {
+ }
+
oneof payload {
// A scalar value
Ydb.TypedValue typed_value = 1;
// A name of another column to compare with
string column = 2;
+
+ TArithmeticalExpression arithmetical_expression = 3;
+
+ TNull null = 4;
}
}
@@ -369,11 +393,17 @@ message TPredicate {
TExpression value = 1;
}
+ // Expression which has bool type
+ // For example, bool column
+ message TBoolExpression {
+ TExpression value = 1;
+ }
+
// A subset of comparators corresponding to the binary logical operators
message TComparison {
// An operation code.
enum EOperation {
- OPERATION_UNSPECIFIED = 0;
+ COMPARISON_OPERATION_UNSPECIFIED = 0;
L = 1; // "$column < value"
LE = 2; // "$column <= value"
EQ = 3; // "$column = value"
@@ -396,6 +426,7 @@ message TPredicate {
TIsNull is_null = 6;
TIsNotNull is_not_null = 7;
TComparison comparison = 8;
+ TBoolExpression bool_expression = 9;
}
}
diff --git a/ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt
index e85bf56b3b..7e0272869d 100644
--- a/ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(ut)
add_library(providers-generic-provider)
target_compile_options(providers-generic-provider PRIVATE
diff --git a/ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt
index 3e80e407cd..a6195de897 100644
--- a/ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(ut)
add_library(providers-generic-provider)
target_compile_options(providers-generic-provider PRIVATE
diff --git a/ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt
index 3e80e407cd..a6195de897 100644
--- a/ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(ut)
add_library(providers-generic-provider)
target_compile_options(providers-generic-provider PRIVATE
diff --git a/ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt
index e85bf56b3b..7e0272869d 100644
--- a/ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(ut)
add_library(providers-generic-provider)
target_compile_options(providers-generic-provider PRIVATE
diff --git a/ydb/library/yql/providers/generic/provider/ut/CMakeLists.txt b/ydb/library/yql/providers/generic/provider/ut/CMakeLists.txt
new file mode 100644
index 0000000000..61a77eb131
--- /dev/null
+++ b/ydb/library/yql/providers/generic/provider/ut/CMakeLists.txt
@@ -0,0 +1,9 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(pushdown)
diff --git a/ydb/library/yql/providers/generic/provider/ut/pushdown/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/generic/provider/ut/pushdown/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..e686f091c3
--- /dev/null
+++ b/ydb/library/yql/providers/generic/provider/ut/pushdown/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,83 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(yql-providers-generic-provider-ut-pushdown)
+target_compile_options(yql-providers-generic-provider-ut-pushdown PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(yql-providers-generic-provider-ut-pushdown PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider
+)
+target_link_libraries(yql-providers-generic-provider-ut-pushdown PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ providers-generic-provider
+ contrib-libs-fmt
+ library-cpp-random_provider
+ library-yql-ast
+ library-yql-core
+ yql-core-services
+ yql-dq-expr_nodes
+ library-yql-minikql
+ providers-common-db_id_async_resolver
+ providers-generic-expr_nodes
+ providers-result-provider
+ udf-service-stub
+ library-yql-sql
+ yql-sql-pg_dummy
+)
+target_link_options(yql-providers-generic-provider-ut-pushdown PRIVATE
+ -Wl,-platform_version,macos,11.0,11.0
+ -fPIC
+ -fPIC
+ -framework
+ CoreFoundation
+)
+target_sources(yql-providers-generic-provider-ut-pushdown PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp
+)
+set_property(
+ TARGET
+ yql-providers-generic-provider-ut-pushdown
+ PROPERTY
+ SPLIT_FACTOR
+ 1
+)
+add_yunittest(
+ NAME
+ yql-providers-generic-provider-ut-pushdown
+ TEST_TARGET
+ yql-providers-generic-provider-ut-pushdown
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ yql-providers-generic-provider-ut-pushdown
+ PROPERTY
+ LABELS
+ SMALL
+)
+set_yunittest_property(
+ TEST
+ yql-providers-generic-provider-ut-pushdown
+ PROPERTY
+ PROCESSORS
+ 1
+)
+target_allocator(yql-providers-generic-provider-ut-pushdown
+ system_allocator
+)
+vcs_info(yql-providers-generic-provider-ut-pushdown)
diff --git a/ydb/library/yql/providers/generic/provider/ut/pushdown/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/generic/provider/ut/pushdown/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..bc949cb41d
--- /dev/null
+++ b/ydb/library/yql/providers/generic/provider/ut/pushdown/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,86 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(yql-providers-generic-provider-ut-pushdown)
+target_compile_options(yql-providers-generic-provider-ut-pushdown PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(yql-providers-generic-provider-ut-pushdown PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider
+)
+target_link_libraries(yql-providers-generic-provider-ut-pushdown PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-testing-unittest_main
+ providers-generic-provider
+ contrib-libs-fmt
+ library-cpp-random_provider
+ library-yql-ast
+ library-yql-core
+ yql-core-services
+ yql-dq-expr_nodes
+ library-yql-minikql
+ providers-common-db_id_async_resolver
+ providers-generic-expr_nodes
+ providers-result-provider
+ udf-service-stub
+ library-yql-sql
+ yql-sql-pg_dummy
+)
+target_link_options(yql-providers-generic-provider-ut-pushdown PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(yql-providers-generic-provider-ut-pushdown PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp
+)
+set_property(
+ TARGET
+ yql-providers-generic-provider-ut-pushdown
+ PROPERTY
+ SPLIT_FACTOR
+ 1
+)
+add_yunittest(
+ NAME
+ yql-providers-generic-provider-ut-pushdown
+ TEST_TARGET
+ yql-providers-generic-provider-ut-pushdown
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ yql-providers-generic-provider-ut-pushdown
+ PROPERTY
+ LABELS
+ SMALL
+)
+set_yunittest_property(
+ TEST
+ yql-providers-generic-provider-ut-pushdown
+ PROPERTY
+ PROCESSORS
+ 1
+)
+target_allocator(yql-providers-generic-provider-ut-pushdown
+ cpp-malloc-jemalloc
+)
+vcs_info(yql-providers-generic-provider-ut-pushdown)
diff --git a/ydb/library/yql/providers/generic/provider/ut/pushdown/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/generic/provider/ut/pushdown/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..77fc10386f
--- /dev/null
+++ b/ydb/library/yql/providers/generic/provider/ut/pushdown/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,88 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(yql-providers-generic-provider-ut-pushdown)
+target_compile_options(yql-providers-generic-provider-ut-pushdown PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(yql-providers-generic-provider-ut-pushdown PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider
+)
+target_link_libraries(yql-providers-generic-provider-ut-pushdown PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ providers-generic-provider
+ contrib-libs-fmt
+ library-cpp-random_provider
+ library-yql-ast
+ library-yql-core
+ yql-core-services
+ yql-dq-expr_nodes
+ library-yql-minikql
+ providers-common-db_id_async_resolver
+ providers-generic-expr_nodes
+ providers-result-provider
+ udf-service-stub
+ library-yql-sql
+ yql-sql-pg_dummy
+)
+target_link_options(yql-providers-generic-provider-ut-pushdown PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(yql-providers-generic-provider-ut-pushdown PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp
+)
+set_property(
+ TARGET
+ yql-providers-generic-provider-ut-pushdown
+ PROPERTY
+ SPLIT_FACTOR
+ 1
+)
+add_yunittest(
+ NAME
+ yql-providers-generic-provider-ut-pushdown
+ TEST_TARGET
+ yql-providers-generic-provider-ut-pushdown
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ yql-providers-generic-provider-ut-pushdown
+ PROPERTY
+ LABELS
+ SMALL
+)
+set_yunittest_property(
+ TEST
+ yql-providers-generic-provider-ut-pushdown
+ PROPERTY
+ PROCESSORS
+ 1
+)
+target_allocator(yql-providers-generic-provider-ut-pushdown
+ cpp-malloc-tcmalloc
+ libs-tcmalloc-no_percpu_cache
+)
+vcs_info(yql-providers-generic-provider-ut-pushdown)
diff --git a/ydb/library/yql/providers/generic/provider/ut/pushdown/CMakeLists.txt b/ydb/library/yql/providers/generic/provider/ut/pushdown/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/library/yql/providers/generic/provider/ut/pushdown/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/library/yql/providers/generic/provider/ut/pushdown/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/generic/provider/ut/pushdown/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..4b418322c7
--- /dev/null
+++ b/ydb/library/yql/providers/generic/provider/ut/pushdown/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,76 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(yql-providers-generic-provider-ut-pushdown)
+target_compile_options(yql-providers-generic-provider-ut-pushdown PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(yql-providers-generic-provider-ut-pushdown PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider
+)
+target_link_libraries(yql-providers-generic-provider-ut-pushdown PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ providers-generic-provider
+ contrib-libs-fmt
+ library-cpp-random_provider
+ library-yql-ast
+ library-yql-core
+ yql-core-services
+ yql-dq-expr_nodes
+ library-yql-minikql
+ providers-common-db_id_async_resolver
+ providers-generic-expr_nodes
+ providers-result-provider
+ udf-service-stub
+ library-yql-sql
+ yql-sql-pg_dummy
+)
+target_sources(yql-providers-generic-provider-ut-pushdown PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp
+)
+set_property(
+ TARGET
+ yql-providers-generic-provider-ut-pushdown
+ PROPERTY
+ SPLIT_FACTOR
+ 1
+)
+add_yunittest(
+ NAME
+ yql-providers-generic-provider-ut-pushdown
+ TEST_TARGET
+ yql-providers-generic-provider-ut-pushdown
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ yql-providers-generic-provider-ut-pushdown
+ PROPERTY
+ LABELS
+ SMALL
+)
+set_yunittest_property(
+ TEST
+ yql-providers-generic-provider-ut-pushdown
+ PROPERTY
+ PROCESSORS
+ 1
+)
+target_allocator(yql-providers-generic-provider-ut-pushdown
+ system_allocator
+)
+vcs_info(yql-providers-generic-provider-ut-pushdown)
diff --git a/ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp b/ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp
new file mode 100644
index 0000000000..407048cc6c
--- /dev/null
+++ b/ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp
@@ -0,0 +1,605 @@
+#include <ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.h>
+#include <ydb/library/yql/providers/generic/proto/source.pb.h>
+#include <ydb/library/yql/providers/generic/provider/yql_generic_state.h>
+#include <ydb/library/yql/providers/generic/provider/yql_generic_provider.h>
+
+#include <ydb/library/yql/ast/yql_ast.h>
+#include <ydb/library/yql/ast/yql_expr.h>
+#include <ydb/library/yql/core/yql_graph_transformer.h>
+#include <ydb/library/yql/core/yql_type_annotation.h>
+#include <ydb/library/yql/core/services/yql_transform_pipeline.h>
+#include <ydb/library/yql/core/services/yql_out_transformers.h>
+#include <ydb/library/yql/dq/integration/yql_dq_integration.h>
+#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
+#include <ydb/library/yql/minikql/mkql_function_registry.h>
+#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
+#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
+#include <ydb/library/yql/providers/common/transform/yql_optimize.h>
+#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
+#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
+#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h>
+#include <ydb/library/yql/providers/result/provider/yql_result_provider.h>
+#include <ydb/library/yql/sql/sql.h>
+#include <ydb/library/yql/utils/log/log.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <library/cpp/random_provider/random_provider.h>
+
+#include <google/protobuf/text_format.h>
+
+#include <fmt/format.h>
+
+using namespace NYql;
+using namespace NNodes;
+using namespace NKikimr::NMiniKQL;
+using namespace fmt::literals;
+
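+// Template of the s-expression program to optimize; the {lambda_text} placeholder is replaced with the filter predicate via fmt::format.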
+static constexpr auto ProgramTemplate = R"ast((
+(let $data_source (DataSource '"generic" '"test_cluster"))
+(let $empty_lambda (lambda '($arg) (Bool '"true")))
+(let $table
+ (MrTableConcat (Key '('table (String '"test_table"))))
+)
+(let $read (Read! world $data_source $table))
+
+(let $map_lambda (lambda '($row)
+ (OptionalIf
+ {lambda_text}
+ $row
+ )
+))
+(let $filtered_data (FlatMap (Right! $read) $map_lambda))
+
+(let $resulte_data_sink (DataSink '"result"))
+(let $result (ResWrite! (Left! $read) $resulte_data_sink (Key) $filtered_data '('('type))))
+(return (Commit! $result $resulte_data_sink))
+))ast";
+
+// Test double for IDatabaseAsyncResolver: answers every request immediately with the same
+// fixed endpoint/host/port description, without performing any real lookup.
+struct TFakeDatabaseResolver: public IDatabaseAsyncResolver {
+ // Returns an already fulfilled future holding a successful response with one
+ // description per (database, type) key found in `ids`.
+ // `override` makes the compiler verify that the signature keeps matching the interface.
+ NThreading::TFuture<TDatabaseResolverResponse> ResolveIds(const TDatabaseAuthMap& ids) const override {
+ TDatabaseResolverResponse resp;
+ resp.Success = true;
+ for (const auto& [databasePair, auth] : ids) {
+ const auto& [database, type] = databasePair;
+ TDatabaseResolverResponse::TDatabaseDescription& desc = resp.DatabaseDescriptionMap[std::pair(database, type)];
+ desc.Database = database;
+ desc.Endpoint = "endpoint";
+ desc.Host = "host";
+ desc.Port = 42;
+ }
+ return NThreading::MakeFuture(resp);
+ }
+};
+
+// Test double for NConnector::IClient: serves a fixed schema for "test_table" and
+// answers every split request with a failed future.
+struct TFakeGenericClient: public NConnector::IClient {
+ // Asserts that the requested table is "test_table" and returns a schema holding,
+ // for each primitive type listed below, a "col_<name>" column and a "col_optional_<name>" column.
+ NConnector::TDescribeTableAsyncResult DescribeTable(const NConnector::NApi::TDescribeTableRequest& request) override {
+ UNIT_ASSERT_VALUES_EQUAL(request.table(), "test_table");
+ NConnector::TResult<NConnector::NApi::TDescribeTableResponse> result;
+ auto& resp = result.Response.emplace();
+ auto& schema = *resp.mutable_schema();
+
+// Adds two columns to `schema`: "col_<name>" of the given primitive type
+// and "col_optional_<name>" whose type is an optional of that primitive type.
+#define PRIMITIVE_TYPE_COL(name, type) \
+ { \
+ auto* col = schema.add_columns(); \
+ col->set_name("col_" name); \
+ auto* t = col->mutable_type(); \
+ t->set_type_id(Ydb::Type::type); \
+ } \
+ { \
+ auto* col = schema.add_columns(); \
+ col->set_name("col_optional_" name); \
+ auto* t = col->mutable_type()->mutable_optional_type()->mutable_item(); \
+ t->set_type_id(Ydb::Type::type); \
+ }
+
+ PRIMITIVE_TYPE_COL("bool", BOOL);
+ PRIMITIVE_TYPE_COL("int8", INT8);
+ PRIMITIVE_TYPE_COL("uint8", UINT8);
+ PRIMITIVE_TYPE_COL("int16", INT16);
+ PRIMITIVE_TYPE_COL("uint16", UINT16);
+ PRIMITIVE_TYPE_COL("int32", INT32);
+ PRIMITIVE_TYPE_COL("uint32", UINT32);
+ PRIMITIVE_TYPE_COL("int64", INT64);
+ PRIMITIVE_TYPE_COL("uint64", UINT64);
+ PRIMITIVE_TYPE_COL("float", FLOAT);
+ PRIMITIVE_TYPE_COL("double", DOUBLE);
+ PRIMITIVE_TYPE_COL("date", DATE);
+ PRIMITIVE_TYPE_COL("datetime", DATETIME);
+ PRIMITIVE_TYPE_COL("timestamp", TIMESTAMP);
+ PRIMITIVE_TYPE_COL("interval", INTERVAL);
+ PRIMITIVE_TYPE_COL("tz_date", TZ_DATE);
+ PRIMITIVE_TYPE_COL("tz_datetime", TZ_DATETIME);
+ PRIMITIVE_TYPE_COL("tz_timestamp", TZ_TIMESTAMP);
+ PRIMITIVE_TYPE_COL("string", STRING);
+ PRIMITIVE_TYPE_COL("utf8", UTF8);
+ PRIMITIVE_TYPE_COL("yson", YSON);
+ PRIMITIVE_TYPE_COL("json", JSON);
+ PRIMITIVE_TYPE_COL("uuid", UUID);
+ PRIMITIVE_TYPE_COL("json_document", JSON_DOCUMENT);
+ PRIMITIVE_TYPE_COL("dynumber", DYNUMBER);
+
+// The helper macro is local to this function: do not let it leak into the rest of the file.
+#undef PRIMITIVE_TYPE_COL
+
+ return NThreading::MakeFuture<NConnector::TDescribeTableAsyncResult::value_type>(std::move(result));
+ }
+
+ // Always returns a future failed with std::runtime_error("ListSplits unimplemented").
+ NConnector::TListSplitsStreamIteratorAsyncResult ListSplits(const NConnector::NApi::TListSplitsRequest& request) override {
+ Y_UNUSED(request);
+ // Build the exception_ptr directly instead of throwing and catching the exception.
+ return NThreading::MakeErrorFuture<NConnector::TListSplitsStreamIteratorAsyncResult::value_type>(
+ std::make_exception_ptr(std::runtime_error("ListSplits unimplemented")));
+ }
+
+ // Always returns a future failed with std::runtime_error("ReadSplits unimplemented").
+ NConnector::TReadSplitsStreamIteratorAsyncResult ReadSplits(const NConnector::NApi::TReadSplitsRequest& request) override {
+ Y_UNUSED(request);
+ // Build the exception_ptr directly instead of throwing and catching the exception.
+ return NThreading::MakeErrorFuture<NConnector::TReadSplitsStreamIteratorAsyncResult::value_type>(
+ std::make_exception_ptr(std::runtime_error("ReadSplits unimplemented")));
+ }
+};
+
+// Test-only optimizer step: for every Right! node whose input is a TGenReadTable it asks the
+// generic provider's DQ integration to wrap the read, then captures the Generic::TSource
+// settings that the integration produces for the resulting source.
+class TBuildDqSourceSettingsTransformer: public TOptimizeTransformerBase {
+public:
+ // `dqSourceSettings` receives the unpacked source settings and `dqSourceSettingsWereBuilt`
+ // is set to true once they have been written. Both pointers are stored and dereferenced
+ // later, so the pointees must outlive the transformer.
+ explicit TBuildDqSourceSettingsTransformer(TTypeAnnotationContext* types, Generic::TSource* dqSourceSettings, bool* dqSourceSettingsWereBuilt)
+ : TOptimizeTransformerBase(types, NLog::EComponent::ProviderGeneric, {})
+ , DqSourceSettings_(dqSourceSettings)
+ , DqSourceSettingsWereBuilt_(dqSourceSettingsWereBuilt)
+ {
+ AddHandler(0, TCoRight::Match, "BuildGenericDqSourceSettings", Hndl(&TBuildDqSourceSettingsTransformer::BuildDqSource));
+ }
+
+ // Handler for Right! nodes. Returns the node unchanged unless its input is a TGenReadTable;
+ // otherwise returns the read wrapped by the DQ integration and records its source settings.
+ TMaybeNode<TExprBase> BuildDqSource(TExprBase node, TExprContext& ctx) {
+ TCoRight right = node.Cast<TCoRight>();
+ TExprBase input = right.Input();
+ if (!input.Maybe<TGenReadTable>()) {
+ return node;
+ }
+ auto genericDataSource = Types->DataSourceMap.find(GenericProviderName);
+ UNIT_ASSERT(genericDataSource != Types->DataSourceMap.end());
+ auto dqIntegration = genericDataSource->second->GetDqIntegration();
+ UNIT_ASSERT(dqIntegration);
+ auto newRead = dqIntegration->WrapRead(TDqSettings(), input.Ptr(), ctx);
+ BuildSettings(newRead, dqIntegration, ctx);
+ return newRead;
+ }
+
+ // Fills the captured Generic::TSource from the wrapped read. Asserts that the settings
+ // have not been built before, i.e. that the program contains a single generic read.
+ void BuildSettings(const TExprNode::TPtr& read, IDqIntegration* dqIntegration, TExprContext& ctx) {
+ UNIT_ASSERT(!*DqSourceSettingsWereBuilt_);
+ // Hack: we need DqSource to build settings:
+ // build node, call DqSourceWrap and throw it away
+ TDqSourceWrap wrap(read);
+ auto dqSourceNode =
+ Build<TDqSource>(ctx, read->Pos())
+ .DataSource(wrap.DataSource())
+ .Settings(wrap.Input())
+ .Done()
+ .Ptr();
+ ::google::protobuf::Any settings;
+ TString sourceType;
+ dqIntegration->FillSourceSettings(*dqSourceNode, settings, sourceType);
+ UNIT_ASSERT_STRINGS_EQUAL(sourceType, "PostgreSqlGeneric");
+ UNIT_ASSERT(settings.Is<Generic::TSource>());
+ // UnpackTo reports failure through its return value: do not mark the settings as built
+ // if the payload could not be parsed.
+ UNIT_ASSERT(settings.UnpackTo(DqSourceSettings_));
+ *DqSourceSettingsWereBuilt_ = true;
+ }
+
+private:
+ Generic::TSource* DqSourceSettings_;
+ bool* DqSourceSettingsWereBuilt_;
+};
+
+// Unit-test fixture that runs a YQL s-expression program through the type annotation and
+// optimization pipeline with the generic provider attached, and exposes the predicate that
+// the provider placed into the DQ source settings.
+struct TPushdownFixture: public NUnitTest::TBaseFixture {
+ TExprContext Ctx;
+ TTypeAnnotationContextPtr TypesCtx;
+
+ TGenericState::TPtr GenericState;
+ std::shared_ptr<TFakeDatabaseResolver> DatabaseResolver = std::make_shared<TFakeDatabaseResolver>();
+ std::shared_ptr<TFakeGenericClient> GenericClient = std::make_shared<TFakeGenericClient>();
+ TIntrusivePtr<IDataProvider> GenericDataSource;
+ TIntrusivePtr<IDataProvider> GenericDataSink;
+
+ TGatewaysConfig GatewaysCfg;
+ IFunctionRegistry::TPtr FunctionRegistry;
+
+ // Main pipeline: annotation + optimization of the parsed program.
+ TAutoPtr<IGraphTransformer> Transformer;
+ // Second pipeline: extracts Generic::TSource settings from the optimized program.
+ TAutoPtr<IGraphTransformer> BuildDqSourceSettingsTransformer;
+ Generic::TSource DqSourceSettings;
+ bool DqSourceSettingsWereBuilt = false;
+
+ TExprNode::TPtr InitialExprRoot;
+ TExprNode::TPtr ExprRoot;
+
+ TPushdownFixture() {
+ Init();
+ }
+
+ // Creates the type annotation context, the gateway config (predicate pushdown enabled,
+ // one PostgreSQL cluster named "test_cluster"), the generic and result providers,
+ // both transformation pipelines, and redirects YQL logs to stderr.
+ void Init() {
+ TypesCtx = MakeIntrusive<TTypeAnnotationContext>();
+ TypesCtx->RandomProvider = CreateDeterministicRandomProvider(1);
+
+ FunctionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone(); // TODO: remove Clone()
+
+ {
+ auto* setting = GatewaysCfg.MutableGeneric()->AddDefaultSettings();
+ setting->SetName("UsePredicatePushdown");
+ setting->SetValue("true");
+
+ auto* cluster = GatewaysCfg.MutableGeneric()->AddClusterMapping();
+ cluster->SetName("test_cluster");
+ cluster->SetKind(NConnector::NApi::POSTGRESQL);
+ cluster->MutableEndpoint()->set_host("host");
+ cluster->MutableEndpoint()->set_port(42);
+ cluster->MutableCredentials()->mutable_basic()->set_username("user");
+ cluster->MutableCredentials()->mutable_basic()->set_password("password");
+ cluster->SetDatabaseName("database");
+ cluster->SetProtocol(NConnector::NApi::NATIVE);
+ }
+
+ GenericState = MakeIntrusive<TGenericState>(
+ TypesCtx.Get(),
+ FunctionRegistry.Get(),
+ DatabaseResolver,
+ GenericClient,
+ GatewaysCfg.GetGeneric());
+
+ GenericDataSource = CreateGenericDataSource(GenericState);
+ GenericDataSink = CreateGenericDataSink(GenericState);
+
+ TypesCtx->AddDataSource(GenericProviderName, GenericDataSource);
+ TypesCtx->AddDataSink(GenericProviderName, GenericDataSink);
+
+ {
+ auto writerFactory = []() { return CreateYsonResultWriter(NYson::EYsonFormat::Binary); };
+ auto cfg = MakeIntrusive<TResultProviderConfig>(*TypesCtx, *FunctionRegistry, IDataProvider::EResultFormat::Yson,
+ ToString(static_cast<ui32>(NYson::EYsonFormat::Binary)), writerFactory);
+ auto resultProvider = CreateResultProvider(cfg);
+ TypesCtx->AddDataSink(ResultProviderName, resultProvider);
+ }
+
+ Transformer = TTransformationPipeline(TypesCtx)
+ .AddServiceTransformers()
+ .Add(TExprLogTransformer::Sync("Expr", NLog::EComponent::Core, NLog::ELevel::DEBUG), "LogExpr")
+ .AddPreTypeAnnotation()
+ .AddExpressionEvaluation(*FunctionRegistry)
+ .AddIOAnnotation()
+ .AddTypeAnnotation()
+ .AddPostTypeAnnotation()
+ .Add(TExprLogTransformer::Sync("Expr to optimize", NLog::EComponent::Core, NLog::ELevel::DEBUG), "LogExpr")
+ .AddOptimization(false, false)
+ .Add(TExprLogTransformer::Sync("Optimized expr", NLog::EComponent::Core, NLog::ELevel::DEBUG), "LogExpr")
+ .Build();
+
+ TAutoPtr<IGraphTransformer> buildTransformer = new TBuildDqSourceSettingsTransformer(TypesCtx.Get(), &DqSourceSettings, &DqSourceSettingsWereBuilt);
+ BuildDqSourceSettingsTransformer = TTransformationPipeline(TypesCtx)
+ .AddServiceTransformers()
+ .Add(buildTransformer, "BuildDqSourceSettings")
+ .Add(TExprLogTransformer::Sync("Built settings", NLog::EComponent::Core, NLog::ELevel::DEBUG), "LogExpr")
+ .Build();
+
+ NLog::YqlLogger().ResetBackend(CreateLogBackend("cerr"));
+ NLog::YqlLogger().SetComponentLevel(NLog::EComponent::Core, NLog::ELevel::TRACE);
+ NLog::YqlLogger().SetComponentLevel(NLog::EComponent::ProviderCommon, NLog::ELevel::TRACE);
+ NLog::YqlLogger().SetComponentLevel(NLog::EComponent::ProviderGeneric, NLog::ELevel::TRACE);
+ }
+
+ // Parses and compiles `program`, then runs the main pipeline on it.
+ // The compiled tree is kept in InitialExprRoot and the optimized one in ExprRoot.
+ void Transform(const TString& program) {
+ Cerr << "Initial program:\n"
+ << program << Endl;
+ TAstParseResult astRes = ParseAst(program);
+ UNIT_ASSERT_C(astRes.IsOk(), astRes.Issues.ToString());
+ // Compilation receives Ctx, so on failure print the issues collected there:
+ // the parse issues in astRes are already known to be clean at this point.
+ UNIT_ASSERT_C(CompileExpr(*astRes.Root, InitialExprRoot, Ctx, nullptr, nullptr), Ctx.IssueManager.GetIssues().ToString());
+
+ ExprRoot = InitialExprRoot;
+ auto status = SyncTransform(*Transformer, ExprRoot, Ctx);
+ UNIT_ASSERT_C(status == IGraphTransformer::TStatus::Ok, Ctx.IssueManager.GetIssues().ToString());
+ }
+
+ // Runs the settings-building pipeline on a copy of the ExprRoot pointer (ExprRoot itself
+ // is not reassigned) and asserts that DqSourceSettings was filled.
+ void BuildDqSourceSettings() {
+ auto root = ExprRoot;
+ auto status = SyncTransform(*BuildDqSourceSettingsTransformer, root, Ctx);
+ UNIT_ASSERT_C(status == IGraphTransformer::TStatus::Ok, Ctx.IssueManager.GetIssues().ToString());
+ UNIT_ASSERT(DqSourceSettingsWereBuilt);
+ Cerr << "Dq source filter settings:\n"
+ << DqSourceSettings.select().where().Utf8DebugString() << Endl;
+ }
+
+ // Optimizes `program` and returns the typed filter from the built source settings.
+ // The returned reference points into the DqSourceSettings member of this fixture.
+ const NConnector::NApi::TPredicate& BuildProtoFilterFromProgram(const TString& program) {
+ Transform(program);
+ BuildDqSourceSettings();
+ return DqSourceSettings.select().where().filter_typed();
+ }
+
+ // Substitutes `lambdaText` for the {lambda_text} placeholder of ProgramTemplate.
+ static TString ProgramFromLambda(const TString& lambdaText) {
+ return fmt::format(
+ ProgramTemplate,
+ "lambda_text"_a = lambdaText);
+ }
+
+ // Same as BuildProtoFilterFromProgram, for a program generated from the predicate text.
+ const NConnector::NApi::TPredicate& BuildProtoFilterFromLambda(const TString& lambdaText) {
+ return BuildProtoFilterFromProgram(ProgramFromLambda(lambdaText));
+ }
+
+ // Asserts that the predicate `lambdaText` is pushed down as the filter given in
+ // text-proto form by `filterText`; the two are compared by their debug strings.
+ void AssertFilter(const TString& lambdaText, const TString& filterText) {
+ const auto& filter = BuildProtoFilterFromLambda(lambdaText);
+ NConnector::NApi::TPredicate expectedFilter;
+ UNIT_ASSERT(google::protobuf::TextFormat::ParseFromString(filterText, &expectedFilter));
+ UNIT_ASSERT_STRINGS_EQUAL(filter.Utf8DebugString(), expectedFilter.Utf8DebugString());
+ }
+
+ // Asserts that no typed filter is present in the source settings built for `lambdaText`.
+ void AssertNoPush(const TString& lambdaText) {
+ BuildProtoFilterFromLambda(lambdaText);
+ UNIT_ASSERT(!DqSourceSettings.select().where().has_filter_typed());
+ }
+};
+
+Y_UNIT_TEST_SUITE_F(PushdownTest, TPushdownFixture) {
+ Y_UNIT_TEST(NoFilter) {
+ AssertNoPush(R"ast((Bool '"true"))ast"); // Note that R"ast()ast" is empty string!
+ }
+
+ Y_UNIT_TEST(Equal) {
+ AssertFilter(
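+ // Note: the raw-string delimiters are R"ast( and )ast", so the outermost parentheses are not part of the program text (R"ast()ast" is an empty string).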
+ R"ast((== (Member $row '"col_int16") (Int16 '42)))ast",
+ R"proto(
+ comparison {
+ operation: EQ
+ left_value {
+ column: "col_int16"
+ }
+ right_value {
+ typed_value {
+ type {
+ type_id: INT16
+ }
+ value {
+ int32_value: 42
+ }
+ }
+ }
+ }
+ )proto");
+ }
+
+ Y_UNIT_TEST(NotEqual) {
+ AssertFilter(
+ // Note that R"ast()ast" is empty string!
+ R"ast(
+ (Coalesce
+ (!= (Member $row '"col_optional_uint64") (Uint64 '42))
+ (Bool '"true")
+ )
+ )ast",
+ R"proto(
+ comparison {
+ operation: NE
+ left_value {
+ column: "col_optional_uint64"
+ }
+ right_value {
+ typed_value {
+ type {
+ type_id: UINT64
+ }
+ value {
+ uint64_value: 42
+ }
+ }
+ }
+ }
+ )proto");
+ }
+
+ Y_UNIT_TEST(PartialAnd) {
+ AssertFilter(
+ // Note that R"ast()ast" is empty string!
+ // The division must be excluded from pushdown, while the other operands of the "And" statement must still be pushed down.
+ R"ast(
+ (Coalesce
+ (And
+ (Or
+ (Not (Member $row '"col_bool"))
+ (== (* (Member $row '"col_int64") (Member $row '"col_int32")) (Int64 '42))
+ )
+ (< (/ (Int64 '42) (Member $row '"col_int64")) (Int64 '10))
+ (>= (Member $row '"col_uint32") (- (Uint32 '15) (Uint32 '1)))
+ )
+ (Bool '"true")
+ )
+ )ast",
+ R"proto(
+ conjunction {
+ operands {
+ disjunction {
+ operands {
+ negation {
+ operand {
+ bool_expression: {
+ value {
+ column: "col_bool"
+ }
+ }
+ }
+ }
+ }
+ operands {
+ comparison {
+ operation: EQ
+ left_value {
+ arithmetical_expression {
+ operation: MUL
+ left_value {
+ column: "col_int64"
+ }
+ right_value {
+ column: "col_int32"
+ }
+ }
+ }
+ right_value {
+ typed_value {
+ type {
+ type_id: INT64
+ }
+ value {
+ int64_value: 42
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ operands {
+ comparison {
+ operation: GE
+ left_value {
+ column: "col_uint32"
+ }
+ right_value {
+ arithmetical_expression {
+ operation: SUB
+ left_value {
+ typed_value {
+ type {
+ type_id: UINT32
+ }
+ value {
+ uint32_value: 15
+ }
+ }
+ }
+ right_value {
+ typed_value {
+ type {
+ type_id: UINT32
+ }
+ value {
+ uint32_value: 1
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ )proto");
+ }
+
+ Y_UNIT_TEST(PartialAndOneBranchPushdownable) {
+ AssertFilter(
+ // Note that R"ast()ast" is empty string!
+ // The division must be excluded from pushdown, while the other operand of the "And" statement must still be pushed down.
+ // So we expect only one branch of "And" to be pushed down.
+ R"ast(
+ (Coalesce
+ (And
+ (< (/ (Int64 '42) (Member $row '"col_int64")) (Int64 '10))
+ (>= (Member $row '"col_uint32") (Uint32 '15))
+ )
+ (Bool '"true")
+ )
+ )ast",
+ R"proto(
+ comparison {
+ operation: GE
+ left_value {
+ column: "col_uint32"
+ }
+ right_value {
+ typed_value {
+ type {
+ type_id: UINT32
+ }
+ value {
+ uint32_value: 15
+ }
+ }
+ }
+ }
+ )proto");
+ }
+
+ Y_UNIT_TEST(NotNull) {
+ AssertFilter(
+ // Note that R"ast()ast" is empty string!
+ R"ast(
+ (Exists
+ (Member $row '"col_optional_utf8")
+ )
+ )ast",
+ R"proto(
+ is_not_null {
+ value {
+ column: "col_optional_utf8"
+ }
+ }
+ )proto");
+ }
+
+ Y_UNIT_TEST(NotNullForDatetime) {
+ AssertFilter(
+ // Note that R"ast()ast" is empty string!
+ R"ast(
+ (Exists
+ (Member $row '"col_optional_tz_datetime")
+ )
+ )ast",
+ R"proto(
+ is_not_null {
+ value {
+ column: "col_optional_tz_datetime"
+ }
+ }
+ )proto");
+ }
+
+ Y_UNIT_TEST(IsNull) {
+ AssertFilter(
+ // Note that R"ast()ast" is empty string!
+ R"ast(
+ (Not
+ (Exists
+ (Member $row '"col_optional_utf8")
+ )
+ )
+ )ast",
+ R"proto(
+ is_null {
+ value {
+ column: "col_optional_utf8"
+ }
+ }
+ )proto");
+ }
+
+ Y_UNIT_TEST(StringFieldsNotSupported) {
+ AssertNoPush(
+ // Note that R"ast()ast" is empty string!
+ R"ast(
+ (Coalesce
+ (==
+ (Member $row '"col_utf8")
+ (Member $row '"col_optional_utf8")
+ )
+ (Bool '"true")
+ )
+ )ast");
+ }
+
+ Y_UNIT_TEST(StringFieldsNotSupported2) {
+ AssertNoPush(
+ // Note that R"ast()ast" is empty string!
+ R"ast(
+ (!=
+ (Member $row '"col_string")
+ (String '"value")
+ )
+ )ast");
+ }
+}
diff --git a/ydb/library/yql/providers/generic/provider/ut/pushdown/ya.make b/ydb/library/yql/providers/generic/provider/ut/pushdown/ya.make
new file mode 100644
index 0000000000..7bd5d3b934
--- /dev/null
+++ b/ydb/library/yql/providers/generic/provider/ut/pushdown/ya.make
@@ -0,0 +1,27 @@
+UNITTEST_FOR(ydb/library/yql/providers/generic/provider)
+
+SRCS(
+ pushdown_ut.cpp
+)
+
+PEERDIR(
+ contrib/libs/fmt
+ library/cpp/random_provider
+ ydb/library/yql/ast
+ ydb/library/yql/core
+ ydb/library/yql/core/services
+ ydb/library/yql/dq/expr_nodes
+ ydb/library/yql/minikql
+ ydb/library/yql/providers/common/db_id_async_resolver
+ ydb/library/yql/providers/generic/expr_nodes
+ ydb/library/yql/providers/result/provider
+ ydb/library/yql/public/udf/service/stub
+ ydb/library/yql/sql
+ ydb/library/yql/sql/pg_dummy
+)
+
+SIZE(SMALL)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/library/yql/providers/generic/provider/ut/ya.make b/ydb/library/yql/providers/generic/provider/ut/ya.make
new file mode 100644
index 0000000000..20cde2b37e
--- /dev/null
+++ b/ydb/library/yql/providers/generic/provider/ut/ya.make
@@ -0,0 +1,3 @@
+RECURSE_FOR_TESTS(
+ pushdown
+)
diff --git a/ydb/library/yql/providers/generic/provider/ya.make b/ydb/library/yql/providers/generic/provider/ya.make
index 21c6fd2b18..ad0e77291e 100644
--- a/ydb/library/yql/providers/generic/provider/ya.make
+++ b/ydb/library/yql/providers/generic/provider/ya.make
@@ -54,3 +54,5 @@ PEERDIR(
)
END()
+
+RECURSE_FOR_TESTS(ut)
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp
index c61b5007c5..8befd4fd7c 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp
@@ -291,7 +291,7 @@ namespace NYql {
}
void FillTypeMappingSettings(NConnector::NApi::TDescribeTableRequest& request) {
- const auto dateTimeFormat = *State_->Configuration->DateTimeFormat.Get();
+ const TString dateTimeFormat = State_->Configuration->DateTimeFormat.Get().GetOrElse(TGenericSettings::TDefault::DateTimeFormat);
if (dateTimeFormat == "string") {
request.mutable_type_mapping_settings()->set_date_time_format(NConnector::NApi::STRING_FORMAT);
} else if (dateTimeFormat == "YQL") {
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp
index 3620d5e14e..5d16fb6ee1 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp
@@ -21,16 +21,18 @@ namespace NYql {
namespace {
struct TPushdownSettings: public NPushdown::TSettings {
- TPushdownSettings() {
+ TPushdownSettings()
+ : NPushdown::TSettings(NLog::EComponent::ProviderGeneric)
+ {
using EFlag = NPushdown::TSettings::EFeatureFlag;
- Enable(EFlag::LikeOperator | EFlag::JsonQueryOperators | EFlag::JsonExistsOperator);
+ Enable(EFlag::ExpressionAsPredicate | EFlag::ArithmeticalExpressions);
}
};
class TGenericPhysicalOptProposalTransformer: public TOptimizeTransformerBase {
public:
TGenericPhysicalOptProposalTransformer(TGenericState::TPtr state)
- : TOptimizeTransformerBase(state->Types, NLog::EComponent::ProviderYdb, {})
+ : TOptimizeTransformerBase(state->Types, NLog::EComponent::ProviderGeneric, {})
, State_(state)
{
#define HNDL(name) "PhysicalOptimizer-" #name, Hndl(&TGenericPhysicalOptProposalTransformer::name)
@@ -102,13 +104,6 @@ namespace NYql {
return node;
}
- static NPushdown::TPredicateNode BuildEmptyPredicate(TExprContext& ctx, TPositionHandle pos) {
- auto emptyPredicate = Build<TCoBool>(ctx, pos).Literal().Build("true").Done();
- NPushdown::TPredicateNode p;
- p.ExprNode = emptyPredicate;
- return p;
- }
-
static NPushdown::TPredicateNode SplitForPartialPushdown(const NPushdown::TPredicateNode& predicateTree,
TExprContext& ctx, TPositionHandle pos)
{
@@ -117,7 +112,7 @@ namespace NYql {
}
if (predicateTree.Op != NPushdown::EBoolOp::And) {
- return BuildEmptyPredicate(ctx, pos);
+ return NPushdown::TPredicateNode(); // Not valid, => return the same node from optimizer
}
std::vector<NPushdown::TPredicateNode> pushable;
@@ -137,7 +132,7 @@ namespace NYql {
YQL_CLOG(TRACE, ProviderGeneric) << "Push filter. Initial filter lambda: " << NCommon::ExprToPrettyString(ctx, lambda.Ref());
auto maybeOptionalIf = lambda.Body().Maybe<TCoOptionalIf>();
- if (!maybeOptionalIf.IsValid()) {
+ if (!maybeOptionalIf.IsValid()) { // Nothing to push
return {};
}
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp
index 26b0f3a637..6bc2abb56f 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp
@@ -2,6 +2,8 @@
#include <ydb/library/yql/providers/generic/connector/api/service/protos/connector.pb.h>
+#include <util/string/cast.h>
+
namespace NYql {
using namespace NNodes;
@@ -18,41 +20,94 @@ namespace NYql {
return true;
}
- bool SerializeInt32(const TCoInt32& constant, TExpression* proto, TStringBuilder&) {
- auto* value = proto->mutable_typed_value();
- auto* t = value->mutable_type();
- t->set_type_id(Ydb::Type::INT32);
- auto* v = value->mutable_value();
- v->set_int32_value(FromString<i32>(constant.Literal()));
- return true;
+ template <class T>
+ T Cast(const TStringBuf& from) {
+ return FromString<T>(from);
}
+ // Special conversion from TStringBuf to TString
+ template <>
+ TString Cast<TString>(const TStringBuf& from) {
+ return TString(from);
+ }
+
+#define MATCH_ATOM(AtomType, ATOM_ENUM, proto_name, cpp_type) \
+ if (auto atom = expression.Maybe<Y_CAT(TCo, AtomType)>()) { \
+ auto* value = proto->mutable_typed_value(); \
+ auto* t = value->mutable_type(); \
+ t->set_type_id(Ydb::Type::ATOM_ENUM); \
+ auto* v = value->mutable_value(); \
+ v->Y_CAT(Y_CAT(set_, proto_name), _value)(Cast<cpp_type>(atom.Cast().Literal())); \
+ return true; \
+ }
+
+#define MATCH_ARITHMETICAL(OpType, OP_ENUM) \
+ if (auto maybeExpr = expression.Maybe<Y_CAT(TCo, OpType)>()) { \
+ auto expr = maybeExpr.Cast(); \
+ auto* exprProto = proto->mutable_arithmetical_expression(); \
+ exprProto->set_operation(TExpression::TArithmeticalExpression::OP_ENUM); \
+ return SerializeExpression(expr.Left(), exprProto->mutable_left_value(), arg, err) && SerializeExpression(expr.Right(), exprProto->mutable_right_value(), arg, err); \
+ }
+
bool SerializeExpression(const TExprBase& expression, TExpression* proto, const TCoArgument& arg, TStringBuilder& err) {
if (auto member = expression.Maybe<TCoMember>()) {
return SerializeMember(member.Cast(), proto, arg, err);
}
// data
- if (auto int32Atom = expression.Maybe<TCoInt32>()) {
- return SerializeInt32(int32Atom.Cast(), proto, err);
+ MATCH_ATOM(Int8, INT8, int32, i8);
+ MATCH_ATOM(Uint8, UINT8, uint32, ui8);
+ MATCH_ATOM(Int16, INT16, int32, i16);
+ MATCH_ATOM(Uint16, UINT16, uint32, ui16);
+ MATCH_ATOM(Int32, INT32, int32, i32);
+ MATCH_ATOM(Uint32, UINT32, uint32, ui32);
+ MATCH_ATOM(Int64, INT64, int64, i64);
+ MATCH_ATOM(Uint64, UINT64, uint64, ui64);
+ MATCH_ATOM(Float, FLOAT, float, float);
+ MATCH_ATOM(Double, DOUBLE, double, double);
+ MATCH_ATOM(String, STRING, bytes, TString);
+ MATCH_ATOM(Utf8, UTF8, text, TString);
+ MATCH_ARITHMETICAL(Minus, SUB);
+ MATCH_ARITHMETICAL(Plus, ADD);
+ MATCH_ARITHMETICAL(Mul, MUL);
+
+ if (auto maybeNull = expression.Maybe<TCoNull>()) {
+ proto->mutable_null();
+ return true;
}
err << "unknown expression: " << expression.Raw()->Content();
return false;
}
+// Both helper macros are only meant for SerializeExpression: undefine them together.
+#undef MATCH_ATOM
+#undef MATCH_ARITHMETICAL
+
+#define EXPR_NODE_TO_COMPARE_TYPE(TExprNodeType, COMPARE_TYPE) \
+ if (!opMatched && compare.Maybe<TExprNodeType>()) { \
+ opMatched = true; \
+ proto->set_operation(TPredicate::TComparison::COMPARE_TYPE); \
+ }
+
bool SerializeCompare(const TCoCompare& compare, TPredicate* predicateProto, const TCoArgument& arg, TStringBuilder& err) {
TPredicate::TComparison* proto = predicateProto->mutable_comparison();
- if (compare.Maybe<TCoCmpEqual>()) {
- proto->set_operation(TPredicate::TComparison::EQ);
- }
- if (proto->operation() == TPredicate::TComparison::OPERATION_UNSPECIFIED) { // Unknown operation
+ bool opMatched = false;
+
+ EXPR_NODE_TO_COMPARE_TYPE(TCoCmpEqual, EQ);
+ EXPR_NODE_TO_COMPARE_TYPE(TCoCmpNotEqual, NE);
+ EXPR_NODE_TO_COMPARE_TYPE(TCoCmpLess, L);
+ EXPR_NODE_TO_COMPARE_TYPE(TCoCmpLessOrEqual, LE);
+ EXPR_NODE_TO_COMPARE_TYPE(TCoCmpGreater, G);
+ EXPR_NODE_TO_COMPARE_TYPE(TCoCmpGreaterOrEqual, GE);
+
+ if (proto->operation() == TPredicate::TComparison::COMPARISON_OPERATION_UNSPECIFIED) {
err << "unknown operation: " << compare.Raw()->Content();
return false;
}
return SerializeExpression(compare.Left(), proto->mutable_left_value(), arg, err) && SerializeExpression(compare.Right(), proto->mutable_right_value(), arg, err);
}
+#undef EXPR_NODE_TO_COMPARE_TYPE
+
bool SerializeCoalesce(const TCoCoalesce& coalesce, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err) {
auto predicate = coalesce.Predicate();
if (auto compare = predicate.Maybe<TCoCompare>()) {
@@ -63,6 +118,46 @@ namespace NYql {
return false;
}
+ bool SerializePredicate(const TExprBase& predicate, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err);
+
+ // Serializes (Exists <expr>) into `proto`: as is_not_null by default, or as is_null when
+ // `withNot` is true (used for the (Not (Exists ...)) form). Returns the result of
+ // serializing the checked expression.
+ bool SerializeExists(const TCoExists& exists, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err, bool withNot = false) {
+ auto* expressionProto = withNot ? proto->mutable_is_null()->mutable_value() : proto->mutable_is_not_null()->mutable_value();
+ return SerializeExpression(exists.Optional(), expressionProto, arg, err);
+ }
+
+ // Serializes an And node into a conjunction with one operand per child, in child order.
+ // Stops and returns false at the first child that cannot be serialized.
+ bool SerializeAnd(const TCoAnd& andExpr, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err) {
+ auto* dstProto = proto->mutable_conjunction();
+ for (const auto& child : andExpr.Ptr()->Children()) {
+ if (!SerializePredicate(TExprBase(child), dstProto->add_operands(), arg, err)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ // Serializes an Or node into a disjunction with one operand per child, in child order.
+ // Stops and returns false at the first child that cannot be serialized.
+ bool SerializeOr(const TCoOr& orExpr, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err) {
+ auto* dstProto = proto->mutable_disjunction();
+ for (const auto& child : orExpr.Ptr()->Children()) {
+ if (!SerializePredicate(TExprBase(child), dstProto->add_operands(), arg, err)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ // Serializes a Not node. (Not (Exists x)) is emitted as is_null(x) rather than as a
+ // negation of is_not_null; any other operand is wrapped into a negation predicate.
+ bool SerializeNot(const TCoNot& notExpr, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err) {
+ // Special case: (Not (Exists ...))
+ if (auto exists = notExpr.Value().Maybe<TCoExists>()) {
+ return SerializeExists(exists.Cast(), proto, arg, err, true);
+ }
+ auto* dstProto = proto->mutable_negation();
+ return SerializePredicate(notExpr.Value(), dstProto->mutable_operand(), arg, err);
+ }
+
+ // Predicate overload: a member used directly as a predicate is serialized as a
+ // bool_expression whose value is filled by the expression overload of SerializeMember.
+ bool SerializeMember(const TCoMember& member, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err) {
+ return SerializeMember(member, proto->mutable_bool_expression()->mutable_value(), arg, err);
+ }
+
bool SerializePredicate(const TExprBase& predicate, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err) {
if (auto compare = predicate.Maybe<TCoCompare>()) {
return SerializeCompare(compare.Cast(), proto, arg, err);
@@ -70,12 +165,27 @@ namespace NYql {
if (auto coalesce = predicate.Maybe<TCoCoalesce>()) {
return SerializeCoalesce(coalesce.Cast(), proto, arg, err);
}
+ if (auto andExpr = predicate.Maybe<TCoAnd>()) {
+ return SerializeAnd(andExpr.Cast(), proto, arg, err);
+ }
+ if (auto orExpr = predicate.Maybe<TCoOr>()) {
+ return SerializeOr(orExpr.Cast(), proto, arg, err);
+ }
+ if (auto notExpr = predicate.Maybe<TCoNot>()) {
+ return SerializeNot(notExpr.Cast(), proto, arg, err);
+ }
+ if (auto member = predicate.Maybe<TCoMember>()) {
+ return SerializeMember(member.Cast(), proto, arg, err);
+ }
+ if (auto exists = predicate.Maybe<TCoExists>()) {
+ return SerializeExists(exists.Cast(), proto, arg, err);
+ }
err << "unknown predicate: " << predicate.Raw()->Content();
return false;
}
- } // namespace
+ }
bool IsEmptyFilterPredicate(const TCoLambda& lambda) {
auto maybeBool = lambda.Body().Maybe<TCoBool>();
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp
index 32660b339c..fe79fb197b 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp
@@ -7,6 +7,8 @@
namespace NYql {
+ const TString TGenericSettings::TDefault::DateTimeFormat = "string";
+
TGenericConfiguration::TGenericConfiguration() {
REGISTER_SETTING(*this, UsePredicatePushdown);
REGISTER_SETTING(*this, DateTimeFormat);
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h
index 62ec0a6da8..a58f4c31c3 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h
@@ -11,12 +11,12 @@ namespace NYql {
using TConstPtr = std::shared_ptr<const TGenericSettings>;
NCommon::TConfSetting<bool, false> UsePredicatePushdown;
+ NCommon::TConfSetting<TString, false> DateTimeFormat;
struct TDefault {
static constexpr bool UsePredicatePushdown = false;
+ static const TString DateTimeFormat; // = "string"
};
-
- NCommon::TConfSetting<TString, false> DateTimeFormat;
};
struct TGenericConfiguration: public TGenericSettings, public NCommon::TSettingDispatcher {