aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhcpp <hcpp@ydb.tech>2023-03-24 16:47:29 +0300
committerhcpp <hcpp@ydb.tech>2023-03-24 16:47:29 +0300
commit4a19ef5d96f0b6af337c7b562e8626f067bc0cf7 (patch)
tree015d9f1ecb3aa5dac8a2bc45989f258965dacce6
parent0f069e47d7e56ada4522f5399165b4480807c26f (diff)
downloadydb-4a19ef5d96f0b6af337c7b562e8626f067bc0cf7.tar.gz
projection types has been expanded
-rw-r--r--ydb/core/external_sources/object_storage.cpp30
-rw-r--r--ydb/core/fq/libs/control_plane_storage/request_validators.cpp30
-rw-r--r--ydb/library/yql/providers/s3/path_generator/ut/yql_generate_partitioning_rules_ut.cpp4
-rw-r--r--ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp65
-rw-r--r--ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h5
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp23
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp31
7 files changed, 172 insertions, 16 deletions
diff --git a/ydb/core/external_sources/object_storage.cpp b/ydb/core/external_sources/object_storage.cpp
index c2a8340a87..d5da28ab48 100644
--- a/ydb/core/external_sources/object_storage.cpp
+++ b/ydb/core/external_sources/object_storage.cpp
@@ -217,9 +217,18 @@ private:
.Primitive(NYdb::EPrimitiveType::String)
.Build(),
NYdb::TTypeBuilder{}
+ .Primitive(NYdb::EPrimitiveType::Int32)
+ .Build(),
+ NYdb::TTypeBuilder{}
+ .Primitive(NYdb::EPrimitiveType::Uint32)
+ .Build(),
+ NYdb::TTypeBuilder{}
.Primitive(NYdb::EPrimitiveType::Int64)
.Build(),
NYdb::TTypeBuilder{}
+ .Primitive(NYdb::EPrimitiveType::Uint64)
+ .Build(),
+ NYdb::TTypeBuilder{}
.Primitive(NYdb::EPrimitiveType::Utf8)
.Build()
};
@@ -257,6 +266,9 @@ private:
.Build(),
NYdb::TTypeBuilder{}
.Primitive(NYdb::EPrimitiveType::Date)
+ .Build(),
+ NYdb::TTypeBuilder{}
+ .Primitive(NYdb::EPrimitiveType::Datetime)
.Build()
};
return ValidateProjectionType(columnType, columnName, availableTypes);
@@ -275,13 +287,16 @@ private:
.Build(),
NYdb::TTypeBuilder{}
.Primitive(NYdb::EPrimitiveType::Date)
+ .Build(),
+ NYdb::TTypeBuilder{}
+ .Primitive(NYdb::EPrimitiveType::Date)
.Build()
};
return ValidateProjectionType(columnType, columnName, availableTypes);
}
static NYql::TIssues ValidateProjection(const NKikimrExternalSources::TSchema& schema, const TString& projection, const TVector<TString>& partitionedBy) {
- auto generator =NYql::NPathGenerator::CreatePathGenerator(projection, partitionedBy); // an exception is thrown if an error occurs
+ auto generator = NYql::NPathGenerator::CreatePathGenerator(projection, partitionedBy, GetDataSlotColumns(schema)); // an exception is thrown if an error occurs
TMap<TString, NYql::NPathGenerator::IPathGenerator::EType> projectionColumns;
for (const auto& column: generator->GetConfig().Rules) {
projectionColumns[column.Name] = column.Type;
@@ -308,6 +323,19 @@ private:
}
return issues;
}
+
+ static TMap<TString, NYql::NUdf::EDataSlot> GetDataSlotColumns(const NKikimrExternalSources::TSchema& schema) {
+ TMap<TString, NYql::NUdf::EDataSlot> dataSlotColumns;
+ for (const auto& column: schema.column()) {
+ if (column.has_type()) {
+ const auto& type = column.type();
+ if (type.has_type_id()) {
+ dataSlotColumns[column.name()] = NYql::NUdf::GetDataSlot(type.type_id());
+ }
+ }
+ }
+ return dataSlotColumns;
+ }
};
}
diff --git a/ydb/core/fq/libs/control_plane_storage/request_validators.cpp b/ydb/core/fq/libs/control_plane_storage/request_validators.cpp
index a470f55115..a0b12382aa 100644
--- a/ydb/core/fq/libs/control_plane_storage/request_validators.cpp
+++ b/ydb/core/fq/libs/control_plane_storage/request_validators.cpp
@@ -16,9 +16,18 @@ NYql::TIssues ValidateIntegerProjectionType(const NYdb::TType& columnType, const
.Primitive(NYdb::EPrimitiveType::String)
.Build(),
NYdb::TTypeBuilder{}
+ .Primitive(NYdb::EPrimitiveType::Int32)
+ .Build(),
+ NYdb::TTypeBuilder{}
+ .Primitive(NYdb::EPrimitiveType::Uint32)
+ .Build(),
+ NYdb::TTypeBuilder{}
.Primitive(NYdb::EPrimitiveType::Int64)
.Build(),
NYdb::TTypeBuilder{}
+ .Primitive(NYdb::EPrimitiveType::Uint64)
+ .Build(),
+ NYdb::TTypeBuilder{}
.Primitive(NYdb::EPrimitiveType::Utf8)
.Build()
};
@@ -56,6 +65,9 @@ NYql::TIssues ValidateCommonProjectionType(const NYdb::TType& columnType, const
.Build(),
NYdb::TTypeBuilder{}
.Primitive(NYdb::EPrimitiveType::Date)
+ .Build(),
+ NYdb::TTypeBuilder{}
+ .Primitive(NYdb::EPrimitiveType::Datetime)
.Build()
};
return ValidateProjectionType(columnType, columnName, availableTypes);
@@ -74,11 +86,27 @@ NYql::TIssues ValidateDateProjectionType(const NYdb::TType& columnType, const TS
.Build(),
NYdb::TTypeBuilder{}
.Primitive(NYdb::EPrimitiveType::Date)
+ .Build(),
+ NYdb::TTypeBuilder{}
+ .Primitive(NYdb::EPrimitiveType::Datetime)
.Build()
};
return ValidateProjectionType(columnType, columnName, availableTypes);
}
+TMap<TString, NYql::NUdf::EDataSlot> GetDataSlotColumns(const FederatedQuery::Schema& schema) {
+ TMap<TString, NYql::NUdf::EDataSlot> dataSlotColumns;
+ for (const auto& column: schema.column()) {
+ if (column.has_type()) {
+ const auto& type = column.type();
+ if (type.has_type_id()) {
+ dataSlotColumns[column.name()] = NYql::NUdf::GetDataSlot(type.type_id());
+ }
+ }
+ }
+ return dataSlotColumns;
+}
+
}
NYql::TIssues ValidateConnectionSetting(const FederatedQuery::ConnectionSetting& setting, const TSet<FederatedQuery::ConnectionSetting::ConnectionCase>& availableConnections, bool disableCurrentIam, bool clickHousePasswordRequire) {
@@ -288,7 +316,7 @@ NYql::TIssues ValidateProjectionColumns(const FederatedQuery::Schema& schema, co
}
NYql::TIssues ValidateProjection(const FederatedQuery::Schema& schema, const TString& projection, const TVector<TString>& partitionedBy) {
- auto generator =NYql::NPathGenerator::CreatePathGenerator(projection, partitionedBy); // an exception is thrown if an error occurs
+ auto generator =NYql::NPathGenerator::CreatePathGenerator(projection, partitionedBy, GetDataSlotColumns(schema)); // an exception is thrown if an error occurs
TMap<TString, NYql::NPathGenerator::IPathGenerator::EType> projectionColumns;
for (const auto& column: generator->GetConfig().Rules) {
projectionColumns[column.Name] = column.Type;
diff --git a/ydb/library/yql/providers/s3/path_generator/ut/yql_generate_partitioning_rules_ut.cpp b/ydb/library/yql/providers/s3/path_generator/ut/yql_generate_partitioning_rules_ut.cpp
index b80adb3665..c6070217ea 100644
--- a/ydb/library/yql/providers/s3/path_generator/ut/yql_generate_partitioning_rules_ut.cpp
+++ b/ydb/library/yql/providers/s3/path_generator/ut/yql_generate_partitioning_rules_ut.cpp
@@ -110,7 +110,7 @@ Y_UNIT_TEST_SUITE(TGenerateTests) {
"projection.code.values" : "0,1",
"storage.location.template" : "/${city}/${code}/"
}
- )", {"city", "code"}, 2), yexception, "The limit on the number of paths has been reached: 2 of 2");
+ )", {"city", "code"}, {}, 2), yexception, "The limit on the number of paths has been reached: 2 of 2");
}
Y_UNIT_TEST(CheckClash) {
@@ -127,7 +127,7 @@ Y_UNIT_TEST_SUITE(TGenerateTests) {
}
Y_UNIT_TEST(CheckHiveFormat) {
- auto generator = CreatePathGenerator({}, {"city", "code", "device_id"}, 1);
+ auto generator = CreatePathGenerator({}, {"city", "code", "device_id"}, {}, 1);
auto rules = generator->GetRules();
UNIT_ASSERT_VALUES_EQUAL(rules.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(rules[0].ColumnValues.size(), 0);
diff --git a/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp b/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp
index 7a293813b3..1081e1a60e 100644
--- a/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp
+++ b/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp
@@ -246,9 +246,11 @@ struct TPathGenerator: public IPathGenerator {
TExplicitPartitioningConfig Config;
TRules Rules;
TMap<TString, TColumnPartitioningConfig> ColumnConfig; // column name -> config
+ TMap<TString, NUdf::EDataSlot> Columns;
public:
- TPathGenerator(const TString& projection, const std::vector<TString>& partitionedBy, size_t pathsLimit)
+ TPathGenerator(const TString& projection, const std::vector<TString>& partitionedBy, const TMap<TString, NUdf::EDataSlot>& columns, size_t pathsLimit)
+ : Columns(columns)
{
try {
ParsePartitioningRules(projection, partitionedBy);
@@ -589,7 +591,7 @@ private:
TString copyLocationTemplate = locationTemplate;
const TString time = Strftime(rule.Format.c_str(), current);
ReplaceAll(copyLocationTemplate, "${" + rule.Name + "}", time);
- columnsWithValue.push_back(TColumnWithValue{.Name=rule.Name, .Type=NUdf::EDataSlot::Date, .Value=Strftime("%F", current)});
+ columnsWithValue.push_back(CreateDateColumnWithValue(rule.Name, current));
DoGenerate(rules, copyLocationTemplate, columnsWithValue, result, pathsLimit, now, p + 1);
columnsWithValue.pop_back();
@@ -665,7 +667,7 @@ private:
for (int64_t i = rule.Min; i <= rule.Max; i += rule.Interval) {
TString copyLocationTemplate = locationTemplate;
ReplaceAll(copyLocationTemplate, "${" + rule.Name + "}", fmtInteger(rule.Digits, i));
- columnsWithValue.push_back(IPathGenerator::TColumnWithValue{.Name=rule.Name, .Type=NUdf::EDataSlot::Int64, .Value=ToString(i)});
+ columnsWithValue.push_back(CreateIntegerColumnWithValue(rule.Name, i));
DoGenerate(rules, copyLocationTemplate, columnsWithValue, result, pathsLimit, now, p + 1);
columnsWithValue.pop_back();
@@ -674,10 +676,63 @@ private:
}
}
}
+
+ IPathGenerator::TColumnWithValue CreateDateColumnWithValue(const TString& name, const TInstant& current) {
+ auto it = Columns.find(name);
+ auto slot = it == Columns.end() ? NUdf::EDataSlot::Date : it->second;
+ switch (slot) {
+ case NUdf::EDataSlot::Datetime:
+ return {.Name=name, .Type=NUdf::EDataSlot::Datetime, .Value=Strftime("%FT%TZ", current)};
+ default:
+ break;
+ }
+
+ return {.Name=name, .Type=NUdf::EDataSlot::Date, .Value=Strftime("%F", current)};
+ }
+
+ IPathGenerator::TColumnWithValue CreateIntegerColumnWithValue(const TString& name, int64_t value) {
+ auto it = Columns.find(name);
+ auto slot = it == Columns.end() ? NUdf::EDataSlot::Int64 : it->second;
+ switch (slot) {
+ case NUdf::EDataSlot::Int32:
+ CheckCastInt32(value, name);
+ return {.Name=name, .Type=NUdf::EDataSlot::Int32, .Value=ToString(value)};
+ case NUdf::EDataSlot::Uint32:
+ CheckCastUint32(value, name);
+ return {.Name=name, .Type=NUdf::EDataSlot::Uint32, .Value=ToString(value)};
+ case NUdf::EDataSlot::Uint64:
+ CheckCastUint64(value, name);
+ return {.Name=name, .Type=NUdf::EDataSlot::Uint64, .Value=ToString(value)};
+ default:
+ break;
+ }
+ return {.Name=name, .Type=NUdf::EDataSlot::Int64, .Value=ToString(value)};
+ }
+
+ void CheckCastInt32(int64_t value, const TString& column) {
+ if (std::numeric_limits<int32_t>::min() <= value && value <= std::numeric_limits<int32_t>::max()) {
+ return;
+ }
+ ythrow yexception() << "The value " << value << " is not representable as an int32 type for column " << column;
+ }
+
+ void CheckCastUint32(int64_t value, const TString& column) {
+ if (value >= 0 && value <= std::numeric_limits<uint32_t>::max()) {
+ return;
+ }
+ ythrow yexception() << "The value " << value << " is not representable as an uint32 type for column " << column;
+ }
+
+ void CheckCastUint64(int64_t value, const TString& column) {
+ if (value >= 0) {
+ return;
+ }
+ ythrow yexception() << "The value " << value << " is not representable as an uint64 type for column " << column;
+ }
};
-TPathGeneratorPtr CreatePathGenerator(const TString& projection, const std::vector<TString>& partitionedBy, size_t pathsLimit) {
- return std::make_shared<TPathGenerator>(projection, partitionedBy, pathsLimit);
+TPathGeneratorPtr CreatePathGenerator(const TString& projection, const std::vector<TString>& partitionedBy, const TMap<TString, NUdf::EDataSlot>& columns, size_t pathsLimit) {
+ return std::make_shared<TPathGenerator>(projection, partitionedBy, columns, pathsLimit);
}
}
diff --git a/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h b/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h
index 5049218bcd..992572bcc2 100644
--- a/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h
+++ b/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h
@@ -2,9 +2,12 @@
#include <ydb/library/yql/public/udf/udf_data_type.h>
+#include <util/generic/map.h>
#include <util/generic/string.h>
+
#include <vector>
+
namespace NYql::NPathGenerator {
struct IPathGenerator {
@@ -69,6 +72,6 @@ struct IPathGenerator {
using TPathGeneratorPtr = std::shared_ptr<const IPathGenerator>;
-TPathGeneratorPtr CreatePathGenerator(const TString& projection, const std::vector<TString>& partitionedBy, size_t pathsLimit = 2000);
+TPathGeneratorPtr CreatePathGenerator(const TString& projection, const std::vector<TString>& partitionedBy, const TMap<TString, NUdf::EDataSlot>& columns = {}, size_t pathsLimit = 2000);
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
index c638bb4bf6..c303037478 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
@@ -92,7 +92,14 @@ public:
, CommonTypes(CreateCommonAvailableTypes())
, EnumTypes(CreateEnumAvailableTypes())
, DateTypes(CreateDateAvailableTypes())
- {}
+ {
+ for (auto item: ColumnsType->GetItems()) {
+ auto type = item->GetItemType();
+ if (type->GetKind() == ETypeAnnotationKind::Data) {
+ DataSlotColumns[TString{item->GetName()}] = type->Cast<TDataExprType>()->GetSlot();
+ }
+ }
+ }
bool ValidatePartitonBy(const std::vector<TString>& partitionedBy) {
TSet<TString> partitionedByColumns{partitionedBy.begin(), partitionedBy.end()};
@@ -108,7 +115,7 @@ public:
}
bool ValidateProjection(const TString& projection, const std::vector<TString>& partitionedBy) {
- auto generator = NPathGenerator::CreatePathGenerator(projection, partitionedBy);
+ auto generator = NPathGenerator::CreatePathGenerator(projection, partitionedBy, DataSlotColumns);
TMap<TString, NPathGenerator::IPathGenerator::EType> projectionColumns;
for (const auto& column: generator->GetConfig().Rules) {
projectionColumns[column.Name] = column.Type;
@@ -170,7 +177,10 @@ private:
return {
Ctx.MakeType<TDataExprType>(EDataSlot::String),
Ctx.MakeType<TDataExprType>(EDataSlot::Utf8),
- Ctx.MakeType<TDataExprType>(EDataSlot::Int64)
+ Ctx.MakeType<TDataExprType>(EDataSlot::Int64),
+ Ctx.MakeType<TDataExprType>(EDataSlot::Int32),
+ Ctx.MakeType<TDataExprType>(EDataSlot::Uint32),
+ Ctx.MakeType<TDataExprType>(EDataSlot::Uint64)
};
}
@@ -188,7 +198,8 @@ private:
Ctx.MakeType<TDataExprType>(EDataSlot::Uint64),
Ctx.MakeType<TDataExprType>(EDataSlot::Int32),
Ctx.MakeType<TDataExprType>(EDataSlot::Uint32),
- Ctx.MakeType<TDataExprType>(EDataSlot::Date)
+ Ctx.MakeType<TDataExprType>(EDataSlot::Date),
+ Ctx.MakeType<TDataExprType>(EDataSlot::Datetime)
};
}
@@ -197,7 +208,8 @@ private:
Ctx.MakeType<TDataExprType>(EDataSlot::String),
Ctx.MakeType<TDataExprType>(EDataSlot::Utf8),
Ctx.MakeType<TDataExprType>(EDataSlot::Uint32),
- Ctx.MakeType<TDataExprType>(EDataSlot::Date)
+ Ctx.MakeType<TDataExprType>(EDataSlot::Date),
+ Ctx.MakeType<TDataExprType>(EDataSlot::Datetime)
};
}
@@ -209,6 +221,7 @@ private:
const TTypesContainer CommonTypes;
const TTypesContainer EnumTypes;
const TTypesContainer DateTypes;
+ TMap<TString, NUdf::EDataSlot> DataSlotColumns;
};
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
index 56e2b38c60..9dec3e3059 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
@@ -1,11 +1,13 @@
#include "yql_s3_provider_impl.h"
#include "yql_s3_listing_strategy.h"
+#include <ydb/library/yql/providers/common/schema/expr/yql_expr_schema.h>
#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h>
#include <ydb/library/yql/providers/s3/object_listers/yql_s3_path.h>
#include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h>
#include <ydb/library/yql/providers/s3/range_helpers/path_list_reader.h>
+#include <ydb/library/yql/public/udf/udf_data_type.h>
#include <ydb/library/yql/core/yql_expr_optimize.h>
#include <ydb/library/yql/core/yql_opt_utils.h>
#include <ydb/library/yql/utils/log/log.h>
@@ -606,6 +608,33 @@ private:
return true;
}
+ TMap<TString, NUdf::EDataSlot> GetDataSlotColumns(const TExprNode& schema, TExprContext& ctx) {
+ TMap<TString, NUdf::EDataSlot> columns;
+ auto types = schema.Child(1);
+ if (!types) {
+ return columns;
+ }
+
+ TExprNode::TPtr holder;
+ if (types->Content() == "SqlTypeFromYson") {
+ auto type = NCommon::ParseTypeFromYson(types->Head().Content(), ctx, ctx.GetPosition(schema.Pos()));
+ holder = ExpandType(schema.Pos(), *type, ctx);
+ types = holder.Get();
+ }
+
+ for (size_t i = 0; i < types->ChildrenSize(); i++) {
+ const auto& column = types->Child(i);
+ const auto& name = column->Child(0);
+ const auto& type = column->Child(1)->Child(0);
+ auto slot = NKikimr::NUdf::FindDataSlot(type->Content());
+ if (!slot) {
+ continue;
+ }
+ columns[TString{name->Content()}] = *slot;
+ }
+ return columns;
+ }
+
bool LaunchListsForNode(const TS3Read& read, TVector<NThreading::TFuture<NS3Lister::TListResult>>& futures, TExprContext& ctx) {
const auto& settings = *read.Ref().Child(4);
@@ -687,7 +716,7 @@ private:
config.Columns = partitionedBy;
config.SchemaTypeNode = schema->ChildPtr(1);
if (!projection.empty()) {
- config.Generator = CreatePathGenerator(projection, partitionedBy);
+ config.Generator = CreatePathGenerator(projection, partitionedBy, GetDataSlotColumns(*schema, ctx));
if (!ValidateProjection(projectionPos, config.Generator, partitionedBy, ctx)) {
return false;
}