diff options
author | hcpp <hcpp@ydb.tech> | 2022-07-15 17:26:03 +0300 |
---|---|---|
committer | hcpp <hcpp@ydb.tech> | 2022-07-15 17:26:03 +0300 |
commit | a968df509e4c63cede6d1c98ccaed1f1d32549cc (patch) | |
tree | 862b8d5c4474b2abf31e44bb46cbb768287e5395 | |
parent | 9d445bb0614662792baefa488b38e1ca78a3dd38 (diff) | |
download | ydb-a968df509e4c63cede6d1c98ccaed1f1d32549cc.tar.gz |
interval.unit has been supported for s3
18 files changed, 357 insertions, 19 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index 9d2fdc8ade..d6157ca785 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -1324,6 +1324,7 @@ add_subdirectory(ydb/library/yql/providers/common/schema/skiff) add_subdirectory(ydb/library/yql/providers/common/ut_helpers) add_subdirectory(ydb/library/yql/providers/s3/compressors) add_subdirectory(ydb/library/yql/providers/s3/path_generator) +add_subdirectory(ydb/library/yql/providers/s3/serializations) add_subdirectory(ydb/library/yql/providers/function/common) add_subdirectory(ydb/library/yql/providers/function/expr_nodes) add_subdirectory(ydb/library/yql/providers/function/gateway) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index bf0464e09c..faad06f747 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -913,6 +913,7 @@ add_subdirectory(contrib/libs/poco/JSON) add_subdirectory(contrib/libs/poco/XML) add_subdirectory(ydb/library/yql/providers/s3/compressors) add_subdirectory(contrib/libs/lzma) +add_subdirectory(ydb/library/yql/providers/s3/serializations) add_subdirectory(ydb/library/yql/udfs/common/clickhouse/client) add_subdirectory(ydb/library/yql/public/udf/support) add_subdirectory(contrib/restricted/boost/libs/program_options) diff --git a/ydb/core/yq/libs/actors/table_bindings_from_bindings.cpp b/ydb/core/yq/libs/actors/table_bindings_from_bindings.cpp index e353a194a4..fb010679b3 100644 --- a/ydb/core/yq/libs/actors/table_bindings_from_bindings.cpp +++ b/ydb/core/yq/libs/actors/table_bindings_from_bindings.cpp @@ -16,6 +16,7 @@ void FillBinding(NSQLTranslation::TTranslationSettings& sqlSettings, const Yande TString format; TString compression; TString schema; + THashMap<TString, TString> formatSettings; switch (binding.content().setting().binding_case()) { case YandexQuery::BindingSetting::kDataStreams: { clusterType = PqProviderName; @@ -24,6 +25,7 @@ void FillBinding(NSQLTranslation::TTranslationSettings& sqlSettings, const Yande format = yds.format(); 
compression = yds.compression(); schema = FormatSchema(yds.schema()); + formatSettings = {yds.format_setting().begin(), yds.format_setting().end()}; break; } case YandexQuery::BindingSetting::kObjectStorage: { @@ -38,6 +40,7 @@ void FillBinding(NSQLTranslation::TTranslationSettings& sqlSettings, const Yande format = s.format(); compression = s.compression(); schema = FormatSchema(s.schema()); + formatSettings = {s.format_setting().begin(), s.format_setting().end()}; break; } @@ -54,6 +57,7 @@ void FillBinding(NSQLTranslation::TTranslationSettings& sqlSettings, const Yande NSQLTranslation::TTableBindingSettings bindSettings; bindSettings.ClusterType = clusterType; + bindSettings.Settings = formatSettings; bindSettings.Settings["cluster"] = connectionPtr->content().name(); bindSettings.Settings["path"] = path; bindSettings.Settings["format"] = format; diff --git a/ydb/core/yq/libs/control_plane_storage/request_validators.cpp b/ydb/core/yq/libs/control_plane_storage/request_validators.cpp index 861d90e7d9..c182610254 100644 --- a/ydb/core/yq/libs/control_plane_storage/request_validators.cpp +++ b/ydb/core/yq/libs/control_plane_storage/request_validators.cpp @@ -105,4 +105,18 @@ NYql::TIssues ValidateConnectionSetting(const YandexQuery::ConnectionSetting& se return issues; } +NYql::TIssues ValidateFormatSetting(const google::protobuf::Map<TString, TString>& formatSetting) { + NYql::TIssues issues; + for (const auto& [key, value]: formatSetting) { + if (key == "data.interval.unit") { + if (!IsValidIntervalUnit(value)) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "unknown value for data.interval.unit " + value)); + } + } else { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "unknown format setting " + key)); + } + } + return issues; +} + } // namespace NYq diff --git a/ydb/core/yq/libs/control_plane_storage/request_validators.h b/ydb/core/yq/libs/control_plane_storage/request_validators.h index 0e91b2b03f..8510a53a4b 100644 --- 
a/ydb/core/yq/libs/control_plane_storage/request_validators.h +++ b/ydb/core/yq/libs/control_plane_storage/request_validators.h @@ -1,5 +1,7 @@ #pragma once +#include "util.h" + #include <ydb/core/yq/libs/config/yq_issue.h> #include <ydb/library/yql/public/issue/yql_issue.h> #include <ydb/public/api/protos/yq.pb.h> @@ -71,8 +73,10 @@ NYql::TIssues ValidateQuery(T& ev, size_t maxSize) return issues; } +NYql::TIssues ValidateFormatSetting(const google::protobuf::Map<TString, TString>& formatSetting); + template<typename T> - NYql::TIssues ValidateBinding(T& ev, size_t maxSize, const TSet<YandexQuery::BindingSetting::BindingCase>& availableBindings) +NYql::TIssues ValidateBinding(T& ev, size_t maxSize, const TSet<YandexQuery::BindingSetting::BindingCase>& availableBindings) { const auto& request = ev->Get()->Request; NYql::TIssues issues = ValidateEvent(ev, maxSize); @@ -102,6 +106,7 @@ template<typename T> if (!dataStreams.has_schema()) { issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "data streams with empty schema is forbidden")); } + issues.AddIssues(ValidateFormatSetting(dataStreams.format_setting())); break; } case YandexQuery::BindingSetting::BINDING_NOT_SET: { @@ -110,6 +115,10 @@ template<typename T> } // Do not replace with default. Adding a new binding should cause a compilation error case YandexQuery::BindingSetting::kObjectStorage: + const YandexQuery::ObjectStorageBinding objectStorage = setting.object_storage(); + for (const auto& subset: objectStorage.subset()) { + issues.AddIssues(ValidateFormatSetting(subset.format_setting())); + } break; } } else { diff --git a/ydb/core/yq/libs/control_plane_storage/util.cpp b/ydb/core/yq/libs/control_plane_storage/util.cpp index e11bd64187..f73e0e4ce9 100644 --- a/ydb/core/yq/libs/control_plane_storage/util.cpp +++ b/ydb/core/yq/libs/control_plane_storage/util.cpp @@ -175,4 +175,17 @@ std::pair<TString, TString> SplitId(const TString& id, char delim) { (it != id.end() ? 
id.substr(it - id.begin() + 1) : TString{""})); } +bool IsValidIntervalUnit(const TString& unit) { + static constexpr std::array<std::string_view, 10> IntervalUnits = { + "MICROSECONDS"sv, + "MILLISECONDS"sv, + "SECONDS"sv, + "MINUTES"sv, + "HOURS"sv, + "DAYS"sv, + "WEEKS"sv + }; + return IsIn(IntervalUnits, unit); +} + } //namespace NYq diff --git a/ydb/core/yq/libs/control_plane_storage/util.h b/ydb/core/yq/libs/control_plane_storage/util.h index fa721cb2bd..d14f27bcc8 100644 --- a/ydb/core/yq/libs/control_plane_storage/util.h +++ b/ydb/core/yq/libs/control_plane_storage/util.h @@ -44,4 +44,6 @@ NYdb::TValue PackItemsToList(const TVector<NYdb::TValue>& items); std::pair<TString, TString> SplitId(const TString& id, char delim = '-'); +bool IsValidIntervalUnit(const TString& unit); + } // namespace NYq diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp index 8d88cfaacb..b25a966511 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.cpp +++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp @@ -40,6 +40,15 @@ namespace { "bzip2"sv, "xz"sv }; + constexpr std::array<std::string_view, 10> IntervalUnits = { + "MICROSECONDS"sv, + "MILLISECONDS"sv, + "SECONDS"sv, + "MINUTES"sv, + "HOURS"sv, + "DAYS"sv, + "WEEKS"sv + }; } // namespace bool TCommitSettings::EnsureModeEmpty(TExprContext& ctx) { @@ -1098,5 +1107,14 @@ bool ValidateFormat(TStringBuf format, TExprContext& ctx) { return false; } +bool ValidateIntervalUnit(TStringBuf unit, TExprContext& ctx) { + if (unit.empty() || IsIn(IntervalUnits, unit)) { + return true; + } + ctx.AddError(TIssue(TStringBuilder() << "Unknown format: " << unit + << ". 
Use one of: " << JoinSeq(", ", IntervalUnits))); + return false; +} + } // namespace NCommon } // namespace NYql diff --git a/ydb/library/yql/providers/common/provider/yql_provider.h b/ydb/library/yql/providers/common/provider/yql_provider.h index 2c2a38a9b5..a8e22dbf2b 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.h +++ b/ydb/library/yql/providers/common/provider/yql_provider.h @@ -127,5 +127,7 @@ bool ValidateCompression(TStringBuf compression, TExprContext& ctx); bool ValidateFormat(TStringBuf format, TExprContext& ctx); +bool ValidateIntervalUnit(TStringBuf format, TExprContext& ctx); + } // namespace NCommon } // namespace NYql diff --git a/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt b/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt index d1802d1ec9..73edb9f346 100644 --- a/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt +++ b/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt @@ -22,14 +22,15 @@ target_link_libraries(providers-s3-actors PUBLIC contrib-libs-fmt libs-poco-Util cpp-xml-document - yql-minikql-computation - common-token_accessor-client - common-schema-mkql - yql-public-types dq-actors-compute + yql-minikql-computation providers-common-http_gateway - providers-s3-proto + common-schema-mkql + common-token_accessor-client providers-s3-compressors + providers-s3-proto + providers-s3-serializations + yql-public-types clickhouse_client_udf ) target_sources(providers-s3-actors PRIVATE diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index b34926d088..1d0cc239cd 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -1,14 +1,15 @@ #ifdef __linux__ -#include <ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/DataTypeEnum.h> -#include <ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/DataTypesNumber.h> +#include 
<ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/DataTypeArray.h> #include <ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/DataTypeDate.h> +#include <ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/DataTypeEnum.h> #include <ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/DataTypeFactory.h> -#include <ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/DataTypeArray.h> +#include <ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/DataTypeInterval.h> #include <ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/DataTypeNothing.h> -#include <ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/DataTypeTuple.h> #include <ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/DataTypeNullable.h> #include <ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/DataTypeString.h> +#include <ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/DataTypeTuple.h> #include <ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/DataTypeUUID.h> +#include <ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/DataTypesNumber.h> #include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBuffer.h> #include <ydb/library/yql/udfs/common/clickhouse/client/src/Core/Block.h> @@ -32,6 +33,7 @@ #include <ydb/library/yql/providers/s3/compressors/factory.h> #include <ydb/library/yql/providers/s3/proto/range.pb.h> +#include <ydb/library/yql/providers/s3/serializations/serialization_interval.h> #include <ydb/library/yql/providers/common/provider/yql_provider_names.h> #include <library/cpp/actors/core/actor_bootstrapped.h> @@ -590,20 +592,20 @@ private: using namespace NKikimr::NMiniKQL; -NDB::DataTypePtr MetaToClickHouse(const TType* type) { +NDB::DataTypePtr MetaToClickHouse(const TType* type, NSerialization::TSerializationInterval::EUnit unit) { switch (type->GetKind()) { case TType::EKind::EmptyList: return 
std::make_shared<NDB::DataTypeArray>(std::make_shared<NDB::DataTypeNothing>()); case TType::EKind::Optional: - return makeNullable(MetaToClickHouse(static_cast<const TOptionalType*>(type)->GetItemType())); + return makeNullable(MetaToClickHouse(static_cast<const TOptionalType*>(type)->GetItemType(), unit)); case TType::EKind::List: - return std::make_shared<NDB::DataTypeArray>(MetaToClickHouse(static_cast<const TListType*>(type)->GetItemType())); + return std::make_shared<NDB::DataTypeArray>(MetaToClickHouse(static_cast<const TListType*>(type)->GetItemType(), unit)); case TType::EKind::Tuple: { const auto tupleType = static_cast<const TTupleType*>(type); NDB::DataTypes elems; elems.reserve(tupleType->GetElementsCount()); for (auto i = 0U; i < tupleType->GetElementsCount(); ++i) - elems.emplace_back(MetaToClickHouse(tupleType->GetElementType(i))); + elems.emplace_back(MetaToClickHouse(tupleType->GetElementType(i), unit)); return std::make_shared<NDB::DataTypeTuple>(elems); } case TType::EKind::Data: { @@ -642,6 +644,8 @@ NDB::DataTypePtr MetaToClickHouse(const TType* type) { return std::make_shared<NDB::DataTypeDateTime>(); case NUdf::EDataSlot::Uuid: return std::make_shared<NDB::DataTypeUUID>(); + case NUdf::EDataSlot::Interval: + return NSerialization::GetInterval(unit); default: break; } @@ -702,6 +706,11 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( addPathIndex = FromString<bool>(it->second); } + NYql::NSerialization::TSerializationInterval::EUnit intervalUnit = NYql::NSerialization::TSerializationInterval::EUnit::MICROSECONDS; + if (auto it = settings.find("data.interval.unit"); it != settings.cend()) { + intervalUnit = NYql::NSerialization::TSerializationInterval::ToUnit(it->second); + } + if (params.HasFormat() && params.HasRowType()) { const auto pb = std::make_unique<TProgramBuilder>(typeEnv, functionRegistry); const auto outputItemType = NCommon::ParseTypeFromYson(TStringBuf(params.GetRowType()), *pb, Cerr); @@ -711,7 
+720,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( readSpec->Columns.resize(structType->GetMembersCount()); for (ui32 i = 0U; i < structType->GetMembersCount(); ++i) { auto& colsumn = readSpec->Columns[i]; - colsumn.type = MetaToClickHouse(structType->GetMemberType(i)); + colsumn.type = MetaToClickHouse(structType->GetMemberType(i), intervalUnit); colsumn.name = structType->GetMemberName(i); } readSpec->Format = params.GetFormat(); diff --git a/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp b/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp index 98d5c5834a..60877b38fd 100644 --- a/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp +++ b/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp @@ -292,9 +292,9 @@ TDuration FromUnit(int64_t interval, EIntervalUnit unit) { case EIntervalUnit::WEEKS: return TDuration::Days(interval * 7); case EIntervalUnit::MONTHS: - return TDuration::Days(interval * 30); + return TDuration::Seconds(interval * 2629746LL); /// Exactly 1/12 of a year. 
case EIntervalUnit::YEARS: - return TDuration::Days(interval * 365); + return TDuration::Seconds(interval * 31556952LL); /// The average length of a Gregorian year is equal to 365.2425 days default: ythrow yexception() << "Only the " << GetEnumAllNames<EIntervalUnit>() << " units are supported but got " << unit; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp index 505fe7410e..4c04fe5a37 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp @@ -241,7 +241,7 @@ public: if (input->ChildrenSize() > TS3Object::idx_Settings) { auto validator = [](TStringBuf name, const TExprNode& setting, TExprContext& ctx) { - if ((name == "compression" || name == "projection") && setting.ChildrenSize() != 2) { + if ((name == "compression" || name == "projection" || name == "data.interval.unit") && setting.ChildrenSize() != 2) { ctx.AddError(TIssue(ctx.GetPosition(setting.Pos()), TStringBuilder() << "Expected single value setting for " << name << ", but got " << setting.ChildrenSize() - 1)); return false; @@ -288,6 +288,24 @@ public: return true; } + if (name == "data.interval.unit") { + auto& value = setting.Tail(); + TStringBuf unit; + if (value.IsAtom()) { + unit = value.Content(); + } else { + if (!EnsureStringOrUtf8Type(value, ctx)) { + return false; + } + if (!value.IsCallable({"String", "Utf8"})) { + ctx.AddError(TIssue(ctx.GetPosition(value.Pos()), "Expected literal string as compression value")); + return false; + } + unit = value.Head().Content(); + } + return NCommon::ValidateIntervalUnit(unit, ctx); + } + YQL_ENSURE(name == "projection"); if (!EnsureAtom(setting.Tail(), ctx)) { return false; @@ -306,7 +324,7 @@ public: return true; }; if (!EnsureValidSettings(*input->Child(TS3Object::idx_Settings), - { "compression", "partitionedby", "projection" }, validator, ctx)) + 
{ "compression", "partitionedby", "projection", "data.interval.unit" }, validator, ctx)) { return TStatus::Error; } diff --git a/ydb/library/yql/providers/s3/serializations/CMakeLists.darwin.txt b/ydb/library/yql/providers/s3/serializations/CMakeLists.darwin.txt new file mode 100644 index 0000000000..2a1f7b92b1 --- /dev/null +++ b/ydb/library/yql/providers/s3/serializations/CMakeLists.darwin.txt @@ -0,0 +1,29 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(providers-s3-serializations) +target_include_directories(providers-s3-serializations PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/base + ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/base/pcg-random + ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src +) +target_link_libraries(providers-s3-serializations PUBLIC + contrib-libs-cxxsupp + yutil + common-clickhouse-client + tools-enum_parser-enum_serialization_runtime +) +target_sources(providers-s3-serializations PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/serializations/serialization_interval.cpp +) +generate_enum_serilization(providers-s3-serializations + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/serializations/serialization_interval.h + INCLUDE_HEADERS + ydb/library/yql/providers/s3/serializations/serialization_interval.h +) diff --git a/ydb/library/yql/providers/s3/serializations/CMakeLists.linux.txt b/ydb/library/yql/providers/s3/serializations/CMakeLists.linux.txt new file mode 100644 index 0000000000..0a75f68101 --- /dev/null +++ 
b/ydb/library/yql/providers/s3/serializations/CMakeLists.linux.txt @@ -0,0 +1,29 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(providers-s3-serializations) +target_include_directories(providers-s3-serializations PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/base + ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/base/pcg-random + ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src +) +target_link_libraries(providers-s3-serializations PUBLIC + contrib-libs-cxxsupp + yutil + clickhouse_client_udf + tools-enum_parser-enum_serialization_runtime +) +target_sources(providers-s3-serializations PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/serializations/serialization_interval.cpp +) +generate_enum_serilization(providers-s3-serializations + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/serializations/serialization_interval.h + INCLUDE_HEADERS + ydb/library/yql/providers/s3/serializations/serialization_interval.h +) diff --git a/ydb/library/yql/providers/s3/serializations/CMakeLists.txt b/ydb/library/yql/providers/s3/serializations/CMakeLists.txt new file mode 100644 index 0000000000..fc7b1ee73c --- /dev/null +++ b/ydb/library/yql/providers/s3/serializations/CMakeLists.txt @@ -0,0 +1,13 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. 
Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (APPLE) + include(CMakeLists.darwin.txt) +elseif (UNIX AND NOT APPLE) + include(CMakeLists.linux.txt) +endif() diff --git a/ydb/library/yql/providers/s3/serializations/serialization_interval.cpp b/ydb/library/yql/providers/s3/serializations/serialization_interval.cpp new file mode 100644 index 0000000000..65d23b1c3a --- /dev/null +++ b/ydb/library/yql/providers/s3/serializations/serialization_interval.cpp @@ -0,0 +1,123 @@ +#include "serialization_interval.h" + +#include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadHelpers.h> +#include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/WriteHelpers.h> + +#include <util/generic/serialized_enum.h> + +namespace NYql::NSerialization { + +using namespace NDB; +namespace { + +Int64 ToAvgMicroSeconds(TSerializationInterval::EUnit unit) +{ + switch (unit) + { + case TSerializationInterval::EUnit::MICROSECONDS: return 1LL; + case TSerializationInterval::EUnit::MILLISECONDS: return 1000LL; + case TSerializationInterval::EUnit::SECONDS: return 1000000LL; + case TSerializationInterval::EUnit::MINUTES: return 60000000LL; + case TSerializationInterval::EUnit::HOURS: return 3600000000LL; + case TSerializationInterval::EUnit::DAYS: return 86400000000LL; + case TSerializationInterval::EUnit::WEEKS: return 604800000000LL; + } +} + +} + +TSerializationInterval::EUnit TSerializationInterval::ToUnit(const TString& unit) { + const auto names = GetEnumNames<EUnit>(); + for (const auto& name: names) { + if (name.second == unit) { + return name.first; + } + } + return EUnit::MICROSECONDS; +} + +TSerializationInterval::TSerializationInterval(EUnit unit) + : Multiplier(ToAvgMicroSeconds(unit)) +{} + +void TSerializationInterval::serializeText(const IColumn & column, size_t rowNum, WriteBuffer & ostr, const FormatSettings &) const +{ + writeText(assert_cast<const ColumnVector<Int64> &>(column).getData()[rowNum] / 
Multiplier , ostr); +} + +void TSerializationInterval::deserializeText(IColumn & column, ReadBuffer & istr, const FormatSettings & formatSettings) const +{ + Base::deserializeText(column, istr, formatSettings); + assert_cast<ColumnVector<Int64> &>(column).getData().back() *= Multiplier; +} + +void TSerializationInterval::serializeTextJSON(const IColumn & column, size_t rowNum, WriteBuffer & ostr, const FormatSettings & settings) const +{ + auto x = assert_cast<const ColumnVector<Int64> &>(column).getData()[rowNum] / Multiplier; + writeJSONNumber(x, ostr, settings); +} + +void TSerializationInterval::deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings & formatSettings) const +{ + Base::deserializeTextJSON(column, istr, formatSettings); + assert_cast<ColumnVector<Int64> &>(column).getData().back() *= Multiplier; +} + +void TSerializationInterval::deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & formatSettings) const +{ + Base::deserializeTextCSV(column, istr, formatSettings); + assert_cast<ColumnVector<Int64> &>(column).getData().back() *= Multiplier; +} + +void TSerializationInterval::serializeBinary(const Field & field, WriteBuffer & ostr) const +{ + /// ColumnVector<T>::ValueType is a narrower type. 
For example, UInt8, when the Field type is UInt64 + typename ColumnVector<Int64>::ValueType x = get<FieldType>(field) / Multiplier; + writeBinary(x, ostr); +} + +void TSerializationInterval::deserializeBinary(Field & field, ReadBuffer & istr) const +{ + Base::deserializeBinary(field, istr); + field.get<Int64>() *= Multiplier; +} + +void TSerializationInterval::serializeBinary(const IColumn & column, size_t rowNum, WriteBuffer & ostr) const +{ + writeBinary(assert_cast<const ColumnVector<Int64> &>(column).getData()[rowNum] / Multiplier, ostr); +} + +void TSerializationInterval::deserializeBinary(IColumn & column, ReadBuffer & istr) const +{ + Base::deserializeBinary(column, istr); + assert_cast<ColumnVector<Int64> &>(column).getData().back() *= Multiplier; +} + +void TSerializationInterval::serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const +{ + const typename ColumnVector<NDB::Int64>::Container & x = typeid_cast<const ColumnVector<NDB::Int64> &>(column).getData(); + size_t size = x.size(); + + if (limit == 0 || offset + limit > size) + limit = size - offset; + + for (size_t i = offset; i < limit; i++) { + serializeBinary(column, i, ostr); + } +} + +void TSerializationInterval::deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avgValueSizeHint) const +{ + Base::deserializeBinaryBulk(column, istr, limit, avgValueSizeHint); + ColumnVector<Int64>::Container & x = typeid_cast<ColumnVector<Int64> &>(column).getData(); + for (size_t i = x.size() - limit; i < x.size(); i++) { + assert_cast<ColumnVector<Int64> &>(column).getData()[i] *= Multiplier; + } +} + +NDB::DataTypePtr GetInterval(TSerializationInterval::EUnit unit) { + auto customDescr = std::make_unique<NDB::DataTypeCustomDesc>(std::make_unique<NDB::DataTypeCustomFixedName>("IntervalSecond"), std::make_shared<TSerializationInterval>(unit)); + return NDB::DataTypeFactory::instance().getCustom(std::move(customDescr)); +} + +} diff --git 
a/ydb/library/yql/providers/s3/serializations/serialization_interval.h b/ydb/library/yql/providers/s3/serializations/serialization_interval.h new file mode 100644 index 0000000000..29ee573c2a --- /dev/null +++ b/ydb/library/yql/providers/s3/serializations/serialization_interval.h @@ -0,0 +1,52 @@ +#pragma once + +#include <ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/DataTypeFactory.h> +#include <ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/DataTypeInterval.h> +#include <ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/Serializations/SerializationNumber.h> + +#include <util/generic/fwd.h> + +namespace NYql::NSerialization { + +using namespace NDB; + +class TSerializationInterval : public SerializationNumber<Int64> { +public: + enum class EUnit + { + MICROSECONDS, + MILLISECONDS, + SECONDS, + MINUTES, + HOURS, + DAYS, + WEEKS + }; + +private: + Int64 Multiplier; + +public: + TSerializationInterval(EUnit unit); + + static EUnit ToUnit(const TString& unit); + + using Base = NDB::SerializationNumber<Int64>; + void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override; + void deserializeText(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override; + void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const override; + void deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override; + void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override; + + /** Format is platform-dependent. 
*/ + void serializeBinary(const Field & field, WriteBuffer & ostr) const override; + void deserializeBinary(Field & field, ReadBuffer & istr) const override; + void serializeBinary(const IColumn & column, size_t row_num, WriteBuffer & ostr) const override; + void deserializeBinary(IColumn & column, ReadBuffer & istr) const override; + void serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const override; + void deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const override; +}; + +NDB::DataTypePtr GetInterval(TSerializationInterval::EUnit unit); + +} |