diff options
author | aneporada <aneporada@ydb.tech> | 2023-01-11 11:53:01 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2023-01-11 11:53:01 +0300 |
commit | dae1c49d39a044bdfdd67ecd3c7347103625516e (patch) | |
tree | 8eaccb311fe7e9529521dbdc25fd2e41b2f94f06 | |
parent | b5af12df89ca027b41c4bc5ebd895cc4983e6620 (diff) | |
download | ydb-dae1c49d39a044bdfdd67ecd3c7347103625516e.tar.gz |
Introduce TBlockItem: lightweight not-owning replacement for TUnboxedValue for BlockReader/BlockBuilder API
7 files changed, 340 insertions, 131 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp index e3174be55b..a94d01c8b0 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp @@ -76,6 +76,13 @@ public: CurrLen++; } + void Add(TBlockItem value) final { + Y_VERIFY(CurrLen < MaxLen); + DoAdd(value); + CurrLen++; + } + + void AddDefault() { Y_VERIFY(CurrLen < MaxLen); DoAddDefault(); @@ -118,6 +125,7 @@ public: } protected: virtual void DoAdd(NUdf::TUnboxedValuePod value) = 0; + virtual void DoAdd(TBlockItem value) = 0; virtual void DoAddDefault() = 0; virtual void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) = 0; virtual TBlockArrayTree::Ptr DoBuildTree(bool finish) = 0; @@ -200,6 +208,15 @@ public: void DoAdd(NUdf::TUnboxedValuePod value) final { if constexpr (Nullable) { if (!value) { + return DoAdd(TBlockItem{}); + } + } + DoAdd(TBlockItem(value.Get<T>())); + } + + void DoAdd(TBlockItem value) final { + if constexpr (Nullable) { + if (!value) { NullBuilder->UnsafeAppend(0); DataBuilder->UnsafeAppend(T{}); return; @@ -207,7 +224,7 @@ public: NullBuilder->UnsafeAppend(1); } - DataBuilder->UnsafeAppend(value.Get<T>()); + DataBuilder->UnsafeAppend(value.As<T>()); } void DoAddDefault() final { @@ -285,13 +302,23 @@ public: void DoAdd(NUdf::TUnboxedValuePod value) final { if constexpr (Nullable) { if (!value) { + return DoAdd(TBlockItem{}); + } + } + + DoAdd(TBlockItem(value.AsStringRef())); + } + + void DoAdd(TBlockItem value) final { + if constexpr (Nullable) { + if (!value) { NullBuilder->UnsafeAppend(0); AppendCurrentOffset(); return; } } - const TStringBuf str = value.AsStringRef(); + const std::string_view str = value.AsStringRef(); size_t currentLen = DataBuilder->Length(); // empty string can always be appended @@ -311,6 +338,7 @@ public: } } + void DoAddDefault() final { if constexpr (Nullable) { NullBuilder->UnsafeAppend(1); @@ -489,6 +517,25 @@ public: } } + void DoAdd(TBlockItem value) final { + TTupleType* tupleType = AS_TYPE(TTupleType, Type); + if constexpr (Nullable) { + if (!value) { + NullBuilder->UnsafeAppend(0); + for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { + Children[i]->AddDefault(); + } + return; + } + NullBuilder->UnsafeAppend(1); + } + + auto elements = value.AsTuple(); + for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { + Children[i]->Add(elements[i]); + } + } + void DoAddDefault() final { TTupleType* tupleType = AS_TYPE(TTupleType, Type); if constexpr (Nullable) { @@ -577,6 +624,17 @@ public: Inner->Add(value.GetOptionalValue()); } + void DoAdd(TBlockItem value) final { + if (!value) { + NullBuilder->UnsafeAppend(0); + Inner->AddDefault(); + return; + } + + NullBuilder->UnsafeAppend(1); + Inner->Add(value.GetOptionalValue()); + } + void DoAddDefault() final { NullBuilder->UnsafeAppend(1); Inner->AddDefault(); diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.h index e1024a8c80..4769c3d59f 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.h @@ -1,5 +1,7 @@ #pragma once +#include "mkql_block_item.h" + #include <ydb/library/yql/minikql/mkql_node.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node.h> @@ -27,6 +29,7 @@ public: virtual ~IBlockBuilder() = default; virtual size_t MaxLength() const = 0; virtual void Add(NUdf::TUnboxedValuePod value) = 0; + virtual void Add(TBlockItem value) = 0; virtual void AddMany(const arrow::ArrayData& array, size_t popCount, const ui8* sparseBitmap, size_t bitmapSize) = 0; virtual NUdf::TUnboxedValuePod Build(TComputationContext& ctx, bool finish) = 0; }; diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_item.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_item.h new file mode 100644 index 0000000000..84924977f0 --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_item.h @@ -0,0 +1,136 @@ +#pragma once + +#include <ydb/library/yql/public/udf/udf_data_type.h> +#include <ydb/library/yql/public/udf/udf_string_ref.h> +#include <ydb/library/yql/public/udf/udf_type_size_check.h> + +namespace NKikimr::NMiniKQL { + +class TBlockItem { + enum class EMarkers : ui8 { + Empty = 0, + Present = 1, + }; + +public: + TBlockItem() noexcept = default; + ~TBlockItem() noexcept = default; + + TBlockItem(const TBlockItem& value) noexcept = default; + TBlockItem(TBlockItem&& value) noexcept = default; + + TBlockItem& operator=(const TBlockItem& value) noexcept = default; + TBlockItem& operator=(TBlockItem&& value) noexcept = default; + + template <typename T, typename = std::enable_if_t<NYql::NUdf::TPrimitiveDataType<T>::Result>> + inline explicit TBlockItem(T value); + + inline explicit TBlockItem(NYql::NUdf::TStringRef value) { + Raw.String.Value = value.Data(); + Raw.String.Size = value.Size(); + Raw.Simple.Meta = static_cast<ui8>(EMarkers::Present); + } + + inline explicit TBlockItem(const TBlockItem* tupleItems) { + Raw.Tuple.Value = tupleItems; + Raw.Simple.Meta = static_cast<ui8>(EMarkers::Present); + } + + template <typename T, typename = std::enable_if_t<NYql::NUdf::TPrimitiveDataType<T>::Result>> + inline T As() const; + + inline const TBlockItem* AsTuple() const { + Y_VERIFY_DEBUG(Raw.GetMarkers() == EMarkers::Present); + return Raw.Tuple.Value; + } + + inline NYql::NUdf::TStringRef AsStringRef() const { + Y_VERIFY_DEBUG(Raw.GetMarkers() == EMarkers::Present); + return NYql::NUdf::TStringRef(Raw.String.Value, Raw.String.Size); + } + + inline TBlockItem MakeOptional() const + { + if (Raw.Simple.Meta) + return *this; + + TBlockItem result(*this); + ++result.Raw.Simple.Count; + return result; + } + + inline TBlockItem GetOptionalValue() const + { + if (Raw.Simple.Meta) + return *this; + + Y_VERIFY_DEBUG(Raw.Simple.Count > 0U, "Can't get value from empty."); + + TBlockItem result(*this); + --result.Raw.Simple.Count; + return result; + } + + inline explicit operator bool() const { return bool(Raw); } +private: + union TRaw { + ui64 Halfs[2] = {0, 0}; + struct { + union { + #define FIELD(type) type type##_; + PRIMITIVE_VALUE_TYPES(FIELD); + #undef FIELD + ui64 Count; + }; + union { + ui64 Pad; + struct { + ui8 Reserved[7]; + ui8 Meta; + }; + }; + } Simple; + + struct { + const char* Value; + ui32 Size; + } String; + + struct { + // client should know tuple size + const TBlockItem* Value; + } Tuple; + + EMarkers GetMarkers() const { + return static_cast<EMarkers>(Simple.Meta); + } + + explicit operator bool() const { return Simple.Meta | Simple.Count; } + } Raw; +}; + +UDF_ASSERT_TYPE_SIZE(TBlockItem, 16); + +#define VALUE_GET(xType) \ + template <> \ + inline xType TBlockItem::As<xType>() const \ + { \ + Y_VERIFY_DEBUG(Raw.GetMarkers() == EMarkers::Present); \ + return Raw.Simple.xType##_; \ + } + +#define VALUE_CONSTR(xType) \ + template <> \ + inline TBlockItem::TBlockItem(xType value) \ + { \ + Raw.Simple.xType##_ = value; \ + Raw.Simple.Meta = static_cast<ui8>(EMarkers::Present); \ + } + +PRIMITIVE_VALUE_TYPES(VALUE_GET) +PRIMITIVE_VALUE_TYPES(VALUE_CONSTR) + +#undef VALUE_GET +#undef VALUE_CONSTR + +} diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp index 5c6e4585f0..914ec7e6ca 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp @@ -12,172 +12,163 @@ namespace NMiniKQL { namespace { -class TBlockReaderBase : public IBlockReader { -public: - using Ptr = std::unique_ptr<TBlockReaderBase>; - - void Reset(const arrow::Datum& datum) final { - Index = 0; - ArrayValues.clear(); - if (datum.is_scalar()) { - ScalarValue = GetScalar(*datum.scalar()); - } else { - Y_VERIFY(datum.is_arraylike()); - ScalarValue = {}; - for (auto& arr : datum.chunks()) { - ArrayValues.push_back(arr->data()); - } - } - } - - TMaybe<NUdf::TUnboxedValuePod> GetNextValue() final { - if (ScalarValue.Defined()) { - return ScalarValue; - } - - TMaybe<NUdf::TUnboxedValuePod> result; - while (!ArrayValues.empty()) { - if (Index < ArrayValues.front()->length) { - result = Get(*ArrayValues.front(), Index++); - break; - } - ArrayValues.pop_front(); - Index = 0; - } - return result; - } - - virtual NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) = 0; - virtual NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) = 0; - -private: - std::deque<std::shared_ptr<arrow::ArrayData>> ArrayValues; - TMaybe<NUdf::TUnboxedValuePod> ScalarValue; - size_t Index = 0; -}; +inline bool IsNull(const arrow::ArrayData& data, size_t index) { + return data.GetNullCount() > 0 && !arrow::BitUtil::GetBit(data.GetValues<uint8_t>(0, 0), index + data.offset); +} template <typename T> -class TFixedSizeBlockReader : public TBlockReaderBase { +class TFixedSizeBlockReader : public IBlockReader { public: - NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) final { - if (data.GetNullCount() > 0 && !arrow::BitUtil::GetBit(data.GetValues<uint8_t>(0, 0), index + data.offset)) { + TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final { + if (IsNull(data, index)) { return {}; } - return NUdf::TUnboxedValuePod(data.GetValues<T>(1)[index]); + return TBlockItem(data.GetValues<T>(1)[index]); } - NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) final { + TBlockItem GetScalarItem(const arrow::Scalar& scalar) final { if (!scalar.is_valid) { return {}; } - return NUdf::TUnboxedValuePod(*static_cast<const T*>(arrow::internal::checked_cast<const arrow::internal::PrimitiveScalarBase&>(scalar).data())); + return TBlockItem(*static_cast<const T*>(arrow::internal::checked_cast<const arrow::internal::PrimitiveScalarBase&>(scalar).data())); + } + + NUdf::TUnboxedValuePod MakeValue(TBlockItem item, const THolderFactory& holderFactory) const final { + Y_UNUSED(holderFactory); + return item ? NUdf::TUnboxedValuePod(item.As<T>()) : NUdf::TUnboxedValuePod{}; } }; -class TStringBlockReader : public TBlockReaderBase { +template<typename TStringType> +class TStringBlockReader : public IBlockReader { public: - NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) final { + using TOffset = typename TStringType::offset_type; + + TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final { Y_VERIFY_DEBUG(data.buffers.size() == 3); - if (data.GetNullCount() > 0 && !arrow::BitUtil::GetBit(data.GetValues<uint8_t>(0, 0), index + data.offset)) { + if (IsNull(data, index)) { return {}; } - arrow::util::string_view result; - if (data.type->id() == arrow::Type::BINARY) { - arrow::BinaryArray arr(std::make_shared<arrow::ArrayData>(data)); - result = arr.GetView(index); - } else { - Y_VERIFY(data.type->id() == arrow::Type::STRING); - arrow::StringArray arr(std::make_shared<arrow::ArrayData>(data)); - result = arr.GetView(index); - } + const TOffset* offsets = data.GetValues<TOffset>(1); + const char* strData = data.GetValues<char>(2, 0); - return MakeString(NUdf::TStringRef(result.data(), result.size())); + std::string_view str(strData + offsets[index], offsets[index + 1] - offsets[index]); + return TBlockItem(str); } - NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) final { + TBlockItem GetScalarItem(const arrow::Scalar& scalar) final { if (!scalar.is_valid) { return {}; } auto buffer = arrow::internal::checked_cast<const arrow::BaseBinaryScalar&>(scalar).value; - return MakeString(NUdf::TStringRef(reinterpret_cast<const char*>(buffer->data()), buffer->size())); + std::string_view str(reinterpret_cast<const char*>(buffer->data()), buffer->size()); + return TBlockItem(str); + } + + NUdf::TUnboxedValuePod MakeValue(TBlockItem item, const THolderFactory& holderFactory) const final { + Y_UNUSED(holderFactory); + if (!item) { + return {}; + } + return MakeString(item.AsStringRef()); } }; -class TTupleBlockReader : public TBlockReaderBase { +class TTupleBlockReader : public IBlockReader { public: - TTupleBlockReader(TVector<std::unique_ptr<TBlockReaderBase>>&& children, const THolderFactory& holderFactory) + TTupleBlockReader(TVector<std::unique_ptr<IBlockReader>>&& children) : Children(std::move(children)) - , HolderFactory(holderFactory) + , Items(Children.size()) {} - NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) final { - if (data.GetNullCount() > 0 && !arrow::BitUtil::GetBit(data.GetValues<uint8_t>(0, 0), index + data.offset)) { + TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final { + if (IsNull(data, index)) { return {}; } - NUdf::TUnboxedValue* items; - auto result = Cache.NewArray(HolderFactory, Children.size(), items); for (ui32 i = 0; i < Children.size(); ++i) { - items[i] = Children[i]->Get(*data.child_data[i], index); + Items[i] = Children[i]->GetItem(*data.child_data[i], index); } - return result; + return TBlockItem(Items.data()); } - NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) final { + TBlockItem GetScalarItem(const arrow::Scalar& scalar) final { if (!scalar.is_valid) { return {}; } const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar); - NUdf::TUnboxedValue* items; - auto result = Cache.NewArray(HolderFactory, Children.size(), items); for (ui32 i = 0; i < Children.size(); ++i) { - items[i] = Children[i]->GetScalar(*structScalar.value[i]); + Items[i] = Children[i]->GetScalarItem(*structScalar.value[i]); + } + + return TBlockItem(Items.data()); + } + + NUdf::TUnboxedValuePod MakeValue(TBlockItem item, const THolderFactory& holderFactory) const final { + if (!item) { + return {}; + } + + NUdf::TUnboxedValue* values; + auto result = Cache.NewArray(holderFactory, Children.size(), values); + const TBlockItem* childItems = item.AsTuple(); + for (ui32 i = 0; i < Children.size(); ++i) { + values[i] = Children[i]->MakeValue(childItems[i], holderFactory); } return result; } private: - TVector<std::unique_ptr<TBlockReaderBase>> Children; - const THolderFactory& HolderFactory; - TPlainContainerCache Cache; + const TVector<std::unique_ptr<IBlockReader>> Children; + TVector<TBlockItem> Items; + mutable TPlainContainerCache Cache; }; -class TExternalOptionalBlockReader : public TBlockReaderBase { +class TExternalOptionalBlockReader : public IBlockReader { public: - TExternalOptionalBlockReader(std::unique_ptr<TBlockReaderBase>&& inner) + TExternalOptionalBlockReader(std::unique_ptr<IBlockReader>&& inner) : Inner(std::move(inner)) {} - NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) final { - if (data.GetNullCount() > 0 && !arrow::BitUtil::GetBit(data.GetValues<uint8_t>(0, 0), index + data.offset)) { + TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final { + if (IsNull(data, index)) { return {}; } - return Inner->Get(*data.child_data[0], index).MakeOptional(); + return Inner->GetItem(*data.child_data[0], index).MakeOptional(); } - NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) final { + TBlockItem GetScalarItem(const arrow::Scalar& scalar) final { if (!scalar.is_valid) { return {}; } const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar); - return Inner->GetScalar(*structScalar.value[0]).MakeOptional(); + return Inner->GetScalarItem(*structScalar.value[0]).MakeOptional(); + } + + NUdf::TUnboxedValuePod MakeValue(TBlockItem item, const THolderFactory& holderFactory) const final { + if (!item) { + return {}; + } + return Inner->MakeValue(item.GetOptionalValue(), holderFactory).MakeOptional(); } private: - std::unique_ptr<TBlockReaderBase> Inner; + const std::unique_ptr<IBlockReader> Inner; }; -std::unique_ptr<TBlockReaderBase> MakeBlockReaderBase(TType* type, const THolderFactory& holderFactory) { +} // namespace + +std::unique_ptr<IBlockReader> MakeBlockReader(TType* type) { TType* unpacked = type; if (type->IsOptional()) { unpacked = AS_TYPE(TOptionalType, type)->GetItemType(); @@ -194,7 +185,7 @@ std::unique_ptr<TBlockReaderBase> MakeBlockReaderBase(TType* type, const THolder currentType = AS_TYPE(TOptionalType, currentType)->GetItemType(); } while (currentType->IsOptional()); - std::unique_ptr<TBlockReaderBase> reader = MakeBlockReaderBase(previousType, holderFactory); + std::unique_ptr<IBlockReader> reader = MakeBlockReader(previousType); for (ui32 i = 1; i < nestLevel; ++i) { reader = std::make_unique<TExternalOptionalBlockReader>(std::move(reader)); } @@ -206,12 +197,12 @@ std::unique_ptr<TBlockReaderBase> MakeBlockReaderBase(TType* type, const THolder if (type->IsTuple()) { auto tupleType = AS_TYPE(TTupleType, type); - TVector<std::unique_ptr<TBlockReaderBase>> children; + TVector<std::unique_ptr<IBlockReader>> children; for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { - children.emplace_back(MakeBlockReaderBase(tupleType->GetElementType(i), holderFactory)); + children.emplace_back(MakeBlockReader(tupleType->GetElementType(i))); } - return std::make_unique<TTupleBlockReader>(std::move(children), holderFactory); + return std::make_unique<TTupleBlockReader>(std::move(children)); } if (type->IsData()) { @@ -243,8 +234,9 @@ std::unique_ptr<TBlockReaderBase> MakeBlockReaderBase(TType* type, const THolder case NUdf::EDataSlot::Double: return std::make_unique<TFixedSizeBlockReader<double>>(); case NUdf::EDataSlot::String: + return std::make_unique<TStringBlockReader<arrow::BinaryType>>(); case NUdf::EDataSlot::Utf8: - return std::make_unique<TStringBlockReader>(); + return std::make_unique<TStringBlockReader<arrow::StringType>>(); default: MKQL_ENSURE(false, "Unsupported data slot"); } @@ -253,13 +245,5 @@ std::unique_ptr<TBlockReaderBase> MakeBlockReaderBase(TType* type, const THolder MKQL_ENSURE(false, "Unsupported type"); } - -} // namespace - -std::unique_ptr<IBlockReader> MakeBlockReader(TType* type, const THolderFactory& holderFactory) { - return MakeBlockReaderBase(type, holderFactory); -} - - } // namespace NMiniKQL } // namespace NKikimr diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.h index f4eb297770..4e144d78a5 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.h @@ -1,22 +1,24 @@ #pragma once +#include "mkql_block_item.h" + #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> #include <ydb/library/yql/minikql/mkql_node.h> #include <arrow/datum.h> -namespace NKikimr { -namespace NMiniKQL { +namespace NKikimr::NMiniKQL { -class IBlockReader { +class IBlockReader : private TNonCopyable { public: virtual ~IBlockReader() = default; - virtual void Reset(const arrow::Datum& datum) = 0; - // for scalars will continuously return same value - virtual TMaybe<NUdf::TUnboxedValuePod> GetNextValue() = 0; + // result will reference to Array/Scalar internals and will be valid until next call to GetItem/GetScalarItem + virtual TBlockItem GetItem(const arrow::ArrayData& data, size_t index) = 0; + virtual TBlockItem GetScalarItem(const arrow::Scalar& scalar) = 0; + + virtual NUdf::TUnboxedValuePod MakeValue(TBlockItem item, const THolderFactory& holderFactory) const = 0; }; -std::unique_ptr<IBlockReader> MakeBlockReader(TType* type, const THolderFactory& holderFactory); +std::unique_ptr<IBlockReader> MakeBlockReader(TType* type); } -} diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp index 488dd2af04..789cb67df2 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp @@ -169,7 +169,7 @@ public: auto& state = GetState(ctx); for (;;) { - auto item = state.GetValue(); + auto item = state.GetValue(ctx); if (item) { return *item; } @@ -190,23 +190,43 @@ private: struct TState : public TComputationValue<TState> { using TComputationValue::TComputationValue; - TState(TMemoryUsageInfo* memInfo, TType* itemType, TComputationContext& ctx) + TState(TMemoryUsageInfo* memInfo, TType* itemType) : TComputationValue(memInfo) - , Reader_(MakeBlockReader(itemType, ctx.HolderFactory)) + , Reader_(MakeBlockReader(itemType)) { } - TMaybe<NUdf::TUnboxedValuePod> GetValue() const { - return Reader_->GetNextValue(); + TMaybe<NUdf::TUnboxedValuePod> GetValue(TComputationContext& ctx) { + for (;;) { + if (Arrays_.empty()) { + return {}; + } + if (Index_ < Arrays_.front()->length) { + break; + } + Index_ = 0; + Arrays_.pop_front(); + } + return Reader_->MakeValue(Reader_->GetItem(*Arrays_.front(), Index_++), ctx.HolderFactory); } - void Reset(const arrow::Datum& datum) const { + void Reset(const arrow::Datum& datum) { MKQL_ENSURE(datum.is_arraylike(), "Expecting array as FromBlocks argument"); - Reader_->Reset(datum); + MKQL_ENSURE(Arrays_.empty(), "Not all input is processed"); + if (datum.is_array()) { + Arrays_.push_back(datum.array()); + } else { + for (auto& chunk : datum.chunks()) { + Arrays_.push_back(chunk->data()); + } + } + Index_ = 0; } private: const std::unique_ptr<IBlockReader> Reader_; + TDeque<std::shared_ptr<arrow::ArrayData>> Arrays_; + size_t Index_ = 0; }; private: @@ -217,7 +237,7 @@ private: TState& GetState(TComputationContext& ctx) const { auto& result = ctx.MutableValues[StateIndex_]; if (!result.HasValue()) { - result = ctx.HolderFactory.Create<TState>(ItemType_, ctx); + result = ctx.HolderFactory.Create<TState>(ItemType_); } return *static_cast<TState*>(result.AsBoxed().Get()); } @@ -252,11 +272,6 @@ public: } s.Index_ = 0; - for (size_t i = 0; i < Width_; ++i) { - const auto& datum = TArrowBlock::From(s.Values_[i]).GetDatum(); - s.Readers_[i]->Reset(datum); - } - s.Count_ = TArrowBlock::From(s.Values_[Width_]).GetDatum().scalar_as<arrow::UInt64Scalar>().value; } @@ -265,9 +280,16 @@ public: continue; } - auto result = s.Readers_[i]->GetNextValue(); - Y_VERIFY(result); - *(output[i]) = *result; + const auto& datum = TArrowBlock::From(s.Values_[i]).GetDatum(); + TBlockItem item; + if (datum.is_scalar()) { + item = s.Readers_[i]->GetScalarItem(*datum.scalar()); + } else { + MKQL_ENSURE(datum.is_array(), "Expecting array"); + item = s.Readers_[i]->GetItem(*datum.array(), s.Index_); + } + + *(output[i]) = s.Readers_[i]->MakeValue(item, ctx.HolderFactory); } ++s.Index_; @@ -282,7 +304,7 @@ private: size_t Count_ = 0; size_t Index_ = 0; - TState(TMemoryUsageInfo* memInfo, const TVector<TType*>& types, TComputationContext& ctx) + TState(TMemoryUsageInfo* memInfo, const TVector<TType*>& types) : TComputationValue(memInfo) , Values_(types.size() + 1) , ValuePointers_(types.size() + 1) @@ -292,7 +314,7 @@ private: } for (size_t i = 0; i < types.size(); ++i) { - Readers_.push_back(MakeBlockReader(types[i], ctx.HolderFactory)); + Readers_.push_back(MakeBlockReader(types[i])); } } }; @@ -304,7 +326,7 @@ private: TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const { if (!state.HasValue()) { - state = ctx.HolderFactory.Create<TState>(Types_, ctx); + state = ctx.HolderFactory.Create<TState>(Types_); } return *static_cast<TState*>(state.AsBoxed().Get()); } diff --git a/ydb/library/yql/public/udf/udf_value_inl.h b/ydb/library/yql/public/udf/udf_value_inl.h index bbe323cbd9..2bcd3a98bc 100644 --- a/ydb/library/yql/public/udf/udf_value_inl.h +++ b/ydb/library/yql/public/udf/udf_value_inl.h @@ -675,6 +675,10 @@ PRIMITIVE_VALUE_TYPES(VALUE_GET) PRIMITIVE_VALUE_TYPES(VALUE_GET_DEF) PRIMITIVE_VALUE_TYPES(VALUE_CONSTR) +#undef VALUE_GET +#undef VALUE_GET_DEF +#undef VALUE_CONSTR + template <> inline bool TUnboxedValuePod::Get<bool>() const { |