diff options
author | Fiodar Miron <61616792+fedor-miron@users.noreply.github.com> | 2024-05-23 01:50:17 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-05-23 01:50:17 +0300 |
commit | 70e288a74a1d044cf949f73d371d1351a3b5caa8 (patch) | |
tree | 9fc52d50798842e7e0c5d84fb38f0dbf385aa27e | |
parent | b5300420c4e0bf024d8ad6075799815e2b15ea39 (diff) | |
download | ydb-70e288a74a1d044cf949f73d371d1351a3b5caa8.tar.gz |
YQL-18117: Implement block tz date (#3366)
16 files changed, 790 insertions, 133 deletions
diff --git a/ydb/library/yql/minikql/computation/mkql_block_reader.cpp b/ydb/library/yql/minikql/computation/mkql_block_reader.cpp index e417fcf535..ee2864cc23 100644 --- a/ydb/library/yql/minikql/computation/mkql_block_reader.cpp +++ b/ydb/library/yql/minikql/computation/mkql_block_reader.cpp @@ -195,6 +195,37 @@ private: mutable TVector<TBlockItem> Items; }; +template <typename TTzDate, bool Nullable> +class TTzDateBlockItemConverter : public IBlockItemConverter { +public: + using TLayout = typename NYql::NUdf::TDataType<TTzDate>::TLayout; + + NUdf::TUnboxedValuePod MakeValue(TBlockItem item, const THolderFactory& holderFactory) const final { + Y_UNUSED(holderFactory); + if constexpr (Nullable) { + if (!item) { + return {}; + } + } + + NUdf::TUnboxedValuePod value {item.Get<TLayout>()}; + value.SetTimezoneId(item.GetTimezoneId()); + return value; + } + + TBlockItem MakeItem(const NUdf::TUnboxedValuePod& value) const final { + if constexpr (Nullable) { + if (!value) { + return {}; + } + } + + TBlockItem item {value.Get<TLayout>()}; + item.SetTimezoneId(value.GetTimezoneId()); + return item; + } +}; + class TExternalOptionalBlockItemConverter : public IBlockItemConverter { public: TExternalOptionalBlockItemConverter(std::unique_ptr<IBlockItemConverter>&& inner) @@ -229,6 +260,8 @@ struct TConverterTraits { template <typename TStringType, bool Nullable, NUdf::EDataSlot TOriginal = NUdf::EDataSlot::String, NUdf::EPgStringType PgString = NUdf::EPgStringType::None> using TStrings = TStringBlockItemConverter<TStringType, Nullable, PgString>; using TExtOptional = TExternalOptionalBlockItemConverter; + template<typename TTzDate, bool Nullable> + using TTzDateConverter = TTzDateBlockItemConverter<TTzDate, Nullable>; static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) { if (desc.PassByValue) { @@ -258,6 +291,15 @@ struct TConverterTraits { return std::make_unique<TResourceBlockItemConverter<false>>(); } } + + template<typename TTzDate> + static std::unique_ptr<TResult> MakeTzDate(bool isOptional) { + if (isOptional) { + return std::make_unique<TTzDateConverter<TTzDate, true>>(); + } else { + return std::make_unique<TTzDateConverter<TTzDate, false>>(); + } + } }; } // namespace diff --git a/ydb/library/yql/minikql/computation/mkql_block_transport.cpp b/ydb/library/yql/minikql/computation/mkql_block_transport.cpp index cd7a9a60cf..e9700ed439 100644 --- a/ydb/library/yql/minikql/computation/mkql_block_transport.cpp +++ b/ydb/library/yql/minikql/computation/mkql_block_transport.cpp @@ -150,7 +150,7 @@ public: DoResetMetadata(); } - std::shared_ptr<arrow::ArrayData> MakeDefaultValue(ui64 blockLen, ui64 offset) { + std::shared_ptr<arrow::ArrayData> MakeDefaultValue(ui64 blockLen, ui64 offset) const { std::shared_ptr<arrow::Buffer> nulls; i64 nullsCount = 0; if (IsNullable()) { @@ -176,7 +176,7 @@ template<size_t ObjectSize, bool Nullable> class TFixedSizeBlockSerializer final : public IBlockSerializer { public: TFixedSizeBlockSerializer() = default; -private: + size_t ArrayMetadataCount() const final { return Nullable ? 3 : 1; } @@ -389,16 +389,10 @@ private: const std::unique_ptr<TBlockDeserializerBase> Inner_; }; -template<bool Nullable> -class TTupleBlockSerializer final : public IBlockSerializer { -public: - explicit TTupleBlockSerializer(TVector<std::unique_ptr<IBlockSerializer>>&& children) - : Children_(std::move(children)) - { - } -private: +template<bool Nullable, typename TDerived> +class TTupleBlockSerializerBase : public IBlockSerializer { size_t ArrayMetadataCount() const final { - size_t result = GetChildMetaCount(); + size_t result = static_cast<const TDerived*>(this)->GetChildrenMetaCount(); if constexpr (Nullable) { result += 2; } @@ -410,14 +404,12 @@ private: StoreNullsSizes(data, metaSink); } if (data.GetNullCount() == data.length) { - auto childCount = GetChildMetaCount(); + auto childCount = static_cast<const TDerived*>(this)->GetChildrenMetaCount(); for (size_t i = 0; i < childCount; ++i) { metaSink(0); } } else { - for (size_t i = 0; i < Children_.size(); ++i) { - Children_[i]->StoreMetadata(*data.child_data[i], metaSink); - } + static_cast<const TDerived*>(this)->StoreChildrenMetadata(data.child_data, metaSink); } } @@ -426,13 +418,19 @@ private: StoreNulls(data, dst); } if (data.GetNullCount() != data.length) { - for (size_t i = 0; i < Children_.size(); ++i) { - Children_[i]->StoreArray(*data.child_data[i], dst); - } + static_cast<const TDerived*>(this)->StoreChildrenArrays(data.child_data, dst); } } +}; + +template<bool Nullable> +class TTupleBlockSerializer final : public TTupleBlockSerializerBase<Nullable, TTupleBlockSerializer<Nullable>> { +public: + TTupleBlockSerializer(TVector<std::unique_ptr<IBlockSerializer>>&& children) + : Children_(std::move(children)) + {} - size_t GetChildMetaCount() const { + size_t GetChildrenMetaCount() const { size_t result = 0; for (const auto& child : Children_) { result += child->ArrayMetadataCount(); @@ -440,9 +438,51 @@ private: return result; } + void StoreChildrenMetadata(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data, + const IBlockSerializer::TMetadataSink& metaSink) const { + + for (size_t i = 0; i < Children_.size(); ++i) { + Children_[i]->StoreMetadata(*child_data[i], metaSink); + } + } + + void StoreChildrenArrays(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data, TRope& dst) const { + for (size_t i = 0; i < Children_.size(); ++i) { + Children_[i]->StoreArray(*child_data[i], dst); + } + } + +private: const TVector<std::unique_ptr<IBlockSerializer>> Children_; }; +template<typename TDate, bool Nullable> +class TTzDateBlockSerializer final : public TTupleBlockSerializerBase<Nullable, TTzDateBlockSerializer<TDate, Nullable>> { +public: + TTzDateBlockSerializer() = default; + + size_t GetChildrenMetaCount() const { + return DateSerialiser_.ArrayMetadataCount() + TzSerialiser_.ArrayMetadataCount(); + } + + void StoreChildrenMetadata(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data, + const IBlockSerializer::TMetadataSink& metaSink) const { + DateSerialiser_.StoreMetadata(*child_data[0], metaSink); + TzSerialiser_.StoreMetadata(*child_data[1], metaSink); + } + + void StoreChildrenArrays(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data, TRope& dst) const { + DateSerialiser_.StoreArray(*child_data[0], dst); + TzSerialiser_.StoreArray(*child_data[1], dst); + } + +private: + using TDateLayout = typename NUdf::TDataType<TDate>::TLayout; + + TFixedSizeBlockSerializer<sizeof(TDateLayout), false> DateSerialiser_; + TFixedSizeBlockSerializer<sizeof(NYql::NUdf::TTimezoneId), false> TzSerialiser_; +}; + template<bool Nullable> class TTupleBlockDeserializer final : public TBlockDeserializerBase { public: @@ -494,6 +534,53 @@ private: const TVector<std::unique_ptr<TBlockDeserializerBase>> Children_; }; +template<typename TDate, bool Nullable> +class TTzDateBlockDeserializer final : public TBlockDeserializerBase { +public: + TTzDateBlockDeserializer() = default; + +private: + void DoLoadMetadata(const TMetadataSource& metaSource) final { + DateDeserialiser_.LoadMetadata(metaSource); + TzDeserialiser_.LoadMetadata(metaSource); + } + + std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final { + std::vector<std::shared_ptr<arrow::ArrayData>> childData; + childData.emplace_back(DateDeserialiser_.MakeDefaultValue(blockLen, offset)); + childData.emplace_back(TzDeserialiser_.MakeDefaultValue(blockLen, offset)); + return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, offset); + } + + std::shared_ptr<arrow::ArrayData> DoLoadArray(TRope& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final { + std::vector<std::shared_ptr<arrow::ArrayData>> childData; + childData.emplace_back(DateDeserialiser_.LoadArray(src, blockLen, offset)); + childData.emplace_back(TzDeserialiser_.LoadArray(src, blockLen, offset)); + return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, offset); + } + + void DoResetMetadata() final { + DateDeserialiser_.ResetMetadata(); + TzDeserialiser_.ResetMetadata(); + } + + bool IsNullable() const final { + return Nullable; + } + + void SetArrowType(const std::shared_ptr<arrow::DataType>& type) final { + YQL_ENSURE(type->fields().size() == 2); + ArrowType_ = type; + DateDeserialiser_.SetArrowType(type->field(0)->type()); + TzDeserialiser_.SetArrowType(type->field(1)->type()); + } + + using TDateLayout = typename NUdf::TDataType<TDate>::TLayout; + + TFixedSizeBlockDeserializer<sizeof(TDateLayout), false> DateDeserialiser_; + TFixedSizeBlockDeserializer<sizeof(NYql::NUdf::TTimezoneId), false> TzDeserialiser_; +}; + struct TSerializerTraits { using TResult = IBlockSerializer; template <bool Nullable> @@ -503,6 +590,9 @@ struct TSerializerTraits { template <typename TStringType, bool Nullable, NUdf::EDataSlot TOriginal = NUdf::EDataSlot::String> using TStrings = TStringBlockSerializer<TStringType, Nullable>; using TExtOptional = TExtOptionalBlockSerializer; + template<typename TTzDateType, bool Nullable> + using TTzDate = TTzDateBlockSerializer<TTzDateType, Nullable>; + static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) { Y_UNUSED(pgBuilder); @@ -516,6 +606,16 @@ struct TSerializerTraits { Y_UNUSED(isOptional); ythrow yexception() << "Serializer not implemented for block resources"; } + + template<typename TTzDateType> + static std::unique_ptr<TResult> MakeTzDate(bool isOptional) { + if (isOptional) { + return std::make_unique<TTzDate<TTzDateType, true>>(); + } + else { + return std::make_unique<TTzDate<TTzDateType, false>>(); + } + } }; struct TDeserializerTraits { @@ -527,6 +627,8 @@ struct TDeserializerTraits { template <typename TStringType, bool Nullable, NUdf::EDataSlot TOriginal = NUdf::EDataSlot::String> using TStrings = TStringBlockDeserializer<TStringType, Nullable>; using TExtOptional = TExtOptionalBlockDeserializer; + template<typename TTzDateType, bool Nullable> + using TTzDate = TTzDateBlockDeserializer<TTzDateType, Nullable>; static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) { Y_UNUSED(pgBuilder); @@ -540,6 +642,16 @@ struct TDeserializerTraits { Y_UNUSED(isOptional); ythrow yexception() << "Deserializer not implemented for block resources"; } + + template<typename TTzDateType> + static std::unique_ptr<TResult> MakeTzDate(bool isOptional) { + if (isOptional) { + return std::make_unique<TTzDate<TTzDateType, true>>(); + } + else { + return std::make_unique<TTzDate<TTzDateType, false>>(); + } + } }; } // namespace diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp index 3e0e7117c4..ea4802cd67 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp @@ -665,6 +665,9 @@ protected: auto scalarOptStrType = PgmBuilder.NewBlockType(optStrType, TBlockType::EShape::Scalar); auto blockOptTupleOptUi32StrType = PgmBuilder.NewBlockType(optTupleOptUi32StrType, TBlockType::EShape::Many); auto scalarUi64Type = PgmBuilder.NewBlockType(ui64Type, TBlockType::EShape::Scalar); + + auto tzDateType = PgmBuilder.NewDataType(NUdf::EDataSlot::TzDate); + auto blockTzDateType = PgmBuilder.NewBlockType(tzDateType, TBlockType::EShape::Many); auto rowType = legacyStruct @@ -674,10 +677,11 @@ protected: {"_yql_block_length", scalarUi64Type}, {"a", scalarOptStrType}, {"b", blockOptTupleOptUi32StrType}, + {"c", blockTzDateType} }) : PgmBuilder.NewMultiType( {blockUi32Type, blockOptStrType, scalarOptStrType, - blockOptTupleOptUi32StrType, scalarUi64Type}); + blockOptTupleOptUi32StrType, blockTzDateType, scalarUi64Type}); ui64 blockLen = 1000; UNIT_ASSERT_LE(offset + len, blockLen); @@ -685,6 +689,7 @@ protected: auto builder1 = MakeArrayBuilder(TTypeInfoHelper(), ui32Type, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(ui32Type)), nullptr); auto builder2 = MakeArrayBuilder(TTypeInfoHelper(), optStrType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(optStrType)), nullptr); auto builder3 = MakeArrayBuilder(TTypeInfoHelper(), optTupleOptUi32StrType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(optTupleOptUi32StrType)), nullptr); + auto builder4 = MakeArrayBuilder(TTypeInfoHelper(), tzDateType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(tzDateType)), nullptr); for (ui32 i = 0; i < blockLen; ++i) { TBlockItem b1(i); @@ -697,6 +702,10 @@ protected: TBlockItem b3items[] = { (i % 2) ? TBlockItem(i) : TBlockItem(), TBlockItem(a) }; TBlockItem b3 = (i % 7) ? TBlockItem(b3items) : TBlockItem(); builder3->Add(b3); + + TBlockItem tzDate {i}; + tzDate.SetTimezoneId(i % 100); + builder4->Add(tzDate); } std::string_view testScalarString = "foobar"; @@ -709,11 +718,13 @@ protected: datums.emplace_back(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(blockLen))); datums.emplace_back(arrow::Datum(std::make_shared<arrow::BinaryScalar>(strbuf))); datums.emplace_back(builder3->Build(true)); + datums.emplace_back(builder4->Build(true)); } else { datums.emplace_back(builder1->Build(true)); datums.emplace_back(builder2->Build(true)); datums.emplace_back(arrow::Datum(std::make_shared<arrow::BinaryScalar>(strbuf))); datums.emplace_back(builder3->Build(true)); + datums.emplace_back(builder4->Build(true)); datums.emplace_back(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(blockLen))); } @@ -767,6 +778,7 @@ protected: auto reader1 = MakeBlockReader(TTypeInfoHelper(), ui32Type); auto reader2 = MakeBlockReader(TTypeInfoHelper(), optStrType); auto reader3 = MakeBlockReader(TTypeInfoHelper(), optTupleOptUi32StrType); + auto reader4 = MakeBlockReader(TTypeInfoHelper(), tzDateType); for (ui32 i = offset; i < len; ++i) { TBlockItem b1 = reader1->GetItem(*TArrowBlock::From(unpackedColumns[0]).GetDatum().array(), i - offset); @@ -792,6 +804,10 @@ protected: } else { UNIT_ASSERT(!b3); } + + TBlockItem b4 = reader4->GetItem(*TArrowBlock::From(unpackedColumns[legacyStruct ? 5 : 4]).GetDatum().array(), i - offset); + UNIT_ASSERT(b4.Get<ui16>() == i); + UNIT_ASSERT(b4.GetTimezoneId() == (i % 100)); } } } diff --git a/ydb/library/yql/minikql/datetime/datetime.h b/ydb/library/yql/minikql/datetime/datetime.h index 71363b27cd..a86f28060c 100644 --- a/ydb/library/yql/minikql/datetime/datetime.h +++ b/ydb/library/yql/minikql/datetime/datetime.h @@ -21,6 +21,7 @@ struct TTMStorage { unsigned int Second : 6; unsigned int Microsecond : 20; unsigned int TimezoneId : 16; + ui8 Reserved[2]; TTMStorage() { Zero(*this); diff --git a/ydb/library/yql/minikql/mkql_type_builder.cpp b/ydb/library/yql/minikql/mkql_type_builder.cpp index dc946c4322..38e536688e 100644 --- a/ydb/library/yql/minikql/mkql_type_builder.cpp +++ b/ydb/library/yql/minikql/mkql_type_builder.cpp @@ -1455,9 +1455,28 @@ bool ConvertArrowType(NUdf::EDataSlot slot, std::shared_ptr<arrow::DataType>& ty case NUdf::EDataSlot::Json: type = arrow::utf8(); return true; - default: + case NUdf::EDataSlot::TzDate: { + type = MakeTzDateArrowType<NYql::NUdf::EDataSlot::TzDate>(); + return true; + } + case NUdf::EDataSlot::TzDatetime: { + type = MakeTzDateArrowType<NYql::NUdf::EDataSlot::TzDatetime>(); + return true; + } + case NUdf::EDataSlot::TzTimestamp: { + type = MakeTzDateArrowType<NYql::NUdf::EDataSlot::TzTimestamp>(); + return true; + } + case NUdf::EDataSlot::Uuid: { return false; } + case NUdf::EDataSlot::Decimal: { + return false; + } + case NUdf::EDataSlot::DyNumber: { + return false; + } + } } bool ConvertArrowType(TType* itemType, std::shared_ptr<arrow::DataType>& type) { @@ -2415,9 +2434,22 @@ size_t CalcMaxBlockItemSize(const TType* type) { case NUdf::EDataSlot::Json: // size of offset part return sizeof(arrow::StringType::offset_type); - default: + case NUdf::EDataSlot::TzDate: + return sizeof(typename NUdf::TDataType<NUdf::TTzDate>::TLayout) + sizeof(NYql::NUdf::TTimezoneId); + case NUdf::EDataSlot::TzDatetime: + return sizeof(typename NUdf::TDataType<NUdf::TTzDatetime>::TLayout) + sizeof(NYql::NUdf::TTimezoneId); + case NUdf::EDataSlot::TzTimestamp: + return sizeof(typename NUdf::TDataType<NUdf::TTzTimestamp>::TLayout) + sizeof(NYql::NUdf::TTimezoneId); + case NUdf::EDataSlot::Uuid: { MKQL_ENSURE(false, "Unsupported data slot: " << slot); } + case NUdf::EDataSlot::Decimal: { + MKQL_ENSURE(false, "Unsupported data slot: " << slot); + } + case NUdf::EDataSlot::DyNumber: { + MKQL_ENSURE(false, "Unsupported data slot: " << slot); + } + } } MKQL_ENSURE(false, "Unsupported type: " << *type); @@ -2432,6 +2464,8 @@ struct TComparatorTraits { template <typename TStringType, bool Nullable, NUdf::EDataSlot TOriginal = NUdf::EDataSlot::String> using TStrings = NUdf::TStringBlockItemComparator<TStringType, Nullable>; using TExtOptional = NUdf::TExternalOptionalBlockItemComparator; + template <typename T, bool Nullable> + using TTzDateComparator = NUdf::TTzDateBlockItemComparator<T, Nullable>; static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) { Y_UNUSED(pgBuilder); @@ -2442,6 +2476,15 @@ struct TComparatorTraits { Y_UNUSED(isOptional); ythrow yexception() << "Comparator not implemented for block resources: "; } + + template<typename TTzDate> + static std::unique_ptr<TResult> MakeTzDate(bool isOptional) { + if (isOptional) { + return std::make_unique<TTzDateComparator<TTzDate, true>>(); + } else { + return std::make_unique<TTzDateComparator<TTzDate, false>>(); + } + } }; struct THasherTraits { @@ -2453,6 +2496,8 @@ struct THasherTraits { template <typename TStringType, bool Nullable, NUdf::EDataSlot TOriginal = NUdf::EDataSlot::String> using TStrings = NUdf::TStringBlockItemHasher<TStringType, Nullable>; using TExtOptional = NUdf::TExternalOptionalBlockItemHasher; + template <typename T, bool Nullable> + using TTzDateHasher = NYql::NUdf::TTzDateBlockItemHasher<T, Nullable>; static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) { Y_UNUSED(pgBuilder); @@ -2463,6 +2508,15 @@ struct THasherTraits { Y_UNUSED(isOptional); ythrow yexception() << "Hasher not implemented for block resources"; } + + template<typename TTzDate> + static std::unique_ptr<TResult> MakeTzDate(bool isOptional) { + if (isOptional) { + return std::make_unique<TTzDateHasher<TTzDate, true>>(); + } else { + return std::make_unique<TTzDateHasher<TTzDate, false>>(); + } + } }; NUdf::IBlockItemComparator::TPtr TBlockTypeHelper::MakeComparator(NUdf::TType* type) const { diff --git a/ydb/library/yql/minikql/mkql_type_builder.h b/ydb/library/yql/minikql/mkql_type_builder.h index 2b234d7ee7..7199bc8360 100644 --- a/ydb/library/yql/minikql/mkql_type_builder.h +++ b/ydb/library/yql/minikql/mkql_type_builder.h @@ -33,6 +33,31 @@ inline size_t CalcBlockLen(size_t maxBlockItemSize) { bool ConvertArrowType(TType* itemType, std::shared_ptr<arrow::DataType>& type); bool ConvertArrowType(NUdf::EDataSlot slot, std::shared_ptr<arrow::DataType>& type); +template<NUdf::EDataSlot slot> +std::shared_ptr<arrow::DataType> MakeTzLayoutArrowType() { + static_assert(slot == NUdf::EDataSlot::TzDate || slot == NUdf::EDataSlot::TzDatetime || slot == NUdf::EDataSlot::TzTimestamp, + "Expected tz date type slot"); + + if constexpr (slot == NUdf::EDataSlot::TzDate) { + return arrow::uint16(); + } + if constexpr (slot == NUdf::EDataSlot::TzDatetime) { + return arrow::uint32(); + } + if constexpr (slot == NUdf::EDataSlot::TzTimestamp) { + return arrow::uint64(); + } +} + +template<NUdf::EDataSlot slot> +std::shared_ptr<arrow::StructType> MakeTzDateArrowType() { + std::vector<std::shared_ptr<arrow::Field>> fields { + std::make_shared<arrow::Field>("datetime", MakeTzLayoutArrowType<slot>()), + std::make_shared<arrow::Field>("timezoneId", arrow::uint16()), + }; + return std::make_shared<arrow::StructType>(fields); +} + class TArrowType : public NUdf::IArrowType { public: TArrowType(const std::shared_ptr<arrow::DataType>& type) diff --git a/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp b/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp index c273fc0eac..f1b7e03847 100644 --- a/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp +++ b/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp @@ -5,6 +5,7 @@ #include <ydb/library/yql/public/udf/arrow/block_reader.h> #include <ydb/library/yql/utils/yql_panic.h> #include <ydb/library/yql/minikql/mkql_type_builder.h> +#include <ydb/library/yql/minikql/mkql_type_ops.h> #include <library/cpp/yson/node/node_io.h> #include <library/cpp/yson/detail.h> @@ -155,6 +156,10 @@ public: const char* Data() { return Data_; } + + size_t Available() const { + return Available_; + } private: const char* Data_; size_t Available_; @@ -314,9 +319,41 @@ public: return ReadYson(buf); } } -private: - const TVector<std::unique_ptr<IYsonBlockReader>> Children_; - TVector<NUdf::TBlockItem> Items_; +}; + +template<typename T, bool Nullable, bool Native> +class TYsonTzDateBlockReader final : public IYsonBlockReaderWithNativeFlag<Native> { +public: + NUdf::TBlockItem GetItem(TYsonReaderDetails& buf) override final { + if constexpr (Nullable) { + return this->GetNullableItem(buf); + } + return GetNotNull(buf); + } + + NUdf::TBlockItem GetNotNull(TYsonReaderDetails& buf) override final { + using TLayout = typename NUdf::TDataType<T>::TLayout; + size_t length = sizeof(TLayout) + sizeof(NUdf::TTimezoneId); + Y_ASSERT(buf.Available() == length); + + TLayout date; + NUdf::TTimezoneId tz; + + if constexpr (std::is_same_v<T, NUdf::TTzDate>) { + DeserializeTzDate({buf.Data(), length}, date, tz); + } else if constexpr (std::is_same_v<T, NUdf::TTzDatetime>) { + DeserializeTzDatetime({buf.Data(), length}, date, tz); + } else if constexpr (std::is_same_v<T, NUdf::TTzTimestamp>) { + DeserializeTzTimestamp({buf.Data(), length}, date, tz); + } else { + static_assert(sizeof(T) == 0, "Unsupported tz date type"); + } + + buf.Skip(length); + NUdf::TBlockItem res {date}; + res.SetTimezoneId(tz); + return res; + } }; namespace { @@ -374,9 +411,6 @@ public: buf.Next(); return NUdf::TBlockItem(T(buf.NextDouble())); } -private: - const TVector<std::unique_ptr<IYsonBlockReader>> Children_; - TVector<NUdf::TBlockItem> Items_; }; template<bool Native> @@ -439,6 +473,18 @@ struct TYsonBlockReaderTraits { Y_UNUSED(isOptional); ythrow yexception() << "Yson reader not implemented for block resources"; } + + template<typename TTzDate> + static std::unique_ptr<TResult> MakeTzDate(bool isOptional) { + Y_UNUSED(isOptional); + if (isOptional) { + using TTzDateReader = TYsonTzDateBlockReader<TTzDate, true, Native>; + return std::make_unique<TTzDateReader>(); + } else { + using TTzDateReader = TYsonTzDateBlockReader<TTzDate, false, Native>; + return std::make_unique<TTzDateReader>(); + } + } }; template<bool IsDictionary> diff --git a/ydb/library/yql/public/udf/arrow/block_builder.h b/ydb/library/yql/public/udf/arrow/block_builder.h index 7c86083ec9..10fa0a6f35 100644 --- a/ydb/library/yql/public/udf/arrow/block_builder.h +++ b/ydb/library/yql/public/udf/arrow/block_builder.h @@ -8,6 +8,7 @@ #include <ydb/library/yql/public/udf/udf_value.h> #include <ydb/library/yql/public/udf/udf_value_builder.h> #include <ydb/library/yql/public/udf/udf_type_inspection.h> +#include <ydb/library/yql/minikql/mkql_type_builder.h> #include <arrow/datum.h> #include <arrow/c/bridge.h> @@ -516,15 +517,16 @@ private: template<typename TLayout, bool Nullable> class TFixedSizeArrayBuilder final: public TFixedSizeArrayBuilderBase<TLayout, Nullable, TFixedSizeArrayBuilder<TLayout, Nullable>> { - using TDerived = TFixedSizeArrayBuilder<TLayout, Nullable>; + using TSelf = TFixedSizeArrayBuilder<TLayout, Nullable>; + using TBase = TFixedSizeArrayBuilderBase<TLayout, Nullable, TSelf>; public: TFixedSizeArrayBuilder(const ITypeInfoHelper& typeInfoHelper, std::shared_ptr<arrow::DataType> arrowType, arrow::MemoryPool& pool, size_t maxLen, size_t* totalAllocated = nullptr) - : TFixedSizeArrayBuilderBase<TLayout, Nullable, TDerived>(typeInfoHelper, std::move(arrowType), pool, maxLen, totalAllocated) + : TBase(typeInfoHelper, std::move(arrowType), pool, maxLen, totalAllocated) {} TFixedSizeArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, size_t* totalAllocated = nullptr) - : TFixedSizeArrayBuilderBase<TLayout, Nullable, TDerived>(typeInfoHelper, type, pool, maxLen, totalAllocated) + : TBase(typeInfoHelper, type, pool, maxLen, totalAllocated) {} void DoAddNotNull(TUnboxedValuePod value) { @@ -936,13 +938,11 @@ private: i32 TypeLen = 0; }; -template<bool Nullable> -class TTupleArrayBuilder final : public TArrayBuilderBase { +template<bool Nullable, typename TDerived> +class TTupleArrayBuilderBase : public TArrayBuilderBase { public: - TTupleArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, - TVector<TArrayBuilderBase::Ptr>&& children, size_t* totalAllocated) + TTupleArrayBuilderBase(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, size_t* totalAllocated = nullptr) : TArrayBuilderBase(typeInfoHelper, type, pool, maxLen, totalAllocated) - , Children(std::move(children)) { Reserve(); } @@ -951,70 +951,50 @@ public: if constexpr (Nullable) { if (!value) { NullBuilder->UnsafeAppend(0); - for (ui32 i = 0; i < Children.size(); ++i) { - Children[i]->AddDefault(); - } + static_cast<TDerived*>(this)->AddToChildrenDefault(); return; } NullBuilder->UnsafeAppend(1); } - auto elements = value.GetElements(); - if (elements) { - for (ui32 i = 0; i < Children.size(); ++i) { - Children[i]->Add(elements[i]); - } - } else { - for (ui32 i = 0; i < Children.size(); ++i) { - auto element = value.GetElement(i); - Children[i]->Add(element); - } - } + static_cast<TDerived*>(this)->AddToChildren(value); } void DoAdd(TBlockItem value) final { if constexpr (Nullable) { if (!value) { NullBuilder->UnsafeAppend(0); - for (ui32 i = 0; i < Children.size(); ++i) { - Children[i]->AddDefault(); - } + static_cast<TDerived*>(this)->AddToChildrenDefault(); return; } NullBuilder->UnsafeAppend(1); } - auto elements = value.AsTuple(); - for (ui32 i = 0; i < Children.size(); ++i) { - Children[i]->Add(elements[i]); - } + static_cast<TDerived*>(this)->AddToChildren(value); } void DoAdd(TInputBuffer& input) final { if constexpr (Nullable) { if (!input.PopChar()) { - return DoAdd(TBlockItem{}); + NullBuilder->UnsafeAppend(0); + static_cast<TDerived*>(this)->AddToChildrenDefault(); + return; } NullBuilder->UnsafeAppend(1); } - for (ui32 i = 0; i < Children.size(); ++i) { - Children[i]->Add(input); - } + static_cast<TDerived*>(this)->AddToChildren(input); } void DoAddDefault() final { if constexpr (Nullable) { NullBuilder->UnsafeAppend(1); } - for (ui32 i = 0; i < Children.size(); ++i) { - Children[i]->AddDefault(); - } + static_cast<TDerived*>(this)->AddToChildrenDefault(); } void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final { Y_ABORT_UNLESS(!array.buffers.empty()); - Y_ABORT_UNLESS(array.child_data.size() == Children.size()); if constexpr (Nullable) { if (array.buffers.front()) { @@ -1026,14 +1006,11 @@ public: } } - for (size_t i = 0; i < Children.size(); ++i) { - Children[i]->AddMany(*array.child_data[i], popCount, sparseBitmap, array.length); - } + static_cast<TDerived*>(this)->AddManyToChildren(array, sparseBitmap, popCount); } void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) final { Y_ABORT_UNLESS(!array.buffers.empty()); - Y_ABORT_UNLESS(array.child_data.size() == Children.size()); if constexpr (Nullable) { for (ui64 i = beginIndex; i < beginIndex + count; ++i) { @@ -1041,14 +1018,11 @@ public: } } - for (size_t i = 0; i < Children.size(); ++i) { - Children[i]->AddMany(*array.child_data[i], beginIndex, count); - } + static_cast<TDerived*>(this)->AddManyToChildren(array, beginIndex, count); } void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) final { Y_ABORT_UNLESS(!array.buffers.empty()); - Y_ABORT_UNLESS(array.child_data.size() == Children.size()); if constexpr (Nullable) { for (size_t i = 0; i < count; ++i) { @@ -1056,9 +1030,7 @@ public: } } - for (size_t i = 0; i < Children.size(); ++i) { - Children[i]->AddMany(*array.child_data[i], indexes, count); - } + static_cast<TDerived*>(this)->AddManyToChildren(array, indexes, count); } TBlockArrayTree::Ptr DoBuildTree(bool finish) final { @@ -1074,10 +1046,7 @@ public: Y_ABORT_UNLESS(length); result->Payload.push_back(arrow::ArrayData::Make(ArrowType, length, { nullBitmap })); - result->Children.reserve(Children.size()); - for (ui32 i = 0; i < Children.size(); ++i) { - result->Children.emplace_back(Children[i]->BuildTree(finish)); - } + static_cast<TDerived*>(this)->BuildChildrenTree(finish, result->Children); if (!finish) { Reserve(); @@ -1104,10 +1073,145 @@ private: } private: - TVector<std::unique_ptr<TArrayBuilderBase>> Children; std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder; }; +template<bool Nullable> +class TTupleArrayBuilder final : public TTupleArrayBuilderBase<Nullable, TTupleArrayBuilder<Nullable>> { +public: + + TTupleArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, + TVector<TArrayBuilderBase::Ptr>&& children, size_t* totalAllocated = nullptr) + : TTupleArrayBuilderBase<Nullable, TTupleArrayBuilder<Nullable>>(typeInfoHelper, type, pool, maxLen, totalAllocated) + , Children_(std::move(children)) {} + + void AddToChildrenDefault() { + for (ui32 i = 0; i < Children_.size(); ++i) { + Children_[i]->AddDefault(); + } + } + + void AddToChildren(NUdf::TUnboxedValuePod value) { + auto elements = value.GetElements(); + if (elements) { + for (ui32 i = 0; i < Children_.size(); ++i) { + Children_[i]->Add(elements[i]); + } + } else { + for (ui32 i = 0; i < Children_.size(); ++i) { + auto element = value.GetElement(i); + Children_[i]->Add(element); + } + } + } + + void AddToChildren(TBlockItem value) { + auto elements = value.AsTuple(); + for (ui32 i = 0; i < Children_.size(); ++i) { + Children_[i]->Add(elements[i]); + } + } + + void AddToChildren(TInputBuffer& input) { + for (ui32 i = 0; i < Children_.size(); ++i) { + Children_[i]->Add(input); + } + } + + void AddManyToChildren(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) { + Y_ABORT_UNLESS(array.child_data.size() == Children_.size()); + for (size_t i = 0; i < Children_.size(); ++i) { + Children_[i]->AddMany(*array.child_data[i], popCount, sparseBitmap, array.length); + } + } + + void AddManyToChildren(const arrow::ArrayData& array, ui64 beginIndex, size_t count) { + Y_ABORT_UNLESS(array.child_data.size() == Children_.size()); + for (size_t i = 0; i < Children_.size(); ++i) { + Children_[i]->AddMany(*array.child_data[i], beginIndex, count); + } + } + + void AddManyToChildren(const arrow::ArrayData& array, const ui64* indexes, size_t count) { + Y_ABORT_UNLESS(array.child_data.size() == Children_.size()); + for (size_t i = 0; i < Children_.size(); ++i) { + Children_[i]->AddMany(*array.child_data[i], indexes, count); + } + } + + void BuildChildrenTree(bool finish, std::vector<TArrayBuilderBase::TBlockArrayTree::Ptr>& resultChildren) { + resultChildren.reserve(Children_.size()); + for (ui32 i = 0; i < Children_.size(); ++i) { + resultChildren.emplace_back(Children_[i]->BuildTree(finish)); + } + } + +private: +TVector<std::unique_ptr<TArrayBuilderBase>> Children_; +}; + +template<typename TDate, bool Nullable> +class TTzDateArrayBuilder final : public TTupleArrayBuilderBase<Nullable, TTzDateArrayBuilder<TDate, Nullable>> { + using TDateLayout = typename TDataType<TDate>::TLayout; + static constexpr auto DataSlot = TDataType<TDate>::Slot; + +public: + TTzDateArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, size_t* totalAllocated = nullptr) + : TTupleArrayBuilderBase<Nullable, TTzDateArrayBuilder<TDate, Nullable>>(typeInfoHelper, type, pool, maxLen, totalAllocated) + , DateBuilder_(typeInfoHelper, NKikimr::NMiniKQL::MakeTzLayoutArrowType<DataSlot>(), pool, maxLen) + , TimezoneBuilder_(typeInfoHelper, arrow::uint16(), pool, maxLen) + { + } + + void AddToChildrenDefault() { + DateBuilder_.AddDefault(); + TimezoneBuilder_.AddDefault(); + } + + void AddToChildren(NUdf::TUnboxedValuePod value) { + DateBuilder_.Add(value); + TimezoneBuilder_.Add(TBlockItem(value.GetTimezoneId())); + } + + void AddToChildren(TBlockItem value) { + DateBuilder_.Add(value); + TimezoneBuilder_.Add(TBlockItem(value.GetTimezoneId())); + } + + void AddToChildren(TInputBuffer& input) { + DateBuilder_.Add(input); + TimezoneBuilder_.Add(input); + } + + void AddManyToChildren(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) { + Y_ABORT_UNLESS(array.child_data.size() == 2); + DateBuilder_.AddMany(*array.child_data[0], popCount, sparseBitmap, array.length); + TimezoneBuilder_.AddMany(*array.child_data[1], popCount, sparseBitmap, array.length); + } + + void AddManyToChildren(const arrow::ArrayData& array, ui64 beginIndex, size_t count) { + Y_ABORT_UNLESS(array.child_data.size() == 2); + DateBuilder_.AddMany(*array.child_data[0], beginIndex, count); + TimezoneBuilder_.AddMany(*array.child_data[1], beginIndex, count); + } + + void AddManyToChildren(const arrow::ArrayData& array, const ui64* indexes, size_t count) { + Y_ABORT_UNLESS(array.child_data.size() == 2); + DateBuilder_.AddMany(*array.child_data[0], indexes, count); + TimezoneBuilder_.AddMany(*array.child_data[1], indexes, count); + } + + void BuildChildrenTree(bool finish, std::vector<TArrayBuilderBase::TBlockArrayTree::Ptr>& resultChildren) { + resultChildren.emplace_back(DateBuilder_.BuildTree(finish)); + resultChildren.emplace_back(TimezoneBuilder_.BuildTree(finish)); + } + +private: + TFixedSizeArrayBuilder<TDateLayout, false> DateBuilder_; + TFixedSizeArrayBuilder<ui16, false> TimezoneBuilder_; +}; + + class TExternalOptionalArrayBuilder final : public TArrayBuilderBase { public: TExternalOptionalArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, @@ -1306,7 +1410,13 @@ inline std::unique_ptr<TArrayBuilderBase> MakeArrayBuilderImpl( return std::make_unique<TStringArrayBuilder<arrow::BinaryType, Nullable>>(typeInfoHelper, type, pool, maxLen, totalAllocated); case NUdf::EDataSlot::Utf8: case NUdf::EDataSlot::Json: - return std::make_unique<TStringArrayBuilder<arrow::StringType, Nullable>>(typeInfoHelper, type, pool, maxLen, totalAllocated); + return std::make_unique<TStringArrayBuilder<arrow::StringType, Nullable>>(typeInfoHelper, type, pool, maxLen); + case NUdf::EDataSlot::TzDate: + return std::make_unique<TTzDateArrayBuilder<TTzDate, Nullable>>(typeInfoHelper, type, pool, maxLen); + case NUdf::EDataSlot::TzDatetime: + return std::make_unique<TTzDateArrayBuilder<TTzDatetime, Nullable>>(typeInfoHelper, type, pool, maxLen); + case NUdf::EDataSlot::TzTimestamp: + return std::make_unique<TTzDateArrayBuilder<TTzTimestamp, Nullable>>(typeInfoHelper, type, pool, maxLen); default: Y_ENSURE(false, "Unsupported data slot"); } diff --git a/ydb/library/yql/public/udf/arrow/block_item.h b/ydb/library/yql/public/udf/arrow/block_item.h index 15edabe27a..f04a25666b 100644 --- a/ydb/library/yql/public/udf/arrow/block_item.h +++ b/ydb/library/yql/public/udf/arrow/block_item.h @@ -155,6 +155,15 @@ public: bool IsBoxed() const { return EMarkers::Boxed == GetMarkers(); } bool IsEmbedded() const { return EMarkers::Embedded == GetMarkers(); } + inline void SetTimezoneId(ui16 id) { + UDF_VERIFY(GetMarkers() == EMarkers::Embedded, "Value is not a datetime"); + Raw.Simple.TimezoneId = id; + } + + inline ui16 GetTimezoneId() const { + UDF_VERIFY(GetMarkers() == EMarkers::Embedded, "Value is not a datetime"); + return Raw.Simple.TimezoneId; + } private: union TRaw { @@ -180,7 +189,8 @@ private: union { ui64 FullMeta; struct { - ui8 Reserved[7]; + TTimezoneId TimezoneId; + ui8 Reserved[5]; ui8 Meta; }; }; diff --git a/ydb/library/yql/public/udf/arrow/block_item_comparator.h b/ydb/library/yql/public/udf/arrow/block_item_comparator.h index 98ca57d7f9..3431932d92 100644 --- a/ydb/library/yql/public/udf/arrow/block_item_comparator.h +++ b/ydb/library/yql/public/udf/arrow/block_item_comparator.h @@ -3,6 +3,7 @@ #include "block_item.h" #include "block_reader.h" + #include <ydb/library/yql/public/udf/udf_ptr.h> #include <ydb/library/yql/public/udf/udf_type_inspection.h> #include <ydb/library/yql/public/udf/udf_type_size_check.h> @@ -148,6 +149,39 @@ public: } }; +template<typename TTzType, bool Nullable> +class TTzDateBlockItemComparator : public TBlockItemComparatorBase<TTzDateBlockItemComparator<TTzType, Nullable>, Nullable> { + using TLayout = typename TDataType<TTzType>::TLayout; + +public: + bool DoCompare(TBlockItem lhs, TBlockItem rhs) const { + const auto x = lhs.Get<TLayout>(); + const auto y = rhs.Get<TLayout>(); + + if (x == y) { + const auto tx = lhs.GetTimezoneId(); + const auto ty = rhs.GetTimezoneId(); + return (tx == ty) ? 0 : (tx < ty ? -1 : 1); + } + + if (x < y) { + return -1; + } + + return 1; + } + + bool DoEquals(TBlockItem lhs, TBlockItem rhs) const { + return lhs.Get<TLayout>() == rhs.Get<TLayout>() && lhs.GetTimezoneId() == rhs.GetTimezoneId(); + } + + + bool DoLess(TBlockItem lhs, TBlockItem rhs) const { + return std::forward_as_tuple(lhs.Get<TLayout>(), lhs.GetTimezoneId()) < std::forward_as_tuple(rhs.Get<TLayout>(), rhs.GetTimezoneId()); + } +}; + + template <bool Nullable> class TTupleBlockItemComparator : public TBlockItemComparatorBase<TTupleBlockItemComparator<Nullable>, Nullable> { public: diff --git a/ydb/library/yql/public/udf/arrow/block_item_hasher.h b/ydb/library/yql/public/udf/arrow/block_item_hasher.h index 4c3d89e998..a173e7940f 100644 --- a/ydb/library/yql/public/udf/arrow/block_item_hasher.h +++ b/ydb/library/yql/public/udf/arrow/block_item_hasher.h @@ -49,6 +49,17 @@ public: } }; +template <typename T, bool Nullable> +class TTzDateBlockItemHasher : public TBlockItemHasherBase<TTzDateBlockItemHasher<T, Nullable>, Nullable> { +public: + ui64 DoHash(TBlockItem value) const { + using TLayout = typename TDataType<T>::TLayout; + TUnboxedValuePod uv {value.Get<TLayout>()}; + uv.SetTimezoneId(value.GetTimezoneId()); + return GetValueHash<TDataType<T>::Slot>(uv); + } +}; + template <typename TStringType, bool Nullable> class TStringBlockItemHasher : public TBlockItemHasherBase<TStringBlockItemHasher<TStringType, Nullable>, Nullable> { public: diff --git a/ydb/library/yql/public/udf/arrow/block_reader.h b/ydb/library/yql/public/udf/arrow/block_reader.h index d42c9f16c7..3eaeb62f73 100644 --- a/ydb/library/yql/public/udf/arrow/block_reader.h +++ b/ydb/library/yql/public/udf/arrow/block_reader.h @@ -203,26 +203,16 @@ public: } }; -template<bool Nullable> -class TTupleBlockReader final : public IBlockReader { +template<bool Nullable, typename TDerived> +class TTupleBlockReaderBase : public IBlockReader { public: - TTupleBlockReader(TVector<std::unique_ptr<IBlockReader>>&& children) - : Children(std::move(children)) - , Items(Children.size()) - {} - TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final { if constexpr (Nullable) { if (IsNull(data, index)) { return {}; } } - - for (ui32 i = 0; i < Children.size(); ++i) { - Items[i] = Children[i]->GetItem(*data.child_data[i], index); - } - - return TBlockItem(Items.data()); + return static_cast<TDerived*>(this)->GetChildrenItems(data, index); } TBlockItem GetScalarItem(const arrow::Scalar& scalar) final { @@ -233,33 +223,87 @@ public: } const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar); + return static_cast<TDerived*>(this)->GetChildrenScalarItems(structScalar); + } - for (ui32 i = 0; i < Children.size(); ++i) { - Items[i] = Children[i]->GetScalarItem(*structScalar.value[i]); + ui64 GetDataWeight(const arrow::ArrayData& data) const final { + ui64 size = 0; + if constexpr (Nullable) { + size += data.length; } - return TBlockItem(Items.data()); + size += static_cast<const TDerived*>(this)->GetChildrenDataWeight(data); + return size; } - ui64 GetDataWeight(const arrow::ArrayData& data) const final { + ui64 GetDataWeight(TBlockItem item) const final { + return static_cast<const TDerived*>(this)->GetDataWeightImpl(item); + } + + ui64 GetDefaultValueWeight() const final { ui64 size = 0; if constexpr (Nullable) { - size += data.length; + size = 1; + } + size += static_cast<const TDerived*>(this)->GetChildrenDefaultDataWeight(); + return size; + } + + void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const final { + if constexpr (Nullable) { + if (IsNull(data, index)) { + return out.PushChar(0); + } + out.PushChar(1); } + static_cast<const TDerived*>(this)->SaveChildrenItems(data, index, out); + } + + void SaveScalarItem(const arrow::Scalar& scalar, TOutputBuffer& out) const final { + if constexpr (Nullable) { + if (!scalar.is_valid) { + return out.PushChar(0); + } + out.PushChar(1); + } + + const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar); + + static_cast<const TDerived*>(this)->SaveChildrenScalarItems(structScalar, out); + } +}; + +template<bool Nullable> +class TTupleBlockReader final : public TTupleBlockReaderBase<Nullable, TTupleBlockReader<Nullable>> { +public: + TTupleBlockReader(TVector<std::unique_ptr<IBlockReader>>&& children) + : Children(std::move(children)) + , Items(Children.size()) + {} + + TBlockItem GetChildrenItems(const arrow::ArrayData& data, size_t index) { for (ui32 i = 0; i < Children.size(); ++i) { - size += Children[i]->GetDataWeight(*data.child_data[i]); + Items[i] = Children[i]->GetItem(*data.child_data[i], index); } - return size; + return TBlockItem(Items.data()); } - ui64 GetDataWeight(TBlockItem item) const final { + TBlockItem GetChildrenScalarItems(const arrow::StructScalar& structScalar) { + for (ui32 i = 0; i < Children.size(); ++i) { + Items[i] = Children[i]->GetScalarItem(*structScalar.value[i]); + } + + return TBlockItem(Items.data()); + } + + size_t GetDataWeightImpl(const TBlockItem& item) const { const TBlockItem* items = nullptr; ui64 size = 0; if constexpr (Nullable) { if (!item) { - return GetDefaultValueWeight(); + return this->GetDefaultValueWeight(); } size = 1; items = item.GetOptionalValue().GetElements(); @@ -274,40 +318,30 @@ public: return size; } - ui64 GetDefaultValueWeight() const final { - ui64 size = 0; - if constexpr (Nullable) { - size = 1; - } + size_t GetChildrenDataWeight(const arrow::ArrayData& data) const { + size_t size = 0; for (ui32 i = 0; i < Children.size(); ++i) { - size += Children[i]->GetDefaultValueWeight(); + size += Children[i]->GetDataWeight(*data.child_data[i]); } + return size; } - void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const final { - if constexpr (Nullable) { - if (IsNull(data, index)) { - return out.PushChar(0); - } - out.PushChar(1); + size_t GetChildrenDefaultDataWeight() const { + size_t size = 0; + for (ui32 i = 0; i < Children.size(); ++i) { + size += Children[i]->GetDefaultValueWeight(); } + return size; + } + void SaveChildrenItems(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const { for (ui32 i = 0; i < Children.size(); ++i) { Children[i]->SaveItem(*data.child_data[i], index, out); } } - - void SaveScalarItem(const arrow::Scalar& scalar, TOutputBuffer& out) const final { - if constexpr (Nullable) { - if (!scalar.is_valid) { - return out.PushChar(0); - } - out.PushChar(1); - } - - const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar); - + + void SaveChildrenScalarItems(const arrow::StructScalar& structScalar, TOutputBuffer& out) const { for (ui32 i = 0; i < Children.size(); ++i) { Children[i]->SaveScalarItem(*structScalar.value[i], out); } @@ -318,6 +352,65 @@ private: TVector<TBlockItem> Items; }; +template<typename TTzDate, bool Nullable> +class TTzDateBlockReader final : public TTupleBlockReaderBase<Nullable, TTzDateBlockReader<TTzDate, Nullable>> { +public: + TBlockItem GetChildrenItems(const arrow::ArrayData& data, size_t index) { + Y_DEBUG_ABORT_UNLESS(data.child_data.size() == 2); + + TBlockItem item {DateReader_.GetItem(*data.child_data[0], index)}; + item.SetTimezoneId(TimezoneReader_.GetItem(*data.child_data[1], index).Get<ui16>()); + return item; + } + + TBlockItem GetChildrenScalarItems(const arrow::StructScalar& structScalar) { + Y_DEBUG_ABORT_UNLESS(structScalar.value.size() == 2); + + TBlockItem item {DateReader_.GetScalarItem(*structScalar.value[0])}; + item.SetTimezoneId(TimezoneReader_.GetScalarItem(*structScalar.value[1]).Get<ui16>()); + return item; + } + + size_t GetChildrenDataWeight(const arrow::ArrayData& data) const { + Y_DEBUG_ABORT_UNLESS(data.child_data.size() == 2); + + size_t size = 0; + size += DateReader_.GetDataWeight(*data.child_data[0]); + size += TimezoneReader_.GetDataWeight(*data.child_data[1]); + return size; + } + + size_t GetDataWeightImpl(const TBlockItem& item) const { + Y_UNUSED(item); + return GetChildrenDefaultDataWeight(); + } + + size_t GetChildrenDefaultDataWeight() const { + ui64 size = 0; + if constexpr (Nullable) { + size = 1; + } + + size += DateReader_.GetDefaultValueWeight(); + size += TimezoneReader_.GetDefaultValueWeight(); + return size; + } + + void SaveChildrenItems(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const { + DateReader_.SaveItem(*data.child_data[0], index, out); + TimezoneReader_.SaveItem(*data.child_data[1], index, out); + } + + void SaveChildrenScalarItems(const arrow::StructScalar& structScalar, TOutputBuffer& out) const { + DateReader_.SaveScalarItem(*structScalar.value[0], out); + TimezoneReader_.SaveScalarItem(*structScalar.value[1], out); + } + +private: + TFixedSizeBlockReader<typename TDataType<TTzDate>::TLayout, /* Nullable */false> DateReader_; + TFixedSizeBlockReader<ui16, /* Nullable */false> TimezoneReader_; +}; + class TExternalOptionalBlockReader final : public IBlockReader { public: TExternalOptionalBlockReader(std::unique_ptr<IBlockReader>&& inner) @@ -390,6 +483,8 @@ struct TReaderTraits { using TExtOptional = TExternalOptionalBlockReader; template<bool Nullable> using TResource = TResourceBlockReader<Nullable>; + template<typename TTzDate, bool Nullable> + using TTzDateReader = TTzDateBlockReader<TTzDate, Nullable>; static std::unique_ptr<TResult> MakePg(const TPgTypeDescription& desc, const IPgBuilder* pgBuilder) { Y_UNUSED(pgBuilder); @@ -407,6 +502,15 @@ struct TReaderTraits { return std::make_unique<TResource<false>>(); } } + + template<typename TTzDate> + static std::unique_ptr<TResult> MakeTzDate(bool isOptional) { + if (isOptional) { + return std::make_unique<TTzDateReader<TTzDate, true>>(); + } else { + return std::make_unique<TTzDateReader<TTzDate, false>>(); + } + } }; template <typename TTraits> @@ -544,7 +648,15 @@ std::unique_ptr<typename TTraits::TResult> MakeBlockReaderImpl(const ITypeInfoHe return MakeStringBlockReaderImpl<TTraits, arrow::StringType, NUdf::EDataSlot::Utf8>(isOptional); case NUdf::EDataSlot::Json: return MakeStringBlockReaderImpl<TTraits, arrow::StringType, NUdf::EDataSlot::Json>(isOptional); - default: + case NUdf::EDataSlot::TzDate: + return TTraits::template MakeTzDate<TTzDate>(isOptional); + case NUdf::EDataSlot::TzDatetime: + return TTraits::template MakeTzDate<TTzDatetime>(isOptional); + case NUdf::EDataSlot::TzTimestamp: + return TTraits::template MakeTzDate<TTzTimestamp>(isOptional); + case NUdf::EDataSlot::Uuid: + case NUdf::EDataSlot::Decimal: + case NUdf::EDataSlot::DyNumber: Y_ENSURE(false, "Unsupported data slot"); } } @@ -606,7 +718,10 @@ inline void UpdateBlockItemSerializeProps(const ITypeInfoHelper& typeInfoHelper, if (dataTypeInfo.Features & StringType) { props.MaxSize = {}; props.IsFixed = false; - } else { + } else if (dataTypeInfo.Features & TzDateType) { + *props.MaxSize += dataTypeInfo.FixedSize + sizeof(TTimezoneId); + } + else { *props.MaxSize += dataTypeInfo.FixedSize; } return; diff --git a/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h b/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h index 669e6386b2..2231ff239e 100644 --- a/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h +++ b/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h @@ -533,6 +533,25 @@ arrow::Status UnaryPreallocatedExecImpl(arrow::compute::KernelContext* ctx, cons return arrow::Status::OK(); } + +template <typename TReader, typename TOutput, TOutput(*Core)(TBlockItem)> +arrow::Status UnaryPreallocatedReaderExecImpl(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) { + Y_UNUSED(ctx); + static_assert(std::is_base_of_v<IBlockReader, TReader>); + TReader reader; + + auto& inArray = batch.values[0].array(); + auto& outArray = res->array(); + TOutput* outValues = outArray->GetMutableValues<TOutput>(1); + auto length = inArray->length; + for (int64_t i = 0; i < length; ++i) { + auto item = reader.GetItem(*inArray, i); + outValues[i] = Core(item); + } + + return arrow::Status::OK(); +} + template<typename TInput, typename TOutput, std::pair<TOutput, bool> Core(TInput)> struct TUnaryUnsafeFixedSizeFilterKernel { static arrow::Status Do(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) { diff --git a/ydb/library/yql/public/udf/arrow/ut/array_builder_ut.cpp b/ydb/library/yql/public/udf/arrow/ut/array_builder_ut.cpp index fcaa281584..a915bb7814 100644 --- a/ydb/library/yql/public/udf/arrow/ut/array_builder_ut.cpp +++ b/ydb/library/yql/public/udf/arrow/ut/array_builder_ut.cpp @@ -175,6 +175,30 @@ Y_UNIT_TEST_SUITE(TArrayBuilderTest) { UNIT_ASSERT_VALUES_EQUAL(resource2->GetResourceTag(), ResourceName); } + Y_UNIT_TEST(TestTzDateBuilder_Layout) { + TArrayBuilderTestData data; + const auto tzDateType = data.PgmBuilder.NewDataType(EDataSlot::TzDate); + const auto arrayBuilder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), tzDateType, + *data.ArrowPool, MAX_BLOCK_SIZE, /* pgBuilder */ nullptr); + + auto makeTzDate = [] (ui16 val, ui16 tz) { + TUnboxedValuePod tzDate {val}; + tzDate.SetTimezoneId(tz); + return tzDate; + }; + + TVector<TUnboxedValuePod> dates{makeTzDate(1234, 1), makeTzDate(1234, 2), makeTzDate(45678, 333)}; + for (auto date: dates) { + arrayBuilder->Add(date); + } + + const auto datum = arrayBuilder->Build(true); + UNIT_ASSERT(datum.is_array()); + UNIT_ASSERT_VALUES_EQUAL(datum.length(), dates.size()); + const auto childData = datum.array()->child_data; + UNIT_ASSERT_VALUES_EQUAL_C(childData.size(), 2, "Expected date and timezone children"); + } + Y_UNIT_TEST(TestResourceStringValueBuilderReader) { TArrayBuilderTestData data; const auto resourceType = data.PgmBuilder.NewResourceType(ResourceName); diff --git a/ydb/library/yql/public/udf/udf_data_type.h b/ydb/library/yql/public/udf/udf_data_type.h index 7802f180fd..476c3c313e 100644 --- a/ydb/library/yql/public/udf/udf_data_type.h +++ b/ydb/library/yql/public/udf/udf_data_type.h @@ -13,6 +13,8 @@ namespace NUdf { using TDataTypeId = ui16; +using TTimezoneId = ui16; + enum EDataTypeFeatures : ui32 { CanCompare = 1u << 0, HasDeterministicCompare = 1u << 1, diff --git a/ydb/library/yql/udfs/common/datetime2/datetime_udf.cpp b/ydb/library/yql/udfs/common/datetime2/datetime_udf.cpp index d587c3a6fc..311168f300 100644 --- a/ydb/library/yql/udfs/common/datetime2/datetime_udf.cpp +++ b/ydb/library/yql/udfs/common/datetime2/datetime_udf.cpp @@ -38,6 +38,24 @@ public: static TResult DateCore(ui16 value) { return value * ui32(86400) * TResult(ScaleAfterSeconds); } + + template<typename TTzDate> + static TResult TzBlockCore(TBlockItem tzDate); + + template<> + static TResult TzBlockCore<TTzDate>(TBlockItem tzDate) { + return DateCore(tzDate.Get<ui16>()); + } + + template<> + static TResult TzBlockCore<TTzDatetime>(TBlockItem tzDate) { + return DatetimeCore(tzDate.Get<ui32>()); + } + + template<> + static TResult TzBlockCore<TTzTimestamp>(TBlockItem tzDate) { + return TimestampCore(tzDate.Get<ui64>()); + } static TResult DatetimeCore(ui32 value) { return value * TResult(ScaleAfterSeconds); @@ -55,6 +73,12 @@ public: static auto name = TStringRef(TFuncName, std::strlen(TFuncName)); return name; } + + template<typename TTzDate, typename TOutput> + static auto MakeTzBlockExec() { + using TReader = TTzDateBlockReader<TTzDate, /*Nullable*/ false>; + return UnaryPreallocatedReaderExecImpl<TReader, TOutput, TzBlockCore<TTzDate>>; + } static bool DeclareSignature( const TStringRef& name, @@ -135,8 +159,12 @@ public: if (!typesOnly) { if (typeId == TDataType<TDate>::Id || typeId == TDataType<TTzDate>::Id) { if (block) { + const auto exec = (typeId == TDataType<TTzDate>::Id) + ? MakeTzBlockExec<TTzDate, TResult>() + : UnaryPreallocatedExecImpl<ui16, TResult, DateCore>; + builder.Implementation(new TSimpleArrowUdfImpl(argBlockTypes, outputType, block.IsScalar(), - UnaryPreallocatedExecImpl<ui16, TResult, DateCore>, builder, TString(name), arrow::compute::NullHandling::INTERSECTION)); + exec, builder, TString(name), arrow::compute::NullHandling::INTERSECTION)); } else { builder.Implementation(new TUnaryOverOptionalImpl<ui16, TResult, DateCore>()); } @@ -144,8 +172,12 @@ public: if (typeId == TDataType<TDatetime>::Id || typeId == TDataType<TTzDatetime>::Id) { if (block) { + const auto exec = (typeId == TDataType<TTzDatetime>::Id) + ? MakeTzBlockExec<TTzDatetime, TResult>() + : UnaryPreallocatedExecImpl<ui32, TResult, DatetimeCore>; + builder.Implementation(new TSimpleArrowUdfImpl(argBlockTypes, outputType, block.IsScalar(), - UnaryPreallocatedExecImpl<ui32, TResult, DatetimeCore>, builder, TString(name), arrow::compute::NullHandling::INTERSECTION)); + exec, builder, TString(name), arrow::compute::NullHandling::INTERSECTION)); } else { builder.Implementation(new TUnaryOverOptionalImpl<ui32, TResult, DatetimeCore>()); } @@ -153,8 +185,12 @@ public: if (typeId == TDataType<TTimestamp>::Id || typeId == TDataType<TTzTimestamp>::Id) { if (block) { + const auto exec = (typeId == TDataType<TTzTimestamp>::Id) + ? MakeTzBlockExec<TTzTimestamp, TResult>() + : UnaryPreallocatedExecImpl<ui64, TResult, TimestampCore>; + builder.Implementation(new TSimpleArrowUdfImpl(argBlockTypes, outputType, block.IsScalar(), - UnaryPreallocatedExecImpl<ui64, TResult, TimestampCore>, builder, TString(name), arrow::compute::NullHandling::INTERSECTION)); + exec, builder, TString(name), arrow::compute::NullHandling::INTERSECTION)); } else { builder.Implementation(new TUnaryOverOptionalImpl<ui64, TResult, TimestampCore>()); } |