diff options
author | galaxycrab <UgnineSirdis@ydb.tech> | 2023-10-10 11:47:29 +0300 |
---|---|---|
committer | galaxycrab <UgnineSirdis@ydb.tech> | 2023-10-10 12:10:18 +0300 |
commit | 79d1b401b8e5ad6d481f5e90da4d63a81abaaa45 (patch) | |
tree | 4abfe7b9180cdeb2f4b12502eda66f468408bf91 | |
parent | 656bb1fb2814e586db5de6166ebbb17363d5325b (diff) | |
download | ydb-79d1b401b8e5ad6d481f5e90da4d63a81abaaa45.tar.gz |
YQ Connector:Remove requirement for not empty rows set in connector
34 files changed, 1248 insertions, 305 deletions
diff --git a/.mapping.json b/.mapping.json index e6f2333474..5075538ec7 100644 --- a/.mapping.json +++ b/.mapping.json @@ -7632,6 +7632,11 @@ "ydb/library/yql/providers/generic/connector/libcpp/cli/CMakeLists.linux-x86_64.txt":"", "ydb/library/yql/providers/generic/connector/libcpp/cli/CMakeLists.txt":"", "ydb/library/yql/providers/generic/connector/libcpp/cli/CMakeLists.windows-x86_64.txt":"", + "ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.darwin-x86_64.txt":"", + "ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.linux-aarch64.txt":"", + "ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.linux-x86_64.txt":"", + "ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.txt":"", + "ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.windows-x86_64.txt":"", "ydb/library/yql/providers/generic/expr_nodes/CMakeLists.darwin-x86_64.txt":"", "ydb/library/yql/providers/generic/expr_nodes/CMakeLists.linux-aarch64.txt":"", "ydb/library/yql/providers/generic/expr_nodes/CMakeLists.linux-x86_64.txt":"", diff --git a/ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h b/ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h index 5a5c75c46d..9c9ab9b85a 100644 --- a/ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h +++ b/ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h @@ -171,7 +171,7 @@ class TDescribeSecretsActor: public NActors::TActorBootstrapped<TDescribeSecrets } public: - TDescribeSecretsActor(const TString& ownerUserId, const TVector<TString>& secretIds, NThreading::TPromise<TDescribeSecretsResponse> promise, TDuration maximalSecretsSnapshotWaitTime) + TDescribeSecretsActor(const TString& ownerUserId, const TVector<TString>& secretIds, NThreading::TPromise<TDescribeSecretsResponse> promise, TDuration maximalSecretsSnapshotWaitTime) : SecretIds(CreateSecretIds(ownerUserId, secretIds)) , Promise(promise) , LastResponse(Ydb::StatusIds::TIMEOUT, { NYql::TIssue("secrets snapshot fetching timeout") }) @@ -184,7 +184,7 @@ public: PassAway(); return; } - + this->Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()), new NMetadata::NProvider::TEvSubscribeExternal(GetSecretsSnapshotParser())); this->Schedule(MaximalSecretsSnapshotWaitTime, new NActors::TEvents::TEvWakeup()); Become(&TDescribeSecretsActor::StateFunc); diff --git a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp index b878612f62..64e9f21a58 100644 --- a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp +++ b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp @@ -415,7 +415,7 @@ void UpdateExternalDataSourceSecretsValue(TTableMetadataResult& externalDataSour SetError(externalDataSourceMetadata, "identity case is not specified in case of update external data source secrets"); return; } - } + } } } @@ -452,7 +452,7 @@ NThreading::TFuture<TDescribeSecretsResponse> LoadExternalDataSourceSecretValues const TString& awsAccessKeyKeySecretId = authDescription.GetAws().GetAwsSecretAccessKeySecretName(); auto promise = NewPromise<TDescribeSecretsResponse>(); actorSystem->Register(new TDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {awsAccessKeyIdSecretId, awsAccessKeyKeySecretId}, promise, maximalSecretsSnapshotWaitTime)); - return promise.GetFuture(); + return promise.GetFuture(); } case NKikimrSchemeOp::TAuth::IDENTITY_NOT_SET: @@ -791,7 +791,7 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta TActorSystem* actorSystem = ActorSystem; - return future.Apply([actorSystem,table](const TFuture<TTableMetadataResult>& f) { + return future.Apply([actorSystem,table](const TFuture<TTableMetadataResult>& f) { auto result = f.GetValue(); if (!result.Success()) { return MakeFuture(result); @@ -815,7 +815,7 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta auto statServiceId = NStat::MakeStatServiceID(actorSystem->NodeId); - + return SendActorRequest<NStat::TEvStatistics::TEvGetStatistics, NStat::TEvStatistics::TEvGetStatisticsResult, TResult>( actorSystem, statServiceId, diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index 6e4ef4f038..9f13a00abe 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -1538,11 +1538,6 @@ private: } void Init(EKikimrQueryType queryType) { - if ((queryType == EKikimrQueryType::Script || queryType == EKikimrQueryType::Query) && FederatedQuerySetup) { - InitS3Provider(); - InitGenericProvider(); - } - KqpRunner = CreateKqpRunner(Gateway, Cluster, TypesCtx, SessionCtx, *FuncRegistry, TAppData::TimeProvider, TAppData::RandomProvider); ExprCtx->NodesAllocationLimit = SessionCtx->Config()._KqpExprNodesAllocationLimit.Get().GetRef(); @@ -1568,6 +1563,12 @@ private: TypesCtx->AddDataSource(providerNames, kikimrDataSource); TypesCtx->AddDataSink(providerNames, kikimrDataSink); + + if ((queryType == EKikimrQueryType::Script || queryType == EKikimrQueryType::Query) && FederatedQuerySetup) { + InitS3Provider(); + InitGenericProvider(); + } + TypesCtx->UdfResolver = CreateSimpleUdfResolver(FuncRegistry); TypesCtx->TimeProvider = TAppData::TimeProvider; TypesCtx->RandomProvider = TAppData::RandomProvider; diff --git a/ydb/core/kqp/ut/federated_query/generic/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/ut/federated_query/generic/CMakeLists.darwin-x86_64.txt index 05878a9cdd..36b6c93dc7 100644 --- a/ydb/core/kqp/ut/federated_query/generic/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/ut/federated_query/generic/CMakeLists.darwin-x86_64.txt @@ -24,6 +24,7 @@ target_link_libraries(ydb-core-kqp-ut-federated_query-generic PUBLIC contrib-libs-fmt kqp-ut-common ut-federated_query-common + connector-libcpp-ut_helpers yql-sql-pg_dummy ) target_link_options(ydb-core-kqp-ut-federated_query-generic PRIVATE @@ -41,7 +42,7 @@ set_property( ydb-core-kqp-ut-federated_query-generic PROPERTY SPLIT_FACTOR - 1 + 10 ) add_yunittest( NAME diff --git a/ydb/core/kqp/ut/federated_query/generic/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/ut/federated_query/generic/CMakeLists.linux-aarch64.txt index eebaeaaa73..2d291e2452 100644 --- a/ydb/core/kqp/ut/federated_query/generic/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/ut/federated_query/generic/CMakeLists.linux-aarch64.txt @@ -24,6 +24,7 @@ target_link_libraries(ydb-core-kqp-ut-federated_query-generic PUBLIC contrib-libs-fmt kqp-ut-common ut-federated_query-common + connector-libcpp-ut_helpers yql-sql-pg_dummy ) target_link_options(ydb-core-kqp-ut-federated_query-generic PRIVATE @@ -44,7 +45,7 @@ set_property( ydb-core-kqp-ut-federated_query-generic PROPERTY SPLIT_FACTOR - 1 + 10 ) add_yunittest( NAME diff --git a/ydb/core/kqp/ut/federated_query/generic/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/ut/federated_query/generic/CMakeLists.linux-x86_64.txt index 08954421d3..4c269c3e99 100644 --- a/ydb/core/kqp/ut/federated_query/generic/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/ut/federated_query/generic/CMakeLists.linux-x86_64.txt @@ -25,6 +25,7 @@ target_link_libraries(ydb-core-kqp-ut-federated_query-generic PUBLIC contrib-libs-fmt kqp-ut-common ut-federated_query-common + connector-libcpp-ut_helpers yql-sql-pg_dummy ) target_link_options(ydb-core-kqp-ut-federated_query-generic PRIVATE @@ -45,7 +46,7 @@ set_property( ydb-core-kqp-ut-federated_query-generic PROPERTY SPLIT_FACTOR - 1 + 10 ) add_yunittest( NAME diff --git a/ydb/core/kqp/ut/federated_query/generic/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/ut/federated_query/generic/CMakeLists.windows-x86_64.txt index 2e8597955e..4bf5801d31 100644 --- a/ydb/core/kqp/ut/federated_query/generic/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/ut/federated_query/generic/CMakeLists.windows-x86_64.txt @@ -24,6 +24,7 @@ target_link_libraries(ydb-core-kqp-ut-federated_query-generic PUBLIC contrib-libs-fmt kqp-ut-common ut-federated_query-common + connector-libcpp-ut_helpers yql-sql-pg_dummy ) target_sources(ydb-core-kqp-ut-federated_query-generic PRIVATE @@ -34,7 +35,7 @@ set_property( ydb-core-kqp-ut-federated_query-generic PROPERTY SPLIT_FACTOR - 1 + 10 ) add_yunittest( NAME diff --git a/ydb/core/kqp/ut/federated_query/generic/kqp_generic_provider_ut.cpp b/ydb/core/kqp/ut/federated_query/generic/kqp_generic_provider_ut.cpp index 87a8f6ca4c..68959b4bfa 100644 --- a/ydb/core/kqp/ut/federated_query/generic/kqp_generic_provider_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/generic/kqp_generic_provider_ut.cpp @@ -1,178 +1,124 @@ -#include <arrow/api.h> -#include <google/protobuf/util/message_differencer.h> - -#include <fmt/format.h> -#include <library/cpp/testing/gmock_in_unittest/gmock.h> -#include <library/cpp/testing/unittest/registar.h> -#include <util/system/env.h> - #include <ydb/core/kqp/ut/common/kqp_ut_common.h> #include <ydb/core/kqp/ut/federated_query/common/common.h> #include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h> #include <ydb/library/yql/providers/generic/connector/libcpp/client.h> #include <ydb/library/yql/providers/generic/connector/libcpp/external_data_source.h> +#include <ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h> +#include <ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/database_resolver_mock.h> #include <ydb/public/sdk/cpp/client/ydb_operation/operation.h> #include <ydb/public/sdk/cpp/client/ydb_query/query.h> #include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h> +#include <library/cpp/testing/unittest/registar.h> + +#include <util/system/defaults.h> +#include <util/system/env.h> + +#include <arrow/api.h> + +#include <google/protobuf/util/message_differencer.h> + +#include <fmt/format.h> + namespace NKikimr::NKqp { using namespace NYdb; using namespace NYdb::NQuery; using namespace NYql::NConnector; + using namespace NYql::NConnector::NTest; using namespace NKikimr::NKqp::NFederatedQueryTest; using namespace testing; using namespace fmt::literals; - class TConnectorClientMock: public IClient { - public: - MOCK_METHOD(TDescribeTableResult::TPtr, DescribeTable, (const NApi::TDescribeTableRequest& request), (override)); - MOCK_METHOD(TListSplitsResult::TPtr, ListSplits, (const NApi::TListSplitsRequest& request), (override)); - MOCK_METHOD(TReadSplitsResult::TPtr, ReadSplits, (const NApi::TReadSplitsRequest& request), (override)); + enum class EProviderType { + PostgreSQL, + ClickHouse, }; - MATCHER_P(ProtobufRequestMatcher, expected, "request does not match") { - return google::protobuf::util::MessageDifferencer::Equals(arg, expected); - } - - class TDatabaseAsyncResolverMock: public NYql::IDatabaseAsyncResolver { - public: - MOCK_METHOD(NThreading::TFuture<NYql::TDatabaseResolverResponse>, ResolveIds, (const TDatabaseAuthMap& ids), (const override)); - }; - - MATCHER_P(DatabaseAuthMapMatcher, expected, "database auth map matcher") { - return arg == expected; - } - -#define PREPARE_RECORD_BATCH(COLUMN_NAME, INPUT, BUILDER_TYPE, ARROW_TYPE, OUTPUT) \ - { \ - arrow::BUILDER_TYPE builder; \ - UNIT_ASSERT_EQUAL(builder.AppendValues(INPUT), arrow::Status::OK()); \ - std::shared_ptr<arrow::Array> columnData; \ - UNIT_ASSERT_EQUAL(builder.Finish(&columnData), arrow::Status::OK()); \ - auto field = arrow::field(COLUMN_NAME, ARROW_TYPE()); \ - auto schema = arrow::schema({field}); \ - OUTPUT = arrow::RecordBatch::Make(schema, columnData->length(), {columnData}); \ + NApi::TDataSourceInstance MakeDataSourceInstance(EProviderType providerType) { + switch (providerType) { + case EProviderType::PostgreSQL: + return TConnectorClientMock::TPostgreSQLDataSourceInstanceBuilder<>().GetResult(); + case EProviderType::ClickHouse: + return TConnectorClientMock::TClickHouseDataSourceInstanceBuilder<>().GetResult(); + } } -#define MATCH_RESULT_WITH_INPUT(INPUT, RESULT_SET, GETTER) \ - { \ - for (const auto& val : INPUT) { \ - UNIT_ASSERT(RESULT_SET.TryNextRow()); \ - UNIT_ASSERT_VALUES_EQUAL(RESULT_SET.ColumnParser(0).GETTER(), val); \ - } \ + void CreateExternalDataSource(EProviderType providerType, const std::shared_ptr<NKikimr::NKqp::TKikimrRunner>& kikimr) { + switch (providerType) { + case EProviderType::PostgreSQL: + return CreatePostgreSQLExternalDataSource(kikimr); + case EProviderType::ClickHouse: + return CreateClickHouseExternalDataSource(kikimr); + } } Y_UNIT_TEST_SUITE(GenericFederatedQuery) { - Y_UNIT_TEST(PostgreSQLLocal) { + void TestSelectAllFields(EProviderType providerType) { // prepare mock auto clientMock = std::make_shared<TConnectorClientMock>(); - IClient::TPtr client = clientMock; - - // prepare common fields - const TString host = "localhost"; - const int port = 5432; - const TString databaseName = "dqrun"; - const TString login = "crab"; - const TString password = "qwerty12345"; - const TString tableName = "example_1"; - const bool useTls = true; - const TString protocol = "NATIVE"; - const TString sourceType = "PostgreSQL"; - - NApi::TDataSourceInstance dataSourceInstance; - dataSourceInstance.set_database(databaseName); - dataSourceInstance.mutable_credentials()->mutable_basic()->set_username(login); - dataSourceInstance.mutable_credentials()->mutable_basic()->set_password(password); - dataSourceInstance.mutable_endpoint()->set_host(host); - dataSourceInstance.mutable_endpoint()->set_port(port); - dataSourceInstance.set_use_tls(useTls); - dataSourceInstance.set_kind(NYql::NConnector::NApi::EDataSourceKind::POSTGRESQL); - dataSourceInstance.set_protocol(NYql::NConnector::NApi::EProtocol::NATIVE); - // step 1: DescribeTable - NApi::TDescribeTableRequest describeTableRequest; - describeTableRequest.set_table(tableName); - describeTableRequest.mutable_data_source_instance()->CopyFrom(dataSourceInstance); - - TDescribeTableResult::TPtr describeTableResult = std::make_shared<TDescribeTableResult>(); - describeTableResult->Error.set_status(::Ydb::StatusIds_StatusCode::StatusIds_StatusCode_SUCCESS); - auto col1 = describeTableResult->Schema.add_columns(); - col1->set_name("col1"); - col1->mutable_type()->set_type_id(Ydb::Type::UINT16); + const NApi::TDataSourceInstance dataSourceInstance = MakeDataSourceInstance(providerType); - EXPECT_CALL(*clientMock, DescribeTable(ProtobufRequestMatcher(describeTableRequest))).WillOnce(Return(describeTableResult)); + // step 1: DescribeTable + clientMock->ExpectDescribeTable() + .DataSourceInstance(dataSourceInstance) + .Response() + .Column("col1", Ydb::Type::UINT16); // step 2: ListSplits - NApi::TListSplitsRequest listSplitsRequest; - - auto select = listSplitsRequest.add_selects(); - select->mutable_from()->set_table(tableName); - select->mutable_data_source_instance()->CopyFrom(dataSourceInstance); - auto item = select->mutable_what() -> add_items(); - item->mutable_column()->CopyFrom(*col1); - - TListSplitsResult::TPtr listSplitsResult = std::make_shared<TListSplitsResult>(); - listSplitsResult->Error.set_status(::Ydb::StatusIds_StatusCode::StatusIds_StatusCode_SUCCESS); - NApi::TSplit split; - split.mutable_select()->CopyFrom(*select); - split.set_description("some binary description"); - listSplitsResult->Splits.emplace_back(std::move(split)); - - EXPECT_CALL(*clientMock, ListSplits(ProtobufRequestMatcher(listSplitsRequest))).WillOnce(Return(listSplitsResult)); + clientMock->ExpectListSplits() + .Select() + .DataSourceInstance(dataSourceInstance) + .What() + .Column("col1", Ydb::Type::UINT16) + .Done() + .Done() + .Response() + .Split() + .Description("some binary description") + .Select() + .DataSourceInstance(dataSourceInstance) + .What() + .Column("col1", Ydb::Type::UINT16); // step 3: ReadSplits - NApi::TReadSplitsRequest readSplitsRequest; - readSplitsRequest.mutable_data_source_instance()->CopyFrom(dataSourceInstance); - readSplitsRequest.add_splits()->CopyFrom(listSplitsResult->Splits[0]); - readSplitsRequest.set_format(NApi::TReadSplitsRequest_EFormat::TReadSplitsRequest_EFormat_ARROW_IPC_STREAMING); - - TReadSplitsResult::TPtr readSplitsResult = std::make_shared<TReadSplitsResult>(); - readSplitsResult->Error.set_status(::Ydb::StatusIds_StatusCode::StatusIds_StatusCode_SUCCESS); - readSplitsResult->RecordBatches.push_back({}); std::vector<ui16> colData = {10, 20, 30, 40, 50}; + clientMock->ExpectReadSplits() + .DataSourceInstance(dataSourceInstance) + .Split() + .Description("some binary description") + .Select() + .DataSourceInstance(dataSourceInstance) + .What() + .Column("col1", Ydb::Type::UINT16) + .Done() + .Done() + .Done() + .Response() + .RecordBatch(MakeRecordBatch<arrow::UInt16Builder>("col1", colData, arrow::uint16())); - PREPARE_RECORD_BATCH(col1->name(), colData, UInt16Builder, arrow::uint16, readSplitsResult->RecordBatches[0]); - - EXPECT_CALL(*clientMock, ReadSplits(ProtobufRequestMatcher(readSplitsRequest))).WillOnce(Return(readSplitsResult)); + // prepare database resolver mock + std::shared_ptr<TDatabaseAsyncResolverMock> databaseAsyncResolverMock; + if (providerType == EProviderType::ClickHouse) { + databaseAsyncResolverMock = std::make_shared<TDatabaseAsyncResolverMock>(); + databaseAsyncResolverMock->AddClickHouseCluster(); + } // run test - auto kikimr = MakeKikimrRunner(nullptr, client); + auto kikimr = MakeKikimrRunner(nullptr, clientMock, databaseAsyncResolverMock); - auto tc = kikimr->GetTableClient(); - auto session = tc.CreateSession().GetValueSync().GetSession(); - const TString query1 = fmt::format( - R"( - CREATE OBJECT pg_local_password (TYPE SECRET) WITH (value={password}); - - CREATE EXTERNAL DATA SOURCE pg_local WITH ( - SOURCE_TYPE="{source_type}", - LOCATION="{host}:{port}", - AUTH_METHOD="BASIC", - LOGIN="{login}", - PASSWORD_SECRET_NAME="pg_local_password", - USE_TLS="{use_tls}", - PROTOCOL="{protocol}" - ); - )", - "host"_a = host, - "port"_a = port, - "password"_a = password, - "login"_a = login, - "use_tls"_a = useTls ? "TRUE" : "FALSE", - "protocol"_a = protocol, - "source_type"_a = sourceType); - auto result = session.ExecuteSchemeQuery(query1).GetValueSync(); - UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - - const TString query2 = fmt::format( + CreateExternalDataSource(providerType, kikimr); + + const TString query = fmt::format( R"( - SELECT * FROM pg_local.`{database_name}.{table_name}`; + SELECT * FROM {data_source_name}.`{database_name}.{table_name}`; )", - "database_name"_a = databaseName, - "table_name"_a = tableName); + "data_source_name"_a = DEFAULT_DATA_SOURCE_NAME, + "database_name"_a = DEFAULT_DATABASE, + "table_name"_a = DEFAULT_TABLE); auto db = kikimr->GetQueryClient(); - auto scriptExecutionOperation = db.ExecuteScript(query2).ExtractValueSync(); + auto scriptExecutionOperation = db.ExecuteScript(query).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); @@ -189,159 +135,182 @@ namespace NKikimr::NKqp { MATCH_RESULT_WITH_INPUT(colData, resultSet, GetUint16); } + Y_UNIT_TEST(PostgreSQLLocal) { + TestSelectAllFields(EProviderType::PostgreSQL); + } + Y_UNIT_TEST(ClickHouseManaged) { - // prepare connector client mock - auto connectorClientMock = std::make_shared<TConnectorClientMock>(); - IClient::TPtr connectorClient = connectorClientMock; - - // prepare common fields - const TString clusterId = "ch-managed"; - const TString hostName = "rc1a-d6dv17lv47v5mcop"; - const TString hostFqdn = hostName + ".db.yandex.net"; - const int port = 8443; - const TString endpoint = TStringBuilder() << hostFqdn << ":" << ToString(port); - const TString databaseName = "dqrun"; - const TString login = "crab"; - const TString password = "qwerty12345"; - const TString tableName = "example_1"; - const bool useTls = true; - const auto protocol = NYql::NConnector::NApi::EProtocol::HTTP; - const auto sourceType = NYql::NConnector::EExternalDataSource::ClickHouse; - const auto dataSourceKind = NYql::NConnector::NApi::EDataSourceKind::CLICKHOUSE; - const TString serviceAccountId = "sa"; - const TString serviceAccountIdSignature = "sa_signature"; - - NApi::TDataSourceInstance dataSourceInstance; - dataSourceInstance.set_database(databaseName); - dataSourceInstance.mutable_credentials()->mutable_basic()->set_username(login); - dataSourceInstance.mutable_credentials()->mutable_basic()->set_password(password); - dataSourceInstance.mutable_endpoint()->set_host(hostFqdn); - dataSourceInstance.mutable_endpoint()->set_port(port); - dataSourceInstance.set_use_tls(useTls); - dataSourceInstance.set_kind(dataSourceKind); - dataSourceInstance.set_protocol(protocol); + TestSelectAllFields(EProviderType::ClickHouse); + } - // step 1: DescribeTable - NApi::TDescribeTableRequest describeTableRequest; - describeTableRequest.set_table(tableName); - describeTableRequest.mutable_data_source_instance()->CopyFrom(dataSourceInstance); + void TestSelectConstant(EProviderType providerType) { + // prepare mock + auto clientMock = std::make_shared<TConnectorClientMock>(); + + const NApi::TDataSourceInstance dataSourceInstance = MakeDataSourceInstance(providerType); - TDescribeTableResult::TPtr describeTableResult = std::make_shared<TDescribeTableResult>(); - describeTableResult->Error.set_status(::Ydb::StatusIds_StatusCode::StatusIds_StatusCode_SUCCESS); - auto col1 = describeTableResult->Schema.add_columns(); - col1->set_name("col1"); - col1->mutable_type()->set_type_id(Ydb::Type::UINT16); + constexpr size_t ROWS_COUNT = 5; - EXPECT_CALL(*connectorClientMock, DescribeTable(ProtobufRequestMatcher(describeTableRequest))).WillOnce(Return(describeTableResult)); + // step 1: DescribeTable + clientMock->ExpectDescribeTable() + .DataSourceInstance(dataSourceInstance) + .Response() + .Column("col1", Ydb::Type::UINT16) + .Column("col2", Ydb::Type::DOUBLE); // step 2: ListSplits - NApi::TListSplitsRequest listSplitsRequest; + clientMock->ExpectListSplits() + .Select() + .DataSourceInstance(dataSourceInstance) + .What() + // Empty + .Done() + .Done() + .Response() + .Split() + .Description("some binary description") + .Select() + .DataSourceInstance(dataSourceInstance) + .What(); - auto select = listSplitsRequest.add_selects(); - select->mutable_from()->set_table(tableName); - select->mutable_data_source_instance()->CopyFrom(dataSourceInstance); - auto item = select->mutable_what() -> add_items(); - item->mutable_column()->CopyFrom(*col1); + // step 3: ReadSplits + clientMock->ExpectReadSplits() + .DataSourceInstance(dataSourceInstance) + .Split() + .Description("some binary description") + .Select() + .DataSourceInstance(dataSourceInstance) + .What() + .Done() + .Done() + .Done() + .Response() + .RecordBatch(MakeEmptyRecordBatch(ROWS_COUNT)); - TListSplitsResult::TPtr listSplitsResult = std::make_shared<TListSplitsResult>(); - listSplitsResult->Error.set_status(::Ydb::StatusIds_StatusCode::StatusIds_StatusCode_SUCCESS); - NApi::TSplit split; - split.mutable_select()->CopyFrom(*select); - split.set_description("some binary description"); - listSplitsResult->Splits.emplace_back(std::move(split)); + // prepare database resolver mock + std::shared_ptr<TDatabaseAsyncResolverMock> databaseAsyncResolverMock; + if (providerType == EProviderType::ClickHouse) { + databaseAsyncResolverMock = std::make_shared<TDatabaseAsyncResolverMock>(); + databaseAsyncResolverMock->AddClickHouseCluster(); + } - EXPECT_CALL(*connectorClientMock, ListSplits(ProtobufRequestMatcher(listSplitsRequest))).WillOnce(Return(listSplitsResult)); + // run test + auto kikimr = MakeKikimrRunner(nullptr, clientMock, databaseAsyncResolverMock); - // step 3: ReadSplits - NApi::TReadSplitsRequest readSplitsRequest; - readSplitsRequest.mutable_data_source_instance()->CopyFrom(dataSourceInstance); - readSplitsRequest.add_splits()->CopyFrom(listSplitsResult->Splits[0]); - readSplitsRequest.set_format(NApi::TReadSplitsRequest_EFormat::TReadSplitsRequest_EFormat_ARROW_IPC_STREAMING); - - TReadSplitsResult::TPtr readSplitsResult = std::make_shared<TReadSplitsResult>(); - readSplitsResult->Error.set_status(::Ydb::StatusIds_StatusCode::StatusIds_StatusCode_SUCCESS); - readSplitsResult->RecordBatches.push_back({}); - std::vector<ui16> colData = {10, 20, 30, 40, 50}; + CreateExternalDataSource(providerType, kikimr); - PREPARE_RECORD_BATCH(col1->name(), colData, UInt16Builder, arrow::uint16, readSplitsResult->RecordBatches[0]); + const TString query = fmt::format( + R"( + SELECT 42 FROM {data_source_name}.`{database_name}.{table_name}`; + )", + "data_source_name"_a = DEFAULT_DATA_SOURCE_NAME, + "database_name"_a = DEFAULT_DATABASE, + "table_name"_a = DEFAULT_TABLE); - EXPECT_CALL(*connectorClientMock, ReadSplits(ProtobufRequestMatcher(readSplitsRequest))).WillOnce(Return(readSplitsResult)); + auto db = kikimr->GetQueryClient(); + auto queryResult = db.ExecuteQuery(query, TTxControl::BeginTx().CommitTx(), TExecuteQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(queryResult.GetStatus(), EStatus::SUCCESS, queryResult.GetIssues().ToString()); - // prepare database resolver mock - auto databaseAsyncResolverMock = std::make_shared<TDatabaseAsyncResolverMock>(); - NYql::IDatabaseAsyncResolver::TPtr databaseAsyncResolver = databaseAsyncResolverMock; + TResultSetParser resultSet(queryResult.GetResultSetParser(0)); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), ROWS_COUNT); - NYql::IDatabaseAsyncResolver::TDatabaseAuthMap dbResolverReq; - dbResolverReq[std::make_pair(clusterId, NYql::EDatabaseType::ClickHouse)] = - NYql::TDatabaseAuth{ - NYql::TStructuredTokenBuilder().SetServiceAccountIdAuth(serviceAccountId, serviceAccountIdSignature).ToJson(), - true, - true}; + // check every row + std::vector<i32> constants(ROWS_COUNT, 42); + MATCH_RESULT_WITH_INPUT(constants, resultSet, GetInt32); + } + + Y_UNIT_TEST(PostgreSQLSelectConstant) { + TestSelectConstant(EProviderType::PostgreSQL); + } - NYql::TDatabaseResolverResponse::TDatabaseEndpointsMap databaseEndpointsMap; - databaseEndpointsMap[std::make_pair(clusterId, NYql::EDatabaseType::ClickHouse)] = - NYql::TDatabaseResolverResponse::TEndpoint{endpoint, clusterId}; - auto dbResolverPromise = NThreading::NewPromise<NYql::TDatabaseResolverResponse>(); - dbResolverPromise.SetValue(NYql::TDatabaseResolverResponse(std::move(databaseEndpointsMap), true)); + Y_UNIT_TEST(ClickHouseManagedSelectConstant) { + TestSelectConstant(EProviderType::ClickHouse); + } - EXPECT_CALL(*databaseAsyncResolverMock, ResolveIds(DatabaseAuthMapMatcher(dbResolverReq))).WillOnce(Return(dbResolverPromise.GetFuture())); + void TestSelectCount(EProviderType providerType) { + // prepare mock + auto clientMock = std::make_shared<TConnectorClientMock>(); + + const NApi::TDataSourceInstance dataSourceInstance = MakeDataSourceInstance(providerType); + + constexpr size_t ROWS_COUNT = 5; + + // step 1: DescribeTable + clientMock->ExpectDescribeTable() + .DataSourceInstance(dataSourceInstance) + .Response() + .Column("col1", Ydb::Type::UINT16) + .Column("col2", Ydb::Type::DOUBLE); + + // step 2: ListSplits + clientMock->ExpectListSplits() + .Select() + .DataSourceInstance(dataSourceInstance) + .What() + // Empty + .Done() + .Done() + .Response() + .Split() + .Description("some binary description") + .Select() + .DataSourceInstance(dataSourceInstance) + .What(); + + // step 3: ReadSplits + clientMock->ExpectReadSplits() + .DataSourceInstance(dataSourceInstance) + .Split() + .Description("some binary description") + .Select() + .DataSourceInstance(dataSourceInstance) + .What() + .Done() + .Done() + .Done() + .Response() + .RecordBatch(MakeEmptyRecordBatch(ROWS_COUNT)); + + // prepare database resolver mock + std::shared_ptr<TDatabaseAsyncResolverMock> databaseAsyncResolverMock; + if (providerType == EProviderType::ClickHouse) { + databaseAsyncResolverMock = std::make_shared<TDatabaseAsyncResolverMock>(); + databaseAsyncResolverMock->AddClickHouseCluster(); + } // run test - auto kikimr = MakeKikimrRunner(nullptr, connectorClient, databaseAsyncResolver); + auto kikimr = MakeKikimrRunner(nullptr, clientMock, databaseAsyncResolverMock); - auto tc = kikimr->GetTableClient(); - auto session = tc.CreateSession().GetValueSync().GetSession(); - const TString query1 = fmt::format( - R"( - CREATE OBJECT sa_signature (TYPE SECRET) WITH (value=sa_signature); - CREATE OBJECT ch_managed_password (TYPE SECRET) WITH (value={password}); - - CREATE EXTERNAL DATA SOURCE ch_managed WITH ( - SOURCE_TYPE="{source_type}", - MDB_CLUSTER_ID="{cluster_id}", - AUTH_METHOD="MDB_BASIC", - SERVICE_ACCOUNT_ID="{service_account_id}", - SERVICE_ACCOUNT_SECRET_NAME="{service_account_id_signature}", - LOGIN="{login}", - PASSWORD_SECRET_NAME="ch_managed_password", - USE_TLS="{use_tls}", - PROTOCOL="{protocol}" - ); - )", - "cluster_id"_a = clusterId, - "password"_a = password, - "login"_a = login, - "use_tls"_a = useTls ? "TRUE" : "FALSE", - "protocol"_a = NYql::NConnector::NApi::EProtocol_Name(protocol), - "service_account_id"_a = serviceAccountId, - "service_account_id_signature"_a = serviceAccountIdSignature, - "source_type"_a = ToString(sourceType)); - auto result = session.ExecuteSchemeQuery(query1).GetValueSync(); - UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - - const TString query2 = fmt::format( + CreateExternalDataSource(providerType, kikimr); + + const TString query = fmt::format( R"( - SELECT * FROM ch_managed.`{database_name}.{table_name}`; + SELECT COUNT(*) FROM {data_source_name}.`{database_name}.{table_name}`; )", - "database_name"_a = databaseName, - "table_name"_a = tableName); + "data_source_name"_a = DEFAULT_DATA_SOURCE_NAME, + "database_name"_a = DEFAULT_DATABASE, + "table_name"_a = DEFAULT_TABLE); auto db = kikimr->GetQueryClient(); - auto scriptExecutionOperation = db.ExecuteScript(query2).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); - UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); - - NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); - UNIT_ASSERT_C(readyOp.Metadata().ExecStatus == EExecStatus::Completed, readyOp.Status().GetIssues().ToString()); - TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); - UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); + auto queryResult = db.ExecuteQuery(query, TTxControl::BeginTx().CommitTx(), TExecuteQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(queryResult.GetStatus(), EStatus::SUCCESS, queryResult.GetIssues().ToString()); - TResultSetParser resultSet(results.ExtractResultSet()); + TResultSetParser resultSet(queryResult.GetResultSetParser(0)); UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1); - UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), colData.size()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1); // check every row - MATCH_RESULT_WITH_INPUT(colData, resultSet, GetUint16); + std::vector<ui64> result = { ROWS_COUNT }; + MATCH_RESULT_WITH_INPUT(result, resultSet, GetUint64); + } + + Y_UNIT_TEST(PostgreSQLSelectCount) { + TestSelectCount(EProviderType::PostgreSQL); + } + + Y_UNIT_TEST(ClickHouseSelectCount) { + TestSelectCount(EProviderType::ClickHouse); } } } diff --git a/ydb/core/kqp/ut/federated_query/generic/ya.make b/ydb/core/kqp/ut/federated_query/generic/ya.make index 6b90a9d5dd..8ea901c175 100644 --- a/ydb/core/kqp/ut/federated_query/generic/ya.make +++ b/ydb/core/kqp/ut/federated_query/generic/ya.make @@ -1,5 +1,7 @@ UNITTEST_FOR(ydb/core/kqp) +FORK_SUBTESTS() + SRCS( kqp_generic_provider_ut.cpp ) @@ -9,6 +11,7 @@ PEERDIR( contrib/libs/fmt ydb/core/kqp/ut/common ydb/core/kqp/ut/federated_query/common + ydb/library/yql/providers/generic/connector/libcpp/ut_helpers ydb/library/yql/sql/pg_dummy ) diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 5bfbe4a47e..0e1bbe3a53 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -742,7 +742,14 @@ namespace Tests { Runtime->RegisterService(NConsole::MakeConfigsDispatcherID(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx); } if (Settings->IsEnableMetadataProvider()) { - auto* actor = NMetadata::NProvider::CreateService(NMetadata::NProvider::TConfig()); + NKikimrConfig::TMetadataProviderConfig cfgProto; + cfgProto.SetRefreshPeriodSeconds(1); + cfgProto.SetEnabled(true); + cfgProto.MutableRequestConfig()->SetRetryPeriodStartSeconds(1); + cfgProto.MutableRequestConfig()->SetRetryPeriodFinishSeconds(30); + NMetadata::NProvider::TConfig cfg; + cfg.DeserializeFromProto(cfgProto); + auto* actor = NMetadata::NProvider::CreateService(cfg); const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0); Runtime->RegisterService(NMetadata::NProvider::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx); } diff --git a/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go b/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go index 772ba72f63..cd4e747fcb 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go +++ b/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go @@ -5,6 +5,7 @@ import ( "fmt" "strings" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" "github.com/ydb-platform/ydb/library/go/core/log" api_common "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/api/common" "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/utils" @@ -113,21 +114,37 @@ func (h *handlerImpl[CONN]) ReadSplit( return fmt.Errorf("convert Select.What.Items to Ydb.Columns: %w", err) } - for i, column := range columns { - sb.WriteString(column.GetName()) - - if i != len(columns)-1 { - sb.WriteString(", ") - } + // for the case of empty column set select some constant for constructing a valid sql statement + if len(columns) == 0 { + sb.WriteString("0") var acceptor any - acceptor, err = h.typeMapper.YDBTypeToAcceptor(column.GetType()) + ydbType := Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_INT32}} + acceptor, err = h.typeMapper.YDBTypeToAcceptor(&ydbType) + if err != nil { return fmt.Errorf("map ydb column to acceptor: %w", err) } acceptors = append(acceptors, acceptor) + } else { + for i, column := range columns { + sb.WriteString(column.GetName()) + + if i != len(columns)-1 { + sb.WriteString(", ") + } + + var acceptor any + + acceptor, err = h.typeMapper.YDBTypeToAcceptor(column.GetType()) + if err != nil { + return fmt.Errorf("map ydb column to acceptor: %w", err) + } + + acceptors = append(acceptors, acceptor) + } } // SELECT $columns FROM $from diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/columnar_buffer_arrow_ipc_streaming.go b/ydb/library/yql/providers/generic/connector/app/server/utils/columnar_buffer_arrow_ipc_streaming.go index 0db0782a43..651d381383 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/utils/columnar_buffer_arrow_ipc_streaming.go +++ b/ydb/library/yql/providers/generic/connector/app/server/utils/columnar_buffer_arrow_ipc_streaming.go @@ -82,3 +82,59 @@ func (cb *columnarBufferArrowIPCStreaming) Release() { b.Release() } } + +// special implementation for buffer that writes schema with empty columns set +type columnarBufferArrowIPCStreamingEmptyColumns struct { + arrowAllocator memory.Allocator + readLimiter ReadLimiter + schema *arrow.Schema + typeMapper TypeMapper + rowsAdded int64 +} + +// AddRow saves a row obtained from the datasource into the buffer +func (cb *columnarBufferArrowIPCStreamingEmptyColumns) AddRow(acceptors []any) error { + if len(acceptors) != 1 { + return fmt.Errorf("expected 1 rows acceptor, got %v", len(acceptors)) + } + + if err := cb.readLimiter.AddRow(); err != nil { + return fmt.Errorf("check read limiter: %w", err) + } + + cb.rowsAdded++ + + return nil +} + +// ToResponse returns all the accumulated data and clears buffer +func (cb *columnarBufferArrowIPCStreamingEmptyColumns) ToResponse() (*api_service_protos.TReadSplitsResponse, error) { + columns := make([]arrow.Array, 0) + + record := array.NewRecord(cb.schema, columns, cb.rowsAdded) + + // prepare arrow writer + var buf bytes.Buffer + + writer := ipc.NewWriter(&buf, ipc.WithSchema(cb.schema), ipc.WithAllocator(cb.arrowAllocator)) + + if err := writer.Write(record); err != nil { + return nil, fmt.Errorf("write record: %w", err) + } + + if err := writer.Close(); err != nil { + return nil, fmt.Errorf("close arrow writer: %w", err) + } + + out := &api_service_protos.TReadSplitsResponse{ + Payload: &api_service_protos.TReadSplitsResponse_ArrowIpcStreaming{ + ArrowIpcStreaming: buf.Bytes(), + }, + } + + return out, nil +} + +// Frees resources if buffer is no longer used +func (cb *columnarBufferArrowIPCStreamingEmptyColumns) Release() { +} diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/columnar_buffer_factory.go b/ydb/library/yql/providers/generic/connector/app/server/utils/columnar_buffer_factory.go index 5d6b683940..460387701f 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/utils/columnar_buffer_factory.go +++ b/ydb/library/yql/providers/generic/connector/app/server/utils/columnar_buffer_factory.go @@ -40,6 +40,16 @@ func (cbf *ColumnarBufferFactory) MakeBuffer( return nil, fmt.Errorf("convert Select.What to Ydb.Types: %w", err) } + if len(ydbTypes) == 0 { + return &columnarBufferArrowIPCStreamingEmptyColumns{ + arrowAllocator: cbf.arrowAllocator, + schema: schema, + readLimiter: cbf.readLimiterFactory.MakeReadLimiter(logger), + typeMapper: typeMapper, + rowsAdded: 0, + }, nil + } + return &columnarBufferArrowIPCStreaming{ arrowAllocator: cbf.arrowAllocator, schema: schema, diff --git a/ydb/library/yql/providers/generic/connector/app/server/validate.go b/ydb/library/yql/providers/generic/connector/app/server/validate.go index 486d11ea0e..ac9b5df2df 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/validate.go +++ b/ydb/library/yql/providers/generic/connector/app/server/validate.go @@ -52,10 +52,6 @@ func validateSelect(logger log.Logger, slct *api_service_protos.TSelect) error { return fmt.Errorf("validate data source instance: %w", err) } - if len(slct.GetWhat().GetItems()) == 0 { - return fmt.Errorf("empty items: %w", utils.ErrInvalidRequest) - } - return nil } diff --git a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.darwin-x86_64.txt index 3e40d9e467..82847e3680 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.darwin-x86_64.txt @@ -7,6 +7,7 @@ add_subdirectory(cli) +add_subdirectory(ut_helpers) get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency diff --git a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-aarch64.txt index cc8e33fa78..822344f764 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-aarch64.txt @@ -7,6 +7,7 @@ add_subdirectory(cli) +add_subdirectory(ut_helpers) get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency diff --git a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-x86_64.txt index cc8e33fa78..822344f764 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-x86_64.txt @@ -7,6 +7,7 @@ add_subdirectory(cli) +add_subdirectory(ut_helpers) get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency diff --git a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.windows-x86_64.txt index 3e40d9e467..82847e3680 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.windows-x86_64.txt @@ -7,6 +7,7 @@ add_subdirectory(cli) +add_subdirectory(ut_helpers) get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..de9202ff92 --- /dev/null +++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,30 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(connector-libcpp-ut_helpers) +target_compile_options(connector-libcpp-ut_helpers PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(connector-libcpp-ut_helpers PUBLIC + contrib-libs-cxxsupp + yutil + contrib-libs-fmt + cpp-testing-gmock_in_unittest + cpp-testing-unittest + kqp-ut-common + providers-common-db_id_async_resolver + providers-common-structured_token + connector-api-common + generic-connector-libcpp +) +target_sources(connector-libcpp-ut_helpers PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/database_resolver_mock.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.cpp +) diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..7894a2a5b6 --- /dev/null +++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.linux-aarch64.txt @@ -0,0 +1,31 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(connector-libcpp-ut_helpers) +target_compile_options(connector-libcpp-ut_helpers PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(connector-libcpp-ut_helpers PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + contrib-libs-fmt + cpp-testing-gmock_in_unittest + cpp-testing-unittest + kqp-ut-common + providers-common-db_id_async_resolver + providers-common-structured_token + connector-api-common + generic-connector-libcpp +) +target_sources(connector-libcpp-ut_helpers PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/database_resolver_mock.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.cpp +) diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..7894a2a5b6 --- /dev/null +++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.linux-x86_64.txt @@ -0,0 +1,31 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(connector-libcpp-ut_helpers) +target_compile_options(connector-libcpp-ut_helpers PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(connector-libcpp-ut_helpers PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + contrib-libs-fmt + cpp-testing-gmock_in_unittest + cpp-testing-unittest + kqp-ut-common + providers-common-db_id_async_resolver + providers-common-structured_token + connector-api-common + generic-connector-libcpp +) +target_sources(connector-libcpp-ut_helpers PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/database_resolver_mock.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.cpp +) diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.txt b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..de9202ff92 --- /dev/null +++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.windows-x86_64.txt @@ -0,0 +1,30 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(connector-libcpp-ut_helpers) +target_compile_options(connector-libcpp-ut_helpers PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(connector-libcpp-ut_helpers PUBLIC + contrib-libs-cxxsupp + yutil + contrib-libs-fmt + cpp-testing-gmock_in_unittest + cpp-testing-unittest + kqp-ut-common + providers-common-db_id_async_resolver + providers-common-structured_token + connector-api-common + generic-connector-libcpp +) +target_sources(connector-libcpp-ut_helpers PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/database_resolver_mock.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.cpp +) diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp new file mode 100644 index 0000000000..2fa4b253a0 --- /dev/null +++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp @@ -0,0 +1,98 @@ +#include "connector_client_mock.h" + +#include <fmt/format.h> + +namespace NYql::NConnector::NTest { + + using namespace fmt::literals; + + void CreatePostgreSQLExternalDataSource( + const std::shared_ptr<NKikimr::NKqp::TKikimrRunner>& kikimr, + const TString& dataSourceName, + NApi::EProtocol protocol, + const TString& host, + int port, + const TString& login, + const TString& password, + bool useTls) + { + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format( + R"( + CREATE OBJECT {data_source_name}_password (TYPE SECRET) WITH (value={password}); + + CREATE EXTERNAL DATA SOURCE {data_source_name} WITH ( + SOURCE_TYPE="{source_type}", + LOCATION="{host}:{port}", + AUTH_METHOD="BASIC", + LOGIN="{login}", + PASSWORD_SECRET_NAME="{data_source_name}_password", + USE_TLS="{use_tls}", + PROTOCOL="{protocol}" + ); + )", + "data_source_name"_a = dataSourceName, + "host"_a = host, + "port"_a = port, + "login"_a = login, + "password"_a = password, + "use_tls"_a = useTls ? "TRUE" : "FALSE", + "protocol"_a = NApi::EProtocol_Name(protocol), + "source_type"_a = PG_SOURCE_TYPE); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + + void CreateClickHouseExternalDataSource( + const std::shared_ptr<NKikimr::NKqp::TKikimrRunner>& kikimr, + const TString& dataSourceName, + NApi::EProtocol protocol, + const TString& clickHouseClusterId, + const TString& login, + const TString& password, + bool useTls, + const TString& serviceAccountId, + const TString& serviceAccountIdSignature, + NYql::NConnector::EExternalDataSource sourceType) + { + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format( + R"( + CREATE OBJECT sa_signature (TYPE SECRET) WITH (value=sa_signature); + CREATE OBJECT {data_source_name}_password (TYPE SECRET) WITH (value={password}); + + CREATE EXTERNAL DATA SOURCE {data_source_name} WITH ( + SOURCE_TYPE="{source_type}", + MDB_CLUSTER_ID="{cluster_id}", + AUTH_METHOD="MDB_BASIC", + SERVICE_ACCOUNT_ID="{service_account_id}", + SERVICE_ACCOUNT_SECRET_NAME="{service_account_id_signature}", + LOGIN="{login}", + PASSWORD_SECRET_NAME="{data_source_name}_password", + USE_TLS="{use_tls}", + PROTOCOL="{protocol}" + ); + )", + "cluster_id"_a = clickHouseClusterId, + "data_source_name"_a = dataSourceName, + "login"_a = login, + "password"_a = password, + "use_tls"_a = useTls ? "TRUE" : "FALSE", + "protocol"_a = NYql::NConnector::NApi::EProtocol_Name(protocol), + "service_account_id"_a = serviceAccountId, + "service_account_id_signature"_a = serviceAccountIdSignature, + "source_type"_a = ToString(sourceType)); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + + std::shared_ptr<arrow::RecordBatch> MakeEmptyRecordBatch(size_t rowsCount) { + return arrow::RecordBatch::Make( + std::make_shared<arrow::Schema>(arrow::FieldVector()), + static_cast<i64>(rowsCount), + std::vector<std::shared_ptr<arrow::Array>>()); + } + +} // namespace NYql::NConnector::NTest diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h new file mode 100644 index 0000000000..58fb24caf4 --- /dev/null +++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h @@ -0,0 +1,502 @@ +#pragma once +#include <ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.h> + +#include <ydb/core/kqp/ut/common/kqp_ut_common.h> +#include <ydb/library/yql/providers/generic/connector/api/common/data_source.pb.h> +#include <ydb/library/yql/providers/generic/connector/libcpp/client.h> + +#include <library/cpp/testing/gmock_in_unittest/gmock.h> +#include <library/cpp/testing/unittest/registar.h> + +#include <util/generic/string.h> + +#include <arrow/api.h> + +#include <google/protobuf/util/message_differencer.h> + +#include <memory> + +namespace NYql::NConnector::NTest { + using namespace testing; + +#define EXPR_SETTER(name, set_expr) \ + TBuilder& name(const auto& value) { \ + this->Result_->set_expr(value); \ + return static_cast<TBuilder&>(*this); \ + } + +#define SETTER(name, protoName) EXPR_SETTER(name, Y_CAT(set_, protoName)) + +#define SUBPROTO_BUILDER(name, fieldExpr, protoType, builderType) \ + builderType name() { \ + return builderType(this->Result_->fieldExpr(), static_cast<TBuilder*>(this)); \ + } \ + TBuilder& name(const protoType& proto) { \ + this->Result_->fieldExpr()->CopyFrom(proto); \ + return static_cast<TBuilder&>(*this); \ + } + +#define DATA_SOURCE_INSTANCE_SUBBUILDER() \ + TBuilder& DataSourceInstance(const NApi::TDataSourceInstance& proto) { \ + this->Result_->mutable_data_source_instance()->CopyFrom(proto); \ + return static_cast<TBuilder&>(*this); \ + } \ + TPostgreSQLDataSourceInstanceBuilder<TBuilder> PostgreSQLDataSourceInstance() { \ + return TPostgreSQLDataSourceInstanceBuilder<TBuilder>( \ + this->Result_->mutable_data_source_instance(), \ + static_cast<TBuilder*>(this)); \ + } \ + TClickHouseDataSourceInstanceBuilder<TBuilder> ClickHouseDataSourceInstance() { \ + return TClickHouseDataSourceInstanceBuilder<TBuilder>( \ + this->Result_->mutable_data_source_instance(), \ + static_cast<TBuilder*>(this)); \ + } + + MATCHER_P(ProtobufRequestMatcher, expected, "request does not match") { + return google::protobuf::util::MessageDifferencer::Equals(arg, expected); + } + +#define MATCH_RESULT_WITH_INPUT(INPUT, RESULT_SET, GETTER) \ + { \ + for (const auto& val : INPUT) { \ + UNIT_ASSERT(RESULT_SET.TryNextRow()); \ + UNIT_ASSERT_VALUES_EQUAL(RESULT_SET.ColumnParser(0).GETTER(), val); \ + } \ + } + + template <class TArrowBuilderType, class TColDataType> + std::shared_ptr<arrow::RecordBatch> MakeRecordBatch( + const TString& columnName, + const std::vector<TColDataType>& input, + std::shared_ptr<arrow::DataType> dataType) { + TArrowBuilderType builder; + UNIT_ASSERT_EQUAL(builder.AppendValues(input), arrow::Status::OK()); + std::shared_ptr<arrow::Array> columnData; + UNIT_ASSERT_EQUAL(builder.Finish(&columnData), arrow::Status::OK()); + auto field = arrow::field(columnName, std::move(dataType)); + auto schema = arrow::schema({field}); + return arrow::RecordBatch::Make(schema, columnData->length(), {columnData}); + } + + // Make record batch with schema with no columns + std::shared_ptr<arrow::RecordBatch> MakeEmptyRecordBatch(size_t rowsCount); + + template <class TParent> + struct TWithParentBuilder { + explicit TWithParentBuilder(TParent* parent) + : Parent_(parent) + { + } + + TParent& Done() { + Y_ABORT_UNLESS(Parent_); // Use only with parent builder + return *Parent_; + } + + protected: + TParent* Parent_ = nullptr; + }; + + // No parent + template <> + struct TWithParentBuilder<void> { + explicit TWithParentBuilder(void*) + { + } + }; + + template <class TParent, class TResultPtrType> + struct TResponseBuilder: public TWithParentBuilder<TParent> { + explicit TResponseBuilder(TResultPtrType result = std::make_shared<typename TResultPtrType::element_type>(), TParent* parent = nullptr) + : TWithParentBuilder<TParent>(parent) + , Result_(std::move(result)) + { + } + + TResultPtrType GetResult() { + return Result_; + } + + protected: + TResultPtrType Result_; + }; + + template <class TParent, class TProto> + struct TProtoBuilder: public TWithParentBuilder<TParent> { + explicit TProtoBuilder(TProto* result = nullptr, TParent* parent = nullptr) + : TWithParentBuilder<TParent>(parent) + , Result_(result) + { + if (!Result_) { + Result_ = &MaybeResult_.ConstructInPlace(); + } + } + + TProto GetResult() { + return *Result_; + } + + protected: + TProto* Result_ = nullptr; + TMaybe<TProto> MaybeResult_; + }; + + void CreatePostgreSQLExternalDataSource( + const std::shared_ptr<NKikimr::NKqp::TKikimrRunner>& kikimr, + const TString& dataSourceName = DEFAULT_DATA_SOURCE_NAME, + NApi::EProtocol protocol = DEFAULT_PG_PROTOCOL, + const TString& host = DEFAULT_PG_HOST, + int port = DEFAULT_PG_PORT, + const TString& login = DEFAULT_LOGIN, + const TString& password = DEFAULT_PASSWORD, + bool useTls = DEFAULT_USE_TLS); + + void CreateClickHouseExternalDataSource( + const std::shared_ptr<NKikimr::NKqp::TKikimrRunner>& kikimr, + const TString& dataSourceName = DEFAULT_DATA_SOURCE_NAME, + NApi::EProtocol protocol = DEFAULT_CH_PROTOCOL, + const TString& clickHouseClusterId = DEFAULT_CH_CLUSTER_ID, + const TString& login = DEFAULT_LOGIN, + const TString& password = DEFAULT_PASSWORD, + bool useTls = DEFAULT_USE_TLS, + const TString& serviceAccountId = DEFAULT_CH_SERVICE_ACCOUNT_ID, + const TString& serviceAccountIdSignature = DEFAULT_CH_SERVICE_ACCOUNT_ID_SIGNATURE, + NYql::NConnector::EExternalDataSource sourceType = DEFAULT_CH_SOURCE_TYPE); + + class TConnectorClientMock: public NYql::NConnector::IClient { + public: + MOCK_METHOD(TDescribeTableResult::TPtr, DescribeTable, (const NApi::TDescribeTableRequest& request), (override)); + MOCK_METHOD(TListSplitsResult::TPtr, ListSplits, (const NApi::TListSplitsRequest& request), (override)); + MOCK_METHOD(TReadSplitsResult::TPtr, ReadSplits, (const NApi::TReadSplitsRequest& request), (override)); + + // + // Expectation helpers + // + + template <class TDerived, class TParent = void /* no parent by default */> + struct TBaseDataSourceInstanceBuilder: public TProtoBuilder<TParent, NApi::TDataSourceInstance> { + using TBuilder = TDerived; + + explicit TBaseDataSourceInstanceBuilder(NApi::TDataSourceInstance* result = nullptr, TParent* parent = nullptr) + : TProtoBuilder<TParent, NApi::TDataSourceInstance>(result, parent) + { + } + + SETTER(Database, database); + EXPR_SETTER(Login, mutable_credentials()->mutable_basic()->set_username); + EXPR_SETTER(Password, mutable_credentials()->mutable_basic()->set_password); + EXPR_SETTER(Host, mutable_endpoint()->set_host); + EXPR_SETTER(Port, mutable_endpoint()->set_port); + SETTER(UseTls, use_tls); + SETTER(Kind, kind); + SETTER(Protocol, protocol); + + protected: + void FillWithDefaults() { + Database(DEFAULT_DATABASE); + Login(DEFAULT_LOGIN); + Password(DEFAULT_PASSWORD); + UseTls(DEFAULT_USE_TLS); + } + }; + + template <class TParent = void /* no parent by default */> + struct TPostgreSQLDataSourceInstanceBuilder: public TBaseDataSourceInstanceBuilder<TPostgreSQLDataSourceInstanceBuilder<TParent>, TParent> { + using TBase = TBaseDataSourceInstanceBuilder<TPostgreSQLDataSourceInstanceBuilder<TParent>, TParent>; + + explicit TPostgreSQLDataSourceInstanceBuilder(NApi::TDataSourceInstance* result = nullptr, TParent* parent = nullptr) + : TBase(result, parent) + { + FillWithDefaults(); + } + + void FillWithDefaults() { + TBase::FillWithDefaults(); + this->Host(DEFAULT_PG_HOST); + this->Port(DEFAULT_PG_PORT); + this->Kind(NApi::EDataSourceKind::POSTGRESQL); + this->Protocol(DEFAULT_PG_PROTOCOL); + } + }; + + template <class TParent = void /* no parent by default */> + struct TClickHouseDataSourceInstanceBuilder: public TBaseDataSourceInstanceBuilder<TClickHouseDataSourceInstanceBuilder<TParent>, TParent> { + using TBase = TBaseDataSourceInstanceBuilder<TClickHouseDataSourceInstanceBuilder<TParent>, TParent>; + + explicit TClickHouseDataSourceInstanceBuilder(NApi::TDataSourceInstance* result = nullptr, TParent* parent = nullptr) + : TBase(result, parent) + { + FillWithDefaults(); + } + + void FillWithDefaults() { + TBase::FillWithDefaults(); + this->Host(DEFAULT_CH_HOST); + this->Port(DEFAULT_CH_PORT); + this->Kind(NApi::EDataSourceKind::CLICKHOUSE); + this->Protocol(DEFAULT_CH_PROTOCOL); + } + }; + + template <class TParent = void /* no parent by default */> + struct TDescribeTableResultBuilder: public TResponseBuilder<TParent, TDescribeTableResult::TPtr> { + using TBuilder = TDescribeTableResultBuilder<TParent>; + + explicit TDescribeTableResultBuilder(TDescribeTableResult::TPtr result = std::make_shared<TDescribeTableResult>(), TParent* parent = nullptr) + : TResponseBuilder<TParent, TDescribeTableResult::TPtr>(std::move(result), parent) + { + FillWithDefaults(); + } + + EXPR_SETTER(Status, Error.set_status); + + // TODO: add nonprimitive types + TBuilder& Column(const TString& name, Ydb::Type::PrimitiveTypeId typeId) { + auto* col = this->Result_->Schema.add_columns(); + col->set_name(name); + col->mutable_type()->set_type_id(typeId); + return *this; + } + + void FillWithDefaults() { + Status(Ydb::StatusIds::SUCCESS); + } + }; + + struct TDescribeTableExpectationBuilder: public TProtoBuilder<int, NApi::TDescribeTableRequest> { + using TBuilder = TDescribeTableExpectationBuilder; + + explicit TDescribeTableExpectationBuilder(NApi::TDescribeTableRequest* result = nullptr, TConnectorClientMock* mock = nullptr) + : TProtoBuilder<int, NApi::TDescribeTableRequest>(result) + , Mock_(mock) + { + FillWithDefaults(); + } + + explicit TDescribeTableExpectationBuilder(TConnectorClientMock* mock) + : TDescribeTableExpectationBuilder(nullptr, mock) + { + } + + ~TDescribeTableExpectationBuilder() { + SetExpectation(); + } + + SETTER(Table, table); + DATA_SOURCE_INSTANCE_SUBBUILDER(); + + TDescribeTableResultBuilder<TBuilder> Response() { + return TDescribeTableResultBuilder<TBuilder>(ResponseResult_, this); + } + + void FillWithDefaults() { + Table(DEFAULT_TABLE); + Response(); + } + + private: + void SetExpectation() { + EXPECT_CALL(*Mock_, DescribeTable(ProtobufRequestMatcher(*Result_))) + .WillOnce(Return(ResponseResult_)); + } + + private: + TConnectorClientMock* Mock_ = nullptr; + TDescribeTableResult::TPtr ResponseResult_ = std::make_shared<TDescribeTableResult>(); + }; + + template <class TParent = void /* no parent by default */> + struct TWhatBuilder: public TProtoBuilder<TParent, NApi::TSelect::TWhat> { + using TBuilder = TWhatBuilder<TParent>; + + explicit TWhatBuilder(NApi::TSelect::TWhat* result = nullptr, TParent* parent = nullptr) + : TProtoBuilder<TParent, NApi::TSelect::TWhat>(result, parent) + { + FillWithDefaults(); + } + + // TODO: add nonprimitive types + TBuilder& Column(const TString& name, Ydb::Type::PrimitiveTypeId typeId) { + auto* col = this->Result_->add_items()->mutable_column(); + col->set_name(name); + col->mutable_type()->set_type_id(typeId); + return *this; + } + + void FillWithDefaults() { + } + }; + + template <class TParent = void /* no parent by default */> + struct TSelectBuilder: public TProtoBuilder<TParent, NApi::TSelect> { + using TBuilder = TSelectBuilder<TParent>; + + explicit TSelectBuilder(NApi::TSelect* result = nullptr, TParent* parent = nullptr) + : TProtoBuilder<TParent, NApi::TSelect>(result, parent) + { + FillWithDefaults(); + } + + EXPR_SETTER(Table, mutable_from()->set_table); + DATA_SOURCE_INSTANCE_SUBBUILDER(); + SUBPROTO_BUILDER(What, mutable_what, NApi::TSelect::TWhat, TWhatBuilder<TBuilder>); + + void FillWithDefaults() { + Table(DEFAULT_TABLE); + } + }; + + template <class TParent = void /* no parent by default */> + struct TSplitBuilder: public TProtoBuilder<TParent, NApi::TSplit> { + using TBuilder = TSplitBuilder<TParent>; + + explicit TSplitBuilder(NApi::TSplit* result = nullptr, TParent* parent = nullptr) + : TProtoBuilder<TParent, NApi::TSplit>(result, parent) + { + FillWithDefaults(); + } + + SETTER(Description, description); + SUBPROTO_BUILDER(Select, mutable_select, NApi::TSelect, TSelectBuilder<TBuilder>); + + void FillWithDefaults() { + Select(); + } + }; + + template <class TParent = void /* no parent by default */> + struct TListSplitsResultBuilder: public TResponseBuilder<TParent, TListSplitsResult::TPtr> { + using TBuilder = TListSplitsResultBuilder<TParent>; + + explicit TListSplitsResultBuilder(TListSplitsResult::TPtr result = std::make_shared<TListSplitsResult>(), TParent* parent = nullptr) + : TResponseBuilder<TParent, TListSplitsResult::TPtr>(std::move(result), parent) + { + FillWithDefaults(); + } + + EXPR_SETTER(Status, Error.set_status); + + TSplitBuilder<TBuilder> Split() { + return TSplitBuilder<TBuilder>(&this->Result_->Splits.emplace_back(), this); + } + + void FillWithDefaults() { + Status(Ydb::StatusIds::SUCCESS); + } + }; + + struct TListSplitsExpectationBuilder: public TProtoBuilder<int, NApi::TListSplitsRequest> { + using TBuilder = TListSplitsExpectationBuilder; + + explicit TListSplitsExpectationBuilder(NApi::TListSplitsRequest* result = nullptr, TConnectorClientMock* mock = nullptr) + : TProtoBuilder<int, NApi::TListSplitsRequest>(result) + , Mock_(mock) + + { + FillWithDefaults(); + } + + explicit TListSplitsExpectationBuilder(TConnectorClientMock* mock) + : TListSplitsExpectationBuilder(nullptr, mock) + { + } + + ~TListSplitsExpectationBuilder() { + SetExpectation(); + } + + SUBPROTO_BUILDER(Select, add_selects, NApi::TSelect, TSelectBuilder<TBuilder>); + + TListSplitsResultBuilder<TBuilder> Response() { + return TListSplitsResultBuilder<TBuilder>(ResponseResult_, this); + } + + void FillWithDefaults() { + Response(); + } + + private: + void SetExpectation() { + EXPECT_CALL(*Mock_, ListSplits(ProtobufRequestMatcher(*Result_))) + .WillOnce(Return(ResponseResult_)); + } + + private: + TConnectorClientMock* Mock_ = nullptr; + TListSplitsResult::TPtr ResponseResult_ = std::make_shared<TListSplitsResult>(); + }; + + template <class TParent = void /* no parent by default */> + struct TReadSplitsResultBuilder: public TResponseBuilder<TParent, TReadSplitsResult::TPtr> { + using TBuilder = TReadSplitsResultBuilder<TParent>; + + explicit TReadSplitsResultBuilder(TReadSplitsResult::TPtr result = std::make_shared<TReadSplitsResult>(), TParent* parent = nullptr) + : TResponseBuilder<TParent, TReadSplitsResult::TPtr>(std::move(result), parent) + { + FillWithDefaults(); + } + + EXPR_SETTER(Status, Error.set_status); + EXPR_SETTER(RecordBatch, RecordBatches.push_back); + + void FillWithDefaults() { + Status(Ydb::StatusIds::SUCCESS); + } + }; + + struct TReadSplitsExpectationBuilder: public TProtoBuilder<int, NApi::TReadSplitsRequest> { + using TBuilder = TReadSplitsExpectationBuilder; + + explicit TReadSplitsExpectationBuilder(NApi::TReadSplitsRequest* result = nullptr, TConnectorClientMock* mock = nullptr) + : TProtoBuilder<int, NApi::TReadSplitsRequest>(result) + , Mock_(mock) + + { + FillWithDefaults(); + } + + explicit TReadSplitsExpectationBuilder(TConnectorClientMock* mock) + : TReadSplitsExpectationBuilder(nullptr, mock) + { + } + + ~TReadSplitsExpectationBuilder() { + SetExpectation(); + } + + DATA_SOURCE_INSTANCE_SUBBUILDER(); + SUBPROTO_BUILDER(Split, add_splits, NApi::TSplit, TSplitBuilder<TBuilder>); + SETTER(Format, format); + + TReadSplitsResultBuilder<TBuilder> Response() { + return TReadSplitsResultBuilder<TBuilder>(ResponseResult_, this); + } + + void FillWithDefaults() { + Format(NApi::TReadSplitsRequest::ARROW_IPC_STREAMING); + } + + private: + void SetExpectation() { + EXPECT_CALL(*Mock_, ReadSplits(ProtobufRequestMatcher(*Result_))) + .WillOnce(Return(ResponseResult_)); + } + + private: + TConnectorClientMock* Mock_ = nullptr; + TReadSplitsResult::TPtr ResponseResult_ = std::make_shared<TReadSplitsResult>(); + }; + + TDescribeTableExpectationBuilder ExpectDescribeTable() { + return TDescribeTableExpectationBuilder(this); + } + + TListSplitsExpectationBuilder ExpectListSplits() { + return TListSplitsExpectationBuilder(this); + } + + TReadSplitsExpectationBuilder ExpectReadSplits() { + return TReadSplitsExpectationBuilder(this); + } + }; +} // namespace NYql::NConnector::NTest diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/database_resolver_mock.cpp b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/database_resolver_mock.cpp new file mode 100644 index 0000000000..fa7ffafd20 --- /dev/null +++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/database_resolver_mock.cpp @@ -0,0 +1 @@ +#include "database_resolver_mock.h" diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/database_resolver_mock.h b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/database_resolver_mock.h new file mode 100644 index 0000000000..b21259cdcd --- /dev/null +++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/database_resolver_mock.h @@ -0,0 +1,44 @@ +#pragma once +#include <ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.h> + +#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h> +#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h> + +#include <library/cpp/testing/gmock_in_unittest/gmock.h> +#include <library/cpp/testing/unittest/registar.h> + +namespace NYql::NConnector::NTest { + using namespace testing; + + MATCHER_P(DatabaseAuthMapMatcher, expected, "database auth map matcher") { + return arg == expected; + } + + class TDatabaseAsyncResolverMock: public NYql::IDatabaseAsyncResolver { + public: + MOCK_METHOD(NThreading::TFuture<NYql::TDatabaseResolverResponse>, ResolveIds, (const TDatabaseAuthMap& ids), (const override)); + + void AddClickHouseCluster( + const TString& clusterId = DEFAULT_CH_CLUSTER_ID, + const TString& endpoint = DEFAULT_CH_ENDPOINT, + const TString& serviceAccountId = DEFAULT_CH_SERVICE_ACCOUNT_ID, + const TString& serviceAccountIdSignature = DEFAULT_CH_SERVICE_ACCOUNT_ID_SIGNATURE) { + NYql::IDatabaseAsyncResolver::TDatabaseAuthMap dbResolverReq; + dbResolverReq[std::make_pair(clusterId, NYql::EDatabaseType::ClickHouse)] = + NYql::TDatabaseAuth{ + NYql::TStructuredTokenBuilder().SetServiceAccountIdAuth(serviceAccountId, serviceAccountIdSignature).ToJson(), + true, + true}; + + NYql::TDatabaseResolverResponse::TDatabaseEndpointsMap databaseEndpointsMap; + databaseEndpointsMap[std::make_pair(clusterId, NYql::EDatabaseType::ClickHouse)] = + NYql::TDatabaseResolverResponse::TEndpoint{endpoint, clusterId}; + auto dbResolverPromise = NThreading::NewPromise<NYql::TDatabaseResolverResponse>(); + dbResolverPromise.SetValue(NYql::TDatabaseResolverResponse(std::move(databaseEndpointsMap), true)); + + auto result = dbResolverPromise.GetFuture(); + EXPECT_CALL(*this, ResolveIds(DatabaseAuthMapMatcher(dbResolverReq))) + .WillOnce(Return(result)); + } + }; +} // namespace NYql::NConnector::NTest diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.cpp b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.cpp new file mode 100644 index 0000000000..71d90e1099 --- /dev/null +++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.cpp @@ -0,0 +1,18 @@ +#include "defaults.h" + +namespace NYql::NConnector::NTest { + extern const TString DEFAULT_DATABASE = "pgdb"; + extern const TString DEFAULT_LOGIN = "crab"; + extern const TString DEFAULT_PASSWORD = "qwerty12345"; + extern const TString DEFAULT_TABLE = "example_1"; + extern const TString DEFAULT_DATA_SOURCE_NAME = "external_data_source"; + + extern const TString DEFAULT_PG_HOST = "localhost"; + extern const TString PG_SOURCE_TYPE = "PostgreSQL"; + + extern const TString DEFAULT_CH_HOST = "rc1a-d6dv17lv47v5mcop.db.yandex.net"; + extern const TString DEFAULT_CH_ENDPOINT = TStringBuilder() << DEFAULT_CH_HOST << ':' << DEFAULT_CH_PORT; + extern const TString DEFAULT_CH_CLUSTER_ID = "ch-managed"; + extern const TString DEFAULT_CH_SERVICE_ACCOUNT_ID = "sa"; + extern const TString DEFAULT_CH_SERVICE_ACCOUNT_ID_SIGNATURE = "sa_signature"; +} // namespace NYql::NConnector::NTest diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.h b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.h new file mode 100644 index 0000000000..e2b82727ed --- /dev/null +++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.h @@ -0,0 +1,29 @@ +#pragma once +#include <ydb/library/yql/providers/generic/connector/api/common/data_source.pb.h> +#include <ydb/library/yql/providers/generic/connector/libcpp/external_data_source.h> + +#include <util/generic/string.h> +#include <util/string/builder.h> + +namespace NYql::NConnector::NTest { + extern const TString DEFAULT_DATABASE; + extern const TString DEFAULT_LOGIN; + extern const TString DEFAULT_PASSWORD; + extern const TString DEFAULT_TABLE; + extern const TString DEFAULT_DATA_SOURCE_NAME; + + extern const TString DEFAULT_PG_HOST; + constexpr int DEFAULT_PG_PORT = 5432; + constexpr bool DEFAULT_USE_TLS = true; + extern const TString PG_SOURCE_TYPE; + constexpr NApi::EProtocol DEFAULT_PG_PROTOCOL = NApi::EProtocol::NATIVE; + + extern const TString DEFAULT_CH_HOST; + constexpr int DEFAULT_CH_PORT = 8443; + extern const TString DEFAULT_CH_ENDPOINT; + extern const TString DEFAULT_CH_CLUSTER_ID; + constexpr NApi::EProtocol DEFAULT_CH_PROTOCOL = NApi::EProtocol::HTTP; + constexpr NYql::NConnector::EExternalDataSource DEFAULT_CH_SOURCE_TYPE = NYql::NConnector::EExternalDataSource::ClickHouse; + extern const TString DEFAULT_CH_SERVICE_ACCOUNT_ID; + extern const TString DEFAULT_CH_SERVICE_ACCOUNT_ID_SIGNATURE; +} // namespace NYql::NConnector::NTest diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/ya.make b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/ya.make new file mode 100644 index 0000000000..660ed340f2 --- /dev/null +++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/ya.make @@ -0,0 +1,22 @@ +LIBRARY() + +SRCS( + connector_client_mock.cpp + database_resolver_mock.cpp + defaults.cpp +) + +PEERDIR( + contrib/libs/fmt + library/cpp/testing/gmock_in_unittest + library/cpp/testing/unittest + ydb/core/kqp/ut/common + ydb/library/yql/providers/common/db_id_async_resolver + ydb/library/yql/providers/common/structured_token + ydb/library/yql/providers/generic/connector/api/common + ydb/library/yql/providers/generic/connector/libcpp +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ya.make b/ydb/library/yql/providers/generic/connector/libcpp/ya.make index 9c9cf6e52c..f389d52c34 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/ya.make +++ b/ydb/library/yql/providers/generic/connector/libcpp/ya.make @@ -26,4 +26,5 @@ END() RECURSE( cli + ut_helpers ) diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp index 719316c467..be30204547 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp @@ -29,9 +29,14 @@ namespace NYql { } TMaybeNode<TExprBase> TrimReadWorld(TExprBase node, TExprContext& ctx) const { - if (const auto maybeRead = node.Cast<TCoLeft>().Input().Maybe<TGenReadTable>()) - return TExprBase(ctx.NewWorld(node.Pos())); - return node; + Y_UNUSED(ctx); + + const auto& maybeRead = node.Cast<TCoLeft>().Input().Maybe<TGenReadTable>(); + if (!maybeRead) { + return node; + } + + return TExprBase(maybeRead.Cast().World().Ptr()); } TMaybeNode<TExprBase> ReadZeroColumns(TExprBase node, TExprContext& ctx) const { diff --git a/ydb/services/metadata/secret/ut/ut_secret.cpp b/ydb/services/metadata/secret/ut/ut_secret.cpp index 3c4a439059..bd70ddf3cc 100644 --- a/ydb/services/metadata/secret/ut/ut_secret.cpp +++ b/ydb/services/metadata/secret/ut/ut_secret.cpp @@ -66,8 +66,13 @@ Y_UNIT_TEST_SUITE(Secret) { YDB_ACCESSOR(ui32, ExpectedAccessCount, 1); using TKeyCheckers = TMap<NMetadata::NSecret::TSecretId, TJsonChecker>; YDB_ACCESSOR_DEF(TKeyCheckers, Checkers); - public: + private: + ui64 SecretsCountInLastSnapshot = 0; + ui64 AccessCountInLastSnapshot = 0; + TString LastSnapshotDebugString; + + public: void ResetConditions() { FoundFlag = false; Checkers.clear(); @@ -106,22 +111,29 @@ Y_UNIT_TEST_SUITE(Secret) { void CheckFound(NMetadata::NProvider::TEvRefreshSubscriberData* event) { auto snapshot = event->GetSnapshotAs<NMetadata::NSecret::TSnapshot>(); Y_ABORT_UNLESS(!!snapshot); + SecretsCountInLastSnapshot = snapshot->GetSecrets().size(); + AccessCountInLastSnapshot = snapshot->GetAccess().size(); + LastSnapshotDebugString = snapshot->SerializeToString(); + CheckFound(); + } + + void CheckFound() { if (ExpectedSecretsCount) { - if (snapshot->GetSecrets().size() != ExpectedSecretsCount) { - Cerr << "snapshot->GetSecrets().size() incorrect: " << snapshot->SerializeToString() << Endl; + if (SecretsCountInLastSnapshot != ExpectedSecretsCount) { + Cerr << "snapshot->GetSecrets().size() incorrect: " << LastSnapshotDebugString << Endl; return; } - } else if (snapshot->GetSecrets().size()) { - Cerr << "snapshot->GetSecrets().size() incorrect (zero expects): " << snapshot->SerializeToString() << Endl; + } else if (SecretsCountInLastSnapshot) { + Cerr << "snapshot->GetSecrets().size() incorrect (zero expects): " << LastSnapshotDebugString << Endl; return; } if (ExpectedAccessCount) { - if (snapshot->GetAccess().size() != ExpectedAccessCount) { - Cerr << "snapshot->GetAccess().size() incorrect: " << snapshot->SerializeToString() << Endl; + if (AccessCountInLastSnapshot != ExpectedAccessCount) { + Cerr << "snapshot->GetAccess().size() incorrect: " << LastSnapshotDebugString << Endl; return; } - } else if (snapshot->GetAccess().size()) { - Cerr << "snapshot->GetAccess().size() incorrect (zero expects): " << snapshot->SerializeToString() << Endl; + } else if (AccessCountInLastSnapshot) { + Cerr << "snapshot->GetAccess().size() incorrect (zero expects): " << LastSnapshotDebugString << Endl; return; } FoundFlag = true; @@ -182,13 +194,13 @@ Y_UNIT_TEST_SUITE(Secret) { UNIT_ASSERT_EQUAL_C(resultData, "[6u]", resultData); } - emulator->SetExpectedSecretsCount(2).SetExpectedAccessCount(0); + emulator->SetExpectedSecretsCount(2).SetExpectedAccessCount(0).CheckFound(); { const TInstant start = Now(); while (!emulator->IsFound() && Now() - start < TDuration::Seconds(20)) { runtime.SimulateSleep(TDuration::Seconds(1)); } - Y_ABORT_UNLESS(emulator->IsFound()); + UNIT_ASSERT(emulator->IsFound()); } lHelper.StartSchemaRequest("ALTER OBJECT secret1 (TYPE SECRET) SET value = `abcde`"); @@ -199,26 +211,26 @@ Y_UNIT_TEST_SUITE(Secret) { UNIT_ASSERT_EQUAL_C(resultData, "[10u]", resultData); } - emulator->SetExpectedSecretsCount(2).SetExpectedAccessCount(1); + emulator->SetExpectedSecretsCount(2).SetExpectedAccessCount(1).CheckFound(); { const TInstant start = Now(); while (!emulator->IsFound() && Now() - start < TDuration::Seconds(20)) { runtime.SimulateSleep(TDuration::Seconds(1)); } - Y_ABORT_UNLESS(emulator->IsFound()); + UNIT_ASSERT(emulator->IsFound()); } lHelper.StartSchemaRequest("DROP OBJECT `secret1:test@test1` (TYPE SECRET_ACCESS)"); lHelper.StartSchemaRequest("DROP OBJECT `secret1` (TYPE SECRET)"); lHelper.StartDataRequest("SELECT * FROM `/Root/.metadata/initialization/migrations`"); - emulator->SetExpectedSecretsCount(1).SetExpectedAccessCount(0); + emulator->SetExpectedSecretsCount(1).SetExpectedAccessCount(0).CheckFound(); { const TInstant start = Now(); while (!emulator->IsFound() && Now() - start < TDuration::Seconds(20)) { runtime.SimulateSleep(TDuration::Seconds(1)); } - Y_ABORT_UNLESS(emulator->IsFound()); + UNIT_ASSERT(emulator->IsFound()); } } } |