diff options
author | Nina <100221635+ninaiad@users.noreply.github.com> | 2025-04-17 12:11:04 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-04-17 12:11:04 +0300 |
commit | c26c7154a9bc08da512a7f4f263ca435bd794cfd (patch) | |
tree | c470160ab316366cd227c3405ca7a496bb927e2b | |
parent | 68db7f5a047cadc6cdf2457139b7102326e41997 (diff) | |
download | ydb-c26c7154a9bc08da512a7f4f263ca435bd794cfd.tar.gz |
YDB FQ: support MongoDB as an external data source (#15335)
Co-authored-by: Vitaly Isaev <vitalyisaev2@gmail.com>
9 files changed, 100 insertions, 54 deletions
diff --git a/ydb/core/external_sources/external_data_source.cpp b/ydb/core/external_sources/external_data_source.cpp index f2ce2fbab49..46c58a11081 100644 --- a/ydb/core/external_sources/external_data_source.cpp +++ b/ydb/core/external_sources/external_data_source.cpp @@ -37,7 +37,7 @@ struct TExternalDataSource : public IExternalSource { } bool DataSourceMustHaveDataBaseName(const TProtoStringType& sourceType) const { - return IsIn({"Greenplum", "PostgreSQL", "MySQL", "MsSQLServer", "ClickHouse"}, sourceType); + return IsIn({"Greenplum", "PostgreSQL", "MySQL", "MsSQLServer", "ClickHouse", "MongoDB"}, sourceType); } virtual void ValidateExternalDataSource(const TString& externalDataSourceDescription) const override { diff --git a/ydb/core/external_sources/external_source_factory.cpp b/ydb/core/external_sources/external_source_factory.cpp index d0ad52264b0..787b6470c03 100644 --- a/ydb/core/external_sources/external_source_factory.cpp +++ b/ydb/core/external_sources/external_source_factory.cpp @@ -148,6 +148,10 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri { ToString(NYql::EDatabaseType::Prometheus), CreateExternalDataSource(TString{NYql::GenericProviderName}, {"BASIC"}, {"protocol", "use_tls"}, hostnamePatternsRegEx) + }, + { + ToString(NYql::EDatabaseType::MongoDB), + CreateExternalDataSource(TString{NYql::GenericProviderName}, {"BASIC"}, {"database_name", "use_tls", "reading_mode", "unexpected_type_display_mode", "unsupported_type_display_mode"}, hostnamePatternsRegEx) } }, availableExternalDataSources); diff --git a/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp b/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp index 55e5fcc77fc..f6b151e11a0 100644 --- a/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp +++ b/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp @@ -88,7 +88,10 @@ TString GetOrEmpty(const NYql::TCreateObjectSettings& container, const TString& "service_name", // oracle "folder_id", // logging "use_ssl", // solomon - "grpc_port" // solomon + "grpc_port", // solomon + "reading_mode", // mongodb + "unexpected_type_display_mode", // mongodb + "unsupported_type_display_mode", // mongodb }; auto& featuresExtractor = settings.GetFeaturesExtractor(); diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/database_type.cpp b/ydb/library/yql/providers/common/db_id_async_resolver/database_type.cpp index 9d34edab921..b44d9adccb5 100644 --- a/ydb/library/yql/providers/common/db_id_async_resolver/database_type.cpp +++ b/ydb/library/yql/providers/common/db_id_async_resolver/database_type.cpp @@ -49,6 +49,8 @@ EDatabaseType DatabaseTypeFromDataSourceKind(NYql::EGenericDataSourceKind dataSo return EDatabaseType::Redis; case NYql::EGenericDataSourceKind::PROMETHEUS: return EDatabaseType::Prometheus; + case NYql::EGenericDataSourceKind::MONGO_DB: + return EDatabaseType::MongoDB; default: ythrow yexception() << "Unknown data source kind: " << NYql::EGenericDataSourceKind_Name(dataSourceKind); } @@ -78,6 +80,8 @@ NYql::EGenericDataSourceKind DatabaseTypeToDataSourceKind(EDatabaseType database return NYql::EGenericDataSourceKind::REDIS; case EDatabaseType::Prometheus: return NYql::EGenericDataSourceKind::PROMETHEUS; + case EDatabaseType::MongoDB: + return NYql::EGenericDataSourceKind::MONGO_DB; default: ythrow yexception() << "Unknown database type: " << ToString(databaseType); } diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/database_type.h b/ydb/library/yql/providers/common/db_id_async_resolver/database_type.h index ed2f5f2f631..e1eeee40b82 100644 --- a/ydb/library/yql/providers/common/db_id_async_resolver/database_type.h +++ b/ydb/library/yql/providers/common/db_id_async_resolver/database_type.h @@ -21,7 +21,8 @@ enum class EDatabaseType { Solomon, Iceberg, Redis, - Prometheus + Prometheus, + MongoDB }; std::set<TString> GetAllExternalDataSourceTypes(); diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp index 0bc488f55bb..fe129ecd551 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp @@ -54,7 +54,8 @@ namespace NYql::NDq { "LoggingGeneric", "IcebergGeneric", "RedisGeneric", - "PrometheusGeneric"}) { + "PrometheusGeneric", + "MongoDBGeneric"}) { factory.RegisterSource<Generic::TSource>(name, readActorFactory); factory.RegisterLookupSource<Generic::TLookupSource>(name, lookupActorFactory); } diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.cpp index 8443cd7a953..3e45863e100 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.cpp @@ -120,36 +120,6 @@ namespace NYql { clusterConfig.SetDatabaseName(it->second); } - void ParseSchema(const THashMap<TString, TString>& properties, - NYql::TGenericClusterConfig& clusterConfig) { - auto it = properties.find("schema"); - if (it == properties.cend()) { - // SCHEMA is optional field - return; - } - - if (!it->second) { - // SCHEMA is optional field - return; - } - - clusterConfig.mutable_datasourceoptions()->insert({TString("schema"), TString(it->second)}); - } - - void ParseServiceName(const THashMap<TString, TString>& properties, - NYql::TGenericClusterConfig& clusterConfig) { - auto it = properties.find("service_name"); - if (it == properties.cend()) { - return; - } - - if (!it->second) { - return; - } - - clusterConfig.mutable_datasourceoptions()->insert({TString("service_name"), TString(it->second)}); - } - void ParseMdbClusterId(const THashMap<TString, TString>& properties, NYql::TGenericClusterConfig& clusterConfig) { auto it = properties.find("mdb_cluster_id"); @@ -215,7 +185,8 @@ namespace NYql { EGenericDataSourceKind::MS_SQL_SERVER, EGenericDataSourceKind::ORACLE, EGenericDataSourceKind::ICEBERG, - EGenericDataSourceKind::REDIS + EGenericDataSourceKind::REDIS, + EGenericDataSourceKind::MONGO_DB }, clusterConfig.GetKind() )) { @@ -277,20 +248,18 @@ namespace NYql { clusterConfig.SetServiceAccountIdSignature(it->second); } - void ParseFolderId(const THashMap<TString, TString>& properties, - NYql::TGenericClusterConfig& clusterConfig) { - auto it = properties.find("folder_id"); + void ParseOptionalField(const THashMap<TString, TString>& properties, + NYql::TGenericClusterConfig& clusterConfig, const TString& fieldName) { + auto it = properties.find(fieldName); if (it == properties.cend()) { - // FOLDER_ID is optional field return; } if (!it->second) { - // FOLDER_ID is optional field return; } - clusterConfig.mutable_datasourceoptions()->insert({"folder_id", TString(it->second)}); + clusterConfig.mutable_datasourceoptions()->insert({fieldName, TString(it->second)}); } /// @@ -349,8 +318,6 @@ namespace NYql { ParseLocation(properties, clusterConfig); ParseUseTLS(properties, clusterConfig); ParseDatabaseName(properties, clusterConfig); - ParseSchema(properties, clusterConfig); - ParseServiceName(properties, clusterConfig); ParseMdbClusterId(properties, clusterConfig); ParseDatabaseId(properties, clusterConfig); ParseSourceType(properties, clusterConfig); @@ -358,8 +325,13 @@ namespace NYql { ParseServiceAccountId(properties, clusterConfig); ParseServiceAccountIdSignature(properties, clusterConfig); ParseToken(properties, clusterConfig); - ParseFolderId(properties, clusterConfig); ParseIcebergFields(properties, clusterConfig); + ParseOptionalField(properties, clusterConfig, "schema"); + ParseOptionalField(properties, clusterConfig, "folder_id"); + ParseOptionalField(properties, clusterConfig, "reading_mode"); + ParseOptionalField(properties, clusterConfig, "service_name"); + ParseOptionalField(properties, clusterConfig, "unexpected_type_display_mode"); + ParseOptionalField(properties, clusterConfig, "unsupported_type_display_mode"); return clusterConfig; } diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp index ac520dd5623..4eed2213cfe 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp @@ -44,6 +44,8 @@ namespace NYql { return "RedisGeneric"; case NYql::EGenericDataSourceKind::PROMETHEUS: return "PrometheusGeneric"; + case NYql::EGenericDataSourceKind::MONGO_DB: + return "MongoDBGeneric"; default: throw yexception() << "Data source kind is unknown or not specified"; } @@ -286,6 +288,9 @@ namespace NYql { case NYql::EGenericDataSourceKind::ICEBERG: properties["SourceType"] = "Iceberg"; break; + case NYql::EGenericDataSourceKind::MONGO_DB: + properties["SourceType"] = "MongoDB"; + break; case NYql::EGenericDataSourceKind::REDIS: properties["SourceType"] = "Redis"; break; diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp index 672690fc4b1..5ac1e1809e3 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp @@ -98,7 +98,11 @@ namespace NYql { TableDescriptions_.reserve(pendingTables.size()); for (const auto& tableAddress : pendingTables) { - LoadTableMetadataFromConnector(tableAddress, handles); + auto tIssues = LoadTableMetadataFromConnector(tableAddress, handles); + if (!tIssues.Empty()) { + ctx.AddError(TIssue(tIssues.ToString())); + return TStatus::Error; + } } if (handles.empty()) { @@ -111,22 +115,28 @@ namespace NYql { // clang-format on private: - void LoadTableMetadataFromConnector(const TGenericState::TTableAddress& tableAddress, + TIssues LoadTableMetadataFromConnector(const TGenericState::TTableAddress& tableAddress, std::vector<NThreading::TFuture<void>>& handles) { const auto it = State_->Configuration->ClusterNamesToClusterConfigs.find(tableAddress.ClusterName); YQL_ENSURE(State_->Configuration->ClusterNamesToClusterConfigs.cend() != it, "cluster not found: " << tableAddress.ClusterName); + // preserve data source instance for the further usage + auto emplaceIt = + TableDescriptions_.emplace(tableAddress, std::make_shared<TTableDescription>()); + + auto desc = emplaceIt.first->second; + NConnector::NApi::TDescribeTableRequest request; - FillDescribeTableRequest(request, it->second, tableAddress.TableName); + auto issues = FillDescribeTableRequest(request, it->second, tableAddress.TableName); + + if (!issues.Empty()) { + return issues; + } auto promise = NThreading::NewPromise(); handles.emplace_back(promise.GetFuture()); - // preserve data source instance for the further usage - auto emplaceIt = - TableDescriptions_.emplace(tableAddress, std::make_shared<TTableDescription>()); - auto desc = emplaceIt.first->second; desc->DataSourceInstance = request.data_source_instance(); Y_ENSURE(State_->GenericClient); @@ -217,6 +227,8 @@ namespace NYql { }); }); }); + + return TIssues{}; } public: @@ -362,7 +374,7 @@ namespace NYql { // clang-format on } - void FillDescribeTableRequest(NConnector::NApi::TDescribeTableRequest& request, + TIssues FillDescribeTableRequest(NConnector::NApi::TDescribeTableRequest& request, const TGenericClusterConfig& clusterConfig, const TString& tablePath) { const auto dataSourceKind = clusterConfig.GetKind(); auto dsi = request.mutable_data_source_instance(); @@ -372,8 +384,14 @@ namespace NYql { dsi->set_protocol(clusterConfig.GetProtocol()); FillCredentials(request, clusterConfig); FillTypeMappingSettings(request); - FillDataSourceOptions(request, clusterConfig); + auto issues = FillDataSourceOptions(request, clusterConfig); + if (!issues.Empty()) { + return issues; + } + FillTablePath(request, clusterConfig, tablePath); + + return {}; } void FillCredentials(NConnector::NApi::TDescribeTableRequest& request, @@ -508,7 +526,39 @@ namespace NYql { } } - void FillDataSourceOptions(NConnector::NApi::TDescribeTableRequest& request, + TIssues SetMongoDBOptions(NYql::TMongoDbDataSourceOptions& options, const TGenericClusterConfig& clusterConfig) { + TIssues issues; + auto it = clusterConfig.GetDataSourceOptions().find("reading_mode"); + if (it != clusterConfig.GetDataSourceOptions().end()) { + TMongoDbDataSourceOptions_EReadingMode value = TMongoDbDataSourceOptions::READING_MODE_UNSPECIFIED; + if (!TMongoDbDataSourceOptions_EReadingMode_Parse(it->second, &value)) { + issues.AddIssue(TIssue(TStringBuilder() << "Failed to parse MongoDB reading_mode: " << it->second)); + } + options.set_reading_mode(value); + } + + it = clusterConfig.GetDataSourceOptions().find("unexpected_type_display_mode"); + if (it != clusterConfig.GetDataSourceOptions().end()) { + TMongoDbDataSourceOptions_EUnexpectedTypeDisplayMode value = TMongoDbDataSourceOptions::UNEXPECTED_UNSPECIFIED; + if (!TMongoDbDataSourceOptions_EUnexpectedTypeDisplayMode_Parse(it->second, &value)) { + issues.AddIssue(TIssue(TStringBuilder() << "Failed to parse MongoDB unexpected_type_display_mode: " << it->second)); + } + options.set_unexpected_type_display_mode(value); + } + + it = clusterConfig.GetDataSourceOptions().find("unsupported_type_display_mode"); + if (it != clusterConfig.GetDataSourceOptions().end()) { + TMongoDbDataSourceOptions_EUnsupportedTypeDisplayMode value = TMongoDbDataSourceOptions::UNSUPPORTED_UNSPECIFIED; + if (!TMongoDbDataSourceOptions_EUnsupportedTypeDisplayMode_Parse(it->second, &value)) { + issues.AddIssue(TIssue(TStringBuilder() << "Failed to parse MongoDB unsupported_type_display_mode: " << it->second)); + } + options.set_unsupported_type_display_mode(value); + } + + return issues; + } + + TIssues FillDataSourceOptions(NConnector::NApi::TDescribeTableRequest& request, const TGenericClusterConfig& clusterConfig) { const auto dataSourceKind = clusterConfig.GetKind(); switch (dataSourceKind) { @@ -544,10 +594,16 @@ namespace NYql { break; case NYql::EGenericDataSourceKind::PROMETHEUS: break; + case NYql::EGenericDataSourceKind::MONGO_DB: { + auto* options = request.mutable_data_source_instance()->mutable_mongodb_options(); + return SetMongoDBOptions(*options, clusterConfig); + } break; default: throw yexception() << "Unexpected data source kind: '" << NYql::EGenericDataSourceKind_Name(dataSourceKind) << "'"; } + + return TIssues{}; } void FillTypeMappingSettings(NConnector::NApi::TDescribeTableRequest& request) { |