aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <aneporada@ydb.tech>2023-01-11 11:53:01 +0300
committeraneporada <aneporada@ydb.tech>2023-01-11 11:53:01 +0300
commitdae1c49d39a044bdfdd67ecd3c7347103625516e (patch)
tree8eaccb311fe7e9529521dbdc25fd2e41b2f94f06
parentb5af12df89ca027b41c4bc5ebd895cc4983e6620 (diff)
downloadydb-dae1c49d39a044bdfdd67ecd3c7347103625516e.tar.gz
Introduce TBlockItem: lightweight not-owning replacement for TUnboxedValue for BlockReader/BlockBuilder API
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp62
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_builder.h3
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_item.h136
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp188
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_reader.h18
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp60
-rw-r--r--ydb/library/yql/public/udf/udf_value_inl.h4
7 files changed, 340 insertions, 131 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp
index e3174be55b..a94d01c8b0 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp
@@ -76,6 +76,13 @@ public:
CurrLen++;
}
+ void Add(TBlockItem value) final {
+ Y_VERIFY(CurrLen < MaxLen);
+ DoAdd(value);
+ CurrLen++;
+ }
+
+
void AddDefault() {
Y_VERIFY(CurrLen < MaxLen);
DoAddDefault();
@@ -118,6 +125,7 @@ public:
}
protected:
virtual void DoAdd(NUdf::TUnboxedValuePod value) = 0;
+ virtual void DoAdd(TBlockItem value) = 0;
virtual void DoAddDefault() = 0;
virtual void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) = 0;
virtual TBlockArrayTree::Ptr DoBuildTree(bool finish) = 0;
@@ -200,6 +208,15 @@ public:
void DoAdd(NUdf::TUnboxedValuePod value) final {
if constexpr (Nullable) {
if (!value) {
+ return DoAdd(TBlockItem{});
+ }
+ }
+ DoAdd(TBlockItem(value.Get<T>()));
+ }
+
+ void DoAdd(TBlockItem value) final {
+ if constexpr (Nullable) {
+ if (!value) {
NullBuilder->UnsafeAppend(0);
DataBuilder->UnsafeAppend(T{});
return;
@@ -207,7 +224,7 @@ public:
NullBuilder->UnsafeAppend(1);
}
- DataBuilder->UnsafeAppend(value.Get<T>());
+ DataBuilder->UnsafeAppend(value.As<T>());
}
void DoAddDefault() final {
@@ -285,13 +302,23 @@ public:
void DoAdd(NUdf::TUnboxedValuePod value) final {
if constexpr (Nullable) {
if (!value) {
+ return DoAdd(TBlockItem{});
+ }
+ }
+
+ DoAdd(TBlockItem(value.AsStringRef()));
+ }
+
+ void DoAdd(TBlockItem value) final {
+ if constexpr (Nullable) {
+ if (!value) {
NullBuilder->UnsafeAppend(0);
AppendCurrentOffset();
return;
}
}
- const TStringBuf str = value.AsStringRef();
+ const std::string_view str = value.AsStringRef();
size_t currentLen = DataBuilder->Length();
// empty string can always be appended
@@ -311,6 +338,7 @@ public:
}
}
+
void DoAddDefault() final {
if constexpr (Nullable) {
NullBuilder->UnsafeAppend(1);
@@ -489,6 +517,25 @@ public:
}
}
+ void DoAdd(TBlockItem value) final {
+ TTupleType* tupleType = AS_TYPE(TTupleType, Type);
+ if constexpr (Nullable) {
+ if (!value) {
+ NullBuilder->UnsafeAppend(0);
+ for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
+ Children[i]->AddDefault();
+ }
+ return;
+ }
+ NullBuilder->UnsafeAppend(1);
+ }
+
+ auto elements = value.AsTuple();
+ for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
+ Children[i]->Add(elements[i]);
+ }
+ }
+
void DoAddDefault() final {
TTupleType* tupleType = AS_TYPE(TTupleType, Type);
if constexpr (Nullable) {
@@ -577,6 +624,17 @@ public:
Inner->Add(value.GetOptionalValue());
}
+ void DoAdd(TBlockItem value) final {
+ if (!value) {
+ NullBuilder->UnsafeAppend(0);
+ Inner->AddDefault();
+ return;
+ }
+
+ NullBuilder->UnsafeAppend(1);
+ Inner->Add(value.GetOptionalValue());
+ }
+
void DoAddDefault() final {
NullBuilder->UnsafeAppend(1);
Inner->AddDefault();
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.h
index e1024a8c80..4769c3d59f 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.h
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.h
@@ -1,5 +1,7 @@
#pragma once
+#include "mkql_block_item.h"
+
#include <ydb/library/yql/minikql/mkql_node.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
@@ -27,6 +29,7 @@ public:
virtual ~IBlockBuilder() = default;
virtual size_t MaxLength() const = 0;
virtual void Add(NUdf::TUnboxedValuePod value) = 0;
+ virtual void Add(TBlockItem value) = 0;
virtual void AddMany(const arrow::ArrayData& array, size_t popCount, const ui8* sparseBitmap, size_t bitmapSize) = 0;
virtual NUdf::TUnboxedValuePod Build(TComputationContext& ctx, bool finish) = 0;
};
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_item.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_item.h
new file mode 100644
index 0000000000..84924977f0
--- /dev/null
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_item.h
@@ -0,0 +1,136 @@
+#pragma once
+
+#include <ydb/library/yql/public/udf/udf_data_type.h>
+#include <ydb/library/yql/public/udf/udf_string_ref.h>
+#include <ydb/library/yql/public/udf/udf_type_size_check.h>
+
+namespace NKikimr::NMiniKQL {
+
+class TBlockItem {
+ enum class EMarkers : ui8 {
+ Empty = 0,
+ Present = 1,
+ };
+
+public:
+ TBlockItem() noexcept = default;
+ ~TBlockItem() noexcept = default;
+
+ TBlockItem(const TBlockItem& value) noexcept = default;
+ TBlockItem(TBlockItem&& value) noexcept = default;
+
+ TBlockItem& operator=(const TBlockItem& value) noexcept = default;
+ TBlockItem& operator=(TBlockItem&& value) noexcept = default;
+
+ template <typename T, typename = std::enable_if_t<NYql::NUdf::TPrimitiveDataType<T>::Result>>
+ inline explicit TBlockItem(T value);
+
+ inline explicit TBlockItem(NYql::NUdf::TStringRef value) {
+ Raw.String.Value = value.Data();
+ Raw.String.Size = value.Size();
+ Raw.Simple.Meta = static_cast<ui8>(EMarkers::Present);
+ }
+
+ inline explicit TBlockItem(const TBlockItem* tupleItems) {
+ Raw.Tuple.Value = tupleItems;
+ Raw.Simple.Meta = static_cast<ui8>(EMarkers::Present);
+ }
+
+ template <typename T, typename = std::enable_if_t<NYql::NUdf::TPrimitiveDataType<T>::Result>>
+ inline T As() const;
+
+ inline const TBlockItem* AsTuple() const {
+ Y_VERIFY_DEBUG(Raw.GetMarkers() == EMarkers::Present);
+ return Raw.Tuple.Value;
+ }
+
+ inline NYql::NUdf::TStringRef AsStringRef() const {
+ Y_VERIFY_DEBUG(Raw.GetMarkers() == EMarkers::Present);
+ return NYql::NUdf::TStringRef(Raw.String.Value, Raw.String.Size);
+ }
+
+ inline TBlockItem MakeOptional() const
+ {
+ if (Raw.Simple.Meta)
+ return *this;
+
+ TBlockItem result(*this);
+ ++result.Raw.Simple.Count;
+ return result;
+ }
+
+ inline TBlockItem GetOptionalValue() const
+ {
+ if (Raw.Simple.Meta)
+ return *this;
+
+ Y_VERIFY_DEBUG(Raw.Simple.Count > 0U, "Can't get value from empty.");
+
+ TBlockItem result(*this);
+ --result.Raw.Simple.Count;
+ return result;
+ }
+
+ inline explicit operator bool() const { return bool(Raw); }
+private:
+ union TRaw {
+ ui64 Halfs[2] = {0, 0};
+ struct {
+ union {
+ #define FIELD(type) type type##_;
+ PRIMITIVE_VALUE_TYPES(FIELD);
+ #undef FIELD
+ ui64 Count;
+ };
+ union {
+ ui64 Pad;
+ struct {
+ ui8 Reserved[7];
+ ui8 Meta;
+ };
+ };
+ } Simple;
+
+ struct {
+ const char* Value;
+ ui32 Size;
+ } String;
+
+ struct {
+ // client should know tuple size
+ const TBlockItem* Value;
+ } Tuple;
+
+ EMarkers GetMarkers() const {
+ return static_cast<EMarkers>(Simple.Meta);
+ }
+
+ explicit operator bool() const { return Simple.Meta | Simple.Count; }
+ } Raw;
+};
+
+UDF_ASSERT_TYPE_SIZE(TBlockItem, 16);
+
+#define VALUE_GET(xType) \
+ template <> \
+ inline xType TBlockItem::As<xType>() const \
+ { \
+ Y_VERIFY_DEBUG(Raw.GetMarkers() == EMarkers::Present); \
+ return Raw.Simple.xType##_; \
+ }
+
+#define VALUE_CONSTR(xType) \
+ template <> \
+ inline TBlockItem::TBlockItem(xType value) \
+ { \
+ Raw.Simple.xType##_ = value; \
+ Raw.Simple.Meta = static_cast<ui8>(EMarkers::Present); \
+ }
+
+PRIMITIVE_VALUE_TYPES(VALUE_GET)
+PRIMITIVE_VALUE_TYPES(VALUE_CONSTR)
+
+#undef VALUE_GET
+#undef VALUE_CONSTR
+
+}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp
index 5c6e4585f0..914ec7e6ca 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.cpp
@@ -12,172 +12,163 @@ namespace NMiniKQL {
namespace {
-class TBlockReaderBase : public IBlockReader {
-public:
- using Ptr = std::unique_ptr<TBlockReaderBase>;
-
- void Reset(const arrow::Datum& datum) final {
- Index = 0;
- ArrayValues.clear();
- if (datum.is_scalar()) {
- ScalarValue = GetScalar(*datum.scalar());
- } else {
- Y_VERIFY(datum.is_arraylike());
- ScalarValue = {};
- for (auto& arr : datum.chunks()) {
- ArrayValues.push_back(arr->data());
- }
- }
- }
-
- TMaybe<NUdf::TUnboxedValuePod> GetNextValue() final {
- if (ScalarValue.Defined()) {
- return ScalarValue;
- }
-
- TMaybe<NUdf::TUnboxedValuePod> result;
- while (!ArrayValues.empty()) {
- if (Index < ArrayValues.front()->length) {
- result = Get(*ArrayValues.front(), Index++);
- break;
- }
- ArrayValues.pop_front();
- Index = 0;
- }
- return result;
- }
-
- virtual NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) = 0;
- virtual NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) = 0;
-
-private:
- std::deque<std::shared_ptr<arrow::ArrayData>> ArrayValues;
- TMaybe<NUdf::TUnboxedValuePod> ScalarValue;
- size_t Index = 0;
-};
+inline bool IsNull(const arrow::ArrayData& data, size_t index) {
+ return data.GetNullCount() > 0 && !arrow::BitUtil::GetBit(data.GetValues<uint8_t>(0, 0), index + data.offset);
+}
template <typename T>
-class TFixedSizeBlockReader : public TBlockReaderBase {
+class TFixedSizeBlockReader : public IBlockReader {
public:
- NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) final {
- if (data.GetNullCount() > 0 && !arrow::BitUtil::GetBit(data.GetValues<uint8_t>(0, 0), index + data.offset)) {
+ TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final {
+ if (IsNull(data, index)) {
return {};
}
- return NUdf::TUnboxedValuePod(data.GetValues<T>(1)[index]);
+ return TBlockItem(data.GetValues<T>(1)[index]);
}
- NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) final {
+ TBlockItem GetScalarItem(const arrow::Scalar& scalar) final {
if (!scalar.is_valid) {
return {};
}
- return NUdf::TUnboxedValuePod(*static_cast<const T*>(arrow::internal::checked_cast<const arrow::internal::PrimitiveScalarBase&>(scalar).data()));
+ return TBlockItem(*static_cast<const T*>(arrow::internal::checked_cast<const arrow::internal::PrimitiveScalarBase&>(scalar).data()));
+ }
+
+ NUdf::TUnboxedValuePod MakeValue(TBlockItem item, const THolderFactory& holderFactory) const final {
+ Y_UNUSED(holderFactory);
+ return item ? NUdf::TUnboxedValuePod(item.As<T>()) : NUdf::TUnboxedValuePod{};
}
};
-class TStringBlockReader : public TBlockReaderBase {
+template<typename TStringType>
+class TStringBlockReader : public IBlockReader {
public:
- NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) final {
+ using TOffset = typename TStringType::offset_type;
+
+ TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final {
Y_VERIFY_DEBUG(data.buffers.size() == 3);
- if (data.GetNullCount() > 0 && !arrow::BitUtil::GetBit(data.GetValues<uint8_t>(0, 0), index + data.offset)) {
+ if (IsNull(data, index)) {
return {};
}
- arrow::util::string_view result;
- if (data.type->id() == arrow::Type::BINARY) {
- arrow::BinaryArray arr(std::make_shared<arrow::ArrayData>(data));
- result = arr.GetView(index);
- } else {
- Y_VERIFY(data.type->id() == arrow::Type::STRING);
- arrow::StringArray arr(std::make_shared<arrow::ArrayData>(data));
- result = arr.GetView(index);
- }
+ const TOffset* offsets = data.GetValues<TOffset>(1);
+ const char* strData = data.GetValues<char>(2, 0);
- return MakeString(NUdf::TStringRef(result.data(), result.size()));
+ std::string_view str(strData + offsets[index], offsets[index + 1] - offsets[index]);
+ return TBlockItem(str);
}
- NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) final {
+ TBlockItem GetScalarItem(const arrow::Scalar& scalar) final {
if (!scalar.is_valid) {
return {};
}
auto buffer = arrow::internal::checked_cast<const arrow::BaseBinaryScalar&>(scalar).value;
- return MakeString(NUdf::TStringRef(reinterpret_cast<const char*>(buffer->data()), buffer->size()));
+ std::string_view str(reinterpret_cast<const char*>(buffer->data()), buffer->size());
+ return TBlockItem(str);
+ }
+
+ NUdf::TUnboxedValuePod MakeValue(TBlockItem item, const THolderFactory& holderFactory) const final {
+ Y_UNUSED(holderFactory);
+ if (!item) {
+ return {};
+ }
+ return MakeString(item.AsStringRef());
}
};
-class TTupleBlockReader : public TBlockReaderBase {
+class TTupleBlockReader : public IBlockReader {
public:
- TTupleBlockReader(TVector<std::unique_ptr<TBlockReaderBase>>&& children, const THolderFactory& holderFactory)
+ TTupleBlockReader(TVector<std::unique_ptr<IBlockReader>>&& children)
: Children(std::move(children))
- , HolderFactory(holderFactory)
+ , Items(Children.size())
{}
- NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) final {
- if (data.GetNullCount() > 0 && !arrow::BitUtil::GetBit(data.GetValues<uint8_t>(0, 0), index + data.offset)) {
+ TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final {
+ if (IsNull(data, index)) {
return {};
}
- NUdf::TUnboxedValue* items;
- auto result = Cache.NewArray(HolderFactory, Children.size(), items);
for (ui32 i = 0; i < Children.size(); ++i) {
- items[i] = Children[i]->Get(*data.child_data[i], index);
+ Items[i] = Children[i]->GetItem(*data.child_data[i], index);
}
- return result;
+ return TBlockItem(Items.data());
}
- NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) final {
+ TBlockItem GetScalarItem(const arrow::Scalar& scalar) final {
if (!scalar.is_valid) {
return {};
}
const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar);
- NUdf::TUnboxedValue* items;
- auto result = Cache.NewArray(HolderFactory, Children.size(), items);
for (ui32 i = 0; i < Children.size(); ++i) {
- items[i] = Children[i]->GetScalar(*structScalar.value[i]);
+ Items[i] = Children[i]->GetScalarItem(*structScalar.value[i]);
+ }
+
+ return TBlockItem(Items.data());
+ }
+
+ NUdf::TUnboxedValuePod MakeValue(TBlockItem item, const THolderFactory& holderFactory) const final {
+ if (!item) {
+ return {};
+ }
+
+ NUdf::TUnboxedValue* values;
+ auto result = Cache.NewArray(holderFactory, Children.size(), values);
+ const TBlockItem* childItems = item.AsTuple();
+ for (ui32 i = 0; i < Children.size(); ++i) {
+ values[i] = Children[i]->MakeValue(childItems[i], holderFactory);
}
return result;
}
private:
- TVector<std::unique_ptr<TBlockReaderBase>> Children;
- const THolderFactory& HolderFactory;
- TPlainContainerCache Cache;
+ const TVector<std::unique_ptr<IBlockReader>> Children;
+ TVector<TBlockItem> Items;
+ mutable TPlainContainerCache Cache;
};
-class TExternalOptionalBlockReader : public TBlockReaderBase {
+class TExternalOptionalBlockReader : public IBlockReader {
public:
- TExternalOptionalBlockReader(std::unique_ptr<TBlockReaderBase>&& inner)
+ TExternalOptionalBlockReader(std::unique_ptr<IBlockReader>&& inner)
: Inner(std::move(inner))
{}
- NUdf::TUnboxedValuePod Get(const arrow::ArrayData& data, size_t index) final {
- if (data.GetNullCount() > 0 && !arrow::BitUtil::GetBit(data.GetValues<uint8_t>(0, 0), index + data.offset)) {
+ TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final {
+ if (IsNull(data, index)) {
return {};
}
- return Inner->Get(*data.child_data[0], index).MakeOptional();
+ return Inner->GetItem(*data.child_data[0], index).MakeOptional();
}
- NUdf::TUnboxedValuePod GetScalar(const arrow::Scalar& scalar) final {
+ TBlockItem GetScalarItem(const arrow::Scalar& scalar) final {
if (!scalar.is_valid) {
return {};
}
const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(scalar);
- return Inner->GetScalar(*structScalar.value[0]).MakeOptional();
+ return Inner->GetScalarItem(*structScalar.value[0]).MakeOptional();
+ }
+
+ NUdf::TUnboxedValuePod MakeValue(TBlockItem item, const THolderFactory& holderFactory) const final {
+ if (!item) {
+ return {};
+ }
+ return Inner->MakeValue(item.GetOptionalValue(), holderFactory).MakeOptional();
}
private:
- std::unique_ptr<TBlockReaderBase> Inner;
+ const std::unique_ptr<IBlockReader> Inner;
};
-std::unique_ptr<TBlockReaderBase> MakeBlockReaderBase(TType* type, const THolderFactory& holderFactory) {
+} // namespace
+
+std::unique_ptr<IBlockReader> MakeBlockReader(TType* type) {
TType* unpacked = type;
if (type->IsOptional()) {
unpacked = AS_TYPE(TOptionalType, type)->GetItemType();
@@ -194,7 +185,7 @@ std::unique_ptr<TBlockReaderBase> MakeBlockReaderBase(TType* type, const THolder
currentType = AS_TYPE(TOptionalType, currentType)->GetItemType();
} while (currentType->IsOptional());
- std::unique_ptr<TBlockReaderBase> reader = MakeBlockReaderBase(previousType, holderFactory);
+ std::unique_ptr<IBlockReader> reader = MakeBlockReader(previousType);
for (ui32 i = 1; i < nestLevel; ++i) {
reader = std::make_unique<TExternalOptionalBlockReader>(std::move(reader));
}
@@ -206,12 +197,12 @@ std::unique_ptr<TBlockReaderBase> MakeBlockReaderBase(TType* type, const THolder
if (type->IsTuple()) {
auto tupleType = AS_TYPE(TTupleType, type);
- TVector<std::unique_ptr<TBlockReaderBase>> children;
+ TVector<std::unique_ptr<IBlockReader>> children;
for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
- children.emplace_back(MakeBlockReaderBase(tupleType->GetElementType(i), holderFactory));
+ children.emplace_back(MakeBlockReader(tupleType->GetElementType(i)));
}
- return std::make_unique<TTupleBlockReader>(std::move(children), holderFactory);
+ return std::make_unique<TTupleBlockReader>(std::move(children));
}
if (type->IsData()) {
@@ -243,8 +234,9 @@ std::unique_ptr<TBlockReaderBase> MakeBlockReaderBase(TType* type, const THolder
case NUdf::EDataSlot::Double:
return std::make_unique<TFixedSizeBlockReader<double>>();
case NUdf::EDataSlot::String:
+ return std::make_unique<TStringBlockReader<arrow::BinaryType>>();
case NUdf::EDataSlot::Utf8:
- return std::make_unique<TStringBlockReader>();
+ return std::make_unique<TStringBlockReader<arrow::StringType>>();
default:
MKQL_ENSURE(false, "Unsupported data slot");
}
@@ -253,13 +245,5 @@ std::unique_ptr<TBlockReaderBase> MakeBlockReaderBase(TType* type, const THolder
MKQL_ENSURE(false, "Unsupported type");
}
-
-} // namespace
-
-std::unique_ptr<IBlockReader> MakeBlockReader(TType* type, const THolderFactory& holderFactory) {
- return MakeBlockReaderBase(type, holderFactory);
-}
-
-
} // namespace NMiniKQL
} // namespace NKikimr
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.h
index f4eb297770..4e144d78a5 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.h
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_reader.h
@@ -1,22 +1,24 @@
#pragma once
+#include "mkql_block_item.h"
+
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/minikql/mkql_node.h>
#include <arrow/datum.h>
-namespace NKikimr {
-namespace NMiniKQL {
+namespace NKikimr::NMiniKQL {
-class IBlockReader {
+class IBlockReader : private TNonCopyable {
public:
virtual ~IBlockReader() = default;
- virtual void Reset(const arrow::Datum& datum) = 0;
- // for scalars will continuously return same value
- virtual TMaybe<NUdf::TUnboxedValuePod> GetNextValue() = 0;
+ // result will reference to Array/Scalar internals and will be valid until next call to GetItem/GetScalarItem
+ virtual TBlockItem GetItem(const arrow::ArrayData& data, size_t index) = 0;
+ virtual TBlockItem GetScalarItem(const arrow::Scalar& scalar) = 0;
+
+ virtual NUdf::TUnboxedValuePod MakeValue(TBlockItem item, const THolderFactory& holderFactory) const = 0;
};
-std::unique_ptr<IBlockReader> MakeBlockReader(TType* type, const THolderFactory& holderFactory);
+std::unique_ptr<IBlockReader> MakeBlockReader(TType* type);
}
-}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
index 488dd2af04..789cb67df2 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
@@ -169,7 +169,7 @@ public:
auto& state = GetState(ctx);
for (;;) {
- auto item = state.GetValue();
+ auto item = state.GetValue(ctx);
if (item) {
return *item;
}
@@ -190,23 +190,43 @@ private:
struct TState : public TComputationValue<TState> {
using TComputationValue::TComputationValue;
- TState(TMemoryUsageInfo* memInfo, TType* itemType, TComputationContext& ctx)
+ TState(TMemoryUsageInfo* memInfo, TType* itemType)
: TComputationValue(memInfo)
- , Reader_(MakeBlockReader(itemType, ctx.HolderFactory))
+ , Reader_(MakeBlockReader(itemType))
{
}
- TMaybe<NUdf::TUnboxedValuePod> GetValue() const {
- return Reader_->GetNextValue();
+ TMaybe<NUdf::TUnboxedValuePod> GetValue(TComputationContext& ctx) {
+ for (;;) {
+ if (Arrays_.empty()) {
+ return {};
+ }
+ if (Index_ < Arrays_.front()->length) {
+ break;
+ }
+ Index_ = 0;
+ Arrays_.pop_front();
+ }
+ return Reader_->MakeValue(Reader_->GetItem(*Arrays_.front(), Index_++), ctx.HolderFactory);
}
- void Reset(const arrow::Datum& datum) const {
+ void Reset(const arrow::Datum& datum) {
MKQL_ENSURE(datum.is_arraylike(), "Expecting array as FromBlocks argument");
- Reader_->Reset(datum);
+ MKQL_ENSURE(Arrays_.empty(), "Not all input is processed");
+ if (datum.is_array()) {
+ Arrays_.push_back(datum.array());
+ } else {
+ for (auto& chunk : datum.chunks()) {
+ Arrays_.push_back(chunk->data());
+ }
+ }
+ Index_ = 0;
}
private:
const std::unique_ptr<IBlockReader> Reader_;
+ TDeque<std::shared_ptr<arrow::ArrayData>> Arrays_;
+ size_t Index_ = 0;
};
private:
@@ -217,7 +237,7 @@ private:
TState& GetState(TComputationContext& ctx) const {
auto& result = ctx.MutableValues[StateIndex_];
if (!result.HasValue()) {
- result = ctx.HolderFactory.Create<TState>(ItemType_, ctx);
+ result = ctx.HolderFactory.Create<TState>(ItemType_);
}
return *static_cast<TState*>(result.AsBoxed().Get());
}
@@ -252,11 +272,6 @@ public:
}
s.Index_ = 0;
- for (size_t i = 0; i < Width_; ++i) {
- const auto& datum = TArrowBlock::From(s.Values_[i]).GetDatum();
- s.Readers_[i]->Reset(datum);
- }
-
s.Count_ = TArrowBlock::From(s.Values_[Width_]).GetDatum().scalar_as<arrow::UInt64Scalar>().value;
}
@@ -265,9 +280,16 @@ public:
continue;
}
- auto result = s.Readers_[i]->GetNextValue();
- Y_VERIFY(result);
- *(output[i]) = *result;
+ const auto& datum = TArrowBlock::From(s.Values_[i]).GetDatum();
+ TBlockItem item;
+ if (datum.is_scalar()) {
+ item = s.Readers_[i]->GetScalarItem(*datum.scalar());
+ } else {
+ MKQL_ENSURE(datum.is_array(), "Expecting array");
+ item = s.Readers_[i]->GetItem(*datum.array(), s.Index_);
+ }
+
+ *(output[i]) = s.Readers_[i]->MakeValue(item, ctx.HolderFactory);
}
++s.Index_;
@@ -282,7 +304,7 @@ private:
size_t Count_ = 0;
size_t Index_ = 0;
- TState(TMemoryUsageInfo* memInfo, const TVector<TType*>& types, TComputationContext& ctx)
+ TState(TMemoryUsageInfo* memInfo, const TVector<TType*>& types)
: TComputationValue(memInfo)
, Values_(types.size() + 1)
, ValuePointers_(types.size() + 1)
@@ -292,7 +314,7 @@ private:
}
for (size_t i = 0; i < types.size(); ++i) {
- Readers_.push_back(MakeBlockReader(types[i], ctx.HolderFactory));
+ Readers_.push_back(MakeBlockReader(types[i]));
}
}
};
@@ -304,7 +326,7 @@ private:
TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
if (!state.HasValue()) {
- state = ctx.HolderFactory.Create<TState>(Types_, ctx);
+ state = ctx.HolderFactory.Create<TState>(Types_);
}
return *static_cast<TState*>(state.AsBoxed().Get());
}
diff --git a/ydb/library/yql/public/udf/udf_value_inl.h b/ydb/library/yql/public/udf/udf_value_inl.h
index bbe323cbd9..2bcd3a98bc 100644
--- a/ydb/library/yql/public/udf/udf_value_inl.h
+++ b/ydb/library/yql/public/udf/udf_value_inl.h
@@ -675,6 +675,10 @@ PRIMITIVE_VALUE_TYPES(VALUE_GET)
PRIMITIVE_VALUE_TYPES(VALUE_GET_DEF)
PRIMITIVE_VALUE_TYPES(VALUE_CONSTR)
+#undef VALUE_GET
+#undef VALUE_GET_DEF
+#undef VALUE_CONSTR
+
template <>
inline bool TUnboxedValuePod::Get<bool>() const
{