author     aneporada <aneporada@yandex-team.com>  2024-11-07 22:39:23 +0300
committer  aneporada <aneporada@yandex-team.com>  2024-11-07 22:51:13 +0300
commit     77acc99aab41efa1d6cf64879057312292355860 (patch)
tree       7ddc3f2d6cf290872de90947fe275609b3783c88
parent     4b6cbdc40a3c0b3cee2bf7af07482f02334ef62c (diff)
download   ydb-77acc99aab41efa1d6cf64879057312292355860.tar.gz
Minimize dependency on YDB's TRope
commit_hash:e3b74d6e744bd014dc1fa0f41053fe886f7f4a49
23 files changed, 354 insertions, 217 deletions
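The commit replaces YDB's TRope with a new NYql::TChunkedBuffer: a deque of string views where each chunk carries a shared_ptr that keeps the underlying memory alive (added below in yql/essentials/utils/chunked_buffer.h). A minimal usage sketch of that API, based only on the declarations introduced in this diff; the function name and string contents are illustrative, not part of the change:

#include <yql/essentials/utils/chunked_buffer.h>
#include <util/stream/str.h>
#include <memory>

void ChunkedBufferSketch() {
    using NYql::TChunkedBuffer;

    // Own a string outright...
    TChunkedBuffer buf(TString("head"));
    // ...or reference externally owned memory; 'owner' keeps the bytes alive.
    auto owner = std::make_shared<TString>("payload");
    buf.Append(TStringBuf(*owner), owner);

    // Flatten into a contiguous string through an IOutputStream
    // (the pattern the updated unit test uses instead of TRope::ConvertToString()).
    TString flat;
    TStringOutput out(flat);
    buf.CopyTo(out);

    // Consume from the front chunk by chunk (what TRope::EraseFront() used to do).
    while (!buf.Empty()) {
        TStringBuf chunk = buf.Front().Buf;  // contiguous view of the first chunk
        buf.Erase(chunk.size());             // drop the consumed bytes
    }
}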
diff --git a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp index 97295cc2ed..7c2ac2dc18 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp @@ -20,6 +20,7 @@ namespace NKikimr { namespace NMiniKQL { using NYql::EnsureDynamicCast; +using NYql::TChunkedBuffer; extern TStatKey Combine_FlushesCount; extern TStatKey Combine_MaxRowsCount; @@ -340,7 +341,7 @@ private: class TSpillingSupportState : public TComputationValue<TSpillingSupportState> { typedef TComputationValue<TSpillingSupportState> TBase; typedef std::optional<NThreading::TFuture<ISpiller::TKey>> TAsyncWriteOperation; - typedef std::optional<NThreading::TFuture<std::optional<TRope>>> TAsyncReadOperation; + typedef std::optional<NThreading::TFuture<std::optional<TChunkedBuffer>>> TAsyncReadOperation; struct TSpilledBucket { std::unique_ptr<TWideUnboxedValuesSpillerAdapter> SpilledState; //state collected before switching to spilling mode @@ -632,7 +633,7 @@ private: if (!bucket.AsyncWriteOperation->HasValue()) return true; bucket.SpilledState->AsyncWriteCompleted(bucket.AsyncWriteOperation->ExtractValue()); bucket.AsyncWriteOperation = std::nullopt; - } + } bucket.AsyncWriteOperation = bucket.SpilledState->FinishWriting(); if (bucket.AsyncWriteOperation) return true; diff --git a/yql/essentials/minikql/comp_nodes/mkql_wide_top_sort.cpp b/yql/essentials/minikql/comp_nodes/mkql_wide_top_sort.cpp index c735fff484..80f640f038 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_wide_top_sort.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_wide_top_sort.cpp @@ -81,8 +81,9 @@ struct TMyValueCompare { const std::vector<TRuntimeKeyInfo> Keys; }; +using NYql::TChunkedBuffer; using TAsyncWriteOperation = std::optional<NThreading::TFuture<ISpiller::TKey>>; -using TAsyncReadOperation = std::optional<NThreading::TFuture<std::optional<TRope>>>; +using TAsyncReadOperation = std::optional<NThreading::TFuture<std::optional<TChunkedBuffer>>>; using TStorage = std::vector<NUdf::TUnboxedValue, TMKQLAllocator<NUdf::TUnboxedValue, EMemorySubPool::Temporary>>; struct TSpilledData { diff --git a/yql/essentials/minikql/computation/mkql_block_transport.cpp b/yql/essentials/minikql/computation/mkql_block_transport.cpp index 0e62fc6d24..d68029c315 100644 --- a/yql/essentials/minikql/computation/mkql_block_transport.cpp +++ b/yql/essentials/minikql/computation/mkql_block_transport.cpp @@ -4,22 +4,23 @@ #include <yql/essentials/minikql/mkql_type_builder.h> #include <yql/essentials/public/udf/arrow/block_reader.h> #include <yql/essentials/public/udf/arrow/memory_pool.h> -#include <yql/essentials/utils/rope/rope_over_buffer.h> #include <yql/essentials/utils/yql_panic.h> namespace NKikimr::NMiniKQL { namespace { -TRope MakeReadOnlyRopeAndUntrack(const std::shared_ptr<const arrow::Buffer>& owner, const char* data, size_t size) { +using NYql::TChunkedBuffer; + +TChunkedBuffer MakeChunkedBufferAndUntrack(const std::shared_ptr<const arrow::Buffer>& owner, const char* data, size_t size) { MKQLArrowUntrack(owner->data()); - return NYql::MakeReadOnlyRope(owner, data, size); + return TChunkedBuffer(TStringBuf{data, size}, owner); } class TOwnedArrowBuffer : public arrow::Buffer { public: - TOwnedArrowBuffer(TContiguousSpan span, const std::shared_ptr<const void>& owner) - : arrow::Buffer(reinterpret_cast<const uint8_t*>(span.Data()), span.Size()) + TOwnedArrowBuffer(TStringBuf span, const std::shared_ptr<const void>& owner) + : 
arrow::Buffer(reinterpret_cast<const uint8_t*>(span.data()), span.size()) , Owner_(owner) { } @@ -86,7 +87,7 @@ void LoadNullsSizes(const IBlockDeserializer::TMetadataSource& metaSource, TMayb nullsSize = metaSource(); } -void StoreNulls(const arrow::ArrayData& data, TRope& dst) { +void StoreNulls(const arrow::ArrayData& data, TChunkedBuffer& dst) { if (!NeedStoreBitmap(data)) { return; } @@ -95,7 +96,7 @@ void StoreNulls(const arrow::ArrayData& data, TRope& dst) { YQL_ENSURE(desiredOffset <= (size_t)data.offset); YQL_ENSURE((data.offset - desiredOffset) % 8 == 0); const char* nulls = data.GetValues<char>(0, 0) + (data.offset - desiredOffset) / 8; - dst.Insert(dst.End(), MakeReadOnlyRopeAndUntrack(data.buffers[0], nulls, nullBytes)); + dst.Append(MakeChunkedBufferAndUntrack(data.buffers[0], nulls, nullBytes)); } void LoadBufferSize(const IBlockDeserializer::TMetadataSource& metaSource, TMaybe<ui64>& result) { @@ -103,30 +104,41 @@ void LoadBufferSize(const IBlockDeserializer::TMetadataSource& metaSource, TMayb result = metaSource(); } -std::shared_ptr<arrow::Buffer> LoadBuffer(TRope& source, TMaybe<ui64> size) { +std::shared_ptr<arrow::Buffer> LoadBuffer(TChunkedBuffer& source, TMaybe<ui64> size) { using namespace NYql::NUdf; YQL_ENSURE(size.Defined(), "Buffer size is not loaded"); if (!*size) { return MakeEmptyBuffer(); } - YQL_ENSURE(source.size() >= *size, "Premature end of data"); - auto owner = std::make_shared<TRope>(source.Begin(), source.Begin() + *size); - source.EraseFront(*size); + size_t toAppend = *size; + const TChunkedBuffer::TChunk& front = source.Front(); + if (front.Buf.size() >= toAppend && HasArrrowAlignment(front.Buf.data())) { + TStringBuf data = source.Front().Buf; + data.Trunc(toAppend); + auto result = std::make_shared<TOwnedArrowBuffer>(data, source.Front().Owner); + source.Erase(toAppend); + return result; + } - owner->Compact(); - auto span = owner->GetContiguousSpan(); - if (HasArrrowAlignment(span.Data())) { - return std::make_shared<TOwnedArrowBuffer>(span, owner); + auto result = AllocateResizableBuffer(toAppend, NYql::NUdf::GetYqlMemoryPool()); + ARROW_OK(result->Resize((int64_t)toAppend)); + uint8_t* dst = result->mutable_data(); + while (toAppend) { + const TChunkedBuffer::TChunk& front = source.Front(); + TStringBuf buf = front.Buf; + YQL_ENSURE(!buf.empty(), "Premature end of buffer"); + size_t chunk = std::min(toAppend, buf.size()); + std::memcpy(dst, buf.data(), chunk); + dst += chunk; + toAppend -= chunk; + source.Erase(chunk); } - auto result = AllocateResizableBuffer(span.Size(), NYql::NUdf::GetYqlMemoryPool()); - ARROW_OK(result->Resize((int64_t)span.Size())); - std::memcpy(result->mutable_data(), span.Data(), span.Size()); return result; } -std::shared_ptr<arrow::Buffer> LoadNullsBitmap(TRope& source, TMaybe<ui64> nullCount, TMaybe<ui64> bitmapSize) { +std::shared_ptr<arrow::Buffer> LoadNullsBitmap(TChunkedBuffer& source, TMaybe<ui64> nullCount, TMaybe<ui64> bitmapSize) { YQL_ENSURE(nullCount.Defined(), "Bitmap null count is not loaded"); YQL_ENSURE(bitmapSize.Defined(), "Bitmap size is not loaded"); if (*nullCount == 0) { @@ -152,7 +164,7 @@ public: DoLoadMetadata(metaSource); } - virtual std::shared_ptr<arrow::ArrayData> LoadArray(TRope& src, ui64 blockLen, ui64 offset) final { + virtual std::shared_ptr<arrow::ArrayData> LoadArray(TChunkedBuffer& src, ui64 blockLen, ui64 offset) final { YQL_ENSURE(blockLen > 0, "Should be handled earlier"); std::shared_ptr<arrow::Buffer> nulls; i64 nullsCount = 0; @@ -193,7 +205,7 @@ protected: virtual void 
DoResetMetadata() = 0; virtual bool IsNullable() const = 0; virtual std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const = 0; - virtual std::shared_ptr<arrow::ArrayData> DoLoadArray(TRope& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) = 0; + virtual std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) = 0; std::shared_ptr<arrow::DataType> ArrowType_; TMaybe<ui64> NullsCount_; @@ -222,7 +234,7 @@ public: metaSink(dataBytes); } - void StoreArray(const arrow::ArrayData& data, TRope& dst) const final { + void StoreArray(const arrow::ArrayData& data, TChunkedBuffer& dst) const final { if constexpr (Nullable) { StoreNulls(data, dst); if (data.GetNullCount() == data.length) { @@ -233,7 +245,7 @@ public: const ui64 desiredOffset = data.offset % 8; const char* buf = reinterpret_cast<const char*>(data.buffers[1]->data()) + (data.offset - desiredOffset) * ObjectSize; size_t dataBytes = ((size_t)data.length + desiredOffset) * ObjectSize; - dst.Insert(dst.End(), MakeReadOnlyRopeAndUntrack(data.buffers[1], buf, dataBytes)); + dst.Append(MakeChunkedBufferAndUntrack(data.buffers[1], buf, dataBytes)); } }; @@ -255,7 +267,7 @@ private: return arrow::ArrayData::Make(ArrowType_, blockLen, { nulls, data }, nullsCount, offset); } - std::shared_ptr<arrow::ArrayData> DoLoadArray(TRope& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final { + std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final { auto data = LoadBuffer(src, DataSize_); return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls, data}, nullsCount, offset); } @@ -294,7 +306,7 @@ private: metaSink(data.buffers[2]->size()); } - void StoreArray(const arrow::ArrayData& data, TRope& dst) const final { + void StoreArray(const arrow::ArrayData& data, TChunkedBuffer& dst) const final { if constexpr (Nullable) { StoreNulls(data, dst); if (data.GetNullCount() == data.length) { @@ -305,11 +317,11 @@ private: const ui64 desiredOffset = data.offset % 8; const char* offsets = reinterpret_cast<const char*>(data.GetValues<TOffset>(1) - desiredOffset); size_t offsetsSize = ((size_t)data.length + 1 + desiredOffset) * sizeof(TOffset); - dst.Insert(dst.End(), MakeReadOnlyRopeAndUntrack(data.buffers[1], offsets, offsetsSize)); + dst.Append(MakeChunkedBufferAndUntrack(data.buffers[1], offsets, offsetsSize)); const char* mainData = reinterpret_cast<const char*>(data.buffers[2]->data()); size_t mainSize = data.buffers[2]->size(); - dst.Insert(dst.End(), MakeReadOnlyRopeAndUntrack(data.buffers[2], mainData, mainSize)); + dst.Append(MakeChunkedBufferAndUntrack(data.buffers[2], mainData, mainSize)); } }; @@ -330,7 +342,7 @@ private: return arrow::ArrayData::Make(ArrowType_, blockLen, { nulls, offsets, data }, nullsCount, offset); } - std::shared_ptr<arrow::ArrayData> DoLoadArray(TRope& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final { + std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final { auto offsets = LoadBuffer(src, OffsetsSize_); auto data = LoadBuffer(src, DataSize_); return arrow::ArrayData::Make(ArrowType_, 
blockLen, {nulls, offsets, data }, nullsCount, offset); @@ -371,7 +383,7 @@ private: } } - void StoreArray(const arrow::ArrayData& data, TRope& dst) const final { + void StoreArray(const arrow::ArrayData& data, TChunkedBuffer& dst) const final { StoreNulls(data, dst); if (data.GetNullCount() != data.length) { Inner_->StoreArray(*data.child_data[0], dst); @@ -396,7 +408,7 @@ private: return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, { Inner_->MakeDefaultValue(blockLen, offset) }, nullsCount, offset); } - std::shared_ptr<arrow::ArrayData> DoLoadArray(TRope& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final { + std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final { return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, { Inner_->LoadArray(src, blockLen, offset) }, nullsCount, offset); } @@ -441,7 +453,7 @@ class TTupleBlockSerializerBase : public IBlockSerializer { } } - void StoreArray(const arrow::ArrayData& data, TRope& dst) const final { + void StoreArray(const arrow::ArrayData& data, TChunkedBuffer& dst) const final { if constexpr (Nullable) { StoreNulls(data, dst); } @@ -466,7 +478,7 @@ public: return result; } - void StoreChildrenMetadata(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data, + void StoreChildrenMetadata(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data, const IBlockSerializer::TMetadataSink& metaSink) const { for (size_t i = 0; i < Children_.size(); ++i) { @@ -474,7 +486,7 @@ public: } } - void StoreChildrenArrays(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data, TRope& dst) const { + void StoreChildrenArrays(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data, TChunkedBuffer& dst) const { for (size_t i = 0; i < Children_.size(); ++i) { Children_[i]->StoreArray(*child_data[i], dst); } @@ -493,13 +505,13 @@ public: return DateSerialiser_.ArrayMetadataCount() + TzSerialiser_.ArrayMetadataCount(); } - void StoreChildrenMetadata(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data, + void StoreChildrenMetadata(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data, const IBlockSerializer::TMetadataSink& metaSink) const { DateSerialiser_.StoreMetadata(*child_data[0], metaSink); TzSerialiser_.StoreMetadata(*child_data[1], metaSink); } - void StoreChildrenArrays(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data, TRope& dst) const { + void StoreChildrenArrays(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data, TChunkedBuffer& dst) const { DateSerialiser_.StoreArray(*child_data[0], dst); TzSerialiser_.StoreArray(*child_data[1], dst); } @@ -533,7 +545,7 @@ private: return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, offset); } - std::shared_ptr<arrow::ArrayData> DoLoadArray(TRope& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final { + std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final { std::vector<std::shared_ptr<arrow::ArrayData>> childData; for (auto& child : Children_) { childData.emplace_back(child->LoadArray(src, blockLen, offset)); @@ -580,7 +592,7 @@ private: return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, offset); } - 
std::shared_ptr<arrow::ArrayData> DoLoadArray(TRope& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final { + std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final { std::vector<std::shared_ptr<arrow::ArrayData>> childData; childData.emplace_back(DateDeserialiser_.LoadArray(src, blockLen, offset)); childData.emplace_back(TzDeserialiser_.LoadArray(src, blockLen, offset)); diff --git a/yql/essentials/minikql/computation/mkql_block_transport.h b/yql/essentials/minikql/computation/mkql_block_transport.h index c93b439f25..bda06d5a49 100644 --- a/yql/essentials/minikql/computation/mkql_block_transport.h +++ b/yql/essentials/minikql/computation/mkql_block_transport.h @@ -2,7 +2,7 @@ #include <yql/essentials/minikql/mkql_node.h> -#include <contrib/ydb/library/actors/util/rope.h> +#include <yql/essentials/utils/chunked_buffer.h> #include <arrow/datum.h> @@ -19,7 +19,7 @@ public: using TMetadataSink = std::function<void(ui64 meta)>; virtual void StoreMetadata(const arrow::ArrayData& data, const TMetadataSink& metaSink) const = 0; - virtual void StoreArray(const arrow::ArrayData& data, TRope& dst) const = 0; + virtual void StoreArray(const arrow::ArrayData& data, NYql::TChunkedBuffer& dst) const = 0; }; class IBlockDeserializer { @@ -28,7 +28,7 @@ public: using TMetadataSource = std::function<ui64()>; virtual void LoadMetadata(const TMetadataSource& metaSource) = 0; - virtual std::shared_ptr<arrow::ArrayData> LoadArray(TRope& src, ui64 blockLen, ui64 offset) = 0; + virtual std::shared_ptr<arrow::ArrayData> LoadArray(NYql::TChunkedBuffer& src, ui64 blockLen, ui64 offset) = 0; }; diff --git a/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp b/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp index 4e86b974d9..00bd3fdad2 100644 --- a/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp +++ b/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp @@ -11,13 +11,14 @@ #include <yql/essentials/minikql/pack_num.h> #include <yql/essentials/minikql/mkql_string_util.h> #include <yql/essentials/minikql/mkql_type_builder.h> -#include <yql/essentials/utils/rope/rope_over_buffer.h> #include <library/cpp/resource/resource.h> #include <yql/essentials/utils/fp_bits.h> #include <util/system/yassert.h> #include <util/system/sanitizers.h> +using NYql::TChunkedBuffer; + namespace NKikimr { namespace NMiniKQL { @@ -353,7 +354,7 @@ NUdf::TUnboxedValue UnpackFromChunkedBuffer(const TType* type, TChunkedInputBuff auto ret = NUdf::TUnboxedValuePod(UnpackData<Fast, i64>(buf)); ret.SetTimezoneId(UnpackData<Fast, ui16>(buf)); return ret; - } + } case NUdf::EDataSlot::Uuid: { return UnpackString(buf, 16); } @@ -921,7 +922,7 @@ bool IsUi64Scalar(const TBlockType* blockType) { if (!blockType->GetItemType()->IsData()) { return false; } - + return static_cast<const TDataType*>(blockType->GetItemType())->GetDataSlot() == NUdf::EDataSlot::Uint64; } @@ -1113,25 +1114,25 @@ void TValuePackerTransport<Fast>::InitBlocks() { } template<bool Fast> -NUdf::TUnboxedValue TValuePackerTransport<Fast>::Unpack(TRope&& buf, const THolderFactory& holderFactory) const { +NUdf::TUnboxedValue TValuePackerTransport<Fast>::Unpack(TChunkedBuffer&& buf, const THolderFactory& holderFactory) const { MKQL_ENSURE(!IsBlock_, "Unpack() should not be used for blocks"); - const size_t totalSize = buf.GetSize(); + const size_t totalSize = buf.Size(); 
TChunkedInputBuffer chunked(std::move(buf)); return DoUnpack<Fast>(Type_, chunked, totalSize, holderFactory, State_); } template<bool Fast> -void TValuePackerTransport<Fast>::UnpackBatch(TRope&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const { +void TValuePackerTransport<Fast>::UnpackBatch(TChunkedBuffer&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const { if (IsBlock_) { return UnpackBatchBlocks(std::move(buf), holderFactory, result); } - const size_t totalSize = buf.GetSize(); + const size_t totalSize = buf.Size(); TChunkedInputBuffer chunked(std::move(buf)); DoUnpackBatch<Fast>(Type_, chunked, totalSize, holderFactory, IncrementalState_, result); } template<bool Fast> -TRope TValuePackerTransport<Fast>::Pack(const NUdf::TUnboxedValuePod& value) const { +TChunkedBuffer TValuePackerTransport<Fast>::Pack(const NUdf::TUnboxedValuePod& value) const { MKQL_ENSURE(ItemCount_ == 0, "Can not mix Pack() and AddItem() calls"); MKQL_ENSURE(!IsBlock_, "Pack() should not be used for blocks"); TPagedBuffer::TPtr result = std::make_shared<TPagedBuffer>(); @@ -1143,7 +1144,7 @@ TRope TValuePackerTransport<Fast>::Pack(const NUdf::TUnboxedValuePod& value) con PackImpl<Fast, false>(Type_, *result, value, State_); BuildMeta(result, false); } - return TPagedBuffer::AsRope(result); + return TPagedBuffer::AsChunkedBuffer(result); } template<bool Fast> @@ -1224,8 +1225,7 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons PackData<false>(len, *metadataBuffer); if (!len) { // only block len should be serialized in this case - BlockBuffer_.Insert(BlockBuffer_.End(), - NYql::MakeReadOnlyRope(metadataBuffer, metadataBuffer->data(), metadataBuffer->size())); + BlockBuffer_.Append(TStringBuf(metadataBuffer->data(), metadataBuffer->size()), metadataBuffer); ++ItemCount_; return *this; } @@ -1253,7 +1253,7 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons } else { MKQL_ENSURE(datum.is_scalar(), "Expecting array or scalar"); if (!ConvertedScalars_[i]) { - const TType* itemType = IsLegacyBlock_ ? static_cast<const TStructType*>(Type_)->GetMemberType(i) : + const TType* itemType = IsLegacyBlock_ ? 
static_cast<const TStructType*>(Type_)->GetMemberType(i) : static_cast<const TMultiType*>(Type_)->GetElementType(i); datum = MakeArrayFromScalar(*datum.scalar(), 1, static_cast<const TBlockType*>(itemType)->GetItemType(), ArrowPool_); MKQL_ENSURE(HasOffset(*datum.array(), 0), "Expected zero array offset after scalar is converted to array"); @@ -1280,8 +1280,7 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons MKQL_ENSURE(savedMetadata == totalMetadataCount, "Serialization metadata error"); - BlockBuffer_.Insert(BlockBuffer_.End(), - NYql::MakeReadOnlyRope(metadataBuffer, metadataBuffer->data(), metadataBuffer->size())); + BlockBuffer_.Append(TStringBuf(metadataBuffer->data(), metadataBuffer->size()), metadataBuffer); // save buffers for (size_t i = 0; i < width; ++i) { if (i != BlockLenIndex_) { @@ -1293,8 +1292,8 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons } template<bool Fast> -void TValuePackerTransport<Fast>::UnpackBatchBlocks(TRope&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const { - while (!buf.empty()) { +void TValuePackerTransport<Fast>::UnpackBatchBlocks(TChunkedBuffer&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const { + while (!buf.Empty()) { TChunkedInputBuffer chunked(std::move(buf)); // unpack block length @@ -1330,7 +1329,7 @@ void TValuePackerTransport<Fast>::UnpackBatchBlocks(TRope&& buf, const THolderFa } } MKQL_ENSURE(metaCount == 0, "Partial buffers read"); - TRope ropeTail = chunked.ReleaseRope(); + TChunkedBuffer ropeTail = chunked.ReleaseRope(); // unpack buffers auto producer = [&](ui32 i) { @@ -1341,7 +1340,7 @@ void TValuePackerTransport<Fast>::UnpackBatchBlocks(TRope&& buf, const THolderFa auto array = BlockDeserializers_[i]->LoadArray(ropeTail, isScalar ? 1 : len, offsets[i]); if (isScalar) { TBlockItem item = BlockReaders_[i]->GetItem(*array, 0); - const TType* itemType = IsLegacyBlock_ ? static_cast<const TStructType*>(Type_)->GetMemberType(i) : + const TType* itemType = IsLegacyBlock_ ? 
static_cast<const TStructType*>(Type_)->GetMemberType(i) : static_cast<const TMultiType*>(Type_)->GetElementType(i); return holderFactory.CreateArrowBlock(ConvertScalar(static_cast<const TBlockType*>(itemType)->GetItemType(), item, ArrowPool_)); } @@ -1367,12 +1366,12 @@ void TValuePackerTransport<Fast>::UnpackBatchBlocks(TRope&& buf, const THolderFa template<bool Fast> void TValuePackerTransport<Fast>::Clear() { Buffer_.reset(); - BlockBuffer_.clear(); + BlockBuffer_.Clear(); ItemCount_ = 0; } template<bool Fast> -TRope TValuePackerTransport<Fast>::Finish() { +TChunkedBuffer TValuePackerTransport<Fast>::Finish() { if (IsBlock_) { return FinishBlocks(); } @@ -1389,12 +1388,12 @@ TRope TValuePackerTransport<Fast>::Finish() { } TPagedBuffer::TPtr result = std::move(Buffer_); Clear(); - return TPagedBuffer::AsRope(result); + return TPagedBuffer::AsChunkedBuffer(result); } template<bool Fast> -TRope TValuePackerTransport<Fast>::FinishBlocks() { - TRope result = std::move(BlockBuffer_); +TChunkedBuffer TValuePackerTransport<Fast>::FinishBlocks() { + TChunkedBuffer result = std::move(BlockBuffer_); Clear(); return result; } diff --git a/yql/essentials/minikql/computation/mkql_computation_node_pack.h b/yql/essentials/minikql/computation/mkql_computation_node_pack.h index e09679b470..5dc47ada2d 100644 --- a/yql/essentials/minikql/computation/mkql_computation_node_pack.h +++ b/yql/essentials/minikql/computation/mkql_computation_node_pack.h @@ -10,7 +10,7 @@ #include <yql/essentials/public/udf/udf_value.h> #include <library/cpp/enumbitset/enumbitset.h> -#include <contrib/ydb/library/actors/util/rope.h> +#include <yql/essentials/utils/chunked_buffer.h> #include <util/stream/output.h> #include <util/generic/buffer.h> @@ -83,7 +83,7 @@ public: TSelf& AddItem(const NUdf::TUnboxedValuePod& value); TSelf& AddWideItem(const NUdf::TUnboxedValuePod* values, ui32 count); size_t PackedSizeEstimate() const { - return IsBlock_ ? BlockBuffer_.size() : (Buffer_ ? (Buffer_->Size() + Buffer_->ReservedHeaderSize()) : 0); + return IsBlock_ ? BlockBuffer_.Size() : (Buffer_ ? 
(Buffer_->Size() + Buffer_->ReservedHeaderSize()) : 0); } bool IsEmpty() const { @@ -91,20 +91,20 @@ public: } void Clear(); - TRope Finish(); + NYql::TChunkedBuffer Finish(); // Pack()/Unpack() will pack/unpack single value of type T - TRope Pack(const NUdf::TUnboxedValuePod& value) const; - NUdf::TUnboxedValue Unpack(TRope&& buf, const THolderFactory& holderFactory) const; - void UnpackBatch(TRope&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const; + NYql::TChunkedBuffer Pack(const NUdf::TUnboxedValuePod& value) const; + NUdf::TUnboxedValue Unpack(NYql::TChunkedBuffer&& buf, const THolderFactory& holderFactory) const; + void UnpackBatch(NYql::TChunkedBuffer&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const; private: void BuildMeta(TPagedBuffer::TPtr& buffer, bool addItemCount) const; void StartPack(); void InitBlocks(); TSelf& AddWideItemBlocks(const NUdf::TUnboxedValuePod* values, ui32 count); - TRope FinishBlocks(); - void UnpackBatchBlocks(TRope&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const; + NYql::TChunkedBuffer FinishBlocks(); + void UnpackBatchBlocks(NYql::TChunkedBuffer&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const; const TType* const Type_; ui64 ItemCount_ = 0; @@ -120,7 +120,7 @@ private: TVector<std::unique_ptr<IBlockSerializer>> BlockSerializers_; TVector<std::unique_ptr<IBlockReader>> BlockReaders_; TVector<std::shared_ptr<arrow::ArrayData>> ConvertedScalars_; - TRope BlockBuffer_; + NYql::TChunkedBuffer BlockBuffer_; TVector<std::unique_ptr<IBlockDeserializer>> BlockDeserializers_; }; diff --git a/yql/essentials/minikql/computation/mkql_computation_node_pack_impl.h b/yql/essentials/minikql/computation/mkql_computation_node_pack_impl.h index bb675423fe..856898c697 100644 --- a/yql/essentials/minikql/computation/mkql_computation_node_pack_impl.h +++ b/yql/essentials/minikql/computation/mkql_computation_node_pack_impl.h @@ -5,9 +5,9 @@ #include <yql/essentials/minikql/pack_num.h> #include <yql/essentials/public/decimal/yql_decimal_serialize.h> #include <yql/essentials/public/udf/udf_value.h> +#include <yql/essentials/utils/chunked_buffer.h> #include <library/cpp/packedtypes/zigzag.h> -#include <contrib/ydb/library/actors/util/rope.h> #include <util/generic/buffer.h> #include <util/generic/strbuf.h> @@ -69,14 +69,14 @@ void PackDecimal(NYql::NDecimal::TInt128 val, TBuf& buf) { class TChunkedInputBuffer : private TNonCopyable { public: - explicit TChunkedInputBuffer(TRope&& rope) + explicit TChunkedInputBuffer(NYql::TChunkedBuffer&& rope) : Rope_(std::move(rope)) { Next(); } explicit TChunkedInputBuffer(TStringBuf input) - : Rope_(TRope{}) + : Rope_(NYql::TChunkedBuffer{}) , Data_(input.data()) , Len_(input.size()) { @@ -117,24 +117,24 @@ public: } } - inline TRope ReleaseRope() { + inline NYql::TChunkedBuffer ReleaseRope() { Y_DEBUG_ABORT_UNLESS(OriginalLen_ >= Len_); - Rope_.EraseFront(OriginalLen_ - Len_); - TRope result = std::move(Rope_); + Rope_.Erase(OriginalLen_ - Len_); + NYql::TChunkedBuffer result = std::move(Rope_); Data_ = nullptr; Len_ = OriginalLen_ = 0; - Rope_.clear(); + Rope_.Clear(); return result; } void Next() { Y_DEBUG_ABORT_UNLESS(Len_ == 0); - Rope_.EraseFront(OriginalLen_); - if (!Rope_.IsEmpty()) { - Len_ = OriginalLen_ = Rope_.begin().ContiguousSize(); - Data_ = Rope_.begin().ContiguousData(); + Rope_.Erase(OriginalLen_); + if (!Rope_.Empty()) { + Len_ = OriginalLen_ = Rope_.Front().Buf.size(); + Data_ = Rope_.Front().Buf.data(); 
Y_DEBUG_ABORT_UNLESS(Len_ > 0); } else { Len_ = OriginalLen_ = 0; @@ -157,7 +157,7 @@ private: } } - TRope Rope_; + NYql::TChunkedBuffer Rope_; const char* Data_ = nullptr; size_t Len_ = 0; size_t OriginalLen_ = 0; diff --git a/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp b/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp index 1747fd0ea7..b689e4cf8b 100644 --- a/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp +++ b/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp @@ -27,6 +27,8 @@ namespace NKikimr { namespace NMiniKQL { +using NYql::TChunkedBuffer; + #ifdef WITH_VALGRIND constexpr static size_t PERFORMANCE_COUNT = 0x1000; #elif defined(NDEBUG) @@ -314,8 +316,8 @@ protected: TestVariantTypeImpl(PgmBuilder.NewVariantType(tupleType)); } - void ValidateEmbeddedLength(TRope buf, const TString& info) { - size_t size = buf.GetSize(); + void ValidateEmbeddedLength(TChunkedBuffer buf, const TString& info) { + size_t size = buf.Size(); TChunkedInputBuffer chunked(std::move(buf)); return ValidateEmbeddedLength(chunked, size, info); } @@ -352,13 +354,13 @@ protected: auto packedValue = packer.Pack(uValue); if constexpr (Transport) { if (expectedLength) { - UNIT_ASSERT_VALUES_EQUAL_C(packedValue.size(), *expectedLength, additionalMsg); + UNIT_ASSERT_VALUES_EQUAL_C(packedValue.Size(), *expectedLength, additionalMsg); } ValidateEmbeddedLength(packedValue, additionalMsg); return packer.Unpack(std::move(packedValue), HolderFactory); } else { if (expectedLength) { - UNIT_ASSERT_VALUES_EQUAL_C(packedValue.size(), *expectedLength, additionalMsg); + UNIT_ASSERT_VALUES_EQUAL_C(packedValue.Size(), *expectedLength, additionalMsg); } ValidateEmbeddedLength(packedValue, additionalMsg); return packer.Unpack(packedValue, HolderFactory); @@ -591,7 +593,10 @@ protected: auto buffer = packer.Pack(value); - TString packed = buffer.ConvertToString(); + TString packed; + TStringOutput sout(packed); + buffer.CopyTo(sout); + TStringBuf packedBuf(packed); if constexpr (Fast) { UNIT_ASSERT_VALUES_EQUAL(packed.size(), 73); @@ -600,14 +605,14 @@ protected: } for (size_t chunk = 1; chunk < packed.size(); ++chunk) { - TString first = packed.substr(0, chunk); - TString second = packed.substr(chunk); + TStringBuf first = packedBuf.substr(0, chunk); + TStringBuf second = packedBuf.substr(chunk); - TRope result(std::move(first)); - result.Insert(result.End(), TRope(std::move(second))); + TChunkedBuffer result(first, {}); + result.Append(second, {}); - UNIT_ASSERT_VALUES_EQUAL(result.size(), packed.size()); - UNIT_ASSERT(!result.IsContiguous()); + UNIT_ASSERT_VALUES_EQUAL(result.Size(), packed.size()); + UNIT_ASSERT(result.Size() != result.ContigousSize()); ValidateTupleValue(packer.Unpack(std::move(result), HolderFactory)); } @@ -632,7 +637,7 @@ protected: auto serialized = packer.Finish(); - auto listObj = listPacker.Unpack(TRope(serialized), HolderFactory); + auto listObj = listPacker.Unpack(TChunkedBuffer(serialized), HolderFactory); UNIT_ASSERT_VALUES_EQUAL(listObj.GetListLength(), count); const auto iter = listObj.GetListIterator(); for (NUdf::TUnboxedValue uVal; iter.Next(uVal);) { @@ -666,7 +671,7 @@ protected: auto scalarOptStrType = PgmBuilder.NewBlockType(optStrType, TBlockType::EShape::Scalar); auto blockOptTupleOptUi32StrType = PgmBuilder.NewBlockType(optTupleOptUi32StrType, TBlockType::EShape::Many); auto scalarUi64Type = PgmBuilder.NewBlockType(ui64Type, TBlockType::EShape::Scalar); - + auto tzDateType = 
PgmBuilder.NewDataType(NUdf::EDataSlot::TzDate); auto blockTzDateType = PgmBuilder.NewBlockType(tzDateType, TBlockType::EShape::Many); @@ -703,7 +708,7 @@ protected: TBlockItem b3items[] = { (i % 2) ? TBlockItem(i) : TBlockItem(), TBlockItem(a) }; TBlockItem b3 = (i % 7) ? TBlockItem(b3items) : TBlockItem(); builder3->Add(b3); - + TBlockItem tzDate {i}; tzDate.SetTimezoneId(i % 100); builder4->Add(tzDate); @@ -749,7 +754,7 @@ protected: } else { packer.AddWideItem(columns.data(), columns.size()); } - TRope packed = packer.Finish(); + TChunkedBuffer packed = packer.Finish(); TUnboxedValueBatch unpacked(rowType); packer.UnpackBatch(std::move(packed), HolderFactory, unpacked); diff --git a/yql/essentials/minikql/computation/mkql_spiller.h b/yql/essentials/minikql/computation/mkql_spiller.h index 0516109c04..e0ce2706d8 100644 --- a/yql/essentials/minikql/computation/mkql_spiller.h +++ b/yql/essentials/minikql/computation/mkql_spiller.h @@ -1,7 +1,7 @@ #pragma once #include <library/cpp/threading/future/core/future.h> -#include <contrib/ydb/library/actors/util/rope.h> +#include <yql/essentials/utils/chunked_buffer.h> namespace NKikimr::NMiniKQL { @@ -10,16 +10,16 @@ struct ISpiller using TPtr = std::shared_ptr<ISpiller>; virtual ~ISpiller(){} using TKey = ui64; - virtual NThreading::TFuture<TKey> Put(TRope&& blob) = 0; + virtual NThreading::TFuture<TKey> Put(NYql::TChunkedBuffer&& blob) = 0; ///\return /// nullopt for absent keys /// TFuture - virtual NThreading::TFuture<std::optional<TRope>> Get(TKey key) = 0; + virtual NThreading::TFuture<std::optional<NYql::TChunkedBuffer>> Get(TKey key) = 0; virtual NThreading::TFuture<void> Delete(TKey) = 0; ///Get + Delete ///Stored value may be moved to future - virtual NThreading::TFuture<std::optional<TRope>> Extract(TKey key) = 0; + virtual NThreading::TFuture<std::optional<NYql::TChunkedBuffer>> Extract(TKey key) = 0; }; }//namespace NKikimr::NMiniKQL diff --git a/yql/essentials/minikql/computation/mkql_spiller_adapter.h b/yql/essentials/minikql/computation/mkql_spiller_adapter.h index a9a7323853..462a2a7c5e 100644 --- a/yql/essentials/minikql/computation/mkql_spiller_adapter.h +++ b/yql/essentials/minikql/computation/mkql_spiller_adapter.h @@ -50,7 +50,7 @@ public: bool Empty() const { return StoredChunks.empty() && !CurrentBatch; } - std::optional<NThreading::TFuture<std::optional<TRope>>> ExtractWideItem(const TArrayRef<NUdf::TUnboxedValue>& wideItem) { + std::optional<NThreading::TFuture<std::optional<NYql::TChunkedBuffer>>> ExtractWideItem(const TArrayRef<NUdf::TUnboxedValue>& wideItem) { MKQL_ENSURE(!Empty(), "Internal logic error"); if (CurrentBatch) { auto row = CurrentBatch->Head(); @@ -69,7 +69,7 @@ public: } } - void AsyncReadCompleted(TRope&& rope,const THolderFactory& holderFactory ) { + void AsyncReadCompleted(NYql::TChunkedBuffer&& rope,const THolderFactory& holderFactory ) { //Implementation detail: deserialization is performed in a processing thread TUnboxedValueBatch batch(ItemType); Packer.UnpackBatch(std::move(rope), holderFactory, batch); diff --git a/yql/essentials/minikql/computation/mkql_vector_spiller_adapter.h b/yql/essentials/minikql/computation/mkql_vector_spiller_adapter.h index 1663397107..8c9896cf98 100644 --- a/yql/essentials/minikql/computation/mkql_vector_spiller_adapter.h +++ b/yql/essentials/minikql/computation/mkql_vector_spiller_adapter.h @@ -4,7 +4,6 @@ #include <yql/essentials/minikql/defs.h> #include <yql/essentials/minikql/computation/mkql_spiller.h> -#include <contrib/ydb/library/actors/util/rope.h> #include 
<yql/essentials/minikql/mkql_alloc.h> namespace NKikimr::NMiniKQL { @@ -63,7 +62,7 @@ public: SaveNextPartOfVector(); } - ///Should be used to update async operatrions statuses. + ///Should be used to update async operatrions statuses. ///For SpillingData state it will try to spill more content of inner buffer. ///ForRestoringData state it will try to load more content of requested vector. void Update() { @@ -125,7 +124,7 @@ public: ///Is case if buffer is not ready async write operation will be started. void Finalize() { MKQL_ENSURE(CurrentVector.empty(), "Internal logic error"); - if (Buffer.empty()) { + if (Buffer.Empty()) { State = EState::AcceptingDataRequests; return; } @@ -135,12 +134,24 @@ public: } private: - - void CopyRopeToTheEndOfVector(std::vector<T, Alloc>& vec, TRope& rope) { - for (auto it = rope.begin(); it != rope.end(); ++it) { - const T* data = reinterpret_cast<const T*>(it.ContiguousData()); - vec.insert(vec.end(), data, data + it.ContiguousSize() / sizeof(T)); // size is always multiple of sizeof(T) + class TVectorStream : public IOutputStream { + public: + explicit TVectorStream(std::vector<T, Alloc>& vec) + : Dst_(vec) + { + } + private: + virtual void DoWrite(const void* buf, size_t len) override { + MKQL_ENSURE(len % sizeof(T) == 0, "size should always by multiple of sizeof(T)"); + const T* data = reinterpret_cast<const T*>(buf); + Dst_.insert(Dst_.end(), data, data + len / sizeof(T)); } + std::vector<T, Alloc>& Dst_; + }; + + void CopyRopeToTheEndOfVector(std::vector<T, Alloc>& vec, const NYql::TChunkedBuffer& rope, size_t toCopy = std::numeric_limits<size_t>::max()) { + TVectorStream out(vec); + rope.CopyTo(out, toCopy); } void LoadNextVector() { @@ -148,10 +159,10 @@ private: MKQL_ENSURE(requestedVectorSize >= CurrentVector.size(), "Internal logic error"); size_t sizeToLoad = (requestedVectorSize - CurrentVector.size()) * sizeof(T); - if (Buffer.size() >= sizeToLoad) { + if (Buffer.Size() >= sizeToLoad) { // if all the data for requested vector is ready - TRope remainingPartOfVector = Buffer.Extract(Buffer.Position(0), Buffer.Position(sizeToLoad)); - CopyRopeToTheEndOfVector(CurrentVector, remainingPartOfVector); + CopyRopeToTheEndOfVector(CurrentVector, Buffer, sizeToLoad); + Buffer.Erase(sizeToLoad); State = EState::DataReady; } else { CopyRopeToTheEndOfVector(CurrentVector, Buffer); @@ -165,13 +176,13 @@ private: } void AddDataToRope(const T* data, size_t count) { - TRope tmp = TRope::Uninitialized(count * sizeof(T)); - TRopeUtils::Memcpy(tmp.begin(), reinterpret_cast<const char*>(data), count * sizeof(T)); - Buffer.Insert(Buffer.End(), std::move(tmp)); + auto owner = std::make_shared<std::vector<T>>(data, data + count); + TStringBuf buf(reinterpret_cast<const char *>(owner->data()), count * sizeof(T)); + Buffer.Append(buf, owner); } void SaveNextPartOfVector() { - size_t maxFittingElemets = (SizeLimit - Buffer.size()) / sizeof(T); + size_t maxFittingElemets = (SizeLimit - Buffer.Size()) / sizeof(T); size_t remainingElementsInVector = CurrentVector.size() - NextVectorPositionToSave; size_t elementsToCopyFromVector = std::min(maxFittingElemets, remainingElementsInVector); @@ -183,7 +194,7 @@ private: NextVectorPositionToSave = 0; } - if (SizeLimit - Buffer.size() < sizeof(T)) { + if (SizeLimit - Buffer.Size() < sizeof(T)) { SaveBuffer(); return; } @@ -193,22 +204,22 @@ private: private: EState State = EState::AcceptingData; - + ISpiller::TPtr Spiller; const size_t SizeLimit; - TRope Buffer; + NYql::TChunkedBuffer Buffer; // Used to store vector while 
spilling and also used while restoring the data std::vector<T, Alloc> CurrentVector; size_t NextVectorPositionToSave = 0; - std::queue<ISpiller::TKey> StoredChunks; + std::queue<ISpiller::TKey> StoredChunks; std::queue<size_t> StoredChunksElementsCount; std::optional<NThreading::TFuture<ISpiller::TKey>> WriteOperation = std::nullopt; - std::optional<NThreading::TFuture<std::optional<TRope>>> ReadOperation = std::nullopt; + std::optional<NThreading::TFuture<std::optional<NYql::TChunkedBuffer>>> ReadOperation = std::nullopt; bool IsFinalizing = false; }; -}//namespace NKikimr::NMiniKQL
\ No newline at end of file +}//namespace NKikimr::NMiniKQL diff --git a/yql/essentials/minikql/computation/mock_spiller_factory_ut.h b/yql/essentials/minikql/computation/mock_spiller_factory_ut.h index 2264d6d215..9ea74569b1 100644 --- a/yql/essentials/minikql/computation/mock_spiller_factory_ut.h +++ b/yql/essentials/minikql/computation/mock_spiller_factory_ut.h @@ -5,8 +5,6 @@ namespace NKikimr::NMiniKQL { -using namespace NActors; - class TMockSpillerFactory : public ISpillerFactory { public: @@ -18,4 +16,4 @@ public: } }; -} // namespace NKikimr::NMiniKQL
\ No newline at end of file +} // namespace NKikimr::NMiniKQL diff --git a/yql/essentials/minikql/computation/mock_spiller_ut.h b/yql/essentials/minikql/computation/mock_spiller_ut.h index daf2ab6067..42846eab1f 100644 --- a/yql/essentials/minikql/computation/mock_spiller_ut.h +++ b/yql/essentials/minikql/computation/mock_spiller_ut.h @@ -14,7 +14,7 @@ public: : NextKey(0) {} - NThreading::TFuture<TKey> Put(TRope&& blob) override { + NThreading::TFuture<TKey> Put(NYql::TChunkedBuffer&& blob) override { auto promise = NThreading::NewPromise<ISpiller::TKey>(); auto key = NextKey; @@ -24,8 +24,8 @@ public: return promise.GetFuture();; } - NThreading::TFuture<std::optional<TRope>> Get(TKey key) override { - auto promise = NThreading::NewPromise<std::optional<TRope>>(); + NThreading::TFuture<std::optional<NYql::TChunkedBuffer>> Get(TKey key) override { + auto promise = NThreading::NewPromise<std::optional<NYql::TChunkedBuffer>>(); if (auto it = Storage.find(key); it != Storage.end()) { promise.SetValue(it->second); } else { @@ -35,8 +35,8 @@ public: return promise.GetFuture(); } - NThreading::TFuture<std::optional<TRope>> Extract(TKey key) override { - auto promise = NThreading::NewPromise<std::optional<TRope>>(); + NThreading::TFuture<std::optional<NYql::TChunkedBuffer>> Extract(TKey key) override { + auto promise = NThreading::NewPromise<std::optional<NYql::TChunkedBuffer>>(); if (auto it = Storage.find(key); it != Storage.end()) { promise.SetValue(std::move(it->second)); Storage.erase(it); @@ -54,10 +54,10 @@ public: } private: ISpiller::TKey NextKey; - std::unordered_map<ISpiller::TKey, TRope> Storage; + std::unordered_map<ISpiller::TKey, NYql::TChunkedBuffer> Storage; }; inline ISpiller::TPtr CreateMockSpiller() { return std::make_shared<TMockSpiller>(); } -} //namespace NKikimr::NMiniKQL
\ No newline at end of file +} //namespace NKikimr::NMiniKQL diff --git a/yql/essentials/minikql/computation/ya.make.inc b/yql/essentials/minikql/computation/ya.make.inc index ccd6b78115..7a663f1a46 100644 --- a/yql/essentials/minikql/computation/ya.make.inc +++ b/yql/essentials/minikql/computation/ya.make.inc @@ -24,7 +24,6 @@ PEERDIR( yql/essentials/parser/pg_wrapper/interface yql/essentials/public/udf yql/essentials/utils - yql/essentials/utils/rope library/cpp/threading/future ) @@ -39,7 +38,7 @@ COPY( AUTO FROM ${ORIG_SRC_DIR} ${ORIG_SOURCES} - OUTPUT_INCLUDES + OUTPUT_INCLUDES ${BINDIR}/yql/essentials/minikql/computation/mkql_llvm_base.h ${BINDIR}/yql/essentials/minikql/computation/mkql_computation_node_codegen.h ) diff --git a/yql/essentials/minikql/mkql_buffer.cpp b/yql/essentials/minikql/mkql_buffer.cpp index ed64d5b412..c47b8648b2 100644 --- a/yql/essentials/minikql/mkql_buffer.cpp +++ b/yql/essentials/minikql/mkql_buffer.cpp @@ -1,7 +1,5 @@ #include "mkql_buffer.h" -#include <yql/essentials/utils/rope/rope_over_buffer.h> - namespace NKikimr { namespace NMiniKQL { @@ -46,10 +44,11 @@ void TPagedBuffer::AppendPage() { Tail_ = page->Data(); } -TRope TPagedBuffer::AsRope(const TConstPtr& buffer) { - TRope result; +using NYql::TChunkedBuffer; +TChunkedBuffer TPagedBuffer::AsChunkedBuffer(const TConstPtr& buffer) { + TChunkedBuffer result; buffer->ForEachPage([&](const char* data, size_t size) { - result.Insert(result.End(), NYql::MakeReadOnlyRope(buffer, data, size)); + result.Append(TStringBuf(data, size), buffer); }); return result; diff --git a/yql/essentials/minikql/mkql_buffer.h b/yql/essentials/minikql/mkql_buffer.h index 9d96d0350f..a1cf4d5aab 100644 --- a/yql/essentials/minikql/mkql_buffer.h +++ b/yql/essentials/minikql/mkql_buffer.h @@ -2,7 +2,7 @@ #include "defs.h" -#include <contrib/ydb/library/actors/util/rope.h> +#include <yql/essentials/utils/chunked_buffer.h> #include <util/generic/noncopyable.h> #include <util/stream/output.h> @@ -214,7 +214,7 @@ class TPagedBuffer : private TNonCopyable { } } - static TRope AsRope(const TConstPtr& buf); + static NYql::TChunkedBuffer AsChunkedBuffer(const TConstPtr& buf); private: void AppendPage(); diff --git a/yql/essentials/minikql/ya.make b/yql/essentials/minikql/ya.make index e1d1f83ccb..7060a6a86b 100644 --- a/yql/essentials/minikql/ya.make +++ b/yql/essentials/minikql/ya.make @@ -72,7 +72,6 @@ PEERDIR( yql/essentials/public/udf yql/essentials/public/udf/tz yql/essentials/utils - yql/essentials/utils/rope yql/essentials/core/sql_types ) diff --git a/yql/essentials/utils/chunked_buffer.cpp b/yql/essentials/utils/chunked_buffer.cpp new file mode 100644 index 0000000000..906da8dcae --- /dev/null +++ b/yql/essentials/utils/chunked_buffer.cpp @@ -0,0 +1,123 @@ +#include "chunked_buffer.h" + +#include <yql/essentials/utils/yql_panic.h> + +namespace NYql { + +TChunkedBuffer::TChunkedBuffer(TChunkedBuffer&& other) { + Items_ = std::move(other.Items_); +} + +TChunkedBuffer& TChunkedBuffer::operator=(TChunkedBuffer&& other) { + Items_ = std::move(other.Items_); + return *this; +} + +TChunkedBuffer::TChunkedBuffer(TStringBuf buf, const std::shared_ptr<const void>& owner) { + Append(buf, owner); +} + +TChunkedBuffer::TChunkedBuffer(TString&& str) { + Append(std::move(str)); +} + +const TChunkedBuffer::TChunk& TChunkedBuffer::Front() const { + YQL_ENSURE(!Items_.empty()); + return Items_.front(); +} + +size_t TChunkedBuffer::CopyTo(IOutputStream& dst, size_t toCopy) const { + size_t copied = 0; + for (auto& chunk : Items_) { + if (!toCopy) { 
+ break; + } + size_t copyChunk = std::min(chunk.Buf.size(), toCopy); + dst.Write(chunk.Buf.data(), copyChunk); + toCopy -= copyChunk; + copied += copyChunk; + } + return copied; +} + +size_t TChunkedBuffer::ContigousSize() const { + return Items_.empty() ? 0 : Front().Buf.size(); +} + +size_t TChunkedBuffer::Size() const { + size_t result = 0; + for (auto& item : Items_) { + result += item.Buf.size(); + } + return result; +} + +bool TChunkedBuffer::Empty() const { + return Items_.empty(); +} + +TChunkedBuffer& TChunkedBuffer::Append(TStringBuf buf, const std::shared_ptr<const void>& owner) { + if (!buf.empty()) { + Items_.emplace_back(TChunk{buf, owner}); + } + return *this; +} + +TChunkedBuffer& TChunkedBuffer::Append(TString&& str) { + if (!str.empty()) { + auto owner = std::make_shared<TString>(std::move(str)); + Items_.emplace_back(TChunk{*owner, owner}); + } + return *this; +} + +TChunkedBuffer& TChunkedBuffer::Append(TChunkedBuffer&& other) { + while (!other.Items_.empty()) { + Items_.emplace_back(std::move(other.Items_.front())); + other.Items_.pop_front(); + } + return *this; +} + +TChunkedBuffer& TChunkedBuffer::Clear() { + Items_.clear(); + return *this; +} + +TChunkedBuffer& TChunkedBuffer::Erase(size_t size) { + while (size && !Items_.empty()) { + TStringBuf& buf = Items_.front().Buf; + size_t toErase = std::min(buf.size(), size); + buf.Skip(toErase); + size -= toErase; + if (buf.empty()) { + Items_.pop_front(); + } + } + return *this; +} + +TChunkedBufferOutput::TChunkedBufferOutput(TChunkedBuffer& dst) + : Dst_(dst) +{ +} + +void TChunkedBufferOutput::DoWrite(const void* buf, size_t len) { + TString str(static_cast<const char*>(buf), len); + Dst_.Append(std::move(str)); +} + +TChunkedBuffer CopyData(const TChunkedBuffer& src) { + TChunkedBuffer result; + TChunkedBufferOutput out(result); + src.CopyTo(out); + return result; +} + +TChunkedBuffer CopyData(TChunkedBuffer&& src) { + TChunkedBuffer result = CopyData(src); + src.Clear(); + return result; +} + +} diff --git a/yql/essentials/utils/chunked_buffer.h b/yql/essentials/utils/chunked_buffer.h new file mode 100644 index 0000000000..6f5b2a8ea7 --- /dev/null +++ b/yql/essentials/utils/chunked_buffer.h @@ -0,0 +1,56 @@ +#pragma once + +#include <util/generic/strbuf.h> +#include <util/generic/string.h> +#include <util/stream/output.h> + +#include <memory> +#include <deque> + +namespace NYql { + +class TChunkedBuffer { +public: + TChunkedBuffer() = default; + TChunkedBuffer(const TChunkedBuffer&) = default; + TChunkedBuffer(TChunkedBuffer&& other); + TChunkedBuffer& operator=(TChunkedBuffer&& other); + explicit TChunkedBuffer(TStringBuf buf, const std::shared_ptr<const void>& owner); + explicit TChunkedBuffer(TString&& str); + + size_t ContigousSize() const; + size_t Size() const; + bool Empty() const; + + struct TChunk { + TStringBuf Buf; + std::shared_ptr<const void> Owner; + }; + + const TChunk& Front() const; + size_t CopyTo(IOutputStream& dst, size_t toCopy = std::numeric_limits<size_t>::max()) const; + + TChunkedBuffer& Append(TStringBuf buf, const std::shared_ptr<const void>& owner); + TChunkedBuffer& Append(TString&& str); + TChunkedBuffer& Append(TChunkedBuffer&& other); + + TChunkedBuffer& Clear(); + TChunkedBuffer& Erase(size_t size); + +private: + std::deque<TChunk> Items_; +}; + +class TChunkedBufferOutput : public IOutputStream { +public: + explicit TChunkedBufferOutput(TChunkedBuffer& dst); +private: + virtual void DoWrite(const void *buf, size_t len) override; + + TChunkedBuffer& Dst_; +}; + +TChunkedBuffer 
CopyData(const TChunkedBuffer& src); +TChunkedBuffer CopyData(TChunkedBuffer&& src); + +} diff --git a/yql/essentials/utils/rope/rope_over_buffer.cpp b/yql/essentials/utils/rope/rope_over_buffer.cpp deleted file mode 100644 index e8074372fd..0000000000 --- a/yql/essentials/utils/rope/rope_over_buffer.cpp +++ /dev/null @@ -1,43 +0,0 @@ -#include "rope_over_buffer.h" - -#include <yql/essentials/utils/yql_panic.h> - -namespace NYql { - -namespace { - -class TContigousChunkOverBuf : public IContiguousChunk { -public: - TContigousChunkOverBuf(const std::shared_ptr<const void>& owner, TContiguousSpan span) - : Owner_(owner) - , Span_(span) - { - } -private: - TContiguousSpan GetData() const override { - return Span_; - } - - TMutableContiguousSpan GetDataMut() override { - YQL_ENSURE(false, "Payload mutation is not supported"); - } - - size_t GetOccupiedMemorySize() const override { - return Span_.GetSize(); - } - - const std::shared_ptr<const void> Owner_; - const TContiguousSpan Span_; -}; - -} // namespace - - -TRope MakeReadOnlyRope(const std::shared_ptr<const void>& owner, const char* data, size_t size) { - if (!size) { - return TRope(); - } - return TRope(new TContigousChunkOverBuf(owner, {data, size})); -} - -} // namespace NYql diff --git a/yql/essentials/utils/rope/rope_over_buffer.h b/yql/essentials/utils/rope/rope_over_buffer.h deleted file mode 100644 index 9d18e52263..0000000000 --- a/yql/essentials/utils/rope/rope_over_buffer.h +++ /dev/null @@ -1,11 +0,0 @@ -#pragma once - -#include <contrib/ydb/library/actors/util/rope.h> - -#include <memory> - -namespace NYql { - -TRope MakeReadOnlyRope(const std::shared_ptr<const void>& owner, const char* data, size_t size); - -}
\ No newline at end of file diff --git a/yql/essentials/utils/rope/ya.make b/yql/essentials/utils/rope/ya.make deleted file mode 100644 index 9b92a50c48..0000000000 --- a/yql/essentials/utils/rope/ya.make +++ /dev/null @@ -1,13 +0,0 @@ -LIBRARY() - -SRCS( - rope_over_buffer.cpp -) - -PEERDIR( - contrib/ydb/library/actors/util - yql/essentials/utils -) - -END() - diff --git a/yql/essentials/utils/ya.make b/yql/essentials/utils/ya.make index 8e40de1843..8f0181520a 100644 --- a/yql/essentials/utils/ya.make +++ b/yql/essentials/utils/ya.make @@ -2,6 +2,8 @@ LIBRARY() SRCS( cast.h + chunked_buffer.cpp + chunked_buffer.h debug_info.cpp debug_info.h exceptions.cpp @@ -59,7 +61,6 @@ IF (OPENSOURCE_PROJECT != "yt") fetch log network - rope signals sys test_http_server |