aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@ydb.tech>2022-12-13 20:15:20 +0300
committervvvv <vvvv@ydb.tech>2022-12-13 20:15:20 +0300
commit1a43879c5f8c64404638e6a48714106f427bdc71 (patch)
treea2e78b49414813a18c7f24a4ce014abd79d3714f
parent9fbdf83be5486c7553b2c6cc11162b68a3d01834 (diff)
downloadydb-1a43879c5f8c64404638e6a48714106f427bdc71.tar.gz
initial support of float/double/tuple
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp643
-rw-r--r--ydb/library/yql/minikql/mkql_type_builder.cpp23
2 files changed, 435 insertions, 231 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
index a24f4a2a0b..96d82c583f 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
@@ -21,10 +21,10 @@ constexpr size_t MaxBlockSizeInBytes = 1_MB;
class TBlockBuilderBase {
public:
- TBlockBuilderBase(TComputationContext& ctx, const std::shared_ptr<arrow::DataType>& itemType)
+ TBlockBuilderBase(TComputationContext& ctx, const std::shared_ptr<arrow::DataType>& itemType, size_t maxLength)
: Ctx_(ctx)
, ItemType_(itemType)
- , MaxLength_(MaxBlockSizeInBytes / TypeSize(*ItemType_))
+ , MaxLength_(maxLength)
{}
virtual ~TBlockBuilderBase() = default;
@@ -33,15 +33,9 @@ public:
return MaxLength_;
}
- virtual void Add(NUdf::TUnboxedValue& value) = 0;
+ virtual void Add(const NUdf::TUnboxedValue& value) = 0;
virtual NUdf::TUnboxedValuePod Build(bool finish) = 0;
-private:
- static int64_t TypeSize(arrow::DataType& itemType) {
- const auto bits = static_cast<const arrow::FixedWidthType&>(itemType).bit_width();
- return arrow::BitUtil::BytesForBits(bits);
- }
-
protected:
TComputationContext& Ctx_;
const std::shared_ptr<arrow::DataType> ItemType_;
@@ -52,13 +46,13 @@ template <typename T, typename TBuilder>
class TFixedSizeBlockBuilder : public TBlockBuilderBase {
public:
TFixedSizeBlockBuilder(TComputationContext& ctx, const std::shared_ptr<arrow::DataType>& itemType)
- : TBlockBuilderBase(ctx, itemType)
+ : TBlockBuilderBase(ctx, itemType, MaxBlockSizeInBytes / TypeSize(*itemType))
, Builder_(std::make_unique<TBuilder>(&Ctx_.ArrowMemoryPool))
{
this->Reserve();
}
- void Add(NUdf::TUnboxedValue& value) override {
+ void Add(const NUdf::TUnboxedValue& value) override {
Y_VERIFY_DEBUG(Builder_->length() < MaxLength_);
if (value) {
this->Builder_->UnsafeAppend(value.Get<T>());
@@ -84,83 +78,192 @@ private:
ARROW_OK(this->Builder_->Reserve(MaxLength_));
}
+ static int64_t TypeSize(arrow::DataType& itemType) {
+ const auto bits = static_cast<const arrow::FixedWidthType&>(itemType).bit_width();
+ return arrow::BitUtil::BytesForBits(bits);
+ }
+
private:
std::unique_ptr<TBuilder> Builder_;
};
-std::unique_ptr<TBlockBuilderBase> MakeBlockBuilder(TComputationContext& ctx, NUdf::EDataSlot slot) {
- switch (slot) {
- case NUdf::EDataSlot::Int8:
- return std::make_unique<TFixedSizeBlockBuilder<i8, arrow::Int8Builder>>(ctx, arrow::int8());
- case NUdf::EDataSlot::Uint8:
- case NUdf::EDataSlot::Bool:
- return std::make_unique<TFixedSizeBlockBuilder<ui8, arrow::UInt8Builder>>(ctx, arrow::uint8());
- case NUdf::EDataSlot::Int16:
- return std::make_unique<TFixedSizeBlockBuilder<i16, arrow::Int16Builder>>(ctx, arrow::int16());
- case NUdf::EDataSlot::Uint16:
- case NUdf::EDataSlot::Date:
- return std::make_unique<TFixedSizeBlockBuilder<ui16, arrow::UInt16Builder>>(ctx, arrow::uint16());
- case NUdf::EDataSlot::Int32:
- return std::make_unique<TFixedSizeBlockBuilder<i32, arrow::Int32Builder>>(ctx, arrow::int32());
- case NUdf::EDataSlot::Uint32:
- case NUdf::EDataSlot::Datetime:
- return std::make_unique<TFixedSizeBlockBuilder<ui32, arrow::UInt32Builder>>(ctx, arrow::uint32());
- case NUdf::EDataSlot::Int64:
- case NUdf::EDataSlot::Interval:
- return std::make_unique<TFixedSizeBlockBuilder<i64, arrow::Int64Builder>>(ctx, arrow::int64());
- case NUdf::EDataSlot::Uint64:
- case NUdf::EDataSlot::Timestamp:
- return std::make_unique<TFixedSizeBlockBuilder<ui64, arrow::UInt64Builder>>(ctx, arrow::uint64());
- default:
- MKQL_ENSURE(false, "Unsupported data slot");
- }
-}
-
-class TToBlocksWrapper: public TStatelessFlowComputationNode<TToBlocksWrapper> {
+class TTupleBlockBuilder : public TBlockBuilderBase {
public:
- explicit TToBlocksWrapper(IComputationNode* flow, NUdf::EDataSlot slot)
- : TStatelessFlowComputationNode(flow, EValueRepresentation::Boxed)
- , Flow_(flow)
- , Slot_(slot)
+ TTupleBlockBuilder(TComputationContext & ctx, const std::shared_ptr<arrow::DataType>& itemType, size_t maxLength,
+ TTupleType* tupleType, TVector<std::unique_ptr<TBlockBuilderBase>>&& children)
+ : TBlockBuilderBase(ctx, itemType, maxLength)
+ , TupleType_(tupleType)
+ , Children_(std::move(children))
{
+ bool isOptional;
+ MKQL_ENSURE(ConvertArrowType(TupleType_, isOptional, ArrowType_), "Unsupported type");
+ Reserve();
}
- NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- auto builder = MakeBlockBuilder(ctx, Slot_);
+ void Add(const NUdf::TUnboxedValue& value) final {
+ if (!value) {
+ NullBitmapBuilder_->UnsafeAppend(false);
+ for (ui32 i = 0; i < TupleType_->GetElementsCount(); ++i) {
+ Children_[i]->Add({});
+ }
- for (size_t i = 0; i < builder->MaxLength(); ++i) {
- auto result = Flow_->GetValue(ctx);
- if (result.IsFinish() || result.IsYield()) {
- if (i == 0) {
- return result.Release();
- }
- break;
+ return;
+ }
+
+ NullBitmapBuilder_->UnsafeAppend(true);
+ auto elements = value.GetElements();
+ if (elements) {
+ for (ui32 i = 0; i < TupleType_->GetElementsCount(); ++i) {
+ Children_[i]->Add(elements[i]);
+ }
+ } else {
+ for (ui32 i = 0; i < TupleType_->GetElementsCount(); ++i) {
+ Children_[i]->Add(value.GetElement(i));
}
- builder->Add(result);
+ }
+ }
+
+ NUdf::TUnboxedValuePod Build(bool finish) final {
+ std::vector<arrow::Datum> childrenValues;
+ childrenValues.reserve(TupleType_->GetElementsCount());
+ for (ui32 i = 0; i < TupleType_->GetElementsCount(); ++i) {
+ childrenValues.emplace_back(TArrowBlock::From(Children_[i]->Build(finish)).GetDatum());
+ }
+
+ std::shared_ptr<arrow::Buffer> nullBitmap;
+ auto length = NullBitmapBuilder_->length();
+ auto nullCount = NullBitmapBuilder_->false_count();
+ ARROW_OK(NullBitmapBuilder_->Finish(&nullBitmap));
+ auto arrayData = arrow::ArrayData::Make(ArrowType_, length, { nullBitmap }, nullCount, 0);
+ for (ui32 i = 0; i < TupleType_->GetElementsCount(); ++i) {
+ arrayData->child_data.push_back(childrenValues[i].array());
+ }
+
+ arrow::Datum result(arrayData);
+
+ NullBitmapBuilder_ = nullptr;
+ if (!finish) {
+ Reserve();
}
- return builder->Build(true);
+ return this->Ctx_.HolderFactory.CreateArrowBlock(std::move(result));
}
private:
- void RegisterDependencies() const final {
- FlowDependsOn(Flow_);
+ void Reserve() {
+ NullBitmapBuilder_ = std::make_unique<arrow::TypedBufferBuilder<bool>>(&Ctx_.ArrowMemoryPool);
+ ARROW_OK(NullBitmapBuilder_->Reserve(MaxLength_));
}
private:
- IComputationNode* const Flow_;
- NUdf::EDataSlot Slot_;
+ TTupleType* TupleType_;
+ std::shared_ptr<arrow::DataType> ArrowType_;
+ TVector<std::unique_ptr<TBlockBuilderBase>> Children_;
+ std::unique_ptr<arrow::TypedBufferBuilder<bool>> NullBitmapBuilder_;
+};
+
+std::unique_ptr<TBlockBuilderBase> MakeBlockBuilder(TComputationContext& ctx, TType* type) {
+ if (type->IsOptional()) {
+ type = AS_TYPE(TOptionalType, type)->GetItemType();
+ }
+
+ if (type->IsTuple()) {
+ bool isOptional;
+ std::shared_ptr<arrow::DataType> arrowType;
+ MKQL_ENSURE(ConvertArrowType(type, isOptional, arrowType), "Unsupported type");
+
+ auto tupleType = AS_TYPE(TTupleType, type);
+ TVector<std::unique_ptr<TBlockBuilderBase>> children;
+ size_t maxLength = MaxBlockSizeInBytes;
+ for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
+ children.emplace_back(MakeBlockBuilder(ctx, tupleType->GetElementType(i)));
+ maxLength = Min(maxLength, children.back()->MaxLength());
+ }
+
+ return std::make_unique<TTupleBlockBuilder>(ctx, arrowType, maxLength, tupleType, std::move(children));
+ }
+
+ if (type->IsData()) {
+ auto slot = *AS_TYPE(TDataType, type)->GetDataSlot();
+ switch (slot) {
+ case NUdf::EDataSlot::Int8:
+ return std::make_unique<TFixedSizeBlockBuilder<i8, arrow::Int8Builder>>(ctx, arrow::int8());
+ case NUdf::EDataSlot::Uint8:
+ case NUdf::EDataSlot::Bool:
+ return std::make_unique<TFixedSizeBlockBuilder<ui8, arrow::UInt8Builder>>(ctx, arrow::uint8());
+ case NUdf::EDataSlot::Int16:
+ return std::make_unique<TFixedSizeBlockBuilder<i16, arrow::Int16Builder>>(ctx, arrow::int16());
+ case NUdf::EDataSlot::Uint16:
+ case NUdf::EDataSlot::Date:
+ return std::make_unique<TFixedSizeBlockBuilder<ui16, arrow::UInt16Builder>>(ctx, arrow::uint16());
+ case NUdf::EDataSlot::Int32:
+ return std::make_unique<TFixedSizeBlockBuilder<i32, arrow::Int32Builder>>(ctx, arrow::int32());
+ case NUdf::EDataSlot::Uint32:
+ case NUdf::EDataSlot::Datetime:
+ return std::make_unique<TFixedSizeBlockBuilder<ui32, arrow::UInt32Builder>>(ctx, arrow::uint32());
+ case NUdf::EDataSlot::Int64:
+ case NUdf::EDataSlot::Interval:
+ return std::make_unique<TFixedSizeBlockBuilder<i64, arrow::Int64Builder>>(ctx, arrow::int64());
+ case NUdf::EDataSlot::Uint64:
+ case NUdf::EDataSlot::Timestamp:
+ return std::make_unique<TFixedSizeBlockBuilder<ui64, arrow::UInt64Builder>>(ctx, arrow::uint64());
+ case NUdf::EDataSlot::Float:
+ return std::make_unique<TFixedSizeBlockBuilder<float, arrow::FloatBuilder>>(ctx, arrow::float32());
+ case NUdf::EDataSlot::Double:
+ return std::make_unique<TFixedSizeBlockBuilder<double, arrow::DoubleBuilder>>(ctx, arrow::float64());
+ default:
+ MKQL_ENSURE(false, "Unsupported data slot");
+ }
+ }
+
+ MKQL_ENSURE(false, "Unsupported type");
+}
+
+class TToBlocksWrapper : public TStatelessFlowComputationNode<TToBlocksWrapper> {
+public:
+ explicit TToBlocksWrapper(IComputationNode* flow, TType* itemType)
+ : TStatelessFlowComputationNode(flow, EValueRepresentation::Boxed)
+ , Flow_(flow)
+ , ItemType_(itemType)
+ {
+ }
+
+ NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
+ auto builder = MakeBlockBuilder(ctx, ItemType_);
+
+ for (size_t i = 0; i < builder->MaxLength(); ++i) {
+ auto result = Flow_->GetValue(ctx);
+ if (result.IsFinish() || result.IsYield()) {
+ if (i == 0) {
+ return result.Release();
+ }
+ break;
+ }
+ builder->Add(result);
+ }
+
+ return builder->Build(true);
+ }
+
+private:
+ void RegisterDependencies() const final {
+ FlowDependsOn(Flow_);
+ }
+
+private:
+ IComputationNode* const Flow_;
+ TType* ItemType_;
};
class TWideToBlocksWrapper : public TStatefulWideFlowComputationNode<TWideToBlocksWrapper> {
public:
TWideToBlocksWrapper(TComputationMutables& mutables,
IComputationWideFlowNode* flow,
- TVector<NUdf::EDataSlot>&& slots)
+ TVector<TType*>&& types)
: TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any)
, Flow_(flow)
- , Slots_(std::move(slots))
- , Width_(Slots_.size())
+ , Types_(std::move(types))
+ , Width_(Types_.size())
{
}
@@ -215,18 +318,18 @@ private:
size_t Rows_ = 0;
bool IsFinished_ = false;
- TState(TMemoryUsageInfo* memInfo, TComputationContext& ctx, const TVector<NUdf::EDataSlot>& slots)
+ TState(TMemoryUsageInfo* memInfo, TComputationContext& ctx, const TVector<TType*>& types)
: TComputationValue(memInfo)
- , Values_(slots.size())
- , ValuePointers_(slots.size())
+ , Values_(types.size())
+ , ValuePointers_(types.size())
{
- for (size_t i = 0; i < slots.size(); ++i) {
+ for (size_t i = 0; i < types.size(); ++i) {
ValuePointers_[i] = &Values_[i];
- Builders_.push_back(MakeBlockBuilder(ctx, slots[i]));
+ Builders_.push_back(MakeBlockBuilder(ctx, types[i]));
}
MaxLength_ = MaxBlockSizeInBytes;
- for (size_t i = 0; i < slots.size(); ++i) {
+ for (size_t i = 0; i < types.size(); ++i) {
MaxLength_ = Min(MaxLength_, Builders_[i]->MaxLength());
}
}
@@ -239,14 +342,14 @@ private:
TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
if (!state.HasValue()) {
- state = ctx.HolderFactory.Create<TState>(ctx, Slots_);
+ state = ctx.HolderFactory.Create<TState>(ctx, Types_);
}
return *static_cast<TState*>(state.AsBoxed().Get());
}
private:
IComputationWideFlowNode* Flow_;
- const TVector<NUdf::EDataSlot> Slots_;
+ const TVector<TType*> Types_;
const size_t Width_;
};
@@ -263,144 +366,207 @@ template <typename T>
class TFixedSizeBlockReader : public TBlockReaderBase {
public:
NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) final {
+ if (data.GetNullCount() > 0 && !arrow::BitUtil::GetBit(data.GetValues<uint8_t>(0, 0), index + data.offset)) {
+ return {};
+ }
+
return NUdf::TUnboxedValuePod(data.GetValues<T>(1)[index]);
}
NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) final {
+ if (!scalar.is_valid) {
+ return {};
+ }
+
return NUdf::TUnboxedValuePod(*static_cast<const T*>(arrow::internal::checked_cast<const arrow::internal::PrimitiveScalarBase&>(scalar).data()));
}
};
-class TBoolBlockReader : public TBlockReaderBase {
+class TTupleBlockReader : public TBlockReaderBase {
public:
+ TTupleBlockReader(TVector<std::unique_ptr<TBlockReaderBase>>&& children, const THolderFactory& holderFactory)
+ : Children_(std::move(children))
+ , HolderFactory_(holderFactory)
+ {}
+
NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) final {
- return NUdf::TUnboxedValuePod(arrow::BitUtil::GetBit(data.GetValues<uint8_t>(1, 0), index + data.offset));
- }
+ if (data.GetNullCount() > 0 && !arrow::BitUtil::GetBit(data.GetValues<uint8_t>(0, 0), index + data.offset)) {
+ return {};
+ }
- NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) final {
- return NUdf::TUnboxedValuePod(arrow::internal::checked_cast<const arrow::BooleanScalar&>(scalar).value);
- }
-};
+ NUdf::TUnboxedValue* items;
+ auto result = Cache_.NewArray(HolderFactory_, Children_.size(), items);
+ for (ui32 i = 0; i < Children_.size(); ++i) {
+ items[i] = Children_[i]->Get(*data.child_data[i], index);
+ }
-std::unique_ptr<TBlockReaderBase> MakeBlockReader(NUdf::EDataSlot slot) {
- switch (slot) {
- case NUdf::EDataSlot::Int8:
- return std::make_unique<TFixedSizeBlockReader<i8>>();
- case NUdf::EDataSlot::Bool:
- case NUdf::EDataSlot::Uint8:
- return std::make_unique<TFixedSizeBlockReader<ui8>>();
- case NUdf::EDataSlot::Int16:
- return std::make_unique<TFixedSizeBlockReader<i16>>();
- case NUdf::EDataSlot::Uint16:
- case NUdf::EDataSlot::Date:
- return std::make_unique<TFixedSizeBlockReader<ui16>>();
- case NUdf::EDataSlot::Int32:
- return std::make_unique<TFixedSizeBlockReader<i32>>();
- case NUdf::EDataSlot::Uint32:
- case NUdf::EDataSlot::Datetime:
- return std::make_unique<TFixedSizeBlockReader<ui32>>();
- case NUdf::EDataSlot::Int64:
- case NUdf::EDataSlot::Interval:
- return std::make_unique<TFixedSizeBlockReader<i64>>();
- case NUdf::EDataSlot::Uint64:
- case NUdf::EDataSlot::Timestamp:
- return std::make_unique<TFixedSizeBlockReader<ui64>>();
- default:
- MKQL_ENSURE(false, "Unsupported data slot");
+ return result;
}
-}
-class TFromBlocksWrapper : public TMutableComputationNode<TFromBlocksWrapper> {
-public:
- TFromBlocksWrapper(TComputationMutables& mutables, IComputationNode* flow, NUdf::EDataSlot slot)
- : TMutableComputationNode(mutables)
- , Flow_(flow)
- , Slot_(slot)
- , StateIndex_(mutables.CurValueIndex++)
- {
- }
+ NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) final {
+ if (!scalar.is_valid) {
+ return {};
+ }
- NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- auto& state = GetState(ctx);
+ const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar);
- if (state.Array_ == nullptr || state.Index_ == state.Array_->length) {
- auto result = Flow_->GetValue(ctx);
- if (result.IsFinish()) {
- return NUdf::TUnboxedValue::MakeFinish();
- }
- if (result.IsYield()) {
- return NUdf::TUnboxedValue::MakeYield();
- }
- state.Array_ = TArrowBlock::From(result).GetDatum().array();
- state.Index_ = 0;
+ NUdf::TUnboxedValue* items;
+ auto result = Cache_.NewArray(HolderFactory_, Children_.size(), items);
+ for (ui32 i = 0; i < Children_.size(); ++i) {
+ items[i] = Children_[i]->GetScalar(*structScalar.value[i]);
}
- const auto result = state.GetValue();
- ++state.Index_;
return result;
}
private:
- struct TState : public TComputationValue<TState> {
- using TComputationValue::TComputationValue;
-
- TState(TMemoryUsageInfo* memInfo, NUdf::EDataSlot slot)
- : TComputationValue(memInfo)
- {
- Reader_ = MakeBlockReader(slot);
- }
-
- NUdf::TUnboxedValuePod GetValue() const {
- const auto nullCount = Array_->GetNullCount();
+ TVector<std::unique_ptr<TBlockReaderBase>> Children_;
+ const THolderFactory& HolderFactory_;
+ TPlainContainerCache Cache_;
+};
- return nullCount == Array_->length || (nullCount > 0 && !HasValue())
- ? NUdf::TUnboxedValuePod()
- : DoGetValue();
- }
+std::unique_ptr<TBlockReaderBase> MakeBlockReader(TType* type, const THolderFactory& holderFactory) {
+ if (type->IsOptional()) {
+ type = AS_TYPE(TOptionalType, type)->GetItemType();
+ }
- private:
- NUdf::TUnboxedValuePod DoGetValue() const {
- return Reader_->Get(*Array_, Index_);
+ if (type->IsTuple()) {
+ auto tupleType = AS_TYPE(TTupleType, type);
+ TVector<std::unique_ptr<TBlockReaderBase>> children;
+ for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
+ children.emplace_back(MakeBlockReader(tupleType->GetElementType(i), holderFactory));
}
- bool HasValue() const {
- return arrow::BitUtil::GetBit(Array_->GetValues<uint8_t>(0, 0), Index_ + Array_->offset);
+ return std::make_unique<TTupleBlockReader>(std::move(children), holderFactory);
+ }
+
+ if (type->IsData()) {
+ auto slot = *AS_TYPE(TDataType, type)->GetDataSlot();
+ switch (slot) {
+ case NUdf::EDataSlot::Int8:
+ return std::make_unique<TFixedSizeBlockReader<i8>>();
+ case NUdf::EDataSlot::Bool:
+ case NUdf::EDataSlot::Uint8:
+ return std::make_unique<TFixedSizeBlockReader<ui8>>();
+ case NUdf::EDataSlot::Int16:
+ return std::make_unique<TFixedSizeBlockReader<i16>>();
+ case NUdf::EDataSlot::Uint16:
+ case NUdf::EDataSlot::Date:
+ return std::make_unique<TFixedSizeBlockReader<ui16>>();
+ case NUdf::EDataSlot::Int32:
+ return std::make_unique<TFixedSizeBlockReader<i32>>();
+ case NUdf::EDataSlot::Uint32:
+ case NUdf::EDataSlot::Datetime:
+ return std::make_unique<TFixedSizeBlockReader<ui32>>();
+ case NUdf::EDataSlot::Int64:
+ case NUdf::EDataSlot::Interval:
+ return std::make_unique<TFixedSizeBlockReader<i64>>();
+ case NUdf::EDataSlot::Uint64:
+ case NUdf::EDataSlot::Timestamp:
+ return std::make_unique<TFixedSizeBlockReader<ui64>>();
+ case NUdf::EDataSlot::Float:
+ return std::make_unique<TFixedSizeBlockReader<float>>();
+ case NUdf::EDataSlot::Double:
+ return std::make_unique<TFixedSizeBlockReader<double>>();
+ default:
+ MKQL_ENSURE(false, "Unsupported data slot");
}
-
- std::unique_ptr<TBlockReaderBase> Reader_;
- public:
- std::shared_ptr<arrow::ArrayData> Array_{nullptr};
- size_t Index_{0};
- };
-
-private:
- void RegisterDependencies() const final {
- this->DependsOn(Flow_);
}
- TState& GetState(TComputationContext& ctx) const {
- auto& result = ctx.MutableValues[StateIndex_];
- if (!result.HasValue()) {
- result = ctx.HolderFactory.Create<TState>(Slot_);
- }
- return *static_cast<TState*>(result.AsBoxed().Get());
- }
+ MKQL_ENSURE(false, "Unsupported type");
+}
-private:
- IComputationNode* const Flow_;
- const NUdf::EDataSlot Slot_;
- const ui32 StateIndex_;
+class TFromBlocksWrapper : public TMutableComputationNode<TFromBlocksWrapper> {
+public:
+ TFromBlocksWrapper(TComputationMutables& mutables, IComputationNode* flow, TType* itemType)
+ : TMutableComputationNode(mutables)
+ , Flow_(flow)
+ , ItemType_(itemType)
+ , StateIndex_(mutables.CurValueIndex++)
+ {
+ }
+
+ NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
+ auto& state = GetState(ctx);
+
+ if (state.Array_ == nullptr || state.Index_ == state.Array_->length) {
+ auto result = Flow_->GetValue(ctx);
+ if (result.IsFinish()) {
+ return NUdf::TUnboxedValue::MakeFinish();
+ }
+ if (result.IsYield()) {
+ return NUdf::TUnboxedValue::MakeYield();
+ }
+ state.Array_ = TArrowBlock::From(result).GetDatum().array();
+ state.Index_ = 0;
+ }
+
+ const auto result = state.GetValue();
+ ++state.Index_;
+ return result;
+ }
+
+private:
+ struct TState : public TComputationValue<TState> {
+ using TComputationValue::TComputationValue;
+
+ TState(TMemoryUsageInfo* memInfo, TType* itemType, TComputationContext& ctx)
+ : TComputationValue(memInfo)
+ {
+ Reader_ = MakeBlockReader(itemType, ctx.HolderFactory);
+ }
+
+ NUdf::TUnboxedValuePod GetValue() const {
+ const auto nullCount = Array_->GetNullCount();
+
+ return nullCount == Array_->length || (nullCount > 0 && !HasValue())
+ ? NUdf::TUnboxedValuePod()
+ : DoGetValue();
+ }
+
+ private:
+ NUdf::TUnboxedValuePod DoGetValue() const {
+ return Reader_->Get(*Array_, Index_);
+ }
+
+ bool HasValue() const {
+ return arrow::BitUtil::GetBit(Array_->GetValues<uint8_t>(0, 0), Index_ + Array_->offset);
+ }
+
+ std::unique_ptr<TBlockReaderBase> Reader_;
+ public:
+ std::shared_ptr<arrow::ArrayData> Array_{ nullptr };
+ size_t Index_{ 0 };
+ };
+
+private:
+ void RegisterDependencies() const final {
+ this->DependsOn(Flow_);
+ }
+
+ TState& GetState(TComputationContext& ctx) const {
+ auto& result = ctx.MutableValues[StateIndex_];
+ if (!result.HasValue()) {
+ result = ctx.HolderFactory.Create<TState>(ItemType_, ctx);
+ }
+ return *static_cast<TState*>(result.AsBoxed().Get());
+ }
+
+private:
+ IComputationNode* const Flow_;
+ TType* ItemType_;
+ const ui32 StateIndex_;
};
class TWideFromBlocksWrapper : public TStatefulWideFlowComputationNode<TWideFromBlocksWrapper> {
public:
TWideFromBlocksWrapper(TComputationMutables& mutables,
IComputationWideFlowNode* flow,
- TVector<NUdf::EDataSlot>&& slots)
+ TVector<TType*>&& types)
: TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Any)
, Flow_(flow)
- , Slots_(std::move(slots))
- , Width_(Slots_.size())
+ , Types_(std::move(types))
+ , Width_(Types_.size())
{
}
@@ -470,19 +636,19 @@ private:
size_t Count_ = 0;
size_t Index_ = 0;
- TState(TMemoryUsageInfo* memInfo, const TVector<NUdf::EDataSlot>& slots)
+ TState(TMemoryUsageInfo* memInfo, const TVector<TType*>& types, TComputationContext& ctx)
: TComputationValue(memInfo)
- , Values_(slots.size() + 1)
- , ValuePointers_(slots.size() + 1)
- , Arrays_(slots.size())
- , Scalars_(slots.size())
+ , Values_(types.size() + 1)
+ , ValuePointers_(types.size() + 1)
+ , Arrays_(types.size())
+ , Scalars_(types.size())
{
- for (size_t i = 0; i < slots.size() + 1; ++i) {
+ for (size_t i = 0; i < types.size() + 1; ++i) {
ValuePointers_[i] = &Values_[i];
}
- for (size_t i = 0; i < slots.size(); ++i) {
- Readers_.push_back(MakeBlockReader(slots[i]));
+ for (size_t i = 0; i < types.size(); ++i) {
+ Readers_.push_back(MakeBlockReader(types[i], ctx.HolderFactory));
}
}
};
@@ -494,14 +660,14 @@ private:
TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
if (!state.HasValue()) {
- state = ctx.HolderFactory.Create<TState>(Slots_);
+ state = ctx.HolderFactory.Create<TState>(Types_, ctx);
}
return *static_cast<TState*>(state.AsBoxed().Get());
}
private:
IComputationWideFlowNode* Flow_;
- const TVector<NUdf::EDataSlot> Slots_;
+ const TVector<TType*> Types_;
const size_t Width_;
};
@@ -510,94 +676,113 @@ public:
TAsScalarWrapper(TComputationMutables& mutables, IComputationNode* arg, TType* type)
: TMutableComputationNode(mutables)
, Arg_(arg)
+ , Type_(type)
{
bool isOptional;
- auto unpacked = UnpackOptionalData(type, isOptional);
- MKQL_ENSURE(ConvertArrowType(unpacked, isOptional, Type_), "Unsupported type of scalar");
- Slot_ = *unpacked->GetDataSlot();
+ std::shared_ptr<arrow::DataType> arrowType;
+ MKQL_ENSURE(ConvertArrowType(Type_, isOptional, arrowType), "Unsupported type of scalar");
}
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
auto value = Arg_->GetValue(ctx);
- arrow::Datum result;
+ arrow::Datum result = ConvertScalar(Type_, value);
+ return ctx.HolderFactory.CreateArrowBlock(std::move(result));
+ }
+
+private:
+ void RegisterDependencies() const final {
+ DependsOn(Arg_);
+ }
+
+ arrow::Datum ConvertScalar(TType* type, const NUdf::TUnboxedValuePod& value) const {
if (!value) {
- result = arrow::MakeNullScalar(Type_);
- } else {
- switch (Slot_) {
+ bool isOptional;
+ std::shared_ptr<arrow::DataType> arrowType;
+ MKQL_ENSURE(ConvertArrowType(type, isOptional, arrowType), "Unsupported type of scalar");
+ return arrow::MakeNullScalar(arrowType);
+ }
+
+ if (type->IsOptional()) {
+ type = AS_TYPE(TOptionalType, type)->GetItemType();
+ }
+
+ if (type->IsTuple()) {
+ auto tupleType = AS_TYPE(TTupleType, type);
+ bool isOptional;
+ std::shared_ptr<arrow::DataType> arrowType;
+ MKQL_ENSURE(ConvertArrowType(type, isOptional, arrowType), "Unsupported type of scalar");
+
+ std::vector<std::shared_ptr<arrow::Scalar>> arrowValue;
+ for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
+ arrowValue.emplace_back(ConvertScalar(tupleType->GetElementType(i), value.GetElement(i)).scalar());
+ }
+
+ return arrow::Datum(std::make_shared<arrow::StructScalar>(arrowValue, arrowType));
+ }
+
+ if (type->IsData()) {
+ auto slot = *AS_TYPE(TDataType, type)->GetDataSlot();
+ switch (slot) {
case NUdf::EDataSlot::Int8:
- result = arrow::Datum(static_cast<int8_t>(value.Get<i8>()));
- break;
+ return arrow::Datum(static_cast<int8_t>(value.Get<i8>()));
case NUdf::EDataSlot::Bool:
case NUdf::EDataSlot::Uint8:
- result = arrow::Datum(static_cast<uint8_t>(value.Get<ui8>()));
- break;
+ return arrow::Datum(static_cast<uint8_t>(value.Get<ui8>()));
case NUdf::EDataSlot::Int16:
- result = arrow::Datum(static_cast<int16_t>(value.Get<i16>()));
- break;
+ return arrow::Datum(static_cast<int16_t>(value.Get<i16>()));
case NUdf::EDataSlot::Uint16:
case NUdf::EDataSlot::Date:
- result = arrow::Datum(static_cast<uint16_t>(value.Get<ui16>()));
- break;
+ return arrow::Datum(static_cast<uint16_t>(value.Get<ui16>()));
case NUdf::EDataSlot::Int32:
- result = arrow::Datum(static_cast<int32_t>(value.Get<i32>()));
- break;
+ return arrow::Datum(static_cast<int32_t>(value.Get<i32>()));
case NUdf::EDataSlot::Uint32:
case NUdf::EDataSlot::Datetime:
- result = arrow::Datum(static_cast<uint32_t>(value.Get<ui32>()));
- break;
+ return arrow::Datum(static_cast<uint32_t>(value.Get<ui32>()));
case NUdf::EDataSlot::Int64:
case NUdf::EDataSlot::Interval:
- result = arrow::Datum(static_cast<int64_t>(value.Get<i64>()));
- break;
+ return arrow::Datum(static_cast<int64_t>(value.Get<i64>()));
case NUdf::EDataSlot::Uint64:
case NUdf::EDataSlot::Timestamp:
- result = arrow::Datum(static_cast<uint64_t>(value.Get<ui64>()));
- break;
+ return arrow::Datum(static_cast<uint64_t>(value.Get<ui64>()));
+ case NUdf::EDataSlot::Float:
+ return arrow::Datum(static_cast<float>(value.Get<float>()));
+ case NUdf::EDataSlot::Double:
+ return arrow::Datum(static_cast<double>(value.Get<double>()));
default:
MKQL_ENSURE(false, "Unsupported data slot");
}
}
- return ctx.HolderFactory.CreateArrowBlock(std::move(result));
- }
-
-private:
- void RegisterDependencies() const final {
- DependsOn(Arg_);
+ MKQL_ENSURE(false, "Unsupported type");
}
private:
IComputationNode* const Arg_;
- std::shared_ptr<arrow::DataType> Type_;
- NUdf::EDataSlot Slot_;
+ TType* Type_;
};
}
-IComputationNode* WrapToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount());
- const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
- bool isOptional;
- const auto slot = *UnpackOptionalData(flowType->GetItemType(), isOptional)->GetDataSlot();
- return new TToBlocksWrapper(LocateNode(ctx.NodeLocator, callable, 0), slot);
-}
+IComputationNode* WrapToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
+ MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount());
+ const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
+ return new TToBlocksWrapper(LocateNode(ctx.NodeLocator, callable, 0), flowType->GetItemType());
+}
IComputationNode* WrapWideToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount());
const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType());
- TVector<NUdf::EDataSlot> slots;
+ TVector<TType*> items;
for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
- bool isOptional;
- const auto slot = *UnpackOptionalData(tupleType->GetElementType(i), isOptional)->GetDataSlot();
- slots.push_back(slot);
+ items.push_back(tupleType->GetElementType(i));
}
auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
- return new TWideToBlocksWrapper(ctx.Mutables, wideFlow, std::move(slots));
+ return new TWideToBlocksWrapper(ctx.Mutables, wideFlow, std::move(items));
}
IComputationNode* WrapFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
@@ -605,9 +790,7 @@ IComputationNode* WrapFromBlocks(TCallable& callable, const TComputationNodeFact
const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
const auto blockType = AS_TYPE(TBlockType, flowType->GetItemType());
- bool isOptional;
- const auto slot = *UnpackOptionalData(blockType->GetItemType(), isOptional)->GetDataSlot();
- return new TFromBlocksWrapper(ctx.Mutables, LocateNode(ctx.NodeLocator, callable, 0), slot);
+ return new TFromBlocksWrapper(ctx.Mutables, LocateNode(ctx.NodeLocator, callable, 0), blockType->GetItemType());
}
IComputationNode* WrapWideFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
@@ -616,18 +799,16 @@ IComputationNode* WrapWideFromBlocks(TCallable& callable, const TComputationNode
const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType());
MKQL_ENSURE(tupleType->GetElementsCount() > 0, "Expected at least one column");
- TVector<NUdf::EDataSlot> slots;
+ TVector<TType*> items;
for (ui32 i = 0; i < tupleType->GetElementsCount() - 1; ++i) {
const auto blockType = AS_TYPE(TBlockType, tupleType->GetElementType(i));
- bool isOptional;
- const auto slot = *UnpackOptionalData(blockType->GetItemType(), isOptional)->GetDataSlot();
- slots.push_back(slot);
+ items.push_back(blockType->GetItemType());
}
auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
- return new TWideFromBlocksWrapper(ctx.Mutables, wideFlow, std::move(slots));
+ return new TWideFromBlocksWrapper(ctx.Mutables, wideFlow, std::move(items));
}
IComputationNode* WrapAsScalar(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
diff --git a/ydb/library/yql/minikql/mkql_type_builder.cpp b/ydb/library/yql/minikql/mkql_type_builder.cpp
index 379b59148e..b04bc43ec3 100644
--- a/ydb/library/yql/minikql/mkql_type_builder.cpp
+++ b/ydb/library/yql/minikql/mkql_type_builder.cpp
@@ -1344,6 +1344,12 @@ bool ConvertArrowType(NUdf::EDataSlot slot, std::shared_ptr<arrow::DataType>& ty
case NUdf::EDataSlot::Timestamp:
type = arrow::uint64();
return true;
+ case NUdf::EDataSlot::Float:
+ type = arrow::float32();
+ return true;
+ case NUdf::EDataSlot::Double:
+ type = arrow::float64();
+ return true;
default:
return false;
}
@@ -1351,6 +1357,23 @@ bool ConvertArrowType(NUdf::EDataSlot slot, std::shared_ptr<arrow::DataType>& ty
bool ConvertArrowType(TType* itemType, bool& isOptional, std::shared_ptr<arrow::DataType>& type) {
auto unpacked = UnpackOptional(itemType, isOptional);
+ if (unpacked->IsTuple()) {
+ auto tupleType = AS_TYPE(TTupleType, unpacked);
+ std::vector<std::shared_ptr<arrow::Field>> fields;
+ for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
+ std::shared_ptr<arrow::DataType> childType;
+ bool isChildOptional;
+ if (!ConvertArrowType(tupleType->GetElementType(i), isChildOptional, childType)) {
+ return false;
+ }
+
+ fields.emplace_back(std::make_shared<arrow::Field>("field" + ToString(i), childType, isChildOptional));
+ }
+
+ type = std::make_shared<arrow::StructType>(fields);
+ return true;
+ }
+
if (!unpacked->IsData()) {
return false;
}