aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgrigoriypisar <grigoriypisar@yandex-team.com>2023-08-09 10:34:48 +0300
committergrigoriypisar <grigoriypisar@yandex-team.com>2023-08-09 11:37:15 +0300
commit67bdfaa8752032eac7ded95dc036061f8ddf2f4c (patch)
tree3dc4ce19221837029103d669096a23420db506fe
parentb0a2cdad30a3d864a68684e19d082b2e2a0278df (diff)
downloadydb-67bdfaa8752032eac7ded95dc036061f8ddf2f4c.tar.gz
fix error while SELECT from binding with partitioning
Fixed PARTITIONED_BY parameter in binding.
-rw-r--r--ydb/core/external_sources/external_source.h2
-rw-r--r--ydb/core/external_sources/object_storage.cpp23
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_datasource.cpp12
-rw-r--r--ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp62
4 files changed, 85 insertions, 14 deletions
diff --git a/ydb/core/external_sources/external_source.h b/ydb/core/external_sources/external_source.h
index bc5162b4ad1..60d1e6a067d 100644
--- a/ydb/core/external_sources/external_source.h
+++ b/ydb/core/external_sources/external_source.h
@@ -35,7 +35,7 @@ struct IExternalSource : public TThrRefBase {
this data will be displayed in the viewer.
Can throw an exception in case of an error
*/
- virtual TMap<TString, TString> GetParameters(const TString& content) const = 0;
+ virtual TMap<TString, TVector<TString>> GetParameters(const TString& content) const = 0;
};
}
diff --git a/ydb/core/external_sources/object_storage.cpp b/ydb/core/external_sources/object_storage.cpp
index c12eae2b7fe..000ec785ec6 100644
--- a/ydb/core/external_sources/object_storage.cpp
+++ b/ydb/core/external_sources/object_storage.cpp
@@ -50,17 +50,21 @@ struct TObjectStorageExternalSource : public IExternalSource {
return TString{NYql::S3ProviderName};
}
- virtual TMap<TString, TString> GetParameters(const TString& content) const override {
+ virtual TMap<TString, TVector<TString>> GetParameters(const TString& content) const override {
NKikimrExternalSources::TObjectStorage objectStorage;
objectStorage.ParseFromStringOrThrow(content);
- TMap<TString, TString> parameters{objectStorage.format_setting().begin(), objectStorage.format_setting().end()};
+ TMap<TString, TVector<TString>> parameters;
+ for (const auto& [key, value] : objectStorage.format_setting()) {
+ parameters[key] = {value};
+ }
+
if (objectStorage.format()) {
- parameters["format"] = objectStorage.format();
+ parameters["format"] = {objectStorage.format()};
}
if (objectStorage.compression()) {
- parameters["compression"] = objectStorage.compression();
+ parameters["compression"] = {objectStorage.compression()};
}
NSc::TValue projection;
@@ -69,13 +73,14 @@ struct TObjectStorageExternalSource : public IExternalSource {
}
if (!projection.DictEmpty()) {
- parameters["projection"] = projection.ToJson();
+ parameters["projection"] = {projection.ToJson()};
}
- NSc::TValue partitionedBy;
- partitionedBy.AppendAll(objectStorage.partitioned_by());
- if (!partitionedBy.ArrayEmpty()) {
- parameters["partitioned_by"] = partitionedBy.ToJson();
+ if (!objectStorage.partitioned_by().empty()) {
+ parameters["partitioned_by"].reserve(objectStorage.partitioned_by().size());
+ for (const TString& column : objectStorage.partitioned_by()) {
+ parameters["partitioned_by"].emplace_back(column);
+ }
}
return parameters;
diff --git a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp
index d18ac31c988..139731f4d32 100644
--- a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp
@@ -50,10 +50,14 @@ TExprNode::TPtr BuildExternalTableSettings(TPositionHandle pos, TExprContext& ct
auto userSchema = ctx.NewAtom(pos, "userschema"sv);
items.emplace_back(ctx.NewList(pos, {userSchema, type, order}));
- for (const auto& [key, value]: source->GetParameters(content)) {
- auto keyAtom = ctx.NewAtom(pos, NormalizeName(key));
- auto valueAtom = ctx.NewAtom(pos, value);
- items.emplace_back(ctx.NewList(pos, {keyAtom, valueAtom}));
+ for (const auto& [key, values]: source->GetParameters(content)) {
+ TExprNode::TListType children = {ctx.NewAtom(pos, NormalizeName(key))};
+ children.reserve(values.size() + 1);
+ for (const TString& value : values) {
+ children.emplace_back(ctx.NewAtom(pos, value));
+ }
+
+ items.emplace_back(ctx.NewList(pos, std::move(children)));
}
return ctx.NewList(pos, std::move(items));
}
diff --git a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
index 0ec78b37495..42608e7fa3c 100644
--- a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
+++ b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
@@ -905,6 +905,68 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
UNIT_ASSERT_VALUES_EQUAL(result.ColumnParser("value").GetUtf8(), "trololo");
UNIT_ASSERT(!result.TryNextRow());
}
+
+ Y_UNIT_TEST(ExecuteScriptWithExternalTableResolveCheckPartitionedBy) {
+ using namespace fmt::literals;
+ const TString externalDataSourceName = "/Root/external_data_source";
+ const TString externalTableName = "/Root/test_binding_resolve";
+ const TString bucket = "test_bucket1";
+ const TString object = "year=1/month=2/test_object";
+ const TString content = "data,year,month\ntest,1,2";
+
+ CreateBucketWithObject(bucket, object, content);
+
+ auto kikimr = DefaultKikimrRunner();
+ kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
+
+ auto tc = kikimr.GetTableClient();
+ auto session = tc.CreateSession().GetValueSync().GetSession();
+ const TString query = fmt::format(R"(
+ CREATE EXTERNAL DATA SOURCE `{external_source}` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="{location}",
+ AUTH_METHOD="NONE"
+ );
+ CREATE EXTERNAL TABLE `{external_table}` (
+ data STRING NOT NULL,
+ year UINT32 NOT NULL,
+ month UINT32 NOT NULL
+ ) WITH (
+ DATA_SOURCE="{external_source}",
+ LOCATION="/",
+ FORMAT="csv_with_names",
+ PARTITIONED_BY="[year, month]"
+ );)",
+ "external_source"_a = externalDataSourceName,
+ "external_table"_a = externalTableName,
+ "location"_a = GetBucketLocation(bucket)
+ );
+
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+
+ auto db = kikimr.GetQueryClient();
+ const TString sql = fmt::format(R"(
+ SELECT * FROM `{external_table}`
+ )", "external_table"_a = externalTableName);
+
+ auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
+
+ NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver());
+ UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed);
+ TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync();
+ UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
+
+ TResultSetParser resultSet(results.ExtractResultSet());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 3);
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1);
+
+ UNIT_ASSERT(resultSet.TryNextRow());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("data").GetString(), "test");
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("year").GetUint32(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("month").GetUint32(), 2);
+ }
}
} // namespace NKqp