summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSlusarenko Igor <[email protected]>2025-10-13 14:09:59 +0300
committerGitHub <[email protected]>2025-10-13 14:09:59 +0300
commitb050143e845cbde7257bfc320c805fa8772b758f (patch)
treed5246e14f86b7f131ae3f8562288ec9f96968e33
parent685cab53a815e897c59717da19b9d39c62b010fc (diff)
Move listsplit after physical opt (#22085)
-rw-r--r--ydb/core/kqp/opt/kqp_query_blocks_transformer.cpp4
-rw-r--r--ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp17
-rw-r--r--ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp214
-rw-r--r--ydb/library/yql/providers/common/pushdown/collection.cpp3
-rw-r--r--ydb/library/yql/providers/common/pushdown/settings.h1
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp6
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h65
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/utils.cpp1
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/utils.h1
-rw-r--r--ydb/library/yql/providers/generic/provider/ya.make4
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_describe_table.cpp616
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_describe_table.h10
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp147
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_list_splits.cpp342
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_list_splits.h10
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp5
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp79
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp16
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_state.cpp88
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_state.h55
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_utils.cpp38
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_utils.h12
22 files changed, 1567 insertions, 167 deletions
diff --git a/ydb/core/kqp/opt/kqp_query_blocks_transformer.cpp b/ydb/core/kqp/opt/kqp_query_blocks_transformer.cpp
index 2e2c8e514d0..a5c369de8ec 100644
--- a/ydb/core/kqp/opt/kqp_query_blocks_transformer.cpp
+++ b/ydb/core/kqp/opt/kqp_query_blocks_transformer.cpp
@@ -85,7 +85,9 @@ public:
}
TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) override {
- return QueryBlockTransformer->ApplyAsyncChanges(input, output, ctx);
+ YQL_ENSURE(CurrentBlock < input->ChildrenSize());
+ output = input;
+ return QueryBlockTransformer->ApplyAsyncChanges(input->Child(CurrentBlock), output->ChildRef(CurrentBlock), ctx);
}
private:
diff --git a/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp b/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp
index b85a14eac79..f5e152aca48 100644
--- a/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp
+++ b/ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp
@@ -699,11 +699,14 @@ public:
.TypeMappingSettings(typeMappingSettings);
auto listSplitsBuilder = mockClient->ExpectListSplits();
- listSplitsBuilder
+ auto fillListSplitExpectation = listSplitsBuilder
.ValidateArgs(settings.ValidateListSplitsArgs)
.Select()
.DataSourceInstance(GetMockConnectorSourceInstance())
- .Table(settings.TableName);
+ .Table(settings.TableName)
+ .What();
+
+ FillMockConnectorRequestColumns(fillListSplitExpectation, settings.Columns);
for (ui64 i = 0; i < settings.DescribeCount; ++i) {
auto responseBuilder = describeTableBuilder.Response();
@@ -1944,6 +1947,7 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) {
));
{ // Prepare connector mock
+
const std::vector<TColumn> columns = {
{"fqdn", Ydb::Type::STRING},
{"payload", Ydb::Type::STRING}
@@ -1952,7 +1956,10 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) {
.TableName = ydbTable,
.Columns = columns,
.DescribeCount = 2,
- .ListSplitsCount = 2
+ // For streaming queries type annotation is executed twice, but
+ // List Split is now done after the type annotation optimization.
+ // That is why only a single call to List Split is expected.
+ .ListSplitsCount = 1
});
const std::vector<std::string> fqdnColumn = {"host1.example.com", "host2.example.com", "host3.example.com"};
@@ -2054,7 +2061,9 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) {
.TableName = ydbTable,
.Columns = columns,
.DescribeCount = 2,
- .ListSplitsCount = 5,
+ // List Split is now done after type annotation, which is
+ // why this value is 4 rather than 5.
+ .ListSplitsCount = 4,
.ValidateListSplitsArgs = false
});
diff --git a/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp b/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp
index 316e3524188..32a3e8b82e0 100644
--- a/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp
+++ b/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp
@@ -145,6 +145,65 @@ namespace NKikimr::NKqp {
return databaseAsyncResolverMock;
}
+ ///
+ /// Fixture that prepares mocks and services for a provider.
+ ///
+ /// TODO:
+ /// Make it reusable, currently it fails if multiple
+ /// expects are applied to mock
+ ///
+ class TQueryExecutorFixture : public NUnitTest::TBaseFixture { // Owns the connector mock, the Kikimr runner and a query client for one provider type.
+ public:
+ TQueryExecutorFixture(EProviderType providerType) // Wires the mock, resolver, app config and S3 actors factory into a runner, then creates the external data source.
+ : DataSourceInstance(MakeDataSourceInstance(providerType))
+ , ClientMock(std::make_shared<TConnectorClientMock>())
+ {
+ auto databaseAsyncResolverMock = MakeDatabaseAsyncResolver(providerType);
+ auto appConfig = CreateDefaultAppConfig();
+ auto s3ActorsFactory = NYql::NDq::CreateS3ActorsFactory();
+
+ Kikimr = MakeKikimrRunner(
+ false,
+ ClientMock,
+ databaseAsyncResolverMock,
+ appConfig,
+ s3ActorsFactory,
+ {.CredentialsFactory = CreateCredentialsFactory()}
+ );
+
+ CreateExternalDataSource(providerType, Kikimr);
+ QueryClient = Kikimr->GetQueryClient(); // Stored once; GetQueryClient() below hands out copies of it.
+ }
+
+ TQueryClient GetQueryClient() {
+ return *QueryClient; // Returned by value; QueryClient is always assigned by the constructor.
+ }
+
+ TAsyncExecuteQueryResult ExecuteQuery(const TString& query) { // Asynchronous: runs `query` with BeginTx().CommitTx() and default TExecuteQuerySettings.
+ return GetQueryClient()
+ .ExecuteQuery(query, TTxControl::BeginTx().CommitTx(), TExecuteQuerySettings());
+ }
+
+ NThreading::TFuture<TScriptExecutionOperation> ExecuteScript(const TString& script) { // Forwards `script` to TQueryClient::ExecuteScript and returns its future.
+ return GetQueryClient()
+ .ExecuteScript(script);
+ }
+
+ TConnectorClientMock::TSelectBuilder<> GetSelectBuilder() { // Fresh select builder pre-populated with this fixture's DataSourceInstance.
+ TConnectorClientMock::TSelectBuilder<> builder;
+ builder.DataSourceInstance(DataSourceInstance);
+ return builder;
+ }
+
+ public:
+ const NYql::TGenericDataSourceInstance DataSourceInstance; // Built from the provider type in the constructor's initializer list.
+ std::shared_ptr<TConnectorClientMock> ClientMock; // Public so tests can register expectations on it.
+
+ protected:
+ std::shared_ptr<TKikimrRunner> Kikimr;
+ std::optional<TQueryClient> QueryClient; // Optional only because it is assigned in the constructor body rather than the initializer list.
+ };
+
Y_UNIT_TEST_SUITE(GenericFederatedQuery) {
void TestSelectAllFields(EProviderType providerType) {
// prepare mock
@@ -163,6 +222,9 @@ namespace NKikimr::NKqp {
clientMock->ExpectListSplits()
.Select()
.DataSourceInstance(dataSourceInstance)
+ .What()
+ .Column("col1", Ydb::Type::UINT16)
+ .Done()
.Done()
.Result()
.AddResponse(NewSuccess())
@@ -284,6 +346,8 @@ namespace NKikimr::NKqp {
clientMock->ExpectListSplits()
.Select()
.DataSourceInstance(dataSourceInstance)
+ .What()
+ .Done()
.Done()
.Result()
.AddResponse(NewSuccess())
@@ -399,6 +463,8 @@ namespace NKikimr::NKqp {
clientMock->ExpectListSplits()
.Select()
.DataSourceInstance(dataSourceInstance)
+ .What()
+ .Done()
.Done()
.Result()
.AddResponse(NewSuccess())
@@ -489,104 +555,122 @@ namespace NKikimr::NKqp {
TestSelectCount(EProviderType::IcebergHadoopToken);
}
- void TestFilterPushdown(EProviderType providerType) {
- // prepare mock
- auto clientMock = std::make_shared<TConnectorClientMock>();
-
- const NYql::TGenericDataSourceInstance dataSourceInstance = MakeDataSourceInstance(providerType);
-
- // clang-format off
- const NApi::TSelect selectInListSplits = TConnectorClientMock::TSelectBuilder<>()
- .DataSourceInstance(dataSourceInstance).GetResult();
-
- const NApi::TSelect selectInReadSplits = TConnectorClientMock::TSelectBuilder<>()
- .DataSourceInstance(dataSourceInstance)
+ ///
+ /// Test a filter pushdown for a provider
+ ///
+ /// @param[in] providerType Provider's type
+ /// @param[in] where Where clause that will be appended to a sql query
+ /// @param[in] expectedWhere Where clause that will be expected in a list split and read split requests
+ ///
+ void TestFilterPushdown(EProviderType providerType, const TString& where, NApi::TSelect::TWhere& expectedWhere) {
+ auto f = std::make_shared<TQueryExecutorFixture>(providerType);
+ auto expectedSelect = f->GetSelectBuilder()
.What()
- .NullableColumn("data_column", Ydb::Type::STRING)
- .NullableColumn("filtered_column", Ydb::Type::INT32)
- .Done()
- .Where()
- .Filter()
- .Equal()
- .Column("filtered_column")
- .Value<i32>(42)
- .Done()
- .Done()
+ .NullableColumn("colDate", Ydb::Type::DATE)
+ .NullableColumn("colInt32", Ydb::Type::INT32)
+ .NullableColumn("colString", Ydb::Type::STRING)
.Done()
+ .Where(expectedWhere)
.GetResult();
- // clang-format on
// step 1: DescribeTable
- // clang-format off
- clientMock->ExpectDescribeTable()
- .DataSourceInstance(dataSourceInstance)
+ f->ClientMock->ExpectDescribeTable()
+ .DataSourceInstance(f->DataSourceInstance)
.TypeMappingSettings(MakeTypeMappingSettings(NYql::NConnector::NApi::STRING_FORMAT))
.Response()
- .NullableColumn("filtered_column", Ydb::Type::INT32)
- .NullableColumn("data_column", Ydb::Type::STRING);
- // clang-format on
+ .NullableColumn("colDate", Ydb::Type::DATE)
+ .NullableColumn("colInt32", Ydb::Type::INT32)
+ .NullableColumn("colString", Ydb::Type::STRING);
// step 2: ListSplits
- // clang-format off
- clientMock->ExpectListSplits()
- .Select(selectInListSplits)
+ f->ClientMock->ExpectListSplits()
+ .Select(expectedSelect)
.Result()
.AddResponse(NewSuccess())
.Description("some binary description")
- .Select(selectInReadSplits);
- // clang-format on
+ .Select(expectedSelect);
// step 3: ReadSplits
- // Return data such that it contains values not satisfying the filter conditions.
- // Then check that, despite that connector reads additional data,
- // our generic provider then filters it out.
- std::vector<std::string> colData = {"Filtered text", "Text"};
- std::vector<i32> filterColumnData = {42, 24};
- // clang-format off
- clientMock->ExpectReadSplits()
+ std::vector<std::string> colString = {"Filtered text", "Text"};
+ std::vector<ui16> colDate = {20326, 20329};
+ std::vector<i32> colInt32 = {42, 24};
+
+ f->ClientMock->ExpectReadSplits()
.Filtering(NYql::NConnector::NApi::TReadSplitsRequest::FILTERING_OPTIONAL)
.Split()
.Description("some binary description")
- .Select(selectInReadSplits)
+ .Select(expectedSelect)
.Done()
.Result()
.AddResponse(MakeRecordBatch(
- MakeArray<arrow::BinaryBuilder>("data_column", colData, arrow::binary()),
- MakeArray<arrow::Int32Builder>("filtered_column", filterColumnData, arrow::int32())),
+ MakeArray<arrow::UInt16Builder>("colDate", colDate, arrow::uint16()),
+ MakeArray<arrow::Int32Builder>("colInt32", colInt32, arrow::int32()),
+ MakeArray<arrow::BinaryBuilder>("colString", colString, arrow::binary())),
NewSuccess());
- // clang-format on
-
- // prepare database resolver mock
- auto databaseAsyncResolverMock = MakeDatabaseAsyncResolver(providerType);
-
- // run test
- auto appConfig = CreateDefaultAppConfig();
- auto s3ActorsFactory = NYql::NDq::CreateS3ActorsFactory();
- auto kikimr = MakeKikimrRunner(false, clientMock, databaseAsyncResolverMock, appConfig, s3ActorsFactory,
- {.CredentialsFactory = CreateCredentialsFactory()});
-
- CreateExternalDataSource(providerType, kikimr);
const TString query = fmt::format(
R"(
PRAGMA generic.UsePredicatePushdown="true";
- SELECT data_column FROM {data_source_name}.{table_name} WHERE filtered_column = 42;
+ SELECT colDate, colInt32, colString FROM {data_source_name}.{table_name} WHERE {table_where};
)",
"data_source_name"_a = DEFAULT_DATA_SOURCE_NAME,
- "table_name"_a = DEFAULT_TABLE);
+ "table_name"_a = DEFAULT_TABLE,
+ "table_where"_a = where
+ );
- auto db = kikimr->GetQueryClient();
- auto queryResult = db.ExecuteQuery(query, TTxControl::BeginTx().CommitTx(), TExecuteQuerySettings()).ExtractValueSync();
+ auto queryResult = f->ExecuteQuery(query).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(queryResult.GetStatus(), EStatus::SUCCESS, queryResult.GetIssues().ToString());
+ // Check a query result
TResultSetParser resultSet(queryResult.GetResultSetParser(0));
- UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 3);
UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1);
- // check every row
- // Check that, despite returning nonfiltered data in connector, response will be correct
- std::vector<TMaybe<TString>> result = {"Filtered text"}; // Only data satisfying filter conditions
- MATCH_RESULT_WITH_INPUT(result, resultSet, GetOptionalString);
+ // Check values for the query result
+ std::vector<std::optional<TInstant>> colDateResults = {TInstant::Days(20326)};
+ std::vector<std::optional<int>> colInt32Result = {42};
+ std::vector<std::optional<TString>> colStringResult = {"Filtered text"};
+
+ for (size_t i = 0; i < colDateResults.size(); ++i) {
+ resultSet.TryNextRow();
+
+ MATCH_OPT_RESULT_WITH_VAL_IDX(colDateResults[i], resultSet, GetOptionalDate, 0);
+ MATCH_OPT_RESULT_WITH_VAL_IDX(colInt32Result[i], resultSet, GetOptionalInt32, 1);
+ MATCH_OPT_RESULT_WITH_VAL_IDX(colStringResult[i], resultSet, GetOptionalString, 2);
+ }
+ }
+
+ ///
+ /// Test a filter pushdown for a provider
+ ///
+ /// @param[in] providerType Provider's type
+ ///
+ void TestFilterPushdown(EProviderType providerType) {
+ using namespace NYql::NConnector::NTest;
+
+ auto expectedWhereInt = TConnectorClientMock::TWhereBuilder<>()
+ .Filter().Equal()
+ .Column("colInt32")
+ .Value<i32>(42)
+ .Done()
+ .Done()
+ .GetResult();
+
+ TestFilterPushdown(providerType, "colInt32 = 42", expectedWhereInt);
+ TestFilterPushdown(providerType, "colInt32 = EvaluateExpr(44 - 2)", expectedWhereInt);
+ TestFilterPushdown(providerType, "colInt32 = 44 - 2", expectedWhereInt);
+
+ auto expectedWhereDate = TConnectorClientMock::TWhereBuilder<>()
+ .Filter().Equal()
+ .Column("colDate")
+ .Value<ui32>(20326, ::Ydb::Type::DATE)
+ .Done()
+ .Done()
+ .GetResult();
+
+ TestFilterPushdown(providerType, "colDate = Date('2025-08-26')", expectedWhereDate);
+ TestFilterPushdown(providerType, "colDate = EvaluateExpr(Date('2025-08-27') - Interval(\"P1D\"))", expectedWhereDate);
+ TestFilterPushdown(providerType, "colDate = Date('2025-08-27') - Interval(\"P1D\")", expectedWhereDate);
}
Y_UNIT_TEST(PostgreSQLFilterPushdown) {
diff --git a/ydb/library/yql/providers/common/pushdown/collection.cpp b/ydb/library/yql/providers/common/pushdown/collection.cpp
index a8770902431..9cbfef2beb8 100644
--- a/ydb/library/yql/providers/common/pushdown/collection.cpp
+++ b/ydb/library/yql/providers/common/pushdown/collection.cpp
@@ -201,6 +201,9 @@ private:
node.Maybe<TCoUint64>()) {
return true;
}
+ if (Settings.IsEnabled(EFlag::DateCtor) && node.Maybe<TCoDate>()) {
+ return true;
+ }
if (Settings.IsEnabled(EFlag::TimestampCtor) && node.Maybe<TCoTimestamp>()) {
return true;
}
diff --git a/ydb/library/yql/providers/common/pushdown/settings.h b/ydb/library/yql/providers/common/pushdown/settings.h
index cdd4c4fd984..66aa009b169 100644
--- a/ydb/library/yql/providers/common/pushdown/settings.h
+++ b/ydb/library/yql/providers/common/pushdown/settings.h
@@ -46,6 +46,7 @@ struct TSettings {
MinMax = 1 << 27,
NonDeterministic = 1 << 28,
DecimalCtor = 1 << 29,
+ DateCtor = 1 << 30,
};
explicit TSettings(NLog::EComponent logComponent)
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp
index ed92d42b65a..fe0c446b68e 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp
+++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp
@@ -29,6 +29,12 @@ namespace NYql::NConnector::NTest {
DEFINE_SIMPLE_TYPE_SETTER(i64, INT64, int64_value);
DEFINE_SIMPLE_TYPE_SETTER(ui64, UINT64, uint64_value);
+ template <> // ui32 specialization of SetValue: the caller supplies the primitive type id explicitly.
+ void SetValue(const ui32& value, Ydb::TypedValue* proto, const ::Ydb::Type::PrimitiveTypeId& typeId, bool optional) {
+ *proto->mutable_type() = MakeYdbType(typeId, optional); // Type is built from typeId and the optional flag.
+ proto->mutable_value()->Y_CAT(set_, uint32_value)(value); // Y_CAT presumably pastes the tokens into set_uint32_value — confirm against the macro definition.
+ }
+
void CreatePostgreSQLExternalDataSource(
const std::shared_ptr<NKikimr::NKqp::TKikimrRunner>& kikimr,
const TString& dataSourceName,
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h
index 897a4a156ab..5cc999ce544 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h
+++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h
@@ -63,9 +63,20 @@ namespace NYql::NConnector::NTest {
}
MATCHER_P(ProtobufRequestMatcher, expected, "request does not match") {
- Cerr << "CRAB Expected: " << expected.DebugString() << Endl;
- Cerr << "CRAB Actual: " << arg.DebugString() << Endl;
- return google::protobuf::util::MessageDifferencer::Equals(arg, expected);
+ Cerr << "GENERIC-CONNECTOR-MOCK Expected: " << expected.DebugString() << Endl;
+ Cerr << "GENERIC-CONNECTOR-MOCK Actual: " << arg.DebugString() << Endl;
+
+ google::protobuf::util::MessageDifferencer differencer;
+ TString differences;
+ differencer.ReportDifferencesToString(&differences);
+
+ bool result = differencer.Compare(arg, expected);
+
+ if (!result) {
+ Cerr << "GENERIC-CONNECTOR-MOCK Differences:" << Endl << differences << Endl;
+ }
+
+ return result;
}
MATCHER_P(RequestRelaxedMatcher, expected, "") {
@@ -73,14 +84,26 @@ namespace NYql::NConnector::NTest {
return true;
}
-#define MATCH_RESULT_WITH_INPUT(INPUT, RESULT_SET, GETTER) \
- { \
- for (const auto& val : INPUT) { \
- UNIT_ASSERT(RESULT_SET.TryNextRow()); \
- UNIT_ASSERT_VALUES_EQUAL(RESULT_SET.ColumnParser(0).GETTER(), val); \
- } \
+#define MATCH_OPT_RESULT_WITH_VAL_IDX(VAL, RESULT_SET, GETTER, INDEX) \
+ { \
+ auto r = RESULT_SET.ColumnParser(INDEX).GETTER(); \
+ UNIT_ASSERT_VALUES_EQUAL(r.has_value(), VAL.has_value()); \
+ if (r.has_value()) { \
+ UNIT_ASSERT_VALUES_EQUAL(*r, *VAL); \
+ } \
}
+#define MATCH_RESULT_WITH_INPUT_IDX(INPUT, RESULT_SET, GETTER, INDEX) \
+ { \
+ for (const auto& val : INPUT) { \
+ UNIT_ASSERT(RESULT_SET.TryNextRow()); \
+ UNIT_ASSERT_VALUES_EQUAL(RESULT_SET.ColumnParser(INDEX).GETTER(), val); \
+ } \
+ }
+
+#define MATCH_RESULT_WITH_INPUT(INPUT, RESULT_SET, GETTER)\
+ MATCH_RESULT_WITH_INPUT_IDX(INPUT, RESULT_SET, GETTER, 0)
+
// Make arrow array for one column.
// Returns field for schema and array with data.
// Designed for call with MakeRecordBatch function.
@@ -129,6 +152,10 @@ namespace NYql::NConnector::NTest {
template <class T>
void SetSimpleValue(const T& value, Ydb::TypedValue* proto, bool optional = false);
+ template <class T>
+ void SetValue(const T& value, Ydb::TypedValue* proto,
+ const ::Ydb::Type::PrimitiveTypeId& typeId, bool optional = false);
+
template <class TParent>
struct TWithParentBuilder {
explicit TWithParentBuilder(TParent* parent)
@@ -426,11 +453,23 @@ namespace NYql::NConnector::NTest {
}
template <class T>
+ TBuilder& Value(const T& value, const ::Ydb::Type::PrimitiveTypeId& typeId) {
+ SetValue(value, this->Result_->mutable_typed_value(), typeId);
+ return *this;
+ }
+
+ template <class T>
TBuilder& OptionalValue(const T& value) {
SetSimpleValue(value, this->Result_->mutable_typed_value(), true);
return *this;
}
+ template <class T>
+ TBuilder& OptionalValue(const T& value, const ::Ydb::Type::PrimitiveTypeId& typeId) {
+ SetValue(value, this->Result_->mutable_typed_value(), typeId, true);
+ return *this;
+ }
+
void FillWithDefaults() {
}
};
@@ -481,10 +520,18 @@ namespace NYql::NConnector::NTest {
return Arg().Value(value).Done();
}
+ TBuilder& Value(const auto& value, const ::Ydb::Type::PrimitiveTypeId& typeId) {
+ return Arg().Value(value, typeId).Done();
+ }
+
TBuilder& OptionalValue(const auto& value) {
return Arg().OptionalValue(value).Done();
}
+ TBuilder& OptionalValue(const auto& value, const ::Ydb::Type::PrimitiveTypeId& typeId) {
+ return Arg().OptionalValue(value, typeId).Done();
+ }
+
void FillWithDefaults() {
}
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/utils.cpp b/ydb/library/yql/providers/generic/connector/libcpp/utils.cpp
index 998be041016..f6968585dbc 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/utils.cpp
+++ b/ydb/library/yql/providers/generic/connector/libcpp/utils.cpp
@@ -110,4 +110,5 @@ namespace NYql::NConnector {
return res->type();
}
+
} // namespace NYql::NConnector
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/utils.h b/ydb/library/yql/providers/generic/connector/libcpp/utils.h
index 8f86ae643c7..7cfbb16a9b9 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/utils.h
+++ b/ydb/library/yql/providers/generic/connector/libcpp/utils.h
@@ -13,5 +13,4 @@ namespace NYql::NConnector {
std::shared_ptr<arrow::RecordBatch> ReadSplitsResponseToArrowRecordBatch(const NApi::TReadSplitsResponse& resp);
Ydb::Type GetColumnTypeByName(const NApi::TSchema& schema, const TString& name);
-
} // namespace NYql::NConnector
diff --git a/ydb/library/yql/providers/generic/provider/ya.make b/ydb/library/yql/providers/generic/provider/ya.make
index 5f28b0a2558..2a46177e42d 100644
--- a/ydb/library/yql/providers/generic/provider/ya.make
+++ b/ydb/library/yql/providers/generic/provider/ya.make
@@ -7,8 +7,12 @@ SRCS(
yql_generic_datasink_type_ann.cpp
yql_generic_datasource.cpp
yql_generic_datasource_type_ann.cpp
+ yql_generic_describe_table.cpp
+ yql_generic_describe_table.h
yql_generic_dq_integration.cpp
yql_generic_io_discovery.cpp
+ yql_generic_list_splits.cpp
+ yql_generic_list_splits.h
yql_generic_load_meta.cpp
yql_generic_logical_opt.cpp
yql_generic_mkql_compiler.cpp
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_describe_table.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_describe_table.cpp
new file mode 100644
index 00000000000..8565df5520d
--- /dev/null
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_describe_table.cpp
@@ -0,0 +1,616 @@
+#include "yql_generic_describe_table.h"
+
+#include <yql/essentials/utils/log/log.h>
+#include <yql/essentials/providers/common/structured_token/yql_token_builder.h>
+#include <yql/essentials/providers/common/provider/yql_provider.h>
+#include <yql/essentials/providers/common/provider/yql_provider_names.h>
+#include <yql/essentials/minikql/mkql_type_ops.h>
+#include <yql/essentials/minikql/mkql_program_builder.h>
+#include <yql/essentials/minikql/mkql_alloc.h>
+#include <yql/essentials/minikql/computation/mkql_computation_node.h>
+#include <yql/essentials/minikql/comp_nodes/mkql_factories.h>
+#include <yql/essentials/core/yql_graph_transformer.h>
+#include <yql/essentials/core/yql_expr_optimize.h>
+#include <yql/essentials/core/type_ann/type_ann_expr.h>
+#include <yql/essentials/core/expr_nodes/yql_expr_nodes.h>
+#include <yql/essentials/ast/yql_type_string.h>
+#include <yql/essentials/ast/yql_expr.h>
+#include <ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.h>
+#include <ydb/library/yql/providers/generic/connector/libcpp/error.h>
+#include <ydb/library/yql/providers/generic/connector/libcpp/client.h>
+#include <ydb/core/fq/libs/result_formatter/result_formatter.h>
+#include <ydb/core/external_sources/iceberg_fields.h>
+#include <library/cpp/json/json_reader.h>
+
+namespace NYql {
+
+using namespace NNodes;
+using namespace NKikimr;
+using namespace NKikimr::NMiniKQL;
+
+class TGenericDescribeTableTransformer : public TGraphTransformerBase { // Graph transformer for the generic provider; only DoGetAsyncFuture is defined inline, the rest is declared here.
+ struct TTableDescription { // Per-table record: data source instance, optional schema and accumulated issues.
+ using TPtr = std::shared_ptr<TTableDescription>;
+
+ NYql::TGenericDataSourceInstance DataSourceInstance;
+ std::optional<NConnector::NApi::TSchema> Schema; // Empty until a schema has been stored.
+ // Issues that could occur at any phase of network interaction with Connector
+ TIssues Issues;
+ };
+
+ using TTableDescriptionMap = // Table address -> shared description, hashed with THash.
+ std::unordered_map<TGenericState::TTableAddress, TTableDescription::TPtr, THash<TGenericState::TTableAddress>>;
+
+public:
+ explicit TGenericDescribeTableTransformer(TGenericState::TPtr state);
+
+public:
+ TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final;
+
+ NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode&) final { // The node argument is unused.
+ return AsyncFuture_; // Returns the stored future as-is.
+ }
+
+ TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final;
+
+ void Rewind() final;
+
+private:
+ TIssues DescribeTableFromConnector(const TGenericState::TTableAddress& tableAddress, // Returns issues; `handles` is an in-out list of futures.
+ std::vector<NThreading::TFuture<void>>& handles);
+
+ TIssues FillDescribeTableRequest(NConnector::NApi::TDescribeTableRequest& request, // Fills `request` in place and returns any issues.
+ const TGenericClusterConfig& clusterConfig, const TString& tablePath);
+
+ void FillCredentials(NConnector::NApi::TDescribeTableRequest& request, // Writes credentials into request.data_source_instance.
+ const TGenericClusterConfig& clusterConfig);
+
+ void FillTypeMappingSettings(NConnector::NApi::TDescribeTableRequest& request);
+
+private:
+ const TGenericState::TPtr State_; // Shared provider state, set once in the constructor.
+ TTableDescriptionMap TableDescriptions_;
+ NThreading::TFuture<void> AsyncFuture_; // Handed out by DoGetAsyncFuture.
+};
+
+TGenericDescribeTableTransformer::TGenericDescribeTableTransformer(TGenericState::TPtr state) // Takes the state pointer by value and moves it into State_.
+ : State_(std::move(state))
+{ }
+
+void TGenericDescribeTableTransformer::FillCredentials(NConnector::NApi::TDescribeTableRequest& request, // Picks one of three credential sources, in order: basic auth, client IAM token, service account.
+ const TGenericClusterConfig& clusterConfig) {
+ auto dsi = request.mutable_data_source_instance(); // All branches write into this data source instance.
+
+ // If login/password is provided, just copy them into request:
+ // connector will use Basic Auth to access external data sources.
+ if (clusterConfig.GetCredentials().Hasbasic()) {
+ *dsi->mutable_credentials() = clusterConfig.GetCredentials();
+ return;
+ }
+
+ // If there are no Basic Auth parameters, two options can be considered:
+ // 1. Client provided own IAM-token to access external data source
+ auto iamToken = State_->Types->Credentials->FindCredentialContent(
+ "default_" + clusterConfig.name(), "default_generic", clusterConfig.GetToken());
+ if (iamToken) {
+ *dsi->mutable_credentials()->mutable_token()->mutable_value() = iamToken;
+ *dsi->mutable_credentials()->mutable_token()->mutable_type() = "IAM";
+ return;
+ }
+
+ // 2. Client provided service account creds that must be converted into IAM-token
+ Y_ENSURE(State_->CredentialsFactory, "CredentialsFactory is not initialized");
+ auto structuredTokenJSON = TStructuredTokenBuilder()
+ .SetServiceAccountIdAuth(clusterConfig.GetServiceAccountId(),
+ clusterConfig.GetServiceAccountIdSignature())
+ .ToJson();
+ Y_ENSURE(structuredTokenJSON, "empty structured token");
+
+ // Create provider or get existing one.
+ // It's crucial to reuse providers because their construction implies synchronous IO.
+ auto providersIt = State_->CredentialProviders.find(clusterConfig.name()); // Providers are cached per cluster name.
+ if (providersIt == State_->CredentialProviders.end()) {
+ auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(
+ State_->CredentialsFactory, structuredTokenJSON, false);
+
+ providersIt =
+ State_->CredentialProviders
+ .emplace(clusterConfig.name(), credentialsProviderFactory->CreateProvider())
+ .first;
+ }
+
+ iamToken = providersIt->second->GetAuthInfo(); // Reuses the local, which is empty on this path.
+ Y_ENSURE(iamToken, "empty IAM token");
+ *dsi->mutable_credentials()->mutable_token()->mutable_value() = iamToken;
+ *dsi->mutable_credentials()->mutable_token()->mutable_type() = "IAM";
+}
+
+template <typename T> // T is any options message exposing set_schema().
+void SetSchema(T& request, const TGenericClusterConfig& clusterConfig) { // Sets the schema from the "schema" data source option, defaulting to "public".
+ TString schema;
+ const auto it = clusterConfig.GetDataSourceOptions().find("schema");
+ if (it != clusterConfig.GetDataSourceOptions().end()) {
+ schema = it->second;
+ }
+ if (!schema) { // Covers both a missing option and an explicitly empty value.
+ schema = "public";
+ }
+ request.set_schema(schema);
+}
+
+void SetOracleServiceName(NYql::TOracleDataSourceOptions& options, const TGenericClusterConfig& clusterConfig) { // Copies the "service_name" option when present.
+ const auto it = clusterConfig.GetDataSourceOptions().find("service_name");
+ if (it != clusterConfig.GetDataSourceOptions().end()) { // Absent option: `options` is left untouched.
+ options.set_service_name(it->second);
+ }
+}
+
+void SetLoggingFolderId(NYql::TLoggingDataSourceOptions& options, const TGenericClusterConfig& clusterConfig) { // Copies the "folder_id" option when present.
+ const auto it = clusterConfig.GetDataSourceOptions().find("folder_id");
+ if (it != clusterConfig.GetDataSourceOptions().end()) { // Absent option: `options` is left untouched.
+ options.set_folder_id(it->second);
+ }
+}
+
+TString GetIcebergOptionValue(const ::google::protobuf::Map<TProtoStringType, // Looks up a required option by key and returns its value.
+ TProtoStringType>& options, TString option) {
+ auto it = options.find(option);
+ if (options.end() == it) { // A missing key is an error: the option name is included in the message.
+ throw yexception()
+ << "Cluster config for an Iceberg data source"
+ << " is missing option: "
+ << option;
+ }
+ return it->second;
+}
+
+/// Fill the Iceberg-specific DataSourceOptions from the cluster config: the S3 warehouse settings
+/// (endpoint, region, uri) and the catalog (Hadoop, or Hive Metastore with its uri).
+/// Throws yexception on a missing option or an unexpected warehouse/catalog type.
+void SetIcebergOptions(NYql::TIcebergDataSourceOptions& dataSourceOptions, const TGenericClusterConfig& clusterConfig) {
+ using namespace NKikimr::NExternalSource::NIceberg; // Brings the option-name and value constants used below into scope.
+ const auto& clusterOptions = clusterConfig.GetDataSourceOptions();
+ auto warehouseType = GetIcebergOptionValue(clusterOptions, WAREHOUSE_TYPE);
+
+ if (VALUE_S3 != warehouseType) { // S3 is the only warehouse type accepted here.
+ throw yexception() << "Unexpected warehouse type: " << warehouseType;
+ }
+
+ auto endpoint = GetIcebergOptionValue(clusterOptions, WAREHOUSE_S3_ENDPOINT);
+ auto region = GetIcebergOptionValue(clusterOptions, WAREHOUSE_S3_REGION);
+ auto uri = GetIcebergOptionValue(clusterOptions, WAREHOUSE_S3_URI);
+ auto& s3 = *dataSourceOptions.mutable_warehouse()->mutable_s3();
+
+ s3.set_endpoint(endpoint);
+ s3.set_region(region);
+ s3.set_uri(uri);
+
+ auto catalogType = GetIcebergOptionValue(clusterOptions, CATALOG_TYPE);
+ auto& catalog = *dataSourceOptions.mutable_catalog();
+
+ // set catalog options
+ if (VALUE_HADOOP == catalogType) {
+ // hadoop nothing yet
+ catalog.mutable_hadoop(); // Called for its side effect only; presumably marks the hadoop catalog as selected — confirm against the proto.
+ } else if (VALUE_HIVE_METASTORE == catalogType) {
+ auto uri = GetIcebergOptionValue(clusterOptions, CATALOG_HIVE_METASTORE_URI); // Shadows the warehouse `uri` declared above.
+ catalog.mutable_hive_metastore()->set_uri(uri);
+ } else {
+ throw yexception() << "Unexpected catalog type: " << catalogType;
+ }
+}
+
+TIssues SetMongoDBOptions(NYql::TMongoDbDataSourceOptions& options, const TGenericClusterConfig& clusterConfig) { // Parses three optional enum-valued options; returns parse failures as issues instead of throwing.
+ TIssues issues;
+ auto it = clusterConfig.GetDataSourceOptions().find("reading_mode");
+
+ if (it != clusterConfig.GetDataSourceOptions().end()) {
+ TMongoDbDataSourceOptions_EReadingMode value = TMongoDbDataSourceOptions::READING_MODE_UNSPECIFIED;
+ if (!TMongoDbDataSourceOptions_EReadingMode_Parse(it->second, &value)) {
+ issues.AddIssue(TIssue(TStringBuilder() << "Failed to parse MongoDB reading_mode: " << it->second));
+ }
+ options.set_reading_mode(value); // Runs even after a failed parse; value then presumably keeps its UNSPECIFIED initializer.
+ }
+
+ it = clusterConfig.GetDataSourceOptions().find("unexpected_type_display_mode"); // Same pattern as reading_mode.
+
+ if (it != clusterConfig.GetDataSourceOptions().end()) {
+ TMongoDbDataSourceOptions_EUnexpectedTypeDisplayMode value = TMongoDbDataSourceOptions::UNEXPECTED_UNSPECIFIED;
+ if (!TMongoDbDataSourceOptions_EUnexpectedTypeDisplayMode_Parse(it->second, &value)) {
+ issues.AddIssue(TIssue(TStringBuilder() << "Failed to parse MongoDB unexpected_type_display_mode: " << it->second));
+ }
+ options.set_unexpected_type_display_mode(value);
+ }
+
+ it = clusterConfig.GetDataSourceOptions().find("unsupported_type_display_mode"); // Same pattern as reading_mode.
+
+ if (it != clusterConfig.GetDataSourceOptions().end()) {
+ TMongoDbDataSourceOptions_EUnsupportedTypeDisplayMode value = TMongoDbDataSourceOptions::UNSUPPORTED_UNSPECIFIED;
+ if (!TMongoDbDataSourceOptions_EUnsupportedTypeDisplayMode_Parse(it->second, &value)) {
+ issues.AddIssue(TIssue(TStringBuilder() << "Failed to parse MongoDB unsupported_type_display_mode: " << it->second));
+ }
+ options.set_unsupported_type_display_mode(value);
+ }
+
+ return issues; // Empty when every present option parsed successfully.
+}
+
+ /// Fills the kind-specific options of the data source instance inside a DescribeTable request.
+ ///
+ /// Dispatches on the cluster's data source kind. Kinds that have no dedicated options are
+ /// accepted and leave the request untouched; an unknown kind raises yexception.
+ ///
+ /// @param request        request whose data_source_instance is being populated
+ /// @param clusterConfig  cluster configuration the options are taken from
+ /// @return issues produced by the MongoDB option parser; empty for every other kind
+ TIssues FillDataSourceOptions(NConnector::NApi::TDescribeTableRequest& request,
+                               const TGenericClusterConfig& clusterConfig) {
+     const auto kind = clusterConfig.GetKind();
+
+     switch (kind) {
+         // Kinds without kind-specific options: nothing to fill
+         case NYql::EGenericDataSourceKind::CLICKHOUSE:
+         case NYql::EGenericDataSourceKind::YDB:
+         case NYql::EGenericDataSourceKind::MYSQL:
+         case NYql::EGenericDataSourceKind::MS_SQL_SERVER:
+         case NYql::EGenericDataSourceKind::REDIS:
+         case NYql::EGenericDataSourceKind::PROMETHEUS:
+         case NYql::EGenericDataSourceKind::OPENSEARCH:
+             break;
+         case NYql::EGenericDataSourceKind::GREENPLUM: {
+             auto* gpOptions = request.mutable_data_source_instance()->mutable_gp_options();
+             SetSchema(*gpOptions, clusterConfig);
+             break;
+         }
+         case NYql::EGenericDataSourceKind::POSTGRESQL: {
+             auto* pgOptions = request.mutable_data_source_instance()->mutable_pg_options();
+             SetSchema(*pgOptions, clusterConfig);
+             break;
+         }
+         case NYql::EGenericDataSourceKind::ORACLE: {
+             auto* oracleOptions = request.mutable_data_source_instance()->mutable_oracle_options();
+             SetOracleServiceName(*oracleOptions, clusterConfig);
+             break;
+         }
+         case NYql::EGenericDataSourceKind::LOGGING: {
+             auto* loggingOptions = request.mutable_data_source_instance()->mutable_logging_options();
+             SetLoggingFolderId(*loggingOptions, clusterConfig);
+             break;
+         }
+         case NYql::EGenericDataSourceKind::ICEBERG: {
+             auto* icebergOptions = request.mutable_data_source_instance()->mutable_iceberg_options();
+             SetIcebergOptions(*icebergOptions, clusterConfig);
+             break;
+         }
+         case NYql::EGenericDataSourceKind::MONGO_DB: {
+             // The only kind whose option parsing can report issues
+             auto* mongoOptions = request.mutable_data_source_instance()->mutable_mongodb_options();
+             return SetMongoDBOptions(*mongoOptions, clusterConfig);
+         }
+         default:
+             throw yexception() << "Unexpected data source kind: '"
+                                << NYql::EGenericDataSourceKind_Name(kind) << "'";
+     }
+
+     return TIssues{};
+ }
+
+ /// Stores the database name (taken from the cluster configuration) in the request's
+ /// data source instance and the table path in the request itself.
+ void FillTablePath(NConnector::NApi::TDescribeTableRequest& request, const TGenericClusterConfig& clusterConfig,
+                    const TString& tablePath) {
+     auto* instance = request.mutable_data_source_instance();
+     instance->set_database(clusterConfig.GetDatabaseName());
+
+     request.set_table(tablePath);
+ }
+
+ /// Translates the configured date/time format into the request's type mapping settings.
+ ///
+ /// The format is read from the provider configuration, falling back to
+ /// TGenericSettings::TDefault::DateTimeFormat. Only "string" and "YQL" are accepted;
+ /// any other value raises yexception and leaves the request untouched.
+ void TGenericDescribeTableTransformer::FillTypeMappingSettings(NConnector::NApi::TDescribeTableRequest& request) {
+     const TString configuredFormat =
+         State_->Configuration->DateTimeFormat.Get().GetOrElse(TGenericSettings::TDefault::DateTimeFormat);
+     const bool asString = (configuredFormat == "string");
+
+     // Validate before touching the request so a bad value has no side effect on it
+     if (!asString && !(configuredFormat == "YQL")) {
+         throw yexception() << "Unexpected date/time format: '" << configuredFormat << "'";
+     }
+
+     request.mutable_type_mapping_settings()->set_date_time_format(
+         asString ? NConnector::NApi::STRING_FORMAT : NConnector::NApi::YQL_FORMAT);
+ }
+
+ /// Synchronous phase of table metadata loading.
+ ///
+ /// Collects every TGenRead node of the generic provider found in `input`, validates the
+ /// shape of its key argument and starts a DescribeTable request for each distinct table
+ /// that is not yet present in the provider state.
+ ///
+ /// @param input   expression to scan
+ /// @param output  always assigned `input`; this phase does not rewrite the expression
+ /// @param ctx     receives error issues
+ /// @return Ok when there is nothing to request, Error on a malformed read or when a
+ ///         request could not be built, Async once at least one request is in flight
+ IGraphTransformer::TStatus TGenericDescribeTableTransformer::DoTransform(TExprNode::TPtr input,
+ TExprNode::TPtr& output,
+ TExprContext& ctx) {
+ output = input;
+
+ // Nothing to do when the LoadTablesMetadata step is already marked as done
+ if (ctx.Step.IsDone(TExprStep::LoadTablesMetadata)) {
+ return TStatus::Ok;
+ }
+
+ // Only reads whose data source category is the generic provider are of interest
+ const auto& reads = FindNodes(input, [&](const TExprNode::TPtr& node) {
+ if (const auto maybeRead = TMaybeNode<TGenRead>(node)) {
+ return maybeRead.Cast().DataSource().Category().Value() == GenericProviderName;
+ }
+ return false;
+ });
+
+ if (reads.empty()) {
+ return TStatus::Ok;
+ }
+
+ std::unordered_set<TTableDescriptionMap::key_type, TTableDescriptionMap::hasher> pendingRequests;
+
+ // Iterate over all read table queries in the input expression,
+ // create Describe requests for unique tables
+ for (const auto& r : reads) {
+ const TGenRead read(r);
+
+ // The third free argument must be a MrTableConcat callable wrapping the key
+ if (!read.FreeArgs().Get(2).Ref().IsCallable("MrTableConcat")) {
+ ctx.AddError(TIssue(ctx.GetPosition(read.FreeArgs().Get(0).Pos()), "Expected key"));
+ return TStatus::Error;
+ }
+
+ const auto maybeKey = TExprBase(read.FreeArgs().Get(2).Ref().HeadPtr()).Maybe<TCoKey>();
+
+ if (!maybeKey) {
+ ctx.AddError(TIssue(ctx.GetPosition(read.FreeArgs().Get(0).Pos()), "Expected key"));
+ return TStatus::Error;
+ }
+
+ const auto& keyArg = maybeKey.Cast().Ref().Head();
+
+ // The key has to be a two-element list: the atom "table" followed by a String callable
+ if (!keyArg.IsList() || keyArg.ChildrenSize() != 2U || !keyArg.Head().IsAtom("table") ||
+ !keyArg.Tail().IsCallable(TCoString::CallableName())) {
+ ctx.AddError(TIssue(ctx.GetPosition(keyArg.Pos()), "Expected single table name"));
+ return TStatus::Error;
+ }
+
+ const auto clusterName = read.DataSource().Cluster().StringValue();
+ const auto tableName = TString(keyArg.Tail().Head().Content());
+ auto tableAddress = TGenericState::TTableAddress(clusterName, tableName);
+
+ // Table meta has already been acquired
+ if (State_->HasTable(tableAddress)) {
+ continue;
+ }
+
+ // insert() reports whether the address is new, so every table is logged only once
+ if (pendingRequests.insert(tableAddress).second) {
+ YQL_CLOG(INFO, ProviderGeneric) << "Describe table for: `" << tableAddress.ToString() << "`";
+ }
+ }
+
+ if (pendingRequests.empty()) {
+ return TStatus::Ok;
+ }
+
+ std::vector<NThreading::TFuture<void>> handles;
+ handles.reserve(pendingRequests.size());
+ TableDescriptions_.reserve(pendingRequests.size());
+
+ // Start one request per table; each successfully started request appends a future to `handles`
+ for (const auto& tableAddress : pendingRequests) {
+ auto tIssues = DescribeTableFromConnector(tableAddress, handles);
+
+ if (!tIssues.Empty()) {
+ ctx.AddError(TIssue(tIssues.ToString()));
+ return TStatus::Error;
+ }
+ }
+
+ if (handles.empty()) {
+ return TStatus::Ok;
+ }
+
+ // The combined future is what DoApplyAsyncChanges later waits on
+ AsyncFuture_ = NThreading::WaitExceptionOrAll(handles);
+ return TStatus::Async;
+ }
+
+ /// Starts an asynchronous DescribeTable request for a single table.
+ ///
+ /// A TTableDescription slot is registered in TableDescriptions_ under `tableAddress`; the
+ /// response callback stores either the received schema or the issues in that slot. A future
+ /// that becomes ready when the callback has finished is appended to `handles`.
+ ///
+ /// @param tableAddress  cluster and table to describe; the cluster must be configured
+ ///                      (enforced with YQL_ENSURE)
+ /// @param handles       receives the future of the started request
+ /// @return issues raised while building the request (nothing is sent in that case);
+ ///         empty when the request has been sent
+ TIssues TGenericDescribeTableTransformer::DescribeTableFromConnector(const TGenericState::TTableAddress& tableAddress,
+ std::vector<NThreading::TFuture<void>>& handles) {
+ const auto it = State_->Configuration->ClusterNamesToClusterConfigs.find(tableAddress.ClusterName);
+
+ YQL_ENSURE(
+ State_->Configuration->ClusterNamesToClusterConfigs.cend() != it,
+ "cluster not found: " << tableAddress.ClusterName
+ );
+
+ // Preserve data source instance for the further usage
+ // NOTE(review): the slot is registered before the request is built, so it stays in
+ // TableDescriptions_ even when FillDescribeTableRequest reports issues below
+ auto emplaceIt = TableDescriptions_.emplace(tableAddress, std::make_shared<TTableDescription>());
+ auto desc = emplaceIt.first->second;
+ NConnector::NApi::TDescribeTableRequest request;
+ auto issues = FillDescribeTableRequest(request, it->second, tableAddress.TableName);
+
+ if (!issues.Empty()) {
+ return issues;
+ }
+
+ auto promise = NThreading::NewPromise();
+ handles.emplace_back(promise.GetFuture());
+ desc->DataSourceInstance = request.data_source_instance();
+
+ Y_ENSURE(State_->GenericClient);
+
+ // `client` is captured but not referenced in the callback body; presumably it keeps
+ // the client alive until the callback has run
+ State_->GenericClient->DescribeTable(request, State_->Configuration->DescribeTableTimeout).Subscribe(
+ [desc, tableAddress, promise, client = State_->GenericClient](const NConnector::TDescribeTableAsyncResult& f1) mutable {
+ // Work on a local copy of the future handle, since the callback argument is const
+ NConnector::TDescribeTableAsyncResult f2(f1);
+ auto result = f2.ExtractValueSync();
+
+ // Check transport error
+ if (!result.Status.Ok()) {
+ desc->Issues.AddIssue(TStringBuilder()
+ << "Call DescribeTable for table " << tableAddress.ToString() << ": "
+ << result.Status.ToDebugString());
+ promise.SetValue();
+ return;
+ }
+
+ // Check logical error
+ if (!NConnector::IsSuccess(*result.Response)) {
+ desc->Issues.AddIssues(NConnector::ErrorToIssues(
+ result.Response->error(),
+ TStringBuilder() << "Call DescribeTable for table " << tableAddress.ToString() << ": "));
+ promise.SetValue();
+ return;
+ }
+
+ // Preserve schema for the further usage
+ desc->Schema = result.Response->schema();
+ promise.SetValue();
+ });
+
+ return {};
+ }
+
+ /// Builds a complete DescribeTable request for one table of a cluster.
+ ///
+ /// Fills the data source instance (endpoint, kind, TLS flag, protocol), credentials, type
+ /// mapping settings and kind-specific options; the database name and table path are set
+ /// last and only when the kind-specific options produced no issues.
+ ///
+ /// @param request        request to populate
+ /// @param clusterConfig  configuration of the cluster that owns the table
+ /// @param tablePath      table path to describe
+ /// @return issues from the kind-specific option step; empty on success
+ TIssues TGenericDescribeTableTransformer::FillDescribeTableRequest(NConnector::NApi::TDescribeTableRequest& request,
+                                                                    const TGenericClusterConfig& clusterConfig,
+                                                                    const TString& tablePath) {
+     auto* instance = request.mutable_data_source_instance();
+
+     instance->set_kind(clusterConfig.GetKind());
+     *instance->mutable_endpoint() = clusterConfig.GetEndpoint();
+     instance->set_use_tls(clusterConfig.GetUseSsl());
+     instance->set_protocol(clusterConfig.GetProtocol());
+
+     FillCredentials(request, clusterConfig);
+     FillTypeMappingSettings(request);
+
+     if (auto optionIssues = FillDataSourceOptions(request, clusterConfig); !optionIssues.Empty()) {
+         return optionIssues;
+     }
+
+     FillTablePath(request, clusterConfig, tablePath);
+     return {};
+ }
+
+ /// Derives the struct type and the column order of a table from its connector schema.
+ ///
+ /// On success `tableMeta.ItemType` holds a struct type with one member per schema column
+ /// and `tableMeta.ColumnOrder` lists the column names in schema order. A schema without
+ /// columns is reported as a non-existing table. Any std::exception thrown while building
+ /// the types is converted into an issue positioned at `pos`.
+ ///
+ /// @param ctx           expression context used to allocate type annotations
+ /// @param pos           position attached to the reported issues
+ /// @param tableAddress  used only for the error message
+ /// @param tableMeta     in: Schema; out: ColumnOrder and ItemType
+ /// @return empty issues on success
+ TIssues ParseTableMeta(
+     TExprContext& ctx,
+     const TPosition& pos,
+     const TGenericState::TTableAddress& tableAddress,
+     TGenericState::TTableMeta& tableMeta
+ ) try {
+     const auto& schemaColumns = tableMeta.Schema.columns();
+
+     // The connector describes a missing table as a schema without columns
+     if (schemaColumns.empty()) {
+         TIssues missing;
+         missing.AddIssue(TIssue(pos, TStringBuilder() << "Table " << tableAddress.ToString() << " doesn't exist."));
+         return missing;
+     }
+
+     TVector<const TItemExprType*> members;
+
+     for (const auto& schemaColumn : schemaColumns) {
+         // Convert the column type into a type annotation
+         NYdb::TTypeParser typeParser(schemaColumn.type());
+         auto columnType = NFq::MakeType(typeParser, ctx);
+
+         members.emplace_back(ctx.MakeType<TItemExprType>(schemaColumn.name(), columnType));
+         tableMeta.ColumnOrder.emplace_back(schemaColumn.name());
+     }
+
+     tableMeta.ItemType = ctx.MakeType<TStructExprType>(members);
+     return TIssues{};
+ } catch (std::exception&) {
+     TIssues failure;
+     failure.AddIssue(TIssue(pos, TStringBuilder() << "Failed to parse table metadata: " << CurrentExceptionMessage()));
+     return failure;
+ }
+
+ /// Builds the TGenReadTable node that replaces a TGenRead node once table metadata is known.
+ ///
+ /// The new node reuses the world and data source of `read`, refers to the table by
+ /// `tableName`, has Void in place of a column list and carries a filter predicate lambda
+ /// whose body is the literal `true`.
+ ///
+ /// @param ctx        expression context used to allocate the nodes
+ /// @param read       original read node; also supplies the position of every new node
+ /// @param tableName  name stored in the TGenTable node
+ /// @return the newly built TGenReadTable expression node
+ TExprNode::TPtr MakeTableMetaNode(
+ TExprContext& ctx,
+ const TGenRead& read,
+ const TString& tableName
+ ) {
+ // clang-format off
+ // Argument of the predicate lambda
+ auto row = Build<TCoArgument>(ctx, read.Pos())
+ .Name("row")
+ .Done();
+
+ // Lambda (row) -> true, used as the initial filter predicate
+ auto emptyPredicate = Build<TCoLambda>(ctx, read.Pos())
+ .Args({row})
+ .Body<TCoBool>()
+ .Literal().Build("true")
+ .Build()
+ .Done().Ptr();
+
+ auto table = Build<TGenTable>(ctx, read.Pos())
+ .Name().Value(tableName).Build()
+ .Done();
+
+ return Build<TGenReadTable>(ctx, read.Pos())
+ .World(read.World())
+ .DataSource(read.DataSource())
+ .Table(table)
+ .Columns<TCoVoid>().Build()
+ .FilterPredicate(emptyPredicate)
+ .Done().Ptr();
+ // clang-format on
+ }
+
+ /// Asynchronous completion phase of table metadata loading.
+ ///
+ /// Walks the same TGenRead nodes that DoTransform inspected, matches each of them with the
+ /// stored DescribeTable response, converts the schema into table metadata, registers the
+ /// metadata in the provider state and replaces the read node with a TGenReadTable node.
+ ///
+ /// @param input   expression that was passed to DoTransform
+ /// @param output  `input` when nothing is rewritten, otherwise the remapped expression
+ /// @param ctx     receives connector and schema parsing issues
+ /// @return Error when a response is missing, the connector reported issues or the schema
+ ///         could not be parsed; otherwise the status returned by RemapExpr
+ IGraphTransformer::TStatus TGenericDescribeTableTransformer::DoApplyAsyncChanges(TExprNode::TPtr input,
+                                                                                 TExprNode::TPtr& output,
+                                                                                 TExprContext& ctx) {
+     // Surfaces a failure stored in the combined future, if there is one
+     AsyncFuture_.GetValue();
+
+     // Keep `output` valid on every early return, as the list-splits transformer does
+     output = input;
+
+     const auto& reads = FindNodes(input, [&](const TExprNode::TPtr& node) {
+         if (const auto maybeRead = TMaybeNode<TGenRead>(node)) {
+             return maybeRead.Cast().DataSource().Category().Value() == GenericProviderName;
+         }
+         return false;
+     });
+
+     if (reads.empty()) {
+         return TStatus::Ok;
+     }
+
+     TNodeOnNodeOwnedMap replaces(reads.size());
+
+     // Iterate over all read table queries, check Connector responses
+     for (const auto& r : reads) {
+         const TGenRead genRead(r);
+         const auto clusterName = genRead.DataSource().Cluster().StringValue();
+         // The key shape has already been validated by DoTransform
+         const auto& keyArg = TExprBase(genRead.FreeArgs().Get(2).Ref().HeadPtr()).Cast<TCoKey>().Ref().Head();
+         const auto tableName = TString(keyArg.Tail().Head().Content());
+         const TGenericState::TTableAddress tableAddress{clusterName, tableName};
+
+         // Find appropriate response
+         auto iter = TableDescriptions_.find(tableAddress);
+         if (iter == TableDescriptions_.end()) {
+             ctx.AddError(TIssue(ctx.GetPosition(genRead.Pos()), TStringBuilder()
+                                 << "Connector response not found for table "
+                                 << tableAddress.ToString()));
+
+             return TStatus::Error;
+         }
+
+         auto& result = iter->second;
+
+         // If errors occurred during network interaction with Connector, return them
+         if (result->Issues) {
+             for (const auto& issue : result->Issues) {
+                 ctx.AddError(issue);
+             }
+
+             return TStatus::Error;
+         }
+
+         Y_ENSURE(result->Schema);
+
+         TGenericState::TTableMeta tableMeta;
+         tableMeta.Schema = *result->Schema;
+         tableMeta.DataSourceInstance = result->DataSourceInstance;
+
+         // Parse table schema. The issues must not be dropped: a table without columns
+         // (reported as "doesn't exist") or an unparsable schema would otherwise be
+         // registered in the provider state without an item type.
+         if (auto parseIssues = ParseTableMeta(ctx, ctx.GetPosition(genRead.Pos()), tableAddress, tableMeta);
+             !parseIssues.Empty()) {
+             for (const auto& issue : parseIssues) {
+                 ctx.AddError(issue);
+             }
+
+             return TStatus::Error;
+         }
+
+         // Fill AST for a table
+         if (const auto ins = replaces.emplace(genRead.Raw(), TExprNode::TPtr()); ins.second) {
+             ins.first->second = MakeTableMetaNode(ctx, genRead, tableName);
+         }
+
+         // Save table metadata into provider state
+         State_->AddTable(tableAddress, std::move(tableMeta));
+     }
+
+     return RemapExpr(input, output, replaces, ctx, TOptimizeExprSettings(nullptr));
+ }
+
+ /// Drops every stored DescribeTable response and resets the combined future to a
+ /// default-constructed one.
+ void TGenericDescribeTableTransformer::Rewind() {
+ TableDescriptions_.clear();
+ AsyncFuture_ = {};
+ }
+
+ /// Factory: creates a TGenericDescribeTableTransformer that operates on the given provider state.
+ THolder<IGraphTransformer> CreateGenericDescribeTableTransformer(TGenericState::TPtr state) {
+ return MakeHolder<TGenericDescribeTableTransformer>(std::move(state));
+ }
+
+} // NYql
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_describe_table.h b/ydb/library/yql/providers/generic/provider/yql_generic_describe_table.h
new file mode 100644
index 00000000000..83e9c667406
--- /dev/null
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_describe_table.h
@@ -0,0 +1,10 @@
+#pragma once
+
+#include <yql/essentials/core/yql_graph_transformer.h>
+#include "yql_generic_state.h"
+
+namespace NYql {
+
+ /// Creates the graph transformer that issues DescribeTable requests for the tables read
+ /// through the generic provider and stores the resulting metadata in `state`.
+ THolder<IGraphTransformer> CreateGenericDescribeTableTransformer(TGenericState::TPtr state);
+
+} // namespace NYql
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp
index a05d920cebc..f7f54ec759b 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp
@@ -9,6 +9,7 @@
#include <ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.h>
#include <ydb/library/yql/providers/generic/proto/partition.pb.h>
#include <ydb/library/yql/providers/generic/proto/source.pb.h>
+#include <ydb/library/yql/providers/generic/provider/yql_generic_utils.h>
#include <ydb/library/yql/utils/plan/plan_utils.h>
#include <yql/essentials/ast/yql_expr.h>
#include <yql/essentials/providers/common/dq/yql_dq_integration_impl.h>
@@ -115,6 +116,28 @@ namespace NYql {
return read;
}
+ ///
+ /// Fill a select from a dq source
+ ///
+ /// When the settings of `source` are not TGenSourceSettings the function returns without
+ /// touching `select`. Otherwise the table metadata is looked up in the provider state by
+ /// cluster and table name and the select is filled by FillSelectFromGenSourceSettings.
+ /// Throws yexception when the metadata lookup reports issues.
+ ///
+ void FillSelect(NConnector::NApi::TSelect& select, const TDqSource& source, TExprContext& ctx) {
+ const auto maybeSettings = source.Settings().Maybe<TGenSourceSettings>();
+
+ if (!maybeSettings) {
+ return;
+ }
+
+ const auto settings = maybeSettings.Cast();
+ const auto& tableName = settings.Table().StringValue();
+ const auto& clusterName = source.DataSource().Cast<TGenDataSource>().Cluster().StringValue();
+ auto [tableMeta, issues] = State_->GetTable({clusterName, tableName});
+
+ if (issues) {
+ throw yexception() << "Get table metadata: " << issues.ToOneLineString();
+ }
+
+ FillSelectFromGenSourceSettings(select, settings, ctx, tableMeta);
+ }
+
ui64 Partition(
const TExprNode& node,
TVector<TString>& partitions,
@@ -145,7 +168,12 @@ namespace NYql {
return 0;
}
- const size_t totalSplits = tableMeta->Splits.size();
+ NConnector::NApi::TSelect select;
+ FillSelect(select, TDqSource(&node), ctx);
+
+ auto selectKey = tableAddress.MakeKeyFor(select);
+ auto splits = tableMeta->GetSplitsForSelect(selectKey);
+ const size_t totalSplits = splits.size();
partitions.clear();
@@ -153,7 +181,7 @@ namespace NYql {
// If there are not too many splits, simply make a single-split partitions.
for (size_t i = 0; i < totalSplits; i++) {
Generic::TPartition partition;
- *partition.add_splits() = tableMeta->Splits[i];
+ *partition.add_splits() = splits[i];
TString partitionStr;
YQL_ENSURE(partition.SerializeToString(&partitionStr), "Failed to serialize partition");
partitions.emplace_back(std::move(partitionStr));
@@ -166,7 +194,7 @@ namespace NYql {
for (size_t i = 0; i < totalSplits; i += splitsPerPartition) {
Generic::TPartition partition;
for (size_t j = i; j < i + splitsPerPartition && j < totalSplits; j++) {
- *partition.add_splits() = tableMeta->Splits[j];
+ *partition.add_splits() = splits[j];
}
TString partitionStr;
YQL_ENSURE(partition.SerializeToString(&partitionStr), "Failed to serialize partition");
@@ -180,77 +208,60 @@ namespace NYql {
void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings,
TString& sourceType, size_t, TExprContext& ctx) override {
- const TDqSource source(&node);
- if (const auto maybeSettings = source.Settings().Maybe<TGenSourceSettings>()) {
- const auto settings = maybeSettings.Cast();
- const auto& clusterName = source.DataSource().Cast<TGenDataSource>().Cluster().StringValue();
- const auto& tableName = settings.Table().StringValue();
- const auto& clusterConfig = State_->Configuration->ClusterNamesToClusterConfigs[clusterName];
- const auto& endpoint = clusterConfig.endpoint();
-
- Generic::TSource source;
-
- YQL_CLOG(INFO, ProviderGeneric)
- << "Filling source settings"
- << ": cluster: " << clusterName
- << ", table: " << tableName
- << ", endpoint: " << endpoint.ShortDebugString();
-
- const auto& columns = settings.Columns();
-
- auto [tableMeta, issues] = State_->GetTable({clusterName, tableName});
- if (issues) {
- throw yexception() << "Get table metadata: " << issues.ToOneLineString();
- }
+ const TDqSource dqSource(&node);
+ const auto maybeSettings = dqSource.Settings().Maybe<TGenSourceSettings>();
- // prepare select
- auto select = source.mutable_select();
- select->mutable_from()->set_table(TString(tableName));
- *select->mutable_data_source_instance() = tableMeta->DataSourceInstance;
-
- auto items = select->mutable_what()->mutable_items();
- for (size_t i = 0; i < columns.Size(); i++) {
- // assign column name
- auto column = items->Add()->mutable_column();
- auto columnName = columns.Item(i).StringValue();
- column->mutable_name()->assign(columnName);
-
- // assign column type
- auto type = NConnector::GetColumnTypeByName(tableMeta->Schema, columnName);
- *column->mutable_type() = type;
- }
+ if (!maybeSettings) {
+ return;
+ }
- if (auto predicate = settings.FilterPredicate(); !IsEmptyFilterPredicate(predicate)) {
- TStringBuilder err;
- if (!SerializeFilterPredicate(ctx, predicate, select->mutable_where()->mutable_filter_typed(), err)) {
- throw yexception() << "Failed to serialize filter predicate for source: " << err;
- }
- }
+ const auto settings = maybeSettings.Cast();
+ const auto& clusterName = dqSource.DataSource().Cast<TGenDataSource>().Cluster().StringValue();
+ const auto& tableName = settings.Table().StringValue();
+ const auto& clusterConfig = State_->Configuration->ClusterNamesToClusterConfigs[clusterName];
+ const auto& endpoint = clusterConfig.endpoint();
- // TODO: remove this block as soon as the first part YQ-4730 is deployed
- //
- // Iceberg/Managed YDB (including YDB underlying Logging) supports access via IAM token.
- // If exist, copy service account creds to obtain tokens during request execution phase.
- // If exists, copy previously created token.
- if (IsIn({NYql::EGenericDataSourceKind::YDB, NYql::EGenericDataSourceKind::LOGGING, NYql::EGenericDataSourceKind::ICEBERG}, clusterConfig.kind())) {
- source.SetServiceAccountId(clusterConfig.GetServiceAccountId());
- source.SetServiceAccountIdSignature(clusterConfig.GetServiceAccountIdSignature());
- source.SetToken(State_->Types->Credentials->FindCredentialContent(
- "default_" + clusterConfig.name(),
- "default_generic",
- clusterConfig.GetToken()));
- }
+ Generic::TSource source;
- // We set token name to the protobuf message that will be received
- // by the read actor during the execution phase.
- // It will use token name to extract credentials from the secureParams.
- const TString tokenName(settings.Token().Maybe<TCoSecureParam>().Name().Cast());
- source.SetTokenName(tokenName);
+ YQL_CLOG(INFO, ProviderGeneric)
+ << "Filling source settings"
+ << ": cluster: " << clusterName
+ << ", table: " << tableName
+ << ", endpoint: " << endpoint.ShortDebugString();
- // preserve source description for read actor
- protoSettings.PackFrom(source);
- sourceType = GetSourceType(select->data_source_instance());
+ auto [tableMeta, issues] = State_->GetTable({clusterName, tableName});
+
+ if (issues) {
+ throw yexception() << "Get table metadata: " << issues.ToOneLineString();
+ }
+
+ // prepare select
+ auto select = source.mutable_select();
+ FillSelect(*select, dqSource, ctx);
+
+ // TODO: remove this block as soon as the first part YQ-4730 is deployed
+ //
+ // Iceberg/Managed YDB (including YDB underlying Logging) supports access via IAM token.
+ // If exists, copy service account creds to obtain tokens during request execution phase.
+ // If exists, copy previously created token.
+ if (IsIn({NYql::EGenericDataSourceKind::YDB, NYql::EGenericDataSourceKind::LOGGING, NYql::EGenericDataSourceKind::ICEBERG}, clusterConfig.kind())) {
+ source.SetServiceAccountId(clusterConfig.GetServiceAccountId());
+ source.SetServiceAccountIdSignature(clusterConfig.GetServiceAccountIdSignature());
+ source.SetToken(State_->Types->Credentials->FindCredentialContent(
+ "default_" + clusterConfig.name(),
+ "default_generic",
+ clusterConfig.GetToken()));
}
+
+ // We set token name to the protobuf message that will be received
+ // by the read actor during the execution phase.
+ // It will use token name to extract credentials from the secureParams.
+ const TString tokenName(settings.Token().Maybe<TCoSecureParam>().Name().Cast());
+ source.SetTokenName(tokenName);
+
+ // preserve source description for read actor
+ protoSettings.PackFrom(source);
+ sourceType = GetSourceType(select->data_source_instance());
}
bool FillSourcePlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) override {
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_list_splits.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_list_splits.cpp
new file mode 100644
index 00000000000..c496bcb62d0
--- /dev/null
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_list_splits.cpp
@@ -0,0 +1,342 @@
+#include "yql_generic_list_splits.h"
+
+#include <yql/essentials/utils/log/log.h>
+#include <yql/essentials/providers/common/structured_token/yql_token_builder.h>
+#include <yql/essentials/providers/common/provider/yql_provider.h>
+#include <yql/essentials/providers/common/provider/yql_provider_names.h>
+#include <yql/essentials/minikql/mkql_type_ops.h>
+#include <yql/essentials/minikql/mkql_program_builder.h>
+#include <yql/essentials/minikql/mkql_alloc.h>
+#include <yql/essentials/minikql/computation/mkql_computation_node.h>
+#include <yql/essentials/minikql/comp_nodes/mkql_factories.h>
+#include <yql/essentials/core/yql_graph_transformer.h>
+#include <yql/essentials/core/yql_expr_optimize.h>
+#include <yql/essentials/core/type_ann/type_ann_expr.h>
+#include <yql/essentials/core/expr_nodes/yql_expr_nodes.h>
+#include <yql/essentials/ast/yql_type_string.h>
+#include <yql/essentials/ast/yql_expr.h>
+#include <ydb/library/yql/providers/generic/provider/yql_generic_utils.h>
+#include <ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.h>
+#include <ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.h>
+#include <ydb/library/yql/providers/generic/connector/libcpp/error.h>
+#include <ydb/library/yql/providers/generic/connector/libcpp/client.h>
+#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h>
+#include <ydb/core/fq/libs/result_formatter/result_formatter.h>
+#include <ydb/core/external_sources/iceberg_fields.h>
+#include <library/cpp/json/json_reader.h>
+
+namespace NYql {
+
+using namespace NNodes;
+using namespace NKikimr;
+using namespace NKikimr::NMiniKQL;
+
+///
+ /// The optimization process consists of several transformers arranged in a pipeline and
+ /// called consecutively. If a DoTransform returns Repeat, i.e. it has changed the expression,
+ /// the process starts again from the very beginning, but with the new input. That is why a
+ /// transformer can be called multiple times. The purpose of this transformer is to find
+ /// TGenSourceSettings nodes which were created during previous transformations and to perform
+ /// a ListSplits request for them. The ListSplit transformer has to run after the where clause
+ /// has been pushed down; otherwise the ReadSplits request, which ultimately occurs after the
+ /// pushdown, could send the connector a where clause that differs from the one used by ListSplits.
+///
+/// The order of transformations calls:
+///
+ /// 1. TGenericPhysicalOptProposalTransformer::PushFilterToReadTable pushes the predicate down
+ /// into a TGenReadTable node.
+///
+ /// 2. TKqpConstantFoldingTransformer folds constant expressions in the pushed-down predicate.
+///
+/// 3. TGenericListSplitTransformer performs a ListSplit request on a TGenSourceSettings node.
+///
+ class TGenericListSplitTransformer : public TGraphTransformerBase {
+ // Everything needed to issue one ListSplits request
+ struct TListSplitRequestData {
+ TSelectKey Key; // identifies the select; used as the key of the response map
+ NConnector::NApi::TSelect Select; // select sent to the connector
+ TGenericState::TTableAddress TableAddress; // table the select reads from
+ };
+
+ // Outcome of one ListSplits request, filled by the asynchronous callbacks
+ struct TListResponse {
+ using TPtr = std::shared_ptr<TListResponse>;
+
+ std::vector<NConnector::NApi::TSplit> Splits; // splits collected from all responses
+ TIssues Issues; // transport or logical errors, if any
+ };
+
+ using TListResponseMap =
+ std::unordered_map<TSelectKey, TListResponse::TPtr, TSelectKeyHash>;
+
+ public:
+ explicit TGenericListSplitTransformer(TGenericState::TPtr state);
+
+ public:
+ // Finds TGenSourceSettings nodes and starts ListSplits requests for selects without splits
+ TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final;
+
+ // Future that becomes ready when the requests started by DoTransform have completed
+ NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode&) final {
+ return AsyncFuture_;
+ }
+
+ // Attaches the received splits to the tables stored in the provider state
+ TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final;
+
+ // Drops stored responses and resets the future
+ void Rewind() final;
+
+ private:
+ // Sends one ListSplits request; appends the future of the request to `handles`
+ TIssues ListSplitsFromConnector(const TListSplitRequestData& data, std::vector<NThreading::TFuture<void>>& handles);
+
+ private:
+ const TGenericState::TPtr State_; // shared provider state (tables, configuration, client)
+ TListResponseMap ListResponses_; // responses keyed by select
+ NThreading::TFuture<void> AsyncFuture_; // combined future of the requests in flight
+ };
+
+ /// Stores the provider state the transformer works with.
+ TGenericListSplitTransformer::TGenericListSplitTransformer(TGenericState::TPtr state)
+ : State_(std::move(state))
+ { }
+
+ /// Synchronous phase of split listing.
+ ///
+ /// Finds every TGenSourceSettings node in `input`, builds the select it describes and
+ /// starts a ListSplits request for each distinct select whose splits are not yet attached
+ /// to the table metadata.
+ ///
+ /// @param input   expression to scan
+ /// @param output  always assigned `input`; this phase does not rewrite the expression
+ /// @param ctx     receives error issues
+ /// @return Ok when there is nothing to request, Error when table metadata is missing or a
+ ///         request could not be started, Async once at least one request is in flight
+ IGraphTransformer::TStatus TGenericListSplitTransformer::DoTransform(TExprNode::TPtr input,
+                                                                     TExprNode::TPtr& output,
+                                                                     TExprContext& ctx) {
+     output = input;
+
+     const auto& readsSettings = FindNodes(input, [](const TExprNode::TPtr& node) {
+         return static_cast<bool>(TMaybeNode<TGenSourceSettings>(node));
+     });
+
+     if (readsSettings.empty()) {
+         return TStatus::Ok;
+     }
+
+     std::unordered_map<TSelectKey, TListSplitRequestData, TSelectKeyHash> pendingRequests;
+
+     // Iterate over all settings in the input expression, create ListSplit request if needed
+     for (const auto& r : readsSettings) {
+         const TGenSourceSettings settings(r);
+
+         const auto& tableName = settings.Table().StringValue();
+         const auto& clusterName = settings.Cluster().StringValue();
+
+         auto tableAddress = TGenericState::TTableAddress(clusterName, tableName);
+         auto table = State_->GetTable(tableAddress);
+
+         if (!table.first) {
+             ctx.AddError(TIssue(table.second.ToString()));
+             return TStatus::Error;
+         }
+
+         // Grab select from a read table query
+         NConnector::NApi::TSelect select;
+         FillSelectFromGenSourceSettings(select, settings, ctx, table.first);
+
+         auto selectKey = tableAddress.MakeKeyFor(select);
+
+         // The one sql query could contain multiple selects, e.g.
+         // join, subquery etc. If splits has been already acquired for a
+         // select, skip it
+         if (table.first->HasSplitsForSelect(selectKey)) {
+             continue;
+         }
+
+         // Identical selects share one request: only the first occurrence of a key is
+         // stored. The select is moved into the request data instead of being copied.
+         pendingRequests.try_emplace(selectKey,
+                                     TListSplitRequestData{selectKey, std::move(select), tableAddress});
+     }
+
+     if (pendingRequests.empty()) {
+         return TStatus::Ok;
+     }
+
+     std::vector<NThreading::TFuture<void>> handles;
+     handles.reserve(pendingRequests.size());
+     ListResponses_.reserve(pendingRequests.size());
+
+     // Start one request per select; each started request appends a future to `handles`
+     for (const auto& pending : pendingRequests) {
+         auto tIssues = ListSplitsFromConnector(pending.second, handles);
+         if (!tIssues.Empty()) {
+             ctx.AddError(TIssue(tIssues.ToString()));
+             return TStatus::Error;
+         }
+     }
+
+     if (handles.empty()) {
+         return TStatus::Ok;
+     }
+
+     // The combined future is what DoApplyAsyncChanges later waits on
+     AsyncFuture_ = NThreading::WaitExceptionOrAll(handles);
+     return TStatus::Async;
+ }
+
+ /// Starts an asynchronous ListSplits request for a single select.
+ ///
+ /// A TListResponse slot is registered in ListResponses_ under `data.Key`; the callbacks
+ /// store either the collected splits or the issues in that slot. A future that becomes
+ /// ready when the stream has been drained (or has failed) is appended to `handles`.
+ ///
+ /// @param data     select key, select and table address of the request
+ /// @param handles  receives the future of the started request
+ /// @return issues of the table metadata lookup when the table is unknown (nothing is sent
+ ///         in that case); empty when the request has been sent
+ TIssues TGenericListSplitTransformer::ListSplitsFromConnector(const TListSplitRequestData& data,
+                                                               std::vector<NThreading::TFuture<void>>& handles) {
+     auto table = State_->GetTable(data.TableAddress);
+
+     if (!table.first) {
+         return table.second;
+     }
+
+     // Slot the callbacks write into; DoApplyAsyncChanges looks it up by select key
+     auto emplaceIt = ListResponses_.emplace(data.Key, std::make_shared<TListResponse>());
+     auto desc = emplaceIt.first->second;
+
+     // Call ListSplits
+     NConnector::NApi::TListSplitsRequest request;
+     *request.mutable_selects()->Add() = data.Select;
+
+     auto promise = NThreading::NewPromise();
+     handles.emplace_back(promise.GetFuture());
+
+     Y_ENSURE(State_->GenericClient);
+
+     // `data` is captured by value because the callbacks may run after the caller's
+     // request map has been destroyed
+     State_->GenericClient->ListSplits(request).Subscribe([desc, promise, data]
+         (const NConnector::TListSplitsStreamIteratorAsyncResult& f3) mutable {
+         // Work on a local copy of the future handle, since the callback argument is const
+         NConnector::TListSplitsStreamIteratorAsyncResult f4(f3);
+         auto streamIterResult = f4.ExtractValueSync();
+
+         // Check transport error
+         if (!streamIterResult.Status.Ok()) {
+             desc->Issues.AddIssue(TStringBuilder()
+                                   << "Call ListSplits for table " << data.TableAddress.ToString() << ": "
+                                   << streamIterResult.Status.ToDebugString());
+             promise.SetValue();
+             return;
+         }
+
+         Y_ENSURE(streamIterResult.Iterator);
+
+         auto drainer =
+             NConnector::MakeListSplitsStreamIteratorDrainer(std::move(streamIterResult.Iterator));
+
+         // Pass the drainer to the callback to keep it alive until the callback is called
+         drainer->Run().Subscribe([desc, promise, data, drainer]
+             (const NThreading::TFuture<NConnector::TListSplitsStreamIteratorDrainer::TBuffer>& f5) mutable {
+             NThreading::TFuture<NConnector::TListSplitsStreamIteratorDrainer::TBuffer> f6(f5);
+             auto drainerResult = f6.ExtractValueSync();
+
+             // check transport and logical errors
+             if (drainerResult.Issues) {
+                 auto msg = TStringBuilder()
+                     << "Call ListSplits for table: " << data.TableAddress.ToString() << " with select: "
+                     << data.Select.what().DebugString()
+                     << data.Select.from().DebugString()
+                     << data.Select.where().DebugString();
+
+                 TIssue dstIssue(msg);
+
+                 for (const auto& srcIssue : drainerResult.Issues) {
+                     dstIssue.AddSubIssue(MakeIntrusive<TIssue>(srcIssue));
+                 }
+
+                 desc->Issues.AddIssue(std::move(dstIssue));
+                 promise.SetValue();
+                 return;
+             }
+
+             // Collect all the splits from every response into a single vector,
+             // moving them out of the drained responses
+             for (auto&& response : drainerResult.Responses) {
+                 auto* splits = response.mutable_splits();
+                 desc->Splits.insert(desc->Splits.end(),
+                                     std::make_move_iterator(splits->begin()),
+                                     std::make_move_iterator(splits->end()));
+             }
+
+             promise.SetValue();
+         });
+     });
+
+     return TIssues();
+ }
+
+ /// Asynchronous completion phase of split listing.
+ ///
+ /// Rebuilds the select of every TGenSourceSettings node, finds the matching stored
+ /// ListSplits response and attaches the received splits to the table in the provider
+ /// state. The expression itself is not rewritten.
+ ///
+ /// @param input   expression that was passed to DoTransform
+ /// @param output  always assigned `input`
+ /// @param ctx     receives error issues
+ /// @return Ok on success; Error when table metadata or a response is missing, the
+ ///         connector reported issues or the splits could not be attached
+ IGraphTransformer::TStatus TGenericListSplitTransformer::DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) {
+ AsyncFuture_.GetValue();
+ output = input;
+
+ /*
+ * Search for a TGenSourceSettings. TGenSourceSettings contains the same data as during
+ * DoTransform method call
+ */
+ const auto& readsSettings = FindNodes(input, [&](const TExprNode::TPtr& node) {
+ if (const auto maybeSettings = TMaybeNode<TGenSourceSettings>(node)) {
+ return true;
+ }
+
+ return false;
+ });
+
+ if (readsSettings.empty()) {
+ return TStatus::Ok;
+ }
+
+ // Iterate over all settings in the input expression, check Connector responses
+ for (const auto& r : readsSettings) {
+ const TGenSourceSettings settings(r);
+
+ const auto& tableName = settings.Table().StringValue();
+ const auto& clusterName = settings.Cluster().StringValue();
+
+ auto tableAddress = TGenericState::TTableAddress(clusterName, tableName);
+ auto table = State_->GetTable(tableAddress);
+
+ if (!table.first) {
+ ctx.AddError(TIssue(table.second.ToString()));
+ return TStatus::Error;
+ }
+
+ // Grab select from a read table query
+ NConnector::NApi::TSelect select;
+ FillSelectFromGenSourceSettings(select, settings, ctx, table.first);
+ auto selectKey = tableAddress.MakeKeyFor(select);
+
+ // If splits for a similar select for this table was created skip it
+ // (this also covers a second node with the same select in this very loop)
+ if (table.first->HasSplitsForSelect(selectKey)) {
+ continue;
+ }
+
+ // Find appropriate response
+ auto iter = ListResponses_.find(selectKey);
+
+ if (iter == ListResponses_.end()) {
+ auto msg = TStringBuilder()
+ << "Connector response not found for table: " << tableAddress.ToString() << " and select: "
+ << select.what().DebugString()
+ << select.from().DebugString()
+ << select.where().DebugString();
+
+ ctx.AddError(TIssue(ctx.GetPosition(settings.Pos()), msg));
+ return TStatus::Error;
+ }
+
+ auto& result = iter->second;
+
+ // If errors occurred during network interaction with Connector, return them
+ if (result->Issues) {
+ for (const auto& issue : result->Issues) {
+ ctx.AddError(issue);
+ }
+
+ return TStatus::Error;
+ }
+
+ // NOTE(review): a successful response without any split is raised through Y_ENSURE
+ // instead of being reported through ctx - confirm this is intended
+ Y_ENSURE(!result->Splits.empty());
+
+ if (auto issue = State_->AttachSplitsToTable(tableAddress, selectKey, result->Splits); issue) {
+ ctx.AddError(*issue);
+ return TStatus::Error;
+ }
+ }
+
+ return TStatus::Ok;
+ }
+
+ /// Drops every stored ListSplits response and resets the combined future to a
+ /// default-constructed one.
+ void TGenericListSplitTransformer::Rewind() {
+ ListResponses_.clear();
+ AsyncFuture_ = {};
+ }
+
+ /// Factory: creates a TGenericListSplitTransformer that operates on the given provider state.
+ THolder<TGraphTransformerBase> CreateGenericListSplitTransformer(TGenericState::TPtr state) {
+ return MakeHolder<TGenericListSplitTransformer>(std::move(state));
+ }
+
+} // NYql
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_list_splits.h b/ydb/library/yql/providers/generic/provider/yql_generic_list_splits.h
new file mode 100644
index 00000000000..0dcb72984a4
--- /dev/null
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_list_splits.h
@@ -0,0 +1,10 @@
+#pragma once
+
+#include <yql/essentials/core/yql_graph_transformer.h>
+#include "yql_generic_state.h"
+
+namespace NYql {
+
+THolder<TGraphTransformerBase> CreateGenericListSplitTransformer(TGenericState::TPtr state);
+
+} // NYql
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp
index e8edf828e2a..6b6d6829961 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp
@@ -1,5 +1,6 @@
// clang-format off
#include "yql_generic_provider_impl.h"
+#include "yql_generic_describe_table.h"
#include <library/cpp/json/json_reader.h>
#include <ydb/core/fq/libs/result_formatter/result_formatter.h>
@@ -27,7 +28,7 @@ namespace NYql {
using namespace NKikimr;
using namespace NKikimr::NMiniKQL;
- class TGenericLoadTableMetadataTransformer: public TGraphTransformerBase {
+ class [[deprecated("Now this transformer is represented by two transformers, it will be removed in the next version")]] TGenericLoadTableMetadataTransformer: public TGraphTransformerBase {
struct TTableDescription {
using TPtr = std::shared_ptr<TTableDescription>;
@@ -635,7 +636,7 @@ namespace NYql {
};
THolder<IGraphTransformer> CreateGenericLoadTableMetadataTransformer(TGenericState::TPtr state) {
- return MakeHolder<TGenericLoadTableMetadataTransformer>(std::move(state));
+ // Metadata loading is delegated to the describe table transformer. `state` is a
+ // by-value parameter that is not used afterwards, so it is handed over instead
+ // of being copied (the removed line above moved it as well).
+ return CreateGenericDescribeTableTransformer(std::move(state));
}
} // namespace NYql
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp
index e98d002f24f..fdb29c0fbec 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp
@@ -1,20 +1,23 @@
#include "yql_generic_provider_impl.h"
#include "yql_generic_predicate_pushdown.h"
+#include "yql_generic_list_splits.h"
-#include <yql/essentials/core/expr_nodes/yql_expr_nodes.h>
-#include <yql/essentials/core/yql_opt_utils.h>
-#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h>
-#include <yql/essentials/providers/common/provider/yql_data_provider_impl.h>
+#include <yql/essentials/utils/log/log.h>
+#include <yql/essentials/providers/common/transform/yql_optimize.h>
+#include <yql/essentials/providers/common/provider/yql_provider.h>
#include <yql/essentials/providers/common/provider/yql_provider.h>
#include <yql/essentials/providers/common/provider/yql_provider_names.h>
-#include <ydb/library/yql/providers/common/pushdown/collection.h>
-#include <ydb/library/yql/providers/common/pushdown/physical_opt.h>
-#include <ydb/library/yql/providers/common/pushdown/predicate_node.h>
-#include <yql/essentials/providers/common/transform/yql_optimize.h>
+#include <yql/essentials/providers/common/provider/yql_data_provider_impl.h>
+#include <yql/essentials/core/yql_opt_utils.h>
+#include <yql/essentials/core/services/yql_transform_pipeline.h>
+#include <yql/essentials/core/expr_nodes/yql_expr_nodes.h>
#include <ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.h>
#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
-#include <yql/essentials/utils/log/log.h>
-#include <yql/essentials/providers/common/provider/yql_provider.h>
+#include <ydb/library/yql/providers/common/pushdown/predicate_node.h>
+#include <ydb/library/yql/providers/common/pushdown/settings.h>
+#include <ydb/library/yql/providers/common/pushdown/physical_opt.h>
+#include <ydb/library/yql/providers/common/pushdown/collection.h>
+#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h>
namespace NYql {
@@ -38,7 +41,9 @@ namespace NYql {
EFlag::JustPassthroughOperators | // To pushdown REGEXP over String column
EFlag::FlatMapOverOptionals | // To pushdown REGEXP over Utf8 column
EFlag::ToStringFromStringExpressions | // To pushdown REGEXP over Utf8 column
- EFlag::DecimalType | EFlag::DecimalCtor
+ EFlag::DecimalType | EFlag::DecimalCtor |
+ EFlag::IntervalCtor |
+ EFlag::DateCtor
);
EnableFunction("Re2.Grep"); // For REGEXP pushdown
}
@@ -207,10 +212,60 @@ namespace NYql {
private:
const TGenericState::TPtr State_;
};
+
+ ///
+ /// Two-stage transformer: runs the physical optimization proposal transformer
+ /// and, once that stage reports Ok, runs the list split transformer on its
+ /// output. Only the list split stage is allowed to go asynchronous.
+ ///
+ class TGenericPhysicalOptProposalWithListTransformer : public TGraphTransformerBase {
+ public:
+     explicit TGenericPhysicalOptProposalWithListTransformer(TGenericState::TPtr state)
+         : PhysicalOptTransformer_(std::make_unique<TGenericPhysicalOptProposalTransformer>(state))
+         , ListTransformer_(CreateGenericListSplitTransformer(state))
+     { }
+
+     TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
+         // Stage 1: physical optimization, which must never be asynchronous
+         auto optStatus = PhysicalOptTransformer_->DoTransform(input, output, ctx);
+         Y_ENSURE(optStatus != TStatus::Async);
+
+         if (optStatus != TStatus::Ok) {
+             return optStatus;
+         }
+
+         // Stage 2: list splits for the graph produced by stage 1
+         input = output;
+         auto listStatus = ListTransformer_->DoTransform(input, output, ctx);
+
+         if (listStatus == TStatus::Async) {
+             // Remember that the async callbacks below are now legitimate
+             AllowAsync_ = true;
+         }
+
+         return listStatus;
+     }
+
+     // Async requests are forwarded to the list split stage and are only valid
+     // after DoTransform has returned Async
+     NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& node) final {
+         Y_ENSURE(AllowAsync_);
+         return ListTransformer_->DoGetAsyncFuture(node);
+     }
+
+     TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
+         Y_ENSURE(AllowAsync_);
+         return ListTransformer_->DoApplyAsyncChanges(input, output, ctx);
+     }
+
+     // Returns both stages and the async flag to their initial state
+     void Rewind() final {
+         AllowAsync_ = false;
+         PhysicalOptTransformer_->Rewind();
+         ListTransformer_->Rewind();
+     }
+
+ private:
+     const std::unique_ptr<TGenericPhysicalOptProposalTransformer> PhysicalOptTransformer_;
+     const THolder<TGraphTransformerBase> ListTransformer_;
+     // Set once the list split stage has returned Async
+     bool AllowAsync_ = false;
+ };
} // namespace
THolder<IGraphTransformer> CreateGenericPhysicalOptProposalTransformer(TGenericState::TPtr state) {
- return MakeHolder<TGenericPhysicalOptProposalTransformer>(state);
+ // `state` is a by-value parameter with no further use here: move it into the
+ // transformer instead of copying the intrusive pointer once more.
+ return MakeHolder<TGenericPhysicalOptProposalWithListTransformer>(std::move(state));
}
} // namespace NYql
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp
index 0b987d422b2..ea7ee42990d 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp
@@ -341,6 +341,10 @@ namespace NYql {
MATCH_ATOM(Utf8, UTF8, text, TString);
MATCH_ATOM(Timestamp, TIMESTAMP, int64, i64);
MATCH_ATOM(Interval, INTERVAL, int64, i64);
+ // The Arrow vector holds Date values as ui16,
+ // but the proto value has no way to store ui16,
+ // which is why uint32 is used here
+ MATCH_ATOM(Date, DATE, uint32, ui16);
MATCH_ARITHMETICAL(Sub, SUB);
MATCH_ARITHMETICAL(Add, ADD);
MATCH_ARITHMETICAL(Mul, MUL);
@@ -683,6 +687,8 @@ namespace NYql {
return "Utf8";
case Ydb::Type::JSON:
return "Json";
+ case Ydb::Type::DATE:
+ return "Date";
default:
throw yexception() << "Failed to format primitive type, type case " << static_cast<ui64>(typeId) << " is not supported";
}
@@ -716,6 +722,16 @@ namespace NYql {
[[fallthrough]];
}
}
+ case Ydb::Type::DATE: {
+     const auto& value = typedValue.value();
+     switch (value.value_case()) {
+         case Ydb::Value::kUint32Value:
+             // The value is a day count that TInstant::Days turns into midnight UTC
+             // of that day. It has to be formatted in UTC: formatting in the
+             // process-local time zone prints the previous date on hosts whose
+             // zone is west of UTC.
+             return TStringBuilder() << FormatType(typedValue.type()) << "(\""
+                 << TInstant::Days(value.uint32_value()).FormatGmTime("%Y-%m-%d") << "\")";
+         default:
+             [[fallthrough]];
+     }
+ }
default:
[[fallthrough]];
}
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp
index 61e86bf982c..7c9564294da 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp
@@ -1,12 +1,87 @@
#include "yql_generic_state.h"
+#include <ydb/library/yql/providers/generic/connector/libcpp/utils.h>
+#include <ydb/library/yql/providers/generic/provider/yql_generic_utils.h>
namespace NYql {
+ // Collects the non-empty column names listed in the `what` section of a select
+ // and returns them sorted in ascending order, so the result does not depend on
+ // the order of the items. Returns an empty vector when `what` is absent or
+ // has no items.
+ std::vector<TString> GetColumns(const NConnector::NApi::TSelect& select) {
+     std::vector<TString> names;
+
+     if (!select.has_what()) {
+         return names;
+     }
+
+     const auto& items = select.what().items();
+
+     if (items.empty()) {
+         return names;
+     }
+
+     names.reserve(items.size());
+
+     for (const auto& item : items) {
+         const auto& name = item.column().name();
+
+         // Items without a column name do not take part in the result
+         if (name.empty()) {
+             continue;
+         }
+
+         names.push_back(name);
+     }
+
+     std::sort(names.begin(), names.end());
+     return names;
+ }
+
+ namespace {
+     // Table name from the `from` section, or an empty string when it is not set
+     TString SelectKeyTableName(const NConnector::NApi::TSelect& select) {
+         if (select.has_from()) {
+             return select.from().table();
+         }
+
+         return TString();
+     }
+
+     // Serialized typed filter from the `where` section, or an empty string
+     // when the select has no typed filter
+     TString SelectKeyWhere(const NConnector::NApi::TSelect& select) {
+         if (select.has_where() && select.where().has_filter_typed()) {
+             return select.where().filter_typed().SerializeAsString();
+         }
+
+         return TString();
+     }
+ } // namespace
+
+ // Builds a key from the cluster name and the parts of the select that identify
+ // it: table, sorted column names and serialized typed filter. The hash is
+ // computed last, from the fields initialized before it.
+ TSelectKey::TSelectKey(const TString& cluster, const NConnector::NApi::TSelect& select)
+     : Cluster(cluster)
+     , Table(SelectKeyTableName(select))
+     , Columns(GetColumns(select))
+     , Where(SelectKeyWhere(select))
+     , Hash(CalculateHash())
+ { }
+
+ // Folds the hashes of Cluster, Table, Where and then every column name (in the
+ // stored order) into a single value with CombineHashes.
+ size_t TSelectKey::CalculateHash() const {
+     std::hash<TString> hasher;
+
+     size_t result = CombineHashes(hasher(Cluster), hasher(Table));
+     result = CombineHashes(result, hasher(Where));
+
+     for (const auto& column : Columns) {
+         result = CombineHashes(result, hasher(column));
+     }
+
+     return result;
+ }
+
+ // Hash of a table address: the combined hashes of the cluster and table names
+ TGenericState::TTableAddress::operator size_t() const {
+     std::hash<TString> hasher;
+     const size_t clusterHash = hasher(ClusterName);
+     const size_t tableHash = hasher(TableName);
+     return CombineHashes(clusterHash, tableHash);
+ }
+
+ // Builds the select key for this address: only the cluster name comes from the
+ // address, everything else is taken from the select itself
+ TSelectKey TGenericState::TTableAddress::MakeKeyFor(const NConnector::NApi::TSelect& select) const {
+     return {ClusterName, select};
+ }
+
+ // Tells whether splits have already been attached for the given select key
+ bool TGenericState::TTableMeta::HasSplitsForSelect(const TSelectKey& key) const {
+     return SelectSplits.find(key) != SelectSplits.end();
+ }
+
+ // Stores the splits produced for a select. On success the content of `splits`
+ // is moved into the table metadata, so the caller's vector must not be relied
+ // on afterwards. Throws if `splits` is empty or if splits for the same key
+ // have already been attached.
+ void TGenericState::TTableMeta::AttachSplitsForSelect(const TSelectKey& key, std::vector<NYql::NConnector::NApi::TSplit>& splits) {
+     Y_ENSURE(!splits.empty(), "Empty split list for a select from the table: " << key.Table);
+
+     // try_emplace leaves `splits` untouched when the key is already present,
+     // whereas emplace is allowed to move from it before discovering the duplicate
+     const bool inserted = SelectSplits.try_emplace(key, std::move(splits)).second;
+     Y_ENSURE(inserted, "Splits are already attached for a select from the table: " << key.Table);
+ }
+
+ // Returns the splits attached for the given select key.
+ // Throws yexception when nothing has been attached for that key.
+ const std::vector<NYql::NConnector::NApi::TSplit>& TGenericState::TTableMeta::GetSplitsForSelect(
+     const TSelectKey& key) const {
+     if (const auto found = SelectSplits.find(key); found != SelectSplits.end()) {
+         return found->second;
+     }
+
+     throw yexception()
+         << "Table metadata does not contain splits for a select from the table: " << key.Table
+         << " with where:" << key.Where;
+ }
+
+ // Tells whether metadata is registered for the given table address
+ bool TGenericState::HasTable(const TTableAddress& tableAddress) {
+     return Tables_.find(tableAddress) != Tables_.end();
+ }
+
// Registers metadata for a table; the metadata object is moved into the table map.
// The result of emplace is ignored. NOTE(review): presumably an address that is
// already registered keeps its old metadata; confirm against THashMap::emplace.
void TGenericState::AddTable(const TTableAddress& tableAddress, TTableMeta&& tableMeta) {
Tables_.emplace(tableAddress, std::move(tableMeta));
}
TGenericState::TGetTableResult TGenericState::GetTable(const TTableAddress& tableAddress) const {
auto result = Tables_.FindPtr(tableAddress);
+
if (result) {
return std::make_pair(result, TIssues{});
}
@@ -17,4 +92,17 @@ namespace NYql {
return std::make_pair<TTableMeta*, TIssues>(nullptr, std::move(issues));
}
+ // Attaches splits for a select to the metadata of an already registered table.
+ // Returns an issue when no metadata exists for the address and std::nullopt on
+ // success; `splits` is passed on to TTableMeta::AttachSplitsForSelect by
+ // non-const reference.
+ std::optional<TIssue> TGenericState::AttachSplitsToTable(const TTableAddress& tableAddress,
+                                                          const TSelectKey& key,
+                                                          std::vector<NYql::NConnector::NApi::TSplit>& splits) {
+     if (auto* tableMeta = Tables_.FindPtr(tableAddress)) {
+         tableMeta->AttachSplitsForSelect(key, splits);
+         return std::nullopt;
+     }
+
+     return {TIssue(TStringBuilder() << "no metadata for table " << tableAddress.ToString())};
+ }
+
} // namespace NYql
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_state.h b/ydb/library/yql/providers/generic/provider/yql_generic_state.h
index 4abefb0e89d..bce6170de19 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_state.h
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_state.h
@@ -13,6 +13,37 @@ namespace NKikimr::NMiniKQL {
} // namespace NKikimr::NMiniKQL
namespace NYql {
+ ///
+ /// A key for a select query on a cluster table. Hash value is
+ /// calculated in a constructor and stored in a Hash field.
+ /// Every field is const: a key is immutable once constructed.
+ ///
+ struct TSelectKey {
+     // Name of the cluster the select is addressed to
+     const TString Cluster;
+     // Table from the `from` section of the select (empty when it is not set)
+     const TString Table;
+     // Non-empty column names from the `what` section, sorted
+     const std::vector<TString> Columns;
+     // Serialized typed filter of the `where` section (empty when there is none)
+     const TString Where;
+     // Must stay the last data member: members are initialized in declaration
+     // order and the hash is computed from all the fields above
+     const size_t Hash;
+
+     TSelectKey(const TSelectKey& select) = default;
+
+     TSelectKey(const TString& cluster, const NConnector::NApi::TSelect& select);
+
+     // Member-wise comparison (the stored hash takes part in it as well)
+     bool operator==(const TSelectKey& other) const = default;
+
+     // With const data members a defaulted copy assignment is implicitly
+     // deleted anyway; spell it out instead of advertising an operator that
+     // can never be called
+     TSelectKey& operator=(const TSelectKey& other) = delete;
+
+     size_t CalculateHash() const;
+ };
+
+ ///
+ /// Hasher for TSelectKey: returns the hash value stored in the key's
+ /// Hash field instead of computing it again on every call
+ ///
+ struct TSelectKeyHash {
+ size_t operator()(const TSelectKey& key) const noexcept {
+ return key.Hash;
+ }
+ };
+
struct TGenericState: public TThrRefBase {
using TPtr = TIntrusivePtr<TGenericState>;
@@ -28,9 +59,12 @@ namespace NYql {
return ClusterName == other.ClusterName && TableName == other.TableName;
}
- explicit operator size_t() const {
- return CombineHashes(std::hash<TString>()(ClusterName), std::hash<TString>()(TableName));
- }
+ explicit operator size_t() const;
+
+ ///
+ /// Make a key for a select request on a cluster table
+ ///
+ TSelectKey MakeKeyFor(const NConnector::NApi::TSelect& select) const;
};
struct TTableMeta {
@@ -41,8 +75,18 @@ namespace NYql {
NYql::TGenericDataSourceInstance DataSourceInstance;
// External table schema
NYql::NConnector::NApi::TSchema Schema;
+ // Deprecated
// Contains some binary description of table splits (partitions) produced by Connector
std::vector<NYql::NConnector::NApi::TSplit> Splits;
+ // Contains splits for a particular select
+ std::unordered_map<TSelectKey, std::vector<NYql::NConnector::NApi::TSplit>, TSelectKeyHash> SelectSplits;
+
+ bool HasSplitsForSelect(const TSelectKey& key) const;
+
+ void AttachSplitsForSelect(const TSelectKey& key,
+ std::vector<NYql::NConnector::NApi::TSplit>& splits);
+
+ const std::vector<NYql::NConnector::NApi::TSplit>& GetSplitsForSelect(const TSelectKey& key) const;
};
using TGetTableResult = std::pair<const TTableMeta*, TIssues>;
@@ -66,7 +110,11 @@ namespace NYql {
Configuration->Init(gatewayConfig, databaseResolver, DatabaseAuth, types->Credentials);
}
+ bool HasTable(const TTableAddress& tableAddress);
void AddTable(const TTableAddress& tableAddress, TTableMeta&& tableMeta);
+ std::optional<TIssue> AttachSplitsToTable(const TTableAddress& tableAddress,
+ const TSelectKey& key,
+ std::vector<NYql::NConnector::NApi::TSplit>& splits);
TGetTableResult GetTable(const TTableAddress& tableAddress) const;
TTypeAnnotationContext* Types;
@@ -88,4 +136,5 @@ namespace NYql {
private:
THashMap<TTableAddress, TTableMeta> Tables_;
};
+
} // namespace NYql
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_utils.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_utils.cpp
index 9d88896b889..b14af1091c6 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_utils.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_utils.cpp
@@ -1,6 +1,8 @@
#include "yql_generic_utils.h"
#include <util/string/builder.h>
+#include <ydb/library/yql/providers/generic/connector/libcpp/utils.h>
+#include <ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.h>
namespace NYql {
TString DumpGenericClusterConfig(const TGenericClusterConfig& clusterConfig) {
@@ -19,4 +21,40 @@ namespace NYql {
return sb;
}
+
+ ///
+ /// Populates a connector select from the settings of a generic source:
+ /// the table name, the data source instance of the table, the requested
+ /// columns with their types looked up in the table schema and, when the
+ /// settings carry a non-empty filter predicate, the typed `where` filter.
+ /// Throws yexception if the filter predicate cannot be serialized.
+ ///
+ void FillSelectFromGenSourceSettings(NConnector::NApi::TSelect& select,
+                                      const NNodes::TGenSourceSettings& settings,
+                                      TExprContext& ctx,
+                                      const TGenericState::TTableMeta* tableMeta) {
+     // FROM section and the data source the table lives in
+     const TString table(settings.Table().StringValue());
+     select.mutable_from()->set_table(table);
+     *select.mutable_data_source_instance() = tableMeta->DataSourceInstance;
+
+     // WHAT section: one item per requested column
+     const auto& requested = settings.Columns();
+     auto* whatItems = select.mutable_what()->mutable_items();
+     const size_t columnCount = requested.Size();
+
+     for (size_t idx = 0; idx != columnCount; ++idx) {
+         auto* column = whatItems->Add()->mutable_column();
+         const auto columnName = requested.Item(idx).StringValue();
+
+         column->mutable_name()->assign(columnName);
+         // The column type is taken from the table schema by name
+         *column->mutable_type() = NConnector::GetColumnTypeByName(tableMeta->Schema, columnName);
+     }
+
+     // WHERE section: only filled when a predicate was pushed down
+     auto predicate = settings.FilterPredicate();
+
+     if (IsEmptyFilterPredicate(predicate)) {
+         return;
+     }
+
+     TStringBuilder err;
+     const bool serialized =
+         SerializeFilterPredicate(ctx, predicate, select.mutable_where()->mutable_filter_typed(), err);
+
+     if (!serialized) {
+         throw yexception() << "Failed to serialize filter predicate for source: " << err;
+     }
+ }
+
+
} // namespace NYql
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_utils.h b/ydb/library/yql/providers/generic/provider/yql_generic_utils.h
index 4370e4f5ece..dd271a245b0 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_utils.h
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_utils.h
@@ -1,8 +1,16 @@
#pragma once
-#include <yql/essentials/providers/common/proto/gateways_config.pb.h>
-#include <yql/essentials/providers/common/proto/gateways_config.pb.h>
+#include <ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.h>
+#include <ydb/library/yql/providers/generic/provider/yql_generic_state.h>
namespace NYql {
TString DumpGenericClusterConfig(const TGenericClusterConfig& clusterConfig);
+
+ ///
+ /// Fill a select from a TGenSourceSettings
+ ///
+ void FillSelectFromGenSourceSettings(NConnector::NApi::TSelect& select,
+ const NNodes::TGenSourceSettings& settings,
+ TExprContext& ctx,
+ const TGenericState::TTableMeta* tableMeta);
} // namespace NYql