diff options
author | vvvv <vvvv@ydb.tech> | 2022-09-20 12:29:32 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2022-09-20 12:29:32 +0300 |
commit | 1c0cc1a8107aa2123b564ac3832a8cdcc644d8d3 (patch) | |
tree | 969e204943b516534d14100b27cdf2ad0141cf06 | |
parent | aac6159f005fb36cabe1d93ccc45d9fab6100c8f (diff) | |
download | ydb-1c0cc1a8107aa2123b564ac3832a8cdcc644d8d3.tar.gz |
support of fixed size types: Bool, [U]Int{8,16,32,64}.
4 files changed, 354 insertions, 145 deletions
diff --git a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp index 64f027f69a..0525349a78 100644 --- a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp +++ b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp @@ -4305,6 +4305,7 @@ struct TBlockRules { static constexpr std::initializer_list<TBlockFuncMap::value_type> FuncsInit = { {"+", "add" }, + {"Not", "invert" }, }; TBlockRules() diff --git a/ydb/library/yql/minikql/arrow/mkql_functions.cpp b/ydb/library/yql/minikql/arrow/mkql_functions.cpp index 2f4133405f..73b82e7cd9 100644 --- a/ydb/library/yql/minikql/arrow/mkql_functions.cpp +++ b/ydb/library/yql/minikql/arrow/mkql_functions.cpp @@ -9,10 +9,8 @@ namespace NKikimr::NMiniKQL { -bool ConvertInputArrowType(TType* type, bool& isOptional, arrow::ValueDescr& descr) { - auto blockType = AS_TYPE(TBlockType, type); - descr.shape = blockType->GetShape() == TBlockType::EShape::Scalar ? arrow::ValueDescr::SCALAR : arrow::ValueDescr::ARRAY; - auto unpacked = UnpackOptional(blockType->GetItemType(), isOptional); +bool ConvertArrowType(TType* itemType, bool& isOptional, std::shared_ptr<arrow::DataType>& type) { + auto unpacked = UnpackOptional(itemType, isOptional); if (!unpacked->IsData()) { return false; } @@ -24,16 +22,43 @@ bool ConvertInputArrowType(TType* type, bool& isOptional, arrow::ValueDescr& des switch (*slot) { case NUdf::EDataSlot::Bool: - descr.type = arrow::boolean(); + type = arrow::boolean(); + return true; + case NUdf::EDataSlot::Uint8: + type = arrow::uint8(); + return true; + case NUdf::EDataSlot::Int8: + type = arrow::int8(); + return true; + case NUdf::EDataSlot::Uint16: + type = arrow::uint16(); + return true; + case NUdf::EDataSlot::Int16: + type = arrow::int16(); + return true; + case NUdf::EDataSlot::Uint32: + type = arrow::uint32(); + return true; + case NUdf::EDataSlot::Int32: + type = arrow::int32(); + return true; + case NUdf::EDataSlot::Int64: + type = arrow::int64(); return true; case NUdf::EDataSlot::Uint64: - descr.type = arrow::uint64(); + type = arrow::uint64(); return true; default: return false; } } +bool ConvertInputArrowType(TType* blockType, bool& isOptional, arrow::ValueDescr& descr) { + auto asBlockType = AS_TYPE(TBlockType, blockType); + descr.shape = asBlockType->GetShape() == TBlockType::EShape::Scalar ? arrow::ValueDescr::SCALAR : arrow::ValueDescr::ARRAY; + return ConvertArrowType(asBlockType->GetItemType(), isOptional, descr.type); +} + class TOutputTypeVisitor : public arrow::TypeVisitor { public: @@ -46,6 +71,41 @@ public: return arrow::Status::OK(); } + arrow::Status Visit(const arrow::Int8Type&) { + SetDataType(NUdf::EDataSlot::Int8); + return arrow::Status::OK(); + } + + arrow::Status Visit(const arrow::UInt8Type&) { + SetDataType(NUdf::EDataSlot::Uint8); + return arrow::Status::OK(); + } + + arrow::Status Visit(const arrow::Int16Type&) { + SetDataType(NUdf::EDataSlot::Int16); + return arrow::Status::OK(); + } + + arrow::Status Visit(const arrow::UInt16Type&) { + SetDataType(NUdf::EDataSlot::Uint16); + return arrow::Status::OK(); + } + + arrow::Status Visit(const arrow::Int32Type&) { + SetDataType(NUdf::EDataSlot::Int32); + return arrow::Status::OK(); + } + + arrow::Status Visit(const arrow::UInt32Type&) { + SetDataType(NUdf::EDataSlot::Uint32); + return arrow::Status::OK(); + } + + arrow::Status Visit(const arrow::Int64Type&) { + SetDataType(NUdf::EDataSlot::Int64); + return arrow::Status::OK(); + } + arrow::Status Visit(const arrow::UInt64Type&) { SetDataType(NUdf::EDataSlot::Uint64); return arrow::Status::OK(); diff --git a/ydb/library/yql/minikql/arrow/mkql_functions.h b/ydb/library/yql/minikql/arrow/mkql_functions.h index 30d84518e9..329456ecab 100644 --- a/ydb/library/yql/minikql/arrow/mkql_functions.h +++ b/ydb/library/yql/minikql/arrow/mkql_functions.h @@ -6,6 +6,7 @@ namespace NKikimr::NMiniKQL { bool FindArrowFunction(TStringBuf name, const TArrayRef<TType*>& inputTypes, TType*& outputType, TTypeEnvironment& env); -bool ConvertInputArrowType(TType* type, bool& isOptional, arrow::ValueDescr& descr); +bool ConvertInputArrowType(TType* blockType, bool& isOptional, arrow::ValueDescr& descr); +bool ConvertArrowType(TType* itemType, bool& isOptional, std::shared_ptr<arrow::DataType>& type); } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp index 175b3adbc7..524fe761a3 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp @@ -1,6 +1,7 @@ #include "mkql_blocks.h" #include <ydb/library/yql/minikql/arrow/arrow_defs.h> +#include <ydb/library/yql/minikql/arrow/mkql_functions.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> #include <ydb/library/yql/minikql/mkql_node_builder.h> #include <ydb/library/yql/minikql/mkql_node_cast.h> @@ -16,35 +17,22 @@ namespace NMiniKQL { namespace { -class TBlockBuilder { +class TBlockBuilderBase { public: - explicit TBlockBuilder(TComputationContext& ctx) - : ItemType(arrow::uint64()) - , MaxLength_(MaxBlockSizeInBytes / TypeSize(*ItemType)) - , Ctx(ctx) - , Builder(&Ctx.ArrowMemoryPool) - { - ARROW_OK(Builder.Reserve(MaxLength_)); - } + TBlockBuilderBase(TComputationContext& ctx, const std::shared_ptr<arrow::DataType>& itemType) + : Ctx_(ctx) + , ItemType_(itemType) + , MaxLength_(MaxBlockSizeInBytes / TypeSize(*ItemType_)) + {} - void Add(NUdf::TUnboxedValue& value) { - Y_VERIFY_DEBUG(Builder.length() < MaxLength_); - if (value) { - Builder.UnsafeAppend(value.Get<ui64>()); - } else { - Builder.UnsafeAppendNull(); - } - } + virtual ~TBlockBuilderBase() = default; inline size_t MaxLength() const noexcept { return MaxLength_; } - NUdf::TUnboxedValuePod Build() { - std::shared_ptr<arrow::ArrayData> result; - ARROW_OK(Builder.FinishInternal(&result)); - return Ctx.HolderFactory.CreateArrowBlock(std::move(result)); - } + virtual void Add(NUdf::TUnboxedValue& value) = 0; + virtual NUdf::TUnboxedValuePod Build() = 0; private: static int64_t TypeSize(arrow::DataType& itemType) { @@ -52,94 +40,144 @@ private: return arrow::BitUtil::BytesForBits(bits); } -private: static constexpr size_t MaxBlockSizeInBytes = 1_MB; -private: - std::shared_ptr<arrow::DataType> ItemType; +protected: + TComputationContext& Ctx_; + const std::shared_ptr<arrow::DataType> ItemType_; const size_t MaxLength_; - TComputationContext& Ctx; - arrow::UInt64Builder Builder; }; +template <typename T, typename TBuilder> +class TFixedSizeBlockBuilder : public TBlockBuilderBase { +public: + TFixedSizeBlockBuilder(TComputationContext& ctx, const std::shared_ptr<arrow::DataType>& itemType) + : TBlockBuilderBase(ctx, itemType) + , Builder_(&Ctx_.ArrowMemoryPool) + { + this->Reserve(); + } + + void Add(NUdf::TUnboxedValue& value) override { + Y_VERIFY_DEBUG(Builder_.length() < MaxLength_); + if (value) { + this->Builder_.UnsafeAppend(value.Get<T>()); + } else { + this->Builder_.UnsafeAppendNull(); + } + } + + NUdf::TUnboxedValuePod Build() override { + std::shared_ptr<arrow::ArrayData> result; + ARROW_OK(this->Builder_.FinishInternal(&result)); + return this->Ctx_.HolderFactory.CreateArrowBlock(std::move(result)); + } + +private: + void Reserve() { + ARROW_OK(this->Builder_.Reserve(MaxLength_)); + } + +private: + TBuilder Builder_; +}; + +std::unique_ptr<TBlockBuilderBase> MakeBlockBuilder(TComputationContext& ctx, NUdf::EDataSlot slot) { + switch (slot) { + case NUdf::EDataSlot::Bool: + return std::make_unique<TFixedSizeBlockBuilder<bool, arrow::BooleanBuilder>>(ctx, arrow::boolean()); + case NUdf::EDataSlot::Int8: + return std::make_unique<TFixedSizeBlockBuilder<i8, arrow::Int8Builder>>(ctx, arrow::int8()); + case NUdf::EDataSlot::Uint8: + return std::make_unique<TFixedSizeBlockBuilder<ui8, arrow::UInt8Builder>>(ctx, arrow::uint8()); + case NUdf::EDataSlot::Int16: + return std::make_unique<TFixedSizeBlockBuilder<i16, arrow::Int16Builder>>(ctx, arrow::int16()); + case NUdf::EDataSlot::Uint16: + return std::make_unique<TFixedSizeBlockBuilder<ui16, arrow::UInt16Builder>>(ctx, arrow::uint16()); + case NUdf::EDataSlot::Int32: + return std::make_unique<TFixedSizeBlockBuilder<i32, arrow::Int32Builder>>(ctx, arrow::int32()); + case NUdf::EDataSlot::Uint32: + return std::make_unique<TFixedSizeBlockBuilder<ui32, arrow::UInt32Builder>>(ctx, arrow::uint32()); + case NUdf::EDataSlot::Int64: + return std::make_unique<TFixedSizeBlockBuilder<i64, arrow::Int64Builder>>(ctx, arrow::int64()); + case NUdf::EDataSlot::Uint64: + return std::make_unique<TFixedSizeBlockBuilder<ui64, arrow::UInt64Builder>>(ctx, arrow::uint64()); + default: + MKQL_ENSURE(false, "Unsupported data slot"); + } +} + class TToBlocksWrapper: public TStatelessFlowComputationNode<TToBlocksWrapper> { public: - explicit TToBlocksWrapper(IComputationNode* flow) + explicit TToBlocksWrapper(IComputationNode* flow, NUdf::EDataSlot slot) : TStatelessFlowComputationNode(flow, EValueRepresentation::Boxed) - , Flow(flow) + , Flow_(flow) + , Slot_(slot) { } NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { - auto builder = TBlockBuilder(ctx); + auto builder = MakeBlockBuilder(ctx, Slot_); - for (size_t i = 0; i < builder.MaxLength(); ++i) { - auto result = Flow->GetValue(ctx); + for (size_t i = 0; i < builder->MaxLength(); ++i) { + auto result = Flow_->GetValue(ctx); if (result.IsFinish() || result.IsYield()) { if (i == 0) { return result.Release(); } break; } - builder.Add(result); + builder->Add(result); } - return builder.Build(); + return builder->Build(); } private: void RegisterDependencies() const final { - FlowDependsOn(Flow); + FlowDependsOn(Flow_); } private: - IComputationNode* const Flow; + IComputationNode* const Flow_; + NUdf::EDataSlot Slot_; }; class TWideToBlocksWrapper : public TStatefulWideFlowComputationNode<TWideToBlocksWrapper> { public: TWideToBlocksWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, - size_t width) + TVector<NUdf::EDataSlot>&& slots) : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any) - , Flow(flow) - , Width(width) + , Flow_(flow) + , Slots_(std::move(slots)) + , Width_(Slots_.size()) { - Y_VERIFY_DEBUG(Width > 0); + Y_VERIFY_DEBUG(Width_ > 0); } EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { - auto builders = std::vector<TBlockBuilder>(); - builders.reserve(Width); - for (size_t i = 0; i < Width; ++i) { - builders.push_back(TBlockBuilder(ctx)); - } - size_t maxLength = builders.front().MaxLength(); - for (size_t i = 1; i < Width; ++i) { - maxLength = Min(maxLength, builders[i].MaxLength()); - } - auto& s = GetState(state, ctx); - for (size_t i = 0; i < maxLength; ++i) { - if (const auto result = Flow->FetchValues(ctx, s.ValuePointers.data()); EFetchResult::One != result) { + for (size_t i = 0; i < s.MaxLength_; ++i) { + if (const auto result = Flow_->FetchValues(ctx, s.ValuePointers_.data()); EFetchResult::One != result) { if (i == 0) { return result; } break; } - for (size_t j = 0; j < Width; ++j) { + for (size_t j = 0; j < Width_; ++j) { if (output[j] != nullptr) { - builders[j].Add(s.Values[j]); + s.Builders_[j]->Add(s.Values_[j]); } } } - for (size_t i = 0; i < Width; ++i) { + for (size_t i = 0; i < Width_; ++i) { if (auto* out = output[i]; out != nullptr) { - *out = builders[i].Build(); + *out = s.Builders_[i]->Build(); } } @@ -147,121 +185,187 @@ public: } private: - struct TState: public TComputationValue<TState> { - std::vector<NUdf::TUnboxedValue> Values; - std::vector<NUdf::TUnboxedValue*> ValuePointers; + struct TState : public TComputationValue<TState> { + std::vector<NUdf::TUnboxedValue> Values_; + std::vector<NUdf::TUnboxedValue*> ValuePointers_; + std::vector<std::unique_ptr<TBlockBuilderBase>> Builders_; + size_t MaxLength_; - TState(TMemoryUsageInfo* memInfo, size_t width) + TState(TMemoryUsageInfo* memInfo, TComputationContext& ctx, const TVector<NUdf::EDataSlot>& slots) : TComputationValue(memInfo) - , Values(width) - , ValuePointers(width) + , Values_(slots.size()) + , ValuePointers_(slots.size()) { - for (size_t i = 0; i < width; ++i) { - ValuePointers[i] = &Values[i]; + for (size_t i = 0; i < slots.size(); ++i) { + ValuePointers_[i] = &Values_[i]; + Builders_.push_back(MakeBlockBuilder(ctx, slots[i])); + } + + MaxLength_ = Builders_.front()->MaxLength(); + for (size_t i = 1; i < slots.size(); ++i) { + MaxLength_ = Min(MaxLength_, Builders_[i]->MaxLength()); } } }; private: void RegisterDependencies() const final { - FlowDependsOn(Flow); + FlowDependsOn(Flow_); } TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const { if (!state.HasValue()) { - state = ctx.HolderFactory.Create<TState>(Width); + state = ctx.HolderFactory.Create<TState>(ctx, Slots_); } return *static_cast<TState*>(state.AsBoxed().Get()); } private: - IComputationWideFlowNode* Flow; - const size_t Width; + IComputationWideFlowNode* Flow_; + const TVector<NUdf::EDataSlot> Slots_; + const size_t Width_; +}; + +class TBlockReaderBase { +public: + virtual ~TBlockReaderBase() = default; + + virtual NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) = 0; +}; + +template <typename T> +class TFixedSizeBlockReader : public TBlockReaderBase { +public: + NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) override { + return NUdf::TUnboxedValuePod(data.GetValues<T>(1)[index]); + } }; +class TBoolBlockReader : public TBlockReaderBase { +public: + NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) override { + return NUdf::TUnboxedValuePod(arrow::BitUtil::GetBit(data.GetValues<uint8_t>(1), index)); + } +}; + +std::unique_ptr<TBlockReaderBase> MakeBlockReader(NUdf::EDataSlot slot) { + switch (slot) { + case NUdf::EDataSlot::Bool: + return std::make_unique<TBoolBlockReader>(); + case NUdf::EDataSlot::Int8: + return std::make_unique<TFixedSizeBlockReader<i8>>(); + case NUdf::EDataSlot::Uint8: + return std::make_unique<TFixedSizeBlockReader<ui8>>(); + case NUdf::EDataSlot::Int16: + return std::make_unique<TFixedSizeBlockReader<i16>>(); + case NUdf::EDataSlot::Uint16: + return std::make_unique<TFixedSizeBlockReader<ui16>>(); + case NUdf::EDataSlot::Int32: + return std::make_unique<TFixedSizeBlockReader<i32>>(); + case NUdf::EDataSlot::Uint32: + return std::make_unique<TFixedSizeBlockReader<ui32>>(); + case NUdf::EDataSlot::Int64: + return std::make_unique<TFixedSizeBlockReader<i64>>(); + case NUdf::EDataSlot::Uint64: + return std::make_unique<TFixedSizeBlockReader<ui64>>(); + default: + MKQL_ENSURE(false, "Unsupported data slot"); + } +} + class TFromBlocksWrapper : public TMutableComputationNode<TFromBlocksWrapper> { public: - TFromBlocksWrapper(TComputationMutables& mutables, IComputationNode* flow) + TFromBlocksWrapper(TComputationMutables& mutables, IComputationNode* flow, NUdf::EDataSlot slot) : TMutableComputationNode(mutables) - , Flow(flow) - , StateIndex(mutables.CurValueIndex++) + , Flow_(flow) + , Slot_(slot) + , StateIndex_(mutables.CurValueIndex++) { } NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { auto& state = GetState(ctx); - if (state.Array == nullptr || state.Index == state.Array->length) { - auto result = Flow->GetValue(ctx); + if (state.Array_ == nullptr || state.Index_ == state.Array_->length) { + auto result = Flow_->GetValue(ctx); if (result.IsFinish()) { return NUdf::TUnboxedValue::MakeFinish(); } if (result.IsYield()) { return NUdf::TUnboxedValue::MakeYield(); } - state.Array = TArrowBlock::From(result).GetDatum().array(); - state.Index = 0; + state.Array_ = TArrowBlock::From(result).GetDatum().array(); + state.Index_ = 0; } const auto result = state.GetValue(); - ++state.Index; + ++state.Index_; return result; } private: - struct TState: public TComputationValue<TState> { + struct TState : public TComputationValue<TState> { using TComputationValue::TComputationValue; + TState(TMemoryUsageInfo* memInfo, NUdf::EDataSlot slot) + : TComputationValue(memInfo) + { + Reader_ = MakeBlockReader(slot); + } + NUdf::TUnboxedValuePod GetValue() const { - const auto nullCount = Array->GetNullCount(); + const auto nullCount = Array_->GetNullCount(); - return nullCount == Array->length || (nullCount > 0 && !HasValue()) + return nullCount == Array_->length || (nullCount > 0 && !HasValue()) ? NUdf::TUnboxedValuePod() : DoGetValue(); } private: NUdf::TUnboxedValuePod DoGetValue() const { - return NUdf::TUnboxedValuePod(Array->GetValues<ui64>(1)[Index]); + return Reader_->Get(*Array_, Index_); } bool HasValue() const { - return arrow::BitUtil::GetBit(Array->GetValues<uint8_t>(0), Index + Array->offset); + return arrow::BitUtil::GetBit(Array_->GetValues<uint8_t>(0), Index_ + Array_->offset); } + std::unique_ptr<TBlockReaderBase> Reader_; public: - std::shared_ptr<arrow::ArrayData> Array{nullptr}; - size_t Index{0}; + std::shared_ptr<arrow::ArrayData> Array_{nullptr}; + size_t Index_{0}; }; private: void RegisterDependencies() const final { - this->DependsOn(Flow); + this->DependsOn(Flow_); } TState& GetState(TComputationContext& ctx) const { - auto& result = ctx.MutableValues[StateIndex]; + auto& result = ctx.MutableValues[StateIndex_]; if (!result.HasValue()) { - result = ctx.HolderFactory.Create<TState>(); + result = ctx.HolderFactory.Create<TState>(Slot_); } return *static_cast<TState*>(result.AsBoxed().Get()); } private: - IComputationNode* const Flow; - const ui32 StateIndex; + IComputationNode* const Flow_; + const NUdf::EDataSlot Slot_; + const ui32 StateIndex_; }; class TWideFromBlocksWrapper : public TStatefulWideFlowComputationNode<TWideFromBlocksWrapper> { public: TWideFromBlocksWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, - size_t width) + TVector<NUdf::EDataSlot>&& slots) : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any) - , Flow(flow) - , Width(width) + , Flow_(flow) + , Slots_(std::move(slots)) + , Width_(Slots_.size()) { - Y_VERIFY_DEBUG(Width > 0); + Y_VERIFY_DEBUG(Width_ > 0); } EFetchResult DoCalculate(NUdf::TUnboxedValue& state, @@ -269,130 +373,173 @@ public: NUdf::TUnboxedValue*const* output) const { auto& s = GetState(state, ctx); - while (s.Index == s.Count) { - for (size_t i = 0; i < Width; ++i) { - s.Arrays[i] = nullptr; + while (s.Index_ == s.Count_) { + for (size_t i = 0; i < Width_; ++i) { + s.Arrays_[i] = nullptr; } - auto result = Flow->FetchValues(ctx, s.ValuePointers.data()); + auto result = Flow_->FetchValues(ctx, s.ValuePointers_.data()); if (result != EFetchResult::One) { return result; } - s.Index = 0; - for (size_t i = 0; i < Width; ++i) { - s.Arrays[i] = TArrowBlock::From(s.Values[i]).GetDatum().array(); + s.Index_ = 0; + for (size_t i = 0; i < Width_; ++i) { + s.Arrays_[i] = TArrowBlock::From(s.Values_[i]).GetDatum().array(); } - s.Count = s.Arrays[0]->length; + s.Count_ = s.Arrays_[0]->length; } - for (size_t i = 0; i < Width; ++i) { + for (size_t i = 0; i < Width_; ++i) { if (!output[i]) { continue; } - const auto& array = s.Arrays[i]; + const auto& array = s.Arrays_[i]; const auto nullCount = array->GetNullCount(); - if (nullCount == array->length || (nullCount > 0 && !arrow::BitUtil::GetBit(array->GetValues<uint8_t>(0), s.Index + array->offset))) { + if (nullCount == array->length || (nullCount > 0 && !arrow::BitUtil::GetBit(array->GetValues<uint8_t>(0), s.Index_ + array->offset))) { *(output[i]) = NUdf::TUnboxedValue(); } else { - *(output[i]) = NUdf::TUnboxedValuePod(array->GetValues<ui64>(1)[s.Index]); + *(output[i]) = s.Readers_[i]->Get(*array, s.Index_); } } - ++s.Index; + ++s.Index_; return EFetchResult::One; } private: struct TState : public TComputationValue<TState> { - std::vector<NUdf::TUnboxedValue> Values; - std::vector<NUdf::TUnboxedValue*> ValuePointers; - std::vector<std::shared_ptr<arrow::ArrayData>> Arrays; - size_t Count = 0; - size_t Index = 0; - - TState(TMemoryUsageInfo* memInfo, size_t width) + TVector<NUdf::TUnboxedValue> Values_; + TVector<NUdf::TUnboxedValue*> ValuePointers_; + TVector<std::shared_ptr<arrow::ArrayData>> Arrays_; + TVector<std::unique_ptr<TBlockReaderBase>> Readers_; + size_t Count_ = 0; + size_t Index_ = 0; + + TState(TMemoryUsageInfo* memInfo, const TVector<NUdf::EDataSlot>& slots) : TComputationValue(memInfo) - , Values(width) - , ValuePointers(width) - , Arrays(width) + , Values_(slots.size()) + , ValuePointers_(slots.size()) + , Arrays_(slots.size()) { - for (size_t i = 0; i < width; ++i) { - ValuePointers[i] = &Values[i]; + for (size_t i = 0; i < slots.size(); ++i) { + ValuePointers_[i] = &Values_[i]; + Readers_.push_back(MakeBlockReader(slots[i])); } } }; private: void RegisterDependencies() const final { - FlowDependsOn(Flow); + FlowDependsOn(Flow_); } TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const { if (!state.HasValue()) { - state = ctx.HolderFactory.Create<TState>(Width); + state = ctx.HolderFactory.Create<TState>(Slots_); } return *static_cast<TState*>(state.AsBoxed().Get()); } private: - IComputationWideFlowNode* Flow; - const size_t Width; + IComputationWideFlowNode* Flow_; + const TVector<NUdf::EDataSlot> Slots_; + const size_t Width_; }; arrow::Datum ExtractLiteral(TRuntimeNode n) { if (n.GetStaticType()->IsOptional()) { const auto* dataLiteral = AS_VALUE(TOptionalLiteral, n); if (!dataLiteral->HasItem()) { - return arrow::MakeNullScalar(arrow::uint64()); + bool isOptional; + auto unpacked = UnpackOptionalData(dataLiteral->GetType(), isOptional); + std::shared_ptr<arrow::DataType> type; + MKQL_ENSURE(ConvertArrowType(unpacked, isOptional, type), "Unsupported type of literal"); + return arrow::MakeNullScalar(type); } n = dataLiteral->GetItem(); } const auto* dataLiteral = AS_VALUE(TDataLiteral, n); - return arrow::Datum(static_cast<uint64_t>(dataLiteral->AsValue().Get<ui64>())); + switch (*dataLiteral->GetType()->GetDataSlot()) { + case NUdf::EDataSlot::Bool: + return arrow::Datum(static_cast<bool>(dataLiteral->AsValue().Get<bool>())); + case NUdf::EDataSlot::Int8: + return arrow::Datum(static_cast<int8_t>(dataLiteral->AsValue().Get<i8>())); + case NUdf::EDataSlot::Uint8: + return arrow::Datum(static_cast<uint8_t>(dataLiteral->AsValue().Get<ui8>())); + case NUdf::EDataSlot::Int16: + return arrow::Datum(static_cast<int16_t>(dataLiteral->AsValue().Get<i16>())); + case NUdf::EDataSlot::Uint16: + return arrow::Datum(static_cast<uint16_t>(dataLiteral->AsValue().Get<ui16>())); + case NUdf::EDataSlot::Int32: + return arrow::Datum(static_cast<int32_t>(dataLiteral->AsValue().Get<i32>())); + case NUdf::EDataSlot::Uint32: + return arrow::Datum(static_cast<uint32_t>(dataLiteral->AsValue().Get<ui32>())); + case NUdf::EDataSlot::Int64: + return arrow::Datum(static_cast<int64_t>(dataLiteral->AsValue().Get<i64>())); + case NUdf::EDataSlot::Uint64: + return arrow::Datum(static_cast<uint64_t>(dataLiteral->AsValue().Get<ui64>())); + default: + MKQL_ENSURE(false, "Unsupported data slot"); + } } } IComputationNode* WrapToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount()); - - return new TToBlocksWrapper(LocateNode(ctx.NodeLocator, callable, 0)); + const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); + bool isOptional; + const auto slot = *UnpackOptionalData(flowType->GetItemType(), isOptional)->GetDataSlot(); + return new TToBlocksWrapper(LocateNode(ctx.NodeLocator, callable, 0), slot); } IComputationNode* WrapWideToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount()); - const auto* flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); - const auto* tupleType = AS_TYPE(TTupleType, flowType->GetItemType()); + const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); + const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType()); + TVector<NUdf::EDataSlot> slots; + for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { + bool isOptional; + const auto slot = *UnpackOptionalData(tupleType->GetElementType(i), isOptional)->GetDataSlot(); + slots.push_back(slot); + } - auto* wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); + auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); - return new TWideToBlocksWrapper(ctx.Mutables, - wideFlow, - tupleType->GetElementsCount()); + return new TWideToBlocksWrapper(ctx.Mutables, wideFlow, std::move(slots)); } IComputationNode* WrapFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount()); - return new TFromBlocksWrapper(ctx.Mutables, LocateNode(ctx.NodeLocator, callable, 0)); + const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); + const auto blockType = AS_TYPE(TBlockType, flowType->GetItemType()); + bool isOptional; + const auto slot = *UnpackOptionalData(blockType->GetItemType(), isOptional)->GetDataSlot(); + return new TFromBlocksWrapper(ctx.Mutables, LocateNode(ctx.NodeLocator, callable, 0), slot); } IComputationNode* WrapWideFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) { - const auto* flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); - const auto* tupleType = AS_TYPE(TTupleType, flowType->GetItemType()); + const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); + const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType()); + TVector<NUdf::EDataSlot> slots; + for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { + const auto blockType = AS_TYPE(TBlockType, tupleType->GetElementType(i)); + bool isOptional; + const auto slot = *UnpackOptionalData(blockType->GetItemType(), isOptional)->GetDataSlot(); + slots.push_back(slot); + } - auto* wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); + auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); - return new TWideFromBlocksWrapper(ctx.Mutables, - wideFlow, - tupleType->GetElementsCount()); + return new TWideFromBlocksWrapper(ctx.Mutables, wideFlow, std::move(slots)); } IComputationNode* WrapAsScalar(TCallable& callable, const TComputationNodeFactoryContext& ctx) { |