diff options
author | aneporada <aneporada@ydb.tech> | 2023-07-20 17:59:54 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2023-07-20 17:59:54 +0300 |
commit | 27820316f1e6f037bc7dcdd28015a4df8a0b65d5 (patch) | |
tree | ab9e31f2acd528d56573207bd553e449be0fac76 | |
parent | 5b31f78da1eea0701abbcd8fdf35928c79e5d04e (diff) | |
download | ydb-27820316f1e6f037bc7dcdd28015a4df8a0b65d5.tar.gz |
Support arrow blocks in transport packer
15 files changed, 867 insertions, 42 deletions
diff --git a/ydb/library/yql/dq/common/dq_serialized_batch.cpp b/ydb/library/yql/dq/common/dq_serialized_batch.cpp index 71cb32ff935..d96a7fee6d9 100644 --- a/ydb/library/yql/dq/common/dq_serialized_batch.cpp +++ b/ydb/library/yql/dq/common/dq_serialized_batch.cpp @@ -26,12 +26,18 @@ T ReadNumber(TStringBuf& src) { } -void TDqSerializedBatch::SetPayload(const NKikimr::NMiniKQL::TPagedBuffer::TPtr& buffer) { +void TDqSerializedBatch::SetPayload(TRope&& payload) { + Proto.ClearRaw(); if (IsOOBTransport((NDqProto::EDataTransportVersion)Proto.GetTransportVersion())) { - Payload = NKikimr::NMiniKQL::TPagedBuffer::AsRope(std::move(buffer)); + Payload = std::move(payload); } else { - Proto.MutableRaw()->reserve(buffer->Size()); - buffer->CopyTo(*Proto.MutableRaw()); + Payload.clear(); + Proto.MutableRaw()->reserve(payload.size()); + while (!payload.IsEmpty()) { + auto it = payload.Begin(); + Proto.MutableRaw()->append(it.ContiguousData(), it.ContiguousSize()); + payload.Erase(it, it + it.ContiguousSize()); + } } } diff --git a/ydb/library/yql/dq/common/dq_serialized_batch.h b/ydb/library/yql/dq/common/dq_serialized_batch.h index 17379060eec..7591f2b6685 100644 --- a/ydb/library/yql/dq/common/dq_serialized_batch.h +++ b/ydb/library/yql/dq/common/dq_serialized_batch.h @@ -38,7 +38,7 @@ struct TDqSerializedBatch { return oob; } - void SetPayload(const NKikimr::NMiniKQL::TPagedBuffer::TPtr& buffer); + void SetPayload(TRope&& payload); TRope PullPayload() { TRope result; diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.cpp b/ydb/library/yql/dq/runtime/dq_output_channel.cpp index f925b261155..f426885ce85 100644 --- a/ydb/library/yql/dq/runtime/dq_output_channel.cpp +++ b/ydb/library/yql/dq/runtime/dq_output_channel.cpp @@ -127,8 +127,8 @@ public: if (packerSize >= MaxChunkBytes) { Data.emplace_back(); Data.back().Buffer = Packer.Finish(); - BasicStats.Bytes += Data.back().Buffer->Size(); - PackedDataSize += Data.back().Buffer->Size(); + BasicStats.Bytes += Data.back().Buffer.size(); + PackedDataSize += Data.back().Buffer.size(); PackedRowCount += ChunkRowCount; Data.back().RowCount = ChunkRowCount; ChunkRowCount = 0; @@ -137,13 +137,13 @@ public: while (Storage && PackedDataSize && PackedDataSize + packerSize > MaxStoredBytes) { auto& head = Data.front(); - size_t bufSize = head.Buffer->Size(); + size_t bufSize = head.Buffer.size(); YQL_ENSURE(PackedDataSize >= bufSize); TDqSerializedBatch data; data.Proto.SetTransportVersion(TransportVersion); data.Proto.SetRows(head.RowCount); - data.SetPayload(head.Buffer); + data.SetPayload(std::move(head.Buffer)); Storage->Put(NextStoredId++, SaveForSpilling(std::move(data))); PackedDataSize -= bufSize; @@ -203,15 +203,14 @@ public: SpilledRowCount -= data.RowCount(); } else if (!Data.empty()) { auto& packed = Data.front(); - data.Proto.SetRows(packed.RowCount); - data.SetPayload(packed.Buffer); PackedRowCount -= packed.RowCount; - PackedDataSize -= packed.Buffer->Size(); + PackedDataSize -= packed.Buffer.size(); + data.Proto.SetRows(packed.RowCount); + data.SetPayload(std::move(packed.Buffer)); Data.pop_front(); } else { data.Proto.SetRows(ChunkRowCount); - auto buffer = Packer.Finish(); - data.SetPayload(buffer); + data.SetPayload(Packer.Finish()); ChunkRowCount = 0; } @@ -265,8 +264,8 @@ public: if (ChunkRowCount) { Data.emplace_back(); Data.back().Buffer = Packer.Finish(); - BasicStats.Bytes += Data.back().Buffer->Size(); - PackedDataSize += Data.back().Buffer->Size(); + BasicStats.Bytes += Data.back().Buffer.size(); + PackedDataSize += Data.back().Buffer.size(); PackedRowCount += ChunkRowCount; Data.back().RowCount = ChunkRowCount; ChunkRowCount = 0; @@ -350,7 +349,7 @@ private: TLogFunc LogFunc; struct TSerializedBatch { - TPagedBuffer::TPtr Buffer; + TRope Buffer; ui64 RowCount = 0; }; std::deque<TSerializedBatch> Data; diff --git a/ydb/library/yql/dq/runtime/dq_transport.cpp b/ydb/library/yql/dq/runtime/dq_transport.cpp index f2fcf48dc65..f22f0210aff 100644 --- a/ydb/library/yql/dq/runtime/dq_transport.cpp +++ b/ydb/library/yql/dq/runtime/dq_transport.cpp @@ -19,7 +19,7 @@ using namespace NYql; namespace { TDqSerializedBatch SerializeValue(NDqProto::EDataTransportVersion version, const TType* type, const NUdf::TUnboxedValuePod& value) { - TPagedBuffer::TPtr packResult; + TRope packResult; switch (version) { case NDqProto::DATA_TRANSPORT_VERSION_UNSPECIFIED: version = NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0; @@ -43,12 +43,12 @@ TDqSerializedBatch SerializeValue(NDqProto::EDataTransportVersion version, const TDqSerializedBatch result; result.Proto.SetTransportVersion(version); result.Proto.SetRows(1); - result.SetPayload(packResult); + result.SetPayload(std::move(packResult)); return result; } template<bool Fast> -TPagedBuffer::TPtr DoSerializeBuffer(const TType* type, const TUnboxedValueBatch& buffer) { +TRope DoSerializeBuffer(const TType* type, const TUnboxedValueBatch& buffer) { using TPacker = TValuePackerTransport<Fast>; TPacker packer(/* stable */ false, type); @@ -65,7 +65,7 @@ TPagedBuffer::TPtr DoSerializeBuffer(const TType* type, const TUnboxedValueBatch } TDqSerializedBatch SerializeBuffer(NDqProto::EDataTransportVersion version, const TType* type, const TUnboxedValueBatch& buffer) { - TPagedBuffer::TPtr packResult; + TRope packResult; switch (version) { case NDqProto::DATA_TRANSPORT_VERSION_UNSPECIFIED: version = NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0; @@ -87,7 +87,7 @@ TDqSerializedBatch SerializeBuffer(NDqProto::EDataTransportVersion version, cons TDqSerializedBatch result; result.Proto.SetTransportVersion(version); result.Proto.SetRows(buffer.RowCount()); - result.SetPayload(packResult); + result.SetPayload(std::move(packResult)); return result; } diff --git a/ydb/library/yql/dq/runtime/dq_transport.h b/ydb/library/yql/dq/runtime/dq_transport.h index 656ca49fb1e..78764b746b6 100644 --- a/ydb/library/yql/dq/runtime/dq_transport.h +++ b/ydb/library/yql/dq/runtime/dq_transport.h @@ -80,19 +80,10 @@ private: ++first; ++count; } - const auto& packed = packer.Finish(); TDqSerializedBatch result; result.Proto.SetTransportVersion(TransportVersion); result.Proto.SetRows(count); - if (TransportVersion == NDqProto::DATA_TRANSPORT_OOB_FAST_PICKLE_1_0 || - TransportVersion == NDqProto::DATA_TRANSPORT_OOB_PICKLE_1_0) - { - result.Payload = NKikimr::NMiniKQL::TPagedBuffer::AsRope(packed); - - } else { - result.Proto.MutableRaw()->reserve(packed->Size()); - packed->CopyTo(*result.Proto.MutableRaw()); - } + result.SetPayload(packer.Finish()); return result; } diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt index bbea34f1bd6..853b83dac49 100644 --- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.darwin-x86_64.txt @@ -23,6 +23,7 @@ target_link_libraries(minikql-computation-llvm PUBLIC Target-X86-AsmParser lib-Transforms-IPO libs-apache-arrow + cpp-actors-util library-cpp-enumbitset library-cpp-packedtypes library-cpp-random_provider @@ -38,6 +39,7 @@ target_sources(minikql-computation-llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_builder.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_reader.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_transport.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_codegen.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt index 2b98eb3a366..031693fb3a8 100644 --- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-aarch64.txt @@ -24,6 +24,7 @@ target_link_libraries(minikql-computation-llvm PUBLIC Target-X86-AsmParser lib-Transforms-IPO libs-apache-arrow + cpp-actors-util library-cpp-enumbitset library-cpp-packedtypes library-cpp-random_provider @@ -39,6 +40,7 @@ target_sources(minikql-computation-llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_builder.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_reader.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_transport.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_codegen.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt index 2b98eb3a366..031693fb3a8 100644 --- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.linux-x86_64.txt @@ -24,6 +24,7 @@ target_link_libraries(minikql-computation-llvm PUBLIC Target-X86-AsmParser lib-Transforms-IPO libs-apache-arrow + cpp-actors-util library-cpp-enumbitset library-cpp-packedtypes library-cpp-random_provider @@ -39,6 +40,7 @@ target_sources(minikql-computation-llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_builder.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_reader.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_transport.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_codegen.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp diff --git a/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt index bbea34f1bd6..853b83dac49 100644 --- a/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/minikql/computation/llvm/CMakeLists.windows-x86_64.txt @@ -23,6 +23,7 @@ target_link_libraries(minikql-computation-llvm PUBLIC Target-X86-AsmParser lib-Transforms-IPO libs-apache-arrow + cpp-actors-util library-cpp-enumbitset library-cpp-packedtypes library-cpp-random_provider @@ -38,6 +39,7 @@ target_sources(minikql-computation-llvm PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_builder.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_reader.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_block_transport.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_codegen.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp diff --git a/ydb/library/yql/minikql/computation/mkql_block_transport.cpp b/ydb/library/yql/minikql/computation/mkql_block_transport.cpp new file mode 100644 index 00000000000..2ccc5d3a82d --- /dev/null +++ b/ydb/library/yql/minikql/computation/mkql_block_transport.cpp @@ -0,0 +1,549 @@ +#include "mkql_block_transport.h" +#include "mkql_block_builder.h" + +#include <ydb/library/yql/minikql/mkql_type_builder.h> +#include <ydb/library/yql/public/udf/arrow/block_reader.h> +#include <ydb/library/yql/utils/rope_over_buffer.h> +#include <ydb/library/yql/utils/yql_panic.h> + +namespace NKikimr::NMiniKQL { + +namespace { + +class TOwnedArrowBuffer : public arrow::Buffer { +public: + TOwnedArrowBuffer(TContiguousSpan span, const std::shared_ptr<const void>& owner) + : arrow::Buffer(reinterpret_cast<const uint8_t*>(span.Data()), span.Size()) + , Owner_(owner) + { + } +private: + const std::shared_ptr<const void> Owner_; +}; + +std::shared_ptr<arrow::Buffer> MakeEmptyBuffer() { + return std::make_shared<arrow::Buffer>(nullptr, 0); +} + +std::shared_ptr<arrow::Buffer> MakeZeroBuffer(size_t byteLen) { + constexpr size_t NullWordCount = (MaxBlockSizeInBytes + sizeof(ui8) - 1) / sizeof(ui8); + static ui64 nulls[NullWordCount] = { 0 }; + if (byteLen <= sizeof(nulls)) { + return std::make_shared<arrow::Buffer>(reinterpret_cast<const ui8*>(nulls), byteLen); + } + + size_t wordCount = (byteLen + sizeof(ui8) - 1) / sizeof(ui8); + std::shared_ptr<ui64[]> buf(new ui64[wordCount]); + std::fill(buf.get(), buf.get() + wordCount, 0); + return std::make_shared<TOwnedArrowBuffer>(TContiguousSpan{ reinterpret_cast<const char*>(buf.get()), byteLen }, buf); +} + +std::shared_ptr<arrow::Buffer> MakeZeroBitmap(size_t bitCount) { + // align up 8 byte boundary + size_t byteCount = ((bitCount + 63u) & ~size_t(63u)) >> 3; + return MakeZeroBuffer(byteCount); +} + +bool NeedStoreBitmap(const arrow::ArrayData& data) { + size_t nullCount = data.GetNullCount(); + return nullCount != 0 && nullCount != data.length; +} + +void StoreNullsSizes(const arrow::ArrayData& data, ui64 desiredOffset, const IBlockSerializer::TMetadataSink& metaSink) { + metaSink(data.GetNullCount()); + if (!NeedStoreBitmap(data)) { + metaSink(0); + return; + } + + size_t nullBytes = (((size_t)data.length + desiredOffset + 7) & ~7ull) >> 3; + metaSink(nullBytes); +} + +void LoadNullsSizes(const IBlockDeserializer::TMetadataSource& metaSource, TMaybe<ui64>& nullsCount, TMaybe<ui64>& nullsSize) { + YQL_ENSURE(!nullsCount.Defined() && !nullsSize.Defined(), "Attempt to load null sizes twice (most likely LoadArray() is not called)"); + nullsCount = metaSource(); + nullsSize = metaSource(); +} + +void StoreNulls(const arrow::ArrayData& data, ui64 desiredOffset, TRope& dst) { + if (!NeedStoreBitmap(data)) { + return; + } + size_t nullBytes = (((size_t)data.length + desiredOffset + 7) & ~7ull) >> 3; + YQL_ENSURE(desiredOffset <= data.offset); + YQL_ENSURE((data.offset - desiredOffset) % 8 == 0); + const char* nulls = data.GetValues<char>(0, 0) + (data.offset - desiredOffset) / 8; + dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[0], nulls, nullBytes)); +} + +void LoadBufferSize(const IBlockDeserializer::TMetadataSource& metaSource, TMaybe<ui64>& result) { + YQL_ENSURE(!result.Defined(), "Attempt to load buffer size twice (most likely LoadArray() is not called)"); + result = metaSource(); +} + +std::shared_ptr<arrow::Buffer> LoadBuffer(TRope& source, TMaybe<ui64> size) { + YQL_ENSURE(size.Defined(), "Buffer size is not loaded"); + if (!*size) { + return std::make_shared<arrow::Buffer>(nullptr, 0); + } + + YQL_ENSURE(source.size() >= *size, "Premature end of data"); + auto owner = std::make_shared<TRope>(source.Begin(), source.Begin() + *size); + source.EraseFront(*size); + + owner->Compact(); + return std::make_shared<TOwnedArrowBuffer>(owner->GetContiguousSpan(), owner); +} + +std::shared_ptr<arrow::Buffer> LoadNullsBitmap(TRope& source, TMaybe<ui64> nullCount, TMaybe<ui64> bitmapSize) { + YQL_ENSURE(nullCount.Defined(), "Bitmap null count is not loaded"); + YQL_ENSURE(bitmapSize.Defined(), "Bitmap size is not loaded"); + if (*nullCount == 0) { + YQL_ENSURE(!*bitmapSize); + return {}; + } + YQL_ENSURE(*bitmapSize); + return LoadBuffer(source, bitmapSize); +} + +class TBlockDeserializerBase : public IBlockDeserializer { +public: + TBlockDeserializerBase() = default; + + virtual void SetArrowType(const std::shared_ptr<arrow::DataType>& type) { + ArrowType_ = type; + } + + void LoadMetadata(const TMetadataSource& metaSource) final { + if (IsNullable()) { + LoadNullsSizes(metaSource, NullsCount_, NullsSize_); + } + DoLoadMetadata(metaSource); + } + + virtual std::shared_ptr<arrow::ArrayData> LoadArray(TRope& src, ui64 blockLen, ui64 offset) final { + YQL_ENSURE(blockLen > 0, "Should be handled earlier"); + std::shared_ptr<arrow::Buffer> nulls; + i64 nullsCount = 0; + if (IsNullable()) { + YQL_ENSURE(NullsCount_.Defined() && NullsSize_.Defined(), "Nulls metadata should be loaded"); + if (*NullsCount_ != 0) { + if (*NullsSize_ == 0) { + return MakeDefaultValue(blockLen, offset); + } + nulls = LoadNullsBitmap(src, NullsCount_, NullsSize_); + nullsCount = *NullsCount_; + } + } + auto result = DoLoadArray(src, nulls, nullsCount, blockLen, offset); + ResetMetadata(); + return result; + } + + void ResetMetadata() { + NullsCount_ = NullsSize_ = {}; + DoResetMetadata(); + } + + std::shared_ptr<arrow::ArrayData> MakeDefaultValue(ui64 blockLen, ui64 offset) { + std::shared_ptr<arrow::Buffer> nulls; + i64 nullsCount = 0; + if (IsNullable()) { + nulls = MakeZeroBitmap(blockLen + offset); + nullsCount = blockLen; + } + return DoMakeDefaultValue(nulls, nullsCount, blockLen, offset); + } + +protected: + virtual void DoLoadMetadata(const TMetadataSource& metaSource) = 0; + virtual void DoResetMetadata() = 0; + virtual bool IsNullable() const = 0; + virtual std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const = 0; + virtual std::shared_ptr<arrow::ArrayData> DoLoadArray(TRope& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) = 0; + + std::shared_ptr<arrow::DataType> ArrowType_; + TMaybe<ui64> NullsCount_; + TMaybe<ui64> NullsSize_; +}; + +template<size_t ObjectSize, bool Nullable> +class TFixedSizeBlockSerializer final : public IBlockSerializer { +public: + TFixedSizeBlockSerializer() = default; +private: + size_t ArrayMetadataCount() const final { + return Nullable ? 3 : 1; + } + + void StoreMetadata(const arrow::ArrayData& data, ui64 offset, const IBlockSerializer::TMetadataSink& metaSink) const final { + YQL_ENSURE(offset <= data.offset); + if constexpr (Nullable) { + StoreNullsSizes(data, offset, metaSink); + if (data.GetNullCount() == data.length) { + metaSink(0); + return; + } + } + size_t dataBytes = ((size_t)data.length + offset) * ObjectSize; + metaSink(dataBytes); + } + + void StoreArray(const arrow::ArrayData& data, ui64 offset, TRope& dst) const final { + YQL_ENSURE(offset <= data.offset); + if constexpr (Nullable) { + StoreNulls(data, offset, dst); + if (data.GetNullCount() == data.length) { + return; + } + } + + const char* buf = reinterpret_cast<const char*>(data.buffers[1]->data()) + (data.offset - offset) * ObjectSize; + size_t dataBytes = ((size_t)data.length + offset) * ObjectSize; + dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[1], buf, dataBytes)); + } +}; + +template<size_t ObjectSize, bool Nullable> +class TFixedSizeBlockDeserializer final : public TBlockDeserializerBase { +public: + TFixedSizeBlockDeserializer() = default; +private: + void DoLoadMetadata(const TMetadataSource& metaSource) final { + LoadBufferSize(metaSource, DataSize_); + } + + bool IsNullable() const final { + return Nullable; + } + + std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final { + auto data = MakeZeroBuffer((blockLen + offset) * ObjectSize); + return arrow::ArrayData::Make(ArrowType_, blockLen, { nulls, data }, nullsCount, offset); + } + + std::shared_ptr<arrow::ArrayData> DoLoadArray(TRope& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final { + auto data = LoadBuffer(src, DataSize_); + return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls, data}, nullsCount, offset); + } + + void DoResetMetadata() final { + DataSize_ = {}; + } + + TMaybe<ui64> DataSize_; +}; + + +template<typename TStringType, bool Nullable> +class TStringBlockSerializer final : public IBlockSerializer { + using TOffset = typename TStringType::offset_type; +public: + TStringBlockSerializer() = default; +private: + size_t ArrayMetadataCount() const final { + return Nullable ? 4 : 2; + } + + void StoreMetadata(const arrow::ArrayData& data, ui64 offset, const IBlockSerializer::TMetadataSink& metaSink) const final { + YQL_ENSURE(offset <= data.offset); + if constexpr (Nullable) { + StoreNullsSizes(data, offset, metaSink); + if (data.GetNullCount() == data.length) { + metaSink(0); + metaSink(0); + return; + } + } + + size_t offsetsSize = ((size_t)data.length + 1 + offset) * sizeof(TOffset); + metaSink(offsetsSize); + metaSink(data.buffers[2]->size()); + } + + void StoreArray(const arrow::ArrayData& data, ui64 offset, TRope& dst) const { + YQL_ENSURE(offset <= data.offset); + if constexpr (Nullable) { + StoreNulls(data, offset, dst); + if (data.GetNullCount() == data.length) { + return; + } + } + + const char* offsets = reinterpret_cast<const char*>(data.GetValues<TOffset>(1) - offset); + size_t offsetsSize = ((size_t)data.length + 1 + offset) * sizeof(TOffset); + dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[1], offsets, offsetsSize)); + + const char* mainData = reinterpret_cast<const char*>(data.buffers[2]->data()); + size_t mainSize = data.buffers[2]->size(); + dst.Insert(dst.End(), NYql::MakeReadOnlyRope(data.buffers[2], mainData, mainSize)); + } +}; + +template<typename TStringType, bool Nullable> +class TStringBlockDeserializer final : public TBlockDeserializerBase { + using TOffset = typename TStringType::offset_type; +public: + TStringBlockDeserializer() = default; +private: + void DoLoadMetadata(const TMetadataSource& metaSource) final { + LoadBufferSize(metaSource, OffsetsSize_); + LoadBufferSize(metaSource, DataSize_); + } + + std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final { + auto offsets = MakeZeroBuffer((blockLen + 1 + offset) * sizeof(TOffset)); + auto data = MakeEmptyBuffer(); + return arrow::ArrayData::Make(ArrowType_, blockLen, { nulls, offsets, data }, nullsCount, offset); + } + + std::shared_ptr<arrow::ArrayData> DoLoadArray(TRope& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final { + auto offsets = LoadBuffer(src, OffsetsSize_); + auto data = LoadBuffer(src, DataSize_); + return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls, offsets, data }, nullsCount, offset); + } + + bool IsNullable() const final { + return Nullable; + } + + void DoResetMetadata() final { + OffsetsSize_ = DataSize_ = {}; + } + + TMaybe<ui64> OffsetsSize_; + TMaybe<ui64> DataSize_; +}; + +class TExtOptionalBlockSerializer final : public IBlockSerializer { +public: + explicit TExtOptionalBlockSerializer(std::unique_ptr<IBlockSerializer>&& inner) + : Inner_(std::move(inner)) + { + } +private: + size_t ArrayMetadataCount() const final { + return 2 + Inner_->ArrayMetadataCount(); + } + + void StoreMetadata(const arrow::ArrayData& data, ui64 offset, const IBlockSerializer::TMetadataSink& metaSink) const final { + YQL_ENSURE(offset <= data.offset); + StoreNullsSizes(data, offset, metaSink); + if (data.GetNullCount() == data.length) { + auto innerCount = Inner_->ArrayMetadataCount(); + for (size_t i = 0; i < innerCount; ++i) { + metaSink(0); + } + } else { + Inner_->StoreMetadata(*data.child_data[0], offset, metaSink); + } + } + + void StoreArray(const arrow::ArrayData& data, ui64 offset, TRope& dst) const { + YQL_ENSURE(offset <= data.offset); + StoreNulls(data, offset, dst); + if (data.GetNullCount() != data.length) { + Inner_->StoreArray(*data.child_data[0], offset, dst); + } + } + + const std::unique_ptr<IBlockSerializer> Inner_; +}; + +class TExtOptionalBlockDeserializer final : public TBlockDeserializerBase { +public: + explicit TExtOptionalBlockDeserializer(std::unique_ptr<TBlockDeserializerBase>&& inner) + : Inner_(std::move(inner)) + { + } +private: + void DoLoadMetadata(const TMetadataSource& metaSource) final { + Inner_->LoadMetadata(metaSource); + } + + std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final { + return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, { Inner_->MakeDefaultValue(blockLen, offset) }, nullsCount, offset); + } + + std::shared_ptr<arrow::ArrayData> DoLoadArray(TRope& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final { + return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, { Inner_->LoadArray(src, blockLen, offset) }, nullsCount, offset); + } + + bool IsNullable() const final { + return true; + } + + void DoResetMetadata() final { + Inner_->ResetMetadata(); + } + + void SetArrowType(const std::shared_ptr<arrow::DataType>& type) final { + ArrowType_ = type; + YQL_ENSURE(type->fields().size() == 1); + Inner_->SetArrowType(type->fields().front()->type()); + } + + const std::unique_ptr<TBlockDeserializerBase> Inner_; +}; + +template<bool Nullable> +class TTupleBlockSerializer final : public IBlockSerializer { +public: + explicit TTupleBlockSerializer(TVector<std::unique_ptr<IBlockSerializer>>&& children) + : Children_(std::move(children)) + { + } +private: + size_t ArrayMetadataCount() const final { + size_t result = GetChildMetaCount(); + if constexpr (Nullable) { + result += 2; + } + return result; + } + + void StoreMetadata(const arrow::ArrayData& data, ui64 offset, const IBlockSerializer::TMetadataSink& metaSink) const final { + YQL_ENSURE(offset <= data.offset); + if constexpr (Nullable) { + StoreNullsSizes(data, offset, metaSink); + } + if (data.GetNullCount() == data.length) { + auto childCount = GetChildMetaCount(); + for (size_t i = 0; i < childCount; ++i) { + metaSink(0); + } + } else { + for (size_t i = 0; i < Children_.size(); ++i) { + Children_[i]->StoreMetadata(*data.child_data[i], offset, metaSink); + } + } + } + + void StoreArray(const arrow::ArrayData& data, ui64 offset, TRope& dst) const { + YQL_ENSURE(offset <= data.offset); + if constexpr (Nullable) { + StoreNulls(data, offset, dst); + } + if (data.GetNullCount() != data.length) { + for (size_t i = 0; i < Children_.size(); ++i) { + Children_[i]->StoreArray(*data.child_data[i], offset, dst); + } + } + } + + size_t GetChildMetaCount() const { + size_t result = 0; + for (const auto& child : Children_) { + result += child->ArrayMetadataCount(); + } + return result; + } + + const TVector<std::unique_ptr<IBlockSerializer>> Children_; +}; + +template<bool Nullable> +class TTupleBlockDeserializer final : public TBlockDeserializerBase { +public: + explicit TTupleBlockDeserializer(TVector<std::unique_ptr<TBlockDeserializerBase>>&& children) + : Children_(std::move(children)) + { + } +private: + void DoLoadMetadata(const TMetadataSource& metaSource) final { + for (auto& child : Children_) { + child->LoadMetadata(metaSource); + } + } + + std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final { + std::vector<std::shared_ptr<arrow::ArrayData>> childData; + for (auto& child : Children_) { + childData.emplace_back(child->MakeDefaultValue(blockLen, offset)); + } + return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, offset); + } + + std::shared_ptr<arrow::ArrayData> DoLoadArray(TRope& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final { + std::vector<std::shared_ptr<arrow::ArrayData>> childData; + for (auto& child : Children_) { + childData.emplace_back(child->LoadArray(src, blockLen, offset)); + } + return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, offset); + } + + void DoResetMetadata() final { + for (auto& child : Children_) { + child->ResetMetadata(); + } + } + + bool IsNullable() const final { + return Nullable; + } + + void SetArrowType(const std::shared_ptr<arrow::DataType>& type) final { + ArrowType_ = type; + YQL_ENSURE(type->fields().size() == Children_.size()); + for (size_t i = 0; i < Children_.size(); ++i) { + Children_[i]->SetArrowType(type->field(i)->type()); + } + } + + const TVector<std::unique_ptr<TBlockDeserializerBase>> Children_; +}; + +struct TSerializerTraits { + using TResult = IBlockSerializer; + template <bool Nullable> + using TTuple = TTupleBlockSerializer<Nullable>; + template <typename T, bool Nullable> + using TFixedSize = TFixedSizeBlockSerializer<sizeof(T), Nullable>; + template <typename TStringType, bool Nullable> + using TStrings = TStringBlockSerializer<TStringType, Nullable>; + using TExtOptional = TExtOptionalBlockSerializer; + + static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) { + Y_UNUSED(pgBuilder); + if (desc.PassByValue) { + return std::make_unique<TFixedSize<ui64, true>>(); + } + return std::make_unique<TStrings<arrow::BinaryType, true>>(); + } +}; + +struct TDeserializerTraits { + using TResult = TBlockDeserializerBase; + template <bool Nullable> + using TTuple = TTupleBlockDeserializer<Nullable>; + template <typename T, bool Nullable> + using TFixedSize = TFixedSizeBlockDeserializer<sizeof(T), Nullable>; + template <typename TStringType, bool Nullable> + using TStrings = TStringBlockDeserializer<TStringType, Nullable>; + using TExtOptional = TExtOptionalBlockDeserializer; + + static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) { + Y_UNUSED(pgBuilder); + if (desc.PassByValue) { + return std::make_unique<TFixedSize<ui64, true>>(); + } + return std::make_unique<TStrings<arrow::BinaryType, true>>(); + } +}; + +} // namespace + + +std::unique_ptr<IBlockSerializer> MakeBlockSerializer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type) { + return NYql::NUdf::MakeBlockReaderImpl<TSerializerTraits>(typeInfoHelper, type, nullptr); +} + +std::unique_ptr<IBlockDeserializer> MakeBlockDeserializer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type) { + std::unique_ptr<TBlockDeserializerBase> result = NYql::NUdf::MakeBlockReaderImpl<TDeserializerTraits>(typeInfoHelper, type, nullptr); + result->SetArrowType(NYql::NUdf::GetArrowType(typeInfoHelper, type)); + return std::move(result); +} + + +} // namespace NKikimr::NMiniKQL diff --git a/ydb/library/yql/minikql/computation/mkql_block_transport.h b/ydb/library/yql/minikql/computation/mkql_block_transport.h new file mode 100644 index 00000000000..97f35241682 --- /dev/null +++ b/ydb/library/yql/minikql/computation/mkql_block_transport.h @@ -0,0 +1,38 @@ +#pragma once + +#include <ydb/library/yql/minikql/mkql_node.h> + +#include <library/cpp/actors/util/rope.h> + +#include <arrow/datum.h> + +#include <functional> +#include <memory> + +namespace NKikimr::NMiniKQL { + +class IBlockSerializer { +public: + virtual ~IBlockSerializer() = default; + + virtual size_t ArrayMetadataCount() const = 0; + + using TMetadataSink = std::function<void(ui64 meta)>; + virtual void StoreMetadata(const arrow::ArrayData& data, ui64 desiredOffset, const TMetadataSink& metaSink) const = 0; + virtual void StoreArray(const arrow::ArrayData& data, ui64 desiredOffset, TRope& dst) const = 0; +}; + +class IBlockDeserializer { +public: + virtual ~IBlockDeserializer() = default; + + using TMetadataSource = std::function<ui64()>; + virtual void LoadMetadata(const TMetadataSource& metaSource) = 0; + virtual std::shared_ptr<arrow::ArrayData> LoadArray(TRope& src, ui64 blockLen, ui64 offset) = 0; +}; + + +std::unique_ptr<IBlockSerializer> MakeBlockSerializer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type); +std::unique_ptr<IBlockDeserializer> MakeBlockDeserializer(const NYql::NUdf::ITypeInfoHelper& typeInfoHelper, const NYql::NUdf::TType* type); + +} diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp index 776ee539bda..745feeccd5e 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp @@ -1,3 +1,4 @@ +#include "mkql_block_impl.h" #include "mkql_computation_node_pack.h" #include "mkql_computation_node_pack_impl.h" #include "mkql_computation_node_holders.h" @@ -225,6 +226,11 @@ bool HasOptionalFields(const TType* type) { return false; } + case TType::EKind::Block: { + auto blockType = static_cast<const TBlockType*>(type); + return HasOptionalFields(blockType->GetItemType()); + } + default: THROW yexception() << "Unsupported type: " << type->GetKindAsStr(); } @@ -855,6 +861,16 @@ void PackImpl(const TType* type, TBuf& buffer, const NUdf::TUnboxedValuePod& val } } +bool HasNonTrivialNulls(const arrow::ArrayData& array, i64 expectedOffset) { + MKQL_ENSURE(array.offset == expectedOffset, "Unexpected offset in child arrays"); + i64 nulls = array.GetNullCount(); + if (nulls > 0 && nulls != array.length) { + return true; + } + + return AnyOf(array.child_data, [&](const auto& child) { return HasNonTrivialNulls(*child, expectedOffset); }); +} + } // namespace @@ -939,20 +955,73 @@ TStringBuf TValuePackerGeneric<Fast>::Pack(const NUdf::TUnboxedValuePod& value) // Transport packer template<bool Fast> -TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* type) +TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* type, arrow::MemoryPool* pool) : Type_(type) , State_(ScanTypeProperties(Type_, false)) , IncrementalState_(ScanTypeProperties(Type_, true)) + , ArrowPool_(pool ? *pool : *arrow::default_memory_pool()) { MKQL_ENSURE(!stable, "Stable packing is not supported"); + InitBlocks(); } template<bool Fast> -TValuePackerTransport<Fast>::TValuePackerTransport(const TType* type) +TValuePackerTransport<Fast>::TValuePackerTransport(const TType* type, arrow::MemoryPool* pool) : Type_(type) , State_(ScanTypeProperties(Type_, false)) , IncrementalState_(ScanTypeProperties(Type_, true)) + , ArrowPool_(pool ? *pool : *arrow::default_memory_pool()) { + InitBlocks(); +} + +template<bool Fast> +void TValuePackerTransport<Fast>::InitBlocks() { + if (!Type_->IsMulti()) { + return; + } + + const TMultiType* multiType = static_cast<const TMultiType*>(Type_); + ui32 width = multiType->GetElementsCount(); + if (!width) { + return; + } + + const TType* last = multiType->GetElementType(width - 1); + if (!last->IsBlock()) { + return; + } + + const TBlockType* blockLenType = static_cast<const TBlockType*>(multiType->GetElementType(width - 1)); + if (blockLenType->GetShape() != TBlockType::EShape::Scalar) { + return; + } + + if (!blockLenType->GetItemType()->IsData()) { + return; + } + + if (static_cast<const TDataType*>(blockLenType->GetItemType())->GetDataSlot() != NUdf::EDataSlot::Uint64) { + return; + } + + if (AnyOf(multiType->GetElements(), [](const auto& t) { return !t->IsBlock(); })) { + return; + } + + IsBlock_ = true; + ConvertedScalars_.resize(width - 1); + for (ui32 i = 0; i < width - 1; ++i) { + const TBlockType* itemType = static_cast<const TBlockType*>(multiType->GetElementType(i)); + const bool isScalar = itemType->GetShape() == TBlockType::EShape::Scalar; + BlockSerializers_.emplace_back(MakeBlockSerializer(TTypeInfoHelper(), itemType->GetItemType())); + BlockDeserializers_.emplace_back(MakeBlockDeserializer(TTypeInfoHelper(), itemType->GetItemType())); + if (itemType->GetShape() == TBlockType::EShape::Scalar) { + BlockReaders_.emplace_back(NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), itemType->GetItemType())); + } else { + BlockReaders_.emplace_back(); + } + } } template<bool Fast> @@ -964,13 +1033,16 @@ NUdf::TUnboxedValue TValuePackerTransport<Fast>::Unpack(TRope&& buf, const THold template<bool Fast> void TValuePackerTransport<Fast>::UnpackBatch(TRope&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const { + if (IsBlock_) { + return UnpackBatchBlocks(std::move(buf), holderFactory, result); + } const size_t totalSize = buf.GetSize(); TChunkedInputBuffer chunked(std::move(buf)); DoUnpackBatch<Fast>(Type_, chunked, totalSize, holderFactory, IncrementalState_, result); } template<bool Fast> -TPagedBuffer::TPtr TValuePackerTransport<Fast>::Pack(const NUdf::TUnboxedValuePod& value) const { +TRope TValuePackerTransport<Fast>::Pack(const NUdf::TUnboxedValuePod& value) const { MKQL_ENSURE(ItemCount_ == 0, "Can not mix Pack() and AddItem() calls"); TPagedBuffer::TPtr result = std::make_shared<TPagedBuffer>(); if constexpr (Fast) { @@ -981,7 +1053,7 @@ TPagedBuffer::TPtr TValuePackerTransport<Fast>::Pack(const NUdf::TUnboxedValuePo PackImpl<Fast, false>(Type_, *result, value, State_); BuildMeta(result, false); } - return result; + return TPagedBuffer::AsRope(result); } template<bool Fast> @@ -1013,6 +1085,10 @@ template<bool Fast> TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItem(const NUdf::TUnboxedValuePod* values, ui32 width) { Y_VERIFY_DEBUG(Type_->IsMulti()); Y_VERIFY_DEBUG(static_cast<const TMultiType*>(Type_)->GetElementsCount() == width); + if (IsBlock_) { + return AddWideItemBlocks(values, width); + } + const TMultiType* itemType = static_cast<const TMultiType*>(Type_); if (!ItemCount_) { StartPack(); @@ -1026,13 +1102,133 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItem(const NUdf } template<bool Fast> +TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(const NUdf::TUnboxedValuePod* values, ui32 width) { + const ui64 len = TArrowBlock::From(values[width - 1]).GetDatum().scalar_as<arrow::UInt64Scalar>().value; + + auto metadataBuffer = std::make_shared<TPagedBuffer>(); + PackData<false>(len, *metadataBuffer); + + TMaybe<ui64> originalOffset; + bool hasNonTrivialNulls = false; + // all offsets should be equal + for (size_t i = 0; i < width - 1; ++i) { + arrow::Datum datum = TArrowBlock::From(values[i]).GetDatum(); + if (datum.is_array()) { + i64 offset = datum.array()->offset; + MKQL_ENSURE(offset >= 0, "Negative offset"); + if (!originalOffset.Defined()) { + originalOffset = offset; + } else { + MKQL_ENSURE(*originalOffset == offset, "All columns should have equal offsets"); + } + hasNonTrivialNulls = hasNonTrivialNulls || HasNonTrivialNulls(*datum.array(), offset); + } + } + + const ui64 desiredOffset = (originalOffset.Defined() && hasNonTrivialNulls) ? (*originalOffset % 8) : 0; + PackData<false>(desiredOffset, *metadataBuffer); + + // "scalars are present" + const ui64 metadataFlags = 1 << 0; + PackData<false>(metadataFlags, *metadataBuffer); + + ui32 totalMetadataCount = 0; + for (size_t i = 0; i < width - 1; ++i) { + totalMetadataCount += BlockSerializers_[i]->ArrayMetadataCount(); + } + PackData<false>(totalMetadataCount, *metadataBuffer); + + TVector<std::shared_ptr<arrow::ArrayData>> arrays; + arrays.reserve(width - 1); + for (ui32 i = 0; i < width - 1; ++i) { + arrow::Datum datum = TArrowBlock::From(values[i]).GetDatum(); + if (datum.is_scalar()) { + if (!ConvertedScalars_[i]) { + const TType* itemType = static_cast<const TMultiType*>(Type_)->GetElementType(i); + datum = MakeArrayFromScalar(*datum.scalar(), 1, static_cast<const TBlockType*>(itemType)->GetItemType(), ArrowPool_); + ConvertedScalars_[i] = datum.array(); + } + arrays.emplace_back(ConvertedScalars_[i]); + } else { + MKQL_ENSURE(datum.is_array(), "Expecting array or scalar"); + arrays.emplace_back(datum.array()); + } + } + + ui32 savedMetadata = 0; + for (size_t i = 0; i < width - 1; ++i) { + const bool isScalar = BlockReaders_[i] != nullptr; + BlockSerializers_[i]->StoreMetadata(*arrays[i], isScalar ? 0 : desiredOffset, [&](ui64 meta) { + PackData<false>(meta, *metadataBuffer); + ++savedMetadata; + }); + } + + MKQL_ENSURE(savedMetadata == totalMetadataCount, "Serialization metadata error"); + + BlockBuffer_.Insert(BlockBuffer_.End(), TPagedBuffer::AsRope(metadataBuffer)); + for (size_t i = 0; i < width - 1; ++i) { + const bool isScalar = BlockReaders_[i] != nullptr; + BlockSerializers_[i]->StoreArray(*arrays[i], isScalar ? 0 : desiredOffset, BlockBuffer_); + } + ++ItemCount_; + return *this; +} + +template<bool Fast> +void TValuePackerTransport<Fast>::UnpackBatchBlocks(TRope&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const { + while (!buf.empty()) { + TChunkedInputBuffer chunked(std::move(buf)); + const ui64 len = UnpackData<false, ui64>(chunked); + if (len == 0) { + continue; + } + + const ui64 offset = UnpackData<false, ui64>(chunked); + const ui64 metadataFlags = UnpackData<false, ui64>(chunked); + MKQL_ENSURE(metadataFlags == 1, "Unsupported metadata flags"); + + ui32 metaCount = UnpackData<false, ui32>(chunked); + for (auto& deserializer : BlockDeserializers_) { + deserializer->LoadMetadata([&]() -> ui64 { + MKQL_ENSURE(metaCount > 0, "No more metadata available"); + --metaCount; + return UnpackData<false, ui64>(chunked); + }); + } + MKQL_ENSURE(metaCount == 0, "Partial buffers read"); + TRope ropeTail = chunked.ReleaseRope(); + result.PushRow([&](ui32 i) { + if (i < BlockDeserializers_.size()) { + const bool isScalar = BlockReaders_[i] != nullptr; + auto array = BlockDeserializers_[i]->LoadArray(ropeTail, isScalar ? 1 : len, isScalar ? 0 : offset); + if (isScalar) { + TBlockItem item = BlockReaders_[i]->GetItem(*array, 0); + const TBlockType* itemType = static_cast<const TBlockType*>(static_cast<const TMultiType*>(Type_)->GetElementType(i)); + return holderFactory.CreateArrowBlock(ConvertScalar(itemType->GetItemType(), item, ArrowPool_)); + } + return holderFactory.CreateArrowBlock(array); + } + MKQL_ENSURE(i == BlockDeserializers_.size(), "Unexpected row index"); + return holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(len))); + }); + buf = std::move(ropeTail); + } +} + +template<bool Fast> void TValuePackerTransport<Fast>::Clear() { Buffer_.reset(); + BlockBuffer_.clear(); ItemCount_ = 0; } template<bool Fast> -TPagedBuffer::TPtr TValuePackerTransport<Fast>::Finish() { +TRope TValuePackerTransport<Fast>::Finish() { + if (IsBlock_) { + return FinishBlocks(); + } + if (!ItemCount_) { StartPack(); } @@ -1045,6 +1241,13 @@ TPagedBuffer::TPtr TValuePackerTransport<Fast>::Finish() { } TPagedBuffer::TPtr result = std::move(Buffer_); Clear(); + return TPagedBuffer::AsRope(result); +} + +template<bool Fast> +TRope TValuePackerTransport<Fast>::FinishBlocks() { + TRope result = std::move(BlockBuffer_); + Clear(); return result; } diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h index 03255a09ba5..52c8b10a25a 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h @@ -3,6 +3,8 @@ #include "mkql_computation_node.h" #include "mkql_computation_node_holders.h" #include "mkql_optional_usage_mask.h" +#include "mkql_block_transport.h" +#include "mkql_block_reader.h" #include <ydb/library/yql/minikql/mkql_buffer.h> #include <ydb/library/yql/public/udf/udf_value.h> @@ -73,32 +75,47 @@ class TValuePackerTransport { public: using TSelf = TValuePackerTransport<Fast>; - explicit TValuePackerTransport(const TType* type); + explicit TValuePackerTransport(const TType* type, arrow::MemoryPool* pool = nullptr); // for compatibility with TValuePackerGeneric - stable packing is not supported - TValuePackerTransport(bool stable, const TType* type); + TValuePackerTransport(bool stable, const TType* type, arrow::MemoryPool* ppol = nullptr); // AddItem()/UnpackBatch() will perform incremental packing - type T is processed as list item type. Will produce List<T> layout TSelf& AddItem(const NUdf::TUnboxedValuePod& value); TSelf& AddWideItem(const NUdf::TUnboxedValuePod* values, ui32 count); size_t PackedSizeEstimate() const { - return Buffer_ ? (Buffer_->Size() + Buffer_->ReservedHeaderSize()) : 0; + return IsBlock_ ? BlockBuffer_.size() : (Buffer_ ? (Buffer_->Size() + Buffer_->ReservedHeaderSize()) : 0); } void Clear(); - TPagedBuffer::TPtr Finish(); + TRope Finish(); // Pack()/Unpack() will pack/unpack single value of type T - TPagedBuffer::TPtr Pack(const NUdf::TUnboxedValuePod& value) const; + TRope Pack(const NUdf::TUnboxedValuePod& value) const; NUdf::TUnboxedValue Unpack(TRope&& buf, const THolderFactory& holderFactory) const; void UnpackBatch(TRope&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const; private: void BuildMeta(TPagedBuffer::TPtr& buffer, bool addItemCount) const; void StartPack(); + void InitBlocks(); + TSelf& AddWideItemBlocks(const NUdf::TUnboxedValuePod* values, ui32 count); + TRope FinishBlocks(); + void UnpackBatchBlocks(TRope&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const; + const TType* const Type_; ui64 ItemCount_ = 0; TPagedBuffer::TPtr Buffer_; mutable NDetails::TPackerState State_; mutable NDetails::TPackerState IncrementalState_; + + arrow::MemoryPool& ArrowPool_; + bool IsBlock_ = false; + + TVector<std::unique_ptr<IBlockSerializer>> BlockSerializers_; + TVector<std::unique_ptr<IBlockReader>> BlockReaders_; + TVector<std::shared_ptr<arrow::ArrayData>> ConvertedScalars_; + TRope BlockBuffer_; + + TVector<std::unique_ptr<IBlockDeserializer>> BlockDeserializers_; }; using TValuePacker = TValuePackerGeneric<false>; diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.h b/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.h index 60d9597252f..d71d4692f6a 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.h +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.h @@ -108,6 +108,18 @@ public: } } + inline TRope ReleaseRope() { + Y_VERIFY_DEBUG(OriginalLen_ >= Len_); + Rope_.EraseFront(OriginalLen_ - Len_); + TRope result = std::move(Rope_); + + Data_ = nullptr; + Len_ = OriginalLen_ = 0; + Rope_.clear(); + + return result; + } + void Next() { Y_VERIFY_DEBUG(Len_ == 0); Rope_.EraseFront(OriginalLen_); diff --git a/ydb/library/yql/minikql/computation/ya.make.inc b/ydb/library/yql/minikql/computation/ya.make.inc index 7785d0762e9..be501a3097a 100644 --- a/ydb/library/yql/minikql/computation/ya.make.inc +++ b/ydb/library/yql/minikql/computation/ya.make.inc @@ -4,6 +4,7 @@ SRCS( mkql_block_builder.cpp mkql_block_impl.cpp mkql_block_reader.cpp + mkql_block_transport.cpp mkql_computation_node.cpp mkql_computation_node_codegen.cpp mkql_computation_node_graph.cpp @@ -20,6 +21,7 @@ SRCS( PEERDIR( contrib/libs/apache/arrow + library/cpp/actors/util library/cpp/enumbitset library/cpp/packedtypes library/cpp/random_provider |