aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <aneporada@ydb.tech>2023-07-20 17:59:54 +0300
committeraneporada <aneporada@ydb.tech>2023-07-20 17:59:54 +0300
commit27820316f1e6f037bc7dcdd28015a4df8a0b65d5 (patch)
treeab9e31f2acd528d56573207bd553e449be0fac76
parent5b31f78da1eea0701abbcd8fdf35928c79e5d04e (diff)
downloadydb-27820316f1e6f037bc7dcdd28015a4df8a0b65d5.tar.gz
Support arrow blocks in transport packer
-rw-r--r--ydb/library/yql/dq/common/dq_serialized_batch.cpp14
-rw-r--r--ydb/library/yql/dq/common/dq_serialized_batch.h2
-rw-r--r--ydb/library/yql/dq/runtime/dq_output_channel.cpp23
-rw-r--r--ydb/library/yql/dq/runtime/dq_transport.cpp10
-rw-r--r--ydb/library/yql/dq/runtime/dq_transport.h11
-rw-r--r--ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/library/yql/minikql/computation/mkql_block_transport.cpp549
-rw-r--r--ydb/library/yql/minikql/computation/mkql_block_transport.h38
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp213
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node_pack.h27
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.h12
-rw-r--r--ydb/library/yql/minikql/computation/ya.make.inc2
15 files changed, 867 insertions, 42 deletions
diff --git a/ydb/library/yql/dq/common/dq_serialized_batch.cpp b/ydb/library/yql/dq/common/dq_serialized_batch.cpp
index 71cb32ff935..d96a7fee6d9 100644
--- a/ydb/library/yql/dq/common/dq_serialized_batch.cpp
+++ b/ydb/library/yql/dq/common/dq_serialized_batch.cpp
@@ -26,12 +26,18 @@ T ReadNumber(TStringBuf& src) {
}
-void TDqSerializedBatch::SetPayload(const NKikimr::NMiniKQL::TPagedBuffer::TPtr& buffer) {
+void TDqSerializedBatch::SetPayload(TRope&& payload) {
+ Proto.ClearRaw();
if (IsOOBTransport((NDqProto::EDataTransportVersion)Proto.GetTransportVersion())) {
- Payload = NKikimr::NMiniKQL::TPagedBuffer::AsRope(std::move(buffer));
+ Payload = std::move(payload);
} else {
- Proto.MutableRaw()->reserve(buffer->Size());
- buffer->CopyTo(*Proto.MutableRaw());
+ Payload.clear();
+ Proto.MutableRaw()->reserve(payload.size());
+ while (!payload.IsEmpty()) {
+ auto it = payload.Begin();
+ Proto.MutableRaw()->append(it.ContiguousData(), it.ContiguousSize());
+ payload.Erase(it, it + it.ContiguousSize());
+ }
}
}
diff --git a/ydb/library/yql/dq/common/dq_serialized_batch.h b/ydb/library/yql/dq/common/dq_serialized_batch.h
index 17379060eec..7591f2b6685 100644
--- a/ydb/library/yql/dq/common/dq_serialized_batch.h
+++ b/ydb/library/yql/dq/common/dq_serialized_batch.h
@@ -38,7 +38,7 @@ struct TDqSerializedBatch {
return oob;
}
- void SetPayload(const NKikimr::NMiniKQL::TPagedBuffer::TPtr& buffer);
+ void SetPayload(TRope&& payload);
TRope PullPayload() {
TRope result;
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.cpp b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
index f925b261155..f426885ce85 100644
--- a/ydb/library/yql/dq/runtime/dq_output_channel.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
@@ -127,8 +127,8 @@ public:
if (packerSize >= MaxChunkBytes) {
Data.emplace_back();
Data.back().Buffer = Packer.Finish();
- BasicStats.Bytes += Data.back().Buffer->Size();
- PackedDataSize += Data.back().Buffer->Size();
+ BasicStats.Bytes += Data.back().Buffer.size();
+ PackedDataSize += Data.back().Buffer.size();
PackedRowCount += ChunkRowCount;
Data.back().RowCount = ChunkRowCount;
ChunkRowCount = 0;
@@ -137,13 +137,13 @@ public:
while (Storage && PackedDataSize && PackedDataSize + packerSize > MaxStoredBytes) {
auto& head = Data.front();
- size_t bufSize = head.Buffer->Size();
+ size_t bufSize = head.Buffer.size();
YQL_ENSURE(PackedDataSize >= bufSize);
TDqSerializedBatch data;
data.Proto.SetTransportVersion(TransportVersion);
data.Proto.SetRows(head.RowCount);
- data.SetPayload(head.Buffer);
+ data.SetPayload(std::move(head.Buffer));
Storage->Put(NextStoredId++, SaveForSpilling(std::move(data)));
PackedDataSize -= bufSize;
@@ -203,15 +203,14 @@ public:
SpilledRowCount -= data.RowCount();
} else if (!Data.empty()) {
auto& packed = Data.front();
- data.Proto.SetRows(packed.RowCount);
- data.SetPayload(packed.Buffer);
PackedRowCount -= packed.RowCount;
- PackedDataSize -= packed.Buffer->Size();
+ PackedDataSize -= packed.Buffer.size();
+ data.Proto.SetRows(packed.RowCount);
+ data.SetPayload(std::move(packed.Buffer));
Data.pop_front();
} else {
data.Proto.SetRows(ChunkRowCount);
- auto buffer = Packer.Finish();
- data.SetPayload(buffer);
+ data.SetPayload(Packer.Finish());
ChunkRowCount = 0;
}
@@ -265,8 +264,8 @@ public:
if (ChunkRowCount) {
Data.emplace_back();
Data.back().Buffer = Packer.Finish();
- BasicStats.Bytes += Data.back().Buffer->Size();
- PackedDataSize += Data.back().Buffer->Size();
+ BasicStats.Bytes += Data.back().Buffer.size();
+ PackedDataSize += Data.back().Buffer.size();
PackedRowCount += ChunkRowCount;
Data.back().RowCount = ChunkRowCount;
ChunkRowCount = 0;
@@ -350,7 +349,7 @@ private:
TLogFunc LogFunc;
struct TSerializedBatch {
- TPagedBuffer::TPtr Buffer;
+ TRope Buffer;
ui64 RowCount = 0;
};
std::deque<TSerializedBatch> Data;
diff --git a/ydb/library/yql/dq/runtime/dq_transport.cpp b/ydb/library/yql/dq/runtime/dq_transport.cpp
index f2fcf48dc65..f22f0210aff 100644
--- a/ydb/library/yql/dq/runtime/dq_transport.cpp
+++ b/ydb/library/yql/dq/runtime/dq_transport.cpp
@@ -19,7 +19,7 @@ using namespace NYql;
namespace {
TDqSerializedBatch SerializeValue(NDqProto::EDataTransportVersion version, const TType* type, const NUdf::TUnboxedValuePod& value) {
- TPagedBuffer::TPtr packResult;
+ TRope packResult;
switch (version) {
case NDqProto::DATA_TRANSPORT_VERSION_UNSPECIFIED:
version = NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0;
@@ -43,12 +43,12 @@ TDqSerializedBatch SerializeValue(NDqProto::EDataTransportVersion version, const
TDqSerializedBatch result;
result.Proto.SetTransportVersion(version);
result.Proto.SetRows(1);
- result.SetPayload(packResult);
+ result.SetPayload(std::move(packResult));
return result;
}
template<bool Fast>
-TPagedBuffer::TPtr DoSerializeBuffer(const TType* type, const TUnboxedValueBatch& buffer) {
+TRope DoSerializeBuffer(const TType* type, const TUnboxedValueBatch& buffer) {
using TPacker = TValuePackerTransport<Fast>;
TPacker packer(/* stable */ false, type);
@@ -65,7 +65,7 @@ TPagedBuffer::TPtr DoSerializeBuffer(const TType* type, const TUnboxedValueBatch
}
TDqSerializedBatch SerializeBuffer(NDqProto::EDataTransportVersion version, const TType* type, const TUnboxedValueBatch& buffer) {
- TPagedBuffer::TPtr packResult;
+ TRope packResult;
switch (version) {
case NDqProto::DATA_TRANSPORT_VERSION_UNSPECIFIED:
version = NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0;
@@ -87,7 +87,7 @@ TDqSerializedBatch SerializeBuffer(NDqProto::EDataTransportVersion version, cons
TDqSerializedBatch result;
result.Proto.SetTransportVersion(version);
result.Proto.SetRows(buffer.RowCount());
- result.SetPayload(packResult);
+ result.SetPayload(std::move(packResult));
return result;
}
diff --git a/ydb/library/yql/dq/runtime/dq_transport.h b/ydb/library/yql/dq/runtime/dq_transport.h
index 656ca49fb1e..78764b746b6 100644
--- a/ydb/library/yql/dq/runtime/dq_transport.h
+++ b/ydb/library/yql/dq/runtime/dq_transport.h
@@ -80,19 +80,10 @@ private:
++first;
++count;
}
- const auto& packed = packer.Finish();
TDqSerializedBatch result;
result.Proto.SetTransportVersion(TransportVersion);
result.Proto.SetRows(count);
- if (TransportVersion == NDqProto::DATA_TRANSPORT_OOB_FAST_PICKLE_1_0 ||
- TransportVersion == NDqProto::DATA_TRANSPORT_OOB_PICKLE_1_0)
- {
- result.Payload = NKikimr::NMiniKQL::TPagedBuffer::AsRope(packed);
-
- } else {
- result.Proto.MutableRaw()->reserve(packed->Size());
- packed->CopyTo(*result.Proto.MutableRaw());
- }
+ result.SetPayload(packer.Finish());
return result;
}
diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt
index bbea34f1bd6..853b83dac49 100644
--- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt
@@ -23,6 +23,7 @@ target_link_libraries(minikql-computation-llvm PUBLIC
Target-X86-AsmParser
lib-Transforms-IPO
libs-apache-arrow
+ cpp-actors-util
library-cpp-enumbitset
library-cpp-packedtypes
library-cpp-random_provider
@@ -38,6 +39,7 @@ target_sources(minikql-computation-llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_builder.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_reader.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_transport.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_codegen.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp
diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt
index 2b98eb3a366..031693fb3a8 100644
--- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt
@@ -24,6 +24,7 @@ target_link_libraries(minikql-computation-llvm PUBLIC
Target-X86-AsmParser
lib-Transforms-IPO
libs-apache-arrow
+ cpp-actors-util
library-cpp-enumbitset
library-cpp-packedtypes
library-cpp-random_provider
@@ -39,6 +40,7 @@ target_sources(minikql-computation-llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_builder.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_reader.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_transport.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_codegen.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp
diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt
index 2b98eb3a366..031693fb3a8 100644
--- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt
@@ -24,6 +24,7 @@ target_link_libraries(minikql-computation-llvm PUBLIC
Target-X86-AsmParser
lib-Transforms-IPO
libs-apache-arrow
+ cpp-actors-util
library-cpp-enumbitset
library-cpp-packedtypes
library-cpp-random_provider
@@ -39,6 +40,7 @@ target_sources(minikql-computation-llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_builder.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_reader.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_transport.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_codegen.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp
diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt
index bbea34f1bd6..853b83dac49 100644
--- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt
@@ -23,6 +23,7 @@ target_link_libraries(minikql-computation-llvm PUBLIC
Target-X86-AsmParser
lib-Transforms-IPO
libs-apache-arrow
+ cpp-actors-util
library-cpp-enumbitset
library-cpp-packedtypes
library-cpp-random_provider
@@ -38,6 +39,7 @@ target_sources(minikql-computation-llvm PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_builder.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_reader.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_transport.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_codegen.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp
diff --git a/ydb/library/yql/minikql/computation/mkql_block_transport.cpp b/ydb/library/yql/minikql/computation/mkql_block_transport.cpp
new file mode 100644
index 00000000000..2ccc5d3a82d
--- /dev/null
+++ b/ydb/library/yql/minikql/computation/mkql_block_transport.cpp
@@ -0,0 +1,549 @@
+#include "mkql_block_transport.h"
+#include "mkql_block_builder.h"
+
+#include <ydb/library/yql/minikql/mkql_type_builder.h>
+#include <ydb/library/yql/public/udf/arrow/block_reader.h>
+#include <ydb/library/yql/utils/rope_over_buffer.h>
+#include <ydb/library/yql/utils/yql_panic.h>
+
+namespace NKikimr::NMiniKQL {
+
+namespace {
+
+class TOwnedArrowBuffer : public arrow::Buffer {
+public:
+ TOwnedArrowBuffer(TContiguousSpan span, const std::shared_ptr<const void>& owner)
+ : arrow::Buffer(reinterpret_cast<const uint8_t*>(span.Data()), span.Size())
+ , Owner_(owner)
+ {
+ }
+private:
+ const std::shared_ptr<const void> Owner_;
+};
+
+std::shared_ptr<arrow::Buffer> MakeEmptyBuffer() {
+ return std::make_shared<arrow::Buffer>(nullptr, 0);
+}
+
+std::shared_ptr<arrow::Buffer> MakeZeroBuffer(size_t byteLen) {
+ constexpr size_t NullWordCount = (MaxBlockSizeInBytes + sizeof(ui8) - 1) / sizeof(ui8);
+ static ui64 nulls[NullWordCount] = { 0 };
+ if (byteLen <= sizeof(nulls)) {
+ return std::make_shared<arrow::Buffer>(reinterpret_cast<const ui8*>(nulls), byteLen);
+ }
+
+ size_t wordCount = (byteLen + sizeof(ui8) - 1) / sizeof(ui8);
+ std::shared_ptr<ui64[]> buf(new ui64[wordCount]);
+ std::fill(buf.get(), buf.get() + wordCount, 0);
+ return std::make_shared<TOwnedArrowBuffer>(TContiguousSpan{ reinterpret_cast<const char*>(buf.get()), byteLen }, buf);
+}
+
+std::shared_ptr<arrow::Buffer> MakeZeroBitmap(size_t bitCount) {
+ // align up 8 byte boundary
+ size_t byteCount = ((bitCount + 63u) & ~size_t(63u)) >> 3;
+ return MakeZeroBuffer(byteCount);
+}
+
+bool NeedStoreBitmap(const arrow::ArrayData& data) {
+ size_t nullCount = data.GetNullCount();
+ return nullCount != 0 && nullCount != data.length;
+}
+
+void StoreNullsSizes(const arrow::ArrayData& data, ui64 desiredOffset, const IBlockSerializer::TMetadataSink& metaSink) {
+ metaSink(data.GetNullCount());
+ if (!NeedStoreBitmap(data)) {
+ metaSink(0);
+ return;
+ }
+
+ size_t nullBytes = (((size_t)data.length + desiredOffset + 7) & ~7ull) >> 3;
+ metaSink(nullBytes);
+}
+
+void LoadNullsSizes(const IBlockDeserializer::TMetadataSource& metaSource, TMaybe<ui64>& nullsCount, TMaybe<ui64>& nullsSize) {
+ YQL_ENSURE(!nullsCount.Defined() && !nullsSize.Defined(), "Attempt to load null sizes twice (most likely LoadArray() is not called)");
+ nullsCount = metaSource();
+ nullsSize = metaSource();
+}
+
+void StoreNulls(const arrow::ArrayData& data, ui64 desiredOffset, TRope& dst) {
+ if (!NeedStoreBitmap(data)) {
+ return;
+ }
+ size_t nullBytes = (((size_t)data.length + desiredOffset + 7) & ~7ull) >> 3;
+ YQL_ENSURE(desiredOffset <= data.offset);
+ YQL_ENSURE((data.offset - desiredOffset) % 8 == 0);
+ const char* nulls = data.GetValues<char>(0, 0) + (data.offset - desiredOffset) / 8;
+ dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[0], nulls, nullBytes));
+}
+
+void LoadBufferSize(const IBlockDeserializer::TMetadataSource& metaSource, TMaybe<ui64>& result) {
+ YQL_ENSURE(!result.Defined(), "Attempt to load buffer size twice (most likely LoadArray() is not called)");
+ result = metaSource();
+}
+
+std::shared_ptr<arrow::Buffer> LoadBuffer(TRope& source, TMaybe<ui64> size) {
+ YQL_ENSURE(size.Defined(), "Buffer size is not loaded");
+ if (!*size) {
+ return std::make_shared<arrow::Buffer>(nullptr, 0);
+ }
+
+ YQL_ENSURE(source.size() >= *size, "Premature end of data");
+ auto owner = std::make_shared<TRope>(source.Begin(), source.Begin() + *size);
+ source.EraseFront(*size);
+
+ owner->Compact();
+ return std::make_shared<TOwnedArrowBuffer>(owner->GetContiguousSpan(), owner);
+}
+
+std::shared_ptr<arrow::Buffer> LoadNullsBitmap(TRope& source, TMaybe<ui64> nullCount, TMaybe<ui64> bitmapSize) {
+ YQL_ENSURE(nullCount.Defined(), "Bitmap null count is not loaded");
+ YQL_ENSURE(bitmapSize.Defined(), "Bitmap size is not loaded");
+ if (*nullCount == 0) {
+ YQL_ENSURE(!*bitmapSize);
+ return {};
+ }
+ YQL_ENSURE(*bitmapSize);
+ return LoadBuffer(source, bitmapSize);
+}
+
+class TBlockDeserializerBase : public IBlockDeserializer {
+public:
+ TBlockDeserializerBase() = default;
+
+ virtual void SetArrowType(const std::shared_ptr<arrow::DataType>& type) {
+ ArrowType_ = type;
+ }
+
+ void LoadMetadata(const TMetadataSource& metaSource) final {
+ if (IsNullable()) {
+ LoadNullsSizes(metaSource, NullsCount_, NullsSize_);
+ }
+ DoLoadMetadata(metaSource);
+ }
+
+ virtual std::shared_ptr<arrow::ArrayData> LoadArray(TRope& src, ui64 blockLen, ui64 offset) final {
+ YQL_ENSURE(blockLen > 0, "Should be handled earlier");
+ std::shared_ptr<arrow::Buffer> nulls;
+ i64 nullsCount = 0;
+ if (IsNullable()) {
+ YQL_ENSURE(NullsCount_.Defined() && NullsSize_.Defined(), "Nulls metadata should be loaded");
+ if (*NullsCount_ != 0) {
+ if (*NullsSize_ == 0) {
+ return MakeDefaultValue(blockLen, offset);
+ }
+ nulls = LoadNullsBitmap(src, NullsCount_, NullsSize_);
+ nullsCount = *NullsCount_;
+ }
+ }
+ auto result = DoLoadArray(src, nulls, nullsCount, blockLen, offset);
+ ResetMetadata();
+ return result;
+ }
+
+ void ResetMetadata() {
+ NullsCount_ = NullsSize_ = {};
+ DoResetMetadata();
+ }
+
+ std::shared_ptr<arrow::ArrayData> MakeDefaultValue(ui64 blockLen, ui64 offset) {
+ std::shared_ptr<arrow::Buffer> nulls;
+ i64 nullsCount = 0;
+ if (IsNullable()) {
+ nulls = MakeZeroBitmap(blockLen + offset);
+ nullsCount = blockLen;
+ }
+ return DoMakeDefaultValue(nulls, nullsCount, blockLen, offset);
+ }
+
+protected:
+ virtual void DoLoadMetadata(const TMetadataSource& metaSource) = 0;
+ virtual void DoResetMetadata() = 0;
+ virtual bool IsNullable() const = 0;
+ virtual std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const = 0;
+ virtual std::shared_ptr<arrow::ArrayData> DoLoadArray(TRope& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) = 0;
+
+ std::shared_ptr<arrow::DataType> ArrowType_;
+ TMaybe<ui64> NullsCount_;
+ TMaybe<ui64> NullsSize_;
+};
+
+template<size_t ObjectSize, bool Nullable>
+class TFixedSizeBlockSerializer final : public IBlockSerializer {
+public:
+ TFixedSizeBlockSerializer() = default;
+private:
+ size_t ArrayMetadataCount() const final {
+ return Nullable ? 3 : 1;
+ }
+
+ void StoreMetadata(const arrow::ArrayData& data, ui64 offset, const IBlockSerializer::TMetadataSink& metaSink) const final {
+ YQL_ENSURE(offset <= data.offset);
+ if constexpr (Nullable) {
+ StoreNullsSizes(data, offset, metaSink);
+ if (data.GetNullCount() == data.length) {
+ metaSink(0);
+ return;
+ }
+ }
+ size_t dataBytes = ((size_t)data.length + offset) * ObjectSize;
+ metaSink(dataBytes);
+ }
+
+ void StoreArray(const arrow::ArrayData& data, ui64 offset, TRope& dst) const final {
+ YQL_ENSURE(offset <= data.offset);
+ if constexpr (Nullable) {
+ StoreNulls(data, offset, dst);
+ if (data.GetNullCount() == data.length) {
+ return;
+ }
+ }
+
+ const char* buf = reinterpret_cast<const char*>(data.buffers[1]->data()) + (data.offset - offset) * ObjectSize;
+ size_t dataBytes = ((size_t)data.length + offset) * ObjectSize;
+ dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[1], buf, dataBytes));
+ }
+};
+
+template<size_t ObjectSize, bool Nullable>
+class TFixedSizeBlockDeserializer final : public TBlockDeserializerBase {
+public:
+ TFixedSizeBlockDeserializer() = default;
+private:
+ void DoLoadMetadata(const TMetadataSource& metaSource) final {
+ LoadBufferSize(metaSource, DataSize_);
+ }
+
+ bool IsNullable() const final {
+ return Nullable;
+ }
+
+ std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final {
+ auto data = MakeZeroBuffer((blockLen + offset) * ObjectSize);
+ return arrow::ArrayData::Make(ArrowType_, blockLen, { nulls, data }, nullsCount, offset);
+ }
+
+ std::shared_ptr<arrow::ArrayData> DoLoadArray(TRope& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
+ auto data = LoadBuffer(src, DataSize_);
+ return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls, data}, nullsCount, offset);
+ }
+
+ void DoResetMetadata() final {
+ DataSize_ = {};
+ }
+
+ TMaybe<ui64> DataSize_;
+};
+
+
+template<typename TStringType, bool Nullable>
+class TStringBlockSerializer final : public IBlockSerializer {
+ using TOffset = typename TStringType::offset_type;
+public:
+ TStringBlockSerializer() = default;
+private:
+ size_t ArrayMetadataCount() const final {
+ return Nullable ? 4 : 2;
+ }
+
+ void StoreMetadata(const arrow::ArrayData& data, ui64 offset, const IBlockSerializer::TMetadataSink& metaSink) const final {
+ YQL_ENSURE(offset <= data.offset);
+ if constexpr (Nullable) {
+ StoreNullsSizes(data, offset, metaSink);
+ if (data.GetNullCount() == data.length) {
+ metaSink(0);
+ metaSink(0);
+ return;
+ }
+ }
+
+ size_t offsetsSize = ((size_t)data.length + 1 + offset) * sizeof(TOffset);
+ metaSink(offsetsSize);
+ metaSink(data.buffers[2]->size());
+ }
+
+ void StoreArray(const arrow::ArrayData& data, ui64 offset, TRope& dst) const {
+ YQL_ENSURE(offset <= data.offset);
+ if constexpr (Nullable) {
+ StoreNulls(data, offset, dst);
+ if (data.GetNullCount() == data.length) {
+ return;
+ }
+ }
+
+ const char* offsets = reinterpret_cast<const char*>(data.GetValues<TOffset>(1) - offset);
+ size_t offsetsSize = ((size_t)data.length + 1 + offset) * sizeof(TOffset);
+ dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[1], offsets, offsetsSize));
+
+ const char* mainData = reinterpret_cast<const char*>(data.buffers[2]->data());
+ size_t mainSize = data.buffers[2]->size();
+ dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[2], mainData, mainSize));
+ }
+};
+
+template<typename TStringType, bool Nullable>
+class TStringBlockDeserializer final : public TBlockDeserializerBase {
+ using TOffset = typename TStringType::offset_type;
+public:
+ TStringBlockDeserializer() = default;
+private:
+ void DoLoadMetadata(const TMetadataSource& metaSource) final {
+ LoadBufferSize(metaSource, OffsetsSize_);
+ LoadBufferSize(metaSource, DataSize_);
+ }
+
+ std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final {
+ auto offsets = MakeZeroBuffer((blockLen + 1 + offset) * sizeof(TOffset));
+ auto data = MakeEmptyBuffer();
+ return arrow::ArrayData::Make(ArrowType_, blockLen, { nulls, offsets, data }, nullsCount, offset);
+ }
+
+ std::shared_ptr<arrow::ArrayData> DoLoadArray(TRope& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
+ auto offsets = LoadBuffer(src, OffsetsSize_);
+ auto data = LoadBuffer(src, DataSize_);
+ return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls, offsets, data }, nullsCount, offset);
+ }
+
+ bool IsNullable() const final {
+ return Nullable;
+ }
+
+ void DoResetMetadata() final {
+ OffsetsSize_ = DataSize_ = {};
+ }
+
+ TMaybe<ui64> OffsetsSize_;
+ TMaybe<ui64> DataSize_;
+};
+
+class TExtOptionalBlockSerializer final : public IBlockSerializer {
+public:
+ explicit TExtOptionalBlockSerializer(std::unique_ptr<IBlockSerializer>&& inner)
+ : Inner_(std::move(inner))
+ {
+ }
+private:
+ size_t ArrayMetadataCount() const final {
+ return 2 + Inner_->ArrayMetadataCount();
+ }
+
+ void StoreMetadata(const arrow::ArrayData& data, ui64 offset, const IBlockSerializer::TMetadataSink& metaSink) const final {
+ YQL_ENSURE(offset <= data.offset);
+ StoreNullsSizes(data, offset, metaSink);
+ if (data.GetNullCount() == data.length) {
+ auto innerCount = Inner_->ArrayMetadataCount();
+ for (size_t i = 0; i < innerCount; ++i) {
+ metaSink(0);
+ }
+ } else {
+ Inner_->StoreMetadata(*data.child_data[0], offset, metaSink);
+ }
+ }
+
+ void StoreArray(const arrow::ArrayData& data, ui64 offset, TRope& dst) const {
+ YQL_ENSURE(offset <= data.offset);
+ StoreNulls(data, offset, dst);
+ if (data.GetNullCount() != data.length) {
+ Inner_->StoreArray(*data.child_data[0], offset, dst);
+ }
+ }
+
+ const std::unique_ptr<IBlockSerializer> Inner_;
+};
+
+class TExtOptionalBlockDeserializer final : public TBlockDeserializerBase {
+public:
+ explicit TExtOptionalBlockDeserializer(std::unique_ptr<TBlockDeserializerBase>&& inner)
+ : Inner_(std::move(inner))
+ {
+ }
+private:
+ void DoLoadMetadata(const TMetadataSource& metaSource) final {
+ Inner_->LoadMetadata(metaSource);
+ }
+
+ std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final {
+ return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, { Inner_->MakeDefaultValue(blockLen, offset) }, nullsCount, offset);
+ }
+
+ std::shared_ptr<arrow::ArrayData> DoLoadArray(TRope& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
+ return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, { Inner_->LoadArray(src, blockLen, offset) }, nullsCount, offset);
+ }
+
+ bool IsNullable() const final {
+ return true;
+ }
+
+ void DoResetMetadata() final {
+ Inner_->ResetMetadata();
+ }
+
+ void SetArrowType(const std::shared_ptr<arrow::DataType>& type) final {
+ ArrowType_ = type;
+ YQL_ENSURE(type->fields().size() == 1);
+ Inner_->SetArrowType(type->fields().front()->type());
+ }
+
+ const std::unique_ptr<TBlockDeserializerBase> Inner_;
+};
+
+template<bool Nullable>
+class TTupleBlockSerializer final : public IBlockSerializer {
+public:
+ explicit TTupleBlockSerializer(TVector<std::unique_ptr<IBlockSerializer>>&& children)
+ : Children_(std::move(children))
+ {
+ }
+private:
+ size_t ArrayMetadataCount() const final {
+ size_t result = GetChildMetaCount();
+ if constexpr (Nullable) {
+ result += 2;
+ }
+ return result;
+ }
+
+ void StoreMetadata(const arrow::ArrayData& data, ui64 offset, const IBlockSerializer::TMetadataSink& metaSink) const final {
+ YQL_ENSURE(offset <= data.offset);
+ if constexpr (Nullable) {
+ StoreNullsSizes(data, offset, metaSink);
+ }
+ if (data.GetNullCount() == data.length) {
+ auto childCount = GetChildMetaCount();
+ for (size_t i = 0; i < childCount; ++i) {
+ metaSink(0);
+ }
+ } else {
+ for (size_t i = 0; i < Children_.size(); ++i) {
+ Children_[i]->StoreMetadata(*data.child_data[i], offset, metaSink);
+ }
+ }
+ }
+
+ void StoreArray(const arrow::ArrayData& data, ui64 offset, TRope& dst) const {
+ YQL_ENSURE(offset <= data.offset);
+ if constexpr (Nullable) {
+ StoreNulls(data, offset, dst);
+ }
+ if (data.GetNullCount() != data.length) {
+ for (size_t i = 0; i < Children_.size(); ++i) {
+ Children_[i]->StoreArray(*data.child_data[i], offset, dst);
+ }
+ }
+ }
+
+ size_t GetChildMetaCount() const {
+ size_t result = 0;
+ for (const auto& child : Children_) {
+ result += child->ArrayMetadataCount();
+ }
+ return result;
+ }
+
+ const TVector<std::unique_ptr<IBlockSerializer>> Children_;
+};
+
+template<bool Nullable>
+class TTupleBlockDeserializer final : public TBlockDeserializerBase {
+public:
+ explicit TTupleBlockDeserializer(TVector<std::unique_ptr<TBlockDeserializerBase>>&& children)
+ : Children_(std::move(children))
+ {
+ }
+private:
+ void DoLoadMetadata(const TMetadataSource& metaSource) final {
+ for (auto& child : Children_) {
+ child->LoadMetadata(metaSource);
+ }
+ }
+
+ std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final {
+ std::vector<std::shared_ptr<arrow::ArrayData>> childData;
+ for (auto& child : Children_) {
+ childData.emplace_back(child->MakeDefaultValue(blockLen, offset));
+ }
+ return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, offset);
+ }
+
+ std::shared_ptr<arrow::ArrayData> DoLoadArray(TRope& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
+ std::vector<std::shared_ptr<arrow::ArrayData>> childData;
+ for (auto& child : Children_) {
+ childData.emplace_back(child->LoadArray(src, blockLen, offset));
+ }
+ return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, offset);
+ }
+
+ void DoResetMetadata() final {
+ for (auto& child : Children_) {
+ child->ResetMetadata();
+ }
+ }
+
+ bool IsNullable() const final {
+ return Nullable;
+ }
+
+ void SetArrowType(const std::shared_ptr<arrow::DataType>& type) final {
+ ArrowType_ = type;
+ YQL_ENSURE(type->fields().size() == Children_.size());
+ for (size_t i = 0; i < Children_.size(); ++i) {
+ Children_[i]->SetArrowType(type->field(i)->type());
+ }
+ }
+
+ const TVector<std::unique_ptr<TBlockDeserializerBase>> Children_;
+};
+
+struct TSerializerTraits {
+ using TResult = IBlockSerializer;
+ template <bool Nullable>
+ using TTuple = TTupleBlockSerializer<Nullable>;
+ template <typename T, bool Nullable>
+ using TFixedSize = TFixedSizeBlockSerializer<sizeof(T), Nullable>;
+ template <typename TStringType, bool Nullable>
+ using TStrings = TStringBlockSerializer<TStringType, Nullable>;
+ using TExtOptional = TExtOptionalBlockSerializer;
+
+ static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) {
+ Y_UNUSED(pgBuilder);
+ if (desc.PassByValue) {
+ return std::make_unique<TFixedSize<ui64, true>>();
+ }
+ return std::make_unique<TStrings<arrow::BinaryType, true>>();
+ }
+};
+
+struct TDeserializerTraits {
+ using TResult = TBlockDeserializerBase;
+ template <bool Nullable>
+ using TTuple = TTupleBlockDeserializer<Nullable>;
+ template <typename T, bool Nullable>
+ using TFixedSize = TFixedSizeBlockDeserializer<sizeof(T), Nullable>;
+ template <typename TStringType, bool Nullable>
+ using TStrings = TStringBlockDeserializer<TStringType, Nullable>;
+ using TExtOptional = TExtOptionalBlockDeserializer;
+
+ static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) {
+ Y_UNUSED(pgBuilder);
+ if (desc.PassByValue) {
+ return std::make_unique<TFixedSize<ui64, true>>();
+ }
+ return std::make_unique<TStrings<arrow::BinaryType, true>>();
+ }
+};
+
+} // namespace
+
+
+std::unique_ptr<IBlockSerializer> MakeBlockSerializer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type) {
+ return NYql::NUdf::MakeBlockReaderImpl<TSerializerTraits>(typeInfoHelper, type, nullptr);
+}
+
+std::unique_ptr<IBlockDeserializer> MakeBlockDeserializer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type) {
+ std::unique_ptr<TBlockDeserializerBase> result = NYql::NUdf::MakeBlockReaderImpl<TDeserializerTraits>(typeInfoHelper, type, nullptr);
+ result->SetArrowType(NYql::NUdf::GetArrowType(typeInfoHelper, type));
+ return std::move(result);
+}
+
+
+} // namespace NKikimr::NMiniKQL
diff --git a/ydb/library/yql/minikql/computation/mkql_block_transport.h b/ydb/library/yql/minikql/computation/mkql_block_transport.h
new file mode 100644
index 00000000000..97f35241682
--- /dev/null
+++ b/ydb/library/yql/minikql/computation/mkql_block_transport.h
@@ -0,0 +1,38 @@
+#pragma once
+
+#include <ydb/library/yql/minikql/mkql_node.h>
+
+#include <library/cpp/actors/util/rope.h>
+
+#include <arrow/datum.h>
+
+#include <functional>
+#include <memory>
+
+namespace NKikimr::NMiniKQL {
+
+class IBlockSerializer {
+public:
+ virtual ~IBlockSerializer() = default;
+
+ virtual size_t ArrayMetadataCount() const = 0;
+
+ using TMetadataSink = std::function<void(ui64 meta)>;
+ virtual void StoreMetadata(const arrow::ArrayData& data, ui64 desiredOffset, const TMetadataSink& metaSink) const = 0;
+ virtual void StoreArray(const arrow::ArrayData& data, ui64 desiredOffset, TRope& dst) const = 0;
+};
+
+class IBlockDeserializer {
+public:
+ virtual ~IBlockDeserializer() = default;
+
+ using TMetadataSource = std::function<ui64()>;
+ virtual void LoadMetadata(const TMetadataSource& metaSource) = 0;
+ virtual std::shared_ptr<arrow::ArrayData> LoadArray(TRope& src, ui64 blockLen, ui64 offset) = 0;
+};
+
+
+std::unique_ptr<IBlockSerializer> MakeBlockSerializer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type);
+std::unique_ptr<IBlockDeserializer> MakeBlockDeserializer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type);
+
+}
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
index 776ee539bda..745feeccd5e 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
@@ -1,3 +1,4 @@
+#include "mkql_block_impl.h"
#include "mkql_computation_node_pack.h"
#include "mkql_computation_node_pack_impl.h"
#include "mkql_computation_node_holders.h"
@@ -225,6 +226,11 @@ bool HasOptionalFields(const TType* type) {
return false;
}
+ case TType::EKind::Block: {
+ auto blockType = static_cast<const TBlockType*>(type);
+ return HasOptionalFields(blockType->GetItemType());
+ }
+
default:
THROW yexception() << "Unsupported type: " << type->GetKindAsStr();
}
@@ -855,6 +861,16 @@ void PackImpl(const TType* type, TBuf& buffer, const NUdf::TUnboxedValuePod& val
}
}
+bool HasNonTrivialNulls(const arrow::ArrayData& array, i64 expectedOffset) {
+ MKQL_ENSURE(array.offset == expectedOffset, "Unexpected offset in child arrays");
+ i64 nulls = array.GetNullCount();
+ if (nulls > 0 && nulls != array.length) {
+ return true;
+ }
+
+ return AnyOf(array.child_data, [&](const auto& child) { return HasNonTrivialNulls(*child, expectedOffset); });
+}
+
} // namespace
@@ -939,20 +955,73 @@ TStringBuf TValuePackerGeneric<Fast>::Pack(const NUdf::TUnboxedValuePod& value)
// Transport packer
template<bool Fast>
-TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* type)
+TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* type, arrow::MemoryPool* pool)
: Type_(type)
, State_(ScanTypeProperties(Type_, false))
, IncrementalState_(ScanTypeProperties(Type_, true))
+ , ArrowPool_(pool ? *pool : *arrow::default_memory_pool())
{
MKQL_ENSURE(!stable, "Stable packing is not supported");
+ InitBlocks();
}
template<bool Fast>
-TValuePackerTransport<Fast>::TValuePackerTransport(const TType* type)
+TValuePackerTransport<Fast>::TValuePackerTransport(const TType* type, arrow::MemoryPool* pool)
: Type_(type)
, State_(ScanTypeProperties(Type_, false))
, IncrementalState_(ScanTypeProperties(Type_, true))
+ , ArrowPool_(pool ? *pool : *arrow::default_memory_pool())
{
+ InitBlocks();
+}
+
+template<bool Fast>
+void TValuePackerTransport<Fast>::InitBlocks() {
+ if (!Type_->IsMulti()) {
+ return;
+ }
+
+ const TMultiType* multiType = static_cast<const TMultiType*>(Type_);
+ ui32 width = multiType->GetElementsCount();
+ if (!width) {
+ return;
+ }
+
+ const TType* last = multiType->GetElementType(width - 1);
+ if (!last->IsBlock()) {
+ return;
+ }
+
+ const TBlockType* blockLenType = static_cast<const TBlockType*>(multiType->GetElementType(width - 1));
+ if (blockLenType->GetShape() != TBlockType::EShape::Scalar) {
+ return;
+ }
+
+ if (!blockLenType->GetItemType()->IsData()) {
+ return;
+ }
+
+ if (static_cast<const TDataType*>(blockLenType->GetItemType())->GetDataSlot() != NUdf::EDataSlot::Uint64) {
+ return;
+ }
+
+ if (AnyOf(multiType->GetElements(), [](const auto& t) { return !t->IsBlock(); })) {
+ return;
+ }
+
+ IsBlock_ = true;
+ ConvertedScalars_.resize(width - 1);
+ for (ui32 i = 0; i < width - 1; ++i) {
+ const TBlockType* itemType = static_cast<const TBlockType*>(multiType->GetElementType(i));
+ const bool isScalar = itemType->GetShape() == TBlockType::EShape::Scalar;
+ BlockSerializers_.emplace_back(MakeBlockSerializer(TTypeInfoHelper(), itemType->GetItemType()));
+ BlockDeserializers_.emplace_back(MakeBlockDeserializer(TTypeInfoHelper(), itemType->GetItemType()));
+ if (itemType->GetShape() == TBlockType::EShape::Scalar) {
+ BlockReaders_.emplace_back(NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), itemType->GetItemType()));
+ } else {
+ BlockReaders_.emplace_back();
+ }
+ }
}
template<bool Fast>
@@ -964,13 +1033,16 @@ NUdf::TUnboxedValue TValuePackerTransport<Fast>::Unpack(TRope&& buf, const THold
template<bool Fast>
void TValuePackerTransport<Fast>::UnpackBatch(TRope&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const {
+ if (IsBlock_) {
+ return UnpackBatchBlocks(std::move(buf), holderFactory, result);
+ }
const size_t totalSize = buf.GetSize();
TChunkedInputBuffer chunked(std::move(buf));
DoUnpackBatch<Fast>(Type_, chunked, totalSize, holderFactory, IncrementalState_, result);
}
template<bool Fast>
-TPagedBuffer::TPtr TValuePackerTransport<Fast>::Pack(const NUdf::TUnboxedValuePod& value) const {
+TRope TValuePackerTransport<Fast>::Pack(const NUdf::TUnboxedValuePod& value) const {
MKQL_ENSURE(ItemCount_ == 0, "Can not mix Pack() and AddItem() calls");
TPagedBuffer::TPtr result = std::make_shared<TPagedBuffer>();
if constexpr (Fast) {
@@ -981,7 +1053,7 @@ TPagedBuffer::TPtr TValuePackerTransport<Fast>::Pack(const NUdf::TUnboxedValuePo
PackImpl<Fast, false>(Type_, *result, value, State_);
BuildMeta(result, false);
}
- return result;
+ return TPagedBuffer::AsRope(result);
}
template<bool Fast>
@@ -1013,6 +1085,10 @@ template<bool Fast>
TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItem(const NUdf::TUnboxedValuePod* values, ui32 width) {
Y_VERIFY_DEBUG(Type_->IsMulti());
Y_VERIFY_DEBUG(static_cast<const TMultiType*>(Type_)->GetElementsCount() == width);
+ if (IsBlock_) {
+ return AddWideItemBlocks(values, width);
+ }
+
const TMultiType* itemType = static_cast<const TMultiType*>(Type_);
if (!ItemCount_) {
StartPack();
@@ -1026,13 +1102,133 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItem(const NUdf
}
template<bool Fast>
+TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(const NUdf::TUnboxedValuePod* values, ui32 width) {
+ const ui64 len = TArrowBlock::From(values[width - 1]).GetDatum().scalar_as<arrow::UInt64Scalar>().value;
+
+ auto metadataBuffer = std::make_shared<TPagedBuffer>();
+ PackData<false>(len, *metadataBuffer);
+
+ TMaybe<ui64> originalOffset;
+ bool hasNonTrivialNulls = false;
+ // all offsets should be equal
+ for (size_t i = 0; i < width - 1; ++i) {
+ arrow::Datum datum = TArrowBlock::From(values[i]).GetDatum();
+ if (datum.is_array()) {
+ i64 offset = datum.array()->offset;
+ MKQL_ENSURE(offset >= 0, "Negative offset");
+ if (!originalOffset.Defined()) {
+ originalOffset = offset;
+ } else {
+ MKQL_ENSURE(*originalOffset == offset, "All columns should have equal offsets");
+ }
+ hasNonTrivialNulls = hasNonTrivialNulls || HasNonTrivialNulls(*datum.array(), offset);
+ }
+ }
+
+ const ui64 desiredOffset = (originalOffset.Defined() && hasNonTrivialNulls) ? (*originalOffset % 8) : 0;
+ PackData<false>(desiredOffset, *metadataBuffer);
+
+ // "scalars are present"
+ const ui64 metadataFlags = 1 << 0;
+ PackData<false>(metadataFlags, *metadataBuffer);
+
+ ui32 totalMetadataCount = 0;
+ for (size_t i = 0; i < width - 1; ++i) {
+ totalMetadataCount += BlockSerializers_[i]->ArrayMetadataCount();
+ }
+ PackData<false>(totalMetadataCount, *metadataBuffer);
+
+ TVector<std::shared_ptr<arrow::ArrayData>> arrays;
+ arrays.reserve(width - 1);
+ for (ui32 i = 0; i < width - 1; ++i) {
+ arrow::Datum datum = TArrowBlock::From(values[i]).GetDatum();
+ if (datum.is_scalar()) {
+ if (!ConvertedScalars_[i]) {
+ const TType* itemType = static_cast<const TMultiType*>(Type_)->GetElementType(i);
+ datum = MakeArrayFromScalar(*datum.scalar(), 1, static_cast<const TBlockType*>(itemType)->GetItemType(), ArrowPool_);
+ ConvertedScalars_[i] = datum.array();
+ }
+ arrays.emplace_back(ConvertedScalars_[i]);
+ } else {
+ MKQL_ENSURE(datum.is_array(), "Expecting array or scalar");
+ arrays.emplace_back(datum.array());
+ }
+ }
+
+ ui32 savedMetadata = 0;
+ for (size_t i = 0; i < width - 1; ++i) {
+ const bool isScalar = BlockReaders_[i] != nullptr;
+ BlockSerializers_[i]->StoreMetadata(*arrays[i], isScalar ? 0 : desiredOffset, [&](ui64 meta) {
+ PackData<false>(meta, *metadataBuffer);
+ ++savedMetadata;
+ });
+ }
+
+ MKQL_ENSURE(savedMetadata == totalMetadataCount, "Serialization metadata error");
+
+ BlockBuffer_.Insert(BlockBuffer_.End(), TPagedBuffer::AsRope(metadataBuffer));
+ for (size_t i = 0; i < width - 1; ++i) {
+ const bool isScalar = BlockReaders_[i] != nullptr;
+ BlockSerializers_[i]->StoreArray(*arrays[i], isScalar ? 0 : desiredOffset, BlockBuffer_);
+ }
+ ++ItemCount_;
+ return *this;
+}
+
+template<bool Fast>
+void TValuePackerTransport<Fast>::UnpackBatchBlocks(TRope&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const {
+ while (!buf.empty()) {
+ TChunkedInputBuffer chunked(std::move(buf));
+ const ui64 len = UnpackData<false, ui64>(chunked);
+ if (len == 0) {
+ continue;
+ }
+
+ const ui64 offset = UnpackData<false, ui64>(chunked);
+ const ui64 metadataFlags = UnpackData<false, ui64>(chunked);
+ MKQL_ENSURE(metadataFlags == 1, "Unsupported metadata flags");
+
+ ui32 metaCount = UnpackData<false, ui32>(chunked);
+ for (auto& deserializer : BlockDeserializers_) {
+ deserializer->LoadMetadata([&]() -> ui64 {
+ MKQL_ENSURE(metaCount > 0, "No more metadata available");
+ --metaCount;
+ return UnpackData<false, ui64>(chunked);
+ });
+ }
+ MKQL_ENSURE(metaCount == 0, "Partial buffers read");
+ TRope ropeTail = chunked.ReleaseRope();
+ result.PushRow([&](ui32 i) {
+ if (i < BlockDeserializers_.size()) {
+ const bool isScalar = BlockReaders_[i] != nullptr;
+ auto array = BlockDeserializers_[i]->LoadArray(ropeTail, isScalar ? 1 : len, isScalar ? 0 : offset);
+ if (isScalar) {
+ TBlockItem item = BlockReaders_[i]->GetItem(*array, 0);
+ const TBlockType* itemType = static_cast<const TBlockType*>(static_cast<const TMultiType*>(Type_)->GetElementType(i));
+ return holderFactory.CreateArrowBlock(ConvertScalar(itemType->GetItemType(), item, ArrowPool_));
+ }
+ return holderFactory.CreateArrowBlock(array);
+ }
+ MKQL_ENSURE(i == BlockDeserializers_.size(), "Unexpected row index");
+ return holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(len)));
+ });
+ buf = std::move(ropeTail);
+ }
+}
+
+template<bool Fast>
void TValuePackerTransport<Fast>::Clear() {
Buffer_.reset();
+ BlockBuffer_.clear();
ItemCount_ = 0;
}
template<bool Fast>
-TPagedBuffer::TPtr TValuePackerTransport<Fast>::Finish() {
+TRope TValuePackerTransport<Fast>::Finish() {
+ if (IsBlock_) {
+ return FinishBlocks();
+ }
+
if (!ItemCount_) {
StartPack();
}
@@ -1045,6 +1241,13 @@ TPagedBuffer::TPtr TValuePackerTransport<Fast>::Finish() {
}
TPagedBuffer::TPtr result = std::move(Buffer_);
Clear();
+ return TPagedBuffer::AsRope(result);
+}
+
+template<bool Fast>
+TRope TValuePackerTransport<Fast>::FinishBlocks() {
+ TRope result = std::move(BlockBuffer_);
+ Clear();
return result;
}
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h
index 03255a09ba5..52c8b10a25a 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h
@@ -3,6 +3,8 @@
#include "mkql_computation_node.h"
#include "mkql_computation_node_holders.h"
#include "mkql_optional_usage_mask.h"
+#include "mkql_block_transport.h"
+#include "mkql_block_reader.h"
#include <ydb/library/yql/minikql/mkql_buffer.h>
#include <ydb/library/yql/public/udf/udf_value.h>
@@ -73,32 +75,47 @@ class TValuePackerTransport {
public:
using TSelf = TValuePackerTransport<Fast>;
- explicit TValuePackerTransport(const TType* type);
+ explicit TValuePackerTransport(const TType* type, arrow::MemoryPool* pool = nullptr);
// for compatibility with TValuePackerGeneric - stable packing is not supported
- TValuePackerTransport(bool stable, const TType* type);
+ TValuePackerTransport(bool stable, const TType* type, arrow::MemoryPool* ppol = nullptr);
// AddItem()/UnpackBatch() will perform incremental packing - type T is processed as list item type. Will produce List<T> layout
TSelf& AddItem(const NUdf::TUnboxedValuePod& value);
TSelf& AddWideItem(const NUdf::TUnboxedValuePod* values, ui32 count);
size_t PackedSizeEstimate() const {
- return Buffer_ ? (Buffer_->Size() + Buffer_->ReservedHeaderSize()) : 0;
+ return IsBlock_ ? BlockBuffer_.size() : (Buffer_ ? (Buffer_->Size() + Buffer_->ReservedHeaderSize()) : 0);
}
void Clear();
- TPagedBuffer::TPtr Finish();
+ TRope Finish();
// Pack()/Unpack() will pack/unpack single value of type T
- TPagedBuffer::TPtr Pack(const NUdf::TUnboxedValuePod& value) const;
+ TRope Pack(const NUdf::TUnboxedValuePod& value) const;
NUdf::TUnboxedValue Unpack(TRope&& buf, const THolderFactory& holderFactory) const;
void UnpackBatch(TRope&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const;
private:
void BuildMeta(TPagedBuffer::TPtr& buffer, bool addItemCount) const;
void StartPack();
+ void InitBlocks();
+ TSelf& AddWideItemBlocks(const NUdf::TUnboxedValuePod* values, ui32 count);
+ TRope FinishBlocks();
+ void UnpackBatchBlocks(TRope&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const;
+
const TType* const Type_;
ui64 ItemCount_ = 0;
TPagedBuffer::TPtr Buffer_;
mutable NDetails::TPackerState State_;
mutable NDetails::TPackerState IncrementalState_;
+
+ arrow::MemoryPool& ArrowPool_;
+ bool IsBlock_ = false;
+
+ TVector<std::unique_ptr<IBlockSerializer>> BlockSerializers_;
+ TVector<std::unique_ptr<IBlockReader>> BlockReaders_;
+ TVector<std::shared_ptr<arrow::ArrayData>> ConvertedScalars_;
+ TRope BlockBuffer_;
+
+ TVector<std::unique_ptr<IBlockDeserializer>> BlockDeserializers_;
};
using TValuePacker = TValuePackerGeneric<false>;
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.h b/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.h
index 60d9597252f..d71d4692f6a 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.h
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.h
@@ -108,6 +108,18 @@ public:
}
}
+ inline TRope ReleaseRope() {
+ Y_VERIFY_DEBUG(OriginalLen_ >= Len_);
+ Rope_.EraseFront(OriginalLen_ - Len_);
+ TRope result = std::move(Rope_);
+
+ Data_ = nullptr;
+ Len_ = OriginalLen_ = 0;
+ Rope_.clear();
+
+ return result;
+ }
+
void Next() {
Y_VERIFY_DEBUG(Len_ == 0);
Rope_.EraseFront(OriginalLen_);
diff --git a/ydb/library/yql/minikql/computation/ya.make.inc b/ydb/library/yql/minikql/computation/ya.make.inc
index 7785d0762e9..be501a3097a 100644
--- a/ydb/library/yql/minikql/computation/ya.make.inc
+++ b/ydb/library/yql/minikql/computation/ya.make.inc
@@ -4,6 +4,7 @@ SRCS(
mkql_block_builder.cpp
mkql_block_impl.cpp
mkql_block_reader.cpp
+ mkql_block_transport.cpp
mkql_computation_node.cpp
mkql_computation_node_codegen.cpp
mkql_computation_node_graph.cpp
@@ -20,6 +21,7 @@ SRCS(
PEERDIR(
contrib/libs/apache/arrow
+ library/cpp/actors/util
library/cpp/enumbitset
library/cpp/packedtypes
library/cpp/random_provider