aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvitalyisaev <vitalyisaev@yandex-team.com>2023-06-27 19:00:22 +0300
committervitalyisaev <vitalyisaev@yandex-team.com>2023-06-27 19:00:22 +0300
commitcd23cf14ba0e02916da11dafb03202741f855ab3 (patch)
tree05cb957a0d40e2b1de5adec6701c61f3da164d4c
parentc560c220eadb1368069c19fca7ebc4dad43a30b6 (diff)
downloadydb-cd23cf14ba0e02916da11dafb03202741f855ab3.tar.gz
Add access to MDB API for Generic provider
1. В dqrun добавлен класс, который создаёт акторную систему. В дальнейшем можно будет отрефакторить `TServiceNode` так, чтобы и он её переиспользовал. 2. В Generic провайдер добавлены изменения для корректной работы с MDB API.
-rw-r--r--ydb/core/fq/libs/actors/database_resolver.cpp1
-rw-r--r--ydb/core/fq/libs/actors/run_actor.cpp2
-rw-r--r--ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp13
-rw-r--r--ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h3
-rw-r--r--ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h6
-rw-r--r--ydb/library/yql/providers/common/proto/gateways_config.proto16
-rw-r--r--ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp72
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/cli/main.cpp12
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/utils.cpp3
-rw-r--r--ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/library/yql/providers/generic/provider/ya.make2
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp27
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp2
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_provider.h22
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp19
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_settings.h109
19 files changed, 176 insertions, 141 deletions
diff --git a/ydb/core/fq/libs/actors/database_resolver.cpp b/ydb/core/fq/libs/actors/database_resolver.cpp
index 53dfc18ebbe..5f93243f9eb 100644
--- a/ydb/core/fq/libs/actors/database_resolver.cpp
+++ b/ydb/core/fq/libs/actors/database_resolver.cpp
@@ -313,6 +313,7 @@ private:
.AddUrlParam("databaseId", databaseId)
.Build();
} else {
+ YQL_ENSURE(ev->Get()->MdbGateway, "empty MDB Gateway");
url = TUrlBuilder(ev->Get()->MdbGateway + "/managed-clickhouse/v1/clusters/")
.AddPathComponent(databaseId)
.AddPathComponent("hosts")
diff --git a/ydb/core/fq/libs/actors/run_actor.cpp b/ydb/core/fq/libs/actors/run_actor.cpp
index c69cae91bc2..561f80e100b 100644
--- a/ydb/core/fq/libs/actors/run_actor.cpp
+++ b/ydb/core/fq/libs/actors/run_actor.cpp
@@ -1835,7 +1835,7 @@ private:
}
{
- dataProvidersInit.push_back(GetYdbDataProviderInitializer(Params.YqSharedResources->UserSpaceYdbDriver, Params.CredentialsFactory, dbResolver));
+ dataProvidersInit.push_back(GetYdbDataProviderInitializer(Params.YqSharedResources->UserSpaceYdbDriver, Params.CredentialsFactory, dbResolver));
}
{
diff --git a/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp b/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp
index 2231401c426..82c9e8f90bf 100644
--- a/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp
+++ b/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp
@@ -18,11 +18,17 @@ TDatabaseAsyncResolverImpl::TDatabaseAsyncResolverImpl(
, MdbGateway(mdbGateway)
, MdbTransformHost(mdbTransformHost)
, TraceId(traceId)
-{}
+{
+}
-TFuture<NYql::TDbResolverResponse> TDatabaseAsyncResolverImpl::ResolveIds(
- const THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& ids) const
+TFuture<NYql::TDbResolverResponse> TDatabaseAsyncResolverImpl::ResolveIds(const DatabaseIds& ids) const
{
+ // Cloud database ids validataion
+ for (const auto& kv: ids) {
+ // empty cluster name is not good
+ YQL_ENSURE(kv.first.first, "empty cluster name");
+ }
+
auto promise = NewPromise<NYql::TDbResolverResponse>();
TDuration timeout = TDuration::Seconds(40);
auto callback = MakeHolder<NYql::TRichActorFutureCallback<TEvents::TEvEndpointResponse>>(
@@ -40,6 +46,7 @@ TFuture<NYql::TDbResolverResponse> TDatabaseAsyncResolverImpl::ResolveIds(
ActorSystem->Send(new NActors::IEventHandle(Recipient, callbackId,
new TEvents::TEvEndpointRequest(ids, YdbMvpEndpoint, MdbGateway,
TraceId, MdbTransformHost)));
+
return promise.GetFuture();
}
diff --git a/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h b/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h
index db317bc493f..66f7cf50ffd 100644
--- a/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h
+++ b/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h
@@ -16,8 +16,7 @@ public:
const TString& traceId = ""
);
- NThreading::TFuture<NYql::TDbResolverResponse> ResolveIds(
- const THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& ids) const override;
+ NThreading::TFuture<NYql::TDbResolverResponse> ResolveIds(const DatabaseIds& ids) const override;
private:
NActors::TActorSystem* ActorSystem;
const NActors::TActorId Recipient;
diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h
index 6a9385996f5..e3a00c02a84 100644
--- a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h
+++ b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h
@@ -13,6 +13,7 @@ enum class DatabaseType {
Generic
};
+
struct TDatabaseAuth {
TString StructuredToken;
bool AddBearerToToken = false;
@@ -49,8 +50,9 @@ struct TDbResolverResponse {
class IDatabaseAsyncResolver {
public:
- virtual NThreading::TFuture<NYql::TDbResolverResponse> ResolveIds(
- const THashMap<std::pair<TString, DatabaseType>, NYql::TDatabaseAuth>& ids) const = 0;
+ using DatabaseIds = THashMap<std::pair<TString, DatabaseType>, NYql::TDatabaseAuth>;
+
+ virtual NThreading::TFuture<NYql::TDbResolverResponse> ResolveIds(const DatabaseIds& ids) const = 0;
virtual ~IDatabaseAsyncResolver() = default;
};
diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto
index 1a96f12b172..9054e853a94 100644
--- a/ydb/library/yql/providers/common/proto/gateways_config.proto
+++ b/ydb/library/yql/providers/common/proto/gateways_config.proto
@@ -548,19 +548,31 @@ message TGenericClusterConfig {
oneof Location {
// Endpoint must be used for on-premise deployments.
NYql.Connector.API.Endpoint Endpoint = 3;
- // DatabaseID must be used when the data source is deployed in cloud.
+ // DatabaseId must be used when the data source is deployed in cloud.
// Data source FQDN and port will be resolved by MDB service.
- string DatabaseID = 4;
+ string DatabaseId = 4;
}
+ // Credentials used to access data source instance
required NYql.Connector.API.Credentials Credentials = 5;
+
+ // Credentials used to access MDB API.
+ // When working with data source instances deployed in a cloud,
+ // you should either set (ServiceAccountId, ServiceAccountIdSignature) pair,
+ // or set IAM Token via env variable / credential table.
+ optional string ServiceAccountId = 6;
+ optional string ServiceAccountIdSignature = 7;
}
message TGenericGatewayConfig {
// TODO: replace with map<DataSourceKind, Endpoint>
required string Endpoint = 1;
+
reserved 2;
repeated TGenericClusterConfig ClusterMapping = 3;
+
+ // MDB API endpoint (do not fill in case of on-prem deployment)
+ optional string MdbGateway = 4;
}
/////////////////////////////// Root ///////////////////////////////
diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp
index 70345916a69..82af2dbec2b 100644
--- a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp
+++ b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp
@@ -60,13 +60,13 @@ namespace NYql::NDq {
TGenericReadActor(ui64 inputIndex, Connector::IClient::TPtr genericClient, const NYql::Connector::API::Select& select,
const NYql::Connector::API::DataSourceInstance& dataSourceInstance,
const NActors::TActorId& computeActorId, const NKikimr::NMiniKQL::THolderFactory& holderFactory)
- : InputIndex(inputIndex)
- , ComputeActorId(computeActorId)
- , ActorSystem(TActivationContext::ActorSystem())
- , ConnectorClient(genericClient)
- , HolderFactory(holderFactory)
- , Select(select)
- , DataSourceInstance(dataSourceInstance)
+ : InputIndex_(inputIndex)
+ , ComputeActorId_(computeActorId)
+ , ActorSystem_(TActivationContext::ActorSystem())
+ , ConnectorClient_(genericClient)
+ , HolderFactory_(holderFactory)
+ , Select_(select)
+ , DataSourceInstance_(dataSourceInstance)
{
}
@@ -74,13 +74,13 @@ namespace NYql::NDq {
Become(&TGenericReadActor::StateFunc);
Connector::API::ListSplitsRequest listSplitsRequest;
- listSplitsRequest.mutable_selects()->Add()->CopyFrom(Select);
- listSplitsRequest.mutable_data_source_instance()->CopyFrom(DataSourceInstance);
+ listSplitsRequest.mutable_selects()->Add()->CopyFrom(Select_);
+ listSplitsRequest.mutable_data_source_instance()->CopyFrom(DataSourceInstance_);
- auto listSplitsResult = ConnectorClient->ListSplits(listSplitsRequest);
+ auto listSplitsResult = ConnectorClient_->ListSplits(listSplitsRequest);
if (!Connector::ErrorIsSuccess(listSplitsResult->Error)) {
YQL_CLOG(ERROR, ProviderGeneric) << "ListSplits failure" << listSplitsResult->Error.DebugString();
- ActorSystem->Send(new IEventHandle(
+ ActorSystem_->Send(new IEventHandle(
SelfId(), TActorId(), new TEvPrivate::TEvReadError(Connector::ErrorToIssues(listSplitsResult->Error))));
return;
}
@@ -93,12 +93,12 @@ namespace NYql::NDq {
std::for_each(
listSplitsResult->Splits.cbegin(), listSplitsResult->Splits.cend(),
[&](const Connector::API::Split& split) { readSplitsRequest.mutable_splits()->Add()->CopyFrom(split); });
- readSplitsRequest.mutable_data_source_instance()->CopyFrom(DataSourceInstance);
+ readSplitsRequest.mutable_data_source_instance()->CopyFrom(DataSourceInstance_);
- auto readSplitsResult = ConnectorClient->ReadSplits(readSplitsRequest);
+ auto readSplitsResult = ConnectorClient_->ReadSplits(readSplitsRequest);
if (!Connector::ErrorIsSuccess(listSplitsResult->Error)) {
YQL_CLOG(ERROR, ProviderGeneric) << "ReadSplits failure" << readSplitsResult->Error.DebugString();
- ActorSystem->Send(new IEventHandle(
+ ActorSystem_->Send(new IEventHandle(
SelfId(), TActorId(), new TEvPrivate::TEvReadError(Connector::ErrorToIssues(listSplitsResult->Error))));
return;
}
@@ -106,7 +106,7 @@ namespace NYql::NDq {
YQL_CLOG(INFO, ProviderGeneric) << "ReadSplits succeess, total batches: "
<< readSplitsResult->RecordBatches.size();
- ActorSystem->Send(new IEventHandle(SelfId(), TActorId(), new TEvPrivate::TEvReadResult(readSplitsResult)));
+ ActorSystem_->Send(new IEventHandle(SelfId(), TActorId(), new TEvPrivate::TEvReadResult(readSplitsResult)));
}
static constexpr char ActorName[] = "Generic_READ_ACTOR";
@@ -119,7 +119,7 @@ namespace NYql::NDq {
void CommitState(const NDqProto::TCheckpoint&) final {
}
ui64 GetInputIndex() const final {
- return InputIndex;
+ return InputIndex_;
}
STRICT_STFUNC(StateFunc,
@@ -129,12 +129,12 @@ namespace NYql::NDq {
i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, TMaybe<TInstant>&, bool& finished,
i64 /*freeSpace*/) final {
YQL_ENSURE(!buffer.IsWide(), "Wide stream is not supported");
- if (Result) {
+ if (Result_) {
NUdf::TUnboxedValue value;
ui64 total = 0;
- for (const auto& batch : Result->RecordBatches) {
+ for (const auto& batch : Result_->RecordBatches) {
total += NUdf::GetSizeOfArrowBatchInBytes(*batch);
YQL_CLOG(TRACE, ProviderGeneric) << "Converting arrow::RecordBatch into NUdf::UnboxedValue:\n"
@@ -151,14 +151,14 @@ namespace NYql::NDq {
}
NUdf::TUnboxedValue* structItems = nullptr;
- auto structObj = ArrowRowContainerCache.NewArray(HolderFactory, 1 + batch->num_columns(), structItems);
+ auto structObj = ArrowRowContainerCache_.NewArray(HolderFactory_, 1 + batch->num_columns(), structItems);
for (int i = 0; i < batch->num_columns(); ++i) {
const auto& columnName = batch->schema()->field(i)->name();
const auto ix = fieldNameOrder[columnName];
- structItems[ix] = HolderFactory.CreateArrowBlock(arrow::Datum(batch->column(i)));
+ structItems[ix] = HolderFactory_.CreateArrowBlock(arrow::Datum(batch->column(i)));
}
- structItems[fieldNameOrder[std::string(BlockLengthColumnName)]] = HolderFactory.CreateArrowBlock(
+ structItems[fieldNameOrder[std::string(BlockLengthColumnName)]] = HolderFactory_.CreateArrowBlock(
arrow::Datum(std::make_shared<arrow::UInt64Scalar>(batch->num_rows())));
value = structObj;
@@ -167,11 +167,11 @@ namespace NYql::NDq {
// freeSpace -= size;
finished = true;
- Result.reset();
+ Result_.reset();
// TODO: check it, because in S3 the generic cache clearing happens only when LastFileWasProcessed:
// https://a.yandex-team.ru/arcadia/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp?rev=r11543410#L2497
- ArrowRowContainerCache.Clear();
+ ArrowRowContainerCache_.Clear();
return total;
}
@@ -180,13 +180,13 @@ namespace NYql::NDq {
}
void Handle(TEvPrivate::TEvReadResult::TPtr& evReadResult) {
- Result = evReadResult->Get()->Result;
- Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
+ Result_ = evReadResult->Get()->Result;
+ Send(ComputeActorId_, new TEvNewAsyncInputDataArrived(InputIndex_));
}
void Handle(TEvPrivate::TEvReadError::TPtr& result) {
- Send(ComputeActorId,
- new TEvAsyncInputError(InputIndex, result->Get()->Error, NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
+ Send(ComputeActorId_,
+ new TEvAsyncInputError(InputIndex_, result->Get()->Error, NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
}
// IActor & IDqComputeActorAsyncInput
@@ -194,18 +194,18 @@ namespace NYql::NDq {
TActorBootstrapped<TGenericReadActor>::PassAway();
}
- const ui64 InputIndex;
- const NActors::TActorId ComputeActorId;
+ const ui64 InputIndex_;
+ const NActors::TActorId ComputeActorId_;
- TActorSystem* const ActorSystem;
+ TActorSystem* const ActorSystem_;
// Changed:
- Connector::IClient::TPtr ConnectorClient;
- Connector::ReadSplitsResult::TPtr Result;
- NKikimr::NMiniKQL::TPlainContainerCache ArrowRowContainerCache;
- const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
- const NYql::Connector::API::Select Select;
- const NYql::Connector::API::DataSourceInstance DataSourceInstance;
+ Connector::IClient::TPtr ConnectorClient_;
+ Connector::ReadSplitsResult::TPtr Result_;
+ NKikimr::NMiniKQL::TPlainContainerCache ArrowRowContainerCache_;
+ const NKikimr::NMiniKQL::THolderFactory& HolderFactory_;
+ const NYql::Connector::API::Select Select_;
+ const NYql::Connector::API::DataSourceInstance DataSourceInstance_;
};
std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*>
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/cli/main.cpp b/ydb/library/yql/providers/generic/connector/libcpp/cli/main.cpp
index 7942485feb9..644141825df 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/cli/main.cpp
+++ b/ydb/library/yql/providers/generic/connector/libcpp/cli/main.cpp
@@ -12,7 +12,7 @@ void SetDatabaseSourceInstance(NYql::Connector::API::DataSourceInstance* dsi) {
dsi->mutable_credentials()->mutable_basic()->set_password("qwerty12345");
}
-std::shared_ptr<NYql::Connector::DescribeTableResult> describeTable(NYql::Connector::IClient::TPtr client) {
+std::shared_ptr<NYql::Connector::DescribeTableResult> DescribeTable(NYql::Connector::IClient::TPtr client) {
NYql::Connector::API::DescribeTableRequest request;
request.set_table(TableName);
SetDatabaseSourceInstance(request.mutable_data_source_instance());
@@ -31,7 +31,7 @@ std::shared_ptr<NYql::Connector::DescribeTableResult> describeTable(NYql::Connec
}
std::shared_ptr<NYql::Connector::ListSplitsResult>
-listSplits(NYql::Connector::IClient::TPtr client, const google::protobuf::RepeatedPtrField<Ydb::Column>& columns) {
+ListSplits(NYql::Connector::IClient::TPtr client, const google::protobuf::RepeatedPtrField<Ydb::Column>& columns) {
NYql::Connector::API::ListSplitsRequest request;
SetDatabaseSourceInstance(request.mutable_data_source_instance());
@@ -61,7 +61,7 @@ listSplits(NYql::Connector::IClient::TPtr client, const google::protobuf::Repeat
return result;
}
-std::shared_ptr<NYql::Connector::ReadSplitsResult> readSplits(NYql::Connector::IClient::TPtr client,
+std::shared_ptr<NYql::Connector::ReadSplitsResult> ReadSplits(NYql::Connector::IClient::TPtr client,
const std::vector<NYql::Connector::API::Split>& splits) {
NYql::Connector::API::ReadSplitsRequest request;
SetDatabaseSourceInstance(request.mutable_data_source_instance());
@@ -91,9 +91,9 @@ int main() {
auto client = NYql::Connector::MakeClientGRPC("localhost:50051");
try {
- auto columns = describeTable(client)->Schema.columns();
- auto splits = listSplits(client, columns)->Splits;
- readSplits(client, splits);
+ auto columns = DescribeTable(client)->Schema.columns();
+ auto splits = ListSplits(client, columns)->Splits;
+ ReadSplits(client, splits);
} catch (const std::runtime_error& e) {
YQL_LOG(ERROR) << e.what();
}
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/utils.cpp b/ydb/library/yql/providers/generic/connector/libcpp/utils.cpp
index fc4da3b6aec..928a23283af 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/utils.cpp
+++ b/ydb/library/yql/providers/generic/connector/libcpp/utils.cpp
@@ -3,6 +3,7 @@
#include <arrow/io/api.h>
#include <arrow/ipc/api.h>
#include <util/string/builder.h>
+#include <util/system/type_name.h>
#include <ydb/core/formats/arrow/serializer/batch_only.h>
#include <ydb/core/formats/arrow/serializer/full.h>
#include <ydb/library/yql/utils/log/log.h>
@@ -47,7 +48,7 @@ namespace NYql::Connector {
}
default:
ythrow yexception() << "unexpected type: " << Ydb::Type_PrimitiveTypeId_Name(t) << " ("
- << typeid(t).name() << ")";
+ << TypeName(t) << ")";
}
return arrow::Status::OK();
}
diff --git a/ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt
index b45684fa272..d41aad8c1c6 100644
--- a/ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/providers/generic/provider/CMakeLists.darwin-x86_64.txt
@@ -30,6 +30,7 @@ target_link_libraries(providers-generic-provider PUBLIC
providers-common-mkql
providers-common-proto
providers-common-provider
+ providers-common-structured_token
providers-common-transform
providers-dq-common
providers-dq-expr_nodes
@@ -50,5 +51,4 @@ target_sources(providers-generic-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_mkql_compiler.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp
)
diff --git a/ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt
index 190f243c44f..712f9b54de7 100644
--- a/ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/providers/generic/provider/CMakeLists.linux-aarch64.txt
@@ -31,6 +31,7 @@ target_link_libraries(providers-generic-provider PUBLIC
providers-common-mkql
providers-common-proto
providers-common-provider
+ providers-common-structured_token
providers-common-transform
providers-dq-common
providers-dq-expr_nodes
@@ -51,5 +52,4 @@ target_sources(providers-generic-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_mkql_compiler.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp
)
diff --git a/ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt
index 190f243c44f..712f9b54de7 100644
--- a/ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/providers/generic/provider/CMakeLists.linux-x86_64.txt
@@ -31,6 +31,7 @@ target_link_libraries(providers-generic-provider PUBLIC
providers-common-mkql
providers-common-proto
providers-common-provider
+ providers-common-structured_token
providers-common-transform
providers-dq-common
providers-dq-expr_nodes
@@ -51,5 +52,4 @@ target_sources(providers-generic-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_mkql_compiler.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp
)
diff --git a/ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt
index b45684fa272..d41aad8c1c6 100644
--- a/ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/providers/generic/provider/CMakeLists.windows-x86_64.txt
@@ -30,6 +30,7 @@ target_link_libraries(providers-generic-provider PUBLIC
providers-common-mkql
providers-common-proto
providers-common-provider
+ providers-common-structured_token
providers-common-transform
providers-dq-common
providers-dq-expr_nodes
@@ -50,5 +51,4 @@ target_sources(providers-generic-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_mkql_compiler.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp
)
diff --git a/ydb/library/yql/providers/generic/provider/ya.make b/ydb/library/yql/providers/generic/provider/ya.make
index c788882efbf..9129151fcd4 100644
--- a/ydb/library/yql/providers/generic/provider/ya.make
+++ b/ydb/library/yql/providers/generic/provider/ya.make
@@ -15,7 +15,6 @@ SRCS(
yql_generic_provider.cpp
yql_generic_provider.h
yql_generic_provider_impl.h
- yql_generic_settings.cpp
yql_generic_settings.h
)
@@ -38,6 +37,7 @@ PEERDIR(
ydb/library/yql/providers/common/mkql
ydb/library/yql/providers/common/proto
ydb/library/yql/providers/common/provider
+ ydb/library/yql/providers/common/structured_token
ydb/library/yql/providers/common/transform
ydb/library/yql/providers/dq/common
ydb/library/yql/providers/dq/expr_nodes
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp
index 81c2aef829b..064519d2371 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp
@@ -47,24 +47,29 @@ namespace NYql {
!reads.empty()) {
for (auto& node : reads) {
const TGenRead read(node);
- const auto cluster = read.DataSource().Cluster().StringValue();
- YQL_CLOG(DEBUG, ProviderGeneric) << "Found cluster: " << cluster;
- auto databaseID = State_->Configuration->ClusterConfigs[cluster].GetDatabaseID();
- YQL_CLOG(DEBUG, ProviderGeneric) << "Found cloudID: " << databaseID;
- const auto idKey = std::make_pair(databaseID, NYql::DatabaseType::Generic);
- const auto iter = State_->DatabaseIds.find(idKey);
- if (iter != State_->DatabaseIds.end()) {
- YQL_CLOG(DEBUG, ProviderGeneric) << "Resolve CloudID: " << databaseID;
- ids[idKey] = iter->second;
+ const auto clusterName = read.DataSource().Cluster().StringValue();
+ YQL_CLOG(DEBUG, ProviderGeneric) << "found cluster name: " << clusterName;
+
+ auto databaseId = State_->Configuration->ClusterConfigs[clusterName].GetDatabaseId();
+ if (databaseId) {
+ YQL_CLOG(DEBUG, ProviderGeneric) << "found database id: " << databaseId;
+ const auto idKey = std::make_pair(databaseId, NYql::DatabaseType::Generic);
+ const auto iter = State_->DatabaseIds.find(idKey);
+ if (iter != State_->DatabaseIds.end()) {
+ YQL_CLOG(DEBUG, ProviderGeneric) << "resolve database id: " << databaseId;
+ ids[idKey] = iter->second;
+ }
}
}
}
- YQL_CLOG(DEBUG, ProviderGeneric) << "Ids to resolve: " << ids.size();
+ YQL_CLOG(DEBUG, ProviderGeneric) << "total database ids to resolve: " << ids.size();
if (ids.empty()) {
return TStatus::Ok;
}
+ // FIXME: overengineered code - instead of using weak_ptr, directly copy shared_ptr in callback in this way:
+ // Apply([response = DbResolverResponse_](...))
const std::weak_ptr<NYql::TDbResolverResponse> response = DbResolverResponse_;
AsyncFuture_ = State_->DbResolver->ResolveIds(ids).Apply([response](auto future) {
if (const auto res = response.lock())
@@ -88,7 +93,7 @@ namespace NYql {
DbResolverResponse_->DatabaseId2Endpoint.end());
DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>();
YQL_CLOG(DEBUG, ProviderGeneric) << "ResolvedIds: " << FullResolvedIds_.size();
- const auto& id2Clusters = State_->Configuration->DbId2Clusters;
+ const auto& id2Clusters = State_->Configuration->DatabaseIdsToClusterNames;
for (const auto& [dbIdWithType, info] : FullResolvedIds_) {
const auto& dbId = dbIdWithType.first;
const auto iter = id2Clusters.find(dbId);
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp
index 4a061c552e9..fe88e66c3e5 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp
@@ -27,7 +27,7 @@ namespace NYql {
state->FunctionRegistry = functionRegistry;
state->DbResolver = dbResolver;
if (gatewaysConfig) {
- state->Configuration->Init(gatewaysConfig->GetGeneric(), state->DbResolver, state->DatabaseIds);
+ state->Configuration->Init(gatewaysConfig->GetGeneric(), state->DbResolver, state->DatabaseIds, typeCtx->Credentials);
}
TDataProviderInfo info;
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_provider.h b/ydb/library/yql/providers/generic/provider/yql_generic_provider.h
index 3e41f056a0f..69d902894a1 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_provider.h
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_provider.h
@@ -49,16 +49,7 @@ namespace NYql {
ythrow yexception() << "unknown (" << cluster << ", " << table << ") pair";
};
- THashMap<std::pair<TString, TString>, TTableMeta> Tables;
- std::unordered_map<std::string_view, std::string_view> Timezones;
-
- TTypeAnnotationContext* Types = nullptr;
- TGenericConfiguration::TPtr Configuration = MakeIntrusive<TGenericConfiguration>();
- const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr;
- THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> DatabaseIds;
- std::shared_ptr<NYql::IDatabaseAsyncResolver> DbResolver;
-
- TString DumpToString() const {
+ TString ToString() const {
TStringBuilder sb;
if (Tables) {
for (const auto& kv : Tables) {
@@ -68,6 +59,17 @@ namespace NYql {
}
return sb;
}
+
+ THashMap<std::pair<TString, TString>, TTableMeta> Tables;
+ std::unordered_map<std::string_view, std::string_view> Timezones;
+
+ TTypeAnnotationContext* Types = nullptr;
+ TGenericConfiguration::TPtr Configuration = MakeIntrusive<TGenericConfiguration>();
+ const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr;
+
+ // key - database id, value - credentials to access MDB API
+ NYql::IDatabaseAsyncResolver::DatabaseIds DatabaseIds;
+ std::shared_ptr<NYql::IDatabaseAsyncResolver> DbResolver;
};
TDataProviderInitializer
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp
deleted file mode 100644
index 5c1086b81e2..00000000000
--- a/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp
+++ /dev/null
@@ -1,19 +0,0 @@
-#include "yql_generic_settings.h"
-
-namespace NYql {
-
- using namespace NCommon;
-
- TGenericConfiguration::TGenericConfiguration()
- {
- }
-
- TGenericSettings::TConstPtr TGenericConfiguration::Snapshot() const {
- return std::make_shared<const TGenericSettings>(*this);
- }
-
- bool TGenericConfiguration::HasCluster(TStringBuf cluster) const {
- return ValidClusters.contains(cluster);
- }
-
-}
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h
index 3d6f36d722c..1274488f5e0 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h
@@ -4,6 +4,7 @@
#include <ydb/library/yql/providers/common/config/yql_setting.h>
#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h>
+#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
#include <ydb/library/yql/providers/generic/connector/api/service/protos/connector.pb.h>
#include <ydb/library/yql/utils/log/log.h>
@@ -13,25 +14,17 @@ namespace NYql {
using TConstPtr = std::shared_ptr<const TGenericSettings>;
};
- struct TGenericURL {
- TString Host;
- ui16 Port;
- EHostScheme Scheme;
-
- TString Endpoint() const {
- return Host + ':' + ToString(Port);
- };
- };
-
struct TGenericConfiguration: public TGenericSettings, public NCommon::TSettingDispatcher {
using TPtr = TIntrusivePtr<TGenericConfiguration>;
- TGenericConfiguration();
+ TGenericConfiguration(){};
TGenericConfiguration(const TGenericConfiguration&) = delete;
template <typename TProtoConfig>
- void Init(const TProtoConfig& config, const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver,
- THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& databaseIds)
+ void Init(const TProtoConfig& config,
+ const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver,
+ NYql::IDatabaseAsyncResolver::DatabaseIds& databaseIds,
+ const TCredentials::TPtr& credentials)
{
TVector<TString> clusterNames(Reserve(config.ClusterMappingSize()));
@@ -39,42 +32,74 @@ namespace NYql {
clusterNames.push_back(cluster.GetName());
ClusterConfigs[cluster.GetName()] = cluster;
}
-
this->SetValidClusters(clusterNames);
- // TODO: support data sources other than ClickHouse here
- for (auto& cluster : config.GetClusterMapping()) {
- if (dbResolver) {
- const auto& databaseID = cluster.GetDatabaseID();
- YQL_CLOG(DEBUG, ProviderGeneric)
- << "Settings: clusterName = " << cluster.GetName() << ", cluster cloud id = " << databaseID
- << ", cluster.GetEndpoint(): " << cluster.GetEndpoint()
- << ", HasEndpoint: " << (cluster.HasEndpoint() ? "TRUE" : "FALSE");
-
- // TODO: recover logic with structured tokens
- databaseIds[std::make_pair(databaseID, NYql::DatabaseType::Generic)] =
- NYql::TDatabaseAuth{"", /*AddBearer=*/false};
- if (databaseID) {
- DbId2Clusters[databaseID].emplace_back(cluster.GetName());
- YQL_CLOG(DEBUG, ProviderGeneric) << "Add cluster cloud id: " << databaseID << " to DbId2Clusters";
- }
- }
-
- // NOTE: Tokens map is left because of legacy.
- // There are no reasons for provider to store these tokens other than
- // to keep compatibility with YQL engine.
- const auto& basic = cluster.GetCredentials().basic();
- Tokens[cluster.GetName()] = TStringBuilder() << "basic#" << basic.username() << "#" << basic.password();
+ for (const auto& cluster : config.GetClusterMapping()) {
+ InitCluster(cluster, dbResolver, databaseIds, credentials);
}
+
this->FreezeDefaults();
}
- bool HasCluster(TStringBuf cluster) const;
+ private:
+ void InitCluster(const TGenericClusterConfig& cluster,
+ const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver,
+ NYql::IDatabaseAsyncResolver::DatabaseIds& databaseIds,
+ const TCredentials::TPtr& credentials) {
+ const auto& clusterName = cluster.GetName();
+ const auto& databaseId = cluster.GetDatabaseId();
+ const auto& endpoint = cluster.GetEndpoint();
+
+ YQL_CLOG(DEBUG, ProviderGeneric)
+ << "initialize cluster"
+ << ": name = " << clusterName
+ << ", database id = " << databaseId
+ << ", endpoint = " << endpoint;
+
+ if (dbResolver && databaseId) {
+ const auto token = MakeStructuredToken(cluster, credentials);
+
+ databaseIds[std::make_pair(databaseId, NYql::DatabaseType::Generic)] = NYql::TDatabaseAuth{token, /*AddBearer=*/true};
+
+ DatabaseIdsToClusterNames[databaseId].emplace_back(clusterName);
+ YQL_CLOG(DEBUG, ProviderGeneric) << "database id '" << databaseId << "' added to mapping";
+ }
+
+ // NOTE: Tokens map is filled just because it's required by YQL legacy code.
+ // The only reason for provider to store these tokens is
+ // to keep compatibility with YQL engine.
+ // Real credentials are stored in TGenericClusterConfig.
+ Tokens[cluster.GetName()] = " ";
+ }
+
+ // Structured tokens are used to access MDB API. They can be constructed from two
+ TString MakeStructuredToken(const TGenericClusterConfig& cluster, const TCredentials::TPtr& credentials) {
+ TStructuredTokenBuilder b;
+
+ const auto iamToken = credentials->FindCredentialContent("default_" + cluster.name(), "default_generic", "");
+ if (iamToken) {
+ return b.SetIAMToken(iamToken).ToJson();
+ }
+
+ if (cluster.HasServiceAccountId() && cluster.HasServiceAccountIdSignature()) {
+ return b.SetServiceAccountIdAuth(cluster.GetServiceAccountId(), cluster.GetServiceAccountIdSignature()).ToJson();
+ }
+
+ ythrow yexception() << "you should either provide IAM Token via credential system, "
+ "or set (ServiceAccountId && ServiceAccountIdSignature) in cluster config";
+ }
+
+ public:
+ TGenericSettings::TConstPtr Snapshot() const {
+ return std::make_shared<const TGenericSettings>(*this);
+ }
+
+ bool HasCluster(TStringBuf cluster) const {
+ return ValidClusters.contains(cluster);
+ }
- TGenericSettings::TConstPtr Snapshot() const;
THashMap<TString, TString> Tokens;
- THashMap<TString, TGenericClusterConfig> ClusterConfigs;
- THashMap<TString, TVector<TString>> DbId2Clusters; // DatabaseId -> ClusterNames
+ THashMap<TString, TGenericClusterConfig> ClusterConfigs; // cluster name -> cluster config
+ THashMap<TString, TVector<TString>> DatabaseIdsToClusterNames; // database id -> cluster name
};
-
}