diff options
author | grigoriypisar <grigoriypisar@yandex-team.com> | 2023-08-09 10:34:48 +0300 |
---|---|---|
committer | grigoriypisar <grigoriypisar@yandex-team.com> | 2023-08-09 11:37:15 +0300 |
commit | 67bdfaa8752032eac7ded95dc036061f8ddf2f4c (patch) | |
tree | 3dc4ce19221837029103d669096a23420db506fe | |
parent | b0a2cdad30a3d864a68684e19d082b2e2a0278df (diff) | |
download | ydb-67bdfaa8752032eac7ded95dc036061f8ddf2f4c.tar.gz |
Fix error on SELECT from a binding with partitioning
Fixed the PARTITIONED_BY parameter in bindings.
-rw-r--r-- | ydb/core/external_sources/external_source.h | 2 | ||||
-rw-r--r-- | ydb/core/external_sources/object_storage.cpp | 23 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_datasource.cpp | 12 | ||||
-rw-r--r-- | ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp | 62 |
4 files changed, 85 insertions, 14 deletions
diff --git a/ydb/core/external_sources/external_source.h b/ydb/core/external_sources/external_source.h index bc5162b4ad1..60d1e6a067d 100644 --- a/ydb/core/external_sources/external_source.h +++ b/ydb/core/external_sources/external_source.h @@ -35,7 +35,7 @@ struct IExternalSource : public TThrRefBase { this data will be displayed in the viewer. Can throw an exception in case of an error */ - virtual TMap<TString, TString> GetParameters(const TString& content) const = 0; + virtual TMap<TString, TVector<TString>> GetParameters(const TString& content) const = 0; }; } diff --git a/ydb/core/external_sources/object_storage.cpp b/ydb/core/external_sources/object_storage.cpp index c12eae2b7fe..000ec785ec6 100644 --- a/ydb/core/external_sources/object_storage.cpp +++ b/ydb/core/external_sources/object_storage.cpp @@ -50,17 +50,21 @@ struct TObjectStorageExternalSource : public IExternalSource { return TString{NYql::S3ProviderName}; } - virtual TMap<TString, TString> GetParameters(const TString& content) const override { + virtual TMap<TString, TVector<TString>> GetParameters(const TString& content) const override { NKikimrExternalSources::TObjectStorage objectStorage; objectStorage.ParseFromStringOrThrow(content); - TMap<TString, TString> parameters{objectStorage.format_setting().begin(), objectStorage.format_setting().end()}; + TMap<TString, TVector<TString>> parameters; + for (const auto& [key, value] : objectStorage.format_setting()) { + parameters[key] = {value}; + } + if (objectStorage.format()) { - parameters["format"] = objectStorage.format(); + parameters["format"] = {objectStorage.format()}; } if (objectStorage.compression()) { - parameters["compression"] = objectStorage.compression(); + parameters["compression"] = {objectStorage.compression()}; } NSc::TValue projection; @@ -69,13 +73,14 @@ struct TObjectStorageExternalSource : public IExternalSource { } if (!projection.DictEmpty()) { - parameters["projection"] = projection.ToJson(); + parameters["projection"] = 
{projection.ToJson()}; } - NSc::TValue partitionedBy; - partitionedBy.AppendAll(objectStorage.partitioned_by()); - if (!partitionedBy.ArrayEmpty()) { - parameters["partitioned_by"] = partitionedBy.ToJson(); + if (!objectStorage.partitioned_by().empty()) { + parameters["partitioned_by"].reserve(objectStorage.partitioned_by().size()); + for (const TString& column : objectStorage.partitioned_by()) { + parameters["partitioned_by"].emplace_back(column); + } } return parameters; diff --git a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp index d18ac31c988..139731f4d32 100644 --- a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp @@ -50,10 +50,14 @@ TExprNode::TPtr BuildExternalTableSettings(TPositionHandle pos, TExprContext& ct auto userSchema = ctx.NewAtom(pos, "userschema"sv); items.emplace_back(ctx.NewList(pos, {userSchema, type, order})); - for (const auto& [key, value]: source->GetParameters(content)) { - auto keyAtom = ctx.NewAtom(pos, NormalizeName(key)); - auto valueAtom = ctx.NewAtom(pos, value); - items.emplace_back(ctx.NewList(pos, {keyAtom, valueAtom})); + for (const auto& [key, values]: source->GetParameters(content)) { + TExprNode::TListType children = {ctx.NewAtom(pos, NormalizeName(key))}; + children.reserve(values.size() + 1); + for (const TString& value : values) { + children.emplace_back(ctx.NewAtom(pos, value)); + } + + items.emplace_back(ctx.NewList(pos, std::move(children))); } return ctx.NewList(pos, std::move(items)); } diff --git a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp index 0ec78b37495..42608e7fa3c 100644 --- a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp @@ -905,6 +905,68 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { UNIT_ASSERT_VALUES_EQUAL(result.ColumnParser("value").GetUtf8(), 
"trololo"); UNIT_ASSERT(!result.TryNextRow()); } + + Y_UNIT_TEST(ExecuteScriptWithExternalTableResolveCheckPartitionedBy) { + using namespace fmt::literals; + const TString externalDataSourceName = "/Root/external_data_source"; + const TString externalTableName = "/Root/test_binding_resolve"; + const TString bucket = "test_bucket1"; + const TString object = "year=1/month=2/test_object"; + const TString content = "data,year,month\ntest,1,2"; + + CreateBucketWithObject(bucket, object, content); + + auto kikimr = DefaultKikimrRunner(); + kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); + + auto tc = kikimr.GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + ); + CREATE EXTERNAL TABLE `{external_table}` ( + data STRING NOT NULL, + year UINT32 NOT NULL, + month UINT32 NOT NULL + ) WITH ( + DATA_SOURCE="{external_source}", + LOCATION="/", + FORMAT="csv_with_names", + PARTITIONED_BY="[year, month]" + );)", + "external_source"_a = externalDataSourceName, + "external_table"_a = externalTableName, + "location"_a = GetBucketLocation(bucket) + ); + + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + auto db = kikimr.GetQueryClient(); + const TString sql = fmt::format(R"( + SELECT * FROM `{external_table}` + )", "external_table"_a = externalTableName); + + auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver()); + 
UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); + TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); + UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); + + TResultSetParser resultSet(results.ExtractResultSet()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 3); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1); + + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("data").GetString(), "test"); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("year").GetUint32(), 1); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("month").GetUint32(), 2); + } } } // namespace NKqp |