diff options
author | Slusarenko Igor <[email protected]> | 2025-10-13 14:09:59 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2025-10-13 14:09:59 +0300 |
commit | b050143e845cbde7257bfc320c805fa8772b758f (patch) | |
tree | d5246e14f86b7f131ae3f8562288ec9f96968e33 | |
parent | 685cab53a815e897c59717da19b9d39c62b010fc (diff) |
Move listsplit after physical opt (#22085)
22 files changed, 1567 insertions, 167 deletions
diff --git a/ydb/core/kqp/opt/kqp_query_blocks_transformer.cpp b/ydb/core/kqp/opt/kqp_query_blocks_transformer.cpp index 2e2c8e514d0..a5c369de8ec 100644 --- a/ydb/core/kqp/opt/kqp_query_blocks_transformer.cpp +++ b/ydb/core/kqp/opt/kqp_query_blocks_transformer.cpp @@ -85,7 +85,9 @@ public: } TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) override { - return QueryBlockTransformer->ApplyAsyncChanges(input, output, ctx); + YQL_ENSURE(CurrentBlock < input->ChildrenSize()); + output = input; + return QueryBlockTransformer->ApplyAsyncChanges(input->Child(CurrentBlock), output->ChildRef(CurrentBlock), ctx); } private: diff --git a/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp b/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp index b85a14eac79..f5e152aca48 100644 --- a/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp @@ -699,11 +699,14 @@ public: .TypeMappingSettings(typeMappingSettings); auto listSplitsBuilder = mockClient->ExpectListSplits(); - listSplitsBuilder + auto fillListSplitExpectation = listSplitsBuilder .ValidateArgs(settings.ValidateListSplitsArgs) .Select() .DataSourceInstance(GetMockConnectorSourceInstance()) - .Table(settings.TableName); + .Table(settings.TableName) + .What(); + + FillMockConnectorRequestColumns(fillListSplitExpectation, settings.Columns); for (ui64 i = 0; i < settings.DescribeCount; ++i) { auto responseBuilder = describeTableBuilder.Response(); @@ -1944,6 +1947,7 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) { )); { // Prepare connector mock + const std::vector<TColumn> columns = { {"fqdn", Ydb::Type::STRING}, {"payload", Ydb::Type::STRING} @@ -1952,7 +1956,10 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) { .TableName = ydbTable, .Columns = columns, .DescribeCount = 2, - .ListSplitsCount = 2 + // For stream queries type annotation is executed twice, but + // now List Split is done 
after type annotation optimization. + // That is why only single call to List Split is expected. + .ListSplitsCount = 1 }); const std::vector<std::string> fqdnColumn = {"host1.example.com", "host2.example.com", "host3.example.com"}; @@ -2054,7 +2061,9 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) { .TableName = ydbTable, .Columns = columns, .DescribeCount = 2, - .ListSplitsCount = 5, + // Now List Split is done after type annotation, that is the + // reason why this value equal to 4 not 5 + .ListSplitsCount = 4, .ValidateListSplitsArgs = false }); diff --git a/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp b/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp index 316e3524188..32a3e8b82e0 100644 --- a/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp @@ -145,6 +145,65 @@ namespace NKikimr::NKqp { return databaseAsyncResolverMock; } + /// + /// Fixture that prepares mocks and services for a provider. 
+ /// + /// TODO: + /// Make it reusable, currently it fails if multiple + /// expects are applied to mock + /// + class TQueryExecutorFixture : public NUnitTest::TBaseFixture { + public: + TQueryExecutorFixture(EProviderType providerType) + : DataSourceInstance(MakeDataSourceInstance(providerType)) + , ClientMock(std::make_shared<TConnectorClientMock>()) + { + auto databaseAsyncResolverMock = MakeDatabaseAsyncResolver(providerType); + auto appConfig = CreateDefaultAppConfig(); + auto s3ActorsFactory = NYql::NDq::CreateS3ActorsFactory(); + + Kikimr = MakeKikimrRunner( + false, + ClientMock, + databaseAsyncResolverMock, + appConfig, + s3ActorsFactory, + {.CredentialsFactory = CreateCredentialsFactory()} + ); + + CreateExternalDataSource(providerType, Kikimr); + QueryClient = Kikimr->GetQueryClient(); + } + + TQueryClient GetQueryClient() { + return *QueryClient; + } + + TAsyncExecuteQueryResult ExecuteQuery(const TString& query) { + return GetQueryClient() + .ExecuteQuery(query, TTxControl::BeginTx().CommitTx(), TExecuteQuerySettings()); + } + + NThreading::TFuture<TScriptExecutionOperation> ExecuteScript(const TString& script) { + return GetQueryClient() + .ExecuteScript(script); + } + + TConnectorClientMock::TSelectBuilder<> GetSelectBuilder() { + TConnectorClientMock::TSelectBuilder<> builder; + builder.DataSourceInstance(DataSourceInstance); + return builder; + } + + public: + const NYql::TGenericDataSourceInstance DataSourceInstance; + std::shared_ptr<TConnectorClientMock> ClientMock; + + protected: + std::shared_ptr<TKikimrRunner> Kikimr; + std::optional<TQueryClient> QueryClient; + }; + Y_UNIT_TEST_SUITE(GenericFederatedQuery) { void TestSelectAllFields(EProviderType providerType) { // prepare mock @@ -163,6 +222,9 @@ namespace NKikimr::NKqp { clientMock->ExpectListSplits() .Select() .DataSourceInstance(dataSourceInstance) + .What() + .Column("col1", Ydb::Type::UINT16) + .Done() .Done() .Result() .AddResponse(NewSuccess()) @@ -284,6 +346,8 @@ namespace 
NKikimr::NKqp { clientMock->ExpectListSplits() .Select() .DataSourceInstance(dataSourceInstance) + .What() + .Done() .Done() .Result() .AddResponse(NewSuccess()) @@ -399,6 +463,8 @@ namespace NKikimr::NKqp { clientMock->ExpectListSplits() .Select() .DataSourceInstance(dataSourceInstance) + .What() + .Done() .Done() .Result() .AddResponse(NewSuccess()) @@ -489,104 +555,122 @@ namespace NKikimr::NKqp { TestSelectCount(EProviderType::IcebergHadoopToken); } - void TestFilterPushdown(EProviderType providerType) { - // prepare mock - auto clientMock = std::make_shared<TConnectorClientMock>(); - - const NYql::TGenericDataSourceInstance dataSourceInstance = MakeDataSourceInstance(providerType); - - // clang-format off - const NApi::TSelect selectInListSplits = TConnectorClientMock::TSelectBuilder<>() - .DataSourceInstance(dataSourceInstance).GetResult(); - - const NApi::TSelect selectInReadSplits = TConnectorClientMock::TSelectBuilder<>() - .DataSourceInstance(dataSourceInstance) + /// + /// Test a filter pushdown for a provider + /// + /// @param[in] providerType Provider's type + /// @param[in] where Where clause that will be appended to a sql query + /// @param[in] expectedWhere Where clause that will be expected in a list split and read split requests + /// + void TestFilterPushdown(EProviderType providerType, const TString& where, NApi::TSelect::TWhere& expectedWhere) { + auto f = std::make_shared<TQueryExecutorFixture>(providerType); + auto expectedSelect = f->GetSelectBuilder() .What() - .NullableColumn("data_column", Ydb::Type::STRING) - .NullableColumn("filtered_column", Ydb::Type::INT32) - .Done() - .Where() - .Filter() - .Equal() - .Column("filtered_column") - .Value<i32>(42) - .Done() - .Done() + .NullableColumn("colDate", Ydb::Type::DATE) + .NullableColumn("colInt32", Ydb::Type::INT32) + .NullableColumn("colString", Ydb::Type::STRING) .Done() + .Where(expectedWhere) .GetResult(); - // clang-format on // step 1: DescribeTable - // clang-format off - 
clientMock->ExpectDescribeTable() - .DataSourceInstance(dataSourceInstance) + f->ClientMock->ExpectDescribeTable() + .DataSourceInstance(f->DataSourceInstance) .TypeMappingSettings(MakeTypeMappingSettings(NYql::NConnector::NApi::STRING_FORMAT)) .Response() - .NullableColumn("filtered_column", Ydb::Type::INT32) - .NullableColumn("data_column", Ydb::Type::STRING); - // clang-format on + .NullableColumn("colDate", Ydb::Type::DATE) + .NullableColumn("colInt32", Ydb::Type::INT32) + .NullableColumn("colString", Ydb::Type::STRING); // step 2: ListSplits - // clang-format off - clientMock->ExpectListSplits() - .Select(selectInListSplits) + f->ClientMock->ExpectListSplits() + .Select(expectedSelect) .Result() .AddResponse(NewSuccess()) .Description("some binary description") - .Select(selectInReadSplits); - // clang-format on + .Select(expectedSelect); // step 3: ReadSplits - // Return data such that it contains values not satisfying the filter conditions. - // Then check that, despite that connector reads additional data, - // our generic provider then filters it out. 
- std::vector<std::string> colData = {"Filtered text", "Text"}; - std::vector<i32> filterColumnData = {42, 24}; - // clang-format off - clientMock->ExpectReadSplits() + std::vector<std::string> colString = {"Filtered text", "Text"}; + std::vector<ui16> colDate = {20326, 20329}; + std::vector<i32> colInt32 = {42, 24}; + + f->ClientMock->ExpectReadSplits() .Filtering(NYql::NConnector::NApi::TReadSplitsRequest::FILTERING_OPTIONAL) .Split() .Description("some binary description") - .Select(selectInReadSplits) + .Select(expectedSelect) .Done() .Result() .AddResponse(MakeRecordBatch( - MakeArray<arrow::BinaryBuilder>("data_column", colData, arrow::binary()), - MakeArray<arrow::Int32Builder>("filtered_column", filterColumnData, arrow::int32())), + MakeArray<arrow::UInt16Builder>("colDate", colDate, arrow::uint16()), + MakeArray<arrow::Int32Builder>("colInt32", colInt32, arrow::int32()), + MakeArray<arrow::BinaryBuilder>("colString", colString, arrow::binary())), NewSuccess()); - // clang-format on - - // prepare database resolver mock - auto databaseAsyncResolverMock = MakeDatabaseAsyncResolver(providerType); - - // run test - auto appConfig = CreateDefaultAppConfig(); - auto s3ActorsFactory = NYql::NDq::CreateS3ActorsFactory(); - auto kikimr = MakeKikimrRunner(false, clientMock, databaseAsyncResolverMock, appConfig, s3ActorsFactory, - {.CredentialsFactory = CreateCredentialsFactory()}); - - CreateExternalDataSource(providerType, kikimr); const TString query = fmt::format( R"( PRAGMA generic.UsePredicatePushdown="true"; - SELECT data_column FROM {data_source_name}.{table_name} WHERE filtered_column = 42; + SELECT colDate, colInt32, colString FROM {data_source_name}.{table_name} WHERE {table_where}; )", "data_source_name"_a = DEFAULT_DATA_SOURCE_NAME, - "table_name"_a = DEFAULT_TABLE); + "table_name"_a = DEFAULT_TABLE, + "table_where"_a = where + ); - auto db = kikimr->GetQueryClient(); - auto queryResult = db.ExecuteQuery(query, TTxControl::BeginTx().CommitTx(), 
TExecuteQuerySettings()).ExtractValueSync(); + auto queryResult = f->ExecuteQuery(query).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(queryResult.GetStatus(), EStatus::SUCCESS, queryResult.GetIssues().ToString()); + // Check a query result TResultSetParser resultSet(queryResult.GetResultSetParser(0)); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 3); UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1); - // check every row - // Check that, despite returning nonfiltered data in connector, response will be correct - std::vector<TMaybe<TString>> result = {"Filtered text"}; // Only data satisfying filter conditions - MATCH_RESULT_WITH_INPUT(result, resultSet, GetOptionalString); + // Check values for the query result + std::vector<std::optional<TInstant>> colDateResults = {TInstant::Days(20326)}; + std::vector<std::optional<int>> colInt32Result = {42}; + std::vector<std::optional<TString>> colStringResult = {"Filtered text"}; + + for (size_t i = 0; i < colDateResults.size(); ++i) { + resultSet.TryNextRow(); + + MATCH_OPT_RESULT_WITH_VAL_IDX(colDateResults[i], resultSet, GetOptionalDate, 0); + MATCH_OPT_RESULT_WITH_VAL_IDX(colInt32Result[i], resultSet, GetOptionalInt32, 1); + MATCH_OPT_RESULT_WITH_VAL_IDX(colStringResult[i], resultSet, GetOptionalString, 2); + } + } + + /// + /// Test a filter pushdown for a provider + /// + /// @param[in] providerType Provider's type + /// + void TestFilterPushdown(EProviderType providerType) { + using namespace NYql::NConnector::NTest; + + auto expectedWhereInt = TConnectorClientMock::TWhereBuilder<>() + .Filter().Equal() + .Column("colInt32") + .Value<i32>(42) + .Done() + .Done() + .GetResult(); + + TestFilterPushdown(providerType, "colInt32 = 42", expectedWhereInt); + TestFilterPushdown(providerType, "colInt32 = EvaluateExpr(44 - 2)", expectedWhereInt); + TestFilterPushdown(providerType, "colInt32 = 44 - 2", expectedWhereInt); + + auto expectedWhereDate = 
TConnectorClientMock::TWhereBuilder<>() + .Filter().Equal() + .Column("colDate") + .Value<ui32>(20326, ::Ydb::Type::DATE) + .Done() + .Done() + .GetResult(); + + TestFilterPushdown(providerType, "colDate = Date('2025-08-26')", expectedWhereDate); + TestFilterPushdown(providerType, "colDate = EvaluateExpr(Date('2025-08-27') - Interval(\"P1D\"))", expectedWhereDate); + TestFilterPushdown(providerType, "colDate = Date('2025-08-27') - Interval(\"P1D\")", expectedWhereDate); } Y_UNIT_TEST(PostgreSQLFilterPushdown) { diff --git a/ydb/library/yql/providers/common/pushdown/collection.cpp b/ydb/library/yql/providers/common/pushdown/collection.cpp index a8770902431..9cbfef2beb8 100644 --- a/ydb/library/yql/providers/common/pushdown/collection.cpp +++ b/ydb/library/yql/providers/common/pushdown/collection.cpp @@ -201,6 +201,9 @@ private: node.Maybe<TCoUint64>()) { return true; } + if (Settings.IsEnabled(EFlag::DateCtor) && node.Maybe<TCoDate>()) { + return true; + } if (Settings.IsEnabled(EFlag::TimestampCtor) && node.Maybe<TCoTimestamp>()) { return true; } diff --git a/ydb/library/yql/providers/common/pushdown/settings.h b/ydb/library/yql/providers/common/pushdown/settings.h index cdd4c4fd984..66aa009b169 100644 --- a/ydb/library/yql/providers/common/pushdown/settings.h +++ b/ydb/library/yql/providers/common/pushdown/settings.h @@ -46,6 +46,7 @@ struct TSettings { MinMax = 1 << 27, NonDeterministic = 1 << 28, DecimalCtor = 1 << 29, + DateCtor = 1 << 30, }; explicit TSettings(NLog::EComponent logComponent) diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp index ed92d42b65a..fe0c446b68e 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp +++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp @@ -29,6 +29,12 @@ namespace NYql::NConnector::NTest { 
DEFINE_SIMPLE_TYPE_SETTER(i64, INT64, int64_value); DEFINE_SIMPLE_TYPE_SETTER(ui64, UINT64, uint64_value); + template <> + void SetValue(const ui32& value, Ydb::TypedValue* proto, const ::Ydb::Type::PrimitiveTypeId& typeId, bool optional) { + *proto->mutable_type() = MakeYdbType(typeId, optional); + proto->mutable_value()->Y_CAT(set_, uint32_value)(value); + } + void CreatePostgreSQLExternalDataSource( const std::shared_ptr<NKikimr::NKqp::TKikimrRunner>& kikimr, const TString& dataSourceName, diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h index 897a4a156ab..5cc999ce544 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h +++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h @@ -63,9 +63,20 @@ namespace NYql::NConnector::NTest { } MATCHER_P(ProtobufRequestMatcher, expected, "request does not match") { - Cerr << "CRAB Expected: " << expected.DebugString() << Endl; - Cerr << "CRAB Actual: " << arg.DebugString() << Endl; - return google::protobuf::util::MessageDifferencer::Equals(arg, expected); + Cerr << "GENERIC-CONNECTOR-MOCK Expected: " << expected.DebugString() << Endl; + Cerr << "GENERIC-CONNECTOR-MOCK Actual: " << arg.DebugString() << Endl; + + google::protobuf::util::MessageDifferencer differencer; + TString differences; + differencer.ReportDifferencesToString(&differences); + + bool result = differencer.Compare(arg, expected); + + if (!result) { + Cerr << "GENERIC-CONNECTOR-MOCK Differences:" << Endl << differences << Endl; + } + + return result; } MATCHER_P(RequestRelaxedMatcher, expected, "") { @@ -73,14 +84,26 @@ namespace NYql::NConnector::NTest { return true; } -#define MATCH_RESULT_WITH_INPUT(INPUT, RESULT_SET, GETTER) \ - { \ - for (const auto& val : INPUT) { \ - UNIT_ASSERT(RESULT_SET.TryNextRow()); \ - 
UNIT_ASSERT_VALUES_EQUAL(RESULT_SET.ColumnParser(0).GETTER(), val); \ - } \ +#define MATCH_OPT_RESULT_WITH_VAL_IDX(VAL, RESULT_SET, GETTER, INDEX) \ + { \ + auto r = RESULT_SET.ColumnParser(INDEX).GETTER(); \ + UNIT_ASSERT_VALUES_EQUAL(r.has_value(), VAL.has_value()); \ + if (r.has_value()) { \ + UNIT_ASSERT_VALUES_EQUAL(*r, *VAL); \ + } \ } +#define MATCH_RESULT_WITH_INPUT_IDX(INPUT, RESULT_SET, GETTER, INDEX) \ + { \ + for (const auto& val : INPUT) { \ + UNIT_ASSERT(RESULT_SET.TryNextRow()); \ + UNIT_ASSERT_VALUES_EQUAL(RESULT_SET.ColumnParser(INDEX).GETTER(), val); \ + } \ + } + +#define MATCH_RESULT_WITH_INPUT(INPUT, RESULT_SET, GETTER)\ + MATCH_RESULT_WITH_INPUT_IDX(INPUT, RESULT_SET, GETTER, 0) + // Make arrow array for one column. // Returns field for schema and array with data. // Designed for call with MakeRecordBatch function. @@ -129,6 +152,10 @@ namespace NYql::NConnector::NTest { template <class T> void SetSimpleValue(const T& value, Ydb::TypedValue* proto, bool optional = false); + template <class T> + void SetValue(const T& value, Ydb::TypedValue* proto, + const ::Ydb::Type::PrimitiveTypeId& typeId, bool optional = false); + template <class TParent> struct TWithParentBuilder { explicit TWithParentBuilder(TParent* parent) @@ -426,11 +453,23 @@ namespace NYql::NConnector::NTest { } template <class T> + TBuilder& Value(const T& value, const ::Ydb::Type::PrimitiveTypeId& typeId) { + SetValue(value, this->Result_->mutable_typed_value(), typeId); + return *this; + } + + template <class T> TBuilder& OptionalValue(const T& value) { SetSimpleValue(value, this->Result_->mutable_typed_value(), true); return *this; } + template <class T> + TBuilder& OptionalValue(const T& value, const ::Ydb::Type::PrimitiveTypeId& typeId) { + SetValue(value, this->Result_->mutable_typed_value(), typeId, true); + return *this; + } + void FillWithDefaults() { } }; @@ -481,10 +520,18 @@ namespace NYql::NConnector::NTest { return Arg().Value(value).Done(); } + TBuilder& Value(const 
auto& value, const ::Ydb::Type::PrimitiveTypeId& typeId) { + return Arg().Value(value, typeId).Done(); + } + TBuilder& OptionalValue(const auto& value) { return Arg().OptionalValue(value).Done(); } + TBuilder& OptionalValue(const auto& value, const ::Ydb::Type::PrimitiveTypeId& typeId) { + return Arg().OptionalValue(value, typeId).Done(); + } + void FillWithDefaults() { } diff --git a/ydb/library/yql/providers/generic/connector/libcpp/utils.cpp b/ydb/library/yql/providers/generic/connector/libcpp/utils.cpp index 998be041016..f6968585dbc 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/utils.cpp +++ b/ydb/library/yql/providers/generic/connector/libcpp/utils.cpp @@ -110,4 +110,5 @@ namespace NYql::NConnector { return res->type(); } + } // namespace NYql::NConnector diff --git a/ydb/library/yql/providers/generic/connector/libcpp/utils.h b/ydb/library/yql/providers/generic/connector/libcpp/utils.h index 8f86ae643c7..7cfbb16a9b9 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/utils.h +++ b/ydb/library/yql/providers/generic/connector/libcpp/utils.h @@ -13,5 +13,4 @@ namespace NYql::NConnector { std::shared_ptr<arrow::RecordBatch> ReadSplitsResponseToArrowRecordBatch(const NApi::TReadSplitsResponse& resp); Ydb::Type GetColumnTypeByName(const NApi::TSchema& schema, const TString& name); - } // namespace NYql::NConnector diff --git a/ydb/library/yql/providers/generic/provider/ya.make b/ydb/library/yql/providers/generic/provider/ya.make index 5f28b0a2558..2a46177e42d 100644 --- a/ydb/library/yql/providers/generic/provider/ya.make +++ b/ydb/library/yql/providers/generic/provider/ya.make @@ -7,8 +7,12 @@ SRCS( yql_generic_datasink_type_ann.cpp yql_generic_datasource.cpp yql_generic_datasource_type_ann.cpp + yql_generic_describe_table.cpp + yql_generic_describe_table.h yql_generic_dq_integration.cpp yql_generic_io_discovery.cpp + yql_generic_list_splits.cpp + yql_generic_list_splits.h yql_generic_load_meta.cpp yql_generic_logical_opt.cpp 
yql_generic_mkql_compiler.cpp diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_describe_table.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_describe_table.cpp new file mode 100644 index 00000000000..8565df5520d --- /dev/null +++ b/ydb/library/yql/providers/generic/provider/yql_generic_describe_table.cpp @@ -0,0 +1,616 @@ +#include "yql_generic_describe_table.h" + +#include <yql/essentials/utils/log/log.h> +#include <yql/essentials/providers/common/structured_token/yql_token_builder.h> +#include <yql/essentials/providers/common/provider/yql_provider.h> +#include <yql/essentials/providers/common/provider/yql_provider_names.h> +#include <yql/essentials/minikql/mkql_type_ops.h> +#include <yql/essentials/minikql/mkql_program_builder.h> +#include <yql/essentials/minikql/mkql_alloc.h> +#include <yql/essentials/minikql/computation/mkql_computation_node.h> +#include <yql/essentials/minikql/comp_nodes/mkql_factories.h> +#include <yql/essentials/core/yql_graph_transformer.h> +#include <yql/essentials/core/yql_expr_optimize.h> +#include <yql/essentials/core/type_ann/type_ann_expr.h> +#include <yql/essentials/core/expr_nodes/yql_expr_nodes.h> +#include <yql/essentials/ast/yql_type_string.h> +#include <yql/essentials/ast/yql_expr.h> +#include <ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.h> +#include <ydb/library/yql/providers/generic/connector/libcpp/error.h> +#include <ydb/library/yql/providers/generic/connector/libcpp/client.h> +#include <ydb/core/fq/libs/result_formatter/result_formatter.h> +#include <ydb/core/external_sources/iceberg_fields.h> +#include <library/cpp/json/json_reader.h> + +namespace NYql { + +using namespace NNodes; +using namespace NKikimr; +using namespace NKikimr::NMiniKQL; + +class TGenericDescribeTableTransformer : public TGraphTransformerBase { + struct TTableDescription { + using TPtr = std::shared_ptr<TTableDescription>; + + NYql::TGenericDataSourceInstance DataSourceInstance; + 
std::optional<NConnector::NApi::TSchema> Schema; + // Issues that could occur at any phase of network interaction with Connector + TIssues Issues; + }; + + using TTableDescriptionMap = + std::unordered_map<TGenericState::TTableAddress, TTableDescription::TPtr, THash<TGenericState::TTableAddress>>; + +public: + explicit TGenericDescribeTableTransformer(TGenericState::TPtr state); + +public: + TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final; + + NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode&) final { + return AsyncFuture_; + } + + TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final; + + void Rewind() final; + +private: + TIssues DescribeTableFromConnector(const TGenericState::TTableAddress& tableAddress, + std::vector<NThreading::TFuture<void>>& handles); + + TIssues FillDescribeTableRequest(NConnector::NApi::TDescribeTableRequest& request, + const TGenericClusterConfig& clusterConfig, const TString& tablePath); + + void FillCredentials(NConnector::NApi::TDescribeTableRequest& request, + const TGenericClusterConfig& clusterConfig); + + void FillTypeMappingSettings(NConnector::NApi::TDescribeTableRequest& request); + +private: + const TGenericState::TPtr State_; + TTableDescriptionMap TableDescriptions_; + NThreading::TFuture<void> AsyncFuture_; +}; + +TGenericDescribeTableTransformer::TGenericDescribeTableTransformer(TGenericState::TPtr state) + : State_(std::move(state)) +{ } + +void TGenericDescribeTableTransformer::FillCredentials(NConnector::NApi::TDescribeTableRequest& request, + const TGenericClusterConfig& clusterConfig) { + auto dsi = request.mutable_data_source_instance(); + + // If login/password is provided, just copy them into request: + // connector will use Basic Auth to access external data sources. 
+ if (clusterConfig.GetCredentials().Hasbasic()) { + *dsi->mutable_credentials() = clusterConfig.GetCredentials(); + return; + } + + // If there are no Basic Auth parameters, two options can be considered: + // 1. Client provided own IAM-token to access external data source + auto iamToken = State_->Types->Credentials->FindCredentialContent( + "default_" + clusterConfig.name(), "default_generic", clusterConfig.GetToken()); + if (iamToken) { + *dsi->mutable_credentials()->mutable_token()->mutable_value() = iamToken; + *dsi->mutable_credentials()->mutable_token()->mutable_type() = "IAM"; + return; + } + + // 2. Client provided service account creds that must be converted into IAM-token + Y_ENSURE(State_->CredentialsFactory, "CredentialsFactory is not initialized"); + auto structuredTokenJSON = TStructuredTokenBuilder() + .SetServiceAccountIdAuth(clusterConfig.GetServiceAccountId(), + clusterConfig.GetServiceAccountIdSignature()) + .ToJson(); + Y_ENSURE(structuredTokenJSON, "empty structured token"); + + // Create provider or get existing one. + // It's crucial to reuse providers because their construction implies synchronous IO. 
+ auto providersIt = State_->CredentialProviders.find(clusterConfig.name()); + if (providersIt == State_->CredentialProviders.end()) { + auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken( + State_->CredentialsFactory, structuredTokenJSON, false); + + providersIt = + State_->CredentialProviders + .emplace(clusterConfig.name(), credentialsProviderFactory->CreateProvider()) + .first; + } + + iamToken = providersIt->second->GetAuthInfo(); + Y_ENSURE(iamToken, "empty IAM token"); + *dsi->mutable_credentials()->mutable_token()->mutable_value() = iamToken; + *dsi->mutable_credentials()->mutable_token()->mutable_type() = "IAM"; +} + +template <typename T> +void SetSchema(T& request, const TGenericClusterConfig& clusterConfig) { + TString schema; + const auto it = clusterConfig.GetDataSourceOptions().find("schema"); + if (it != clusterConfig.GetDataSourceOptions().end()) { + schema = it->second; + } + if (!schema) { + schema = "public"; + } + request.set_schema(schema); +} + +void SetOracleServiceName(NYql::TOracleDataSourceOptions& options, const TGenericClusterConfig& clusterConfig) { + const auto it = clusterConfig.GetDataSourceOptions().find("service_name"); + if (it != clusterConfig.GetDataSourceOptions().end()) { + options.set_service_name(it->second); + } +} + +void SetLoggingFolderId(NYql::TLoggingDataSourceOptions& options, const TGenericClusterConfig& clusterConfig) { + const auto it = clusterConfig.GetDataSourceOptions().find("folder_id"); + if (it != clusterConfig.GetDataSourceOptions().end()) { + options.set_folder_id(it->second); + } +} + +TString GetIcebergOptionValue(const ::google::protobuf::Map<TProtoStringType, + TProtoStringType>& options, TString option) { + auto it = options.find(option); + if (options.end() == it) { + throw yexception() + << "Cluster config for an Iceberg data source" + << " is missing option: " + << option; + } + return it->second; +} + +/// +/// Fill options into DatSourceOptions specific for an 
iceberg data type +/// +void SetIcebergOptions(NYql::TIcebergDataSourceOptions& dataSourceOptions, const TGenericClusterConfig& clusterConfig) { + using namespace NKikimr::NExternalSource::NIceberg; + const auto& clusterOptions = clusterConfig.GetDataSourceOptions(); + auto warehouseType = GetIcebergOptionValue(clusterOptions, WAREHOUSE_TYPE); + + if (VALUE_S3 != warehouseType) { + throw yexception() << "Unexpected warehouse type: " << warehouseType; + } + + auto endpoint = GetIcebergOptionValue(clusterOptions, WAREHOUSE_S3_ENDPOINT); + auto region = GetIcebergOptionValue(clusterOptions, WAREHOUSE_S3_REGION); + auto uri = GetIcebergOptionValue(clusterOptions, WAREHOUSE_S3_URI); + auto& s3 = *dataSourceOptions.mutable_warehouse()->mutable_s3(); + + s3.set_endpoint(endpoint); + s3.set_region(region); + s3.set_uri(uri); + + auto catalogType = GetIcebergOptionValue(clusterOptions, CATALOG_TYPE); + auto& catalog = *dataSourceOptions.mutable_catalog(); + + // set catalog options + if (VALUE_HADOOP == catalogType) { + // hadoop nothing yet + catalog.mutable_hadoop(); + } else if (VALUE_HIVE_METASTORE == catalogType) { + auto uri = GetIcebergOptionValue(clusterOptions, CATALOG_HIVE_METASTORE_URI); + catalog.mutable_hive_metastore()->set_uri(uri); + } else { + throw yexception() << "Unexpected catalog type: " << catalogType; + } +} + +TIssues SetMongoDBOptions(NYql::TMongoDbDataSourceOptions& options, const TGenericClusterConfig& clusterConfig) { + TIssues issues; + auto it = clusterConfig.GetDataSourceOptions().find("reading_mode"); + + if (it != clusterConfig.GetDataSourceOptions().end()) { + TMongoDbDataSourceOptions_EReadingMode value = TMongoDbDataSourceOptions::READING_MODE_UNSPECIFIED; + if (!TMongoDbDataSourceOptions_EReadingMode_Parse(it->second, &value)) { + issues.AddIssue(TIssue(TStringBuilder() << "Failed to parse MongoDB reading_mode: " << it->second)); + } + options.set_reading_mode(value); + } + + it = 
clusterConfig.GetDataSourceOptions().find("unexpected_type_display_mode"); + + if (it != clusterConfig.GetDataSourceOptions().end()) { + TMongoDbDataSourceOptions_EUnexpectedTypeDisplayMode value = TMongoDbDataSourceOptions::UNEXPECTED_UNSPECIFIED; + if (!TMongoDbDataSourceOptions_EUnexpectedTypeDisplayMode_Parse(it->second, &value)) { + issues.AddIssue(TIssue(TStringBuilder() << "Failed to parse MongoDB unexpected_type_display_mode: " << it->second)); + } + options.set_unexpected_type_display_mode(value); + } + + it = clusterConfig.GetDataSourceOptions().find("unsupported_type_display_mode"); + + if (it != clusterConfig.GetDataSourceOptions().end()) { + TMongoDbDataSourceOptions_EUnsupportedTypeDisplayMode value = TMongoDbDataSourceOptions::UNSUPPORTED_UNSPECIFIED; + if (!TMongoDbDataSourceOptions_EUnsupportedTypeDisplayMode_Parse(it->second, &value)) { + issues.AddIssue(TIssue(TStringBuilder() << "Failed to parse MongoDB unsupported_type_display_mode: " << it->second)); + } + options.set_unsupported_type_display_mode(value); + } + + return issues; +} + +TIssues FillDataSourceOptions(NConnector::NApi::TDescribeTableRequest& request, + const TGenericClusterConfig& clusterConfig) { + const auto dataSourceKind = clusterConfig.GetKind(); + + switch (dataSourceKind) { + case NYql::EGenericDataSourceKind::CLICKHOUSE: + break; + case NYql::EGenericDataSourceKind::YDB: + break; + case NYql::EGenericDataSourceKind::MYSQL: + break; + case NYql::EGenericDataSourceKind::GREENPLUM: { + auto* options = request.mutable_data_source_instance()->mutable_gp_options(); + SetSchema(*options, clusterConfig); + } break; + case NYql::EGenericDataSourceKind::MS_SQL_SERVER: + break; + case NYql::EGenericDataSourceKind::POSTGRESQL: { + auto* options = request.mutable_data_source_instance()->mutable_pg_options(); + SetSchema(*options, clusterConfig); + } break; + case NYql::EGenericDataSourceKind::ORACLE: { + auto* options = request.mutable_data_source_instance()->mutable_oracle_options(); + 
SetOracleServiceName(*options, clusterConfig); + } break; + case NYql::EGenericDataSourceKind::LOGGING: { + auto* options = request.mutable_data_source_instance()->mutable_logging_options(); + SetLoggingFolderId(*options, clusterConfig); + } break; + case NYql::EGenericDataSourceKind::ICEBERG: { + auto* options = request.mutable_data_source_instance()->mutable_iceberg_options(); + SetIcebergOptions(*options, clusterConfig); + } break; + case NYql::EGenericDataSourceKind::REDIS: + break; + case NYql::EGenericDataSourceKind::PROMETHEUS: + break; + case NYql::EGenericDataSourceKind::MONGO_DB: { + auto* options = request.mutable_data_source_instance()->mutable_mongodb_options(); + return SetMongoDBOptions(*options, clusterConfig); + } break; + case NYql::EGenericDataSourceKind::OPENSEARCH: + break; + default: + throw yexception() << "Unexpected data source kind: '" + << NYql::EGenericDataSourceKind_Name(dataSourceKind) << "'"; + } + + return TIssues{}; +} + +void FillTablePath(NConnector::NApi::TDescribeTableRequest& request, const TGenericClusterConfig& clusterConfig, + const TString& tablePath) { + request.mutable_data_source_instance()->set_database(clusterConfig.GetDatabaseName()); + request.set_table(tablePath); +} + +void TGenericDescribeTableTransformer::FillTypeMappingSettings(NConnector::NApi::TDescribeTableRequest& request) { + const TString dateTimeFormat = + State_->Configuration->DateTimeFormat.Get().GetOrElse(TGenericSettings::TDefault::DateTimeFormat); + + if (dateTimeFormat == "string") { + request.mutable_type_mapping_settings()->set_date_time_format(NConnector::NApi::STRING_FORMAT); + } else if (dateTimeFormat == "YQL") { + request.mutable_type_mapping_settings()->set_date_time_format(NConnector::NApi::YQL_FORMAT); + } else { + throw yexception() << "Unexpected date/time format: '" << dateTimeFormat << "'"; + } +} + +IGraphTransformer::TStatus TGenericDescribeTableTransformer::DoTransform(TExprNode::TPtr input, + TExprNode::TPtr& output, + 
TExprContext& ctx) { + output = input; + + if (ctx.Step.IsDone(TExprStep::LoadTablesMetadata)) { + return TStatus::Ok; + } + + const auto& reads = FindNodes(input, [&](const TExprNode::TPtr& node) { + if (const auto maybeRead = TMaybeNode<TGenRead>(node)) { + return maybeRead.Cast().DataSource().Category().Value() == GenericProviderName; + } + return false; + }); + + if (reads.empty()) { + return TStatus::Ok; + } + + std::unordered_set<TTableDescriptionMap::key_type, TTableDescriptionMap::hasher> pendingRequests; + + // Iterate over all read table queries in the input expression, + // create Describe requests for unique tables + for (const auto& r : reads) { + const TGenRead read(r); + + if (!read.FreeArgs().Get(2).Ref().IsCallable("MrTableConcat")) { + ctx.AddError(TIssue(ctx.GetPosition(read.FreeArgs().Get(0).Pos()), "Expected key")); + return TStatus::Error; + } + + const auto maybeKey = TExprBase(read.FreeArgs().Get(2).Ref().HeadPtr()).Maybe<TCoKey>(); + + if (!maybeKey) { + ctx.AddError(TIssue(ctx.GetPosition(read.FreeArgs().Get(0).Pos()), "Expected key")); + return TStatus::Error; + } + + const auto& keyArg = maybeKey.Cast().Ref().Head(); + + if (!keyArg.IsList() || keyArg.ChildrenSize() != 2U || !keyArg.Head().IsAtom("table") || + !keyArg.Tail().IsCallable(TCoString::CallableName())) { + ctx.AddError(TIssue(ctx.GetPosition(keyArg.Pos()), "Expected single table name")); + return TStatus::Error; + } + + const auto clusterName = read.DataSource().Cluster().StringValue(); + const auto tableName = TString(keyArg.Tail().Head().Content()); + auto tableAddress = TGenericState::TTableAddress(clusterName, tableName); + + // Table meta has already been acquired + if (State_->HasTable(tableAddress)) { + continue; + } + + if (pendingRequests.insert(tableAddress).second) { + YQL_CLOG(INFO, ProviderGeneric) << "Describe table for: `" << tableAddress.ToString() << "`"; + } + } + + if (pendingRequests.empty()) { + return TStatus::Ok; + } + + 
std::vector<NThreading::TFuture<void>> handles; + handles.reserve(pendingRequests.size()); + TableDescriptions_.reserve(pendingRequests.size()); + + for (const auto& tableAddress : pendingRequests) { + auto tIssues = DescribeTableFromConnector(tableAddress, handles); + + if (!tIssues.Empty()) { + ctx.AddError(TIssue(tIssues.ToString())); + return TStatus::Error; + } + } + + if (handles.empty()) { + return TStatus::Ok; + } + + AsyncFuture_ = NThreading::WaitExceptionOrAll(handles); + return TStatus::Async; +} + +TIssues TGenericDescribeTableTransformer::DescribeTableFromConnector(const TGenericState::TTableAddress& tableAddress, + std::vector<NThreading::TFuture<void>>& handles) { + const auto it = State_->Configuration->ClusterNamesToClusterConfigs.find(tableAddress.ClusterName); + + YQL_ENSURE( + State_->Configuration->ClusterNamesToClusterConfigs.cend() != it, + "cluster not found: " << tableAddress.ClusterName + ); + + // Preserve data source instance for the further usage + auto emplaceIt = TableDescriptions_.emplace(tableAddress, std::make_shared<TTableDescription>()); + auto desc = emplaceIt.first->second; + NConnector::NApi::TDescribeTableRequest request; + auto issues = FillDescribeTableRequest(request, it->second, tableAddress.TableName); + + if (!issues.Empty()) { + return issues; + } + + auto promise = NThreading::NewPromise(); + handles.emplace_back(promise.GetFuture()); + desc->DataSourceInstance = request.data_source_instance(); + + Y_ENSURE(State_->GenericClient); + + State_->GenericClient->DescribeTable(request, State_->Configuration->DescribeTableTimeout).Subscribe( + [desc, tableAddress, promise, client = State_->GenericClient](const NConnector::TDescribeTableAsyncResult& f1) mutable { + NConnector::TDescribeTableAsyncResult f2(f1); + auto result = f2.ExtractValueSync(); + + // Check transport error + if (!result.Status.Ok()) { + desc->Issues.AddIssue(TStringBuilder() + << "Call DescribeTable for table " << tableAddress.ToString() << ": " + << 
result.Status.ToDebugString()); + promise.SetValue(); + return; + } + + // Check logical error + if (!NConnector::IsSuccess(*result.Response)) { + desc->Issues.AddIssues(NConnector::ErrorToIssues( + result.Response->error(), + TStringBuilder() << "Call DescribeTable for table " << tableAddress.ToString() << ": ")); + promise.SetValue(); + return; + } + + // Preserve schema for the further usage + desc->Schema = result.Response->schema(); + promise.SetValue(); + }); + + return {}; +} + +TIssues TGenericDescribeTableTransformer::FillDescribeTableRequest(NConnector::NApi::TDescribeTableRequest& request, + const TGenericClusterConfig& clusterConfig, + const TString& tablePath) { + const auto dataSourceKind = clusterConfig.GetKind(); + auto dsi = request.mutable_data_source_instance(); + + *dsi->mutable_endpoint() = clusterConfig.GetEndpoint(); + dsi->set_kind(dataSourceKind); + dsi->set_use_tls(clusterConfig.GetUseSsl()); + dsi->set_protocol(clusterConfig.GetProtocol()); + + FillCredentials(request, clusterConfig); + FillTypeMappingSettings(request); + + auto issues = FillDataSourceOptions(request, clusterConfig); + + if (!issues.Empty()) { + return issues; + } + + FillTablePath(request, clusterConfig, tablePath); + return {}; +} + +TIssues ParseTableMeta( + TExprContext& ctx, + const TPosition& pos, + const TGenericState::TTableAddress& tableAddress, + TGenericState::TTableMeta& tableMeta +) try { + TVector<const TItemExprType*> items; + + const auto& columns = tableMeta.Schema.columns(); + + if (columns.empty()) { + TIssues issues; + issues.AddIssue(TIssue(pos, TStringBuilder() << "Table " << tableAddress.ToString() << " doesn't exist.")); + return issues; + } + + for (const auto& column : columns) { + // Make type annotation + NYdb::TTypeParser parser(column.type()); + auto typeAnnotation = NFq::MakeType(parser, ctx); + + // Create items from graph + items.emplace_back(ctx.MakeType<TItemExprType>(column.name(), typeAnnotation)); + 
tableMeta.ColumnOrder.emplace_back(column.name()); + } + + tableMeta.ItemType = ctx.MakeType<TStructExprType>(items); + return TIssues{}; +} catch (std::exception&) { + TIssues issues; + issues.AddIssue(TIssue(pos, TStringBuilder() << "Failed to parse table metadata: " << CurrentExceptionMessage())); + return issues; +} + +TExprNode::TPtr MakeTableMetaNode( + TExprContext& ctx, + const TGenRead& read, + const TString& tableName +) { + // clang-format off + auto row = Build<TCoArgument>(ctx, read.Pos()) + .Name("row") + .Done(); + + auto emptyPredicate = Build<TCoLambda>(ctx, read.Pos()) + .Args({row}) + .Body<TCoBool>() + .Literal().Build("true") + .Build() + .Done().Ptr(); + + auto table = Build<TGenTable>(ctx, read.Pos()) + .Name().Value(tableName).Build() + .Done(); + + return Build<TGenReadTable>(ctx, read.Pos()) + .World(read.World()) + .DataSource(read.DataSource()) + .Table(table) + .Columns<TCoVoid>().Build() + .FilterPredicate(emptyPredicate) + .Done().Ptr(); + // clang-format on +} + +IGraphTransformer::TStatus TGenericDescribeTableTransformer::DoApplyAsyncChanges(TExprNode::TPtr input, + TExprNode::TPtr& output, + TExprContext& ctx) { + AsyncFuture_.GetValue(); + + const auto& reads = FindNodes(input, [&](const TExprNode::TPtr& node) { + if (const auto maybeRead = TMaybeNode<TGenRead>(node)) { + return maybeRead.Cast().DataSource().Category().Value() == GenericProviderName; + } + return false; + }); + + if (!reads.size()) { + return TStatus::Ok; + } + + TNodeOnNodeOwnedMap replaces(reads.size()); + + // Iterate over all read table queries, check Connector responses + for (const auto& r : reads) { + const TGenRead genRead(r); + const auto clusterName = genRead.DataSource().Cluster().StringValue(); + const auto& keyArg = TExprBase(genRead.FreeArgs().Get(2).Ref().HeadPtr()).Cast<TCoKey>().Ref().Head(); + const auto tableName = TString(keyArg.Tail().Head().Content()); + const TGenericState::TTableAddress tableAddress{clusterName, tableName}; + + // Find 
appropriate response + auto iter = TableDescriptions_.find(tableAddress); + if (iter == TableDescriptions_.end()) { + ctx.AddError(TIssue(ctx.GetPosition(genRead.Pos()), TStringBuilder() + << "Connector response not found for table " + << tableAddress.ToString())); + + return TStatus::Error; + } + + auto& result = iter->second; + + // If errors occurred during network interaction with Connector, return them + if (result->Issues) { + for (const auto& issue : result->Issues) { + ctx.AddError(issue); + } + + return TStatus::Error; + } + + Y_ENSURE(result->Schema); + + TGenericState::TTableMeta tableMeta; + tableMeta.Schema = *result->Schema; + tableMeta.DataSourceInstance = result->DataSourceInstance; + + // Parse table schema + ParseTableMeta(ctx, ctx.GetPosition(genRead.Pos()), tableAddress, tableMeta); + + // Fill AST for a table + if (const auto ins = replaces.emplace(genRead.Raw(), TExprNode::TPtr()); ins.second) { + ins.first->second = MakeTableMetaNode(ctx, genRead, tableName); + } + + // Save table metadata into provider state + State_->AddTable(tableAddress, std::move(tableMeta)); + } + + return RemapExpr(input, output, replaces, ctx, TOptimizeExprSettings(nullptr)); +} + +void TGenericDescribeTableTransformer::Rewind() { + TableDescriptions_.clear(); + AsyncFuture_ = {}; +} + +THolder<IGraphTransformer> CreateGenericDescribeTableTransformer(TGenericState::TPtr state) { + return MakeHolder<TGenericDescribeTableTransformer>(std::move(state)); +} + +} // NYql diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_describe_table.h b/ydb/library/yql/providers/generic/provider/yql_generic_describe_table.h new file mode 100644 index 00000000000..83e9c667406 --- /dev/null +++ b/ydb/library/yql/providers/generic/provider/yql_generic_describe_table.h @@ -0,0 +1,10 @@ +#pragma once + +#include <yql/essentials/core/yql_graph_transformer.h> +#include "yql_generic_state.h" + +namespace NYql { + +THolder<IGraphTransformer> 
CreateGenericDescribeTableTransformer(TGenericState::TPtr state); + +} // namespace NYql diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp index a05d920cebc..f7f54ec759b 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp @@ -9,6 +9,7 @@ #include <ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.h> #include <ydb/library/yql/providers/generic/proto/partition.pb.h> #include <ydb/library/yql/providers/generic/proto/source.pb.h> +#include <ydb/library/yql/providers/generic/provider/yql_generic_utils.h> #include <ydb/library/yql/utils/plan/plan_utils.h> #include <yql/essentials/ast/yql_expr.h> #include <yql/essentials/providers/common/dq/yql_dq_integration_impl.h> @@ -115,6 +116,28 @@ namespace NYql { return read; } + /// + /// Fill a select from a dq source + /// + void FillSelect(NConnector::NApi::TSelect& select, const TDqSource& source, TExprContext& ctx) { + const auto maybeSettings = source.Settings().Maybe<TGenSourceSettings>(); + + if (!maybeSettings) { + return; + } + + const auto settings = maybeSettings.Cast(); + const auto& tableName = settings.Table().StringValue(); + const auto& clusterName = source.DataSource().Cast<TGenDataSource>().Cluster().StringValue(); + auto [tableMeta, issues] = State_->GetTable({clusterName, tableName}); + + if (issues) { + throw yexception() << "Get table metadata: " << issues.ToOneLineString(); + } + + FillSelectFromGenSourceSettings(select, settings, ctx, tableMeta); + } + ui64 Partition( const TExprNode& node, TVector<TString>& partitions, @@ -145,7 +168,12 @@ namespace NYql { return 0; } - const size_t totalSplits = tableMeta->Splits.size(); + NConnector::NApi::TSelect select; + FillSelect(select, TDqSource(&node), ctx); + + auto selectKey = tableAddress.MakeKeyFor(select); + auto 
splits = tableMeta->GetSplitsForSelect(selectKey); + const size_t totalSplits = splits.size(); partitions.clear(); @@ -153,7 +181,7 @@ namespace NYql { // If there are not too many splits, simply make a single-split partitions. for (size_t i = 0; i < totalSplits; i++) { Generic::TPartition partition; - *partition.add_splits() = tableMeta->Splits[i]; + *partition.add_splits() = splits[i]; TString partitionStr; YQL_ENSURE(partition.SerializeToString(&partitionStr), "Failed to serialize partition"); partitions.emplace_back(std::move(partitionStr)); @@ -166,7 +194,7 @@ namespace NYql { for (size_t i = 0; i < totalSplits; i += splitsPerPartition) { Generic::TPartition partition; for (size_t j = i; j < i + splitsPerPartition && j < totalSplits; j++) { - *partition.add_splits() = tableMeta->Splits[j]; + *partition.add_splits() = splits[j]; } TString partitionStr; YQL_ENSURE(partition.SerializeToString(&partitionStr), "Failed to serialize partition"); @@ -180,77 +208,60 @@ namespace NYql { void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sourceType, size_t, TExprContext& ctx) override { - const TDqSource source(&node); - if (const auto maybeSettings = source.Settings().Maybe<TGenSourceSettings>()) { - const auto settings = maybeSettings.Cast(); - const auto& clusterName = source.DataSource().Cast<TGenDataSource>().Cluster().StringValue(); - const auto& tableName = settings.Table().StringValue(); - const auto& clusterConfig = State_->Configuration->ClusterNamesToClusterConfigs[clusterName]; - const auto& endpoint = clusterConfig.endpoint(); - - Generic::TSource source; - - YQL_CLOG(INFO, ProviderGeneric) - << "Filling source settings" - << ": cluster: " << clusterName - << ", table: " << tableName - << ", endpoint: " << endpoint.ShortDebugString(); - - const auto& columns = settings.Columns(); - - auto [tableMeta, issues] = State_->GetTable({clusterName, tableName}); - if (issues) { - throw yexception() << "Get table metadata: 
" << issues.ToOneLineString(); - } + const TDqSource dqSource(&node); + const auto maybeSettings = dqSource.Settings().Maybe<TGenSourceSettings>(); - // prepare select - auto select = source.mutable_select(); - select->mutable_from()->set_table(TString(tableName)); - *select->mutable_data_source_instance() = tableMeta->DataSourceInstance; - - auto items = select->mutable_what()->mutable_items(); - for (size_t i = 0; i < columns.Size(); i++) { - // assign column name - auto column = items->Add()->mutable_column(); - auto columnName = columns.Item(i).StringValue(); - column->mutable_name()->assign(columnName); - - // assign column type - auto type = NConnector::GetColumnTypeByName(tableMeta->Schema, columnName); - *column->mutable_type() = type; - } + if (!maybeSettings) { + return; + } - if (auto predicate = settings.FilterPredicate(); !IsEmptyFilterPredicate(predicate)) { - TStringBuilder err; - if (!SerializeFilterPredicate(ctx, predicate, select->mutable_where()->mutable_filter_typed(), err)) { - throw yexception() << "Failed to serialize filter predicate for source: " << err; - } - } + const auto settings = maybeSettings.Cast(); + const auto& clusterName = dqSource.DataSource().Cast<TGenDataSource>().Cluster().StringValue(); + const auto& tableName = settings.Table().StringValue(); + const auto& clusterConfig = State_->Configuration->ClusterNamesToClusterConfigs[clusterName]; + const auto& endpoint = clusterConfig.endpoint(); - // TODO: remove this block as soon as the first part YQ-4730 is deployed - // - // Iceberg/Managed YDB (including YDB underlying Logging) supports access via IAM token. - // If exist, copy service account creds to obtain tokens during request execution phase. - // If exists, copy previously created token. 
- if (IsIn({NYql::EGenericDataSourceKind::YDB, NYql::EGenericDataSourceKind::LOGGING, NYql::EGenericDataSourceKind::ICEBERG}, clusterConfig.kind())) { - source.SetServiceAccountId(clusterConfig.GetServiceAccountId()); - source.SetServiceAccountIdSignature(clusterConfig.GetServiceAccountIdSignature()); - source.SetToken(State_->Types->Credentials->FindCredentialContent( - "default_" + clusterConfig.name(), - "default_generic", - clusterConfig.GetToken())); - } + Generic::TSource source; - // We set token name to the protobuf message that will be received - // by the read actor during the execution phase. - // It will use token name to extract credentials from the secureParams. - const TString tokenName(settings.Token().Maybe<TCoSecureParam>().Name().Cast()); - source.SetTokenName(tokenName); + YQL_CLOG(INFO, ProviderGeneric) + << "Filling source settings" + << ": cluster: " << clusterName + << ", table: " << tableName + << ", endpoint: " << endpoint.ShortDebugString(); - // preserve source description for read actor - protoSettings.PackFrom(source); - sourceType = GetSourceType(select->data_source_instance()); + auto [tableMeta, issues] = State_->GetTable({clusterName, tableName}); + + if (issues) { + throw yexception() << "Get table metadata: " << issues.ToOneLineString(); + } + + // prepare select + auto select = source.mutable_select(); + FillSelect(*select, dqSource, ctx); + + // TODO: remove this block as soon as the first part YQ-4730 is deployed + // + // Iceberg/Managed YDB (including YDB underlying Logging) supports access via IAM token. + // If exists, copy service account creds to obtain tokens during request execution phase. + // If exists, copy previously created token. 
+ if (IsIn({NYql::EGenericDataSourceKind::YDB, NYql::EGenericDataSourceKind::LOGGING, NYql::EGenericDataSourceKind::ICEBERG}, clusterConfig.kind())) { + source.SetServiceAccountId(clusterConfig.GetServiceAccountId()); + source.SetServiceAccountIdSignature(clusterConfig.GetServiceAccountIdSignature()); + source.SetToken(State_->Types->Credentials->FindCredentialContent( + "default_" + clusterConfig.name(), + "default_generic", + clusterConfig.GetToken())); } + + // We set token name to the protobuf message that will be received + // by the read actor during the execution phase. + // It will use token name to extract credentials from the secureParams. + const TString tokenName(settings.Token().Maybe<TCoSecureParam>().Name().Cast()); + source.SetTokenName(tokenName); + + // preserve source description for read actor + protoSettings.PackFrom(source); + sourceType = GetSourceType(select->data_source_instance()); } bool FillSourcePlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) override { diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_list_splits.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_list_splits.cpp new file mode 100644 index 00000000000..c496bcb62d0 --- /dev/null +++ b/ydb/library/yql/providers/generic/provider/yql_generic_list_splits.cpp @@ -0,0 +1,342 @@ +#include "yql_generic_list_splits.h" + +#include <yql/essentials/utils/log/log.h> +#include <yql/essentials/providers/common/structured_token/yql_token_builder.h> +#include <yql/essentials/providers/common/provider/yql_provider.h> +#include <yql/essentials/providers/common/provider/yql_provider_names.h> +#include <yql/essentials/minikql/mkql_type_ops.h> +#include <yql/essentials/minikql/mkql_program_builder.h> +#include <yql/essentials/minikql/mkql_alloc.h> +#include <yql/essentials/minikql/computation/mkql_computation_node.h> +#include <yql/essentials/minikql/comp_nodes/mkql_factories.h> +#include 
<yql/essentials/core/yql_graph_transformer.h> +#include <yql/essentials/core/yql_expr_optimize.h> +#include <yql/essentials/core/type_ann/type_ann_expr.h> +#include <yql/essentials/core/expr_nodes/yql_expr_nodes.h> +#include <yql/essentials/ast/yql_type_string.h> +#include <yql/essentials/ast/yql_expr.h> +#include <ydb/library/yql/providers/generic/provider/yql_generic_utils.h> +#include <ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.h> +#include <ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.h> +#include <ydb/library/yql/providers/generic/connector/libcpp/error.h> +#include <ydb/library/yql/providers/generic/connector/libcpp/client.h> +#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h> +#include <ydb/core/fq/libs/result_formatter/result_formatter.h> +#include <ydb/core/external_sources/iceberg_fields.h> +#include <library/cpp/json/json_reader.h> + +namespace NYql { + +using namespace NNodes; +using namespace NKikimr; +using namespace NKikimr::NMiniKQL; + +/// +/// Optimization process contains several transformers in a pipeline which are called +/// consecutively. If a DoTransform returns repeat, e.g. when it changes the expression, the process +/// starts again from the very beginning, but with a new input. That is why a transformer +/// can be called multiple times. The purpose of this transformer is to find TGenSourceSettings +/// nodes which were created during previous transformations and perform a ListSplit request. +/// The ListSplit transformer has to run after the where clause has been pushed down; otherwise +/// in a ReadSplit request, which ultimately occurs after pushdown, the connector could receive +/// a where clause that differs from the one in ListSplit. +/// +/// The order of transformation calls: +/// +/// 1. TGenericPhysicalOptProposalTransformer::PushFilterToReadTable pushes the predicate down into +/// a TGenReadTable node. +/// +/// 2. TKqpConstantFoldingTransformer folds constant expressions in the pushed-down predicate. +/// +/// 3.
TGenericListSplitTransformer performs a ListSplit request on a TGenSourceSettings node. +/// +class TGenericListSplitTransformer : public TGraphTransformerBase { + struct TListSplitRequestData { + TSelectKey Key; + NConnector::NApi::TSelect Select; + TGenericState::TTableAddress TableAddress; + }; + + struct TListResponse { + using TPtr = std::shared_ptr<TListResponse>; + + std::vector<NConnector::NApi::TSplit> Splits; + TIssues Issues; + }; + + using TListResponseMap = + std::unordered_map<TSelectKey, TListResponse::TPtr, TSelectKeyHash>; + +public: + explicit TGenericListSplitTransformer(TGenericState::TPtr state); + +public: + TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final; + + NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode&) final { + return AsyncFuture_; + } + + TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final; + + void Rewind() final; + +private: + TIssues ListSplitsFromConnector(const TListSplitRequestData& data, std::vector<NThreading::TFuture<void>>& handles); + +private: + const TGenericState::TPtr State_; + TListResponseMap ListResponses_; + NThreading::TFuture<void> AsyncFuture_; +}; + +TGenericListSplitTransformer::TGenericListSplitTransformer(TGenericState::TPtr state) + : State_(std::move(state)) +{ } + +IGraphTransformer::TStatus TGenericListSplitTransformer::DoTransform(TExprNode::TPtr input, + TExprNode::TPtr& output, + TExprContext& ctx) { + output = input; + + const auto& readsSettings = FindNodes(input, [&](const TExprNode::TPtr& node) { + if (const auto maybeSettings = TMaybeNode<TGenSourceSettings>(node)) { + return true; + } + + return false; + }); + + if (readsSettings.empty()) { + return TStatus::Ok; + } + + std::unordered_map<TSelectKey, TListSplitRequestData, TSelectKeyHash> pendingRequests; + + // Iterate over all settings in the input expression, create ListSplit request if needed + for (const auto& r : readsSettings) { + const 
TGenSourceSettings settings(r); + + const auto& tableName = settings.Table().StringValue(); + const auto& clusterName = settings.Cluster().StringValue(); + + auto tableAddress = TGenericState::TTableAddress(clusterName, tableName); + auto table = State_->GetTable(tableAddress); + + if (!table.first) { + ctx.AddError(TIssue(table.second.ToString())); + return TStatus::Error; + } + + // Grab select from a read table query + NConnector::NApi::TSelect select; + FillSelectFromGenSourceSettings(select, settings, ctx, table.first); + + auto selectKey = tableAddress.MakeKeyFor(select); + + // The one sql query could contain multiple selects, e.g. + // join, subquery etc. If splits has been already acquired for a + // select, skip it + if (table.first->HasSplitsForSelect(selectKey)) { + continue; + } + + auto v = TListSplitRequestData{selectKey, std::move(select), tableAddress}; + pendingRequests.emplace(selectKey, v); + } + + if (pendingRequests.empty()) { + return TStatus::Ok; + } + + std::vector<NThreading::TFuture<void>> handles; + handles.reserve(pendingRequests.size()); + ListResponses_.reserve(pendingRequests.size()); + + for (auto& k : pendingRequests) { + auto tIssues = ListSplitsFromConnector(k.second, handles); + if (!tIssues.Empty()) { + ctx.AddError(TIssue(tIssues.ToString())); + return TStatus::Error; + } + } + + if (handles.empty()) { + return TStatus::Ok; + } + + AsyncFuture_ = NThreading::WaitExceptionOrAll(handles); + return TStatus::Async; +} + +TIssues TGenericListSplitTransformer::ListSplitsFromConnector(const TListSplitRequestData& data, + std::vector<NThreading::TFuture<void>>& handles) { + auto table = State_->GetTable(data.TableAddress); + + if (!table.first) { + return table.second; + } + + // Preserve data source instance for the further usage + auto emplaceIt = ListResponses_.emplace(data.Key, std::make_shared<TListResponse>()); + auto desc = emplaceIt.first->second; + + // Call ListSplits + NConnector::NApi::TListSplitsRequest request; + 
*request.mutable_selects()->Add() = data.Select; + + auto promise = NThreading::NewPromise(); + handles.emplace_back(promise.GetFuture()); + + Y_ENSURE(State_->GenericClient); + + State_->GenericClient->ListSplits(request).Subscribe([desc, promise, data] + (const NConnector::TListSplitsStreamIteratorAsyncResult f3) mutable { + NConnector::TListSplitsStreamIteratorAsyncResult f4(f3); + auto streamIterResult = f4.ExtractValueSync(); + + // Check transport error + if (!streamIterResult.Status.Ok()) { + desc->Issues.AddIssue(TStringBuilder() + << "Call ListSplits for table: " << data.TableAddress.ToString() + << " with select: " << streamIterResult.Status.ToDebugString()); + promise.SetValue(); + return; + } + + Y_ENSURE(streamIterResult.Iterator); + + auto drainer = + NConnector::MakeListSplitsStreamIteratorDrainer(std::move(streamIterResult.Iterator)); + + // Pass drainer to the callback because we want him to stay alive until the callback is called + drainer->Run().Subscribe([desc, promise, data, drainer] + (const NThreading::TFuture<NConnector::TListSplitsStreamIteratorDrainer::TBuffer>& f5) mutable { + NThreading::TFuture<NConnector::TListSplitsStreamIteratorDrainer::TBuffer> f6(f5); + auto drainerResult = f6.ExtractValueSync(); + + // check transport and logical errors + if (drainerResult.Issues) { + auto msg = TStringBuilder() + << "Call ListSplits for table: " << data.TableAddress.ToString() << " with select: " + << data.Select.what().DebugString() + << data.Select.from().DebugString() + << data.Select.where().DebugString(); + + TIssue dstIssue(msg); + + for (const auto& srcIssue : drainerResult.Issues) { + dstIssue.AddSubIssue(MakeIntrusive<TIssue>(srcIssue)); + } + + desc->Issues.AddIssue(std::move(dstIssue)); + promise.SetValue(); + return; + } + + // Collect all the splits from every response into a single vector + for (auto&& response : drainerResult.Responses) { + std::transform(std::make_move_iterator(response.mutable_splits()->begin()), + 
std::make_move_iterator(response.mutable_splits()->end()), + std::back_inserter(desc->Splits), + [](auto&& split) { return std::move(split); }); + } + + promise.SetValue(); + }); + }); + + return TIssues(); +} + +IGraphTransformer::TStatus TGenericListSplitTransformer::DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) { + AsyncFuture_.GetValue(); + output = input; + + /* + * Search for a TGenSourceSettings. TGenSourceSettings contains the same data as during + * DoTransform method call + */ + const auto& readsSettings = FindNodes(input, [&](const TExprNode::TPtr& node) { + if (const auto maybeSettings = TMaybeNode<TGenSourceSettings>(node)) { + return true; + } + + return false; + }); + + if (readsSettings.empty()) { + return TStatus::Ok; + } + + // Iterate over all settings in the input expression, check Connector responses + for (const auto& r : readsSettings) { + const TGenSourceSettings settings(r); + + const auto& tableName = settings.Table().StringValue(); + const auto& clusterName = settings.Cluster().StringValue(); + + auto tableAddress = TGenericState::TTableAddress(clusterName, tableName); + auto table = State_->GetTable(tableAddress); + + if (!table.first) { + ctx.AddError(TIssue(table.second.ToString())); + return TStatus::Error; + } + + // Grab select from a read table query + NConnector::NApi::TSelect select; + FillSelectFromGenSourceSettings(select, settings, ctx, table.first); + auto selectKey = tableAddress.MakeKeyFor(select); + + // If splits for a similar select for this table was created skip it + if (table.first->HasSplitsForSelect(selectKey)) { + continue; + } + + // Find appropriate response + auto iter = ListResponses_.find(selectKey); + + if (iter == ListResponses_.end()) { + auto msg = TStringBuilder() + << "Connector response not found for table: " << tableAddress.ToString() << " and select: " + << select.what().DebugString() + << select.from().DebugString() + << select.where().DebugString(); + + 
ctx.AddError(TIssue(ctx.GetPosition(settings.Pos()), msg)); + return TStatus::Error; + } + + auto& result = iter->second; + + // If errors occurred during network interaction with Connector, return them + if (result->Issues) { + for (const auto& issue : result->Issues) { + ctx.AddError(issue); + } + + return TStatus::Error; + } + + Y_ENSURE(!result->Splits.empty()); + + if (auto issue = State_->AttachSplitsToTable(tableAddress, selectKey, result->Splits); issue) { + ctx.AddError(*issue); + return TStatus::Error; + } + } + + return TStatus::Ok; +} + +void TGenericListSplitTransformer::Rewind() { + ListResponses_.clear(); + AsyncFuture_ = {}; +} + +THolder<TGraphTransformerBase> CreateGenericListSplitTransformer(TGenericState::TPtr state) { + return MakeHolder<TGenericListSplitTransformer>(std::move(state)); +} + +} // NYql diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_list_splits.h b/ydb/library/yql/providers/generic/provider/yql_generic_list_splits.h new file mode 100644 index 00000000000..0dcb72984a4 --- /dev/null +++ b/ydb/library/yql/providers/generic/provider/yql_generic_list_splits.h @@ -0,0 +1,10 @@ +#pragma once + +#include <yql/essentials/core/yql_graph_transformer.h> +#include "yql_generic_state.h" + +namespace NYql { + +THolder<TGraphTransformerBase> CreateGenericListSplitTransformer(TGenericState::TPtr state); + +} // NYql diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp index e8edf828e2a..6b6d6829961 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp @@ -1,5 +1,6 @@ // clang-format off #include "yql_generic_provider_impl.h" +#include "yql_generic_describe_table.h" #include <library/cpp/json/json_reader.h> #include <ydb/core/fq/libs/result_formatter/result_formatter.h> @@ -27,7 +28,7 @@ namespace NYql { using namespace 
NKikimr; using namespace NKikimr::NMiniKQL; - class TGenericLoadTableMetadataTransformer: public TGraphTransformerBase { + class [[deprecated("Now this transformer is represented by two transformers, it will be removed in the next version")]] TGenericLoadTableMetadataTransformer: public TGraphTransformerBase { struct TTableDescription { using TPtr = std::shared_ptr<TTableDescription>; @@ -635,7 +636,7 @@ namespace NYql { }; THolder<IGraphTransformer> CreateGenericLoadTableMetadataTransformer(TGenericState::TPtr state) { - return MakeHolder<TGenericLoadTableMetadataTransformer>(std::move(state)); + return CreateGenericDescribeTableTransformer(state); } } // namespace NYql diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp index e98d002f24f..fdb29c0fbec 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp @@ -1,20 +1,23 @@ #include "yql_generic_provider_impl.h" #include "yql_generic_predicate_pushdown.h" +#include "yql_generic_list_splits.h" -#include <yql/essentials/core/expr_nodes/yql_expr_nodes.h> -#include <yql/essentials/core/yql_opt_utils.h> -#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h> -#include <yql/essentials/providers/common/provider/yql_data_provider_impl.h> +#include <yql/essentials/utils/log/log.h> +#include <yql/essentials/providers/common/transform/yql_optimize.h> +#include <yql/essentials/providers/common/provider/yql_provider.h> #include <yql/essentials/providers/common/provider/yql_provider.h> #include <yql/essentials/providers/common/provider/yql_provider_names.h> -#include <ydb/library/yql/providers/common/pushdown/collection.h> -#include <ydb/library/yql/providers/common/pushdown/physical_opt.h> -#include <ydb/library/yql/providers/common/pushdown/predicate_node.h> -#include 
<yql/essentials/providers/common/transform/yql_optimize.h> +#include <yql/essentials/providers/common/provider/yql_data_provider_impl.h> +#include <yql/essentials/core/yql_opt_utils.h> +#include <yql/essentials/core/services/yql_transform_pipeline.h> +#include <yql/essentials/core/expr_nodes/yql_expr_nodes.h> #include <ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.h> #include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h> -#include <yql/essentials/utils/log/log.h> -#include <yql/essentials/providers/common/provider/yql_provider.h> +#include <ydb/library/yql/providers/common/pushdown/predicate_node.h> +#include <ydb/library/yql/providers/common/pushdown/settings.h> +#include <ydb/library/yql/providers/common/pushdown/physical_opt.h> +#include <ydb/library/yql/providers/common/pushdown/collection.h> +#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h> namespace NYql { @@ -38,7 +41,9 @@ namespace NYql { EFlag::JustPassthroughOperators | // To pushdown REGEXP over String column EFlag::FlatMapOverOptionals | // To pushdown REGEXP over Utf8 column EFlag::ToStringFromStringExpressions | // To pushdown REGEXP over Utf8 column - EFlag::DecimalType | EFlag::DecimalCtor + EFlag::DecimalType | EFlag::DecimalCtor | + EFlag::IntervalCtor | + EFlag::DateCtor ); EnableFunction("Re2.Grep"); // For REGEXP pushdown } @@ -207,10 +212,60 @@ namespace NYql { private: const TGenericState::TPtr State_; }; + + class TGenericPhysicalOptProposalWithListTransformer : public TGraphTransformerBase { + public: + explicit TGenericPhysicalOptProposalWithListTransformer(TGenericState::TPtr state) + : PhysicalOptTransformer_(std::make_unique<TGenericPhysicalOptProposalTransformer>(state)) + , ListTransformer_(CreateGenericListSplitTransformer(state)) + , AllowAsync_(false) + { } + + public: + TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { + auto resultStatus = PhysicalOptTransformer_->DoTransform(input, output, 
ctx); + + Y_ENSURE(resultStatus != TStatus::Async); + + if (resultStatus != TStatus::Ok) { + return resultStatus; + } + + input = output; + resultStatus = ListTransformer_->DoTransform(input, output, ctx); + + if (resultStatus == TStatus::Async) { + AllowAsync_ = true; + } + + return resultStatus; + } + + NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& node) final { + Y_ENSURE(AllowAsync_); + return ListTransformer_->DoGetAsyncFuture(node); + } + + TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { + Y_ENSURE(AllowAsync_); + return ListTransformer_->DoApplyAsyncChanges(input, output, ctx); + } + + void Rewind() final { + AllowAsync_ = false; + PhysicalOptTransformer_->Rewind(); + ListTransformer_->Rewind(); + } + + private: + const std::unique_ptr<TGenericPhysicalOptProposalTransformer> PhysicalOptTransformer_; + const THolder<TGraphTransformerBase> ListTransformer_; + bool AllowAsync_; + }; } // namespace THolder<IGraphTransformer> CreateGenericPhysicalOptProposalTransformer(TGenericState::TPtr state) { - return MakeHolder<TGenericPhysicalOptProposalTransformer>(state); + return MakeHolder<TGenericPhysicalOptProposalWithListTransformer>(state); } } // namespace NYql diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp index 0b987d422b2..ea7ee42990d 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp @@ -341,6 +341,10 @@ namespace NYql { MATCH_ATOM(Utf8, UTF8, text, TString); MATCH_ATOM(Timestamp, TIMESTAMP, int64, i64); MATCH_ATOM(Interval, INTERVAL, int64, i64); + // Arrow vector holds value with ui16 type + // Proto value does not have ability to store + // ui16 type that's why uint32 is used + MATCH_ATOM(Date, DATE, uint32, ui16); MATCH_ARITHMETICAL(Sub, SUB); 
MATCH_ARITHMETICAL(Add, ADD); MATCH_ARITHMETICAL(Mul, MUL); @@ -683,6 +687,8 @@ namespace NYql { return "Utf8"; case Ydb::Type::JSON: return "Json"; + case Ydb::Type::DATE: + return "Date"; default: throw yexception() << "Failed to format primitive type, type case " << static_cast<ui64>(typeId) << " is not supported"; } @@ -716,6 +722,16 @@ namespace NYql { [[fallthrough]]; } } + case Ydb::Type::DATE: { + const auto& value = typedValue.value(); + switch (value.value_case()) { + case Ydb::Value::kUint32Value: + return TStringBuilder() << FormatType(typedValue.type()) << "(\"" + << TInstant::Days(value.uint32_value()).FormatLocalTime("%Y-%m-%d") << "\")"; + default: + [[fallthrough]]; + } + } default: [[fallthrough]]; } diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp index 61e86bf982c..7c9564294da 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp @@ -1,12 +1,87 @@ #include "yql_generic_state.h" +#include <ydb/library/yql/providers/generic/connector/libcpp/utils.h> +#include <ydb/library/yql/providers/generic/provider/yql_generic_utils.h> namespace NYql { + std::vector<TString> GetColumns(const NConnector::NApi::TSelect& select) { + std::vector<TString> columns; + + if (select.has_what() && !select.what().items().empty()) { + columns.reserve(select.what().items().size()); + + for (const auto& item : select.what().items()) { + if (!item.column().name().empty()) { + columns.push_back(item.column().name()); + } + } + + std::sort(columns.begin(), columns.end()); + } + + return columns; + } + + TSelectKey::TSelectKey(const TString& cluster, const NConnector::NApi::TSelect& select) + : Cluster(cluster) + , Table(select.has_from() ? select.from().table() : "") + , Columns(GetColumns(select)) + , Where(select.has_where() && select.where().has_filter_typed() ? 
+ select.where().filter_typed().SerializeAsString() : "") + , Hash(CalculateHash()) + { } + + size_t TSelectKey::CalculateHash() const { + auto hash = CombineHashes(std::hash<TString>()(Cluster), std::hash<TString>()(Table)); + hash = CombineHashes(hash, std::hash<TString>()(Where)); + + for (const auto& col : Columns) { + hash = CombineHashes(hash, std::hash<TString>()(col)); + } + + return hash; + } + + TGenericState::TTableAddress::operator size_t() const { + return CombineHashes(std::hash<TString>()(ClusterName), std::hash<TString>()(TableName)); + } + + TSelectKey TGenericState::TTableAddress::MakeKeyFor(const NConnector::NApi::TSelect& select) const { + return TSelectKey(ClusterName, select); + } + + bool TGenericState::TTableMeta::HasSplitsForSelect(const TSelectKey& key) const { + return SelectSplits.contains(key); + } + + void TGenericState::TTableMeta::AttachSplitsForSelect(const TSelectKey& key, std::vector<NYql::NConnector::NApi::TSplit>& splits) { + Y_ENSURE(splits.size()); + Y_ENSURE(SelectSplits.emplace(key, std::move(splits)).second); + } + + const std::vector<NYql::NConnector::NApi::TSplit>& TGenericState::TTableMeta::GetSplitsForSelect( + const TSelectKey& key) const { + const auto it = SelectSplits.find(key); + + if (it == SelectSplits.end()) { + throw yexception() + << "Table metadata does not contain splits for a select from the table: " << key.Table + << " with where:" << key.Where; + } + + return it->second; + } + + bool TGenericState::HasTable(const TTableAddress& tableAddress) { + return Tables_.contains(tableAddress); + } + void TGenericState::AddTable(const TTableAddress& tableAddress, TTableMeta&& tableMeta) { Tables_.emplace(tableAddress, std::move(tableMeta)); } TGenericState::TGetTableResult TGenericState::GetTable(const TTableAddress& tableAddress) const { auto result = Tables_.FindPtr(tableAddress); + if (result) { return std::make_pair(result, TIssues{}); } @@ -17,4 +92,17 @@ namespace NYql { return std::make_pair<TTableMeta*, 
TIssues>(nullptr, std::move(issues)); } + std::optional<TIssue> TGenericState::AttachSplitsToTable(const TTableAddress& tableAddress, + const TSelectKey& key, + std::vector<NYql::NConnector::NApi::TSplit>& splits) { + auto result = Tables_.FindPtr(tableAddress); + + if (!result) { + return {TIssue(TStringBuilder() << "no metadata for table " << tableAddress.ToString())}; + } + + result->AttachSplitsForSelect(key, splits); + return std::nullopt; + } + } // namespace NYql diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_state.h b/ydb/library/yql/providers/generic/provider/yql_generic_state.h index 4abefb0e89d..bce6170de19 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_state.h +++ b/ydb/library/yql/providers/generic/provider/yql_generic_state.h @@ -13,6 +13,37 @@ namespace NKikimr::NMiniKQL { } // namespace NKikimr::NMiniKQL namespace NYql { + /// + /// A key for a select query on a cluster table. Hash value is + /// calculated in a constructor and stored in a Hash field. 
+ /// + struct TSelectKey { + const TString Cluster; + const TString Table; + const std::vector<TString> Columns; + const TString Where; + const size_t Hash; + + TSelectKey(const TSelectKey& select) = default; + + TSelectKey(const TString& Cluster, const NConnector::NApi::TSelect& select); + + bool operator==(const TSelectKey& other) const = default; + + TSelectKey& operator=(const TSelectKey& other) = default; + + size_t CalculateHash() const; + }; + + /// + /// Hasher for TSelectKey + /// + struct TSelectKeyHash { + size_t operator()(const TSelectKey& key) const noexcept { + return key.Hash; + } + }; + struct TGenericState: public TThrRefBase { using TPtr = TIntrusivePtr<TGenericState>; @@ -28,9 +59,12 @@ namespace NYql { return ClusterName == other.ClusterName && TableName == other.TableName; } - explicit operator size_t() const { - return CombineHashes(std::hash<TString>()(ClusterName), std::hash<TString>()(TableName)); - } + explicit operator size_t() const; + + /// + /// Make a key for a select request on a cluster table + /// + TSelectKey MakeKeyFor(const NConnector::NApi::TSelect& select) const; }; struct TTableMeta { @@ -41,8 +75,18 @@ namespace NYql { NYql::TGenericDataSourceInstance DataSourceInstance; // External table schema NYql::NConnector::NApi::TSchema Schema; + // Deprecated // Contains some binary description of table splits (partitions) produced by Connector std::vector<NYql::NConnector::NApi::TSplit> Splits; + // Contains splits for a particular select + std::unordered_map<TSelectKey, std::vector<NYql::NConnector::NApi::TSplit>, TSelectKeyHash> SelectSplits; + + bool HasSplitsForSelect(const TSelectKey& key) const; + + void AttachSplitsForSelect(const TSelectKey& key, + std::vector<NYql::NConnector::NApi::TSplit>& splits); + + const std::vector<NYql::NConnector::NApi::TSplit>& GetSplitsForSelect(const TSelectKey& key) const; }; using TGetTableResult = std::pair<const TTableMeta*, TIssues>; @@ -66,7 +110,11 @@ namespace NYql { 
Configuration->Init(gatewayConfig, databaseResolver, DatabaseAuth, types->Credentials); } + bool HasTable(const TTableAddress& tableAddress); void AddTable(const TTableAddress& tableAddress, TTableMeta&& tableMeta); + std::optional<TIssue> AttachSplitsToTable(const TTableAddress& tableAddress, + const TSelectKey& key, + std::vector<NYql::NConnector::NApi::TSplit>& splits); TGetTableResult GetTable(const TTableAddress& tableAddress) const; TTypeAnnotationContext* Types; @@ -88,4 +136,5 @@ namespace NYql { private: THashMap<TTableAddress, TTableMeta> Tables_; }; + } // namespace NYql diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_utils.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_utils.cpp index 9d88896b889..b14af1091c6 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_utils.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_utils.cpp @@ -1,6 +1,8 @@ #include "yql_generic_utils.h" #include <util/string/builder.h> +#include <ydb/library/yql/providers/generic/connector/libcpp/utils.h> +#include <ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.h> namespace NYql { TString DumpGenericClusterConfig(const TGenericClusterConfig& clusterConfig) { @@ -19,4 +21,40 @@ namespace NYql { return sb; } + + /// + /// Fill a select from a TGenSourceSettings + /// + void FillSelectFromGenSourceSettings(NConnector::NApi::TSelect& select, + const NNodes::TGenSourceSettings& settings, + TExprContext& ctx, + const TGenericState::TTableMeta* tableMeta) { + const auto& tableName = settings.Table().StringValue(); + + select.mutable_from()->set_table(TString(tableName)); + *select.mutable_data_source_instance() = tableMeta->DataSourceInstance; + + const auto& columns = settings.Columns(); + auto items = select.mutable_what()->mutable_items(); + + for (size_t i = 0; i < columns.Size(); i++) { + // assign column name + auto column = items->Add()->mutable_column(); + auto columnName = 
columns.Item(i).StringValue(); + column->mutable_name()->assign(columnName); + + // assign column type + auto type = NConnector::GetColumnTypeByName(tableMeta->Schema, columnName); + *column->mutable_type() = type; + } + + if (auto predicate = settings.FilterPredicate(); !IsEmptyFilterPredicate(predicate)) { + TStringBuilder err; + + if (!SerializeFilterPredicate(ctx, predicate, select.mutable_where()->mutable_filter_typed(), err)) { + throw yexception() << "Failed to serialize filter predicate for source: " << err; + } + } + } + } // namespace NYql diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_utils.h b/ydb/library/yql/providers/generic/provider/yql_generic_utils.h index 4370e4f5ece..dd271a245b0 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_utils.h +++ b/ydb/library/yql/providers/generic/provider/yql_generic_utils.h @@ -1,8 +1,16 @@ #pragma once -#include <yql/essentials/providers/common/proto/gateways_config.pb.h> -#include <yql/essentials/providers/common/proto/gateways_config.pb.h> +#include <ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.h> +#include <ydb/library/yql/providers/generic/provider/yql_generic_state.h> namespace NYql { TString DumpGenericClusterConfig(const TGenericClusterConfig& clusterConfig); + + /// + /// Fill a select from a TGenSourceSettings + /// + void FillSelectFromGenSourceSettings(NConnector::NApi::TSelect& select, + const NNodes::TGenSourceSettings& settings, + TExprContext& ctx, + const TGenericState::TTableMeta* tableMeta); } // namespace NYql |