diff options
author | vvvv <vvvv@ydb.tech> | 2022-12-13 20:15:20 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2022-12-13 20:15:20 +0300 |
commit | 1a43879c5f8c64404638e6a48714106f427bdc71 (patch) | |
tree | a2e78b49414813a18c7f24a4ce014abd79d3714f | |
parent | 9fbdf83be5486c7553b2c6cc11162b68a3d01834 (diff) | |
download | ydb-1a43879c5f8c64404638e6a48714106f427bdc71.tar.gz |
initial support of float/double/tuple
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp | 643 | ||||
-rw-r--r-- | ydb/library/yql/minikql/mkql_type_builder.cpp | 23 |
2 files changed, 435 insertions, 231 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp index a24f4a2a0b..96d82c583f 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp @@ -21,10 +21,10 @@ constexpr size_t MaxBlockSizeInBytes = 1_MB; class TBlockBuilderBase { public: - TBlockBuilderBase(TComputationContext& ctx, const std::shared_ptr<arrow::DataType>& itemType) + TBlockBuilderBase(TComputationContext& ctx, const std::shared_ptr<arrow::DataType>& itemType, size_t maxLength) : Ctx_(ctx) , ItemType_(itemType) - , MaxLength_(MaxBlockSizeInBytes / TypeSize(*ItemType_)) + , MaxLength_(maxLength) {} virtual ~TBlockBuilderBase() = default; @@ -33,15 +33,9 @@ public: return MaxLength_; } - virtual void Add(NUdf::TUnboxedValue& value) = 0; + virtual void Add(const NUdf::TUnboxedValue& value) = 0; virtual NUdf::TUnboxedValuePod Build(bool finish) = 0; -private: - static int64_t TypeSize(arrow::DataType& itemType) { - const auto bits = static_cast<const arrow::FixedWidthType&>(itemType).bit_width(); - return arrow::BitUtil::BytesForBits(bits); - } - protected: TComputationContext& Ctx_; const std::shared_ptr<arrow::DataType> ItemType_; @@ -52,13 +46,13 @@ template <typename T, typename TBuilder> class TFixedSizeBlockBuilder : public TBlockBuilderBase { public: TFixedSizeBlockBuilder(TComputationContext& ctx, const std::shared_ptr<arrow::DataType>& itemType) - : TBlockBuilderBase(ctx, itemType) + : TBlockBuilderBase(ctx, itemType, MaxBlockSizeInBytes / TypeSize(*itemType)) , Builder_(std::make_unique<TBuilder>(&Ctx_.ArrowMemoryPool)) { this->Reserve(); } - void Add(NUdf::TUnboxedValue& value) override { + void Add(const NUdf::TUnboxedValue& value) override { Y_VERIFY_DEBUG(Builder_->length() < MaxLength_); if (value) { this->Builder_->UnsafeAppend(value.Get<T>()); @@ -84,83 +78,192 @@ private: ARROW_OK(this->Builder_->Reserve(MaxLength_)); } + static int64_t TypeSize(arrow::DataType& itemType) { + const auto bits = static_cast<const arrow::FixedWidthType&>(itemType).bit_width(); + return arrow::BitUtil::BytesForBits(bits); + } + private: std::unique_ptr<TBuilder> Builder_; }; -std::unique_ptr<TBlockBuilderBase> MakeBlockBuilder(TComputationContext& ctx, NUdf::EDataSlot slot) { - switch (slot) { - case NUdf::EDataSlot::Int8: - return std::make_unique<TFixedSizeBlockBuilder<i8, arrow::Int8Builder>>(ctx, arrow::int8()); - case NUdf::EDataSlot::Uint8: - case NUdf::EDataSlot::Bool: - return std::make_unique<TFixedSizeBlockBuilder<ui8, arrow::UInt8Builder>>(ctx, arrow::uint8()); - case NUdf::EDataSlot::Int16: - return std::make_unique<TFixedSizeBlockBuilder<i16, arrow::Int16Builder>>(ctx, arrow::int16()); - case NUdf::EDataSlot::Uint16: - case NUdf::EDataSlot::Date: - return std::make_unique<TFixedSizeBlockBuilder<ui16, arrow::UInt16Builder>>(ctx, arrow::uint16()); - case NUdf::EDataSlot::Int32: - return std::make_unique<TFixedSizeBlockBuilder<i32, arrow::Int32Builder>>(ctx, arrow::int32()); - case NUdf::EDataSlot::Uint32: - case NUdf::EDataSlot::Datetime: - return std::make_unique<TFixedSizeBlockBuilder<ui32, arrow::UInt32Builder>>(ctx, arrow::uint32()); - case NUdf::EDataSlot::Int64: - case NUdf::EDataSlot::Interval: - return std::make_unique<TFixedSizeBlockBuilder<i64, arrow::Int64Builder>>(ctx, arrow::int64()); - case NUdf::EDataSlot::Uint64: - case NUdf::EDataSlot::Timestamp: - return std::make_unique<TFixedSizeBlockBuilder<ui64, arrow::UInt64Builder>>(ctx, arrow::uint64()); - default: - MKQL_ENSURE(false, "Unsupported data slot"); - } -} - -class TToBlocksWrapper: public TStatelessFlowComputationNode<TToBlocksWrapper> { +class TTupleBlockBuilder : public TBlockBuilderBase { public: - explicit TToBlocksWrapper(IComputationNode* flow, NUdf::EDataSlot slot) - : TStatelessFlowComputationNode(flow, EValueRepresentation::Boxed) - , Flow_(flow) - , Slot_(slot) + TTupleBlockBuilder(TComputationContext & ctx, const std::shared_ptr<arrow::DataType>& itemType, size_t maxLength, + TTupleType* tupleType, TVector<std::unique_ptr<TBlockBuilderBase>>&& children) + : TBlockBuilderBase(ctx, itemType, maxLength) + , TupleType_(tupleType) + , Children_(std::move(children)) { + bool isOptional; + MKQL_ENSURE(ConvertArrowType(TupleType_, isOptional, ArrowType_), "Unsupported type"); + Reserve(); } - NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { - auto builder = MakeBlockBuilder(ctx, Slot_); + void Add(const NUdf::TUnboxedValue& value) final { + if (!value) { + NullBitmapBuilder_->UnsafeAppend(false); + for (ui32 i = 0; i < TupleType_->GetElementsCount(); ++i) { + Children_[i]->Add({}); + } - for (size_t i = 0; i < builder->MaxLength(); ++i) { - auto result = Flow_->GetValue(ctx); - if (result.IsFinish() || result.IsYield()) { - if (i == 0) { - return result.Release(); - } - break; + return; + } + + NullBitmapBuilder_->UnsafeAppend(true); + auto elements = value.GetElements(); + if (elements) { + for (ui32 i = 0; i < TupleType_->GetElementsCount(); ++i) { + Children_[i]->Add(elements[i]); + } + } else { + for (ui32 i = 0; i < TupleType_->GetElementsCount(); ++i) { + Children_[i]->Add(value.GetElement(i)); } - builder->Add(result); + } + } + + NUdf::TUnboxedValuePod Build(bool finish) final { + std::vector<arrow::Datum> childrenValues; + childrenValues.reserve(TupleType_->GetElementsCount()); + for (ui32 i = 0; i < TupleType_->GetElementsCount(); ++i) { + childrenValues.emplace_back(TArrowBlock::From(Children_[i]->Build(finish)).GetDatum()); + } + + std::shared_ptr<arrow::Buffer> nullBitmap; + auto length = NullBitmapBuilder_->length(); + auto nullCount = NullBitmapBuilder_->false_count(); + ARROW_OK(NullBitmapBuilder_->Finish(&nullBitmap)); + auto arrayData = arrow::ArrayData::Make(ArrowType_, length, { nullBitmap }, nullCount, 0); + for (ui32 i = 0; i < TupleType_->GetElementsCount(); ++i) {
+ arrayData->child_data.push_back(childrenValues[i].array());
+ }
+ + arrow::Datum result(arrayData); + + NullBitmapBuilder_ = nullptr; + if (!finish) { + Reserve(); } - return builder->Build(true); + return this->Ctx_.HolderFactory.CreateArrowBlock(std::move(result)); } private: - void RegisterDependencies() const final { - FlowDependsOn(Flow_); + void Reserve() { + NullBitmapBuilder_ = std::make_unique<arrow::TypedBufferBuilder<bool>>(&Ctx_.ArrowMemoryPool); + ARROW_OK(NullBitmapBuilder_->Reserve(MaxLength_)); } private: - IComputationNode* const Flow_; - NUdf::EDataSlot Slot_; + TTupleType* TupleType_; + std::shared_ptr<arrow::DataType> ArrowType_; + TVector<std::unique_ptr<TBlockBuilderBase>> Children_; + std::unique_ptr<arrow::TypedBufferBuilder<bool>> NullBitmapBuilder_; +}; + +std::unique_ptr<TBlockBuilderBase> MakeBlockBuilder(TComputationContext& ctx, TType* type) { + if (type->IsOptional()) { + type = AS_TYPE(TOptionalType, type)->GetItemType(); + } + + if (type->IsTuple()) { + bool isOptional; + std::shared_ptr<arrow::DataType> arrowType; + MKQL_ENSURE(ConvertArrowType(type, isOptional, arrowType), "Unsupported type"); + + auto tupleType = AS_TYPE(TTupleType, type); + TVector<std::unique_ptr<TBlockBuilderBase>> children; + size_t maxLength = MaxBlockSizeInBytes; + for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { + children.emplace_back(MakeBlockBuilder(ctx, tupleType->GetElementType(i))); + maxLength = Min(maxLength, children.back()->MaxLength()); + } + + return std::make_unique<TTupleBlockBuilder>(ctx, arrowType, maxLength, tupleType, std::move(children)); + } + + if (type->IsData()) { + auto slot = *AS_TYPE(TDataType, type)->GetDataSlot(); + switch (slot) { + case NUdf::EDataSlot::Int8: + return std::make_unique<TFixedSizeBlockBuilder<i8, arrow::Int8Builder>>(ctx, arrow::int8()); + case NUdf::EDataSlot::Uint8: + case NUdf::EDataSlot::Bool: + return std::make_unique<TFixedSizeBlockBuilder<ui8, arrow::UInt8Builder>>(ctx, arrow::uint8()); + case NUdf::EDataSlot::Int16: + return std::make_unique<TFixedSizeBlockBuilder<i16, arrow::Int16Builder>>(ctx, arrow::int16()); + case NUdf::EDataSlot::Uint16: + case NUdf::EDataSlot::Date: + return std::make_unique<TFixedSizeBlockBuilder<ui16, arrow::UInt16Builder>>(ctx, arrow::uint16()); + case NUdf::EDataSlot::Int32: + return std::make_unique<TFixedSizeBlockBuilder<i32, arrow::Int32Builder>>(ctx, arrow::int32()); + case NUdf::EDataSlot::Uint32: + case NUdf::EDataSlot::Datetime: + return std::make_unique<TFixedSizeBlockBuilder<ui32, arrow::UInt32Builder>>(ctx, arrow::uint32()); + case NUdf::EDataSlot::Int64: + case NUdf::EDataSlot::Interval: + return std::make_unique<TFixedSizeBlockBuilder<i64, arrow::Int64Builder>>(ctx, arrow::int64()); + case NUdf::EDataSlot::Uint64: + case NUdf::EDataSlot::Timestamp: + return std::make_unique<TFixedSizeBlockBuilder<ui64, arrow::UInt64Builder>>(ctx, arrow::uint64()); + case NUdf::EDataSlot::Float: + return std::make_unique<TFixedSizeBlockBuilder<float, arrow::FloatBuilder>>(ctx, arrow::float32()); + case NUdf::EDataSlot::Double: + return std::make_unique<TFixedSizeBlockBuilder<double, arrow::DoubleBuilder>>(ctx, arrow::float64()); + default: + MKQL_ENSURE(false, "Unsupported data slot"); + } + } + + MKQL_ENSURE(false, "Unsupported type"); +} + +class TToBlocksWrapper : public TStatelessFlowComputationNode<TToBlocksWrapper> {
+public:
+ explicit TToBlocksWrapper(IComputationNode* flow, TType* itemType)
+ : TStatelessFlowComputationNode(flow, EValueRepresentation::Boxed)
+ , Flow_(flow)
+ , ItemType_(itemType)
+ {
+ }
+
+ NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
+ auto builder = MakeBlockBuilder(ctx, ItemType_);
+
+ for (size_t i = 0; i < builder->MaxLength(); ++i) {
+ auto result = Flow_->GetValue(ctx);
+ if (result.IsFinish() || result.IsYield()) {
+ if (i == 0) {
+ return result.Release();
+ }
+ break;
+ }
+ builder->Add(result);
+ }
+
+ return builder->Build(true);
+ }
+
+private:
+ void RegisterDependencies() const final {
+ FlowDependsOn(Flow_);
+ }
+
+private:
+ IComputationNode* const Flow_;
+ TType* ItemType_;
}; class TWideToBlocksWrapper : public TStatefulWideFlowComputationNode<TWideToBlocksWrapper> { public: TWideToBlocksWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, - TVector<NUdf::EDataSlot>&& slots) + TVector<TType*>&& types) : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any) , Flow_(flow) - , Slots_(std::move(slots)) - , Width_(Slots_.size()) + , Types_(std::move(types)) + , Width_(Types_.size()) { } @@ -215,18 +318,18 @@ private: size_t Rows_ = 0; bool IsFinished_ = false; - TState(TMemoryUsageInfo* memInfo, TComputationContext& ctx, const TVector<NUdf::EDataSlot>& slots) + TState(TMemoryUsageInfo* memInfo, TComputationContext& ctx, const TVector<TType*>& types) : TComputationValue(memInfo) - , Values_(slots.size()) - , ValuePointers_(slots.size()) + , Values_(types.size()) + , ValuePointers_(types.size()) { - for (size_t i = 0; i < slots.size(); ++i) { + for (size_t i = 0; i < types.size(); ++i) { ValuePointers_[i] = &Values_[i]; - Builders_.push_back(MakeBlockBuilder(ctx, slots[i])); + Builders_.push_back(MakeBlockBuilder(ctx, types[i])); } MaxLength_ = MaxBlockSizeInBytes; - for (size_t i = 0; i < slots.size(); ++i) { + for (size_t i = 0; i < types.size(); ++i) { MaxLength_ = Min(MaxLength_, Builders_[i]->MaxLength()); } } @@ -239,14 +342,14 @@ private: TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const { if (!state.HasValue()) { - state = ctx.HolderFactory.Create<TState>(ctx, Slots_); + state = ctx.HolderFactory.Create<TState>(ctx, Types_); } return *static_cast<TState*>(state.AsBoxed().Get()); } private: IComputationWideFlowNode* Flow_; - const TVector<NUdf::EDataSlot> Slots_; + const TVector<TType*> Types_; const size_t Width_; }; @@ -263,144 +366,207 @@ template <typename T> class TFixedSizeBlockReader : public TBlockReaderBase { public: NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) final { + if (data.GetNullCount() > 0 && !arrow::BitUtil::GetBit(data.GetValues<uint8_t>(0, 0), index + data.offset)) { + return {}; + } + return NUdf::TUnboxedValuePod(data.GetValues<T>(1)[index]); } NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) final { + if (!scalar.is_valid) { + return {}; + } + return NUdf::TUnboxedValuePod(*static_cast<const T*>(arrow::internal::checked_cast<const arrow::internal::PrimitiveScalarBase&>(scalar).data())); } }; -class TBoolBlockReader : public TBlockReaderBase { +class TTupleBlockReader : public TBlockReaderBase { public: + TTupleBlockReader(TVector<std::unique_ptr<TBlockReaderBase>>&& children, const THolderFactory& holderFactory) + : Children_(std::move(children)) + , HolderFactory_(holderFactory) + {} + NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) final { - return NUdf::TUnboxedValuePod(arrow::BitUtil::GetBit(data.GetValues<uint8_t>(1, 0), index + data.offset)); - } + if (data.GetNullCount() > 0 && !arrow::BitUtil::GetBit(data.GetValues<uint8_t>(0, 0), index + data.offset)) { + return {}; + } - NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) final { - return NUdf::TUnboxedValuePod(arrow::internal::checked_cast<const arrow::BooleanScalar&>(scalar).value); - } -}; + NUdf::TUnboxedValue* items; + auto result = Cache_.NewArray(HolderFactory_, Children_.size(), items); + for (ui32 i = 0; i < Children_.size(); ++i) { + items[i] = Children_[i]->Get(*data.child_data[i], index); + } -std::unique_ptr<TBlockReaderBase> MakeBlockReader(NUdf::EDataSlot slot) { - switch (slot) { - case NUdf::EDataSlot::Int8: - return std::make_unique<TFixedSizeBlockReader<i8>>(); - case NUdf::EDataSlot::Bool: - case NUdf::EDataSlot::Uint8: - return std::make_unique<TFixedSizeBlockReader<ui8>>(); - case NUdf::EDataSlot::Int16: - return std::make_unique<TFixedSizeBlockReader<i16>>(); - case NUdf::EDataSlot::Uint16: - case NUdf::EDataSlot::Date: - return std::make_unique<TFixedSizeBlockReader<ui16>>(); - case NUdf::EDataSlot::Int32: - return std::make_unique<TFixedSizeBlockReader<i32>>(); - case NUdf::EDataSlot::Uint32: - case NUdf::EDataSlot::Datetime: - return std::make_unique<TFixedSizeBlockReader<ui32>>(); - case NUdf::EDataSlot::Int64: - case NUdf::EDataSlot::Interval: - return std::make_unique<TFixedSizeBlockReader<i64>>(); - case NUdf::EDataSlot::Uint64: - case NUdf::EDataSlot::Timestamp: - return std::make_unique<TFixedSizeBlockReader<ui64>>(); - default: - MKQL_ENSURE(false, "Unsupported data slot"); + return result; } -} -class TFromBlocksWrapper : public TMutableComputationNode<TFromBlocksWrapper> { -public: - TFromBlocksWrapper(TComputationMutables& mutables, IComputationNode* flow, NUdf::EDataSlot slot) - : TMutableComputationNode(mutables) - , Flow_(flow) - , Slot_(slot) - , StateIndex_(mutables.CurValueIndex++) - { - } + NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) final { + if (!scalar.is_valid) { + return {}; + } - NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { - auto& state = GetState(ctx); + const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar); - if (state.Array_ == nullptr || state.Index_ == state.Array_->length) { - auto result = Flow_->GetValue(ctx); - if (result.IsFinish()) { - return NUdf::TUnboxedValue::MakeFinish(); - } - if (result.IsYield()) { - return NUdf::TUnboxedValue::MakeYield(); - } - state.Array_ = TArrowBlock::From(result).GetDatum().array(); - state.Index_ = 0; + NUdf::TUnboxedValue* items; + auto result = Cache_.NewArray(HolderFactory_, Children_.size(), items); + for (ui32 i = 0; i < Children_.size(); ++i) { + items[i] = Children_[i]->GetScalar(*structScalar.value[i]); } - const auto result = state.GetValue(); - ++state.Index_; return result; } private: - struct TState : public TComputationValue<TState> { - using TComputationValue::TComputationValue; - - TState(TMemoryUsageInfo* memInfo, NUdf::EDataSlot slot) - : TComputationValue(memInfo) - { - Reader_ = MakeBlockReader(slot); - } - - NUdf::TUnboxedValuePod GetValue() const { - const auto nullCount = Array_->GetNullCount(); + TVector<std::unique_ptr<TBlockReaderBase>> Children_; + const THolderFactory& HolderFactory_; + TPlainContainerCache Cache_; +}; - return nullCount == Array_->length || (nullCount > 0 && !HasValue()) - ? NUdf::TUnboxedValuePod() - : DoGetValue(); - } +std::unique_ptr<TBlockReaderBase> MakeBlockReader(TType* type, const THolderFactory& holderFactory) { + if (type->IsOptional()) { + type = AS_TYPE(TOptionalType, type)->GetItemType(); + } - private: - NUdf::TUnboxedValuePod DoGetValue() const { - return Reader_->Get(*Array_, Index_); + if (type->IsTuple()) { + auto tupleType = AS_TYPE(TTupleType, type); + TVector<std::unique_ptr<TBlockReaderBase>> children; + for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { + children.emplace_back(MakeBlockReader(tupleType->GetElementType(i), holderFactory)); } - bool HasValue() const { - return arrow::BitUtil::GetBit(Array_->GetValues<uint8_t>(0, 0), Index_ + Array_->offset); + return std::make_unique<TTupleBlockReader>(std::move(children), holderFactory); + } + + if (type->IsData()) { + auto slot = *AS_TYPE(TDataType, type)->GetDataSlot(); + switch (slot) { + case NUdf::EDataSlot::Int8: + return std::make_unique<TFixedSizeBlockReader<i8>>(); + case NUdf::EDataSlot::Bool: + case NUdf::EDataSlot::Uint8: + return std::make_unique<TFixedSizeBlockReader<ui8>>(); + case NUdf::EDataSlot::Int16: + return std::make_unique<TFixedSizeBlockReader<i16>>(); + case NUdf::EDataSlot::Uint16: + case NUdf::EDataSlot::Date: + return std::make_unique<TFixedSizeBlockReader<ui16>>(); + case NUdf::EDataSlot::Int32: + return std::make_unique<TFixedSizeBlockReader<i32>>(); + case NUdf::EDataSlot::Uint32: + case NUdf::EDataSlot::Datetime: + return std::make_unique<TFixedSizeBlockReader<ui32>>(); + case NUdf::EDataSlot::Int64: + case NUdf::EDataSlot::Interval: + return std::make_unique<TFixedSizeBlockReader<i64>>(); + case NUdf::EDataSlot::Uint64: + case NUdf::EDataSlot::Timestamp: + return std::make_unique<TFixedSizeBlockReader<ui64>>(); + case NUdf::EDataSlot::Float: + return std::make_unique<TFixedSizeBlockReader<float>>(); + case NUdf::EDataSlot::Double: + return std::make_unique<TFixedSizeBlockReader<double>>(); + default: + MKQL_ENSURE(false, "Unsupported data slot"); } - - std::unique_ptr<TBlockReaderBase> Reader_; - public: - std::shared_ptr<arrow::ArrayData> Array_{nullptr}; - size_t Index_{0}; - }; - -private: - void RegisterDependencies() const final { - this->DependsOn(Flow_); } - TState& GetState(TComputationContext& ctx) const { - auto& result = ctx.MutableValues[StateIndex_]; - if (!result.HasValue()) { - result = ctx.HolderFactory.Create<TState>(Slot_); - } - return *static_cast<TState*>(result.AsBoxed().Get()); - } + MKQL_ENSURE(false, "Unsupported type"); +} -private: - IComputationNode* const Flow_; - const NUdf::EDataSlot Slot_; - const ui32 StateIndex_; +class TFromBlocksWrapper : public TMutableComputationNode<TFromBlocksWrapper> {
+public:
+ TFromBlocksWrapper(TComputationMutables& mutables, IComputationNode* flow, TType* itemType)
+ : TMutableComputationNode(mutables)
+ , Flow_(flow)
+ , ItemType_(itemType)
+ , StateIndex_(mutables.CurValueIndex++)
+ {
+ }
+
+ NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
+ auto& state = GetState(ctx);
+
+ if (state.Array_ == nullptr || state.Index_ == state.Array_->length) {
+ auto result = Flow_->GetValue(ctx);
+ if (result.IsFinish()) {
+ return NUdf::TUnboxedValue::MakeFinish();
+ }
+ if (result.IsYield()) {
+ return NUdf::TUnboxedValue::MakeYield();
+ }
+ state.Array_ = TArrowBlock::From(result).GetDatum().array();
+ state.Index_ = 0;
+ }
+
+ const auto result = state.GetValue();
+ ++state.Index_;
+ return result;
+ }
+
+private:
+ struct TState : public TComputationValue<TState> {
+ using TComputationValue::TComputationValue;
+
+ TState(TMemoryUsageInfo* memInfo, TType* itemType, TComputationContext& ctx)
+ : TComputationValue(memInfo)
+ {
+ Reader_ = MakeBlockReader(itemType, ctx.HolderFactory);
+ }
+
+ NUdf::TUnboxedValuePod GetValue() const {
+ const auto nullCount = Array_->GetNullCount();
+
+ return nullCount == Array_->length || (nullCount > 0 && !HasValue())
+ ? NUdf::TUnboxedValuePod()
+ : DoGetValue();
+ }
+
+ private:
+ NUdf::TUnboxedValuePod DoGetValue() const {
+ return Reader_->Get(*Array_, Index_);
+ }
+
+ bool HasValue() const {
+ return arrow::BitUtil::GetBit(Array_->GetValues<uint8_t>(0, 0), Index_ + Array_->offset);
+ }
+
+ std::unique_ptr<TBlockReaderBase> Reader_;
+ public:
+ std::shared_ptr<arrow::ArrayData> Array_{ nullptr };
+ size_t Index_{ 0 };
+ };
+
+private:
+ void RegisterDependencies() const final {
+ this->DependsOn(Flow_);
+ }
+
+ TState& GetState(TComputationContext& ctx) const {
+ auto& result = ctx.MutableValues[StateIndex_];
+ if (!result.HasValue()) {
+ result = ctx.HolderFactory.Create<TState>(ItemType_, ctx);
+ }
+ return *static_cast<TState*>(result.AsBoxed().Get());
+ }
+
+private:
+ IComputationNode* const Flow_;
+ TType* ItemType_;
+ const ui32 StateIndex_;
}; class TWideFromBlocksWrapper : public TStatefulWideFlowComputationNode<TWideFromBlocksWrapper> { public: TWideFromBlocksWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, - TVector<NUdf::EDataSlot>&& slots) + TVector<TType*>&& types) : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any) , Flow_(flow) - , Slots_(std::move(slots)) - , Width_(Slots_.size()) + , Types_(std::move(types)) + , Width_(Types_.size()) { } @@ -470,19 +636,19 @@ private: size_t Count_ = 0; size_t Index_ = 0; - TState(TMemoryUsageInfo* memInfo, const TVector<NUdf::EDataSlot>& slots) + TState(TMemoryUsageInfo* memInfo, const TVector<TType*>& types, TComputationContext& ctx) : TComputationValue(memInfo) - , Values_(slots.size() + 1) - , ValuePointers_(slots.size() + 1) - , Arrays_(slots.size()) - , Scalars_(slots.size()) + , Values_(types.size() + 1) + , ValuePointers_(types.size() + 1) + , Arrays_(types.size()) + , Scalars_(types.size()) { - for (size_t i = 0; i < slots.size() + 1; ++i) { + for (size_t i = 0; i < types.size() + 1; ++i) { ValuePointers_[i] = &Values_[i]; } - for (size_t i = 0; i < slots.size(); ++i) { - Readers_.push_back(MakeBlockReader(slots[i])); + for (size_t i = 0; i < types.size(); ++i) { + Readers_.push_back(MakeBlockReader(types[i], ctx.HolderFactory)); } } }; @@ -494,14 +660,14 @@ private: TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const { if (!state.HasValue()) { - state = ctx.HolderFactory.Create<TState>(Slots_); + state = ctx.HolderFactory.Create<TState>(Types_, ctx); } return *static_cast<TState*>(state.AsBoxed().Get()); } private: IComputationWideFlowNode* Flow_; - const TVector<NUdf::EDataSlot> Slots_; + const TVector<TType*> Types_; const size_t Width_; }; @@ -510,94 +676,113 @@ public: TAsScalarWrapper(TComputationMutables& mutables, IComputationNode* arg, TType* type) : TMutableComputationNode(mutables) , Arg_(arg) + , Type_(type) { bool isOptional; - auto unpacked = UnpackOptionalData(type, isOptional); - MKQL_ENSURE(ConvertArrowType(unpacked, isOptional, Type_), "Unsupported type of scalar"); - Slot_ = *unpacked->GetDataSlot(); + std::shared_ptr<arrow::DataType> arrowType; + MKQL_ENSURE(ConvertArrowType(Type_, isOptional, arrowType), "Unsupported type of scalar"); } NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { auto value = Arg_->GetValue(ctx); - arrow::Datum result; + arrow::Datum result = ConvertScalar(Type_, value); + return ctx.HolderFactory.CreateArrowBlock(std::move(result)); + } + +private: + void RegisterDependencies() const final { + DependsOn(Arg_); + } + + arrow::Datum ConvertScalar(TType* type, const NUdf::TUnboxedValuePod& value) const { if (!value) { - result = arrow::MakeNullScalar(Type_); - } else { - switch (Slot_) { + bool isOptional; + std::shared_ptr<arrow::DataType> arrowType; + MKQL_ENSURE(ConvertArrowType(type, isOptional, arrowType), "Unsupported type of scalar"); + return arrow::MakeNullScalar(arrowType); + } + + if (type->IsOptional()) { + type = AS_TYPE(TOptionalType, type)->GetItemType(); + } + + if (type->IsTuple()) { + auto tupleType = AS_TYPE(TTupleType, type); + bool isOptional; + std::shared_ptr<arrow::DataType> arrowType; + MKQL_ENSURE(ConvertArrowType(type, isOptional, arrowType), "Unsupported type of scalar"); + + std::vector<std::shared_ptr<arrow::Scalar>> arrowValue; + for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { + arrowValue.emplace_back(ConvertScalar(tupleType->GetElementType(i), value.GetElement(i)).scalar()); + } + + return arrow::Datum(std::make_shared<arrow::StructScalar>(arrowValue, arrowType)); + } + + if (type->IsData()) { + auto slot = *AS_TYPE(TDataType, type)->GetDataSlot(); + switch (slot) { case NUdf::EDataSlot::Int8: - result = arrow::Datum(static_cast<int8_t>(value.Get<i8>())); - break; + return arrow::Datum(static_cast<int8_t>(value.Get<i8>())); case NUdf::EDataSlot::Bool: case NUdf::EDataSlot::Uint8: - result = arrow::Datum(static_cast<uint8_t>(value.Get<ui8>())); - break; + return arrow::Datum(static_cast<uint8_t>(value.Get<ui8>())); case NUdf::EDataSlot::Int16: - result = arrow::Datum(static_cast<int16_t>(value.Get<i16>())); - break; + return arrow::Datum(static_cast<int16_t>(value.Get<i16>())); case NUdf::EDataSlot::Uint16: case NUdf::EDataSlot::Date: - result = arrow::Datum(static_cast<uint16_t>(value.Get<ui16>())); - break; + return arrow::Datum(static_cast<uint16_t>(value.Get<ui16>())); case NUdf::EDataSlot::Int32: - result = arrow::Datum(static_cast<int32_t>(value.Get<i32>())); - break; + return arrow::Datum(static_cast<int32_t>(value.Get<i32>())); case NUdf::EDataSlot::Uint32: case NUdf::EDataSlot::Datetime: - result = arrow::Datum(static_cast<uint32_t>(value.Get<ui32>())); - break; + return arrow::Datum(static_cast<uint32_t>(value.Get<ui32>())); case NUdf::EDataSlot::Int64: case NUdf::EDataSlot::Interval: - result = arrow::Datum(static_cast<int64_t>(value.Get<i64>())); - break; + return arrow::Datum(static_cast<int64_t>(value.Get<i64>())); case NUdf::EDataSlot::Uint64: case NUdf::EDataSlot::Timestamp: - result = arrow::Datum(static_cast<uint64_t>(value.Get<ui64>())); - break; + return arrow::Datum(static_cast<uint64_t>(value.Get<ui64>())); + case NUdf::EDataSlot::Float: + return arrow::Datum(static_cast<float>(value.Get<float>())); + case NUdf::EDataSlot::Double: + return arrow::Datum(static_cast<double>(value.Get<double>())); default: MKQL_ENSURE(false, "Unsupported data slot"); } } - return ctx.HolderFactory.CreateArrowBlock(std::move(result)); - } - -private: - void RegisterDependencies() const final { - DependsOn(Arg_); + MKQL_ENSURE(false, "Unsupported type"); } private: IComputationNode* const Arg_; - std::shared_ptr<arrow::DataType> Type_; - NUdf::EDataSlot Slot_; + TType* Type_; }; } -IComputationNode* WrapToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) { - MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount()); - const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); - bool isOptional; - const auto slot = *UnpackOptionalData(flowType->GetItemType(), isOptional)->GetDataSlot(); - return new TToBlocksWrapper(LocateNode(ctx.NodeLocator, callable, 0), slot); -} +IComputationNode* WrapToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
+ MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount());
+ const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
+ return new TToBlocksWrapper(LocateNode(ctx.NodeLocator, callable, 0), flowType->GetItemType());
+}
IComputationNode* WrapWideToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount()); const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType()); - TVector<NUdf::EDataSlot> slots; + TVector<TType*> items; for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { - bool isOptional; - const auto slot = *UnpackOptionalData(tupleType->GetElementType(i), isOptional)->GetDataSlot(); - slots.push_back(slot); + items.push_back(tupleType->GetElementType(i)); } auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); - return new TWideToBlocksWrapper(ctx.Mutables, wideFlow, std::move(slots)); + return new TWideToBlocksWrapper(ctx.Mutables, wideFlow, std::move(items)); } IComputationNode* WrapFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) { @@ -605,9 +790,7 @@ IComputationNode* WrapFromBlocks(TCallable& callable, const TComputationNodeFact const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); const auto blockType = AS_TYPE(TBlockType, flowType->GetItemType()); - bool isOptional; - const auto slot = *UnpackOptionalData(blockType->GetItemType(), isOptional)->GetDataSlot(); - return new TFromBlocksWrapper(ctx.Mutables, LocateNode(ctx.NodeLocator, callable, 0), slot); + return new TFromBlocksWrapper(ctx.Mutables, LocateNode(ctx.NodeLocator, callable, 0), blockType->GetItemType()); } IComputationNode* WrapWideFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) { @@ -616,18 +799,16 @@ IComputationNode* WrapWideFromBlocks(TCallable& callable, const TComputationNode const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType()); MKQL_ENSURE(tupleType->GetElementsCount() > 0, "Expected at least one column"); - TVector<NUdf::EDataSlot> slots; + TVector<TType*> items; for (ui32 i = 0; i < tupleType->GetElementsCount() - 1; ++i) { const auto blockType = AS_TYPE(TBlockType, tupleType->GetElementType(i)); - bool isOptional; - const auto slot = *UnpackOptionalData(blockType->GetItemType(), isOptional)->GetDataSlot(); - slots.push_back(slot); + items.push_back(blockType->GetItemType()); } auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); - return new TWideFromBlocksWrapper(ctx.Mutables, wideFlow, std::move(slots)); + return new TWideFromBlocksWrapper(ctx.Mutables, wideFlow, std::move(items)); } IComputationNode* WrapAsScalar(TCallable& callable, const TComputationNodeFactoryContext& ctx) { diff --git a/ydb/library/yql/minikql/mkql_type_builder.cpp b/ydb/library/yql/minikql/mkql_type_builder.cpp index 379b59148e..b04bc43ec3 100644 --- a/ydb/library/yql/minikql/mkql_type_builder.cpp +++ b/ydb/library/yql/minikql/mkql_type_builder.cpp @@ -1344,6 +1344,12 @@ bool ConvertArrowType(NUdf::EDataSlot slot, std::shared_ptr<arrow::DataType>& ty case NUdf::EDataSlot::Timestamp: type = arrow::uint64(); return true; + case NUdf::EDataSlot::Float: + type = arrow::float32(); + return true; + case NUdf::EDataSlot::Double: + type = arrow::float64(); + return true; default: return false; } @@ -1351,6 +1357,23 @@ bool ConvertArrowType(NUdf::EDataSlot slot, std::shared_ptr<arrow::DataType>& ty bool ConvertArrowType(TType* itemType, bool& isOptional, std::shared_ptr<arrow::DataType>& type) { auto unpacked = UnpackOptional(itemType, isOptional); + if (unpacked->IsTuple()) { + auto tupleType = AS_TYPE(TTupleType, unpacked); + std::vector<std::shared_ptr<arrow::Field>> fields; + for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { + std::shared_ptr<arrow::DataType> childType; + bool isChildOptional; + if (!ConvertArrowType(tupleType->GetElementType(i), isChildOptional, childType)) { + return false; + } + + fields.emplace_back(std::make_shared<arrow::Field>("field" + ToString(i), childType, isChildOptional)); + } + + type = std::make_shared<arrow::StructType>(fields); + return true; + } + if (!unpacked->IsData()) { return false; } |