aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNina <100221635+ninaiad@users.noreply.github.com>2025-04-17 12:11:04 +0300
committerGitHub <noreply@github.com>2025-04-17 12:11:04 +0300
commitc26c7154a9bc08da512a7f4f263ca435bd794cfd (patch)
treec470160ab316366cd227c3405ca7a496bb927e2b
parent68db7f5a047cadc6cdf2457139b7102326e41997 (diff)
downloadydb-c26c7154a9bc08da512a7f4f263ca435bd794cfd.tar.gz
YDB FQ: support MongoDB as an external data source (#15335)
Co-authored-by: Vitaly Isaev <vitalyisaev2@gmail.com>
-rw-r--r--ydb/core/external_sources/external_data_source.cpp2
-rw-r--r--ydb/core/external_sources/external_source_factory.cpp4
-rw-r--r--ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp5
-rw-r--r--ydb/library/yql/providers/common/db_id_async_resolver/database_type.cpp4
-rw-r--r--ydb/library/yql/providers/common/db_id_async_resolver/database_type.h3
-rw-r--r--ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp3
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.cpp52
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp5
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp76
9 files changed, 100 insertions, 54 deletions
diff --git a/ydb/core/external_sources/external_data_source.cpp b/ydb/core/external_sources/external_data_source.cpp
index f2ce2fbab49..46c58a11081 100644
--- a/ydb/core/external_sources/external_data_source.cpp
+++ b/ydb/core/external_sources/external_data_source.cpp
@@ -37,7 +37,7 @@ struct TExternalDataSource : public IExternalSource {
}
bool DataSourceMustHaveDataBaseName(const TProtoStringType& sourceType) const {
- return IsIn({"Greenplum", "PostgreSQL", "MySQL", "MsSQLServer", "ClickHouse"}, sourceType);
+ return IsIn({"Greenplum", "PostgreSQL", "MySQL", "MsSQLServer", "ClickHouse", "MongoDB"}, sourceType);
}
virtual void ValidateExternalDataSource(const TString& externalDataSourceDescription) const override {
diff --git a/ydb/core/external_sources/external_source_factory.cpp b/ydb/core/external_sources/external_source_factory.cpp
index d0ad52264b0..787b6470c03 100644
--- a/ydb/core/external_sources/external_source_factory.cpp
+++ b/ydb/core/external_sources/external_source_factory.cpp
@@ -148,6 +148,10 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
{
ToString(NYql::EDatabaseType::Prometheus),
CreateExternalDataSource(TString{NYql::GenericProviderName}, {"BASIC"}, {"protocol", "use_tls"}, hostnamePatternsRegEx)
+ },
+ {
+ ToString(NYql::EDatabaseType::MongoDB),
+ CreateExternalDataSource(TString{NYql::GenericProviderName}, {"BASIC"}, {"database_name", "use_tls", "reading_mode", "unexpected_type_display_mode", "unsupported_type_display_mode"}, hostnamePatternsRegEx)
}
},
availableExternalDataSources);
diff --git a/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp b/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp
index 55e5fcc77fc..f6b151e11a0 100644
--- a/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp
+++ b/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp
@@ -88,7 +88,10 @@ TString GetOrEmpty(const NYql::TCreateObjectSettings& container, const TString&
"service_name", // oracle
"folder_id", // logging
"use_ssl", // solomon
- "grpc_port" // solomon
+ "grpc_port", // solomon
+ "reading_mode", // mongodb
+ "unexpected_type_display_mode", // mongodb
+ "unsupported_type_display_mode", // mongodb
};
auto& featuresExtractor = settings.GetFeaturesExtractor();
diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/database_type.cpp b/ydb/library/yql/providers/common/db_id_async_resolver/database_type.cpp
index 9d34edab921..b44d9adccb5 100644
--- a/ydb/library/yql/providers/common/db_id_async_resolver/database_type.cpp
+++ b/ydb/library/yql/providers/common/db_id_async_resolver/database_type.cpp
@@ -49,6 +49,8 @@ EDatabaseType DatabaseTypeFromDataSourceKind(NYql::EGenericDataSourceKind dataSo
return EDatabaseType::Redis;
case NYql::EGenericDataSourceKind::PROMETHEUS:
return EDatabaseType::Prometheus;
+ case NYql::EGenericDataSourceKind::MONGO_DB:
+ return EDatabaseType::MongoDB;
default:
ythrow yexception() << "Unknown data source kind: " << NYql::EGenericDataSourceKind_Name(dataSourceKind);
}
@@ -78,6 +80,8 @@ NYql::EGenericDataSourceKind DatabaseTypeToDataSourceKind(EDatabaseType database
return NYql::EGenericDataSourceKind::REDIS;
case EDatabaseType::Prometheus:
return NYql::EGenericDataSourceKind::PROMETHEUS;
+ case EDatabaseType::MongoDB:
+ return NYql::EGenericDataSourceKind::MONGO_DB;
default:
ythrow yexception() << "Unknown database type: " << ToString(databaseType);
}
diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/database_type.h b/ydb/library/yql/providers/common/db_id_async_resolver/database_type.h
index ed2f5f2f631..e1eeee40b82 100644
--- a/ydb/library/yql/providers/common/db_id_async_resolver/database_type.h
+++ b/ydb/library/yql/providers/common/db_id_async_resolver/database_type.h
@@ -21,7 +21,8 @@ enum class EDatabaseType {
Solomon,
Iceberg,
Redis,
- Prometheus
+ Prometheus,
+ MongoDB
};
std::set<TString> GetAllExternalDataSourceTypes();
diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp
index 0bc488f55bb..fe129ecd551 100644
--- a/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp
+++ b/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp
@@ -54,7 +54,8 @@ namespace NYql::NDq {
"LoggingGeneric",
"IcebergGeneric",
"RedisGeneric",
- "PrometheusGeneric"}) {
+ "PrometheusGeneric",
+ "MongoDBGeneric"}) {
factory.RegisterSource<Generic::TSource>(name, readActorFactory);
factory.RegisterLookupSource<Generic::TLookupSource>(name, lookupActorFactory);
}
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.cpp
index 8443cd7a953..3e45863e100 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.cpp
@@ -120,36 +120,6 @@ namespace NYql {
clusterConfig.SetDatabaseName(it->second);
}
- void ParseSchema(const THashMap<TString, TString>& properties,
- NYql::TGenericClusterConfig& clusterConfig) {
- auto it = properties.find("schema");
- if (it == properties.cend()) {
- // SCHEMA is optional field
- return;
- }
-
- if (!it->second) {
- // SCHEMA is optional field
- return;
- }
-
- clusterConfig.mutable_datasourceoptions()->insert({TString("schema"), TString(it->second)});
- }
-
- void ParseServiceName(const THashMap<TString, TString>& properties,
- NYql::TGenericClusterConfig& clusterConfig) {
- auto it = properties.find("service_name");
- if (it == properties.cend()) {
- return;
- }
-
- if (!it->second) {
- return;
- }
-
- clusterConfig.mutable_datasourceoptions()->insert({TString("service_name"), TString(it->second)});
- }
-
void ParseMdbClusterId(const THashMap<TString, TString>& properties,
NYql::TGenericClusterConfig& clusterConfig) {
auto it = properties.find("mdb_cluster_id");
@@ -215,7 +185,8 @@ namespace NYql {
EGenericDataSourceKind::MS_SQL_SERVER,
EGenericDataSourceKind::ORACLE,
EGenericDataSourceKind::ICEBERG,
- EGenericDataSourceKind::REDIS
+ EGenericDataSourceKind::REDIS,
+ EGenericDataSourceKind::MONGO_DB
},
clusterConfig.GetKind()
)) {
@@ -277,20 +248,18 @@ namespace NYql {
clusterConfig.SetServiceAccountIdSignature(it->second);
}
- void ParseFolderId(const THashMap<TString, TString>& properties,
- NYql::TGenericClusterConfig& clusterConfig) {
- auto it = properties.find("folder_id");
+ void ParseOptionalField(const THashMap<TString, TString>& properties,
+ NYql::TGenericClusterConfig& clusterConfig, const TString& fieldName) {
+ auto it = properties.find(fieldName);
if (it == properties.cend()) {
- // FOLDER_ID is optional field
return;
}
if (!it->second) {
- // FOLDER_ID is optional field
return;
}
- clusterConfig.mutable_datasourceoptions()->insert({"folder_id", TString(it->second)});
+ clusterConfig.mutable_datasourceoptions()->insert({fieldName, TString(it->second)});
}
///
@@ -349,8 +318,6 @@ namespace NYql {
ParseLocation(properties, clusterConfig);
ParseUseTLS(properties, clusterConfig);
ParseDatabaseName(properties, clusterConfig);
- ParseSchema(properties, clusterConfig);
- ParseServiceName(properties, clusterConfig);
ParseMdbClusterId(properties, clusterConfig);
ParseDatabaseId(properties, clusterConfig);
ParseSourceType(properties, clusterConfig);
@@ -358,8 +325,13 @@ namespace NYql {
ParseServiceAccountId(properties, clusterConfig);
ParseServiceAccountIdSignature(properties, clusterConfig);
ParseToken(properties, clusterConfig);
- ParseFolderId(properties, clusterConfig);
ParseIcebergFields(properties, clusterConfig);
+ ParseOptionalField(properties, clusterConfig, "schema");
+ ParseOptionalField(properties, clusterConfig, "folder_id");
+ ParseOptionalField(properties, clusterConfig, "reading_mode");
+ ParseOptionalField(properties, clusterConfig, "service_name");
+ ParseOptionalField(properties, clusterConfig, "unexpected_type_display_mode");
+ ParseOptionalField(properties, clusterConfig, "unsupported_type_display_mode");
return clusterConfig;
}
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp
index ac520dd5623..4eed2213cfe 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp
@@ -44,6 +44,8 @@ namespace NYql {
return "RedisGeneric";
case NYql::EGenericDataSourceKind::PROMETHEUS:
return "PrometheusGeneric";
+ case NYql::EGenericDataSourceKind::MONGO_DB:
+ return "MongoDBGeneric";
default:
throw yexception() << "Data source kind is unknown or not specified";
}
@@ -286,6 +288,9 @@ namespace NYql {
case NYql::EGenericDataSourceKind::ICEBERG:
properties["SourceType"] = "Iceberg";
break;
+ case NYql::EGenericDataSourceKind::MONGO_DB:
+ properties["SourceType"] = "MongoDB";
+ break;
case NYql::EGenericDataSourceKind::REDIS:
properties["SourceType"] = "Redis";
break;
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp
index 672690fc4b1..5ac1e1809e3 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp
@@ -98,7 +98,11 @@ namespace NYql {
TableDescriptions_.reserve(pendingTables.size());
for (const auto& tableAddress : pendingTables) {
- LoadTableMetadataFromConnector(tableAddress, handles);
+ auto tIssues = LoadTableMetadataFromConnector(tableAddress, handles);
+ if (!tIssues.Empty()) {
+ ctx.AddError(TIssue(tIssues.ToString()));
+ return TStatus::Error;
+ }
}
if (handles.empty()) {
@@ -111,22 +115,28 @@ namespace NYql {
// clang-format on
private:
- void LoadTableMetadataFromConnector(const TGenericState::TTableAddress& tableAddress,
+ TIssues LoadTableMetadataFromConnector(const TGenericState::TTableAddress& tableAddress,
std::vector<NThreading::TFuture<void>>& handles) {
const auto it = State_->Configuration->ClusterNamesToClusterConfigs.find(tableAddress.ClusterName);
YQL_ENSURE(State_->Configuration->ClusterNamesToClusterConfigs.cend() != it,
"cluster not found: " << tableAddress.ClusterName);
+ // preserve data source instance for the further usage
+ auto emplaceIt =
+ TableDescriptions_.emplace(tableAddress, std::make_shared<TTableDescription>());
+
+ auto desc = emplaceIt.first->second;
+
NConnector::NApi::TDescribeTableRequest request;
- FillDescribeTableRequest(request, it->second, tableAddress.TableName);
+ auto issues = FillDescribeTableRequest(request, it->second, tableAddress.TableName);
+
+ if (!issues.Empty()) {
+ return issues;
+ }
auto promise = NThreading::NewPromise();
handles.emplace_back(promise.GetFuture());
- // preserve data source instance for the further usage
- auto emplaceIt =
- TableDescriptions_.emplace(tableAddress, std::make_shared<TTableDescription>());
- auto desc = emplaceIt.first->second;
desc->DataSourceInstance = request.data_source_instance();
Y_ENSURE(State_->GenericClient);
@@ -217,6 +227,8 @@ namespace NYql {
});
});
});
+
+ return TIssues{};
}
public:
@@ -362,7 +374,7 @@ namespace NYql {
// clang-format on
}
- void FillDescribeTableRequest(NConnector::NApi::TDescribeTableRequest& request,
+ TIssues FillDescribeTableRequest(NConnector::NApi::TDescribeTableRequest& request,
const TGenericClusterConfig& clusterConfig, const TString& tablePath) {
const auto dataSourceKind = clusterConfig.GetKind();
auto dsi = request.mutable_data_source_instance();
@@ -372,8 +384,14 @@ namespace NYql {
dsi->set_protocol(clusterConfig.GetProtocol());
FillCredentials(request, clusterConfig);
FillTypeMappingSettings(request);
- FillDataSourceOptions(request, clusterConfig);
+ auto issues = FillDataSourceOptions(request, clusterConfig);
+ if (!issues.Empty()) {
+ return issues;
+ }
+
FillTablePath(request, clusterConfig, tablePath);
+
+ return {};
}
void FillCredentials(NConnector::NApi::TDescribeTableRequest& request,
@@ -508,7 +526,39 @@ namespace NYql {
}
}
- void FillDataSourceOptions(NConnector::NApi::TDescribeTableRequest& request,
+ TIssues SetMongoDBOptions(NYql::TMongoDbDataSourceOptions& options, const TGenericClusterConfig& clusterConfig) {
+ TIssues issues;
+ auto it = clusterConfig.GetDataSourceOptions().find("reading_mode");
+ if (it != clusterConfig.GetDataSourceOptions().end()) {
+ TMongoDbDataSourceOptions_EReadingMode value = TMongoDbDataSourceOptions::READING_MODE_UNSPECIFIED;
+ if (!TMongoDbDataSourceOptions_EReadingMode_Parse(it->second, &value)) {
+ issues.AddIssue(TIssue(TStringBuilder() << "Failed to parse MongoDB reading_mode: " << it->second));
+ }
+ options.set_reading_mode(value);
+ }
+
+ it = clusterConfig.GetDataSourceOptions().find("unexpected_type_display_mode");
+ if (it != clusterConfig.GetDataSourceOptions().end()) {
+ TMongoDbDataSourceOptions_EUnexpectedTypeDisplayMode value = TMongoDbDataSourceOptions::UNEXPECTED_UNSPECIFIED;
+ if (!TMongoDbDataSourceOptions_EUnexpectedTypeDisplayMode_Parse(it->second, &value)) {
+ issues.AddIssue(TIssue(TStringBuilder() << "Failed to parse MongoDB unexpected_type_display_mode: " << it->second));
+ }
+ options.set_unexpected_type_display_mode(value);
+ }
+
+ it = clusterConfig.GetDataSourceOptions().find("unsupported_type_display_mode");
+ if (it != clusterConfig.GetDataSourceOptions().end()) {
+ TMongoDbDataSourceOptions_EUnsupportedTypeDisplayMode value = TMongoDbDataSourceOptions::UNSUPPORTED_UNSPECIFIED;
+ if (!TMongoDbDataSourceOptions_EUnsupportedTypeDisplayMode_Parse(it->second, &value)) {
+ issues.AddIssue(TIssue(TStringBuilder() << "Failed to parse MongoDB unsupported_type_display_mode: " << it->second));
+ }
+ options.set_unsupported_type_display_mode(value);
+ }
+
+ return issues;
+ }
+
+ TIssues FillDataSourceOptions(NConnector::NApi::TDescribeTableRequest& request,
const TGenericClusterConfig& clusterConfig) {
const auto dataSourceKind = clusterConfig.GetKind();
switch (dataSourceKind) {
@@ -544,10 +594,16 @@ namespace NYql {
break;
case NYql::EGenericDataSourceKind::PROMETHEUS:
break;
+ case NYql::EGenericDataSourceKind::MONGO_DB: {
+ auto* options = request.mutable_data_source_instance()->mutable_mongodb_options();
+ return SetMongoDBOptions(*options, clusterConfig);
+ } break;
default:
throw yexception() << "Unexpected data source kind: '"
<< NYql::EGenericDataSourceKind_Name(dataSourceKind) << "'";
}
+
+ return TIssues{};
}
void FillTypeMappingSettings(NConnector::NApi::TDescribeTableRequest& request) {