about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: vvvv <vvvv@ydb.tech>, 2022-09-20 12:29:32 +0300
committer: vvvv <vvvv@ydb.tech>, 2022-09-20 12:29:32 +0300
commit: 1c0cc1a8107aa2123b564ac3832a8cdcc644d8d3 (patch)
tree: 969e204943b516534d14100b27cdf2ad0141cf06
parent: aac6159f005fb36cabe1d93ccc45d9fab6100c8f (diff)
download: ydb-1c0cc1a8107aa2123b564ac3832a8cdcc644d8d3.tar.gz
support of fixed size types: Bool, [U]Int{8,16,32,64}.
-rw-r--r-- ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp | 1
-rw-r--r-- ydb/library/yql/minikql/arrow/mkql_functions.cpp | 72
-rw-r--r-- ydb/library/yql/minikql/arrow/mkql_functions.h | 3
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp | 423
4 files changed, 354 insertions, 145 deletions
diff --git a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
index 64f027f69a..0525349a78 100644
--- a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
+++ b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
@@ -4305,6 +4305,7 @@ struct TBlockRules {
static constexpr std::initializer_list<TBlockFuncMap::value_type> FuncsInit = {
{"+", "add" },
+ {"Not", "invert" },
};
TBlockRules()
diff --git a/ydb/library/yql/minikql/arrow/mkql_functions.cpp b/ydb/library/yql/minikql/arrow/mkql_functions.cpp
index 2f4133405f..73b82e7cd9 100644
--- a/ydb/library/yql/minikql/arrow/mkql_functions.cpp
+++ b/ydb/library/yql/minikql/arrow/mkql_functions.cpp
@@ -9,10 +9,8 @@
namespace NKikimr::NMiniKQL {
-bool ConvertInputArrowType(TType* type, bool& isOptional, arrow::ValueDescr& descr) {
- auto blockType = AS_TYPE(TBlockType, type);
- descr.shape = blockType->GetShape() == TBlockType::EShape::Scalar ? arrow::ValueDescr::SCALAR : arrow::ValueDescr::ARRAY;
- auto unpacked = UnpackOptional(blockType->GetItemType(), isOptional);
+bool ConvertArrowType(TType* itemType, bool& isOptional, std::shared_ptr<arrow::DataType>& type) {
+ auto unpacked = UnpackOptional(itemType, isOptional);
if (!unpacked->IsData()) {
return false;
}
@@ -24,16 +22,43 @@ bool ConvertInputArrowType(TType* type, bool& isOptional, arrow::ValueDescr& des
switch (*slot) {
case NUdf::EDataSlot::Bool:
- descr.type = arrow::boolean();
+ type = arrow::boolean();
+ return true;
+ case NUdf::EDataSlot::Uint8:
+ type = arrow::uint8();
+ return true;
+ case NUdf::EDataSlot::Int8:
+ type = arrow::int8();
+ return true;
+ case NUdf::EDataSlot::Uint16:
+ type = arrow::uint16();
+ return true;
+ case NUdf::EDataSlot::Int16:
+ type = arrow::int16();
+ return true;
+ case NUdf::EDataSlot::Uint32:
+ type = arrow::uint32();
+ return true;
+ case NUdf::EDataSlot::Int32:
+ type = arrow::int32();
+ return true;
+ case NUdf::EDataSlot::Int64:
+ type = arrow::int64();
return true;
case NUdf::EDataSlot::Uint64:
- descr.type = arrow::uint64();
+ type = arrow::uint64();
return true;
default:
return false;
}
}
+bool ConvertInputArrowType(TType* blockType, bool& isOptional, arrow::ValueDescr& descr) {
+ auto asBlockType = AS_TYPE(TBlockType, blockType);
+ descr.shape = asBlockType->GetShape() == TBlockType::EShape::Scalar ? arrow::ValueDescr::SCALAR : arrow::ValueDescr::ARRAY;
+ return ConvertArrowType(asBlockType->GetItemType(), isOptional, descr.type);
+}
+
class TOutputTypeVisitor : public arrow::TypeVisitor
{
public:
@@ -46,6 +71,41 @@ public:
return arrow::Status::OK();
}
+ arrow::Status Visit(const arrow::Int8Type&) {
+ SetDataType(NUdf::EDataSlot::Int8);
+ return arrow::Status::OK();
+ }
+
+ arrow::Status Visit(const arrow::UInt8Type&) {
+ SetDataType(NUdf::EDataSlot::Uint8);
+ return arrow::Status::OK();
+ }
+
+ arrow::Status Visit(const arrow::Int16Type&) {
+ SetDataType(NUdf::EDataSlot::Int16);
+ return arrow::Status::OK();
+ }
+
+ arrow::Status Visit(const arrow::UInt16Type&) {
+ SetDataType(NUdf::EDataSlot::Uint16);
+ return arrow::Status::OK();
+ }
+
+ arrow::Status Visit(const arrow::Int32Type&) {
+ SetDataType(NUdf::EDataSlot::Int32);
+ return arrow::Status::OK();
+ }
+
+ arrow::Status Visit(const arrow::UInt32Type&) {
+ SetDataType(NUdf::EDataSlot::Uint32);
+ return arrow::Status::OK();
+ }
+
+ arrow::Status Visit(const arrow::Int64Type&) {
+ SetDataType(NUdf::EDataSlot::Int64);
+ return arrow::Status::OK();
+ }
+
arrow::Status Visit(const arrow::UInt64Type&) {
SetDataType(NUdf::EDataSlot::Uint64);
return arrow::Status::OK();
diff --git a/ydb/library/yql/minikql/arrow/mkql_functions.h b/ydb/library/yql/minikql/arrow/mkql_functions.h
index 30d84518e9..329456ecab 100644
--- a/ydb/library/yql/minikql/arrow/mkql_functions.h
+++ b/ydb/library/yql/minikql/arrow/mkql_functions.h
@@ -6,6 +6,7 @@
namespace NKikimr::NMiniKQL {
bool FindArrowFunction(TStringBuf name, const TArrayRef<TType*>& inputTypes, TType*& outputType, TTypeEnvironment& env);
-bool ConvertInputArrowType(TType* type, bool& isOptional, arrow::ValueDescr& descr);
+bool ConvertInputArrowType(TType* blockType, bool& isOptional, arrow::ValueDescr& descr);
+bool ConvertArrowType(TType* itemType, bool& isOptional, std::shared_ptr<arrow::DataType>& type);
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
index 175b3adbc7..524fe761a3 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
@@ -1,6 +1,7 @@
#include "mkql_blocks.h"
#include <ydb/library/yql/minikql/arrow/arrow_defs.h>
+#include <ydb/library/yql/minikql/arrow/mkql_functions.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/minikql/mkql_node_builder.h>
#include <ydb/library/yql/minikql/mkql_node_cast.h>
@@ -16,35 +17,22 @@ namespace NMiniKQL {
namespace {
-class TBlockBuilder {
+class TBlockBuilderBase {
public:
- explicit TBlockBuilder(TComputationContext& ctx)
- : ItemType(arrow::uint64())
- , MaxLength_(MaxBlockSizeInBytes / TypeSize(*ItemType))
- , Ctx(ctx)
- , Builder(&Ctx.ArrowMemoryPool)
- {
- ARROW_OK(Builder.Reserve(MaxLength_));
- }
+ TBlockBuilderBase(TComputationContext& ctx, const std::shared_ptr<arrow::DataType>& itemType)
+ : Ctx_(ctx)
+ , ItemType_(itemType)
+ , MaxLength_(MaxBlockSizeInBytes / TypeSize(*ItemType_))
+ {}
- void Add(NUdf::TUnboxedValue& value) {
- Y_VERIFY_DEBUG(Builder.length() < MaxLength_);
- if (value) {
- Builder.UnsafeAppend(value.Get<ui64>());
- } else {
- Builder.UnsafeAppendNull();
- }
- }
+ virtual ~TBlockBuilderBase() = default;
inline size_t MaxLength() const noexcept {
return MaxLength_;
}
- NUdf::TUnboxedValuePod Build() {
- std::shared_ptr<arrow::ArrayData> result;
- ARROW_OK(Builder.FinishInternal(&result));
- return Ctx.HolderFactory.CreateArrowBlock(std::move(result));
- }
+ virtual void Add(NUdf::TUnboxedValue& value) = 0;
+ virtual NUdf::TUnboxedValuePod Build() = 0;
private:
static int64_t TypeSize(arrow::DataType& itemType) {
@@ -52,94 +40,144 @@ private:
return arrow::BitUtil::BytesForBits(bits);
}
-private:
static constexpr size_t MaxBlockSizeInBytes = 1_MB;
-private:
- std::shared_ptr<arrow::DataType> ItemType;
+protected:
+ TComputationContext& Ctx_;
+ const std::shared_ptr<arrow::DataType> ItemType_;
const size_t MaxLength_;
- TComputationContext& Ctx;
- arrow::UInt64Builder Builder;
};
+template <typename T, typename TBuilder>
+class TFixedSizeBlockBuilder : public TBlockBuilderBase {
+public:
+ TFixedSizeBlockBuilder(TComputationContext& ctx, const std::shared_ptr<arrow::DataType>& itemType)
+ : TBlockBuilderBase(ctx, itemType)
+ , Builder_(&Ctx_.ArrowMemoryPool)
+ {
+ this->Reserve();
+ }
+
+ void Add(NUdf::TUnboxedValue& value) override {
+ Y_VERIFY_DEBUG(Builder_.length() < MaxLength_);
+ if (value) {
+ this->Builder_.UnsafeAppend(value.Get<T>());
+ } else {
+ this->Builder_.UnsafeAppendNull();
+ }
+ }
+
+ NUdf::TUnboxedValuePod Build() override {
+ std::shared_ptr<arrow::ArrayData> result;
+ ARROW_OK(this->Builder_.FinishInternal(&result));
+ return this->Ctx_.HolderFactory.CreateArrowBlock(std::move(result));
+ }
+
+private:
+ void Reserve() {
+ ARROW_OK(this->Builder_.Reserve(MaxLength_));
+ }
+
+private:
+ TBuilder Builder_;
+};
+
+std::unique_ptr<TBlockBuilderBase> MakeBlockBuilder(TComputationContext& ctx, NUdf::EDataSlot slot) {
+ switch (slot) {
+ case NUdf::EDataSlot::Bool:
+ return std::make_unique<TFixedSizeBlockBuilder<bool, arrow::BooleanBuilder>>(ctx, arrow::boolean());
+ case NUdf::EDataSlot::Int8:
+ return std::make_unique<TFixedSizeBlockBuilder<i8, arrow::Int8Builder>>(ctx, arrow::int8());
+ case NUdf::EDataSlot::Uint8:
+ return std::make_unique<TFixedSizeBlockBuilder<ui8, arrow::UInt8Builder>>(ctx, arrow::uint8());
+ case NUdf::EDataSlot::Int16:
+ return std::make_unique<TFixedSizeBlockBuilder<i16, arrow::Int16Builder>>(ctx, arrow::int16());
+ case NUdf::EDataSlot::Uint16:
+ return std::make_unique<TFixedSizeBlockBuilder<ui16, arrow::UInt16Builder>>(ctx, arrow::uint16());
+ case NUdf::EDataSlot::Int32:
+ return std::make_unique<TFixedSizeBlockBuilder<i32, arrow::Int32Builder>>(ctx, arrow::int32());
+ case NUdf::EDataSlot::Uint32:
+ return std::make_unique<TFixedSizeBlockBuilder<ui32, arrow::UInt32Builder>>(ctx, arrow::uint32());
+ case NUdf::EDataSlot::Int64:
+ return std::make_unique<TFixedSizeBlockBuilder<i64, arrow::Int64Builder>>(ctx, arrow::int64());
+ case NUdf::EDataSlot::Uint64:
+ return std::make_unique<TFixedSizeBlockBuilder<ui64, arrow::UInt64Builder>>(ctx, arrow::uint64());
+ default:
+ MKQL_ENSURE(false, "Unsupported data slot");
+ }
+}
+
class TToBlocksWrapper: public TStatelessFlowComputationNode<TToBlocksWrapper> {
public:
- explicit TToBlocksWrapper(IComputationNode* flow)
+ explicit TToBlocksWrapper(IComputationNode* flow, NUdf::EDataSlot slot)
: TStatelessFlowComputationNode(flow, EValueRepresentation::Boxed)
- , Flow(flow)
+ , Flow_(flow)
+ , Slot_(slot)
{
}
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- auto builder = TBlockBuilder(ctx);
+ auto builder = MakeBlockBuilder(ctx, Slot_);
- for (size_t i = 0; i < builder.MaxLength(); ++i) {
- auto result = Flow->GetValue(ctx);
+ for (size_t i = 0; i < builder->MaxLength(); ++i) {
+ auto result = Flow_->GetValue(ctx);
if (result.IsFinish() || result.IsYield()) {
if (i == 0) {
return result.Release();
}
break;
}
- builder.Add(result);
+ builder->Add(result);
}
- return builder.Build();
+ return builder->Build();
}
private:
void RegisterDependencies() const final {
- FlowDependsOn(Flow);
+ FlowDependsOn(Flow_);
}
private:
- IComputationNode* const Flow;
+ IComputationNode* const Flow_;
+ NUdf::EDataSlot Slot_;
};
class TWideToBlocksWrapper : public TStatefulWideFlowComputationNode<TWideToBlocksWrapper> {
public:
TWideToBlocksWrapper(TComputationMutables& mutables,
IComputationWideFlowNode* flow,
- size_t width)
+ TVector<NUdf::EDataSlot>&& slots)
: TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any)
- , Flow(flow)
- , Width(width)
+ , Flow_(flow)
+ , Slots_(std::move(slots))
+ , Width_(Slots_.size())
{
- Y_VERIFY_DEBUG(Width > 0);
+ Y_VERIFY_DEBUG(Width_ > 0);
}
EFetchResult DoCalculate(NUdf::TUnboxedValue& state,
TComputationContext& ctx,
NUdf::TUnboxedValue*const* output) const
{
- auto builders = std::vector<TBlockBuilder>();
- builders.reserve(Width);
- for (size_t i = 0; i < Width; ++i) {
- builders.push_back(TBlockBuilder(ctx));
- }
- size_t maxLength = builders.front().MaxLength();
- for (size_t i = 1; i < Width; ++i) {
- maxLength = Min(maxLength, builders[i].MaxLength());
- }
-
auto& s = GetState(state, ctx);
- for (size_t i = 0; i < maxLength; ++i) {
- if (const auto result = Flow->FetchValues(ctx, s.ValuePointers.data()); EFetchResult::One != result) {
+ for (size_t i = 0; i < s.MaxLength_; ++i) {
+ if (const auto result = Flow_->FetchValues(ctx, s.ValuePointers_.data()); EFetchResult::One != result) {
if (i == 0) {
return result;
}
break;
}
- for (size_t j = 0; j < Width; ++j) {
+ for (size_t j = 0; j < Width_; ++j) {
if (output[j] != nullptr) {
- builders[j].Add(s.Values[j]);
+ s.Builders_[j]->Add(s.Values_[j]);
}
}
}
- for (size_t i = 0; i < Width; ++i) {
+ for (size_t i = 0; i < Width_; ++i) {
if (auto* out = output[i]; out != nullptr) {
- *out = builders[i].Build();
+ *out = s.Builders_[i]->Build();
}
}
@@ -147,121 +185,187 @@ public:
}
private:
- struct TState: public TComputationValue<TState> {
- std::vector<NUdf::TUnboxedValue> Values;
- std::vector<NUdf::TUnboxedValue*> ValuePointers;
+ struct TState : public TComputationValue<TState> {
+ std::vector<NUdf::TUnboxedValue> Values_;
+ std::vector<NUdf::TUnboxedValue*> ValuePointers_;
+ std::vector<std::unique_ptr<TBlockBuilderBase>> Builders_;
+ size_t MaxLength_;
- TState(TMemoryUsageInfo* memInfo, size_t width)
+ TState(TMemoryUsageInfo* memInfo, TComputationContext& ctx, const TVector<NUdf::EDataSlot>& slots)
: TComputationValue(memInfo)
- , Values(width)
- , ValuePointers(width)
+ , Values_(slots.size())
+ , ValuePointers_(slots.size())
{
- for (size_t i = 0; i < width; ++i) {
- ValuePointers[i] = &Values[i];
+ for (size_t i = 0; i < slots.size(); ++i) {
+ ValuePointers_[i] = &Values_[i];
+ Builders_.push_back(MakeBlockBuilder(ctx, slots[i]));
+ }
+
+ MaxLength_ = Builders_.front()->MaxLength();
+ for (size_t i = 1; i < slots.size(); ++i) {
+ MaxLength_ = Min(MaxLength_, Builders_[i]->MaxLength());
}
}
};
private:
void RegisterDependencies() const final {
- FlowDependsOn(Flow);
+ FlowDependsOn(Flow_);
}
TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
if (!state.HasValue()) {
- state = ctx.HolderFactory.Create<TState>(Width);
+ state = ctx.HolderFactory.Create<TState>(ctx, Slots_);
}
return *static_cast<TState*>(state.AsBoxed().Get());
}
private:
- IComputationWideFlowNode* Flow;
- const size_t Width;
+ IComputationWideFlowNode* Flow_;
+ const TVector<NUdf::EDataSlot> Slots_;
+ const size_t Width_;
+};
+
+class TBlockReaderBase {
+public:
+ virtual ~TBlockReaderBase() = default;
+
+ virtual NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) = 0;
+};
+
+template <typename T>
+class TFixedSizeBlockReader : public TBlockReaderBase {
+public:
+ NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) override {
+ return NUdf::TUnboxedValuePod(data.GetValues<T>(1)[index]);
+ }
};
+class TBoolBlockReader : public TBlockReaderBase {
+public:
+ NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) override {
+ return NUdf::TUnboxedValuePod(arrow::BitUtil::GetBit(data.GetValues<uint8_t>(1), index));
+ }
+};
+
+std::unique_ptr<TBlockReaderBase> MakeBlockReader(NUdf::EDataSlot slot) {
+ switch (slot) {
+ case NUdf::EDataSlot::Bool:
+ return std::make_unique<TBoolBlockReader>();
+ case NUdf::EDataSlot::Int8:
+ return std::make_unique<TFixedSizeBlockReader<i8>>();
+ case NUdf::EDataSlot::Uint8:
+ return std::make_unique<TFixedSizeBlockReader<ui8>>();
+ case NUdf::EDataSlot::Int16:
+ return std::make_unique<TFixedSizeBlockReader<i16>>();
+ case NUdf::EDataSlot::Uint16:
+ return std::make_unique<TFixedSizeBlockReader<ui16>>();
+ case NUdf::EDataSlot::Int32:
+ return std::make_unique<TFixedSizeBlockReader<i32>>();
+ case NUdf::EDataSlot::Uint32:
+ return std::make_unique<TFixedSizeBlockReader<ui32>>();
+ case NUdf::EDataSlot::Int64:
+ return std::make_unique<TFixedSizeBlockReader<i64>>();
+ case NUdf::EDataSlot::Uint64:
+ return std::make_unique<TFixedSizeBlockReader<ui64>>();
+ default:
+ MKQL_ENSURE(false, "Unsupported data slot");
+ }
+}
+
class TFromBlocksWrapper : public TMutableComputationNode<TFromBlocksWrapper> {
public:
- TFromBlocksWrapper(TComputationMutables& mutables, IComputationNode* flow)
+ TFromBlocksWrapper(TComputationMutables& mutables, IComputationNode* flow, NUdf::EDataSlot slot)
: TMutableComputationNode(mutables)
- , Flow(flow)
- , StateIndex(mutables.CurValueIndex++)
+ , Flow_(flow)
+ , Slot_(slot)
+ , StateIndex_(mutables.CurValueIndex++)
{
}
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
auto& state = GetState(ctx);
- if (state.Array == nullptr || state.Index == state.Array->length) {
- auto result = Flow->GetValue(ctx);
+ if (state.Array_ == nullptr || state.Index_ == state.Array_->length) {
+ auto result = Flow_->GetValue(ctx);
if (result.IsFinish()) {
return NUdf::TUnboxedValue::MakeFinish();
}
if (result.IsYield()) {
return NUdf::TUnboxedValue::MakeYield();
}
- state.Array = TArrowBlock::From(result).GetDatum().array();
- state.Index = 0;
+ state.Array_ = TArrowBlock::From(result).GetDatum().array();
+ state.Index_ = 0;
}
const auto result = state.GetValue();
- ++state.Index;
+ ++state.Index_;
return result;
}
private:
- struct TState: public TComputationValue<TState> {
+ struct TState : public TComputationValue<TState> {
using TComputationValue::TComputationValue;
+ TState(TMemoryUsageInfo* memInfo, NUdf::EDataSlot slot)
+ : TComputationValue(memInfo)
+ {
+ Reader_ = MakeBlockReader(slot);
+ }
+
NUdf::TUnboxedValuePod GetValue() const {
- const auto nullCount = Array->GetNullCount();
+ const auto nullCount = Array_->GetNullCount();
- return nullCount == Array->length || (nullCount > 0 && !HasValue())
+ return nullCount == Array_->length || (nullCount > 0 && !HasValue())
? NUdf::TUnboxedValuePod()
: DoGetValue();
}
private:
NUdf::TUnboxedValuePod DoGetValue() const {
- return NUdf::TUnboxedValuePod(Array->GetValues<ui64>(1)[Index]);
+ return Reader_->Get(*Array_, Index_);
}
bool HasValue() const {
- return arrow::BitUtil::GetBit(Array->GetValues<uint8_t>(0), Index + Array->offset);
+ return arrow::BitUtil::GetBit(Array_->GetValues<uint8_t>(0), Index_ + Array_->offset);
}
+ std::unique_ptr<TBlockReaderBase> Reader_;
public:
- std::shared_ptr<arrow::ArrayData> Array{nullptr};
- size_t Index{0};
+ std::shared_ptr<arrow::ArrayData> Array_{nullptr};
+ size_t Index_{0};
};
private:
void RegisterDependencies() const final {
- this->DependsOn(Flow);
+ this->DependsOn(Flow_);
}
TState& GetState(TComputationContext& ctx) const {
- auto& result = ctx.MutableValues[StateIndex];
+ auto& result = ctx.MutableValues[StateIndex_];
if (!result.HasValue()) {
- result = ctx.HolderFactory.Create<TState>();
+ result = ctx.HolderFactory.Create<TState>(Slot_);
}
return *static_cast<TState*>(result.AsBoxed().Get());
}
private:
- IComputationNode* const Flow;
- const ui32 StateIndex;
+ IComputationNode* const Flow_;
+ const NUdf::EDataSlot Slot_;
+ const ui32 StateIndex_;
};
class TWideFromBlocksWrapper : public TStatefulWideFlowComputationNode<TWideFromBlocksWrapper> {
public:
TWideFromBlocksWrapper(TComputationMutables& mutables,
IComputationWideFlowNode* flow,
- size_t width)
+ TVector<NUdf::EDataSlot>&& slots)
: TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any)
- , Flow(flow)
- , Width(width)
+ , Flow_(flow)
+ , Slots_(std::move(slots))
+ , Width_(Slots_.size())
{
- Y_VERIFY_DEBUG(Width > 0);
+ Y_VERIFY_DEBUG(Width_ > 0);
}
EFetchResult DoCalculate(NUdf::TUnboxedValue& state,
@@ -269,130 +373,173 @@ public:
NUdf::TUnboxedValue*const* output) const
{
auto& s = GetState(state, ctx);
- while (s.Index == s.Count) {
- for (size_t i = 0; i < Width; ++i) {
- s.Arrays[i] = nullptr;
+ while (s.Index_ == s.Count_) {
+ for (size_t i = 0; i < Width_; ++i) {
+ s.Arrays_[i] = nullptr;
}
- auto result = Flow->FetchValues(ctx, s.ValuePointers.data());
+ auto result = Flow_->FetchValues(ctx, s.ValuePointers_.data());
if (result != EFetchResult::One) {
return result;
}
- s.Index = 0;
- for (size_t i = 0; i < Width; ++i) {
- s.Arrays[i] = TArrowBlock::From(s.Values[i]).GetDatum().array();
+ s.Index_ = 0;
+ for (size_t i = 0; i < Width_; ++i) {
+ s.Arrays_[i] = TArrowBlock::From(s.Values_[i]).GetDatum().array();
}
- s.Count = s.Arrays[0]->length;
+ s.Count_ = s.Arrays_[0]->length;
}
- for (size_t i = 0; i < Width; ++i) {
+ for (size_t i = 0; i < Width_; ++i) {
if (!output[i]) {
continue;
}
- const auto& array = s.Arrays[i];
+ const auto& array = s.Arrays_[i];
const auto nullCount = array->GetNullCount();
- if (nullCount == array->length || (nullCount > 0 && !arrow::BitUtil::GetBit(array->GetValues<uint8_t>(0), s.Index + array->offset))) {
+ if (nullCount == array->length || (nullCount > 0 && !arrow::BitUtil::GetBit(array->GetValues<uint8_t>(0), s.Index_ + array->offset))) {
*(output[i]) = NUdf::TUnboxedValue();
} else {
- *(output[i]) = NUdf::TUnboxedValuePod(array->GetValues<ui64>(1)[s.Index]);
+ *(output[i]) = s.Readers_[i]->Get(*array, s.Index_);
}
}
- ++s.Index;
+ ++s.Index_;
return EFetchResult::One;
}
private:
struct TState : public TComputationValue<TState> {
- std::vector<NUdf::TUnboxedValue> Values;
- std::vector<NUdf::TUnboxedValue*> ValuePointers;
- std::vector<std::shared_ptr<arrow::ArrayData>> Arrays;
- size_t Count = 0;
- size_t Index = 0;
-
- TState(TMemoryUsageInfo* memInfo, size_t width)
+ TVector<NUdf::TUnboxedValue> Values_;
+ TVector<NUdf::TUnboxedValue*> ValuePointers_;
+ TVector<std::shared_ptr<arrow::ArrayData>> Arrays_;
+ TVector<std::unique_ptr<TBlockReaderBase>> Readers_;
+ size_t Count_ = 0;
+ size_t Index_ = 0;
+
+ TState(TMemoryUsageInfo* memInfo, const TVector<NUdf::EDataSlot>& slots)
: TComputationValue(memInfo)
- , Values(width)
- , ValuePointers(width)
- , Arrays(width)
+ , Values_(slots.size())
+ , ValuePointers_(slots.size())
+ , Arrays_(slots.size())
{
- for (size_t i = 0; i < width; ++i) {
- ValuePointers[i] = &Values[i];
+ for (size_t i = 0; i < slots.size(); ++i) {
+ ValuePointers_[i] = &Values_[i];
+ Readers_.push_back(MakeBlockReader(slots[i]));
}
}
};
private:
void RegisterDependencies() const final {
- FlowDependsOn(Flow);
+ FlowDependsOn(Flow_);
}
TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
if (!state.HasValue()) {
- state = ctx.HolderFactory.Create<TState>(Width);
+ state = ctx.HolderFactory.Create<TState>(Slots_);
}
return *static_cast<TState*>(state.AsBoxed().Get());
}
private:
- IComputationWideFlowNode* Flow;
- const size_t Width;
+ IComputationWideFlowNode* Flow_;
+ const TVector<NUdf::EDataSlot> Slots_;
+ const size_t Width_;
};
arrow::Datum ExtractLiteral(TRuntimeNode n) {
if (n.GetStaticType()->IsOptional()) {
const auto* dataLiteral = AS_VALUE(TOptionalLiteral, n);
if (!dataLiteral->HasItem()) {
- return arrow::MakeNullScalar(arrow::uint64());
+ bool isOptional;
+ auto unpacked = UnpackOptionalData(dataLiteral->GetType(), isOptional);
+ std::shared_ptr<arrow::DataType> type;
+ MKQL_ENSURE(ConvertArrowType(unpacked, isOptional, type), "Unsupported type of literal");
+ return arrow::MakeNullScalar(type);
}
n = dataLiteral->GetItem();
}
const auto* dataLiteral = AS_VALUE(TDataLiteral, n);
- return arrow::Datum(static_cast<uint64_t>(dataLiteral->AsValue().Get<ui64>()));
+ switch (*dataLiteral->GetType()->GetDataSlot()) {
+ case NUdf::EDataSlot::Bool:
+ return arrow::Datum(static_cast<bool>(dataLiteral->AsValue().Get<bool>()));
+ case NUdf::EDataSlot::Int8:
+ return arrow::Datum(static_cast<int8_t>(dataLiteral->AsValue().Get<i8>()));
+ case NUdf::EDataSlot::Uint8:
+ return arrow::Datum(static_cast<uint8_t>(dataLiteral->AsValue().Get<ui8>()));
+ case NUdf::EDataSlot::Int16:
+ return arrow::Datum(static_cast<int16_t>(dataLiteral->AsValue().Get<i16>()));
+ case NUdf::EDataSlot::Uint16:
+ return arrow::Datum(static_cast<uint16_t>(dataLiteral->AsValue().Get<ui16>()));
+ case NUdf::EDataSlot::Int32:
+ return arrow::Datum(static_cast<int32_t>(dataLiteral->AsValue().Get<i32>()));
+ case NUdf::EDataSlot::Uint32:
+ return arrow::Datum(static_cast<uint32_t>(dataLiteral->AsValue().Get<ui32>()));
+ case NUdf::EDataSlot::Int64:
+ return arrow::Datum(static_cast<int64_t>(dataLiteral->AsValue().Get<i64>()));
+ case NUdf::EDataSlot::Uint64:
+ return arrow::Datum(static_cast<uint64_t>(dataLiteral->AsValue().Get<ui64>()));
+ default:
+ MKQL_ENSURE(false, "Unsupported data slot");
+ }
}
}
IComputationNode* WrapToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount());
-
- return new TToBlocksWrapper(LocateNode(ctx.NodeLocator, callable, 0));
+ const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
+ bool isOptional;
+ const auto slot = *UnpackOptionalData(flowType->GetItemType(), isOptional)->GetDataSlot();
+ return new TToBlocksWrapper(LocateNode(ctx.NodeLocator, callable, 0), slot);
}
IComputationNode* WrapWideToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount());
- const auto* flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
- const auto* tupleType = AS_TYPE(TTupleType, flowType->GetItemType());
+ const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
+ const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType());
+ TVector<NUdf::EDataSlot> slots;
+ for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
+ bool isOptional;
+ const auto slot = *UnpackOptionalData(tupleType->GetElementType(i), isOptional)->GetDataSlot();
+ slots.push_back(slot);
+ }
- auto* wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
+ auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
- return new TWideToBlocksWrapper(ctx.Mutables,
- wideFlow,
- tupleType->GetElementsCount());
+ return new TWideToBlocksWrapper(ctx.Mutables, wideFlow, std::move(slots));
}
IComputationNode* WrapFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount());
- return new TFromBlocksWrapper(ctx.Mutables, LocateNode(ctx.NodeLocator, callable, 0));
+ const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
+ const auto blockType = AS_TYPE(TBlockType, flowType->GetItemType());
+ bool isOptional;
+ const auto slot = *UnpackOptionalData(blockType->GetItemType(), isOptional)->GetDataSlot();
+ return new TFromBlocksWrapper(ctx.Mutables, LocateNode(ctx.NodeLocator, callable, 0), slot);
}
IComputationNode* WrapWideFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- const auto* flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
- const auto* tupleType = AS_TYPE(TTupleType, flowType->GetItemType());
+ const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
+ const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType());
+ TVector<NUdf::EDataSlot> slots;
+ for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
+ const auto blockType = AS_TYPE(TBlockType, tupleType->GetElementType(i));
+ bool isOptional;
+ const auto slot = *UnpackOptionalData(blockType->GetItemType(), isOptional)->GetDataSlot();
+ slots.push_back(slot);
+ }
- auto* wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
+ auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
- return new TWideFromBlocksWrapper(ctx.Mutables,
- wideFlow,
- tupleType->GetElementsCount());
+ return new TWideFromBlocksWrapper(ctx.Mutables, wideFlow, std::move(slots));
}
IComputationNode* WrapAsScalar(TCallable& callable, const TComputationNodeFactoryContext& ctx) {