aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgalaxycrab <UgnineSirdis@ydb.tech>2023-10-10 11:47:29 +0300
committergalaxycrab <UgnineSirdis@ydb.tech>2023-10-10 12:10:18 +0300
commit79d1b401b8e5ad6d481f5e90da4d63a81abaaa45 (patch)
tree4abfe7b9180cdeb2f4b12502eda66f468408bf91
parent656bb1fb2814e586db5de6166ebbb17363d5325b (diff)
downloadydb-79d1b401b8e5ad6d481f5e90da4d63a81abaaa45.tar.gz
YQ Connector:Remove requirement for not empty rows set in connector
-rw-r--r--.mapping.json5
-rw-r--r--ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h4
-rw-r--r--ydb/core/kqp/gateway/kqp_metadata_loader.cpp8
-rw-r--r--ydb/core/kqp/host/kqp_host.cpp11
-rw-r--r--ydb/core/kqp/ut/federated_query/generic/CMakeLists.darwin-x86_64.txt3
-rw-r--r--ydb/core/kqp/ut/federated_query/generic/CMakeLists.linux-aarch64.txt3
-rw-r--r--ydb/core/kqp/ut/federated_query/generic/CMakeLists.linux-x86_64.txt3
-rw-r--r--ydb/core/kqp/ut/federated_query/generic/CMakeLists.windows-x86_64.txt3
-rw-r--r--ydb/core/kqp/ut/federated_query/generic/kqp_generic_provider_ut.cpp489
-rw-r--r--ydb/core/kqp/ut/federated_query/generic/ya.make3
-rw-r--r--ydb/core/testlib/test_client.cpp9
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go31
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/utils/columnar_buffer_arrow_ipc_streaming.go56
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/utils/columnar_buffer_factory.go10
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/validate.go4
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.darwin-x86_64.txt30
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.linux-aarch64.txt31
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.linux-x86_64.txt31
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.txt17
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.windows-x86_64.txt30
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp98
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h502
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/database_resolver_mock.cpp1
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/database_resolver_mock.h44
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.cpp18
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.h29
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/ya.make22
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/ya.make1
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp11
-rw-r--r--ydb/services/metadata/secret/ut/ut_secret.cpp42
34 files changed, 1248 insertions, 305 deletions
diff --git a/.mapping.json b/.mapping.json
index e6f2333474..5075538ec7 100644
--- a/.mapping.json
+++ b/.mapping.json
@@ -7632,6 +7632,11 @@
"ydb/library/yql/providers/generic/connector/libcpp/cli/CMakeLists.linux-x86_64.txt":"",
"ydb/library/yql/providers/generic/connector/libcpp/cli/CMakeLists.txt":"",
"ydb/library/yql/providers/generic/connector/libcpp/cli/CMakeLists.windows-x86_64.txt":"",
+ "ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.darwin-x86_64.txt":"",
+ "ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.linux-aarch64.txt":"",
+ "ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.linux-x86_64.txt":"",
+ "ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.txt":"",
+ "ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.windows-x86_64.txt":"",
"ydb/library/yql/providers/generic/expr_nodes/CMakeLists.darwin-x86_64.txt":"",
"ydb/library/yql/providers/generic/expr_nodes/CMakeLists.linux-aarch64.txt":"",
"ydb/library/yql/providers/generic/expr_nodes/CMakeLists.linux-x86_64.txt":"",
diff --git a/ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h b/ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h
index 5a5c75c46d..9c9ab9b85a 100644
--- a/ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h
+++ b/ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h
@@ -171,7 +171,7 @@ class TDescribeSecretsActor: public NActors::TActorBootstrapped<TDescribeSecrets
}
public:
- TDescribeSecretsActor(const TString& ownerUserId, const TVector<TString>& secretIds, NThreading::TPromise<TDescribeSecretsResponse> promise, TDuration maximalSecretsSnapshotWaitTime)
+ TDescribeSecretsActor(const TString& ownerUserId, const TVector<TString>& secretIds, NThreading::TPromise<TDescribeSecretsResponse> promise, TDuration maximalSecretsSnapshotWaitTime)
: SecretIds(CreateSecretIds(ownerUserId, secretIds))
, Promise(promise)
, LastResponse(Ydb::StatusIds::TIMEOUT, { NYql::TIssue("secrets snapshot fetching timeout") })
@@ -184,7 +184,7 @@ public:
PassAway();
return;
}
-
+
this->Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()), new NMetadata::NProvider::TEvSubscribeExternal(GetSecretsSnapshotParser()));
this->Schedule(MaximalSecretsSnapshotWaitTime, new NActors::TEvents::TEvWakeup());
Become(&TDescribeSecretsActor::StateFunc);
diff --git a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp
index b878612f62..64e9f21a58 100644
--- a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp
+++ b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp
@@ -415,7 +415,7 @@ void UpdateExternalDataSourceSecretsValue(TTableMetadataResult& externalDataSour
SetError(externalDataSourceMetadata, "identity case is not specified in case of update external data source secrets");
return;
}
- }
+ }
}
}
@@ -452,7 +452,7 @@ NThreading::TFuture<TDescribeSecretsResponse> LoadExternalDataSourceSecretValues
const TString& awsAccessKeyKeySecretId = authDescription.GetAws().GetAwsSecretAccessKeySecretName();
auto promise = NewPromise<TDescribeSecretsResponse>();
actorSystem->Register(new TDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {awsAccessKeyIdSecretId, awsAccessKeyKeySecretId}, promise, maximalSecretsSnapshotWaitTime));
- return promise.GetFuture();
+ return promise.GetFuture();
}
case NKikimrSchemeOp::TAuth::IDENTITY_NOT_SET:
@@ -791,7 +791,7 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
TActorSystem* actorSystem = ActorSystem;
- return future.Apply([actorSystem,table](const TFuture<TTableMetadataResult>& f) {
+ return future.Apply([actorSystem,table](const TFuture<TTableMetadataResult>& f) {
auto result = f.GetValue();
if (!result.Success()) {
return MakeFuture(result);
@@ -815,7 +815,7 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
auto statServiceId = NStat::MakeStatServiceID(actorSystem->NodeId);
-
+
return SendActorRequest<NStat::TEvStatistics::TEvGetStatistics, NStat::TEvStatistics::TEvGetStatisticsResult, TResult>(
actorSystem,
statServiceId,
diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp
index 6e4ef4f038..9f13a00abe 100644
--- a/ydb/core/kqp/host/kqp_host.cpp
+++ b/ydb/core/kqp/host/kqp_host.cpp
@@ -1538,11 +1538,6 @@ private:
}
void Init(EKikimrQueryType queryType) {
- if ((queryType == EKikimrQueryType::Script || queryType == EKikimrQueryType::Query) && FederatedQuerySetup) {
- InitS3Provider();
- InitGenericProvider();
- }
-
KqpRunner = CreateKqpRunner(Gateway, Cluster, TypesCtx, SessionCtx, *FuncRegistry, TAppData::TimeProvider, TAppData::RandomProvider);
ExprCtx->NodesAllocationLimit = SessionCtx->Config()._KqpExprNodesAllocationLimit.Get().GetRef();
@@ -1568,6 +1563,12 @@ private:
TypesCtx->AddDataSource(providerNames, kikimrDataSource);
TypesCtx->AddDataSink(providerNames, kikimrDataSink);
+
+ if ((queryType == EKikimrQueryType::Script || queryType == EKikimrQueryType::Query) && FederatedQuerySetup) {
+ InitS3Provider();
+ InitGenericProvider();
+ }
+
TypesCtx->UdfResolver = CreateSimpleUdfResolver(FuncRegistry);
TypesCtx->TimeProvider = TAppData::TimeProvider;
TypesCtx->RandomProvider = TAppData::RandomProvider;
diff --git a/ydb/core/kqp/ut/federated_query/generic/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/ut/federated_query/generic/CMakeLists.darwin-x86_64.txt
index 05878a9cdd..36b6c93dc7 100644
--- a/ydb/core/kqp/ut/federated_query/generic/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/ut/federated_query/generic/CMakeLists.darwin-x86_64.txt
@@ -24,6 +24,7 @@ target_link_libraries(ydb-core-kqp-ut-federated_query-generic PUBLIC
contrib-libs-fmt
kqp-ut-common
ut-federated_query-common
+ connector-libcpp-ut_helpers
yql-sql-pg_dummy
)
target_link_options(ydb-core-kqp-ut-federated_query-generic PRIVATE
@@ -41,7 +42,7 @@ set_property(
ydb-core-kqp-ut-federated_query-generic
PROPERTY
SPLIT_FACTOR
- 1
+ 10
)
add_yunittest(
NAME
diff --git a/ydb/core/kqp/ut/federated_query/generic/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/ut/federated_query/generic/CMakeLists.linux-aarch64.txt
index eebaeaaa73..2d291e2452 100644
--- a/ydb/core/kqp/ut/federated_query/generic/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/ut/federated_query/generic/CMakeLists.linux-aarch64.txt
@@ -24,6 +24,7 @@ target_link_libraries(ydb-core-kqp-ut-federated_query-generic PUBLIC
contrib-libs-fmt
kqp-ut-common
ut-federated_query-common
+ connector-libcpp-ut_helpers
yql-sql-pg_dummy
)
target_link_options(ydb-core-kqp-ut-federated_query-generic PRIVATE
@@ -44,7 +45,7 @@ set_property(
ydb-core-kqp-ut-federated_query-generic
PROPERTY
SPLIT_FACTOR
- 1
+ 10
)
add_yunittest(
NAME
diff --git a/ydb/core/kqp/ut/federated_query/generic/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/ut/federated_query/generic/CMakeLists.linux-x86_64.txt
index 08954421d3..4c269c3e99 100644
--- a/ydb/core/kqp/ut/federated_query/generic/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/ut/federated_query/generic/CMakeLists.linux-x86_64.txt
@@ -25,6 +25,7 @@ target_link_libraries(ydb-core-kqp-ut-federated_query-generic PUBLIC
contrib-libs-fmt
kqp-ut-common
ut-federated_query-common
+ connector-libcpp-ut_helpers
yql-sql-pg_dummy
)
target_link_options(ydb-core-kqp-ut-federated_query-generic PRIVATE
@@ -45,7 +46,7 @@ set_property(
ydb-core-kqp-ut-federated_query-generic
PROPERTY
SPLIT_FACTOR
- 1
+ 10
)
add_yunittest(
NAME
diff --git a/ydb/core/kqp/ut/federated_query/generic/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/ut/federated_query/generic/CMakeLists.windows-x86_64.txt
index 2e8597955e..4bf5801d31 100644
--- a/ydb/core/kqp/ut/federated_query/generic/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/ut/federated_query/generic/CMakeLists.windows-x86_64.txt
@@ -24,6 +24,7 @@ target_link_libraries(ydb-core-kqp-ut-federated_query-generic PUBLIC
contrib-libs-fmt
kqp-ut-common
ut-federated_query-common
+ connector-libcpp-ut_helpers
yql-sql-pg_dummy
)
target_sources(ydb-core-kqp-ut-federated_query-generic PRIVATE
@@ -34,7 +35,7 @@ set_property(
ydb-core-kqp-ut-federated_query-generic
PROPERTY
SPLIT_FACTOR
- 1
+ 10
)
add_yunittest(
NAME
diff --git a/ydb/core/kqp/ut/federated_query/generic/kqp_generic_provider_ut.cpp b/ydb/core/kqp/ut/federated_query/generic/kqp_generic_provider_ut.cpp
index 87a8f6ca4c..68959b4bfa 100644
--- a/ydb/core/kqp/ut/federated_query/generic/kqp_generic_provider_ut.cpp
+++ b/ydb/core/kqp/ut/federated_query/generic/kqp_generic_provider_ut.cpp
@@ -1,178 +1,124 @@
-#include <arrow/api.h>
-#include <google/protobuf/util/message_differencer.h>
-
-#include <fmt/format.h>
-#include <library/cpp/testing/gmock_in_unittest/gmock.h>
-#include <library/cpp/testing/unittest/registar.h>
-#include <util/system/env.h>
-
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/core/kqp/ut/federated_query/common/common.h>
#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
#include <ydb/library/yql/providers/generic/connector/libcpp/client.h>
#include <ydb/library/yql/providers/generic/connector/libcpp/external_data_source.h>
+#include <ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h>
+#include <ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/database_resolver_mock.h>
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
#include <ydb/public/sdk/cpp/client/ydb_query/query.h>
#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/system/defaults.h>
+#include <util/system/env.h>
+
+#include <arrow/api.h>
+
+#include <google/protobuf/util/message_differencer.h>
+
+#include <fmt/format.h>
+
namespace NKikimr::NKqp {
using namespace NYdb;
using namespace NYdb::NQuery;
using namespace NYql::NConnector;
+ using namespace NYql::NConnector::NTest;
using namespace NKikimr::NKqp::NFederatedQueryTest;
using namespace testing;
using namespace fmt::literals;
- class TConnectorClientMock: public IClient {
- public:
- MOCK_METHOD(TDescribeTableResult::TPtr, DescribeTable, (const NApi::TDescribeTableRequest& request), (override));
- MOCK_METHOD(TListSplitsResult::TPtr, ListSplits, (const NApi::TListSplitsRequest& request), (override));
- MOCK_METHOD(TReadSplitsResult::TPtr, ReadSplits, (const NApi::TReadSplitsRequest& request), (override));
+ enum class EProviderType {
+ PostgreSQL,
+ ClickHouse,
};
- MATCHER_P(ProtobufRequestMatcher, expected, "request does not match") {
- return google::protobuf::util::MessageDifferencer::Equals(arg, expected);
- }
-
- class TDatabaseAsyncResolverMock: public NYql::IDatabaseAsyncResolver {
- public:
- MOCK_METHOD(NThreading::TFuture<NYql::TDatabaseResolverResponse>, ResolveIds, (const TDatabaseAuthMap& ids), (const override));
- };
-
- MATCHER_P(DatabaseAuthMapMatcher, expected, "database auth map matcher") {
- return arg == expected;
- }
-
-#define PREPARE_RECORD_BATCH(COLUMN_NAME, INPUT, BUILDER_TYPE, ARROW_TYPE, OUTPUT) \
- { \
- arrow::BUILDER_TYPE builder; \
- UNIT_ASSERT_EQUAL(builder.AppendValues(INPUT), arrow::Status::OK()); \
- std::shared_ptr<arrow::Array> columnData; \
- UNIT_ASSERT_EQUAL(builder.Finish(&columnData), arrow::Status::OK()); \
- auto field = arrow::field(COLUMN_NAME, ARROW_TYPE()); \
- auto schema = arrow::schema({field}); \
- OUTPUT = arrow::RecordBatch::Make(schema, columnData->length(), {columnData}); \
+ NApi::TDataSourceInstance MakeDataSourceInstance(EProviderType providerType) {
+ switch (providerType) {
+ case EProviderType::PostgreSQL:
+ return TConnectorClientMock::TPostgreSQLDataSourceInstanceBuilder<>().GetResult();
+ case EProviderType::ClickHouse:
+ return TConnectorClientMock::TClickHouseDataSourceInstanceBuilder<>().GetResult();
+ }
}
-#define MATCH_RESULT_WITH_INPUT(INPUT, RESULT_SET, GETTER) \
- { \
- for (const auto& val : INPUT) { \
- UNIT_ASSERT(RESULT_SET.TryNextRow()); \
- UNIT_ASSERT_VALUES_EQUAL(RESULT_SET.ColumnParser(0).GETTER(), val); \
- } \
+ void CreateExternalDataSource(EProviderType providerType, const std::shared_ptr<NKikimr::NKqp::TKikimrRunner>& kikimr) {
+ switch (providerType) {
+ case EProviderType::PostgreSQL:
+ return CreatePostgreSQLExternalDataSource(kikimr);
+ case EProviderType::ClickHouse:
+ return CreateClickHouseExternalDataSource(kikimr);
+ }
}
Y_UNIT_TEST_SUITE(GenericFederatedQuery) {
- Y_UNIT_TEST(PostgreSQLLocal) {
+ void TestSelectAllFields(EProviderType providerType) {
// prepare mock
auto clientMock = std::make_shared<TConnectorClientMock>();
- IClient::TPtr client = clientMock;
-
- // prepare common fields
- const TString host = "localhost";
- const int port = 5432;
- const TString databaseName = "dqrun";
- const TString login = "crab";
- const TString password = "qwerty12345";
- const TString tableName = "example_1";
- const bool useTls = true;
- const TString protocol = "NATIVE";
- const TString sourceType = "PostgreSQL";
-
- NApi::TDataSourceInstance dataSourceInstance;
- dataSourceInstance.set_database(databaseName);
- dataSourceInstance.mutable_credentials()->mutable_basic()->set_username(login);
- dataSourceInstance.mutable_credentials()->mutable_basic()->set_password(password);
- dataSourceInstance.mutable_endpoint()->set_host(host);
- dataSourceInstance.mutable_endpoint()->set_port(port);
- dataSourceInstance.set_use_tls(useTls);
- dataSourceInstance.set_kind(NYql::NConnector::NApi::EDataSourceKind::POSTGRESQL);
- dataSourceInstance.set_protocol(NYql::NConnector::NApi::EProtocol::NATIVE);
- // step 1: DescribeTable
- NApi::TDescribeTableRequest describeTableRequest;
- describeTableRequest.set_table(tableName);
- describeTableRequest.mutable_data_source_instance()->CopyFrom(dataSourceInstance);
-
- TDescribeTableResult::TPtr describeTableResult = std::make_shared<TDescribeTableResult>();
- describeTableResult->Error.set_status(::Ydb::StatusIds_StatusCode::StatusIds_StatusCode_SUCCESS);
- auto col1 = describeTableResult->Schema.add_columns();
- col1->set_name("col1");
- col1->mutable_type()->set_type_id(Ydb::Type::UINT16);
+ const NApi::TDataSourceInstance dataSourceInstance = MakeDataSourceInstance(providerType);
- EXPECT_CALL(*clientMock, DescribeTable(ProtobufRequestMatcher(describeTableRequest))).WillOnce(Return(describeTableResult));
+ // step 1: DescribeTable
+ clientMock->ExpectDescribeTable()
+ .DataSourceInstance(dataSourceInstance)
+ .Response()
+ .Column("col1", Ydb::Type::UINT16);
// step 2: ListSplits
- NApi::TListSplitsRequest listSplitsRequest;
-
- auto select = listSplitsRequest.add_selects();
- select->mutable_from()->set_table(tableName);
- select->mutable_data_source_instance()->CopyFrom(dataSourceInstance);
- auto item = select->mutable_what() -> add_items();
- item->mutable_column()->CopyFrom(*col1);
-
- TListSplitsResult::TPtr listSplitsResult = std::make_shared<TListSplitsResult>();
- listSplitsResult->Error.set_status(::Ydb::StatusIds_StatusCode::StatusIds_StatusCode_SUCCESS);
- NApi::TSplit split;
- split.mutable_select()->CopyFrom(*select);
- split.set_description("some binary description");
- listSplitsResult->Splits.emplace_back(std::move(split));
-
- EXPECT_CALL(*clientMock, ListSplits(ProtobufRequestMatcher(listSplitsRequest))).WillOnce(Return(listSplitsResult));
+ clientMock->ExpectListSplits()
+ .Select()
+ .DataSourceInstance(dataSourceInstance)
+ .What()
+ .Column("col1", Ydb::Type::UINT16)
+ .Done()
+ .Done()
+ .Response()
+ .Split()
+ .Description("some binary description")
+ .Select()
+ .DataSourceInstance(dataSourceInstance)
+ .What()
+ .Column("col1", Ydb::Type::UINT16);
// step 3: ReadSplits
- NApi::TReadSplitsRequest readSplitsRequest;
- readSplitsRequest.mutable_data_source_instance()->CopyFrom(dataSourceInstance);
- readSplitsRequest.add_splits()->CopyFrom(listSplitsResult->Splits[0]);
- readSplitsRequest.set_format(NApi::TReadSplitsRequest_EFormat::TReadSplitsRequest_EFormat_ARROW_IPC_STREAMING);
-
- TReadSplitsResult::TPtr readSplitsResult = std::make_shared<TReadSplitsResult>();
- readSplitsResult->Error.set_status(::Ydb::StatusIds_StatusCode::StatusIds_StatusCode_SUCCESS);
- readSplitsResult->RecordBatches.push_back({});
std::vector<ui16> colData = {10, 20, 30, 40, 50};
+ clientMock->ExpectReadSplits()
+ .DataSourceInstance(dataSourceInstance)
+ .Split()
+ .Description("some binary description")
+ .Select()
+ .DataSourceInstance(dataSourceInstance)
+ .What()
+ .Column("col1", Ydb::Type::UINT16)
+ .Done()
+ .Done()
+ .Done()
+ .Response()
+ .RecordBatch(MakeRecordBatch<arrow::UInt16Builder>("col1", colData, arrow::uint16()));
- PREPARE_RECORD_BATCH(col1->name(), colData, UInt16Builder, arrow::uint16, readSplitsResult->RecordBatches[0]);
-
- EXPECT_CALL(*clientMock, ReadSplits(ProtobufRequestMatcher(readSplitsRequest))).WillOnce(Return(readSplitsResult));
+ // prepare database resolver mock
+ std::shared_ptr<TDatabaseAsyncResolverMock> databaseAsyncResolverMock;
+ if (providerType == EProviderType::ClickHouse) {
+ databaseAsyncResolverMock = std::make_shared<TDatabaseAsyncResolverMock>();
+ databaseAsyncResolverMock->AddClickHouseCluster();
+ }
// run test
- auto kikimr = MakeKikimrRunner(nullptr, client);
+ auto kikimr = MakeKikimrRunner(nullptr, clientMock, databaseAsyncResolverMock);
- auto tc = kikimr->GetTableClient();
- auto session = tc.CreateSession().GetValueSync().GetSession();
- const TString query1 = fmt::format(
- R"(
- CREATE OBJECT pg_local_password (TYPE SECRET) WITH (value={password});
-
- CREATE EXTERNAL DATA SOURCE pg_local WITH (
- SOURCE_TYPE="{source_type}",
- LOCATION="{host}:{port}",
- AUTH_METHOD="BASIC",
- LOGIN="{login}",
- PASSWORD_SECRET_NAME="pg_local_password",
- USE_TLS="{use_tls}",
- PROTOCOL="{protocol}"
- );
- )",
- "host"_a = host,
- "port"_a = port,
- "password"_a = password,
- "login"_a = login,
- "use_tls"_a = useTls ? "TRUE" : "FALSE",
- "protocol"_a = protocol,
- "source_type"_a = sourceType);
- auto result = session.ExecuteSchemeQuery(query1).GetValueSync();
- UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
-
- const TString query2 = fmt::format(
+ CreateExternalDataSource(providerType, kikimr);
+
+ const TString query = fmt::format(
R"(
- SELECT * FROM pg_local.`{database_name}.{table_name}`;
+ SELECT * FROM {data_source_name}.`{database_name}.{table_name}`;
)",
- "database_name"_a = databaseName,
- "table_name"_a = tableName);
+ "data_source_name"_a = DEFAULT_DATA_SOURCE_NAME,
+ "database_name"_a = DEFAULT_DATABASE,
+ "table_name"_a = DEFAULT_TABLE);
auto db = kikimr->GetQueryClient();
- auto scriptExecutionOperation = db.ExecuteScript(query2).ExtractValueSync();
+ auto scriptExecutionOperation = db.ExecuteScript(query).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
@@ -189,159 +135,182 @@ namespace NKikimr::NKqp {
MATCH_RESULT_WITH_INPUT(colData, resultSet, GetUint16);
}
+ Y_UNIT_TEST(PostgreSQLLocal) {
+ TestSelectAllFields(EProviderType::PostgreSQL);
+ }
+
Y_UNIT_TEST(ClickHouseManaged) {
- // prepare connector client mock
- auto connectorClientMock = std::make_shared<TConnectorClientMock>();
- IClient::TPtr connectorClient = connectorClientMock;
-
- // prepare common fields
- const TString clusterId = "ch-managed";
- const TString hostName = "rc1a-d6dv17lv47v5mcop";
- const TString hostFqdn = hostName + ".db.yandex.net";
- const int port = 8443;
- const TString endpoint = TStringBuilder() << hostFqdn << ":" << ToString(port);
- const TString databaseName = "dqrun";
- const TString login = "crab";
- const TString password = "qwerty12345";
- const TString tableName = "example_1";
- const bool useTls = true;
- const auto protocol = NYql::NConnector::NApi::EProtocol::HTTP;
- const auto sourceType = NYql::NConnector::EExternalDataSource::ClickHouse;
- const auto dataSourceKind = NYql::NConnector::NApi::EDataSourceKind::CLICKHOUSE;
- const TString serviceAccountId = "sa";
- const TString serviceAccountIdSignature = "sa_signature";
-
- NApi::TDataSourceInstance dataSourceInstance;
- dataSourceInstance.set_database(databaseName);
- dataSourceInstance.mutable_credentials()->mutable_basic()->set_username(login);
- dataSourceInstance.mutable_credentials()->mutable_basic()->set_password(password);
- dataSourceInstance.mutable_endpoint()->set_host(hostFqdn);
- dataSourceInstance.mutable_endpoint()->set_port(port);
- dataSourceInstance.set_use_tls(useTls);
- dataSourceInstance.set_kind(dataSourceKind);
- dataSourceInstance.set_protocol(protocol);
+ TestSelectAllFields(EProviderType::ClickHouse);
+ }
- // step 1: DescribeTable
- NApi::TDescribeTableRequest describeTableRequest;
- describeTableRequest.set_table(tableName);
- describeTableRequest.mutable_data_source_instance()->CopyFrom(dataSourceInstance);
+ void TestSelectConstant(EProviderType providerType) {
+ // prepare mock
+ auto clientMock = std::make_shared<TConnectorClientMock>();
+
+ const NApi::TDataSourceInstance dataSourceInstance = MakeDataSourceInstance(providerType);
- TDescribeTableResult::TPtr describeTableResult = std::make_shared<TDescribeTableResult>();
- describeTableResult->Error.set_status(::Ydb::StatusIds_StatusCode::StatusIds_StatusCode_SUCCESS);
- auto col1 = describeTableResult->Schema.add_columns();
- col1->set_name("col1");
- col1->mutable_type()->set_type_id(Ydb::Type::UINT16);
+ constexpr size_t ROWS_COUNT = 5;
- EXPECT_CALL(*connectorClientMock, DescribeTable(ProtobufRequestMatcher(describeTableRequest))).WillOnce(Return(describeTableResult));
+ // step 1: DescribeTable
+ clientMock->ExpectDescribeTable()
+ .DataSourceInstance(dataSourceInstance)
+ .Response()
+ .Column("col1", Ydb::Type::UINT16)
+ .Column("col2", Ydb::Type::DOUBLE);
// step 2: ListSplits
- NApi::TListSplitsRequest listSplitsRequest;
+ clientMock->ExpectListSplits()
+ .Select()
+ .DataSourceInstance(dataSourceInstance)
+ .What()
+ // Empty
+ .Done()
+ .Done()
+ .Response()
+ .Split()
+ .Description("some binary description")
+ .Select()
+ .DataSourceInstance(dataSourceInstance)
+ .What();
- auto select = listSplitsRequest.add_selects();
- select->mutable_from()->set_table(tableName);
- select->mutable_data_source_instance()->CopyFrom(dataSourceInstance);
- auto item = select->mutable_what() -> add_items();
- item->mutable_column()->CopyFrom(*col1);
+ // step 3: ReadSplits
+ clientMock->ExpectReadSplits()
+ .DataSourceInstance(dataSourceInstance)
+ .Split()
+ .Description("some binary description")
+ .Select()
+ .DataSourceInstance(dataSourceInstance)
+ .What()
+ .Done()
+ .Done()
+ .Done()
+ .Response()
+ .RecordBatch(MakeEmptyRecordBatch(ROWS_COUNT));
- TListSplitsResult::TPtr listSplitsResult = std::make_shared<TListSplitsResult>();
- listSplitsResult->Error.set_status(::Ydb::StatusIds_StatusCode::StatusIds_StatusCode_SUCCESS);
- NApi::TSplit split;
- split.mutable_select()->CopyFrom(*select);
- split.set_description("some binary description");
- listSplitsResult->Splits.emplace_back(std::move(split));
+ // prepare database resolver mock
+ std::shared_ptr<TDatabaseAsyncResolverMock> databaseAsyncResolverMock;
+ if (providerType == EProviderType::ClickHouse) {
+ databaseAsyncResolverMock = std::make_shared<TDatabaseAsyncResolverMock>();
+ databaseAsyncResolverMock->AddClickHouseCluster();
+ }
- EXPECT_CALL(*connectorClientMock, ListSplits(ProtobufRequestMatcher(listSplitsRequest))).WillOnce(Return(listSplitsResult));
+ // run test
+ auto kikimr = MakeKikimrRunner(nullptr, clientMock, databaseAsyncResolverMock);
- // step 3: ReadSplits
- NApi::TReadSplitsRequest readSplitsRequest;
- readSplitsRequest.mutable_data_source_instance()->CopyFrom(dataSourceInstance);
- readSplitsRequest.add_splits()->CopyFrom(listSplitsResult->Splits[0]);
- readSplitsRequest.set_format(NApi::TReadSplitsRequest_EFormat::TReadSplitsRequest_EFormat_ARROW_IPC_STREAMING);
-
- TReadSplitsResult::TPtr readSplitsResult = std::make_shared<TReadSplitsResult>();
- readSplitsResult->Error.set_status(::Ydb::StatusIds_StatusCode::StatusIds_StatusCode_SUCCESS);
- readSplitsResult->RecordBatches.push_back({});
- std::vector<ui16> colData = {10, 20, 30, 40, 50};
+ CreateExternalDataSource(providerType, kikimr);
- PREPARE_RECORD_BATCH(col1->name(), colData, UInt16Builder, arrow::uint16, readSplitsResult->RecordBatches[0]);
+ const TString query = fmt::format(
+ R"(
+ SELECT 42 FROM {data_source_name}.`{database_name}.{table_name}`;
+ )",
+ "data_source_name"_a = DEFAULT_DATA_SOURCE_NAME,
+ "database_name"_a = DEFAULT_DATABASE,
+ "table_name"_a = DEFAULT_TABLE);
- EXPECT_CALL(*connectorClientMock, ReadSplits(ProtobufRequestMatcher(readSplitsRequest))).WillOnce(Return(readSplitsResult));
+ auto db = kikimr->GetQueryClient();
+ auto queryResult = db.ExecuteQuery(query, TTxControl::BeginTx().CommitTx(), TExecuteQuerySettings()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(queryResult.GetStatus(), EStatus::SUCCESS, queryResult.GetIssues().ToString());
- // prepare database resolver mock
- auto databaseAsyncResolverMock = std::make_shared<TDatabaseAsyncResolverMock>();
- NYql::IDatabaseAsyncResolver::TPtr databaseAsyncResolver = databaseAsyncResolverMock;
+ TResultSetParser resultSet(queryResult.GetResultSetParser(0));
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), ROWS_COUNT);
- NYql::IDatabaseAsyncResolver::TDatabaseAuthMap dbResolverReq;
- dbResolverReq[std::make_pair(clusterId, NYql::EDatabaseType::ClickHouse)] =
- NYql::TDatabaseAuth{
- NYql::TStructuredTokenBuilder().SetServiceAccountIdAuth(serviceAccountId, serviceAccountIdSignature).ToJson(),
- true,
- true};
+ // check every row
+ std::vector<i32> constants(ROWS_COUNT, 42);
+ MATCH_RESULT_WITH_INPUT(constants, resultSet, GetInt32);
+ }
+
+ Y_UNIT_TEST(PostgreSQLSelectConstant) {
+ TestSelectConstant(EProviderType::PostgreSQL);
+ }
- NYql::TDatabaseResolverResponse::TDatabaseEndpointsMap databaseEndpointsMap;
- databaseEndpointsMap[std::make_pair(clusterId, NYql::EDatabaseType::ClickHouse)] =
- NYql::TDatabaseResolverResponse::TEndpoint{endpoint, clusterId};
- auto dbResolverPromise = NThreading::NewPromise<NYql::TDatabaseResolverResponse>();
- dbResolverPromise.SetValue(NYql::TDatabaseResolverResponse(std::move(databaseEndpointsMap), true));
+ Y_UNIT_TEST(ClickHouseManagedSelectConstant) {
+ TestSelectConstant(EProviderType::ClickHouse);
+ }
- EXPECT_CALL(*databaseAsyncResolverMock, ResolveIds(DatabaseAuthMapMatcher(dbResolverReq))).WillOnce(Return(dbResolverPromise.GetFuture()));
+ void TestSelectCount(EProviderType providerType) {
+ // prepare mock
+ auto clientMock = std::make_shared<TConnectorClientMock>();
+
+ const NApi::TDataSourceInstance dataSourceInstance = MakeDataSourceInstance(providerType);
+
+ constexpr size_t ROWS_COUNT = 5;
+
+ // step 1: DescribeTable
+ clientMock->ExpectDescribeTable()
+ .DataSourceInstance(dataSourceInstance)
+ .Response()
+ .Column("col1", Ydb::Type::UINT16)
+ .Column("col2", Ydb::Type::DOUBLE);
+
+ // step 2: ListSplits
+ clientMock->ExpectListSplits()
+ .Select()
+ .DataSourceInstance(dataSourceInstance)
+ .What()
+ // Empty
+ .Done()
+ .Done()
+ .Response()
+ .Split()
+ .Description("some binary description")
+ .Select()
+ .DataSourceInstance(dataSourceInstance)
+ .What();
+
+ // step 3: ReadSplits
+ clientMock->ExpectReadSplits()
+ .DataSourceInstance(dataSourceInstance)
+ .Split()
+ .Description("some binary description")
+ .Select()
+ .DataSourceInstance(dataSourceInstance)
+ .What()
+ .Done()
+ .Done()
+ .Done()
+ .Response()
+ .RecordBatch(MakeEmptyRecordBatch(ROWS_COUNT));
+
+ // prepare database resolver mock
+ std::shared_ptr<TDatabaseAsyncResolverMock> databaseAsyncResolverMock;
+ if (providerType == EProviderType::ClickHouse) {
+ databaseAsyncResolverMock = std::make_shared<TDatabaseAsyncResolverMock>();
+ databaseAsyncResolverMock->AddClickHouseCluster();
+ }
// run test
- auto kikimr = MakeKikimrRunner(nullptr, connectorClient, databaseAsyncResolver);
+ auto kikimr = MakeKikimrRunner(nullptr, clientMock, databaseAsyncResolverMock);
- auto tc = kikimr->GetTableClient();
- auto session = tc.CreateSession().GetValueSync().GetSession();
- const TString query1 = fmt::format(
- R"(
- CREATE OBJECT sa_signature (TYPE SECRET) WITH (value=sa_signature);
- CREATE OBJECT ch_managed_password (TYPE SECRET) WITH (value={password});
-
- CREATE EXTERNAL DATA SOURCE ch_managed WITH (
- SOURCE_TYPE="{source_type}",
- MDB_CLUSTER_ID="{cluster_id}",
- AUTH_METHOD="MDB_BASIC",
- SERVICE_ACCOUNT_ID="{service_account_id}",
- SERVICE_ACCOUNT_SECRET_NAME="{service_account_id_signature}",
- LOGIN="{login}",
- PASSWORD_SECRET_NAME="ch_managed_password",
- USE_TLS="{use_tls}",
- PROTOCOL="{protocol}"
- );
- )",
- "cluster_id"_a = clusterId,
- "password"_a = password,
- "login"_a = login,
- "use_tls"_a = useTls ? "TRUE" : "FALSE",
- "protocol"_a = NYql::NConnector::NApi::EProtocol_Name(protocol),
- "service_account_id"_a = serviceAccountId,
- "service_account_id_signature"_a = serviceAccountIdSignature,
- "source_type"_a = ToString(sourceType));
- auto result = session.ExecuteSchemeQuery(query1).GetValueSync();
- UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
-
- const TString query2 = fmt::format(
+ CreateExternalDataSource(providerType, kikimr);
+
+ const TString query = fmt::format(
R"(
- SELECT * FROM ch_managed.`{database_name}.{table_name}`;
+ SELECT COUNT(*) FROM {data_source_name}.`{database_name}.{table_name}`;
)",
- "database_name"_a = databaseName,
- "table_name"_a = tableName);
+ "data_source_name"_a = DEFAULT_DATA_SOURCE_NAME,
+ "database_name"_a = DEFAULT_DATABASE,
+ "table_name"_a = DEFAULT_TABLE);
auto db = kikimr->GetQueryClient();
- auto scriptExecutionOperation = db.ExecuteScript(query2).ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
- UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
-
- NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
- UNIT_ASSERT_C(readyOp.Metadata().ExecStatus == EExecStatus::Completed, readyOp.Status().GetIssues().ToString());
- TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync();
- UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
+ auto queryResult = db.ExecuteQuery(query, TTxControl::BeginTx().CommitTx(), TExecuteQuerySettings()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(queryResult.GetStatus(), EStatus::SUCCESS, queryResult.GetIssues().ToString());
- TResultSetParser resultSet(results.ExtractResultSet());
+ TResultSetParser resultSet(queryResult.GetResultSetParser(0));
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1);
- UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), colData.size());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1);
// check every row
- MATCH_RESULT_WITH_INPUT(colData, resultSet, GetUint16);
+ std::vector<ui64> result = { ROWS_COUNT };
+ MATCH_RESULT_WITH_INPUT(result, resultSet, GetUint64);
+ }
+
+ Y_UNIT_TEST(PostgreSQLSelectCount) {
+ TestSelectCount(EProviderType::PostgreSQL);
+ }
+
+ Y_UNIT_TEST(ClickHouseSelectCount) {
+ TestSelectCount(EProviderType::ClickHouse);
}
}
}
diff --git a/ydb/core/kqp/ut/federated_query/generic/ya.make b/ydb/core/kqp/ut/federated_query/generic/ya.make
index 6b90a9d5dd..8ea901c175 100644
--- a/ydb/core/kqp/ut/federated_query/generic/ya.make
+++ b/ydb/core/kqp/ut/federated_query/generic/ya.make
@@ -1,5 +1,7 @@
UNITTEST_FOR(ydb/core/kqp)
+FORK_SUBTESTS()
+
SRCS(
kqp_generic_provider_ut.cpp
)
@@ -9,6 +11,7 @@ PEERDIR(
contrib/libs/fmt
ydb/core/kqp/ut/common
ydb/core/kqp/ut/federated_query/common
+ ydb/library/yql/providers/generic/connector/libcpp/ut_helpers
ydb/library/yql/sql/pg_dummy
)
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index 5bfbe4a47e..0e1bbe3a53 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -742,7 +742,14 @@ namespace Tests {
Runtime->RegisterService(NConsole::MakeConfigsDispatcherID(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx);
}
if (Settings->IsEnableMetadataProvider()) {
- auto* actor = NMetadata::NProvider::CreateService(NMetadata::NProvider::TConfig());
+ NKikimrConfig::TMetadataProviderConfig cfgProto;
+ cfgProto.SetRefreshPeriodSeconds(1);
+ cfgProto.SetEnabled(true);
+ cfgProto.MutableRequestConfig()->SetRetryPeriodStartSeconds(1);
+ cfgProto.MutableRequestConfig()->SetRetryPeriodFinishSeconds(30);
+ NMetadata::NProvider::TConfig cfg;
+ cfg.DeserializeFromProto(cfgProto);
+ auto* actor = NMetadata::NProvider::CreateService(cfg);
const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0);
Runtime->RegisterService(NMetadata::NProvider::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx);
}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go b/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go
index 772ba72f63..cd4e747fcb 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go
@@ -5,6 +5,7 @@ import (
"fmt"
"strings"
+ "github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
"github.com/ydb-platform/ydb/library/go/core/log"
api_common "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/api/common"
"github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/utils"
@@ -113,21 +114,37 @@ func (h *handlerImpl[CONN]) ReadSplit(
return fmt.Errorf("convert Select.What.Items to Ydb.Columns: %w", err)
}
- for i, column := range columns {
- sb.WriteString(column.GetName())
-
- if i != len(columns)-1 {
- sb.WriteString(", ")
- }
+ // for the case of empty column set select some constant for constructing a valid sql statement
+ if len(columns) == 0 {
+ sb.WriteString("0")
var acceptor any
- acceptor, err = h.typeMapper.YDBTypeToAcceptor(column.GetType())
+ ydbType := Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_INT32}}
+ acceptor, err = h.typeMapper.YDBTypeToAcceptor(&ydbType)
+
if err != nil {
return fmt.Errorf("map ydb column to acceptor: %w", err)
}
acceptors = append(acceptors, acceptor)
+ } else {
+ for i, column := range columns {
+ sb.WriteString(column.GetName())
+
+ if i != len(columns)-1 {
+ sb.WriteString(", ")
+ }
+
+ var acceptor any
+
+ acceptor, err = h.typeMapper.YDBTypeToAcceptor(column.GetType())
+ if err != nil {
+ return fmt.Errorf("map ydb column to acceptor: %w", err)
+ }
+
+ acceptors = append(acceptors, acceptor)
+ }
}
// SELECT $columns FROM $from
diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/columnar_buffer_arrow_ipc_streaming.go b/ydb/library/yql/providers/generic/connector/app/server/utils/columnar_buffer_arrow_ipc_streaming.go
index 0db0782a43..651d381383 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/utils/columnar_buffer_arrow_ipc_streaming.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/utils/columnar_buffer_arrow_ipc_streaming.go
@@ -82,3 +82,59 @@ func (cb *columnarBufferArrowIPCStreaming) Release() {
b.Release()
}
}
+
+// special implementation for buffer that writes schema with empty columns set
+type columnarBufferArrowIPCStreamingEmptyColumns struct {
+ arrowAllocator memory.Allocator
+ readLimiter ReadLimiter
+ schema *arrow.Schema
+ typeMapper TypeMapper
+ rowsAdded int64
+}
+
+// AddRow saves a row obtained from the datasource into the buffer
+func (cb *columnarBufferArrowIPCStreamingEmptyColumns) AddRow(acceptors []any) error {
+ if len(acceptors) != 1 {
+ return fmt.Errorf("expected 1 rows acceptor, got %v", len(acceptors))
+ }
+
+ if err := cb.readLimiter.AddRow(); err != nil {
+ return fmt.Errorf("check read limiter: %w", err)
+ }
+
+ cb.rowsAdded++
+
+ return nil
+}
+
+// ToResponse returns all the accumulated data and clears buffer
+func (cb *columnarBufferArrowIPCStreamingEmptyColumns) ToResponse() (*api_service_protos.TReadSplitsResponse, error) {
+ columns := make([]arrow.Array, 0)
+
+ record := array.NewRecord(cb.schema, columns, cb.rowsAdded)
+
+ // prepare arrow writer
+ var buf bytes.Buffer
+
+ writer := ipc.NewWriter(&buf, ipc.WithSchema(cb.schema), ipc.WithAllocator(cb.arrowAllocator))
+
+ if err := writer.Write(record); err != nil {
+ return nil, fmt.Errorf("write record: %w", err)
+ }
+
+ if err := writer.Close(); err != nil {
+ return nil, fmt.Errorf("close arrow writer: %w", err)
+ }
+
+ out := &api_service_protos.TReadSplitsResponse{
+ Payload: &api_service_protos.TReadSplitsResponse_ArrowIpcStreaming{
+ ArrowIpcStreaming: buf.Bytes(),
+ },
+ }
+
+ return out, nil
+}
+
+// Frees resources if buffer is no longer used
+func (cb *columnarBufferArrowIPCStreamingEmptyColumns) Release() {
+}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/columnar_buffer_factory.go b/ydb/library/yql/providers/generic/connector/app/server/utils/columnar_buffer_factory.go
index 5d6b683940..460387701f 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/utils/columnar_buffer_factory.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/utils/columnar_buffer_factory.go
@@ -40,6 +40,16 @@ func (cbf *ColumnarBufferFactory) MakeBuffer(
return nil, fmt.Errorf("convert Select.What to Ydb.Types: %w", err)
}
+ if len(ydbTypes) == 0 {
+ return &columnarBufferArrowIPCStreamingEmptyColumns{
+ arrowAllocator: cbf.arrowAllocator,
+ schema: schema,
+ readLimiter: cbf.readLimiterFactory.MakeReadLimiter(logger),
+ typeMapper: typeMapper,
+ rowsAdded: 0,
+ }, nil
+ }
+
return &columnarBufferArrowIPCStreaming{
arrowAllocator: cbf.arrowAllocator,
schema: schema,
diff --git a/ydb/library/yql/providers/generic/connector/app/server/validate.go b/ydb/library/yql/providers/generic/connector/app/server/validate.go
index 486d11ea0e..ac9b5df2df 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/validate.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/validate.go
@@ -52,10 +52,6 @@ func validateSelect(logger log.Logger, slct *api_service_protos.TSelect) error {
return fmt.Errorf("validate data source instance: %w", err)
}
- if len(slct.GetWhat().GetItems()) == 0 {
- return fmt.Errorf("empty items: %w", utils.ErrInvalidRequest)
- }
-
return nil
}
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.darwin-x86_64.txt
index 3e40d9e467..82847e3680 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.darwin-x86_64.txt
@@ -7,6 +7,7 @@
add_subdirectory(cli)
+add_subdirectory(ut_helpers)
get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-aarch64.txt
index cc8e33fa78..822344f764 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-aarch64.txt
@@ -7,6 +7,7 @@
add_subdirectory(cli)
+add_subdirectory(ut_helpers)
get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-x86_64.txt
index cc8e33fa78..822344f764 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.linux-x86_64.txt
@@ -7,6 +7,7 @@
add_subdirectory(cli)
+add_subdirectory(ut_helpers)
get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.windows-x86_64.txt
index 3e40d9e467..82847e3680 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/providers/generic/connector/libcpp/CMakeLists.windows-x86_64.txt
@@ -7,6 +7,7 @@
add_subdirectory(cli)
+add_subdirectory(ut_helpers)
get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..de9202ff92
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,30 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(connector-libcpp-ut_helpers)
+target_compile_options(connector-libcpp-ut_helpers PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(connector-libcpp-ut_helpers PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-fmt
+ cpp-testing-gmock_in_unittest
+ cpp-testing-unittest
+ kqp-ut-common
+ providers-common-db_id_async_resolver
+ providers-common-structured_token
+ connector-api-common
+ generic-connector-libcpp
+)
+target_sources(connector-libcpp-ut_helpers PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/database_resolver_mock.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.cpp
+)
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..7894a2a5b6
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,31 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(connector-libcpp-ut_helpers)
+target_compile_options(connector-libcpp-ut_helpers PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(connector-libcpp-ut_helpers PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-fmt
+ cpp-testing-gmock_in_unittest
+ cpp-testing-unittest
+ kqp-ut-common
+ providers-common-db_id_async_resolver
+ providers-common-structured_token
+ connector-api-common
+ generic-connector-libcpp
+)
+target_sources(connector-libcpp-ut_helpers PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/database_resolver_mock.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.cpp
+)
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..7894a2a5b6
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,31 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(connector-libcpp-ut_helpers)
+target_compile_options(connector-libcpp-ut_helpers PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(connector-libcpp-ut_helpers PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-fmt
+ cpp-testing-gmock_in_unittest
+ cpp-testing-unittest
+ kqp-ut-common
+ providers-common-db_id_async_resolver
+ providers-common-structured_token
+ connector-api-common
+ generic-connector-libcpp
+)
+target_sources(connector-libcpp-ut_helpers PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/database_resolver_mock.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.cpp
+)
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.txt b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..de9202ff92
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,30 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(connector-libcpp-ut_helpers)
+target_compile_options(connector-libcpp-ut_helpers PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(connector-libcpp-ut_helpers PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-fmt
+ cpp-testing-gmock_in_unittest
+ cpp-testing-unittest
+ kqp-ut-common
+ providers-common-db_id_async_resolver
+ providers-common-structured_token
+ connector-api-common
+ generic-connector-libcpp
+)
+target_sources(connector-libcpp-ut_helpers PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/database_resolver_mock.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.cpp
+)
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp
new file mode 100644
index 0000000000..2fa4b253a0
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp
@@ -0,0 +1,98 @@
+#include "connector_client_mock.h"
+
+#include <fmt/format.h>
+
+namespace NYql::NConnector::NTest {
+
+ using namespace fmt::literals;
+
+ void CreatePostgreSQLExternalDataSource(
+ const std::shared_ptr<NKikimr::NKqp::TKikimrRunner>& kikimr,
+ const TString& dataSourceName,
+ NApi::EProtocol protocol,
+ const TString& host,
+ int port,
+ const TString& login,
+ const TString& password,
+ bool useTls)
+ {
+ auto tc = kikimr->GetTableClient();
+ auto session = tc.CreateSession().GetValueSync().GetSession();
+ const TString query = fmt::format(
+ R"(
+ CREATE OBJECT {data_source_name}_password (TYPE SECRET) WITH (value={password});
+
+ CREATE EXTERNAL DATA SOURCE {data_source_name} WITH (
+ SOURCE_TYPE="{source_type}",
+ LOCATION="{host}:{port}",
+ AUTH_METHOD="BASIC",
+ LOGIN="{login}",
+ PASSWORD_SECRET_NAME="{data_source_name}_password",
+ USE_TLS="{use_tls}",
+ PROTOCOL="{protocol}"
+ );
+ )",
+ "data_source_name"_a = dataSourceName,
+ "host"_a = host,
+ "port"_a = port,
+ "login"_a = login,
+ "password"_a = password,
+ "use_tls"_a = useTls ? "TRUE" : "FALSE",
+ "protocol"_a = NApi::EProtocol_Name(protocol),
+ "source_type"_a = PG_SOURCE_TYPE);
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ void CreateClickHouseExternalDataSource(
+ const std::shared_ptr<NKikimr::NKqp::TKikimrRunner>& kikimr,
+ const TString& dataSourceName,
+ NApi::EProtocol protocol,
+ const TString& clickHouseClusterId,
+ const TString& login,
+ const TString& password,
+ bool useTls,
+ const TString& serviceAccountId,
+ const TString& serviceAccountIdSignature,
+ NYql::NConnector::EExternalDataSource sourceType)
+ {
+ auto tc = kikimr->GetTableClient();
+ auto session = tc.CreateSession().GetValueSync().GetSession();
+ const TString query = fmt::format(
+ R"(
+ CREATE OBJECT sa_signature (TYPE SECRET) WITH (value=sa_signature);
+ CREATE OBJECT {data_source_name}_password (TYPE SECRET) WITH (value={password});
+
+ CREATE EXTERNAL DATA SOURCE {data_source_name} WITH (
+ SOURCE_TYPE="{source_type}",
+ MDB_CLUSTER_ID="{cluster_id}",
+ AUTH_METHOD="MDB_BASIC",
+ SERVICE_ACCOUNT_ID="{service_account_id}",
+ SERVICE_ACCOUNT_SECRET_NAME="{service_account_id_signature}",
+ LOGIN="{login}",
+ PASSWORD_SECRET_NAME="{data_source_name}_password",
+ USE_TLS="{use_tls}",
+ PROTOCOL="{protocol}"
+ );
+ )",
+ "cluster_id"_a = clickHouseClusterId,
+ "data_source_name"_a = dataSourceName,
+ "login"_a = login,
+ "password"_a = password,
+ "use_tls"_a = useTls ? "TRUE" : "FALSE",
+ "protocol"_a = NYql::NConnector::NApi::EProtocol_Name(protocol),
+ "service_account_id"_a = serviceAccountId,
+ "service_account_id_signature"_a = serviceAccountIdSignature,
+ "source_type"_a = ToString(sourceType));
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ std::shared_ptr<arrow::RecordBatch> MakeEmptyRecordBatch(size_t rowsCount) {
+ return arrow::RecordBatch::Make(
+ std::make_shared<arrow::Schema>(arrow::FieldVector()),
+ static_cast<i64>(rowsCount),
+ std::vector<std::shared_ptr<arrow::Array>>());
+ }
+
+} // namespace NYql::NConnector::NTest
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h
new file mode 100644
index 0000000000..58fb24caf4
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h
@@ -0,0 +1,502 @@
+#pragma once
+#include <ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.h>
+
+#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
+#include <ydb/library/yql/providers/generic/connector/api/common/data_source.pb.h>
+#include <ydb/library/yql/providers/generic/connector/libcpp/client.h>
+
+#include <library/cpp/testing/gmock_in_unittest/gmock.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/generic/string.h>
+
+#include <arrow/api.h>
+
+#include <google/protobuf/util/message_differencer.h>
+
+#include <memory>
+
+namespace NYql::NConnector::NTest {
+ using namespace testing;
+
+#define EXPR_SETTER(name, set_expr) \
+ TBuilder& name(const auto& value) { \
+ this->Result_->set_expr(value); \
+ return static_cast<TBuilder&>(*this); \
+ }
+
+#define SETTER(name, protoName) EXPR_SETTER(name, Y_CAT(set_, protoName))
+
+#define SUBPROTO_BUILDER(name, fieldExpr, protoType, builderType) \
+ builderType name() { \
+ return builderType(this->Result_->fieldExpr(), static_cast<TBuilder*>(this)); \
+ } \
+ TBuilder& name(const protoType& proto) { \
+ this->Result_->fieldExpr()->CopyFrom(proto); \
+ return static_cast<TBuilder&>(*this); \
+ }
+
+#define DATA_SOURCE_INSTANCE_SUBBUILDER() \
+ TBuilder& DataSourceInstance(const NApi::TDataSourceInstance& proto) { \
+ this->Result_->mutable_data_source_instance()->CopyFrom(proto); \
+ return static_cast<TBuilder&>(*this); \
+ } \
+ TPostgreSQLDataSourceInstanceBuilder<TBuilder> PostgreSQLDataSourceInstance() { \
+ return TPostgreSQLDataSourceInstanceBuilder<TBuilder>( \
+ this->Result_->mutable_data_source_instance(), \
+ static_cast<TBuilder*>(this)); \
+ } \
+ TClickHouseDataSourceInstanceBuilder<TBuilder> ClickHouseDataSourceInstance() { \
+ return TClickHouseDataSourceInstanceBuilder<TBuilder>( \
+ this->Result_->mutable_data_source_instance(), \
+ static_cast<TBuilder*>(this)); \
+ }
+
+ MATCHER_P(ProtobufRequestMatcher, expected, "request does not match") {
+ return google::protobuf::util::MessageDifferencer::Equals(arg, expected);
+ }
+
+#define MATCH_RESULT_WITH_INPUT(INPUT, RESULT_SET, GETTER) \
+ { \
+ for (const auto& val : INPUT) { \
+ UNIT_ASSERT(RESULT_SET.TryNextRow()); \
+ UNIT_ASSERT_VALUES_EQUAL(RESULT_SET.ColumnParser(0).GETTER(), val); \
+ } \
+ }
+
+ template <class TArrowBuilderType, class TColDataType>
+ std::shared_ptr<arrow::RecordBatch> MakeRecordBatch(
+ const TString& columnName,
+ const std::vector<TColDataType>& input,
+ std::shared_ptr<arrow::DataType> dataType) {
+ TArrowBuilderType builder;
+ UNIT_ASSERT_EQUAL(builder.AppendValues(input), arrow::Status::OK());
+ std::shared_ptr<arrow::Array> columnData;
+ UNIT_ASSERT_EQUAL(builder.Finish(&columnData), arrow::Status::OK());
+ auto field = arrow::field(columnName, std::move(dataType));
+ auto schema = arrow::schema({field});
+ return arrow::RecordBatch::Make(schema, columnData->length(), {columnData});
+ }
+
+ // Make record batch with schema with no columns
+ std::shared_ptr<arrow::RecordBatch> MakeEmptyRecordBatch(size_t rowsCount);
+
+ template <class TParent>
+ struct TWithParentBuilder {
+ explicit TWithParentBuilder(TParent* parent)
+ : Parent_(parent)
+ {
+ }
+
+ TParent& Done() {
+ Y_ABORT_UNLESS(Parent_); // Use only with parent builder
+ return *Parent_;
+ }
+
+ protected:
+ TParent* Parent_ = nullptr;
+ };
+
+ // No parent
+ template <>
+ struct TWithParentBuilder<void> {
+ explicit TWithParentBuilder(void*)
+ {
+ }
+ };
+
+ template <class TParent, class TResultPtrType>
+ struct TResponseBuilder: public TWithParentBuilder<TParent> {
+ explicit TResponseBuilder(TResultPtrType result = std::make_shared<typename TResultPtrType::element_type>(), TParent* parent = nullptr)
+ : TWithParentBuilder<TParent>(parent)
+ , Result_(std::move(result))
+ {
+ }
+
+ TResultPtrType GetResult() {
+ return Result_;
+ }
+
+ protected:
+ TResultPtrType Result_;
+ };
+
+ template <class TParent, class TProto>
+ struct TProtoBuilder: public TWithParentBuilder<TParent> {
+ explicit TProtoBuilder(TProto* result = nullptr, TParent* parent = nullptr)
+ : TWithParentBuilder<TParent>(parent)
+ , Result_(result)
+ {
+ if (!Result_) {
+ Result_ = &MaybeResult_.ConstructInPlace();
+ }
+ }
+
+ TProto GetResult() {
+ return *Result_;
+ }
+
+ protected:
+ TProto* Result_ = nullptr;
+ TMaybe<TProto> MaybeResult_;
+ };
+
+ void CreatePostgreSQLExternalDataSource(
+ const std::shared_ptr<NKikimr::NKqp::TKikimrRunner>& kikimr,
+ const TString& dataSourceName = DEFAULT_DATA_SOURCE_NAME,
+ NApi::EProtocol protocol = DEFAULT_PG_PROTOCOL,
+ const TString& host = DEFAULT_PG_HOST,
+ int port = DEFAULT_PG_PORT,
+ const TString& login = DEFAULT_LOGIN,
+ const TString& password = DEFAULT_PASSWORD,
+ bool useTls = DEFAULT_USE_TLS);
+
+ void CreateClickHouseExternalDataSource(
+ const std::shared_ptr<NKikimr::NKqp::TKikimrRunner>& kikimr,
+ const TString& dataSourceName = DEFAULT_DATA_SOURCE_NAME,
+ NApi::EProtocol protocol = DEFAULT_CH_PROTOCOL,
+ const TString& clickHouseClusterId = DEFAULT_CH_CLUSTER_ID,
+ const TString& login = DEFAULT_LOGIN,
+ const TString& password = DEFAULT_PASSWORD,
+ bool useTls = DEFAULT_USE_TLS,
+ const TString& serviceAccountId = DEFAULT_CH_SERVICE_ACCOUNT_ID,
+ const TString& serviceAccountIdSignature = DEFAULT_CH_SERVICE_ACCOUNT_ID_SIGNATURE,
+ NYql::NConnector::EExternalDataSource sourceType = DEFAULT_CH_SOURCE_TYPE);
+
+ class TConnectorClientMock: public NYql::NConnector::IClient {
+ public:
+ MOCK_METHOD(TDescribeTableResult::TPtr, DescribeTable, (const NApi::TDescribeTableRequest& request), (override));
+ MOCK_METHOD(TListSplitsResult::TPtr, ListSplits, (const NApi::TListSplitsRequest& request), (override));
+ MOCK_METHOD(TReadSplitsResult::TPtr, ReadSplits, (const NApi::TReadSplitsRequest& request), (override));
+
+ //
+ // Expectation helpers
+ //
+
+ template <class TDerived, class TParent = void /* no parent by default */>
+ struct TBaseDataSourceInstanceBuilder: public TProtoBuilder<TParent, NApi::TDataSourceInstance> {
+ using TBuilder = TDerived;
+
+ explicit TBaseDataSourceInstanceBuilder(NApi::TDataSourceInstance* result = nullptr, TParent* parent = nullptr)
+ : TProtoBuilder<TParent, NApi::TDataSourceInstance>(result, parent)
+ {
+ }
+
+ SETTER(Database, database);
+ EXPR_SETTER(Login, mutable_credentials()->mutable_basic()->set_username);
+ EXPR_SETTER(Password, mutable_credentials()->mutable_basic()->set_password);
+ EXPR_SETTER(Host, mutable_endpoint()->set_host);
+ EXPR_SETTER(Port, mutable_endpoint()->set_port);
+ SETTER(UseTls, use_tls);
+ SETTER(Kind, kind);
+ SETTER(Protocol, protocol);
+
+ protected:
+ void FillWithDefaults() {
+ Database(DEFAULT_DATABASE);
+ Login(DEFAULT_LOGIN);
+ Password(DEFAULT_PASSWORD);
+ UseTls(DEFAULT_USE_TLS);
+ }
+ };
+
+ template <class TParent = void /* no parent by default */>
+ struct TPostgreSQLDataSourceInstanceBuilder: public TBaseDataSourceInstanceBuilder<TPostgreSQLDataSourceInstanceBuilder<TParent>, TParent> {
+ using TBase = TBaseDataSourceInstanceBuilder<TPostgreSQLDataSourceInstanceBuilder<TParent>, TParent>;
+
+ explicit TPostgreSQLDataSourceInstanceBuilder(NApi::TDataSourceInstance* result = nullptr, TParent* parent = nullptr)
+ : TBase(result, parent)
+ {
+ FillWithDefaults();
+ }
+
+ void FillWithDefaults() {
+ TBase::FillWithDefaults();
+ this->Host(DEFAULT_PG_HOST);
+ this->Port(DEFAULT_PG_PORT);
+ this->Kind(NApi::EDataSourceKind::POSTGRESQL);
+ this->Protocol(DEFAULT_PG_PROTOCOL);
+ }
+ };
+
+ template <class TParent = void /* no parent by default */>
+ struct TClickHouseDataSourceInstanceBuilder: public TBaseDataSourceInstanceBuilder<TClickHouseDataSourceInstanceBuilder<TParent>, TParent> {
+ using TBase = TBaseDataSourceInstanceBuilder<TClickHouseDataSourceInstanceBuilder<TParent>, TParent>;
+
+ explicit TClickHouseDataSourceInstanceBuilder(NApi::TDataSourceInstance* result = nullptr, TParent* parent = nullptr)
+ : TBase(result, parent)
+ {
+ FillWithDefaults();
+ }
+
+ void FillWithDefaults() {
+ TBase::FillWithDefaults();
+ this->Host(DEFAULT_CH_HOST);
+ this->Port(DEFAULT_CH_PORT);
+ this->Kind(NApi::EDataSourceKind::CLICKHOUSE);
+ this->Protocol(DEFAULT_CH_PROTOCOL);
+ }
+ };
+
+ template <class TParent = void /* no parent by default */>
+ struct TDescribeTableResultBuilder: public TResponseBuilder<TParent, TDescribeTableResult::TPtr> {
+ using TBuilder = TDescribeTableResultBuilder<TParent>;
+
+ explicit TDescribeTableResultBuilder(TDescribeTableResult::TPtr result = std::make_shared<TDescribeTableResult>(), TParent* parent = nullptr)
+ : TResponseBuilder<TParent, TDescribeTableResult::TPtr>(std::move(result), parent)
+ {
+ FillWithDefaults();
+ }
+
+ EXPR_SETTER(Status, Error.set_status);
+
+ // TODO: add nonprimitive types
+ TBuilder& Column(const TString& name, Ydb::Type::PrimitiveTypeId typeId) {
+ auto* col = this->Result_->Schema.add_columns();
+ col->set_name(name);
+ col->mutable_type()->set_type_id(typeId);
+ return *this;
+ }
+
+ void FillWithDefaults() {
+ Status(Ydb::StatusIds::SUCCESS);
+ }
+ };
+
+ struct TDescribeTableExpectationBuilder: public TProtoBuilder<int, NApi::TDescribeTableRequest> {
+ using TBuilder = TDescribeTableExpectationBuilder;
+
+ explicit TDescribeTableExpectationBuilder(NApi::TDescribeTableRequest* result = nullptr, TConnectorClientMock* mock = nullptr)
+ : TProtoBuilder<int, NApi::TDescribeTableRequest>(result)
+ , Mock_(mock)
+ {
+ FillWithDefaults();
+ }
+
+ explicit TDescribeTableExpectationBuilder(TConnectorClientMock* mock)
+ : TDescribeTableExpectationBuilder(nullptr, mock)
+ {
+ }
+
+ ~TDescribeTableExpectationBuilder() {
+ SetExpectation();
+ }
+
+ SETTER(Table, table);
+ DATA_SOURCE_INSTANCE_SUBBUILDER();
+
+ TDescribeTableResultBuilder<TBuilder> Response() {
+ return TDescribeTableResultBuilder<TBuilder>(ResponseResult_, this);
+ }
+
+ void FillWithDefaults() {
+ Table(DEFAULT_TABLE);
+ Response();
+ }
+
+ private:
+ void SetExpectation() {
+ EXPECT_CALL(*Mock_, DescribeTable(ProtobufRequestMatcher(*Result_)))
+ .WillOnce(Return(ResponseResult_));
+ }
+
+ private:
+ TConnectorClientMock* Mock_ = nullptr;
+ TDescribeTableResult::TPtr ResponseResult_ = std::make_shared<TDescribeTableResult>();
+ };
+
+ template <class TParent = void /* no parent by default */>
+ struct TWhatBuilder: public TProtoBuilder<TParent, NApi::TSelect::TWhat> {
+ using TBuilder = TWhatBuilder<TParent>;
+
+ explicit TWhatBuilder(NApi::TSelect::TWhat* result = nullptr, TParent* parent = nullptr)
+ : TProtoBuilder<TParent, NApi::TSelect::TWhat>(result, parent)
+ {
+ FillWithDefaults();
+ }
+
+ // TODO: add nonprimitive types
+ TBuilder& Column(const TString& name, Ydb::Type::PrimitiveTypeId typeId) {
+ auto* col = this->Result_->add_items()->mutable_column();
+ col->set_name(name);
+ col->mutable_type()->set_type_id(typeId);
+ return *this;
+ }
+
+ void FillWithDefaults() {
+ }
+ };
+
+ template <class TParent = void /* no parent by default */>
+ struct TSelectBuilder: public TProtoBuilder<TParent, NApi::TSelect> {
+ using TBuilder = TSelectBuilder<TParent>;
+
+ explicit TSelectBuilder(NApi::TSelect* result = nullptr, TParent* parent = nullptr)
+ : TProtoBuilder<TParent, NApi::TSelect>(result, parent)
+ {
+ FillWithDefaults();
+ }
+
+ EXPR_SETTER(Table, mutable_from()->set_table);
+ DATA_SOURCE_INSTANCE_SUBBUILDER();
+ SUBPROTO_BUILDER(What, mutable_what, NApi::TSelect::TWhat, TWhatBuilder<TBuilder>);
+
+ void FillWithDefaults() {
+ Table(DEFAULT_TABLE);
+ }
+ };
+
+ template <class TParent = void /* no parent by default */>
+ struct TSplitBuilder: public TProtoBuilder<TParent, NApi::TSplit> {
+ using TBuilder = TSplitBuilder<TParent>;
+
+ explicit TSplitBuilder(NApi::TSplit* result = nullptr, TParent* parent = nullptr)
+ : TProtoBuilder<TParent, NApi::TSplit>(result, parent)
+ {
+ FillWithDefaults();
+ }
+
+ SETTER(Description, description);
+ SUBPROTO_BUILDER(Select, mutable_select, NApi::TSelect, TSelectBuilder<TBuilder>);
+
+ void FillWithDefaults() {
+ Select();
+ }
+ };
+
+ template <class TParent = void /* no parent by default */>
+ struct TListSplitsResultBuilder: public TResponseBuilder<TParent, TListSplitsResult::TPtr> {
+ using TBuilder = TListSplitsResultBuilder<TParent>;
+
+ explicit TListSplitsResultBuilder(TListSplitsResult::TPtr result = std::make_shared<TListSplitsResult>(), TParent* parent = nullptr)
+ : TResponseBuilder<TParent, TListSplitsResult::TPtr>(std::move(result), parent)
+ {
+ FillWithDefaults();
+ }
+
+ EXPR_SETTER(Status, Error.set_status);
+
+ TSplitBuilder<TBuilder> Split() {
+ return TSplitBuilder<TBuilder>(&this->Result_->Splits.emplace_back(), this);
+ }
+
+ void FillWithDefaults() {
+ Status(Ydb::StatusIds::SUCCESS);
+ }
+ };
+
+ struct TListSplitsExpectationBuilder: public TProtoBuilder<int, NApi::TListSplitsRequest> {
+ using TBuilder = TListSplitsExpectationBuilder;
+
+ explicit TListSplitsExpectationBuilder(NApi::TListSplitsRequest* result = nullptr, TConnectorClientMock* mock = nullptr)
+ : TProtoBuilder<int, NApi::TListSplitsRequest>(result)
+ , Mock_(mock)
+
+ {
+ FillWithDefaults();
+ }
+
+ explicit TListSplitsExpectationBuilder(TConnectorClientMock* mock)
+ : TListSplitsExpectationBuilder(nullptr, mock)
+ {
+ }
+
+ ~TListSplitsExpectationBuilder() {
+ SetExpectation();
+ }
+
+ SUBPROTO_BUILDER(Select, add_selects, NApi::TSelect, TSelectBuilder<TBuilder>);
+
+ TListSplitsResultBuilder<TBuilder> Response() {
+ return TListSplitsResultBuilder<TBuilder>(ResponseResult_, this);
+ }
+
+ void FillWithDefaults() {
+ Response();
+ }
+
+ private:
+ void SetExpectation() {
+ EXPECT_CALL(*Mock_, ListSplits(ProtobufRequestMatcher(*Result_)))
+ .WillOnce(Return(ResponseResult_));
+ }
+
+ private:
+ TConnectorClientMock* Mock_ = nullptr;
+ TListSplitsResult::TPtr ResponseResult_ = std::make_shared<TListSplitsResult>();
+ };
+
+ template <class TParent = void /* no parent by default */>
+ struct TReadSplitsResultBuilder: public TResponseBuilder<TParent, TReadSplitsResult::TPtr> {
+ using TBuilder = TReadSplitsResultBuilder<TParent>;
+
+ explicit TReadSplitsResultBuilder(TReadSplitsResult::TPtr result = std::make_shared<TReadSplitsResult>(), TParent* parent = nullptr)
+ : TResponseBuilder<TParent, TReadSplitsResult::TPtr>(std::move(result), parent)
+ {
+ FillWithDefaults();
+ }
+
+ EXPR_SETTER(Status, Error.set_status);
+ EXPR_SETTER(RecordBatch, RecordBatches.push_back);
+
+ void FillWithDefaults() {
+ Status(Ydb::StatusIds::SUCCESS);
+ }
+ };
+
+ struct TReadSplitsExpectationBuilder: public TProtoBuilder<int, NApi::TReadSplitsRequest> {
+ using TBuilder = TReadSplitsExpectationBuilder;
+
+ explicit TReadSplitsExpectationBuilder(NApi::TReadSplitsRequest* result = nullptr, TConnectorClientMock* mock = nullptr)
+ : TProtoBuilder<int, NApi::TReadSplitsRequest>(result)
+ , Mock_(mock)
+
+ {
+ FillWithDefaults();
+ }
+
+ explicit TReadSplitsExpectationBuilder(TConnectorClientMock* mock)
+ : TReadSplitsExpectationBuilder(nullptr, mock)
+ {
+ }
+
+ ~TReadSplitsExpectationBuilder() {
+ SetExpectation();
+ }
+
+ DATA_SOURCE_INSTANCE_SUBBUILDER();
+ SUBPROTO_BUILDER(Split, add_splits, NApi::TSplit, TSplitBuilder<TBuilder>);
+ SETTER(Format, format);
+
+ TReadSplitsResultBuilder<TBuilder> Response() {
+ return TReadSplitsResultBuilder<TBuilder>(ResponseResult_, this);
+ }
+
+ void FillWithDefaults() {
+ Format(NApi::TReadSplitsRequest::ARROW_IPC_STREAMING);
+ }
+
+ private:
+ void SetExpectation() {
+ EXPECT_CALL(*Mock_, ReadSplits(ProtobufRequestMatcher(*Result_)))
+ .WillOnce(Return(ResponseResult_));
+ }
+
+ private:
+ TConnectorClientMock* Mock_ = nullptr;
+ TReadSplitsResult::TPtr ResponseResult_ = std::make_shared<TReadSplitsResult>();
+ };
+
+ TDescribeTableExpectationBuilder ExpectDescribeTable() {
+ return TDescribeTableExpectationBuilder(this);
+ }
+
+ TListSplitsExpectationBuilder ExpectListSplits() {
+ return TListSplitsExpectationBuilder(this);
+ }
+
+ TReadSplitsExpectationBuilder ExpectReadSplits() {
+ return TReadSplitsExpectationBuilder(this);
+ }
+ };
+} // namespace NYql::NConnector::NTest
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/database_resolver_mock.cpp b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/database_resolver_mock.cpp
new file mode 100644
index 0000000000..fa7ffafd20
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/database_resolver_mock.cpp
@@ -0,0 +1 @@
+#include "database_resolver_mock.h"
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/database_resolver_mock.h b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/database_resolver_mock.h
new file mode 100644
index 0000000000..b21259cdcd
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/database_resolver_mock.h
@@ -0,0 +1,44 @@
+#pragma once
+#include <ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.h>
+
+#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
+#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
+
+#include <library/cpp/testing/gmock_in_unittest/gmock.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NYql::NConnector::NTest {
+ using namespace testing;
+
+ MATCHER_P(DatabaseAuthMapMatcher, expected, "database auth map matcher") {
+ return arg == expected;
+ }
+
+ class TDatabaseAsyncResolverMock: public NYql::IDatabaseAsyncResolver {
+ public:
+ MOCK_METHOD(NThreading::TFuture<NYql::TDatabaseResolverResponse>, ResolveIds, (const TDatabaseAuthMap& ids), (const override));
+
+ void AddClickHouseCluster(
+ const TString& clusterId = DEFAULT_CH_CLUSTER_ID,
+ const TString& endpoint = DEFAULT_CH_ENDPOINT,
+ const TString& serviceAccountId = DEFAULT_CH_SERVICE_ACCOUNT_ID,
+ const TString& serviceAccountIdSignature = DEFAULT_CH_SERVICE_ACCOUNT_ID_SIGNATURE) {
+ NYql::IDatabaseAsyncResolver::TDatabaseAuthMap dbResolverReq;
+ dbResolverReq[std::make_pair(clusterId, NYql::EDatabaseType::ClickHouse)] =
+ NYql::TDatabaseAuth{
+ NYql::TStructuredTokenBuilder().SetServiceAccountIdAuth(serviceAccountId, serviceAccountIdSignature).ToJson(),
+ true,
+ true};
+
+ NYql::TDatabaseResolverResponse::TDatabaseEndpointsMap databaseEndpointsMap;
+ databaseEndpointsMap[std::make_pair(clusterId, NYql::EDatabaseType::ClickHouse)] =
+ NYql::TDatabaseResolverResponse::TEndpoint{endpoint, clusterId};
+ auto dbResolverPromise = NThreading::NewPromise<NYql::TDatabaseResolverResponse>();
+ dbResolverPromise.SetValue(NYql::TDatabaseResolverResponse(std::move(databaseEndpointsMap), true));
+
+ auto result = dbResolverPromise.GetFuture();
+ EXPECT_CALL(*this, ResolveIds(DatabaseAuthMapMatcher(dbResolverReq)))
+ .WillOnce(Return(result));
+ }
+ };
+} // namespace NYql::NConnector::NTest
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.cpp b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.cpp
new file mode 100644
index 0000000000..71d90e1099
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.cpp
@@ -0,0 +1,18 @@
+#include "defaults.h"
+
+namespace NYql::NConnector::NTest {
+ extern const TString DEFAULT_DATABASE = "pgdb";
+ extern const TString DEFAULT_LOGIN = "crab";
+ extern const TString DEFAULT_PASSWORD = "qwerty12345";
+ extern const TString DEFAULT_TABLE = "example_1";
+ extern const TString DEFAULT_DATA_SOURCE_NAME = "external_data_source";
+
+ extern const TString DEFAULT_PG_HOST = "localhost";
+ extern const TString PG_SOURCE_TYPE = "PostgreSQL";
+
+ extern const TString DEFAULT_CH_HOST = "rc1a-d6dv17lv47v5mcop.db.yandex.net";
+ extern const TString DEFAULT_CH_ENDPOINT = TStringBuilder() << DEFAULT_CH_HOST << ':' << DEFAULT_CH_PORT;
+ extern const TString DEFAULT_CH_CLUSTER_ID = "ch-managed";
+ extern const TString DEFAULT_CH_SERVICE_ACCOUNT_ID = "sa";
+ extern const TString DEFAULT_CH_SERVICE_ACCOUNT_ID_SIGNATURE = "sa_signature";
+} // namespace NYql::NConnector::NTest
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.h b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.h
new file mode 100644
index 0000000000..e2b82727ed
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.h
@@ -0,0 +1,29 @@
+#pragma once
+#include <ydb/library/yql/providers/generic/connector/api/common/data_source.pb.h>
+#include <ydb/library/yql/providers/generic/connector/libcpp/external_data_source.h>
+
+#include <util/generic/string.h>
+#include <util/string/builder.h>
+
+namespace NYql::NConnector::NTest {
+ extern const TString DEFAULT_DATABASE;
+ extern const TString DEFAULT_LOGIN;
+ extern const TString DEFAULT_PASSWORD;
+ extern const TString DEFAULT_TABLE;
+ extern const TString DEFAULT_DATA_SOURCE_NAME;
+
+ extern const TString DEFAULT_PG_HOST;
+ constexpr int DEFAULT_PG_PORT = 5432;
+ constexpr bool DEFAULT_USE_TLS = true;
+ extern const TString PG_SOURCE_TYPE;
+ constexpr NApi::EProtocol DEFAULT_PG_PROTOCOL = NApi::EProtocol::NATIVE;
+
+ extern const TString DEFAULT_CH_HOST;
+ constexpr int DEFAULT_CH_PORT = 8443;
+ extern const TString DEFAULT_CH_ENDPOINT;
+ extern const TString DEFAULT_CH_CLUSTER_ID;
+ constexpr NApi::EProtocol DEFAULT_CH_PROTOCOL = NApi::EProtocol::HTTP;
+ constexpr NYql::NConnector::EExternalDataSource DEFAULT_CH_SOURCE_TYPE = NYql::NConnector::EExternalDataSource::ClickHouse;
+ extern const TString DEFAULT_CH_SERVICE_ACCOUNT_ID;
+ extern const TString DEFAULT_CH_SERVICE_ACCOUNT_ID_SIGNATURE;
+} // namespace NYql::NConnector::NTest
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/ya.make b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/ya.make
new file mode 100644
index 0000000000..660ed340f2
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/ya.make
@@ -0,0 +1,22 @@
+LIBRARY()
+
+SRCS(
+ connector_client_mock.cpp
+ database_resolver_mock.cpp
+ defaults.cpp
+)
+
+PEERDIR(
+ contrib/libs/fmt
+ library/cpp/testing/gmock_in_unittest
+ library/cpp/testing/unittest
+ ydb/core/kqp/ut/common
+ ydb/library/yql/providers/common/db_id_async_resolver
+ ydb/library/yql/providers/common/structured_token
+ ydb/library/yql/providers/generic/connector/api/common
+ ydb/library/yql/providers/generic/connector/libcpp
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ya.make b/ydb/library/yql/providers/generic/connector/libcpp/ya.make
index 9c9cf6e52c..f389d52c34 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/ya.make
+++ b/ydb/library/yql/providers/generic/connector/libcpp/ya.make
@@ -26,4 +26,5 @@ END()
RECURSE(
cli
+ ut_helpers
)
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp
index 719316c467..be30204547 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp
@@ -29,9 +29,14 @@ namespace NYql {
}
TMaybeNode<TExprBase> TrimReadWorld(TExprBase node, TExprContext& ctx) const {
- if (const auto maybeRead = node.Cast<TCoLeft>().Input().Maybe<TGenReadTable>())
- return TExprBase(ctx.NewWorld(node.Pos()));
- return node;
+ Y_UNUSED(ctx);
+
+ const auto& maybeRead = node.Cast<TCoLeft>().Input().Maybe<TGenReadTable>();
+ if (!maybeRead) {
+ return node;
+ }
+
+ return TExprBase(maybeRead.Cast().World().Ptr());
}
TMaybeNode<TExprBase> ReadZeroColumns(TExprBase node, TExprContext& ctx) const {
diff --git a/ydb/services/metadata/secret/ut/ut_secret.cpp b/ydb/services/metadata/secret/ut/ut_secret.cpp
index 3c4a439059..bd70ddf3cc 100644
--- a/ydb/services/metadata/secret/ut/ut_secret.cpp
+++ b/ydb/services/metadata/secret/ut/ut_secret.cpp
@@ -66,8 +66,13 @@ Y_UNIT_TEST_SUITE(Secret) {
YDB_ACCESSOR(ui32, ExpectedAccessCount, 1);
using TKeyCheckers = TMap<NMetadata::NSecret::TSecretId, TJsonChecker>;
YDB_ACCESSOR_DEF(TKeyCheckers, Checkers);
- public:
+ private:
+ ui64 SecretsCountInLastSnapshot = 0;
+ ui64 AccessCountInLastSnapshot = 0;
+ TString LastSnapshotDebugString;
+
+ public:
void ResetConditions() {
FoundFlag = false;
Checkers.clear();
@@ -106,22 +111,29 @@ Y_UNIT_TEST_SUITE(Secret) {
void CheckFound(NMetadata::NProvider::TEvRefreshSubscriberData* event) {
auto snapshot = event->GetSnapshotAs<NMetadata::NSecret::TSnapshot>();
Y_ABORT_UNLESS(!!snapshot);
+ SecretsCountInLastSnapshot = snapshot->GetSecrets().size();
+ AccessCountInLastSnapshot = snapshot->GetAccess().size();
+ LastSnapshotDebugString = snapshot->SerializeToString();
+ CheckFound();
+ }
+
+ void CheckFound() {
if (ExpectedSecretsCount) {
- if (snapshot->GetSecrets().size() != ExpectedSecretsCount) {
- Cerr << "snapshot->GetSecrets().size() incorrect: " << snapshot->SerializeToString() << Endl;
+ if (SecretsCountInLastSnapshot != ExpectedSecretsCount) {
+ Cerr << "snapshot->GetSecrets().size() incorrect: " << LastSnapshotDebugString << Endl;
return;
}
- } else if (snapshot->GetSecrets().size()) {
- Cerr << "snapshot->GetSecrets().size() incorrect (zero expects): " << snapshot->SerializeToString() << Endl;
+ } else if (SecretsCountInLastSnapshot) {
+ Cerr << "snapshot->GetSecrets().size() incorrect (zero expects): " << LastSnapshotDebugString << Endl;
return;
}
if (ExpectedAccessCount) {
- if (snapshot->GetAccess().size() != ExpectedAccessCount) {
- Cerr << "snapshot->GetAccess().size() incorrect: " << snapshot->SerializeToString() << Endl;
+ if (AccessCountInLastSnapshot != ExpectedAccessCount) {
+ Cerr << "snapshot->GetAccess().size() incorrect: " << LastSnapshotDebugString << Endl;
return;
}
- } else if (snapshot->GetAccess().size()) {
- Cerr << "snapshot->GetAccess().size() incorrect (zero expects): " << snapshot->SerializeToString() << Endl;
+ } else if (AccessCountInLastSnapshot) {
+ Cerr << "snapshot->GetAccess().size() incorrect (zero expects): " << LastSnapshotDebugString << Endl;
return;
}
FoundFlag = true;
@@ -182,13 +194,13 @@ Y_UNIT_TEST_SUITE(Secret) {
UNIT_ASSERT_EQUAL_C(resultData, "[6u]", resultData);
}
- emulator->SetExpectedSecretsCount(2).SetExpectedAccessCount(0);
+ emulator->SetExpectedSecretsCount(2).SetExpectedAccessCount(0).CheckFound();
{
const TInstant start = Now();
while (!emulator->IsFound() && Now() - start < TDuration::Seconds(20)) {
runtime.SimulateSleep(TDuration::Seconds(1));
}
- Y_ABORT_UNLESS(emulator->IsFound());
+ UNIT_ASSERT(emulator->IsFound());
}
lHelper.StartSchemaRequest("ALTER OBJECT secret1 (TYPE SECRET) SET value = `abcde`");
@@ -199,26 +211,26 @@ Y_UNIT_TEST_SUITE(Secret) {
UNIT_ASSERT_EQUAL_C(resultData, "[10u]", resultData);
}
- emulator->SetExpectedSecretsCount(2).SetExpectedAccessCount(1);
+ emulator->SetExpectedSecretsCount(2).SetExpectedAccessCount(1).CheckFound();
{
const TInstant start = Now();
while (!emulator->IsFound() && Now() - start < TDuration::Seconds(20)) {
runtime.SimulateSleep(TDuration::Seconds(1));
}
- Y_ABORT_UNLESS(emulator->IsFound());
+ UNIT_ASSERT(emulator->IsFound());
}
lHelper.StartSchemaRequest("DROP OBJECT `secret1:test@test1` (TYPE SECRET_ACCESS)");
lHelper.StartSchemaRequest("DROP OBJECT `secret1` (TYPE SECRET)");
lHelper.StartDataRequest("SELECT * FROM `/Root/.metadata/initialization/migrations`");
- emulator->SetExpectedSecretsCount(1).SetExpectedAccessCount(0);
+ emulator->SetExpectedSecretsCount(1).SetExpectedAccessCount(0).CheckFound();
{
const TInstant start = Now();
while (!emulator->IsFound() && Now() - start < TDuration::Seconds(20)) {
runtime.SimulateSleep(TDuration::Seconds(1));
}
- Y_ABORT_UNLESS(emulator->IsFound());
+ UNIT_ASSERT(emulator->IsFound());
}
}
}