diff options
author | hcpp <hcpp@ydb.tech> | 2023-03-24 16:47:29 +0300 |
---|---|---|
committer | hcpp <hcpp@ydb.tech> | 2023-03-24 16:47:29 +0300 |
commit | 4a19ef5d96f0b6af337c7b562e8626f067bc0cf7 (patch) | |
tree | 015d9f1ecb3aa5dac8a2bc45989f258965dacce6 | |
parent | 0f069e47d7e56ada4522f5399165b4480807c26f (diff) | |
download | ydb-4a19ef5d96f0b6af337c7b562e8626f067bc0cf7.tar.gz |
projection types has been expanded
7 files changed, 172 insertions, 16 deletions
diff --git a/ydb/core/external_sources/object_storage.cpp b/ydb/core/external_sources/object_storage.cpp index c2a8340a87..d5da28ab48 100644 --- a/ydb/core/external_sources/object_storage.cpp +++ b/ydb/core/external_sources/object_storage.cpp @@ -217,9 +217,18 @@ private: .Primitive(NYdb::EPrimitiveType::String) .Build(), NYdb::TTypeBuilder{} + .Primitive(NYdb::EPrimitiveType::Int32) + .Build(), + NYdb::TTypeBuilder{} + .Primitive(NYdb::EPrimitiveType::Uint32) + .Build(), + NYdb::TTypeBuilder{} .Primitive(NYdb::EPrimitiveType::Int64) .Build(), NYdb::TTypeBuilder{} + .Primitive(NYdb::EPrimitiveType::Uint64) + .Build(), + NYdb::TTypeBuilder{} .Primitive(NYdb::EPrimitiveType::Utf8) .Build() }; @@ -257,6 +266,9 @@ private: .Build(), NYdb::TTypeBuilder{} .Primitive(NYdb::EPrimitiveType::Date) + .Build(), + NYdb::TTypeBuilder{} + .Primitive(NYdb::EPrimitiveType::Datetime) .Build() }; return ValidateProjectionType(columnType, columnName, availableTypes); @@ -275,13 +287,16 @@ private: .Build(), NYdb::TTypeBuilder{} .Primitive(NYdb::EPrimitiveType::Date) + .Build(), + NYdb::TTypeBuilder{} + .Primitive(NYdb::EPrimitiveType::Date) .Build() }; return ValidateProjectionType(columnType, columnName, availableTypes); } static NYql::TIssues ValidateProjection(const NKikimrExternalSources::TSchema& schema, const TString& projection, const TVector<TString>& partitionedBy) { - auto generator =NYql::NPathGenerator::CreatePathGenerator(projection, partitionedBy); // an exception is thrown if an error occurs + auto generator = NYql::NPathGenerator::CreatePathGenerator(projection, partitionedBy, GetDataSlotColumns(schema)); // an exception is thrown if an error occurs TMap<TString, NYql::NPathGenerator::IPathGenerator::EType> projectionColumns; for (const auto& column: generator->GetConfig().Rules) { projectionColumns[column.Name] = column.Type; @@ -308,6 +323,19 @@ private: } return issues; } + + static TMap<TString, NYql::NUdf::EDataSlot> GetDataSlotColumns(const NKikimrExternalSources::TSchema& schema) { + TMap<TString, NYql::NUdf::EDataSlot> dataSlotColumns; + for (const auto& column: schema.column()) { + if (column.has_type()) { + const auto& type = column.type(); + if (type.has_type_id()) { + dataSlotColumns[column.name()] = NYql::NUdf::GetDataSlot(type.type_id()); + } + } + } + return dataSlotColumns; + } }; } diff --git a/ydb/core/fq/libs/control_plane_storage/request_validators.cpp b/ydb/core/fq/libs/control_plane_storage/request_validators.cpp index a470f55115..a0b12382aa 100644 --- a/ydb/core/fq/libs/control_plane_storage/request_validators.cpp +++ b/ydb/core/fq/libs/control_plane_storage/request_validators.cpp @@ -16,9 +16,18 @@ NYql::TIssues ValidateIntegerProjectionType(const NYdb::TType& columnType, const .Primitive(NYdb::EPrimitiveType::String) .Build(), NYdb::TTypeBuilder{} + .Primitive(NYdb::EPrimitiveType::Int32) + .Build(), + NYdb::TTypeBuilder{} + .Primitive(NYdb::EPrimitiveType::Uint32) + .Build(), + NYdb::TTypeBuilder{} .Primitive(NYdb::EPrimitiveType::Int64) .Build(), NYdb::TTypeBuilder{} + .Primitive(NYdb::EPrimitiveType::Uint64) + .Build(), + NYdb::TTypeBuilder{} .Primitive(NYdb::EPrimitiveType::Utf8) .Build() }; @@ -56,6 +65,9 @@ NYql::TIssues ValidateCommonProjectionType(const NYdb::TType& columnType, const .Build(), NYdb::TTypeBuilder{} .Primitive(NYdb::EPrimitiveType::Date) + .Build(), + NYdb::TTypeBuilder{} + .Primitive(NYdb::EPrimitiveType::Datetime) .Build() }; return ValidateProjectionType(columnType, columnName, availableTypes); @@ -74,11 +86,27 @@ NYql::TIssues ValidateDateProjectionType(const NYdb::TType& columnType, const TS .Build(), NYdb::TTypeBuilder{} .Primitive(NYdb::EPrimitiveType::Date) + .Build(), + NYdb::TTypeBuilder{} + .Primitive(NYdb::EPrimitiveType::Datetime) .Build() }; return ValidateProjectionType(columnType, columnName, availableTypes); } +TMap<TString, NYql::NUdf::EDataSlot> GetDataSlotColumns(const FederatedQuery::Schema& schema) { + TMap<TString, NYql::NUdf::EDataSlot> dataSlotColumns; + for (const auto& column: schema.column()) { + if (column.has_type()) { + const auto& type = column.type(); + if (type.has_type_id()) { + dataSlotColumns[column.name()] = NYql::NUdf::GetDataSlot(type.type_id()); + } + } + } + return dataSlotColumns; +} + } NYql::TIssues ValidateConnectionSetting(const FederatedQuery::ConnectionSetting& setting, const TSet<FederatedQuery::ConnectionSetting::ConnectionCase>& availableConnections, bool disableCurrentIam, bool clickHousePasswordRequire) { @@ -288,7 +316,7 @@ NYql::TIssues ValidateProjectionColumns(const FederatedQuery::Schema& schema, co } NYql::TIssues ValidateProjection(const FederatedQuery::Schema& schema, const TString& projection, const TVector<TString>& partitionedBy) { - auto generator =NYql::NPathGenerator::CreatePathGenerator(projection, partitionedBy); // an exception is thrown if an error occurs + auto generator =NYql::NPathGenerator::CreatePathGenerator(projection, partitionedBy, GetDataSlotColumns(schema)); // an exception is thrown if an error occurs TMap<TString, NYql::NPathGenerator::IPathGenerator::EType> projectionColumns; for (const auto& column: generator->GetConfig().Rules) { projectionColumns[column.Name] = column.Type; diff --git a/ydb/library/yql/providers/s3/path_generator/ut/yql_generate_partitioning_rules_ut.cpp b/ydb/library/yql/providers/s3/path_generator/ut/yql_generate_partitioning_rules_ut.cpp index b80adb3665..c6070217ea 100644 --- a/ydb/library/yql/providers/s3/path_generator/ut/yql_generate_partitioning_rules_ut.cpp +++ b/ydb/library/yql/providers/s3/path_generator/ut/yql_generate_partitioning_rules_ut.cpp @@ -110,7 +110,7 @@ Y_UNIT_TEST_SUITE(TGenerateTests) { "projection.code.values" : "0,1", "storage.location.template" : "/${city}/${code}/" } - )", {"city", "code"}, 2), yexception, "The limit on the number of paths has been reached: 2 of 2"); + )", {"city", "code"}, {}, 2), yexception, "The limit on the number of paths has been reached: 2 of 2"); } Y_UNIT_TEST(CheckClash) { @@ -127,7 +127,7 @@ Y_UNIT_TEST_SUITE(TGenerateTests) { } Y_UNIT_TEST(CheckHiveFormat) { - auto generator = CreatePathGenerator({}, {"city", "code", "device_id"}, 1); + auto generator = CreatePathGenerator({}, {"city", "code", "device_id"}, {}, 1); auto rules = generator->GetRules(); UNIT_ASSERT_VALUES_EQUAL(rules.size(), 1); UNIT_ASSERT_VALUES_EQUAL(rules[0].ColumnValues.size(), 0); diff --git a/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp b/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp index 7a293813b3..1081e1a60e 100644 --- a/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp +++ b/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp @@ -246,9 +246,11 @@ struct TPathGenerator: public IPathGenerator { TExplicitPartitioningConfig Config; TRules Rules; TMap<TString, TColumnPartitioningConfig> ColumnConfig; // column name -> config + TMap<TString, NUdf::EDataSlot> Columns; public: - TPathGenerator(const TString& projection, const std::vector<TString>& partitionedBy, size_t pathsLimit) + TPathGenerator(const TString& projection, const std::vector<TString>& partitionedBy, const TMap<TString, NUdf::EDataSlot>& columns, size_t pathsLimit) + : Columns(columns) { try { ParsePartitioningRules(projection, partitionedBy); @@ -589,7 +591,7 @@ private: TString copyLocationTemplate = locationTemplate; const TString time = Strftime(rule.Format.c_str(), current); ReplaceAll(copyLocationTemplate, "${" + rule.Name + "}", time); - columnsWithValue.push_back(TColumnWithValue{.Name=rule.Name, .Type=NUdf::EDataSlot::Date, .Value=Strftime("%F", current)}); + columnsWithValue.push_back(CreateDateColumnWithValue(rule.Name, current)); DoGenerate(rules, copyLocationTemplate, columnsWithValue, result, pathsLimit, now, p + 1); columnsWithValue.pop_back(); @@ -665,7 +667,7 @@ private: for (int64_t i = rule.Min; i <= rule.Max; i += rule.Interval) { TString copyLocationTemplate = locationTemplate; ReplaceAll(copyLocationTemplate, "${" + rule.Name + "}", fmtInteger(rule.Digits, i)); - columnsWithValue.push_back(IPathGenerator::TColumnWithValue{.Name=rule.Name, .Type=NUdf::EDataSlot::Int64, .Value=ToString(i)}); + columnsWithValue.push_back(CreateIntegerColumnWithValue(rule.Name, i)); DoGenerate(rules, copyLocationTemplate, columnsWithValue, result, pathsLimit, now, p + 1); columnsWithValue.pop_back(); @@ -674,10 +676,63 @@ private: } } } + + IPathGenerator::TColumnWithValue CreateDateColumnWithValue(const TString& name, const TInstant& current) { + auto it = Columns.find(name); + auto slot = it == Columns.end() ? NUdf::EDataSlot::Date : it->second; + switch (slot) { + case NUdf::EDataSlot::Datetime: + return {.Name=name, .Type=NUdf::EDataSlot::Datetime, .Value=Strftime("%FT%TZ", current)}; + default: + break; + } + + return {.Name=name, .Type=NUdf::EDataSlot::Date, .Value=Strftime("%F", current)}; + } + + IPathGenerator::TColumnWithValue CreateIntegerColumnWithValue(const TString& name, int64_t value) { + auto it = Columns.find(name); + auto slot = it == Columns.end() ? NUdf::EDataSlot::Int64 : it->second; + switch (slot) { + case NUdf::EDataSlot::Int32: + CheckCastInt32(value, name); + return {.Name=name, .Type=NUdf::EDataSlot::Int32, .Value=ToString(value)}; + case NUdf::EDataSlot::Uint32: + CheckCastUint32(value, name); + return {.Name=name, .Type=NUdf::EDataSlot::Uint32, .Value=ToString(value)}; + case NUdf::EDataSlot::Uint64: + CheckCastUint64(value, name); + return {.Name=name, .Type=NUdf::EDataSlot::Uint64, .Value=ToString(value)}; + default: + break; + } + return {.Name=name, .Type=NUdf::EDataSlot::Int64, .Value=ToString(value)}; + } + + void CheckCastInt32(int64_t value, const TString& column) { + if (std::numeric_limits<int32_t>::min() <= value && value <= std::numeric_limits<int32_t>::max()) { + return; + } + ythrow yexception() << "The value " << value << " is not representable as an int32 type for column " << column; + } + + void CheckCastUint32(int64_t value, const TString& column) { + if (value >= 0 && value <= std::numeric_limits<uint32_t>::max()) { + return; + } + ythrow yexception() << "The value " << value << " is not representable as an uint32 type for column " << column; + } + + void CheckCastUint64(int64_t value, const TString& column) { + if (value >= 0) { + return; + } + ythrow yexception() << "The value " << value << " is not representable as an uint64 type for column " << column; + } }; -TPathGeneratorPtr CreatePathGenerator(const TString& projection, const std::vector<TString>& partitionedBy, size_t pathsLimit) { - return std::make_shared<TPathGenerator>(projection, partitionedBy, pathsLimit); +TPathGeneratorPtr CreatePathGenerator(const TString& projection, const std::vector<TString>& partitionedBy, const TMap<TString, NUdf::EDataSlot>& columns, size_t pathsLimit) { + return std::make_shared<TPathGenerator>(projection, partitionedBy, columns, pathsLimit); } } diff --git a/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h b/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h index 5049218bcd..992572bcc2 100644 --- a/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h +++ b/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h @@ -2,9 +2,12 @@ #include <ydb/library/yql/public/udf/udf_data_type.h> +#include <util/generic/map.h> #include <util/generic/string.h> + #include <vector> + namespace NYql::NPathGenerator { struct IPathGenerator { @@ -69,6 +72,6 @@ struct IPathGenerator { using TPathGeneratorPtr = std::shared_ptr<const IPathGenerator>; -TPathGeneratorPtr CreatePathGenerator(const TString& projection, const std::vector<TString>& partitionedBy, size_t pathsLimit = 2000); +TPathGeneratorPtr CreatePathGenerator(const TString& projection, const std::vector<TString>& partitionedBy, const TMap<TString, NUdf::EDataSlot>& columns = {}, size_t pathsLimit = 2000); } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp index c638bb4bf6..c303037478 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp @@ -92,7 +92,14 @@ public: , CommonTypes(CreateCommonAvailableTypes()) , EnumTypes(CreateEnumAvailableTypes()) , DateTypes(CreateDateAvailableTypes()) - {} + { + for (auto item: ColumnsType->GetItems()) { + auto type = item->GetItemType(); + if (type->GetKind() == ETypeAnnotationKind::Data) { + DataSlotColumns[TString{item->GetName()}] = type->Cast<TDataExprType>()->GetSlot(); + } + } + } bool ValidatePartitonBy(const std::vector<TString>& partitionedBy) { TSet<TString> partitionedByColumns{partitionedBy.begin(), partitionedBy.end()}; @@ -108,7 +115,7 @@ public: } bool ValidateProjection(const TString& projection, const std::vector<TString>& partitionedBy) { - auto generator = NPathGenerator::CreatePathGenerator(projection, partitionedBy); + auto generator = NPathGenerator::CreatePathGenerator(projection, partitionedBy, DataSlotColumns); TMap<TString, NPathGenerator::IPathGenerator::EType> projectionColumns; for (const auto& column: generator->GetConfig().Rules) { projectionColumns[column.Name] = column.Type; @@ -170,7 +177,10 @@ private: return { Ctx.MakeType<TDataExprType>(EDataSlot::String), Ctx.MakeType<TDataExprType>(EDataSlot::Utf8), - Ctx.MakeType<TDataExprType>(EDataSlot::Int64) + Ctx.MakeType<TDataExprType>(EDataSlot::Int64), + Ctx.MakeType<TDataExprType>(EDataSlot::Int32), + Ctx.MakeType<TDataExprType>(EDataSlot::Uint32), + Ctx.MakeType<TDataExprType>(EDataSlot::Uint64) }; } @@ -188,7 +198,8 @@ private: Ctx.MakeType<TDataExprType>(EDataSlot::Uint64), Ctx.MakeType<TDataExprType>(EDataSlot::Int32), Ctx.MakeType<TDataExprType>(EDataSlot::Uint32), - Ctx.MakeType<TDataExprType>(EDataSlot::Date) + Ctx.MakeType<TDataExprType>(EDataSlot::Date), + Ctx.MakeType<TDataExprType>(EDataSlot::Datetime) }; } @@ -197,7 +208,8 @@ private: Ctx.MakeType<TDataExprType>(EDataSlot::String), Ctx.MakeType<TDataExprType>(EDataSlot::Utf8), Ctx.MakeType<TDataExprType>(EDataSlot::Uint32), - Ctx.MakeType<TDataExprType>(EDataSlot::Date) + Ctx.MakeType<TDataExprType>(EDataSlot::Date), + Ctx.MakeType<TDataExprType>(EDataSlot::Datetime) }; } @@ -209,6 +221,7 @@ private: const TTypesContainer CommonTypes; const TTypesContainer EnumTypes; const TTypesContainer DateTypes; + TMap<TString, NUdf::EDataSlot> DataSlotColumns; }; diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp index 56e2b38c60..9dec3e3059 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp @@ -1,11 +1,13 @@ #include "yql_s3_provider_impl.h" #include "yql_s3_listing_strategy.h" +#include <ydb/library/yql/providers/common/schema/expr/yql_expr_schema.h> #include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h> #include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h> #include <ydb/library/yql/providers/s3/object_listers/yql_s3_path.h> #include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h> #include <ydb/library/yql/providers/s3/range_helpers/path_list_reader.h> +#include <ydb/library/yql/public/udf/udf_data_type.h> #include <ydb/library/yql/core/yql_expr_optimize.h> #include <ydb/library/yql/core/yql_opt_utils.h> #include <ydb/library/yql/utils/log/log.h> @@ -606,6 +608,33 @@ private: return true; } + TMap<TString, NUdf::EDataSlot> GetDataSlotColumns(const TExprNode& schema, TExprContext& ctx) { + TMap<TString, NUdf::EDataSlot> columns; + auto types = schema.Child(1); + if (!types) { + return columns; + } + + TExprNode::TPtr holder; + if (types->Content() == "SqlTypeFromYson") { + auto type = NCommon::ParseTypeFromYson(types->Head().Content(), ctx, ctx.GetPosition(schema.Pos())); + holder = ExpandType(schema.Pos(), *type, ctx); + types = holder.Get(); + } + + for (size_t i = 0; i < types->ChildrenSize(); i++) { + const auto& column = types->Child(i); + const auto& name = column->Child(0); + const auto& type = column->Child(1)->Child(0); + auto slot = NKikimr::NUdf::FindDataSlot(type->Content()); + if (!slot) { + continue; + } + columns[TString{name->Content()}] = *slot; + } + return columns; + } + bool LaunchListsForNode(const TS3Read& read, TVector<NThreading::TFuture<NS3Lister::TListResult>>& futures, TExprContext& ctx) { const auto& settings = *read.Ref().Child(4); @@ -687,7 +716,7 @@ private: config.Columns = partitionedBy; config.SchemaTypeNode = schema->ChildPtr(1); if (!projection.empty()) { - config.Generator = CreatePathGenerator(projection, partitionedBy); + config.Generator = CreatePathGenerator(projection, partitionedBy, GetDataSlotColumns(*schema, ctx)); if (!ValidateProjection(projectionPos, config.Generator, partitionedBy, ctx)) { return false; } |