author    aneporada <aneporada@yandex-team.com>  2024-11-07 22:39:23 +0300
committer aneporada <aneporada@yandex-team.com>  2024-11-07 22:51:13 +0300
commit    77acc99aab41efa1d6cf64879057312292355860 (patch)
tree      7ddc3f2d6cf290872de90947fe275609b3783c88
parent    4b6cbdc40a3c0b3cee2bf7af07482f02334ef62c (diff)
Minimize dependency on YDB's TRope
commit_hash:e3b74d6e744bd014dc1fa0f41053fe886f7f4a49
-rw-r--r--  yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp  5
-rw-r--r--  yql/essentials/minikql/comp_nodes/mkql_wide_top_sort.cpp  3
-rw-r--r--  yql/essentials/minikql/computation/mkql_block_transport.cpp  86
-rw-r--r--  yql/essentials/minikql/computation/mkql_block_transport.h  6
-rw-r--r--  yql/essentials/minikql/computation/mkql_computation_node_pack.cpp  45
-rw-r--r--  yql/essentials/minikql/computation/mkql_computation_node_pack.h  18
-rw-r--r--  yql/essentials/minikql/computation/mkql_computation_node_pack_impl.h  24
-rw-r--r--  yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp  35
-rw-r--r--  yql/essentials/minikql/computation/mkql_spiller.h  8
-rw-r--r--  yql/essentials/minikql/computation/mkql_spiller_adapter.h  4
-rw-r--r--  yql/essentials/minikql/computation/mkql_vector_spiller_adapter.h  53
-rw-r--r--  yql/essentials/minikql/computation/mock_spiller_factory_ut.h  4
-rw-r--r--  yql/essentials/minikql/computation/mock_spiller_ut.h  14
-rw-r--r--  yql/essentials/minikql/computation/ya.make.inc  3
-rw-r--r--  yql/essentials/minikql/mkql_buffer.cpp  9
-rw-r--r--  yql/essentials/minikql/mkql_buffer.h  4
-rw-r--r--  yql/essentials/minikql/ya.make  1
-rw-r--r--  yql/essentials/utils/chunked_buffer.cpp  123
-rw-r--r--  yql/essentials/utils/chunked_buffer.h  56
-rw-r--r--  yql/essentials/utils/rope/rope_over_buffer.cpp  43
-rw-r--r--  yql/essentials/utils/rope/rope_over_buffer.h  11
-rw-r--r--  yql/essentials/utils/rope/ya.make  13
-rw-r--r--  yql/essentials/utils/ya.make  3
23 files changed, 354 insertions, 217 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp
index 97295cc2ed..7c2ac2dc18 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp
@@ -20,6 +20,7 @@ namespace NKikimr {
namespace NMiniKQL {
using NYql::EnsureDynamicCast;
+using NYql::TChunkedBuffer;
extern TStatKey Combine_FlushesCount;
extern TStatKey Combine_MaxRowsCount;
@@ -340,7 +341,7 @@ private:
class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
typedef TComputationValue<TSpillingSupportState> TBase;
typedef std::optional<NThreading::TFuture<ISpiller::TKey>> TAsyncWriteOperation;
- typedef std::optional<NThreading::TFuture<std::optional<TRope>>> TAsyncReadOperation;
+ typedef std::optional<NThreading::TFuture<std::optional<TChunkedBuffer>>> TAsyncReadOperation;
struct TSpilledBucket {
std::unique_ptr<TWideUnboxedValuesSpillerAdapter> SpilledState; //state collected before switching to spilling mode
@@ -632,7 +633,7 @@ private:
if (!bucket.AsyncWriteOperation->HasValue()) return true;
bucket.SpilledState->AsyncWriteCompleted(bucket.AsyncWriteOperation->ExtractValue());
bucket.AsyncWriteOperation = std::nullopt;
- }
+ }
bucket.AsyncWriteOperation = bucket.SpilledState->FinishWriting();
if (bucket.AsyncWriteOperation) return true;
diff --git a/yql/essentials/minikql/comp_nodes/mkql_wide_top_sort.cpp b/yql/essentials/minikql/comp_nodes/mkql_wide_top_sort.cpp
index c735fff484..80f640f038 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_wide_top_sort.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_wide_top_sort.cpp
@@ -81,8 +81,9 @@ struct TMyValueCompare {
const std::vector<TRuntimeKeyInfo> Keys;
};
+using NYql::TChunkedBuffer;
using TAsyncWriteOperation = std::optional<NThreading::TFuture<ISpiller::TKey>>;
-using TAsyncReadOperation = std::optional<NThreading::TFuture<std::optional<TRope>>>;
+using TAsyncReadOperation = std::optional<NThreading::TFuture<std::optional<TChunkedBuffer>>>;
using TStorage = std::vector<NUdf::TUnboxedValue, TMKQLAllocator<NUdf::TUnboxedValue, EMemorySubPool::Temporary>>;
struct TSpilledData {
diff --git a/yql/essentials/minikql/computation/mkql_block_transport.cpp b/yql/essentials/minikql/computation/mkql_block_transport.cpp
index 0e62fc6d24..d68029c315 100644
--- a/yql/essentials/minikql/computation/mkql_block_transport.cpp
+++ b/yql/essentials/minikql/computation/mkql_block_transport.cpp
@@ -4,22 +4,23 @@
#include <yql/essentials/minikql/mkql_type_builder.h>
#include <yql/essentials/public/udf/arrow/block_reader.h>
#include <yql/essentials/public/udf/arrow/memory_pool.h>
-#include <yql/essentials/utils/rope/rope_over_buffer.h>
#include <yql/essentials/utils/yql_panic.h>
namespace NKikimr::NMiniKQL {
namespace {
-TRope MakeReadOnlyRopeAndUntrack(const std::shared_ptr<const arrow::Buffer>& owner, const char* data, size_t size) {
+using NYql::TChunkedBuffer;
+
+TChunkedBuffer MakeChunkedBufferAndUntrack(const std::shared_ptr<const arrow::Buffer>& owner, const char* data, size_t size) {
MKQLArrowUntrack(owner->data());
- return NYql::MakeReadOnlyRope(owner, data, size);
+ return TChunkedBuffer(TStringBuf{data, size}, owner);
}
class TOwnedArrowBuffer : public arrow::Buffer {
public:
- TOwnedArrowBuffer(TContiguousSpan span, const std::shared_ptr<const void>& owner)
- : arrow::Buffer(reinterpret_cast<const uint8_t*>(span.Data()), span.Size())
+ TOwnedArrowBuffer(TStringBuf span, const std::shared_ptr<const void>& owner)
+ : arrow::Buffer(reinterpret_cast<const uint8_t*>(span.data()), span.size())
, Owner_(owner)
{
}
@@ -86,7 +87,7 @@ void LoadNullsSizes(const IBlockDeserializer::TMetadataSource& metaSource, TMayb
nullsSize = metaSource();
}
-void StoreNulls(const arrow::ArrayData& data, TRope& dst) {
+void StoreNulls(const arrow::ArrayData& data, TChunkedBuffer& dst) {
if (!NeedStoreBitmap(data)) {
return;
}
@@ -95,7 +96,7 @@ void StoreNulls(const arrow::ArrayData& data, TRope& dst) {
YQL_ENSURE(desiredOffset <= (size_t)data.offset);
YQL_ENSURE((data.offset - desiredOffset) % 8 == 0);
const char* nulls = data.GetValues<char>(0, 0) + (data.offset - desiredOffset) / 8;
- dst.Insert(dst.End(), MakeReadOnlyRopeAndUntrack(data.buffers[0], nulls, nullBytes));
+ dst.Append(MakeChunkedBufferAndUntrack(data.buffers[0], nulls, nullBytes));
}
void LoadBufferSize(const IBlockDeserializer::TMetadataSource& metaSource, TMaybe<ui64>& result) {
@@ -103,30 +104,41 @@ void LoadBufferSize(const IBlockDeserializer::TMetadataSource& metaSource, TMayb
result = metaSource();
}
-std::shared_ptr<arrow::Buffer> LoadBuffer(TRope& source, TMaybe<ui64> size) {
+std::shared_ptr<arrow::Buffer> LoadBuffer(TChunkedBuffer& source, TMaybe<ui64> size) {
using namespace NYql::NUdf;
YQL_ENSURE(size.Defined(), "Buffer size is not loaded");
if (!*size) {
return MakeEmptyBuffer();
}
- YQL_ENSURE(source.size() >= *size, "Premature end of data");
- auto owner = std::make_shared<TRope>(source.Begin(), source.Begin() + *size);
- source.EraseFront(*size);
+ size_t toAppend = *size;
+ const TChunkedBuffer::TChunk& front = source.Front();
+ if (front.Buf.size() >= toAppend && HasArrrowAlignment(front.Buf.data())) {
+ TStringBuf data = source.Front().Buf;
+ data.Trunc(toAppend);
+ auto result = std::make_shared<TOwnedArrowBuffer>(data, source.Front().Owner);
+ source.Erase(toAppend);
+ return result;
+ }
- owner->Compact();
- auto span = owner->GetContiguousSpan();
- if (HasArrrowAlignment(span.Data())) {
- return std::make_shared<TOwnedArrowBuffer>(span, owner);
+ auto result = AllocateResizableBuffer(toAppend, NYql::NUdf::GetYqlMemoryPool());
+ ARROW_OK(result->Resize((int64_t)toAppend));
+ uint8_t* dst = result->mutable_data();
+ while (toAppend) {
+ const TChunkedBuffer::TChunk& front = source.Front();
+ TStringBuf buf = front.Buf;
+ YQL_ENSURE(!buf.empty(), "Premature end of buffer");
+ size_t chunk = std::min(toAppend, buf.size());
+ std::memcpy(dst, buf.data(), chunk);
+ dst += chunk;
+ toAppend -= chunk;
+ source.Erase(chunk);
}
- auto result = AllocateResizableBuffer(span.Size(), NYql::NUdf::GetYqlMemoryPool());
- ARROW_OK(result->Resize((int64_t)span.Size()));
- std::memcpy(result->mutable_data(), span.Data(), span.Size());
return result;
}
-std::shared_ptr<arrow::Buffer> LoadNullsBitmap(TRope& source, TMaybe<ui64> nullCount, TMaybe<ui64> bitmapSize) {
+std::shared_ptr<arrow::Buffer> LoadNullsBitmap(TChunkedBuffer& source, TMaybe<ui64> nullCount, TMaybe<ui64> bitmapSize) {
YQL_ENSURE(nullCount.Defined(), "Bitmap null count is not loaded");
YQL_ENSURE(bitmapSize.Defined(), "Bitmap size is not loaded");
if (*nullCount == 0) {
@@ -152,7 +164,7 @@ public:
DoLoadMetadata(metaSource);
}
- virtual std::shared_ptr<arrow::ArrayData> LoadArray(TRope& src, ui64 blockLen, ui64 offset) final {
+ virtual std::shared_ptr<arrow::ArrayData> LoadArray(TChunkedBuffer& src, ui64 blockLen, ui64 offset) final {
YQL_ENSURE(blockLen > 0, "Should be handled earlier");
std::shared_ptr<arrow::Buffer> nulls;
i64 nullsCount = 0;
@@ -193,7 +205,7 @@ protected:
virtual void DoResetMetadata() = 0;
virtual bool IsNullable() const = 0;
virtual std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const = 0;
- virtual std::shared_ptr<arrow::ArrayData> DoLoadArray(TRope& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) = 0;
+ virtual std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) = 0;
std::shared_ptr<arrow::DataType> ArrowType_;
TMaybe<ui64> NullsCount_;
@@ -222,7 +234,7 @@ public:
metaSink(dataBytes);
}
- void StoreArray(const arrow::ArrayData& data, TRope& dst) const final {
+ void StoreArray(const arrow::ArrayData& data, TChunkedBuffer& dst) const final {
if constexpr (Nullable) {
StoreNulls(data, dst);
if (data.GetNullCount() == data.length) {
@@ -233,7 +245,7 @@ public:
const ui64 desiredOffset = data.offset % 8;
const char* buf = reinterpret_cast<const char*>(data.buffers[1]->data()) + (data.offset - desiredOffset) * ObjectSize;
size_t dataBytes = ((size_t)data.length + desiredOffset) * ObjectSize;
- dst.Insert(dst.End(), MakeReadOnlyRopeAndUntrack(data.buffers[1], buf, dataBytes));
+ dst.Append(MakeChunkedBufferAndUntrack(data.buffers[1], buf, dataBytes));
}
};
@@ -255,7 +267,7 @@ private:
return arrow::ArrayData::Make(ArrowType_, blockLen, { nulls, data }, nullsCount, offset);
}
- std::shared_ptr<arrow::ArrayData> DoLoadArray(TRope& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
+ std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
auto data = LoadBuffer(src, DataSize_);
return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls, data}, nullsCount, offset);
}
@@ -294,7 +306,7 @@ private:
metaSink(data.buffers[2]->size());
}
- void StoreArray(const arrow::ArrayData& data, TRope& dst) const final {
+ void StoreArray(const arrow::ArrayData& data, TChunkedBuffer& dst) const final {
if constexpr (Nullable) {
StoreNulls(data, dst);
if (data.GetNullCount() == data.length) {
@@ -305,11 +317,11 @@ private:
const ui64 desiredOffset = data.offset % 8;
const char* offsets = reinterpret_cast<const char*>(data.GetValues<TOffset>(1) - desiredOffset);
size_t offsetsSize = ((size_t)data.length + 1 + desiredOffset) * sizeof(TOffset);
- dst.Insert(dst.End(), MakeReadOnlyRopeAndUntrack(data.buffers[1], offsets, offsetsSize));
+ dst.Append(MakeChunkedBufferAndUntrack(data.buffers[1], offsets, offsetsSize));
const char* mainData = reinterpret_cast<const char*>(data.buffers[2]->data());
size_t mainSize = data.buffers[2]->size();
- dst.Insert(dst.End(), MakeReadOnlyRopeAndUntrack(data.buffers[2], mainData, mainSize));
+ dst.Append(MakeChunkedBufferAndUntrack(data.buffers[2], mainData, mainSize));
}
};
@@ -330,7 +342,7 @@ private:
return arrow::ArrayData::Make(ArrowType_, blockLen, { nulls, offsets, data }, nullsCount, offset);
}
- std::shared_ptr<arrow::ArrayData> DoLoadArray(TRope& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
+ std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
auto offsets = LoadBuffer(src, OffsetsSize_);
auto data = LoadBuffer(src, DataSize_);
return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls, offsets, data }, nullsCount, offset);
@@ -371,7 +383,7 @@ private:
}
}
- void StoreArray(const arrow::ArrayData& data, TRope& dst) const final {
+ void StoreArray(const arrow::ArrayData& data, TChunkedBuffer& dst) const final {
StoreNulls(data, dst);
if (data.GetNullCount() != data.length) {
Inner_->StoreArray(*data.child_data[0], dst);
@@ -396,7 +408,7 @@ private:
return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, { Inner_->MakeDefaultValue(blockLen, offset) }, nullsCount, offset);
}
- std::shared_ptr<arrow::ArrayData> DoLoadArray(TRope& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
+ std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, { Inner_->LoadArray(src, blockLen, offset) }, nullsCount, offset);
}
@@ -441,7 +453,7 @@ class TTupleBlockSerializerBase : public IBlockSerializer {
}
}
- void StoreArray(const arrow::ArrayData& data, TRope& dst) const final {
+ void StoreArray(const arrow::ArrayData& data, TChunkedBuffer& dst) const final {
if constexpr (Nullable) {
StoreNulls(data, dst);
}
@@ -466,7 +478,7 @@ public:
return result;
}
- void StoreChildrenMetadata(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data,
+ void StoreChildrenMetadata(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data,
const IBlockSerializer::TMetadataSink& metaSink) const {
for (size_t i = 0; i < Children_.size(); ++i) {
@@ -474,7 +486,7 @@ public:
}
}
- void StoreChildrenArrays(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data, TRope& dst) const {
+ void StoreChildrenArrays(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data, TChunkedBuffer& dst) const {
for (size_t i = 0; i < Children_.size(); ++i) {
Children_[i]->StoreArray(*child_data[i], dst);
}
@@ -493,13 +505,13 @@ public:
return DateSerialiser_.ArrayMetadataCount() + TzSerialiser_.ArrayMetadataCount();
}
- void StoreChildrenMetadata(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data,
+ void StoreChildrenMetadata(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data,
const IBlockSerializer::TMetadataSink& metaSink) const {
DateSerialiser_.StoreMetadata(*child_data[0], metaSink);
TzSerialiser_.StoreMetadata(*child_data[1], metaSink);
}
- void StoreChildrenArrays(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data, TRope& dst) const {
+ void StoreChildrenArrays(const std::vector<std::shared_ptr<arrow::ArrayData>>& child_data, TChunkedBuffer& dst) const {
DateSerialiser_.StoreArray(*child_data[0], dst);
TzSerialiser_.StoreArray(*child_data[1], dst);
}
@@ -533,7 +545,7 @@ private:
return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, offset);
}
- std::shared_ptr<arrow::ArrayData> DoLoadArray(TRope& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
+ std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
std::vector<std::shared_ptr<arrow::ArrayData>> childData;
for (auto& child : Children_) {
childData.emplace_back(child->LoadArray(src, blockLen, offset));
@@ -580,7 +592,7 @@ private:
return arrow::ArrayData::Make(ArrowType_, blockLen, {nulls}, std::move(childData), nullsCount, offset);
}
- std::shared_ptr<arrow::ArrayData> DoLoadArray(TRope& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
+ std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
std::vector<std::shared_ptr<arrow::ArrayData>> childData;
childData.emplace_back(DateDeserialiser_.LoadArray(src, blockLen, offset));
childData.emplace_back(TzDeserialiser_.LoadArray(src, blockLen, offset));
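Note: MakeChunkedBufferAndUntrack above appends a zero-copy view over an arrow::Buffer; the chunk's Owner shared_ptr keeps the backing allocation alive while only a slice of it is exposed. A minimal illustration of the same ownership pattern, using a plain TString as the owner instead of an arrow::Buffer (illustrative sketch, not part of the commit):

    #include <yql/essentials/utils/chunked_buffer.h>

    // Build a chunked buffer that references a slice of shared data without copying it.
    // The shared_ptr owner is stored next to the TStringBuf view, so the data stays
    // alive for as long as any chunk still points into it.
    NYql::TChunkedBuffer MakeViewOverSharedData() {
        auto owner = std::make_shared<TString>("header|payload|footer");
        TStringBuf payload(owner->data() + 7, 7); // expose only the "payload" slice
        return NYql::TChunkedBuffer(payload, owner);
    }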
diff --git a/yql/essentials/minikql/computation/mkql_block_transport.h b/yql/essentials/minikql/computation/mkql_block_transport.h
index c93b439f25..bda06d5a49 100644
--- a/yql/essentials/minikql/computation/mkql_block_transport.h
+++ b/yql/essentials/minikql/computation/mkql_block_transport.h
@@ -2,7 +2,7 @@
#include <yql/essentials/minikql/mkql_node.h>
-#include <contrib/ydb/library/actors/util/rope.h>
+#include <yql/essentials/utils/chunked_buffer.h>
#include <arrow/datum.h>
@@ -19,7 +19,7 @@ public:
using TMetadataSink = std::function<void(ui64 meta)>;
virtual void StoreMetadata(const arrow::ArrayData& data, const TMetadataSink& metaSink) const = 0;
- virtual void StoreArray(const arrow::ArrayData& data, TRope& dst) const = 0;
+ virtual void StoreArray(const arrow::ArrayData& data, NYql::TChunkedBuffer& dst) const = 0;
};
class IBlockDeserializer {
@@ -28,7 +28,7 @@ public:
using TMetadataSource = std::function<ui64()>;
virtual void LoadMetadata(const TMetadataSource& metaSource) = 0;
- virtual std::shared_ptr<arrow::ArrayData> LoadArray(TRope& src, ui64 blockLen, ui64 offset) = 0;
+ virtual std::shared_ptr<arrow::ArrayData> LoadArray(NYql::TChunkedBuffer& src, ui64 blockLen, ui64 offset) = 0;
};
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp b/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp
index 4e86b974d9..00bd3fdad2 100644
--- a/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp
+++ b/yql/essentials/minikql/computation/mkql_computation_node_pack.cpp
@@ -11,13 +11,14 @@
#include <yql/essentials/minikql/pack_num.h>
#include <yql/essentials/minikql/mkql_string_util.h>
#include <yql/essentials/minikql/mkql_type_builder.h>
-#include <yql/essentials/utils/rope/rope_over_buffer.h>
#include <library/cpp/resource/resource.h>
#include <yql/essentials/utils/fp_bits.h>
#include <util/system/yassert.h>
#include <util/system/sanitizers.h>
+using NYql::TChunkedBuffer;
+
namespace NKikimr {
namespace NMiniKQL {
@@ -353,7 +354,7 @@ NUdf::TUnboxedValue UnpackFromChunkedBuffer(const TType* type, TChunkedInputBuff
auto ret = NUdf::TUnboxedValuePod(UnpackData<Fast, i64>(buf));
ret.SetTimezoneId(UnpackData<Fast, ui16>(buf));
return ret;
- }
+ }
case NUdf::EDataSlot::Uuid: {
return UnpackString(buf, 16);
}
@@ -921,7 +922,7 @@ bool IsUi64Scalar(const TBlockType* blockType) {
if (!blockType->GetItemType()->IsData()) {
return false;
}
-
+
return static_cast<const TDataType*>(blockType->GetItemType())->GetDataSlot() == NUdf::EDataSlot::Uint64;
}
@@ -1113,25 +1114,25 @@ void TValuePackerTransport<Fast>::InitBlocks() {
}
template<bool Fast>
-NUdf::TUnboxedValue TValuePackerTransport<Fast>::Unpack(TRope&& buf, const THolderFactory& holderFactory) const {
+NUdf::TUnboxedValue TValuePackerTransport<Fast>::Unpack(TChunkedBuffer&& buf, const THolderFactory& holderFactory) const {
MKQL_ENSURE(!IsBlock_, "Unpack() should not be used for blocks");
- const size_t totalSize = buf.GetSize();
+ const size_t totalSize = buf.Size();
TChunkedInputBuffer chunked(std::move(buf));
return DoUnpack<Fast>(Type_, chunked, totalSize, holderFactory, State_);
}
template<bool Fast>
-void TValuePackerTransport<Fast>::UnpackBatch(TRope&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const {
+void TValuePackerTransport<Fast>::UnpackBatch(TChunkedBuffer&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const {
if (IsBlock_) {
return UnpackBatchBlocks(std::move(buf), holderFactory, result);
}
- const size_t totalSize = buf.GetSize();
+ const size_t totalSize = buf.Size();
TChunkedInputBuffer chunked(std::move(buf));
DoUnpackBatch<Fast>(Type_, chunked, totalSize, holderFactory, IncrementalState_, result);
}
template<bool Fast>
-TRope TValuePackerTransport<Fast>::Pack(const NUdf::TUnboxedValuePod& value) const {
+TChunkedBuffer TValuePackerTransport<Fast>::Pack(const NUdf::TUnboxedValuePod& value) const {
MKQL_ENSURE(ItemCount_ == 0, "Can not mix Pack() and AddItem() calls");
MKQL_ENSURE(!IsBlock_, "Pack() should not be used for blocks");
TPagedBuffer::TPtr result = std::make_shared<TPagedBuffer>();
@@ -1143,7 +1144,7 @@ TRope TValuePackerTransport<Fast>::Pack(const NUdf::TUnboxedValuePod& value) con
PackImpl<Fast, false>(Type_, *result, value, State_);
BuildMeta(result, false);
}
- return TPagedBuffer::AsRope(result);
+ return TPagedBuffer::AsChunkedBuffer(result);
}
template<bool Fast>
@@ -1224,8 +1225,7 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons
PackData<false>(len, *metadataBuffer);
if (!len) {
// only block len should be serialized in this case
- BlockBuffer_.Insert(BlockBuffer_.End(),
- NYql::MakeReadOnlyRope(metadataBuffer, metadataBuffer->data(), metadataBuffer->size()));
+ BlockBuffer_.Append(TStringBuf(metadataBuffer->data(), metadataBuffer->size()), metadataBuffer);
++ItemCount_;
return *this;
}
@@ -1253,7 +1253,7 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons
} else {
MKQL_ENSURE(datum.is_scalar(), "Expecting array or scalar");
if (!ConvertedScalars_[i]) {
- const TType* itemType = IsLegacyBlock_ ? static_cast<const TStructType*>(Type_)->GetMemberType(i) :
+ const TType* itemType = IsLegacyBlock_ ? static_cast<const TStructType*>(Type_)->GetMemberType(i) :
static_cast<const TMultiType*>(Type_)->GetElementType(i);
datum = MakeArrayFromScalar(*datum.scalar(), 1, static_cast<const TBlockType*>(itemType)->GetItemType(), ArrowPool_);
MKQL_ENSURE(HasOffset(*datum.array(), 0), "Expected zero array offset after scalar is converted to array");
@@ -1280,8 +1280,7 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons
MKQL_ENSURE(savedMetadata == totalMetadataCount, "Serialization metadata error");
- BlockBuffer_.Insert(BlockBuffer_.End(),
- NYql::MakeReadOnlyRope(metadataBuffer, metadataBuffer->data(), metadataBuffer->size()));
+ BlockBuffer_.Append(TStringBuf(metadataBuffer->data(), metadataBuffer->size()), metadataBuffer);
// save buffers
for (size_t i = 0; i < width; ++i) {
if (i != BlockLenIndex_) {
@@ -1293,8 +1292,8 @@ TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(cons
}
template<bool Fast>
-void TValuePackerTransport<Fast>::UnpackBatchBlocks(TRope&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const {
- while (!buf.empty()) {
+void TValuePackerTransport<Fast>::UnpackBatchBlocks(TChunkedBuffer&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const {
+ while (!buf.Empty()) {
TChunkedInputBuffer chunked(std::move(buf));
// unpack block length
@@ -1330,7 +1329,7 @@ void TValuePackerTransport<Fast>::UnpackBatchBlocks(TRope&& buf, const THolderFa
}
}
MKQL_ENSURE(metaCount == 0, "Partial buffers read");
- TRope ropeTail = chunked.ReleaseRope();
+ TChunkedBuffer ropeTail = chunked.ReleaseRope();
// unpack buffers
auto producer = [&](ui32 i) {
@@ -1341,7 +1340,7 @@ void TValuePackerTransport<Fast>::UnpackBatchBlocks(TRope&& buf, const THolderFa
auto array = BlockDeserializers_[i]->LoadArray(ropeTail, isScalar ? 1 : len, offsets[i]);
if (isScalar) {
TBlockItem item = BlockReaders_[i]->GetItem(*array, 0);
- const TType* itemType = IsLegacyBlock_ ? static_cast<const TStructType*>(Type_)->GetMemberType(i) :
+ const TType* itemType = IsLegacyBlock_ ? static_cast<const TStructType*>(Type_)->GetMemberType(i) :
static_cast<const TMultiType*>(Type_)->GetElementType(i);
return holderFactory.CreateArrowBlock(ConvertScalar(static_cast<const TBlockType*>(itemType)->GetItemType(), item, ArrowPool_));
}
@@ -1367,12 +1366,12 @@ void TValuePackerTransport<Fast>::UnpackBatchBlocks(TRope&& buf, const THolderFa
template<bool Fast>
void TValuePackerTransport<Fast>::Clear() {
Buffer_.reset();
- BlockBuffer_.clear();
+ BlockBuffer_.Clear();
ItemCount_ = 0;
}
template<bool Fast>
-TRope TValuePackerTransport<Fast>::Finish() {
+TChunkedBuffer TValuePackerTransport<Fast>::Finish() {
if (IsBlock_) {
return FinishBlocks();
}
@@ -1389,12 +1388,12 @@ TRope TValuePackerTransport<Fast>::Finish() {
}
TPagedBuffer::TPtr result = std::move(Buffer_);
Clear();
- return TPagedBuffer::AsRope(result);
+ return TPagedBuffer::AsChunkedBuffer(result);
}
template<bool Fast>
-TRope TValuePackerTransport<Fast>::FinishBlocks() {
- TRope result = std::move(BlockBuffer_);
+TChunkedBuffer TValuePackerTransport<Fast>::FinishBlocks() {
+ TChunkedBuffer result = std::move(BlockBuffer_);
Clear();
return result;
}
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_pack.h b/yql/essentials/minikql/computation/mkql_computation_node_pack.h
index e09679b470..5dc47ada2d 100644
--- a/yql/essentials/minikql/computation/mkql_computation_node_pack.h
+++ b/yql/essentials/minikql/computation/mkql_computation_node_pack.h
@@ -10,7 +10,7 @@
#include <yql/essentials/public/udf/udf_value.h>
#include <library/cpp/enumbitset/enumbitset.h>
-#include <contrib/ydb/library/actors/util/rope.h>
+#include <yql/essentials/utils/chunked_buffer.h>
#include <util/stream/output.h>
#include <util/generic/buffer.h>
@@ -83,7 +83,7 @@ public:
TSelf& AddItem(const NUdf::TUnboxedValuePod& value);
TSelf& AddWideItem(const NUdf::TUnboxedValuePod* values, ui32 count);
size_t PackedSizeEstimate() const {
- return IsBlock_ ? BlockBuffer_.size() : (Buffer_ ? (Buffer_->Size() + Buffer_->ReservedHeaderSize()) : 0);
+ return IsBlock_ ? BlockBuffer_.Size() : (Buffer_ ? (Buffer_->Size() + Buffer_->ReservedHeaderSize()) : 0);
}
bool IsEmpty() const {
@@ -91,20 +91,20 @@ public:
}
void Clear();
- TRope Finish();
+ NYql::TChunkedBuffer Finish();
// Pack()/Unpack() will pack/unpack single value of type T
- TRope Pack(const NUdf::TUnboxedValuePod& value) const;
- NUdf::TUnboxedValue Unpack(TRope&& buf, const THolderFactory& holderFactory) const;
- void UnpackBatch(TRope&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const;
+ NYql::TChunkedBuffer Pack(const NUdf::TUnboxedValuePod& value) const;
+ NUdf::TUnboxedValue Unpack(NYql::TChunkedBuffer&& buf, const THolderFactory& holderFactory) const;
+ void UnpackBatch(NYql::TChunkedBuffer&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const;
private:
void BuildMeta(TPagedBuffer::TPtr& buffer, bool addItemCount) const;
void StartPack();
void InitBlocks();
TSelf& AddWideItemBlocks(const NUdf::TUnboxedValuePod* values, ui32 count);
- TRope FinishBlocks();
- void UnpackBatchBlocks(TRope&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const;
+ NYql::TChunkedBuffer FinishBlocks();
+ void UnpackBatchBlocks(NYql::TChunkedBuffer&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const;
const TType* const Type_;
ui64 ItemCount_ = 0;
@@ -120,7 +120,7 @@ private:
TVector<std::unique_ptr<IBlockSerializer>> BlockSerializers_;
TVector<std::unique_ptr<IBlockReader>> BlockReaders_;
TVector<std::shared_ptr<arrow::ArrayData>> ConvertedScalars_;
- TRope BlockBuffer_;
+ NYql::TChunkedBuffer BlockBuffer_;
TVector<std::unique_ptr<IBlockDeserializer>> BlockDeserializers_;
};
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_pack_impl.h b/yql/essentials/minikql/computation/mkql_computation_node_pack_impl.h
index bb675423fe..856898c697 100644
--- a/yql/essentials/minikql/computation/mkql_computation_node_pack_impl.h
+++ b/yql/essentials/minikql/computation/mkql_computation_node_pack_impl.h
@@ -5,9 +5,9 @@
#include <yql/essentials/minikql/pack_num.h>
#include <yql/essentials/public/decimal/yql_decimal_serialize.h>
#include <yql/essentials/public/udf/udf_value.h>
+#include <yql/essentials/utils/chunked_buffer.h>
#include <library/cpp/packedtypes/zigzag.h>
-#include <contrib/ydb/library/actors/util/rope.h>
#include <util/generic/buffer.h>
#include <util/generic/strbuf.h>
@@ -69,14 +69,14 @@ void PackDecimal(NYql::NDecimal::TInt128 val, TBuf& buf) {
class TChunkedInputBuffer : private TNonCopyable {
public:
- explicit TChunkedInputBuffer(TRope&& rope)
+ explicit TChunkedInputBuffer(NYql::TChunkedBuffer&& rope)
: Rope_(std::move(rope))
{
Next();
}
explicit TChunkedInputBuffer(TStringBuf input)
- : Rope_(TRope{})
+ : Rope_(NYql::TChunkedBuffer{})
, Data_(input.data())
, Len_(input.size())
{
@@ -117,24 +117,24 @@ public:
}
}
- inline TRope ReleaseRope() {
+ inline NYql::TChunkedBuffer ReleaseRope() {
Y_DEBUG_ABORT_UNLESS(OriginalLen_ >= Len_);
- Rope_.EraseFront(OriginalLen_ - Len_);
- TRope result = std::move(Rope_);
+ Rope_.Erase(OriginalLen_ - Len_);
+ NYql::TChunkedBuffer result = std::move(Rope_);
Data_ = nullptr;
Len_ = OriginalLen_ = 0;
- Rope_.clear();
+ Rope_.Clear();
return result;
}
void Next() {
Y_DEBUG_ABORT_UNLESS(Len_ == 0);
- Rope_.EraseFront(OriginalLen_);
- if (!Rope_.IsEmpty()) {
- Len_ = OriginalLen_ = Rope_.begin().ContiguousSize();
- Data_ = Rope_.begin().ContiguousData();
+ Rope_.Erase(OriginalLen_);
+ if (!Rope_.Empty()) {
+ Len_ = OriginalLen_ = Rope_.Front().Buf.size();
+ Data_ = Rope_.Front().Buf.data();
Y_DEBUG_ABORT_UNLESS(Len_ > 0);
} else {
Len_ = OriginalLen_ = 0;
@@ -157,7 +157,7 @@ private:
}
}
- TRope Rope_;
+ NYql::TChunkedBuffer Rope_;
const char* Data_ = nullptr;
size_t Len_ = 0;
size_t OriginalLen_ = 0;
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp b/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp
index 1747fd0ea7..b689e4cf8b 100644
--- a/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp
+++ b/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp
@@ -27,6 +27,8 @@
namespace NKikimr {
namespace NMiniKQL {
+using NYql::TChunkedBuffer;
+
#ifdef WITH_VALGRIND
constexpr static size_t PERFORMANCE_COUNT = 0x1000;
#elif defined(NDEBUG)
@@ -314,8 +316,8 @@ protected:
TestVariantTypeImpl(PgmBuilder.NewVariantType(tupleType));
}
- void ValidateEmbeddedLength(TRope buf, const TString& info) {
- size_t size = buf.GetSize();
+ void ValidateEmbeddedLength(TChunkedBuffer buf, const TString& info) {
+ size_t size = buf.Size();
TChunkedInputBuffer chunked(std::move(buf));
return ValidateEmbeddedLength(chunked, size, info);
}
@@ -352,13 +354,13 @@ protected:
auto packedValue = packer.Pack(uValue);
if constexpr (Transport) {
if (expectedLength) {
- UNIT_ASSERT_VALUES_EQUAL_C(packedValue.size(), *expectedLength, additionalMsg);
+ UNIT_ASSERT_VALUES_EQUAL_C(packedValue.Size(), *expectedLength, additionalMsg);
}
ValidateEmbeddedLength(packedValue, additionalMsg);
return packer.Unpack(std::move(packedValue), HolderFactory);
} else {
if (expectedLength) {
- UNIT_ASSERT_VALUES_EQUAL_C(packedValue.size(), *expectedLength, additionalMsg);
+ UNIT_ASSERT_VALUES_EQUAL_C(packedValue.Size(), *expectedLength, additionalMsg);
}
ValidateEmbeddedLength(packedValue, additionalMsg);
return packer.Unpack(packedValue, HolderFactory);
@@ -591,7 +593,10 @@ protected:
auto buffer = packer.Pack(value);
- TString packed = buffer.ConvertToString();
+ TString packed;
+ TStringOutput sout(packed);
+ buffer.CopyTo(sout);
+ TStringBuf packedBuf(packed);
if constexpr (Fast) {
UNIT_ASSERT_VALUES_EQUAL(packed.size(), 73);
@@ -600,14 +605,14 @@ protected:
}
for (size_t chunk = 1; chunk < packed.size(); ++chunk) {
- TString first = packed.substr(0, chunk);
- TString second = packed.substr(chunk);
+ TStringBuf first = packedBuf.substr(0, chunk);
+ TStringBuf second = packedBuf.substr(chunk);
- TRope result(std::move(first));
- result.Insert(result.End(), TRope(std::move(second)));
+ TChunkedBuffer result(first, {});
+ result.Append(second, {});
- UNIT_ASSERT_VALUES_EQUAL(result.size(), packed.size());
- UNIT_ASSERT(!result.IsContiguous());
+ UNIT_ASSERT_VALUES_EQUAL(result.Size(), packed.size());
+ UNIT_ASSERT(result.Size() != result.ContigousSize());
ValidateTupleValue(packer.Unpack(std::move(result), HolderFactory));
}
@@ -632,7 +637,7 @@ protected:
auto serialized = packer.Finish();
- auto listObj = listPacker.Unpack(TRope(serialized), HolderFactory);
+ auto listObj = listPacker.Unpack(TChunkedBuffer(serialized), HolderFactory);
UNIT_ASSERT_VALUES_EQUAL(listObj.GetListLength(), count);
const auto iter = listObj.GetListIterator();
for (NUdf::TUnboxedValue uVal; iter.Next(uVal);) {
@@ -666,7 +671,7 @@ protected:
auto scalarOptStrType = PgmBuilder.NewBlockType(optStrType, TBlockType::EShape::Scalar);
auto blockOptTupleOptUi32StrType = PgmBuilder.NewBlockType(optTupleOptUi32StrType, TBlockType::EShape::Many);
auto scalarUi64Type = PgmBuilder.NewBlockType(ui64Type, TBlockType::EShape::Scalar);
-
+
auto tzDateType = PgmBuilder.NewDataType(NUdf::EDataSlot::TzDate);
auto blockTzDateType = PgmBuilder.NewBlockType(tzDateType, TBlockType::EShape::Many);
@@ -703,7 +708,7 @@ protected:
TBlockItem b3items[] = { (i % 2) ? TBlockItem(i) : TBlockItem(), TBlockItem(a) };
TBlockItem b3 = (i % 7) ? TBlockItem(b3items) : TBlockItem();
builder3->Add(b3);
-
+
TBlockItem tzDate {i};
tzDate.SetTimezoneId(i % 100);
builder4->Add(tzDate);
@@ -749,7 +754,7 @@ protected:
} else {
packer.AddWideItem(columns.data(), columns.size());
}
- TRope packed = packer.Finish();
+ TChunkedBuffer packed = packer.Finish();
TUnboxedValueBatch unpacked(rowType);
packer.UnpackBatch(std::move(packed), HolderFactory, unpacked);
diff --git a/yql/essentials/minikql/computation/mkql_spiller.h b/yql/essentials/minikql/computation/mkql_spiller.h
index 0516109c04..e0ce2706d8 100644
--- a/yql/essentials/minikql/computation/mkql_spiller.h
+++ b/yql/essentials/minikql/computation/mkql_spiller.h
@@ -1,7 +1,7 @@
#pragma once
#include <library/cpp/threading/future/core/future.h>
-#include <contrib/ydb/library/actors/util/rope.h>
+#include <yql/essentials/utils/chunked_buffer.h>
namespace NKikimr::NMiniKQL {
@@ -10,16 +10,16 @@ struct ISpiller
using TPtr = std::shared_ptr<ISpiller>;
virtual ~ISpiller(){}
using TKey = ui64;
- virtual NThreading::TFuture<TKey> Put(TRope&& blob) = 0;
+ virtual NThreading::TFuture<TKey> Put(NYql::TChunkedBuffer&& blob) = 0;
///\return
/// nullopt for absent keys
/// TFuture
- virtual NThreading::TFuture<std::optional<TRope>> Get(TKey key) = 0;
+ virtual NThreading::TFuture<std::optional<NYql::TChunkedBuffer>> Get(TKey key) = 0;
virtual NThreading::TFuture<void> Delete(TKey) = 0;
///Get + Delete
///Stored value may be moved to future
- virtual NThreading::TFuture<std::optional<TRope>> Extract(TKey key) = 0;
+ virtual NThreading::TFuture<std::optional<NYql::TChunkedBuffer>> Extract(TKey key) = 0;
};
}//namespace NKikimr::NMiniKQL
diff --git a/yql/essentials/minikql/computation/mkql_spiller_adapter.h b/yql/essentials/minikql/computation/mkql_spiller_adapter.h
index a9a7323853..462a2a7c5e 100644
--- a/yql/essentials/minikql/computation/mkql_spiller_adapter.h
+++ b/yql/essentials/minikql/computation/mkql_spiller_adapter.h
@@ -50,7 +50,7 @@ public:
bool Empty() const {
return StoredChunks.empty() && !CurrentBatch;
}
- std::optional<NThreading::TFuture<std::optional<TRope>>> ExtractWideItem(const TArrayRef<NUdf::TUnboxedValue>& wideItem) {
+ std::optional<NThreading::TFuture<std::optional<NYql::TChunkedBuffer>>> ExtractWideItem(const TArrayRef<NUdf::TUnboxedValue>& wideItem) {
MKQL_ENSURE(!Empty(), "Internal logic error");
if (CurrentBatch) {
auto row = CurrentBatch->Head();
@@ -69,7 +69,7 @@ public:
}
}
- void AsyncReadCompleted(TRope&& rope,const THolderFactory& holderFactory ) {
+ void AsyncReadCompleted(NYql::TChunkedBuffer&& rope,const THolderFactory& holderFactory ) {
//Implementation detail: deserialization is performed in a processing thread
TUnboxedValueBatch batch(ItemType);
Packer.UnpackBatch(std::move(rope), holderFactory, batch);
diff --git a/yql/essentials/minikql/computation/mkql_vector_spiller_adapter.h b/yql/essentials/minikql/computation/mkql_vector_spiller_adapter.h
index 1663397107..8c9896cf98 100644
--- a/yql/essentials/minikql/computation/mkql_vector_spiller_adapter.h
+++ b/yql/essentials/minikql/computation/mkql_vector_spiller_adapter.h
@@ -4,7 +4,6 @@
#include <yql/essentials/minikql/defs.h>
#include <yql/essentials/minikql/computation/mkql_spiller.h>
-#include <contrib/ydb/library/actors/util/rope.h>
#include <yql/essentials/minikql/mkql_alloc.h>
namespace NKikimr::NMiniKQL {
@@ -63,7 +62,7 @@ public:
SaveNextPartOfVector();
}
- ///Should be used to update async operatrions statuses.
+ ///Should be used to update async operatrions statuses.
///For SpillingData state it will try to spill more content of inner buffer.
///ForRestoringData state it will try to load more content of requested vector.
void Update() {
@@ -125,7 +124,7 @@ public:
///Is case if buffer is not ready async write operation will be started.
void Finalize() {
MKQL_ENSURE(CurrentVector.empty(), "Internal logic error");
- if (Buffer.empty()) {
+ if (Buffer.Empty()) {
State = EState::AcceptingDataRequests;
return;
}
@@ -135,12 +134,24 @@ public:
}
private:
-
- void CopyRopeToTheEndOfVector(std::vector<T, Alloc>& vec, TRope& rope) {
- for (auto it = rope.begin(); it != rope.end(); ++it) {
- const T* data = reinterpret_cast<const T*>(it.ContiguousData());
- vec.insert(vec.end(), data, data + it.ContiguousSize() / sizeof(T)); // size is always multiple of sizeof(T)
+ class TVectorStream : public IOutputStream {
+ public:
+ explicit TVectorStream(std::vector<T, Alloc>& vec)
+ : Dst_(vec)
+ {
+ }
+ private:
+ virtual void DoWrite(const void* buf, size_t len) override {
+ MKQL_ENSURE(len % sizeof(T) == 0, "size should always by multiple of sizeof(T)");
+ const T* data = reinterpret_cast<const T*>(buf);
+ Dst_.insert(Dst_.end(), data, data + len / sizeof(T));
}
+ std::vector<T, Alloc>& Dst_;
+ };
+
+ void CopyRopeToTheEndOfVector(std::vector<T, Alloc>& vec, const NYql::TChunkedBuffer& rope, size_t toCopy = std::numeric_limits<size_t>::max()) {
+ TVectorStream out(vec);
+ rope.CopyTo(out, toCopy);
}
void LoadNextVector() {
@@ -148,10 +159,10 @@ private:
MKQL_ENSURE(requestedVectorSize >= CurrentVector.size(), "Internal logic error");
size_t sizeToLoad = (requestedVectorSize - CurrentVector.size()) * sizeof(T);
- if (Buffer.size() >= sizeToLoad) {
+ if (Buffer.Size() >= sizeToLoad) {
// if all the data for requested vector is ready
- TRope remainingPartOfVector = Buffer.Extract(Buffer.Position(0), Buffer.Position(sizeToLoad));
- CopyRopeToTheEndOfVector(CurrentVector, remainingPartOfVector);
+ CopyRopeToTheEndOfVector(CurrentVector, Buffer, sizeToLoad);
+ Buffer.Erase(sizeToLoad);
State = EState::DataReady;
} else {
CopyRopeToTheEndOfVector(CurrentVector, Buffer);
@@ -165,13 +176,13 @@ private:
}
void AddDataToRope(const T* data, size_t count) {
- TRope tmp = TRope::Uninitialized(count * sizeof(T));
- TRopeUtils::Memcpy(tmp.begin(), reinterpret_cast<const char*>(data), count * sizeof(T));
- Buffer.Insert(Buffer.End(), std::move(tmp));
+ auto owner = std::make_shared<std::vector<T>>(data, data + count);
+ TStringBuf buf(reinterpret_cast<const char *>(owner->data()), count * sizeof(T));
+ Buffer.Append(buf, owner);
}
void SaveNextPartOfVector() {
- size_t maxFittingElemets = (SizeLimit - Buffer.size()) / sizeof(T);
+ size_t maxFittingElemets = (SizeLimit - Buffer.Size()) / sizeof(T);
size_t remainingElementsInVector = CurrentVector.size() - NextVectorPositionToSave;
size_t elementsToCopyFromVector = std::min(maxFittingElemets, remainingElementsInVector);
@@ -183,7 +194,7 @@ private:
NextVectorPositionToSave = 0;
}
- if (SizeLimit - Buffer.size() < sizeof(T)) {
+ if (SizeLimit - Buffer.Size() < sizeof(T)) {
SaveBuffer();
return;
}
@@ -193,22 +204,22 @@ private:
private:
EState State = EState::AcceptingData;
-
+
ISpiller::TPtr Spiller;
const size_t SizeLimit;
- TRope Buffer;
+ NYql::TChunkedBuffer Buffer;
// Used to store vector while spilling and also used while restoring the data
std::vector<T, Alloc> CurrentVector;
size_t NextVectorPositionToSave = 0;
- std::queue<ISpiller::TKey> StoredChunks;
+ std::queue<ISpiller::TKey> StoredChunks;
std::queue<size_t> StoredChunksElementsCount;
std::optional<NThreading::TFuture<ISpiller::TKey>> WriteOperation = std::nullopt;
- std::optional<NThreading::TFuture<std::optional<TRope>>> ReadOperation = std::nullopt;
+ std::optional<NThreading::TFuture<std::optional<NYql::TChunkedBuffer>>> ReadOperation = std::nullopt;
bool IsFinalizing = false;
};
-}//namespace NKikimr::NMiniKQL \ No newline at end of file
+}//namespace NKikimr::NMiniKQL
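Note: with TChunkedBuffer the adapter no longer walks rope chunks by hand; typed data is pulled out through an IOutputStream sink (TVectorStream above) and CopyTo. A standalone sketch of the same pattern for a plain std::vector (illustrative only, not part of the commit):

    #include <yql/essentials/utils/chunked_buffer.h>
    #include <util/generic/yexception.h>
    #include <util/stream/output.h>
    #include <util/system/types.h>

    #include <vector>

    // Collects raw bytes written to the stream into a vector of fixed-size elements,
    // mirroring the TVectorStream helper in mkql_vector_spiller_adapter.h.
    template <typename T>
    class TVectorSink: public IOutputStream {
    public:
        explicit TVectorSink(std::vector<T>& dst)
            : Dst_(dst)
        {
        }
    private:
        void DoWrite(const void* buf, size_t len) override {
            Y_ENSURE(len % sizeof(T) == 0, "chunk size must be a multiple of sizeof(T)");
            const T* data = static_cast<const T*>(buf);
            Dst_.insert(Dst_.end(), data, data + len / sizeof(T));
        }
        std::vector<T>& Dst_;
    };

    // Copies up to `bytes` bytes from the front of `src` into `dst`,
    // without flattening the chunked buffer first.
    inline void AppendToVector(const NYql::TChunkedBuffer& src, std::vector<ui64>& dst, size_t bytes) {
        TVectorSink<ui64> sink(dst);
        src.CopyTo(sink, bytes);
    }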
diff --git a/yql/essentials/minikql/computation/mock_spiller_factory_ut.h b/yql/essentials/minikql/computation/mock_spiller_factory_ut.h
index 2264d6d215..9ea74569b1 100644
--- a/yql/essentials/minikql/computation/mock_spiller_factory_ut.h
+++ b/yql/essentials/minikql/computation/mock_spiller_factory_ut.h
@@ -5,8 +5,6 @@
namespace NKikimr::NMiniKQL {
-using namespace NActors;
-
class TMockSpillerFactory : public ISpillerFactory
{
public:
@@ -18,4 +16,4 @@ public:
}
};
-} // namespace NKikimr::NMiniKQL \ No newline at end of file
+} // namespace NKikimr::NMiniKQL
diff --git a/yql/essentials/minikql/computation/mock_spiller_ut.h b/yql/essentials/minikql/computation/mock_spiller_ut.h
index daf2ab6067..42846eab1f 100644
--- a/yql/essentials/minikql/computation/mock_spiller_ut.h
+++ b/yql/essentials/minikql/computation/mock_spiller_ut.h
@@ -14,7 +14,7 @@ public:
: NextKey(0)
{}
- NThreading::TFuture<TKey> Put(TRope&& blob) override {
+ NThreading::TFuture<TKey> Put(NYql::TChunkedBuffer&& blob) override {
auto promise = NThreading::NewPromise<ISpiller::TKey>();
auto key = NextKey;
@@ -24,8 +24,8 @@ public:
return promise.GetFuture();;
}
- NThreading::TFuture<std::optional<TRope>> Get(TKey key) override {
- auto promise = NThreading::NewPromise<std::optional<TRope>>();
+ NThreading::TFuture<std::optional<NYql::TChunkedBuffer>> Get(TKey key) override {
+ auto promise = NThreading::NewPromise<std::optional<NYql::TChunkedBuffer>>();
if (auto it = Storage.find(key); it != Storage.end()) {
promise.SetValue(it->second);
} else {
@@ -35,8 +35,8 @@ public:
return promise.GetFuture();
}
- NThreading::TFuture<std::optional<TRope>> Extract(TKey key) override {
- auto promise = NThreading::NewPromise<std::optional<TRope>>();
+ NThreading::TFuture<std::optional<NYql::TChunkedBuffer>> Extract(TKey key) override {
+ auto promise = NThreading::NewPromise<std::optional<NYql::TChunkedBuffer>>();
if (auto it = Storage.find(key); it != Storage.end()) {
promise.SetValue(std::move(it->second));
Storage.erase(it);
@@ -54,10 +54,10 @@ public:
}
private:
ISpiller::TKey NextKey;
- std::unordered_map<ISpiller::TKey, TRope> Storage;
+ std::unordered_map<ISpiller::TKey, NYql::TChunkedBuffer> Storage;
};
inline ISpiller::TPtr CreateMockSpiller() {
return std::make_shared<TMockSpiller>();
}
-} //namespace NKikimr::NMiniKQL \ No newline at end of file
+} //namespace NKikimr::NMiniKQL
diff --git a/yql/essentials/minikql/computation/ya.make.inc b/yql/essentials/minikql/computation/ya.make.inc
index ccd6b78115..7a663f1a46 100644
--- a/yql/essentials/minikql/computation/ya.make.inc
+++ b/yql/essentials/minikql/computation/ya.make.inc
@@ -24,7 +24,6 @@ PEERDIR(
yql/essentials/parser/pg_wrapper/interface
yql/essentials/public/udf
yql/essentials/utils
- yql/essentials/utils/rope
library/cpp/threading/future
)
@@ -39,7 +38,7 @@ COPY(
AUTO
FROM ${ORIG_SRC_DIR}
${ORIG_SOURCES}
- OUTPUT_INCLUDES
+ OUTPUT_INCLUDES
${BINDIR}/yql/essentials/minikql/computation/mkql_llvm_base.h
${BINDIR}/yql/essentials/minikql/computation/mkql_computation_node_codegen.h
)
diff --git a/yql/essentials/minikql/mkql_buffer.cpp b/yql/essentials/minikql/mkql_buffer.cpp
index ed64d5b412..c47b8648b2 100644
--- a/yql/essentials/minikql/mkql_buffer.cpp
+++ b/yql/essentials/minikql/mkql_buffer.cpp
@@ -1,7 +1,5 @@
#include "mkql_buffer.h"
-#include <yql/essentials/utils/rope/rope_over_buffer.h>
-
namespace NKikimr {
namespace NMiniKQL {
@@ -46,10 +44,11 @@ void TPagedBuffer::AppendPage() {
Tail_ = page->Data();
}
-TRope TPagedBuffer::AsRope(const TConstPtr& buffer) {
- TRope result;
+using NYql::TChunkedBuffer;
+TChunkedBuffer TPagedBuffer::AsChunkedBuffer(const TConstPtr& buffer) {
+ TChunkedBuffer result;
buffer->ForEachPage([&](const char* data, size_t size) {
- result.Insert(result.End(), NYql::MakeReadOnlyRope(buffer, data, size));
+ result.Append(TStringBuf(data, size), buffer);
});
return result;
diff --git a/yql/essentials/minikql/mkql_buffer.h b/yql/essentials/minikql/mkql_buffer.h
index 9d96d0350f..a1cf4d5aab 100644
--- a/yql/essentials/minikql/mkql_buffer.h
+++ b/yql/essentials/minikql/mkql_buffer.h
@@ -2,7 +2,7 @@
#include "defs.h"
-#include <contrib/ydb/library/actors/util/rope.h>
+#include <yql/essentials/utils/chunked_buffer.h>
#include <util/generic/noncopyable.h>
#include <util/stream/output.h>
@@ -214,7 +214,7 @@ class TPagedBuffer : private TNonCopyable {
}
}
- static TRope AsRope(const TConstPtr& buf);
+ static NYql::TChunkedBuffer AsChunkedBuffer(const TConstPtr& buf);
private:
void AppendPage();
diff --git a/yql/essentials/minikql/ya.make b/yql/essentials/minikql/ya.make
index e1d1f83ccb..7060a6a86b 100644
--- a/yql/essentials/minikql/ya.make
+++ b/yql/essentials/minikql/ya.make
@@ -72,7 +72,6 @@ PEERDIR(
yql/essentials/public/udf
yql/essentials/public/udf/tz
yql/essentials/utils
- yql/essentials/utils/rope
yql/essentials/core/sql_types
)
diff --git a/yql/essentials/utils/chunked_buffer.cpp b/yql/essentials/utils/chunked_buffer.cpp
new file mode 100644
index 0000000000..906da8dcae
--- /dev/null
+++ b/yql/essentials/utils/chunked_buffer.cpp
@@ -0,0 +1,123 @@
+#include "chunked_buffer.h"
+
+#include <yql/essentials/utils/yql_panic.h>
+
+namespace NYql {
+
+TChunkedBuffer::TChunkedBuffer(TChunkedBuffer&& other) {
+ Items_ = std::move(other.Items_);
+}
+
+TChunkedBuffer& TChunkedBuffer::operator=(TChunkedBuffer&& other) {
+ Items_ = std::move(other.Items_);
+ return *this;
+}
+
+TChunkedBuffer::TChunkedBuffer(TStringBuf buf, const std::shared_ptr<const void>& owner) {
+ Append(buf, owner);
+}
+
+TChunkedBuffer::TChunkedBuffer(TString&& str) {
+ Append(std::move(str));
+}
+
+const TChunkedBuffer::TChunk& TChunkedBuffer::Front() const {
+ YQL_ENSURE(!Items_.empty());
+ return Items_.front();
+}
+
+size_t TChunkedBuffer::CopyTo(IOutputStream& dst, size_t toCopy) const {
+ size_t copied = 0;
+ for (auto& chunk : Items_) {
+ if (!toCopy) {
+ break;
+ }
+ size_t copyChunk = std::min(chunk.Buf.size(), toCopy);
+ dst.Write(chunk.Buf.data(), copyChunk);
+ toCopy -= copyChunk;
+ copied += copyChunk;
+ }
+ return copied;
+}
+
+size_t TChunkedBuffer::ContigousSize() const {
+ return Items_.empty() ? 0 : Front().Buf.size();
+}
+
+size_t TChunkedBuffer::Size() const {
+ size_t result = 0;
+ for (auto& item : Items_) {
+ result += item.Buf.size();
+ }
+ return result;
+}
+
+bool TChunkedBuffer::Empty() const {
+ return Items_.empty();
+}
+
+TChunkedBuffer& TChunkedBuffer::Append(TStringBuf buf, const std::shared_ptr<const void>& owner) {
+ if (!buf.empty()) {
+ Items_.emplace_back(TChunk{buf, owner});
+ }
+ return *this;
+}
+
+TChunkedBuffer& TChunkedBuffer::Append(TString&& str) {
+ if (!str.empty()) {
+ auto owner = std::make_shared<TString>(std::move(str));
+ Items_.emplace_back(TChunk{*owner, owner});
+ }
+ return *this;
+}
+
+TChunkedBuffer& TChunkedBuffer::Append(TChunkedBuffer&& other) {
+ while (!other.Items_.empty()) {
+ Items_.emplace_back(std::move(other.Items_.front()));
+ other.Items_.pop_front();
+ }
+ return *this;
+}
+
+TChunkedBuffer& TChunkedBuffer::Clear() {
+ Items_.clear();
+ return *this;
+}
+
+TChunkedBuffer& TChunkedBuffer::Erase(size_t size) {
+ while (size && !Items_.empty()) {
+ TStringBuf& buf = Items_.front().Buf;
+ size_t toErase = std::min(buf.size(), size);
+ buf.Skip(toErase);
+ size -= toErase;
+ if (buf.empty()) {
+ Items_.pop_front();
+ }
+ }
+ return *this;
+}
+
+TChunkedBufferOutput::TChunkedBufferOutput(TChunkedBuffer& dst)
+ : Dst_(dst)
+{
+}
+
+void TChunkedBufferOutput::DoWrite(const void* buf, size_t len) {
+ TString str(static_cast<const char*>(buf), len);
+ Dst_.Append(std::move(str));
+}
+
+TChunkedBuffer CopyData(const TChunkedBuffer& src) {
+ TChunkedBuffer result;
+ TChunkedBufferOutput out(result);
+ src.CopyTo(out);
+ return result;
+}
+
+TChunkedBuffer CopyData(TChunkedBuffer&& src) {
+ TChunkedBuffer result = CopyData(src);
+ src.Clear();
+ return result;
+}
+
+}
diff --git a/yql/essentials/utils/chunked_buffer.h b/yql/essentials/utils/chunked_buffer.h
new file mode 100644
index 0000000000..6f5b2a8ea7
--- /dev/null
+++ b/yql/essentials/utils/chunked_buffer.h
@@ -0,0 +1,56 @@
+#pragma once
+
+#include <util/generic/strbuf.h>
+#include <util/generic/string.h>
+#include <util/stream/output.h>
+
+#include <memory>
+#include <deque>
+
+namespace NYql {
+
+class TChunkedBuffer {
+public:
+ TChunkedBuffer() = default;
+ TChunkedBuffer(const TChunkedBuffer&) = default;
+ TChunkedBuffer(TChunkedBuffer&& other);
+ TChunkedBuffer& operator=(TChunkedBuffer&& other);
+ explicit TChunkedBuffer(TStringBuf buf, const std::shared_ptr<const void>& owner);
+ explicit TChunkedBuffer(TString&& str);
+
+ size_t ContigousSize() const;
+ size_t Size() const;
+ bool Empty() const;
+
+ struct TChunk {
+ TStringBuf Buf;
+ std::shared_ptr<const void> Owner;
+ };
+
+ const TChunk& Front() const;
+ size_t CopyTo(IOutputStream& dst, size_t toCopy = std::numeric_limits<size_t>::max()) const;
+
+ TChunkedBuffer& Append(TStringBuf buf, const std::shared_ptr<const void>& owner);
+ TChunkedBuffer& Append(TString&& str);
+ TChunkedBuffer& Append(TChunkedBuffer&& other);
+
+ TChunkedBuffer& Clear();
+ TChunkedBuffer& Erase(size_t size);
+
+private:
+ std::deque<TChunk> Items_;
+};
+
+class TChunkedBufferOutput : public IOutputStream {
+public:
+ explicit TChunkedBufferOutput(TChunkedBuffer& dst);
+private:
+ virtual void DoWrite(const void *buf, size_t len) override;
+
+ TChunkedBuffer& Dst_;
+};
+
+TChunkedBuffer CopyData(const TChunkedBuffer& src);
+TChunkedBuffer CopyData(TChunkedBuffer&& src);
+
+}
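A short usage sketch of the new TChunkedBuffer interface declared above (illustrative only, not part of the commit); it also shows the CopyTo-based flattening that the updated unit tests use in place of TRope::ConvertToString():

    #include <yql/essentials/utils/chunked_buffer.h>
    #include <util/stream/str.h>

    void ChunkedBufferExample() {
        NYql::TChunkedBuffer buf;

        // Owning append: the TString is moved into a shared owner held by the chunk.
        buf.Append(TString("first chunk "));

        // Non-owning append over externally owned memory; the owner keeps it alive.
        auto owner = std::make_shared<TString>("second chunk");
        buf.Append(TStringBuf(owner->data(), owner->size()), owner);

        // Size() sums all chunks; ContigousSize() is the size of the front chunk only.
        Cerr << buf.Size() << " bytes total, " << buf.ContigousSize() << " contiguous" << Endl;

        // Erase trims from the front, dropping chunks as they become empty.
        buf.Erase(6); // removes "first "

        // Flatten the remaining data into a TString through any IOutputStream.
        TString flat;
        TStringOutput out(flat);
        buf.CopyTo(out); // flat == "chunk second chunk"
    }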
diff --git a/yql/essentials/utils/rope/rope_over_buffer.cpp b/yql/essentials/utils/rope/rope_over_buffer.cpp
deleted file mode 100644
index e8074372fd..0000000000
--- a/yql/essentials/utils/rope/rope_over_buffer.cpp
+++ /dev/null
@@ -1,43 +0,0 @@
-#include "rope_over_buffer.h"
-
-#include <yql/essentials/utils/yql_panic.h>
-
-namespace NYql {
-
-namespace {
-
-class TContigousChunkOverBuf : public IContiguousChunk {
-public:
- TContigousChunkOverBuf(const std::shared_ptr<const void>& owner, TContiguousSpan span)
- : Owner_(owner)
- , Span_(span)
- {
- }
-private:
- TContiguousSpan GetData() const override {
- return Span_;
- }
-
- TMutableContiguousSpan GetDataMut() override {
- YQL_ENSURE(false, "Payload mutation is not supported");
- }
-
- size_t GetOccupiedMemorySize() const override {
- return Span_.GetSize();
- }
-
- const std::shared_ptr<const void> Owner_;
- const TContiguousSpan Span_;
-};
-
-} // namespace
-
-
-TRope MakeReadOnlyRope(const std::shared_ptr<const void>& owner, const char* data, size_t size) {
- if (!size) {
- return TRope();
- }
- return TRope(new TContigousChunkOverBuf(owner, {data, size}));
-}
-
-} // namespace NYql
diff --git a/yql/essentials/utils/rope/rope_over_buffer.h b/yql/essentials/utils/rope/rope_over_buffer.h
deleted file mode 100644
index 9d18e52263..0000000000
--- a/yql/essentials/utils/rope/rope_over_buffer.h
+++ /dev/null
@@ -1,11 +0,0 @@
-#pragma once
-
-#include <contrib/ydb/library/actors/util/rope.h>
-
-#include <memory>
-
-namespace NYql {
-
-TRope MakeReadOnlyRope(const std::shared_ptr<const void>& owner, const char* data, size_t size);
-
-} \ No newline at end of file
diff --git a/yql/essentials/utils/rope/ya.make b/yql/essentials/utils/rope/ya.make
deleted file mode 100644
index 9b92a50c48..0000000000
--- a/yql/essentials/utils/rope/ya.make
+++ /dev/null
@@ -1,13 +0,0 @@
-LIBRARY()
-
-SRCS(
- rope_over_buffer.cpp
-)
-
-PEERDIR(
- contrib/ydb/library/actors/util
- yql/essentials/utils
-)
-
-END()
-
diff --git a/yql/essentials/utils/ya.make b/yql/essentials/utils/ya.make
index 8e40de1843..8f0181520a 100644
--- a/yql/essentials/utils/ya.make
+++ b/yql/essentials/utils/ya.make
@@ -2,6 +2,8 @@ LIBRARY()
SRCS(
cast.h
+ chunked_buffer.cpp
+ chunked_buffer.h
debug_info.cpp
debug_info.h
exceptions.cpp
@@ -59,7 +61,6 @@ IF (OPENSOURCE_PROJECT != "yt")
fetch
log
network
- rope
signals
sys
test_http_server