aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFiodar Miron <61616792+fedor-miron@users.noreply.github.com>2024-05-23 01:50:17 +0300
committerGitHub <noreply@github.com>2024-05-23 01:50:17 +0300
commit70e288a74a1d044cf949f73d371d1351a3b5caa8 (patch)
tree9fc52d50798842e7e0c5d84fb38f0dbf385aa27e
parentb5300420c4e0bf024d8ad6075799815e2b15ea39 (diff)
downloadydb-70e288a74a1d044cf949f73d371d1351a3b5caa8.tar.gz
YQL-18117: Implement block tz date (#3366)
-rw-r--r--ydb/library/yql/minikql/computation/mkql_block_reader.cpp42
-rw-r--r--ydb/library/yql/minikql/computation/mkql_block_transport.cpp150
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp18
-rw-r--r--ydb/library/yql/minikql/datetime/datetime.h1
-rw-r--r--ydb/library/yql/minikql/mkql_type_builder.cpp58
-rw-r--r--ydb/library/yql/minikql/mkql_type_builder.h25
-rw-r--r--ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp58
-rw-r--r--ydb/library/yql/public/udf/arrow/block_builder.h218
-rw-r--r--ydb/library/yql/public/udf/arrow/block_item.h12
-rw-r--r--ydb/library/yql/public/udf/arrow/block_item_comparator.h34
-rw-r--r--ydb/library/yql/public/udf/arrow/block_item_hasher.h11
-rw-r--r--ydb/library/yql/public/udf/arrow/block_reader.h209
-rw-r--r--ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h19
-rw-r--r--ydb/library/yql/public/udf/arrow/ut/array_builder_ut.cpp24
-rw-r--r--ydb/library/yql/public/udf/udf_data_type.h2
-rw-r--r--ydb/library/yql/udfs/common/datetime2/datetime_udf.cpp42
16 files changed, 790 insertions, 133 deletions
diff --git a/ydb/library/yql/minikql/computation/mkql_block_reader.cpp b/ydb/library/yql/minikql/computation/mkql_block_reader.cpp
index e417fcf535..ee2864cc23 100644
--- a/ydb/library/yql/minikql/computation/mkql_block_reader.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_block_reader.cpp
@@ -195,6 +195,37 @@ private:
mutable TVector<TBlockItem> Items;
};
+template <typename TTzDate, bool Nullable>
+class TTzDateBlockItemConverter : public IBlockItemConverter {
+public:
+ using TLayout = typename NYql::NUdf::TDataType<TTzDate>::TLayout;
+
+ NUdf::TUnboxedValuePod MakeValue(TBlockItem item, const THolderFactory& holderFactory) const final {
+ Y_UNUSED(holderFactory);
+ if constexpr (Nullable) {
+ if (!item) {
+ return {};
+ }
+ }
+
+ NUdf::TUnboxedValuePod value {item.Get<TLayout>()};
+ value.SetTimezoneId(item.GetTimezoneId());
+ return value;
+ }
+
+ TBlockItem MakeItem(const NUdf::TUnboxedValuePod& value) const final {
+ if constexpr (Nullable) {
+ if (!value) {
+ return {};
+ }
+ }
+
+ TBlockItem item {value.Get<TLayout>()};
+ item.SetTimezoneId(value.GetTimezoneId());
+ return item;
+ }
+};
+
class TExternalOptionalBlockItemConverter : public IBlockItemConverter {
public:
TExternalOptionalBlockItemConverter(std::unique_ptr<IBlockItemConverter>&& inner)
@@ -229,6 +260,8 @@ struct TConverterTraits {
template <typename TStringType, bool Nullable, NUdf::EDataSlot TOriginal = NUdf::EDataSlot::String, NUdf::EPgStringType PgString = NUdf::EPgStringType::None>
using TStrings = TStringBlockItemConverter<TStringType, Nullable, PgString>;
using TExtOptional = TExternalOptionalBlockItemConverter;
+ template<typename TTzDate, bool Nullable>
+ using TTzDateConverter = TTzDateBlockItemConverter<TTzDate, Nullable>;
static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) {
if (desc.PassByValue) {
@@ -258,6 +291,15 @@ struct TConverterTraits {
return std::make_unique<TResourceBlockItemConverter<false>>();
}
}
+
+ template<typename TTzDate>
+ static std::unique_ptr<TResult> MakeTzDate(bool isOptional) {
+ if (isOptional) {
+ return std::make_unique<TTzDateConverter<TTzDate, true>>();
+ } else {
+ return std::make_unique<TTzDateConverter<TTzDate, false>>();
+ }
+ }
};
} // namespace
diff --git a/ydb/library/yql/minikql/computation/mkql_block_transport.cpp b/ydb/library/yql/minikql/computation/mkql_block_transport.cpp
index cd7a9a60cf..e9700ed439 100644
--- a/ydb/library/yql/minikql/computation/mkql_block_transport.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_block_transport.cpp
@@ -150,7 +150,7 @@ public:
DoResetMetadata();
}
- std::shared_ptr<arrow::ArrayData> MakeDefaultValue(ui64 blockLen, ui64 offset) {
+ std::shared_ptr<arrow::ArrayData> MakeDefaultValue(ui64 blockLen, ui64 offset) const {
std::shared_ptr<arrow::Buffer> nulls;
i64 nullsCount = 0;
if (IsNullable()) {
@@ -176,7 +176,7 @@ template<size_t ObjectSize, bool Nullable>
class TFixedSizeBlockSerializer final : public IBlockSerializer {
public:
TFixedSizeBlockSerializer() = default;
-private:
+
size_t ArrayMetadataCount() const final {
return Nullable ? 3 : 1;
}
@@ -389,16 +389,10 @@ private:
const std::unique_ptr<TBlockDeserializerBase> Inner_;
};
-template<bool Nullable>
-class TTupleBlockSerializer final : public IBlockSerializer {
-public:
- explicit TTupleBlockSerializer(TVector<std::unique_ptr<IBlockSerializer>>&& children)
- : Children_(std::move(children))
- {
- }
-private:
+template<bool Nullable, typename TDerived>
+class TTupleBlockSerializerBase : public IBlockSerializer {
size_t ArrayMetadataCount() const final {
- size_t result = GetChildMetaCount();
+ size_t result = static_cast<const TDerived*>(this)->GetChildrenMetaCount();
if constexpr (Nullable) {
result += 2;
}
@@ -410,14 +404,12 @@ private:
StoreNullsSizes(data, metaSink);
}
if (data.GetNullCount() == data.length) {
- auto childCount = GetChildMetaCount();
+ auto childCount = static_cast<const TDerived*>(this)->GetChildrenMetaCount();
for (size_t i = 0; i < childCount; ++i) {
metaSink(0);
}
} else {
- for (size_t i = 0; i < Children_.size(); ++i) {
- Children_[i]->StoreMetadata(*data.child_data[i], metaSink);
- }
+ static_cast<const TDerived*>(this)->StoreChildrenMetadata(data.child_data, metaSink);
}
}
@@ -426,13 +418,19 @@ private:
StoreNulls(data, dst);
}
if (data.GetNullCount() != data.length) {
- for (size_t i = 0; i < Children_.size(); ++i) {
- Children_[i]->StoreArray(*data.child_data[i], dst);
- }
+ static_cast<const TDerived*>(this)->StoreChildrenArrays(data.child_data, dst);
}
}
+};
+
+template<bool Nullable>
+class TTupleBlockSerializer final : public TTupleBlockSerializerBase<Nullable, TTupleBlockSerializer<Nullable>> {
+public:
+ TTupleBlockSerializer(TVector<std::unique_ptr<IBlockSerializer>>&& children)
+ : Children_(std::move(children))
+ {}
- size_t GetChildMetaCount() const {
+ size_t GetChildrenMetaCount() const {
size_t result = 0;
for (const auto& child : Children_) {
result += child->ArrayMetadataCount();
@@ -440,9 +438,51 @@ private:
return result;
}
+ void StoreChildrenMetadata(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data,
+ const IBlockSerializer::TMetadataSink& metaSink) const {
+
+ for (size_t i = 0; i < Children_.size(); ++i) {
+ Children_[i]->StoreMetadata(*child_data[i], metaSink);
+ }
+ }
+
+ void StoreChildrenArrays(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data, TRope& dst) const {
+ for (size_t i = 0; i < Children_.size(); ++i) {
+ Children_[i]->StoreArray(*child_data[i], dst);
+ }
+ }
+
+private:
const TVector<std::unique_ptr<IBlockSerializer>> Children_;
};
+template<typename TDate, bool Nullable>
+class TTzDateBlockSerializer final : public TTupleBlockSerializerBase<Nullable, TTzDateBlockSerializer<TDate, Nullable>> {
+public:
+ TTzDateBlockSerializer() = default;
+
+ size_t GetChildrenMetaCount() const {
+ return DateSerialiser_.ArrayMetadataCount() + TzSerialiser_.ArrayMetadataCount();
+ }
+
+ void StoreChildrenMetadata(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data,
+ const IBlockSerializer::TMetadataSink& metaSink) const {
+ DateSerialiser_.StoreMetadata(*child_data[0], metaSink);
+ TzSerialiser_.StoreMetadata(*child_data[1], metaSink);
+ }
+
+ void StoreChildrenArrays(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data, TRope& dst) const {
+ DateSerialiser_.StoreArray(*child_data[0], dst);
+ TzSerialiser_.StoreArray(*child_data[1], dst);
+ }
+
+private:
+ using TDateLayout = typename NUdf::TDataType<TDate>::TLayout;
+
+ TFixedSizeBlockSerializer<sizeof(TDateLayout), false> DateSerialiser_;
+ TFixedSizeBlockSerializer<sizeof(NYql::NUdf::TTimezoneId), false> TzSerialiser_;
+};
+
template<bool Nullable>
class TTupleBlockDeserializer final : public TBlockDeserializerBase {
public:
@@ -494,6 +534,53 @@ private:
const TVector<std::unique_ptr<TBlockDeserializerBase>> Children_;
};
+template<typename TDate, bool Nullable>
+class TTzDateBlockDeserializer final : public TBlockDeserializerBase {
+public:
+ TTzDateBlockDeserializer() = default;
+
+private:
+ void DoLoadMetadata(const TMetadataSource& metaSource) final {
+ DateDeserialiser_.LoadMetadata(metaSource);
+ TzDeserialiser_.LoadMetadata(metaSource);
+ }
+
+ std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final {
+ std::vector<std::shared_ptr<arrow::ArrayData>> childData;
+ childData.emplace_back(DateDeserialiser_.MakeDefaultValue(blockLen, offset));
+ childData.emplace_back(TzDeserialiser_.MakeDefaultValue(blockLen, offset));
+ return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, offset);
+ }
+
+ std::shared_ptr<arrow::ArrayData> DoLoadArray(TRope& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
+ std::vector<std::shared_ptr<arrow::ArrayData>> childData;
+ childData.emplace_back(DateDeserialiser_.LoadArray(src, blockLen, offset));
+ childData.emplace_back(TzDeserialiser_.LoadArray(src, blockLen, offset));
+ return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, offset);
+ }
+
+ void DoResetMetadata() final {
+ DateDeserialiser_.ResetMetadata();
+ TzDeserialiser_.ResetMetadata();
+ }
+
+ bool IsNullable() const final {
+ return Nullable;
+ }
+
+ void SetArrowType(const std::shared_ptr<arrow::DataType>& type) final {
+ YQL_ENSURE(type->fields().size() == 2);
+ ArrowType_ = type;
+ DateDeserialiser_.SetArrowType(type->field(0)->type());
+ TzDeserialiser_.SetArrowType(type->field(1)->type());
+ }
+
+ using TDateLayout = typename NUdf::TDataType<TDate>::TLayout;
+
+ TFixedSizeBlockDeserializer<sizeof(TDateLayout), false> DateDeserialiser_;
+ TFixedSizeBlockDeserializer<sizeof(NYql::NUdf::TTimezoneId), false> TzDeserialiser_;
+};
+
struct TSerializerTraits {
using TResult = IBlockSerializer;
template <bool Nullable>
@@ -503,6 +590,9 @@ struct TSerializerTraits {
template <typename TStringType, bool Nullable, NUdf::EDataSlot TOriginal = NUdf::EDataSlot::String>
using TStrings = TStringBlockSerializer<TStringType, Nullable>;
using TExtOptional = TExtOptionalBlockSerializer;
+ template<typename TTzDateType, bool Nullable>
+ using TTzDate = TTzDateBlockSerializer<TTzDateType, Nullable>;
+
static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) {
Y_UNUSED(pgBuilder);
@@ -516,6 +606,16 @@ struct TSerializerTraits {
Y_UNUSED(isOptional);
ythrow yexception() << "Serializer not implemented for block resources";
}
+
+ template<typename TTzDateType>
+ static std::unique_ptr<TResult> MakeTzDate(bool isOptional) {
+ if (isOptional) {
+ return std::make_unique<TTzDate<TTzDateType, true>>();
+ }
+ else {
+ return std::make_unique<TTzDate<TTzDateType, false>>();
+ }
+ }
};
struct TDeserializerTraits {
@@ -527,6 +627,8 @@ struct TDeserializerTraits {
template <typename TStringType, bool Nullable, NUdf::EDataSlot TOriginal = NUdf::EDataSlot::String>
using TStrings = TStringBlockDeserializer<TStringType, Nullable>;
using TExtOptional = TExtOptionalBlockDeserializer;
+ template<typename TTzDateType, bool Nullable>
+ using TTzDate = TTzDateBlockDeserializer<TTzDateType, Nullable>;
static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) {
Y_UNUSED(pgBuilder);
@@ -540,6 +642,16 @@ struct TDeserializerTraits {
Y_UNUSED(isOptional);
ythrow yexception() << "Deserializer not implemented for block resources";
}
+
+ template<typename TTzDateType>
+ static std::unique_ptr<TResult> MakeTzDate(bool isOptional) {
+ if (isOptional) {
+ return std::make_unique<TTzDate<TTzDateType, true>>();
+ }
+ else {
+ return std::make_unique<TTzDate<TTzDateType, false>>();
+ }
+ }
};
} // namespace
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp
index 3e0e7117c4..ea4802cd67 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp
@@ -665,6 +665,9 @@ protected:
auto scalarOptStrType = PgmBuilder.NewBlockType(optStrType, TBlockType::EShape::Scalar);
auto blockOptTupleOptUi32StrType = PgmBuilder.NewBlockType(optTupleOptUi32StrType, TBlockType::EShape::Many);
auto scalarUi64Type = PgmBuilder.NewBlockType(ui64Type, TBlockType::EShape::Scalar);
+
+ auto tzDateType = PgmBuilder.NewDataType(NUdf::EDataSlot::TzDate);
+ auto blockTzDateType = PgmBuilder.NewBlockType(tzDateType, TBlockType::EShape::Many);
auto rowType =
legacyStruct
@@ -674,10 +677,11 @@ protected:
{"_yql_block_length", scalarUi64Type},
{"a", scalarOptStrType},
{"b", blockOptTupleOptUi32StrType},
+ {"c", blockTzDateType}
})
: PgmBuilder.NewMultiType(
{blockUi32Type, blockOptStrType, scalarOptStrType,
- blockOptTupleOptUi32StrType, scalarUi64Type});
+ blockOptTupleOptUi32StrType, blockTzDateType, scalarUi64Type});
ui64 blockLen = 1000;
UNIT_ASSERT_LE(offset + len, blockLen);
@@ -685,6 +689,7 @@ protected:
auto builder1 = MakeArrayBuilder(TTypeInfoHelper(), ui32Type, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(ui32Type)), nullptr);
auto builder2 = MakeArrayBuilder(TTypeInfoHelper(), optStrType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(optStrType)), nullptr);
auto builder3 = MakeArrayBuilder(TTypeInfoHelper(), optTupleOptUi32StrType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(optTupleOptUi32StrType)), nullptr);
+ auto builder4 = MakeArrayBuilder(TTypeInfoHelper(), tzDateType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(tzDateType)), nullptr);
for (ui32 i = 0; i < blockLen; ++i) {
TBlockItem b1(i);
@@ -697,6 +702,10 @@ protected:
TBlockItem b3items[] = { (i % 2) ? TBlockItem(i) : TBlockItem(), TBlockItem(a) };
TBlockItem b3 = (i % 7) ? TBlockItem(b3items) : TBlockItem();
builder3->Add(b3);
+
+ TBlockItem tzDate {i};
+ tzDate.SetTimezoneId(i % 100);
+ builder4->Add(tzDate);
}
std::string_view testScalarString = "foobar";
@@ -709,11 +718,13 @@ protected:
datums.emplace_back(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(blockLen)));
datums.emplace_back(arrow::Datum(std::make_shared<arrow::BinaryScalar>(strbuf)));
datums.emplace_back(builder3->Build(true));
+ datums.emplace_back(builder4->Build(true));
} else {
datums.emplace_back(builder1->Build(true));
datums.emplace_back(builder2->Build(true));
datums.emplace_back(arrow::Datum(std::make_shared<arrow::BinaryScalar>(strbuf)));
datums.emplace_back(builder3->Build(true));
+ datums.emplace_back(builder4->Build(true));
datums.emplace_back(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(blockLen)));
}
@@ -767,6 +778,7 @@ protected:
auto reader1 = MakeBlockReader(TTypeInfoHelper(), ui32Type);
auto reader2 = MakeBlockReader(TTypeInfoHelper(), optStrType);
auto reader3 = MakeBlockReader(TTypeInfoHelper(), optTupleOptUi32StrType);
+ auto reader4 = MakeBlockReader(TTypeInfoHelper(), tzDateType);
for (ui32 i = offset; i < len; ++i) {
TBlockItem b1 = reader1->GetItem(*TArrowBlock::From(unpackedColumns[0]).GetDatum().array(), i - offset);
@@ -792,6 +804,10 @@ protected:
} else {
UNIT_ASSERT(!b3);
}
+
+ TBlockItem b4 = reader4->GetItem(*TArrowBlock::From(unpackedColumns[legacyStruct ? 5 : 4]).GetDatum().array(), i - offset);
+ UNIT_ASSERT(b4.Get<ui16>() == i);
+ UNIT_ASSERT(b4.GetTimezoneId() == (i % 100));
}
}
}
diff --git a/ydb/library/yql/minikql/datetime/datetime.h b/ydb/library/yql/minikql/datetime/datetime.h
index 71363b27cd..a86f28060c 100644
--- a/ydb/library/yql/minikql/datetime/datetime.h
+++ b/ydb/library/yql/minikql/datetime/datetime.h
@@ -21,6 +21,7 @@ struct TTMStorage {
unsigned int Second : 6;
unsigned int Microsecond : 20;
unsigned int TimezoneId : 16;
+ ui8 Reserved[2];
TTMStorage() {
Zero(*this);
diff --git a/ydb/library/yql/minikql/mkql_type_builder.cpp b/ydb/library/yql/minikql/mkql_type_builder.cpp
index dc946c4322..38e536688e 100644
--- a/ydb/library/yql/minikql/mkql_type_builder.cpp
+++ b/ydb/library/yql/minikql/mkql_type_builder.cpp
@@ -1455,9 +1455,28 @@ bool ConvertArrowType(NUdf::EDataSlot slot, std::shared_ptr<arrow::DataType>& ty
case NUdf::EDataSlot::Json:
type = arrow::utf8();
return true;
- default:
+ case NUdf::EDataSlot::TzDate: {
+ type = MakeTzDateArrowType<NYql::NUdf::EDataSlot::TzDate>();
+ return true;
+ }
+ case NUdf::EDataSlot::TzDatetime: {
+ type = MakeTzDateArrowType<NYql::NUdf::EDataSlot::TzDatetime>();
+ return true;
+ }
+ case NUdf::EDataSlot::TzTimestamp: {
+ type = MakeTzDateArrowType<NYql::NUdf::EDataSlot::TzTimestamp>();
+ return true;
+ }
+ case NUdf::EDataSlot::Uuid: {
return false;
}
+ case NUdf::EDataSlot::Decimal: {
+ return false;
+ }
+ case NUdf::EDataSlot::DyNumber: {
+ return false;
+ }
+ }
}
bool ConvertArrowType(TType* itemType, std::shared_ptr<arrow::DataType>& type) {
@@ -2415,9 +2434,22 @@ size_t CalcMaxBlockItemSize(const TType* type) {
case NUdf::EDataSlot::Json:
// size of offset part
return sizeof(arrow::StringType::offset_type);
- default:
+ case NUdf::EDataSlot::TzDate:
+ return sizeof(typename NUdf::TDataType<NUdf::TTzDate>::TLayout) + sizeof(NYql::NUdf::TTimezoneId);
+ case NUdf::EDataSlot::TzDatetime:
+ return sizeof(typename NUdf::TDataType<NUdf::TTzDatetime>::TLayout) + sizeof(NYql::NUdf::TTimezoneId);
+ case NUdf::EDataSlot::TzTimestamp:
+ return sizeof(typename NUdf::TDataType<NUdf::TTzTimestamp>::TLayout) + sizeof(NYql::NUdf::TTimezoneId);
+ case NUdf::EDataSlot::Uuid: {
MKQL_ENSURE(false, "Unsupported data slot: " << slot);
}
+ case NUdf::EDataSlot::Decimal: {
+ MKQL_ENSURE(false, "Unsupported data slot: " << slot);
+ }
+ case NUdf::EDataSlot::DyNumber: {
+ MKQL_ENSURE(false, "Unsupported data slot: " << slot);
+ }
+ }
}
MKQL_ENSURE(false, "Unsupported type: " << *type);
@@ -2432,6 +2464,8 @@ struct TComparatorTraits {
template <typename TStringType, bool Nullable, NUdf::EDataSlot TOriginal = NUdf::EDataSlot::String>
using TStrings = NUdf::TStringBlockItemComparator<TStringType, Nullable>;
using TExtOptional = NUdf::TExternalOptionalBlockItemComparator;
+ template <typename T, bool Nullable>
+ using TTzDateComparator = NUdf::TTzDateBlockItemComparator<T, Nullable>;
static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) {
Y_UNUSED(pgBuilder);
@@ -2442,6 +2476,15 @@ struct TComparatorTraits {
Y_UNUSED(isOptional);
ythrow yexception() << "Comparator not implemented for block resources: ";
}
+
+ template<typename TTzDate>
+ static std::unique_ptr<TResult> MakeTzDate(bool isOptional) {
+ if (isOptional) {
+ return std::make_unique<TTzDateComparator<TTzDate, true>>();
+ } else {
+ return std::make_unique<TTzDateComparator<TTzDate, false>>();
+ }
+ }
};
struct THasherTraits {
@@ -2453,6 +2496,8 @@ struct THasherTraits {
template <typename TStringType, bool Nullable, NUdf::EDataSlot TOriginal = NUdf::EDataSlot::String>
using TStrings = NUdf::TStringBlockItemHasher<TStringType, Nullable>;
using TExtOptional = NUdf::TExternalOptionalBlockItemHasher;
+ template <typename T, bool Nullable>
+ using TTzDateHasher = NYql::NUdf::TTzDateBlockItemHasher<T, Nullable>;
static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) {
Y_UNUSED(pgBuilder);
@@ -2463,6 +2508,15 @@ struct THasherTraits {
Y_UNUSED(isOptional);
ythrow yexception() << "Hasher not implemented for block resources";
}
+
+ template<typename TTzDate>
+ static std::unique_ptr<TResult> MakeTzDate(bool isOptional) {
+ if (isOptional) {
+ return std::make_unique<TTzDateHasher<TTzDate, true>>();
+ } else {
+ return std::make_unique<TTzDateHasher<TTzDate, false>>();
+ }
+ }
};
NUdf::IBlockItemComparator::TPtr TBlockTypeHelper::MakeComparator(NUdf::TType* type) const {
diff --git a/ydb/library/yql/minikql/mkql_type_builder.h b/ydb/library/yql/minikql/mkql_type_builder.h
index 2b234d7ee7..7199bc8360 100644
--- a/ydb/library/yql/minikql/mkql_type_builder.h
+++ b/ydb/library/yql/minikql/mkql_type_builder.h
@@ -33,6 +33,31 @@ inline size_t CalcBlockLen(size_t maxBlockItemSize) {
bool ConvertArrowType(TType* itemType, std::shared_ptr<arrow::DataType>& type);
bool ConvertArrowType(NUdf::EDataSlot slot, std::shared_ptr<arrow::DataType>& type);
+template<NUdf::EDataSlot slot>
+std::shared_ptr<arrow::DataType> MakeTzLayoutArrowType() {
+ static_assert(slot == NUdf::EDataSlot::TzDate || slot == NUdf::EDataSlot::TzDatetime || slot == NUdf::EDataSlot::TzTimestamp,
+ "Expected tz date type slot");
+
+ if constexpr (slot == NUdf::EDataSlot::TzDate) {
+ return arrow::uint16();
+ }
+ if constexpr (slot == NUdf::EDataSlot::TzDatetime) {
+ return arrow::uint32();
+ }
+ if constexpr (slot == NUdf::EDataSlot::TzTimestamp) {
+ return arrow::uint64();
+ }
+}
+
+template<NUdf::EDataSlot slot>
+std::shared_ptr<arrow::StructType> MakeTzDateArrowType() {
+ std::vector<std::shared_ptr<arrow::Field>> fields {
+ std::make_shared<arrow::Field>("datetime", MakeTzLayoutArrowType<slot>()),
+ std::make_shared<arrow::Field>("timezoneId", arrow::uint16()),
+ };
+ return std::make_shared<arrow::StructType>(fields);
+}
+
class TArrowType : public NUdf::IArrowType {
public:
TArrowType(const std::shared_ptr<arrow::DataType>& type)
diff --git a/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp b/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp
index c273fc0eac..f1b7e03847 100644
--- a/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp
+++ b/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp
@@ -5,6 +5,7 @@
#include <ydb/library/yql/public/udf/arrow/block_reader.h>
#include <ydb/library/yql/utils/yql_panic.h>
#include <ydb/library/yql/minikql/mkql_type_builder.h>
+#include <ydb/library/yql/minikql/mkql_type_ops.h>
#include <library/cpp/yson/node/node_io.h>
#include <library/cpp/yson/detail.h>
@@ -155,6 +156,10 @@ public:
const char* Data() {
return Data_;
}
+
+ size_t Available() const {
+ return Available_;
+ }
private:
const char* Data_;
size_t Available_;
@@ -314,9 +319,41 @@ public:
return ReadYson(buf);
}
}
-private:
- const TVector<std::unique_ptr<IYsonBlockReader>> Children_;
- TVector<NUdf::TBlockItem> Items_;
+};
+
+template<typename T, bool Nullable, bool Native>
+class TYsonTzDateBlockReader final : public IYsonBlockReaderWithNativeFlag<Native> {
+public:
+ NUdf::TBlockItem GetItem(TYsonReaderDetails& buf) override final {
+ if constexpr (Nullable) {
+ return this->GetNullableItem(buf);
+ }
+ return GetNotNull(buf);
+ }
+
+ NUdf::TBlockItem GetNotNull(TYsonReaderDetails& buf) override final {
+ using TLayout = typename NUdf::TDataType<T>::TLayout;
+ size_t length = sizeof(TLayout) + sizeof(NUdf::TTimezoneId);
+ Y_ASSERT(buf.Available() == length);
+
+ TLayout date;
+ NUdf::TTimezoneId tz;
+
+ if constexpr (std::is_same_v<T, NUdf::TTzDate>) {
+ DeserializeTzDate({buf.Data(), length}, date, tz);
+ } else if constexpr (std::is_same_v<T, NUdf::TTzDatetime>) {
+ DeserializeTzDatetime({buf.Data(), length}, date, tz);
+ } else if constexpr (std::is_same_v<T, NUdf::TTzTimestamp>) {
+ DeserializeTzTimestamp({buf.Data(), length}, date, tz);
+ } else {
+ static_assert(sizeof(T) == 0, "Unsupported tz date type");
+ }
+
+ buf.Skip(length);
+ NUdf::TBlockItem res {date};
+ res.SetTimezoneId(tz);
+ return res;
+ }
};
namespace {
@@ -374,9 +411,6 @@ public:
buf.Next();
return NUdf::TBlockItem(T(buf.NextDouble()));
}
-private:
- const TVector<std::unique_ptr<IYsonBlockReader>> Children_;
- TVector<NUdf::TBlockItem> Items_;
};
template<bool Native>
@@ -439,6 +473,18 @@ struct TYsonBlockReaderTraits {
Y_UNUSED(isOptional);
ythrow yexception() << "Yson reader not implemented for block resources";
}
+
+ template<typename TTzDate>
+ static std::unique_ptr<TResult> MakeTzDate(bool isOptional) {
+ Y_UNUSED(isOptional);
+ if (isOptional) {
+ using TTzDateReader = TYsonTzDateBlockReader<TTzDate, true, Native>;
+ return std::make_unique<TTzDateReader>();
+ } else {
+ using TTzDateReader = TYsonTzDateBlockReader<TTzDate, false, Native>;
+ return std::make_unique<TTzDateReader>();
+ }
+ }
};
template<bool IsDictionary>
diff --git a/ydb/library/yql/public/udf/arrow/block_builder.h b/ydb/library/yql/public/udf/arrow/block_builder.h
index 7c86083ec9..10fa0a6f35 100644
--- a/ydb/library/yql/public/udf/arrow/block_builder.h
+++ b/ydb/library/yql/public/udf/arrow/block_builder.h
@@ -8,6 +8,7 @@
#include <ydb/library/yql/public/udf/udf_value.h>
#include <ydb/library/yql/public/udf/udf_value_builder.h>
#include <ydb/library/yql/public/udf/udf_type_inspection.h>
+#include <ydb/library/yql/minikql/mkql_type_builder.h>
#include <arrow/datum.h>
#include <arrow/c/bridge.h>
@@ -516,15 +517,16 @@ private:
template<typename TLayout, bool Nullable>
class TFixedSizeArrayBuilder final: public TFixedSizeArrayBuilderBase<TLayout, Nullable, TFixedSizeArrayBuilder<TLayout, Nullable>> {
- using TDerived = TFixedSizeArrayBuilder<TLayout, Nullable>;
+ using TSelf = TFixedSizeArrayBuilder<TLayout, Nullable>;
+ using TBase = TFixedSizeArrayBuilderBase<TLayout, Nullable, TSelf>;
public:
TFixedSizeArrayBuilder(const ITypeInfoHelper& typeInfoHelper, std::shared_ptr<arrow::DataType> arrowType, arrow::MemoryPool& pool, size_t maxLen, size_t* totalAllocated = nullptr)
- : TFixedSizeArrayBuilderBase<TLayout, Nullable, TDerived>(typeInfoHelper, std::move(arrowType), pool, maxLen, totalAllocated)
+ : TBase(typeInfoHelper, std::move(arrowType), pool, maxLen, totalAllocated)
{}
TFixedSizeArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, size_t* totalAllocated = nullptr)
- : TFixedSizeArrayBuilderBase<TLayout, Nullable, TDerived>(typeInfoHelper, type, pool, maxLen, totalAllocated)
+ : TBase(typeInfoHelper, type, pool, maxLen, totalAllocated)
{}
void DoAddNotNull(TUnboxedValuePod value) {
@@ -936,13 +938,11 @@ private:
i32 TypeLen = 0;
};
-template<bool Nullable>
-class TTupleArrayBuilder final : public TArrayBuilderBase {
+template<bool Nullable, typename TDerived>
+class TTupleArrayBuilderBase : public TArrayBuilderBase {
public:
- TTupleArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen,
- TVector<TArrayBuilderBase::Ptr>&& children, size_t* totalAllocated)
+ TTupleArrayBuilderBase(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, size_t* totalAllocated = nullptr)
: TArrayBuilderBase(typeInfoHelper, type, pool, maxLen, totalAllocated)
- , Children(std::move(children))
{
Reserve();
}
@@ -951,70 +951,50 @@ public:
if constexpr (Nullable) {
if (!value) {
NullBuilder->UnsafeAppend(0);
- for (ui32 i = 0; i < Children.size(); ++i) {
- Children[i]->AddDefault();
- }
+ static_cast<TDerived*>(this)->AddToChildrenDefault();
return;
}
NullBuilder->UnsafeAppend(1);
}
- auto elements = value.GetElements();
- if (elements) {
- for (ui32 i = 0; i < Children.size(); ++i) {
- Children[i]->Add(elements[i]);
- }
- } else {
- for (ui32 i = 0; i < Children.size(); ++i) {
- auto element = value.GetElement(i);
- Children[i]->Add(element);
- }
- }
+ static_cast<TDerived*>(this)->AddToChildren(value);
}
void DoAdd(TBlockItem value) final {
if constexpr (Nullable) {
if (!value) {
NullBuilder->UnsafeAppend(0);
- for (ui32 i = 0; i < Children.size(); ++i) {
- Children[i]->AddDefault();
- }
+ static_cast<TDerived*>(this)->AddToChildrenDefault();
return;
}
NullBuilder->UnsafeAppend(1);
}
- auto elements = value.AsTuple();
- for (ui32 i = 0; i < Children.size(); ++i) {
- Children[i]->Add(elements[i]);
- }
+ static_cast<TDerived*>(this)->AddToChildren(value);
}
void DoAdd(TInputBuffer& input) final {
if constexpr (Nullable) {
if (!input.PopChar()) {
- return DoAdd(TBlockItem{});
+ NullBuilder->UnsafeAppend(0);
+ static_cast<TDerived*>(this)->AddToChildrenDefault();
+ return;
}
NullBuilder->UnsafeAppend(1);
}
- for (ui32 i = 0; i < Children.size(); ++i) {
- Children[i]->Add(input);
- }
+ static_cast<TDerived*>(this)->AddToChildren(input);
}
void DoAddDefault() final {
if constexpr (Nullable) {
NullBuilder->UnsafeAppend(1);
}
- for (ui32 i = 0; i < Children.size(); ++i) {
- Children[i]->AddDefault();
- }
+ static_cast<TDerived*>(this)->AddToChildrenDefault();
}
void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final {
Y_ABORT_UNLESS(!array.buffers.empty());
- Y_ABORT_UNLESS(array.child_data.size() == Children.size());
if constexpr (Nullable) {
if (array.buffers.front()) {
@@ -1026,14 +1006,11 @@ public:
}
}
- for (size_t i = 0; i < Children.size(); ++i) {
- Children[i]->AddMany(*array.child_data[i], popCount, sparseBitmap, array.length);
- }
+ static_cast<TDerived*>(this)->AddManyToChildren(array, sparseBitmap, popCount);
}
void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) final {
Y_ABORT_UNLESS(!array.buffers.empty());
- Y_ABORT_UNLESS(array.child_data.size() == Children.size());
if constexpr (Nullable) {
for (ui64 i = beginIndex; i < beginIndex + count; ++i) {
@@ -1041,14 +1018,11 @@ public:
}
}
- for (size_t i = 0; i < Children.size(); ++i) {
- Children[i]->AddMany(*array.child_data[i], beginIndex, count);
- }
+ static_cast<TDerived*>(this)->AddManyToChildren(array, beginIndex, count);
}
void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) final {
Y_ABORT_UNLESS(!array.buffers.empty());
- Y_ABORT_UNLESS(array.child_data.size() == Children.size());
if constexpr (Nullable) {
for (size_t i = 0; i < count; ++i) {
@@ -1056,9 +1030,7 @@ public:
}
}
- for (size_t i = 0; i < Children.size(); ++i) {
- Children[i]->AddMany(*array.child_data[i], indexes, count);
- }
+ static_cast<TDerived*>(this)->AddManyToChildren(array, indexes, count);
}
TBlockArrayTree::Ptr DoBuildTree(bool finish) final {
@@ -1074,10 +1046,7 @@ public:
Y_ABORT_UNLESS(length);
result->Payload.push_back(arrow::ArrayData::Make(ArrowType, length, { nullBitmap }));
- result->Children.reserve(Children.size());
- for (ui32 i = 0; i < Children.size(); ++i) {
- result->Children.emplace_back(Children[i]->BuildTree(finish));
- }
+ static_cast<TDerived*>(this)->BuildChildrenTree(finish, result->Children);
if (!finish) {
Reserve();
@@ -1104,10 +1073,145 @@ private:
}
private:
- TVector<std::unique_ptr<TArrayBuilderBase>> Children;
std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder;
};
+template<bool Nullable>
+class TTupleArrayBuilder final : public TTupleArrayBuilderBase<Nullable, TTupleArrayBuilder<Nullable>> {
+public:
+
+ TTupleArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen,
+ TVector<TArrayBuilderBase::Ptr>&& children, size_t* totalAllocated = nullptr)
+ : TTupleArrayBuilderBase<Nullable, TTupleArrayBuilder<Nullable>>(typeInfoHelper, type, pool, maxLen, totalAllocated)
+ , Children_(std::move(children)) {}
+
+ void AddToChildrenDefault() {
+ for (ui32 i = 0; i < Children_.size(); ++i) {
+ Children_[i]->AddDefault();
+ }
+ }
+
+ void AddToChildren(NUdf::TUnboxedValuePod value) {
+ auto elements = value.GetElements();
+ if (elements) {
+ for (ui32 i = 0; i < Children_.size(); ++i) {
+ Children_[i]->Add(elements[i]);
+ }
+ } else {
+ for (ui32 i = 0; i < Children_.size(); ++i) {
+ auto element = value.GetElement(i);
+ Children_[i]->Add(element);
+ }
+ }
+ }
+
+ void AddToChildren(TBlockItem value) {
+ auto elements = value.AsTuple();
+ for (ui32 i = 0; i < Children_.size(); ++i) {
+ Children_[i]->Add(elements[i]);
+ }
+ }
+
+ void AddToChildren(TInputBuffer& input) {
+ for (ui32 i = 0; i < Children_.size(); ++i) {
+ Children_[i]->Add(input);
+ }
+ }
+
+ void AddManyToChildren(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) {
+ Y_ABORT_UNLESS(array.child_data.size() == Children_.size());
+ for (size_t i = 0; i < Children_.size(); ++i) {
+ Children_[i]->AddMany(*array.child_data[i], popCount, sparseBitmap, array.length);
+ }
+ }
+
+ void AddManyToChildren(const arrow::ArrayData& array, ui64 beginIndex, size_t count) {
+ Y_ABORT_UNLESS(array.child_data.size() == Children_.size());
+ for (size_t i = 0; i < Children_.size(); ++i) {
+ Children_[i]->AddMany(*array.child_data[i], beginIndex, count);
+ }
+ }
+
+ void AddManyToChildren(const arrow::ArrayData& array, const ui64* indexes, size_t count) {
+ Y_ABORT_UNLESS(array.child_data.size() == Children_.size());
+ for (size_t i = 0; i < Children_.size(); ++i) {
+ Children_[i]->AddMany(*array.child_data[i], indexes, count);
+ }
+ }
+
+ void BuildChildrenTree(bool finish, std::vector<TArrayBuilderBase::TBlockArrayTree::Ptr>& resultChildren) {
+ resultChildren.reserve(Children_.size());
+ for (ui32 i = 0; i < Children_.size(); ++i) {
+ resultChildren.emplace_back(Children_[i]->BuildTree(finish));
+ }
+ }
+
+private:
+TVector<std::unique_ptr<TArrayBuilderBase>> Children_;
+};
+
+template<typename TDate, bool Nullable>
+class TTzDateArrayBuilder final : public TTupleArrayBuilderBase<Nullable, TTzDateArrayBuilder<TDate, Nullable>> {
+ using TDateLayout = typename TDataType<TDate>::TLayout;
+ static constexpr auto DataSlot = TDataType<TDate>::Slot;
+
+public:
+ TTzDateArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, size_t* totalAllocated = nullptr)
+ : TTupleArrayBuilderBase<Nullable, TTzDateArrayBuilder<TDate, Nullable>>(typeInfoHelper, type, pool, maxLen, totalAllocated)
+ , DateBuilder_(typeInfoHelper, NKikimr::NMiniKQL::MakeTzLayoutArrowType<DataSlot>(), pool, maxLen)
+ , TimezoneBuilder_(typeInfoHelper, arrow::uint16(), pool, maxLen)
+ {
+ }
+
+ void AddToChildrenDefault() {
+ DateBuilder_.AddDefault();
+ TimezoneBuilder_.AddDefault();
+ }
+
+ void AddToChildren(NUdf::TUnboxedValuePod value) {
+ DateBuilder_.Add(value);
+ TimezoneBuilder_.Add(TBlockItem(value.GetTimezoneId()));
+ }
+
+ void AddToChildren(TBlockItem value) {
+ DateBuilder_.Add(value);
+ TimezoneBuilder_.Add(TBlockItem(value.GetTimezoneId()));
+ }
+
+ void AddToChildren(TInputBuffer& input) {
+ DateBuilder_.Add(input);
+ TimezoneBuilder_.Add(input);
+ }
+
+ void AddManyToChildren(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) {
+ Y_ABORT_UNLESS(array.child_data.size() == 2);
+ DateBuilder_.AddMany(*array.child_data[0], popCount, sparseBitmap, array.length);
+ TimezoneBuilder_.AddMany(*array.child_data[1], popCount, sparseBitmap, array.length);
+ }
+
+ void AddManyToChildren(const arrow::ArrayData& array, ui64 beginIndex, size_t count) {
+ Y_ABORT_UNLESS(array.child_data.size() == 2);
+ DateBuilder_.AddMany(*array.child_data[0], beginIndex, count);
+ TimezoneBuilder_.AddMany(*array.child_data[1], beginIndex, count);
+ }
+
+ void AddManyToChildren(const arrow::ArrayData& array, const ui64* indexes, size_t count) {
+ Y_ABORT_UNLESS(array.child_data.size() == 2);
+ DateBuilder_.AddMany(*array.child_data[0], indexes, count);
+ TimezoneBuilder_.AddMany(*array.child_data[1], indexes, count);
+ }
+
+ void BuildChildrenTree(bool finish, std::vector<TArrayBuilderBase::TBlockArrayTree::Ptr>& resultChildren) {
+ resultChildren.emplace_back(DateBuilder_.BuildTree(finish));
+ resultChildren.emplace_back(TimezoneBuilder_.BuildTree(finish));
+ }
+
+private:
+ TFixedSizeArrayBuilder<TDateLayout, false> DateBuilder_;
+ TFixedSizeArrayBuilder<ui16, false> TimezoneBuilder_;
+};
+
+
class TExternalOptionalArrayBuilder final : public TArrayBuilderBase {
public:
TExternalOptionalArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen,
@@ -1306,7 +1410,13 @@ inline std::unique_ptr<TArrayBuilderBase> MakeArrayBuilderImpl(
return std::make_unique<TStringArrayBuilder<arrow::BinaryType, Nullable>>(typeInfoHelper, type, pool, maxLen, totalAllocated);
case NUdf::EDataSlot::Utf8:
case NUdf::EDataSlot::Json:
- return std::make_unique<TStringArrayBuilder<arrow::StringType, Nullable>>(typeInfoHelper, type, pool, maxLen, totalAllocated);
+ return std::make_unique<TStringArrayBuilder<arrow::StringType, Nullable>>(typeInfoHelper, type, pool, maxLen);
+ case NUdf::EDataSlot::TzDate:
+ return std::make_unique<TTzDateArrayBuilder<TTzDate, Nullable>>(typeInfoHelper, type, pool, maxLen);
+ case NUdf::EDataSlot::TzDatetime:
+ return std::make_unique<TTzDateArrayBuilder<TTzDatetime, Nullable>>(typeInfoHelper, type, pool, maxLen);
+ case NUdf::EDataSlot::TzTimestamp:
+ return std::make_unique<TTzDateArrayBuilder<TTzTimestamp, Nullable>>(typeInfoHelper, type, pool, maxLen);
default:
Y_ENSURE(false, "Unsupported data slot");
}
diff --git a/ydb/library/yql/public/udf/arrow/block_item.h b/ydb/library/yql/public/udf/arrow/block_item.h
index 15edabe27a..f04a25666b 100644
--- a/ydb/library/yql/public/udf/arrow/block_item.h
+++ b/ydb/library/yql/public/udf/arrow/block_item.h
@@ -155,6 +155,15 @@ public:
bool IsBoxed() const { return EMarkers::Boxed == GetMarkers(); }
bool IsEmbedded() const { return EMarkers::Embedded == GetMarkers(); }
+ inline void SetTimezoneId(ui16 id) {
+ UDF_VERIFY(GetMarkers() == EMarkers::Embedded, "Value is not a datetime");
+ Raw.Simple.TimezoneId = id;
+ }
+
+ inline ui16 GetTimezoneId() const {
+ UDF_VERIFY(GetMarkers() == EMarkers::Embedded, "Value is not a datetime");
+ return Raw.Simple.TimezoneId;
+ }
private:
union TRaw {
@@ -180,7 +189,8 @@ private:
union {
ui64 FullMeta;
struct {
- ui8 Reserved[7];
+ TTimezoneId TimezoneId;
+ ui8 Reserved[5];
ui8 Meta;
};
};
diff --git a/ydb/library/yql/public/udf/arrow/block_item_comparator.h b/ydb/library/yql/public/udf/arrow/block_item_comparator.h
index 98ca57d7f9..3431932d92 100644
--- a/ydb/library/yql/public/udf/arrow/block_item_comparator.h
+++ b/ydb/library/yql/public/udf/arrow/block_item_comparator.h
@@ -3,6 +3,7 @@
#include "block_item.h"
#include "block_reader.h"
+
#include <ydb/library/yql/public/udf/udf_ptr.h>
#include <ydb/library/yql/public/udf/udf_type_inspection.h>
#include <ydb/library/yql/public/udf/udf_type_size_check.h>
@@ -148,6 +149,39 @@ public:
}
};
+template<typename TTzType, bool Nullable>
+class TTzDateBlockItemComparator : public TBlockItemComparatorBase<TTzDateBlockItemComparator<TTzType, Nullable>, Nullable> {
+ using TLayout = typename TDataType<TTzType>::TLayout;
+
+public:
+ bool DoCompare(TBlockItem lhs, TBlockItem rhs) const {
+ const auto x = lhs.Get<TLayout>();
+ const auto y = rhs.Get<TLayout>();
+
+ if (x == y) {
+ const auto tx = lhs.GetTimezoneId();
+ const auto ty = rhs.GetTimezoneId();
+ return (tx == ty) ? 0 : (tx < ty ? -1 : 1);
+ }
+
+ if (x < y) {
+ return -1;
+ }
+
+ return 1;
+ }
+
+ bool DoEquals(TBlockItem lhs, TBlockItem rhs) const {
+ return lhs.Get<TLayout>() == rhs.Get<TLayout>() && lhs.GetTimezoneId() == rhs.GetTimezoneId();
+ }
+
+
+ bool DoLess(TBlockItem lhs, TBlockItem rhs) const {
+ return std::forward_as_tuple(lhs.Get<TLayout>(), lhs.GetTimezoneId()) < std::forward_as_tuple(rhs.Get<TLayout>(), rhs.GetTimezoneId());
+ }
+};
+
+
template <bool Nullable>
class TTupleBlockItemComparator : public TBlockItemComparatorBase<TTupleBlockItemComparator<Nullable>, Nullable> {
public:
diff --git a/ydb/library/yql/public/udf/arrow/block_item_hasher.h b/ydb/library/yql/public/udf/arrow/block_item_hasher.h
index 4c3d89e998..a173e7940f 100644
--- a/ydb/library/yql/public/udf/arrow/block_item_hasher.h
+++ b/ydb/library/yql/public/udf/arrow/block_item_hasher.h
@@ -49,6 +49,17 @@ public:
}
};
+template <typename T, bool Nullable>
+class TTzDateBlockItemHasher : public TBlockItemHasherBase<TTzDateBlockItemHasher<T, Nullable>, Nullable> {
+public:
+ ui64 DoHash(TBlockItem value) const {
+ using TLayout = typename TDataType<T>::TLayout;
+ TUnboxedValuePod uv {value.Get<TLayout>()};
+ uv.SetTimezoneId(value.GetTimezoneId());
+ return GetValueHash<TDataType<T>::Slot>(uv);
+ }
+};
+
template <typename TStringType, bool Nullable>
class TStringBlockItemHasher : public TBlockItemHasherBase<TStringBlockItemHasher<TStringType, Nullable>, Nullable> {
public:
diff --git a/ydb/library/yql/public/udf/arrow/block_reader.h b/ydb/library/yql/public/udf/arrow/block_reader.h
index d42c9f16c7..3eaeb62f73 100644
--- a/ydb/library/yql/public/udf/arrow/block_reader.h
+++ b/ydb/library/yql/public/udf/arrow/block_reader.h
@@ -203,26 +203,16 @@ public:
}
};
-template<bool Nullable>
-class TTupleBlockReader final : public IBlockReader {
+template<bool Nullable, typename TDerived>
+class TTupleBlockReaderBase : public IBlockReader {
public:
- TTupleBlockReader(TVector<std::unique_ptr<IBlockReader>>&& children)
- : Children(std::move(children))
- , Items(Children.size())
- {}
-
TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final {
if constexpr (Nullable) {
if (IsNull(data, index)) {
return {};
}
}
-
- for (ui32 i = 0; i < Children.size(); ++i) {
- Items[i] = Children[i]->GetItem(*data.child_data[i], index);
- }
-
- return TBlockItem(Items.data());
+ return static_cast<TDerived*>(this)->GetChildrenItems(data, index);
}
TBlockItem GetScalarItem(const arrow::Scalar& scalar) final {
@@ -233,33 +223,87 @@ public:
}
const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar);
+ return static_cast<TDerived*>(this)->GetChildrenScalarItems(structScalar);
+ }
- for (ui32 i = 0; i < Children.size(); ++i) {
- Items[i] = Children[i]->GetScalarItem(*structScalar.value[i]);
+ ui64 GetDataWeight(const arrow::ArrayData& data) const final {
+ ui64 size = 0;
+ if constexpr (Nullable) {
+ size += data.length;
}
- return TBlockItem(Items.data());
+ size += static_cast<const TDerived*>(this)->GetChildrenDataWeight(data);
+ return size;
}
- ui64 GetDataWeight(const arrow::ArrayData& data) const final {
+ ui64 GetDataWeight(TBlockItem item) const final {
+ return static_cast<const TDerived*>(this)->GetDataWeightImpl(item);
+ }
+
+ ui64 GetDefaultValueWeight() const final {
ui64 size = 0;
if constexpr (Nullable) {
- size += data.length;
+ size = 1;
+ }
+ size += static_cast<const TDerived*>(this)->GetChildrenDefaultDataWeight();
+ return size;
+ }
+
+ void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const final {
+ if constexpr (Nullable) {
+ if (IsNull(data, index)) {
+ return out.PushChar(0);
+ }
+ out.PushChar(1);
}
+ static_cast<const TDerived*>(this)->SaveChildrenItems(data, index, out);
+ }
+
+ void SaveScalarItem(const arrow::Scalar& scalar, TOutputBuffer& out) const final {
+ if constexpr (Nullable) {
+ if (!scalar.is_valid) {
+ return out.PushChar(0);
+ }
+ out.PushChar(1);
+ }
+
+ const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar);
+
+ static_cast<const TDerived*>(this)->SaveChildrenScalarItems(structScalar, out);
+ }
+};
+
+template<bool Nullable>
+class TTupleBlockReader final : public TTupleBlockReaderBase<Nullable, TTupleBlockReader<Nullable>> {
+public:
+ TTupleBlockReader(TVector<std::unique_ptr<IBlockReader>>&& children)
+ : Children(std::move(children))
+ , Items(Children.size())
+ {}
+
+ TBlockItem GetChildrenItems(const arrow::ArrayData& data, size_t index) {
for (ui32 i = 0; i < Children.size(); ++i) {
- size += Children[i]->GetDataWeight(*data.child_data[i]);
+ Items[i] = Children[i]->GetItem(*data.child_data[i], index);
}
- return size;
+ return TBlockItem(Items.data());
}
- ui64 GetDataWeight(TBlockItem item) const final {
+ TBlockItem GetChildrenScalarItems(const arrow::StructScalar& structScalar) {
+ for (ui32 i = 0; i < Children.size(); ++i) {
+ Items[i] = Children[i]->GetScalarItem(*structScalar.value[i]);
+ }
+
+ return TBlockItem(Items.data());
+ }
+
+ size_t GetDataWeightImpl(const TBlockItem& item) const {
const TBlockItem* items = nullptr;
ui64 size = 0;
if constexpr (Nullable) {
if (!item) {
- return GetDefaultValueWeight();
+ return this->GetDefaultValueWeight();
}
size = 1;
items = item.GetOptionalValue().GetElements();
@@ -274,40 +318,30 @@ public:
return size;
}
- ui64 GetDefaultValueWeight() const final {
- ui64 size = 0;
- if constexpr (Nullable) {
- size = 1;
- }
+ size_t GetChildrenDataWeight(const arrow::ArrayData& data) const {
+ size_t size = 0;
for (ui32 i = 0; i < Children.size(); ++i) {
- size += Children[i]->GetDefaultValueWeight();
+ size += Children[i]->GetDataWeight(*data.child_data[i]);
}
+
return size;
}
- void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const final {
- if constexpr (Nullable) {
- if (IsNull(data, index)) {
- return out.PushChar(0);
- }
- out.PushChar(1);
+ size_t GetChildrenDefaultDataWeight() const {
+ size_t size = 0;
+ for (ui32 i = 0; i < Children.size(); ++i) {
+ size += Children[i]->GetDefaultValueWeight();
}
+ return size;
+ }
+ void SaveChildrenItems(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const {
for (ui32 i = 0; i < Children.size(); ++i) {
Children[i]->SaveItem(*data.child_data[i], index, out);
}
}
-
- void SaveScalarItem(const arrow::Scalar& scalar, TOutputBuffer& out) const final {
- if constexpr (Nullable) {
- if (!scalar.is_valid) {
- return out.PushChar(0);
- }
- out.PushChar(1);
- }
-
- const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar);
-
+
+ void SaveChildrenScalarItems(const arrow::StructScalar& structScalar, TOutputBuffer& out) const {
for (ui32 i = 0; i < Children.size(); ++i) {
Children[i]->SaveScalarItem(*structScalar.value[i], out);
}
@@ -318,6 +352,65 @@ private:
TVector<TBlockItem> Items;
};
+template<typename TTzDate, bool Nullable>
+class TTzDateBlockReader final : public TTupleBlockReaderBase<Nullable, TTzDateBlockReader<TTzDate, Nullable>> {
+public:
+ TBlockItem GetChildrenItems(const arrow::ArrayData& data, size_t index) {
+ Y_DEBUG_ABORT_UNLESS(data.child_data.size() == 2);
+
+ TBlockItem item {DateReader_.GetItem(*data.child_data[0], index)};
+ item.SetTimezoneId(TimezoneReader_.GetItem(*data.child_data[1], index).Get<ui16>());
+ return item;
+ }
+
+ TBlockItem GetChildrenScalarItems(const arrow::StructScalar& structScalar) {
+ Y_DEBUG_ABORT_UNLESS(structScalar.value.size() == 2);
+
+ TBlockItem item {DateReader_.GetScalarItem(*structScalar.value[0])};
+ item.SetTimezoneId(TimezoneReader_.GetScalarItem(*structScalar.value[1]).Get<ui16>());
+ return item;
+ }
+
+ size_t GetChildrenDataWeight(const arrow::ArrayData& data) const {
+ Y_DEBUG_ABORT_UNLESS(data.child_data.size() == 2);
+
+ size_t size = 0;
+ size += DateReader_.GetDataWeight(*data.child_data[0]);
+ size += TimezoneReader_.GetDataWeight(*data.child_data[1]);
+ return size;
+ }
+
+ size_t GetDataWeightImpl(const TBlockItem& item) const {
+ Y_UNUSED(item);
+ return GetChildrenDefaultDataWeight();
+ }
+
+ size_t GetChildrenDefaultDataWeight() const {
+ ui64 size = 0;
+ if constexpr (Nullable) {
+ size = 1;
+ }
+
+ size += DateReader_.GetDefaultValueWeight();
+ size += TimezoneReader_.GetDefaultValueWeight();
+ return size;
+ }
+
+ void SaveChildrenItems(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const {
+ DateReader_.SaveItem(*data.child_data[0], index, out);
+ TimezoneReader_.SaveItem(*data.child_data[1], index, out);
+ }
+
+ void SaveChildrenScalarItems(const arrow::StructScalar& structScalar, TOutputBuffer& out) const {
+ DateReader_.SaveScalarItem(*structScalar.value[0], out);
+ TimezoneReader_.SaveScalarItem(*structScalar.value[1], out);
+ }
+
+private:
+ TFixedSizeBlockReader<typename TDataType<TTzDate>::TLayout, /* Nullable */false> DateReader_;
+ TFixedSizeBlockReader<ui16, /* Nullable */false> TimezoneReader_;
+};
+
class TExternalOptionalBlockReader final : public IBlockReader {
public:
TExternalOptionalBlockReader(std::unique_ptr<IBlockReader>&& inner)
@@ -390,6 +483,8 @@ struct TReaderTraits {
using TExtOptional = TExternalOptionalBlockReader;
template<bool Nullable>
using TResource = TResourceBlockReader<Nullable>;
+ template<typename TTzDate, bool Nullable>
+ using TTzDateReader = TTzDateBlockReader<TTzDate, Nullable>;
static std::unique_ptr<TResult> MakePg(const TPgTypeDescription& desc, const IPgBuilder* pgBuilder) {
Y_UNUSED(pgBuilder);
@@ -407,6 +502,15 @@ struct TReaderTraits {
return std::make_unique<TResource<false>>();
}
}
+
+ template<typename TTzDate>
+ static std::unique_ptr<TResult> MakeTzDate(bool isOptional) {
+ if (isOptional) {
+ return std::make_unique<TTzDateReader<TTzDate, true>>();
+ } else {
+ return std::make_unique<TTzDateReader<TTzDate, false>>();
+ }
+ }
};
template <typename TTraits>
@@ -544,7 +648,15 @@ std::unique_ptr<typename TTraits::TResult> MakeBlockReaderImpl(const ITypeInfoHe
return MakeStringBlockReaderImpl<TTraits, arrow::StringType, NUdf::EDataSlot::Utf8>(isOptional);
case NUdf::EDataSlot::Json:
return MakeStringBlockReaderImpl<TTraits, arrow::StringType, NUdf::EDataSlot::Json>(isOptional);
- default:
+ case NUdf::EDataSlot::TzDate:
+ return TTraits::template MakeTzDate<TTzDate>(isOptional);
+ case NUdf::EDataSlot::TzDatetime:
+ return TTraits::template MakeTzDate<TTzDatetime>(isOptional);
+ case NUdf::EDataSlot::TzTimestamp:
+ return TTraits::template MakeTzDate<TTzTimestamp>(isOptional);
+ case NUdf::EDataSlot::Uuid:
+ case NUdf::EDataSlot::Decimal:
+ case NUdf::EDataSlot::DyNumber:
Y_ENSURE(false, "Unsupported data slot");
}
}
@@ -606,7 +718,10 @@ inline void UpdateBlockItemSerializeProps(const ITypeInfoHelper& typeInfoHelper,
if (dataTypeInfo.Features & StringType) {
props.MaxSize = {};
props.IsFixed = false;
- } else {
+ } else if (dataTypeInfo.Features & TzDateType) {
+ *props.MaxSize += dataTypeInfo.FixedSize + sizeof(TTimezoneId);
+ }
+ else {
*props.MaxSize += dataTypeInfo.FixedSize;
}
return;
diff --git a/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h b/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h
index 669e6386b2..2231ff239e 100644
--- a/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h
+++ b/ydb/library/yql/public/udf/arrow/udf_arrow_helpers.h
@@ -533,6 +533,25 @@ arrow::Status UnaryPreallocatedExecImpl(arrow::compute::KernelContext* ctx, cons
return arrow::Status::OK();
}
+
+template <typename TReader, typename TOutput, TOutput(*Core)(TBlockItem)>
+arrow::Status UnaryPreallocatedReaderExecImpl(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) {
+ Y_UNUSED(ctx);
+ static_assert(std::is_base_of_v<IBlockReader, TReader>);
+ TReader reader;
+
+ auto& inArray = batch.values[0].array();
+ auto& outArray = res->array();
+ TOutput* outValues = outArray->GetMutableValues<TOutput>(1);
+ auto length = inArray->length;
+ for (int64_t i = 0; i < length; ++i) {
+ auto item = reader.GetItem(*inArray, i);
+ outValues[i] = Core(item);
+ }
+
+ return arrow::Status::OK();
+}
+
template<typename TInput, typename TOutput, std::pair<TOutput, bool> Core(TInput)>
struct TUnaryUnsafeFixedSizeFilterKernel {
static arrow::Status Do(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) {
diff --git a/ydb/library/yql/public/udf/arrow/ut/array_builder_ut.cpp b/ydb/library/yql/public/udf/arrow/ut/array_builder_ut.cpp
index fcaa281584..a915bb7814 100644
--- a/ydb/library/yql/public/udf/arrow/ut/array_builder_ut.cpp
+++ b/ydb/library/yql/public/udf/arrow/ut/array_builder_ut.cpp
@@ -175,6 +175,30 @@ Y_UNIT_TEST_SUITE(TArrayBuilderTest) {
UNIT_ASSERT_VALUES_EQUAL(resource2->GetResourceTag(), ResourceName);
}
+ Y_UNIT_TEST(TestTzDateBuilder_Layout) {
+ TArrayBuilderTestData data;
+ const auto tzDateType = data.PgmBuilder.NewDataType(EDataSlot::TzDate);
+ const auto arrayBuilder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), tzDateType,
+ *data.ArrowPool, MAX_BLOCK_SIZE, /* pgBuilder */ nullptr);
+
+ auto makeTzDate = [] (ui16 val, ui16 tz) {
+ TUnboxedValuePod tzDate {val};
+ tzDate.SetTimezoneId(tz);
+ return tzDate;
+ };
+
+ TVector<TUnboxedValuePod> dates{makeTzDate(1234, 1), makeTzDate(1234, 2), makeTzDate(45678, 333)};
+ for (auto date: dates) {
+ arrayBuilder->Add(date);
+ }
+
+ const auto datum = arrayBuilder->Build(true);
+ UNIT_ASSERT(datum.is_array());
+ UNIT_ASSERT_VALUES_EQUAL(datum.length(), dates.size());
+ const auto childData = datum.array()->child_data;
+ UNIT_ASSERT_VALUES_EQUAL_C(childData.size(), 2, "Expected date and timezone children");
+ }
+
Y_UNIT_TEST(TestResourceStringValueBuilderReader) {
TArrayBuilderTestData data;
const auto resourceType = data.PgmBuilder.NewResourceType(ResourceName);
diff --git a/ydb/library/yql/public/udf/udf_data_type.h b/ydb/library/yql/public/udf/udf_data_type.h
index 7802f180fd..476c3c313e 100644
--- a/ydb/library/yql/public/udf/udf_data_type.h
+++ b/ydb/library/yql/public/udf/udf_data_type.h
@@ -13,6 +13,8 @@ namespace NUdf {
using TDataTypeId = ui16;
+using TTimezoneId = ui16;
+
enum EDataTypeFeatures : ui32 {
CanCompare = 1u << 0,
HasDeterministicCompare = 1u << 1,
diff --git a/ydb/library/yql/udfs/common/datetime2/datetime_udf.cpp b/ydb/library/yql/udfs/common/datetime2/datetime_udf.cpp
index d587c3a6fc..311168f300 100644
--- a/ydb/library/yql/udfs/common/datetime2/datetime_udf.cpp
+++ b/ydb/library/yql/udfs/common/datetime2/datetime_udf.cpp
@@ -38,6 +38,24 @@ public:
static TResult DateCore(ui16 value) {
return value * ui32(86400) * TResult(ScaleAfterSeconds);
}
+
+ template<typename TTzDate>
+ static TResult TzBlockCore(TBlockItem tzDate);
+
+ template<>
+ static TResult TzBlockCore<TTzDate>(TBlockItem tzDate) {
+ return DateCore(tzDate.Get<ui16>());
+ }
+
+ template<>
+ static TResult TzBlockCore<TTzDatetime>(TBlockItem tzDate) {
+ return DatetimeCore(tzDate.Get<ui32>());
+ }
+
+ template<>
+ static TResult TzBlockCore<TTzTimestamp>(TBlockItem tzDate) {
+ return TimestampCore(tzDate.Get<ui64>());
+ }
static TResult DatetimeCore(ui32 value) {
return value * TResult(ScaleAfterSeconds);
@@ -55,6 +73,12 @@ public:
static auto name = TStringRef(TFuncName, std::strlen(TFuncName));
return name;
}
+
+ template<typename TTzDate, typename TOutput>
+ static auto MakeTzBlockExec() {
+ using TReader = TTzDateBlockReader<TTzDate, /*Nullable*/ false>;
+ return UnaryPreallocatedReaderExecImpl<TReader, TOutput, TzBlockCore<TTzDate>>;
+ }
static bool DeclareSignature(
const TStringRef& name,
@@ -135,8 +159,12 @@ public:
if (!typesOnly) {
if (typeId == TDataType<TDate>::Id || typeId == TDataType<TTzDate>::Id) {
if (block) {
+ const auto exec = (typeId == TDataType<TTzDate>::Id)
+ ? MakeTzBlockExec<TTzDate, TResult>()
+ : UnaryPreallocatedExecImpl<ui16, TResult, DateCore>;
+
builder.Implementation(new TSimpleArrowUdfImpl(argBlockTypes, outputType, block.IsScalar(),
- UnaryPreallocatedExecImpl<ui16, TResult, DateCore>, builder, TString(name), arrow::compute::NullHandling::INTERSECTION));
+ exec, builder, TString(name), arrow::compute::NullHandling::INTERSECTION));
} else {
builder.Implementation(new TUnaryOverOptionalImpl<ui16, TResult, DateCore>());
}
@@ -144,8 +172,12 @@ public:
if (typeId == TDataType<TDatetime>::Id || typeId == TDataType<TTzDatetime>::Id) {
if (block) {
+ const auto exec = (typeId == TDataType<TTzDatetime>::Id)
+ ? MakeTzBlockExec<TTzDatetime, TResult>()
+ : UnaryPreallocatedExecImpl<ui32, TResult, DatetimeCore>;
+
builder.Implementation(new TSimpleArrowUdfImpl(argBlockTypes, outputType, block.IsScalar(),
- UnaryPreallocatedExecImpl<ui32, TResult, DatetimeCore>, builder, TString(name), arrow::compute::NullHandling::INTERSECTION));
+ exec, builder, TString(name), arrow::compute::NullHandling::INTERSECTION));
} else {
builder.Implementation(new TUnaryOverOptionalImpl<ui32, TResult, DatetimeCore>());
}
@@ -153,8 +185,12 @@ public:
if (typeId == TDataType<TTimestamp>::Id || typeId == TDataType<TTzTimestamp>::Id) {
if (block) {
+ const auto exec = (typeId == TDataType<TTzTimestamp>::Id)
+ ? MakeTzBlockExec<TTzTimestamp, TResult>()
+ : UnaryPreallocatedExecImpl<ui64, TResult, TimestampCore>;
+
builder.Implementation(new TSimpleArrowUdfImpl(argBlockTypes, outputType, block.IsScalar(),
- UnaryPreallocatedExecImpl<ui64, TResult, TimestampCore>, builder, TString(name), arrow::compute::NullHandling::INTERSECTION));
+ exec, builder, TString(name), arrow::compute::NullHandling::INTERSECTION));
} else {
builder.Implementation(new TUnaryOverOptionalImpl<ui64, TResult, TimestampCore>());
}