aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorinnokentii <innokentii@yandex-team.com>2022-09-10 22:49:57 +0300
committerinnokentii <innokentii@yandex-team.com>2022-09-10 22:49:57 +0300
commita8945929eb162f6910b228697c3d2db3b7cb2ca9 (patch)
tree4f6d161bffc1e27ec7bfe54827b31fa2365b7f96
parent5a89de48da3bd0e557702e048c37f690f4263f3b (diff)
downloadydb-a8945929eb162f6910b228697c3d2db3b7cb2ca9.tar.gz
Replace TString with TRope in TEvPut
replace TString with TRope in TEvPut
-rw-r--r--library/cpp/actors/util/rope.h2
-rw-r--r--ydb/core/base/blobstorage.h38
-rw-r--r--ydb/core/blob_depot/agent/storage_put.cpp17
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp18
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h12
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp2
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_put.cpp11
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h10
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp7
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_strategy_get_min_iops_block.h4
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h8
-rw-r--r--ydb/core/blobstorage/dsproxy/mock/model.h14
-rw-r--r--ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_get_hardened.h2
-rw-r--r--ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp2
-rw-r--r--ydb/core/blobstorage/dsproxy/ut/dsproxy_vdisk_mock_ut.h6
-rw-r--r--ydb/core/blobstorage/dsproxy/ut_fat/dsproxy_ut.cpp4
-rw-r--r--ydb/core/blobstorage/testload/test_load_vdisk_write.cpp5
-rw-r--r--ydb/core/blobstorage/testload/test_load_write.cpp9
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/main.cpp10
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/osiris.cpp2
-rw-r--r--ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h2
-rw-r--r--ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp2
-rw-r--r--ydb/core/blobstorage/vdisk/scrub/blob_recovery.h2
-rw-r--r--ydb/core/blobstorage/vdisk/scrub/blob_recovery_process.cpp2
-rw-r--r--ydb/core/blobstorage/vdisk/scrub/scrub_actor_huge_blob_merger.h2
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor_ut.cpp8
-rw-r--r--ydb/core/erasure/erasure.cpp80
-rw-r--r--ydb/core/erasure/erasure.h63
-rw-r--r--ydb/core/erasure/erasure_perf_test.cpp16
-rw-r--r--ydb/core/erasure/erasure_ut.cpp20
-rw-r--r--ydb/core/util/defs.h4
-rw-r--r--ydb/core/util/fragmented_buffer.cpp14
-rw-r--r--ydb/core/util/fragmented_buffer.h7
-rw-r--r--ydb/core/util/fragmented_buffer_ut.cpp16
-rw-r--r--ydb/core/util/lz4_data_generator.h14
35 files changed, 251 insertions, 184 deletions
diff --git a/library/cpp/actors/util/rope.h b/library/cpp/actors/util/rope.h
index 54b8e344124..fd4a5acdba1 100644
--- a/library/cpp/actors/util/rope.h
+++ b/library/cpp/actors/util/rope.h
@@ -1060,7 +1060,7 @@ public:
Erase(Begin(), End());
}
- bool IsContiguous() {
+ bool IsContiguous() const {
if(Begin() == End() || (++Begin() == End())) {
return true;
}
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h
index 0748eb6f780..d8bc5b3529c 100644
--- a/ydb/core/base/blobstorage.h
+++ b/ydb/core/base/blobstorage.h
@@ -16,6 +16,8 @@
#include <library/cpp/actors/wilson/wilson_trace.h>
#include <library/cpp/lwtrace/shuttle.h>
+#include <library/cpp/actors/util/rope.h>
+#include <library/cpp/actors/util/shared_data_rope_backend.h>
#include <util/stream/str.h>
#include <util/generic/xrange.h>
@@ -903,7 +905,7 @@ struct TEvBlobStorage {
};
const TLogoBlobID Id;
- const TString Buffer;
+ const TRope Buffer; //FIXME(innokentii) const members prevent usage of move-semantics elsewhere
const TInstant Deadline;
const NKikimrBlobStorage::EPutHandleClass HandleClass;
const ETactic Tactic;
@@ -912,35 +914,49 @@ struct TEvBlobStorage {
std::vector<std::pair<ui64, ui32>> ExtraBlockChecks; // (TabletId, Generation) pairs
bool Decommission = false; // is it generated by decommission and should bypass all block checks?
- TEvPut(const TLogoBlobID &id, const TString &buffer, TInstant deadline,
+ TEvPut(const TLogoBlobID &id, TRope &&buffer, TInstant deadline,
NKikimrBlobStorage::EPutHandleClass handleClass = NKikimrBlobStorage::TabletLog,
ETactic tactic = TacticDefault)
: Id(id)
- , Buffer(buffer)
+ , Buffer(std::move(buffer))
, Deadline(deadline)
, HandleClass(handleClass)
, Tactic(tactic)
{
Y_VERIFY(Id, "EvPut invalid: LogoBlobId must have non-zero tablet field, id# %s", Id.ToString().c_str());
- Y_VERIFY(buffer.size() < (40 * 1024 * 1024),
+ Y_VERIFY(Buffer.size() < (40 * 1024 * 1024),
"EvPut invalid: LogoBlobId# %s buffer.Size# %zu",
- id.ToString().data(), buffer.size());
- Y_VERIFY(buffer.size() == id.BlobSize(),
+ id.ToString().data(), Buffer.size());
+ Y_VERIFY(Buffer.size() == id.BlobSize(),
"EvPut invalid: LogoBlobId# %s buffer.Size# %zu",
- id.ToString().data(), buffer.size());
+ id.ToString().data(), Buffer.size());
+ Y_VERIFY(Buffer.IsContiguous(), "EvPut invalid: buffer must be contiguous");
REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(&id, sizeof(id));
- REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(buffer.Data(), buffer.size());
+ REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(Buffer.GetContiguousSpan.Data(), Buffer.size());
REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(&deadline, sizeof(deadline));
REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(&handleClass, sizeof(handleClass));
REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(&tactic, sizeof(tactic));
}
+ TEvPut(const TLogoBlobID &id, const TString &buffer, TInstant deadline,
+ NKikimrBlobStorage::EPutHandleClass handleClass = NKikimrBlobStorage::TabletLog,
+ ETactic tactic = TacticDefault)
+ : TEvPut(id, TRope(buffer), deadline, handleClass, tactic)
+ {}
+
+
+ TEvPut(const TLogoBlobID &id, const TSharedData &buffer, TInstant deadline,
+ NKikimrBlobStorage::EPutHandleClass handleClass = NKikimrBlobStorage::TabletLog,
+ ETactic tactic = TacticDefault)
+ : TEvPut(id, TRope(MakeIntrusive<TRopeSharedDataBackend>(buffer)), deadline, handleClass, tactic)
+ {}
+
TString Print(bool isFull) const {
TStringStream str;
str << "TEvPut {Id# " << Id.ToString();
- str << " Size# " << Buffer.size();
+ str << " Size# " << Buffer.GetSize();
if (isFull) {
- str << " Buffer# " << Buffer.Quote();
+ str << " Buffer# " << Buffer.ConvertToString().Quote();
}
str << " Deadline# " << Deadline.MilliSeconds();
str << " HandleClass# " << HandleClass;
@@ -954,7 +970,7 @@ struct TEvBlobStorage {
}
ui32 CalculateSize() const {
- return sizeof(*this) + Buffer.capacity();
+ return sizeof(*this) + Buffer.GetSize();
}
std::unique_ptr<TEvPutResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp
index 875af36a284..3c7c79cc466 100644
--- a/ydb/core/blob_depot/agent/storage_put.cpp
+++ b/ydb/core/blob_depot/agent/storage_put.cpp
@@ -77,11 +77,11 @@ namespace NKikimr::NBlobDepot {
footer.StoredBlobId = msg.Id;
TStringBuf footerData(reinterpret_cast<const char*>(&footer), sizeof(footer));
- auto put = [&](EBlobType type, const TString& buffer) {
+ auto put = [&](EBlobType type, TRope&& buffer) {
const auto& [id, groupId] = kind.MakeBlobId(Agent, BlobSeqId, type, 0, buffer.size());
Y_VERIFY(!locator->HasGroupId() || locator->GetGroupId() == groupId);
locator->SetGroupId(groupId);
- auto ev = std::make_unique<TEvBlobStorage::TEvPut>(id, buffer, msg.Deadline, msg.HandleClass, msg.Tactic);
+ auto ev = std::make_unique<TEvBlobStorage::TEvPut>(id, std::move(buffer), msg.Deadline, msg.HandleClass, msg.Tactic);
ev->ExtraBlockChecks = msg.ExtraBlockChecks;
if (!msg.Decommission) { // do not check original blob against blocks when writing decommission copy
ev->ExtraBlockChecks.emplace_back(msg.Id.TabletID(), msg.Id.Generation());
@@ -92,15 +92,18 @@ namespace NKikimr::NBlobDepot {
if (SuppressFooter) {
// write the blob as is, we don't need footer for this kind
- put(EBlobType::VG_DATA_BLOB, msg.Buffer);
+ put(EBlobType::VG_DATA_BLOB, TRope(msg.Buffer));
} else if (msg.Buffer.size() + sizeof(TVirtualGroupBlobFooter) <= MaxBlobSize) {
// write single blob with footer
- put(EBlobType::VG_COMPOSITE_BLOB, msg.Buffer + footerData);
+ TRope buffer = msg.Buffer;
+ buffer.Insert(buffer.End(), TRope(TString(footerData))); //FIXME(innokentii)
+ buffer.Compact();
+ put(EBlobType::VG_COMPOSITE_BLOB, std::move(buffer));
} else {
// write data blob and blob with footer
- put(EBlobType::VG_DATA_BLOB, msg.Buffer);
- put(EBlobType::VG_FOOTER_BLOB, TString(footerData));
- }
+ put(EBlobType::VG_DATA_BLOB, TRope(msg.Buffer));
+ put(EBlobType::VG_FOOTER_BLOB, TRope(TString(footerData)));
+ }
}
void OnUpdateBlock(bool success) override {
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp
index a602caf8ff6..8b7af89d97d 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp
@@ -15,7 +15,7 @@ void TBlobState::TState::AddResponseData(ui32 fullSize, ui32 shift, TString &dat
Here.Add(shift, shift + data.size());
}
-void TBlobState::TState::AddPartToPut(TString &data) {
+void TBlobState::TState::AddPartToPut(TRope &data) {
Y_VERIFY(data.size());
Data.SetMonolith(data);
Here.Assign(0, data.size());
@@ -45,7 +45,7 @@ void TBlobState::AddNeeded(ui64 begin, ui64 size) {
IsChanged = true;
}
-void TBlobState::AddPartToPut(ui32 partIdx, TString &partData) {
+void TBlobState::AddPartToPut(ui32 partIdx, TRope &partData) {
Y_VERIFY(bool(Id));
Y_VERIFY(partIdx < Parts.size());
Parts[partIdx].AddPartToPut(partData);
@@ -84,17 +84,17 @@ bool TBlobState::Restore(const TBlobStorageGroupInfo &info) {
if (const ui32 partSize = info.Type.PartSize(TLogoBlobID(Id, i + 1))) {
if (TIntervalVec<i32>(0, partSize).IsSubsetOf(Parts[i].Here)) {
partSet.PartsMask |= (1 << i);
- TString tmp = TString::Uninitialized(partSize);
- Parts[i].Data.Read(0, const_cast<char*>(tmp.data()), partSize);
- partSet.Parts[i].ReferenceTo(tmp);
+ TRope data(MakeIntrusive<TRopeSharedDataBackend>(TSharedData::Uninitialized(partSize)));
+ Parts[i].Data.Read(0, data.UnsafeGetContiguousSpanMut().data(), partSize);
+ partSet.Parts[i].ReferenceTo(data);
}
}
}
partSet.FullDataSize = Id.BlobSize();
- TString whole;
+ TRope whole;
info.Type.RestoreData((TErasureType::ECrcMode)Id.CrcMode(), partSet, whole, false, true, false);
- Whole.Data.Write(0, whole.data(), Id.BlobSize());
+ Whole.Data.Write(0, whole.GetContiguousSpan().data(), Id.BlobSize());
Whole.Here.Add(fullBlobInterval);
Whole.NotHere.Subtract(fullBlobInterval);
return true;
@@ -350,7 +350,7 @@ void TGroupDiskRequests::AddGet(const ui32 diskOrderNumber, const TLogoBlobID &i
DiskRequestsForOrderNumber[diskOrderNumber].GetsToSend.emplace_back(id, shift, size);
}
-void TGroupDiskRequests::AddPut(const ui32 diskOrderNumber, const TLogoBlobID &id, TString buffer,
+void TGroupDiskRequests::AddPut(const ui32 diskOrderNumber, const TLogoBlobID &id, TRope buffer,
TDiskPutRequest::EPutReason putReason, bool isHandoff, std::vector<std::pair<ui64, ui32>> *extraBlockChecks,
NWilson::TSpan *span, ui8 blobIdx) {
Y_VERIFY(diskOrderNumber < DiskRequestsForOrderNumber.size());
@@ -381,7 +381,7 @@ void TBlackboard::AddNeeded(const TLogoBlobID &id, ui32 inShift, ui32 inSize) {
}
}
-void TBlackboard::AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TString &partData) {
+void TBlackboard::AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TRope &partData) {
Y_VERIFY(bool(id));
Y_VERIFY(id.PartId() == 0);
Y_VERIFY(id.BlobSize() != 0);
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h
index 72bfd73800c..3108cf0b691 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h
@@ -51,7 +51,7 @@ struct TBlobState {
TIntervalSet<i32> Here; // Present in the Data buffer
void AddResponseData(ui32 fullSize, ui32 shift, TString &data);
- void AddPartToPut(TString &data);
+ void AddPartToPut(TRope &data);
TString ToString() const;
};
struct TWholeState : TState {
@@ -88,7 +88,7 @@ struct TBlobState {
void Init(const TLogoBlobID &id, const TBlobStorageGroupInfo &Info);
void AddNeeded(ui64 begin, ui64 size);
- void AddPartToPut(ui32 partIdx, TString &partData);
+ void AddPartToPut(ui32 partIdx, TRope &partData);
void MarkBlobReadyToPut(ui8 blobIdx = 0);
bool Restore(const TBlobStorageGroupInfo &info);
void AddResponseData(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring,
@@ -128,14 +128,14 @@ struct TDiskPutRequest {
ReasonAccelerate
};
const TLogoBlobID Id;
- TString Buffer;
+ TRope Buffer;
EPutReason Reason;
bool IsHandoff;
ui8 BlobIdx;
std::vector<std::pair<ui64, ui32>> *ExtraBlockChecks;
NWilson::TSpan *Span;
- TDiskPutRequest(const TLogoBlobID &id, TString buffer, EPutReason reason, bool isHandoff,
+ TDiskPutRequest(const TLogoBlobID &id, TRope buffer, EPutReason reason, bool isHandoff,
std::vector<std::pair<ui64, ui32>> *extraBlockChecks, NWilson::TSpan *span, ui8 blobIdx)
: Id(id)
, Buffer(std::move(buffer))
@@ -160,7 +160,7 @@ struct TGroupDiskRequests {
TGroupDiskRequests(ui32 disks);
void AddGet(const ui32 diskOrderNumber, const TLogoBlobID &id, const TIntervalSet<i32> &intervalSet);
void AddGet(const ui32 diskOrderNumber, const TLogoBlobID &id, const ui32 shift, const ui32 size);
- void AddPut(const ui32 diskOrderNumber, const TLogoBlobID &id, TString buffer,
+ void AddPut(const ui32 diskOrderNumber, const TLogoBlobID &id, TRope buffer,
TDiskPutRequest::EPutReason putReason, bool isHandoff, std::vector<std::pair<ui64, ui32>> *extraBlockChecks,
NWilson::TSpan *span, ui8 blobIdx);
};
@@ -205,7 +205,7 @@ struct TBlackboard {
{}
void AddNeeded(const TLogoBlobID &id, ui32 inShift, ui32 inSize);
- void AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TString &partData);
+ void AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TRope &partData);
void MarkBlobReadyToPut(const TLogoBlobID &id, ui8 blobIdx = 0);
void MoveBlobStateToDone(const TLogoBlobID &id);
void AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TString &data, bool keep, bool doNotKeep);
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp
index cb66db83460..8122e89e909 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp
@@ -364,7 +364,7 @@ void TGetImpl::PrepareVPuts(TLogContext &logCtx,
}
bytes += put.Buffer.size();
lastItemCount++;
- vMultiPut->AddVPut(put.Id, put.Buffer, &cookie, put.ExtraBlockChecks, NWilson::TTraceId()); // FIXME: trace
+ vMultiPut->AddVPut(put.Id, put.Buffer.ConvertToString(), &cookie, put.ExtraBlockChecks, NWilson::TTraceId()); // FIXME: trace
}
vMultiPut->Record.SetCookie(TVMultiPutCookie(diskOrderNumber, lastItemCount, VMultiPutRequests));
++VMultiPutRequests;
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
index db13013f02b..01dd0a34fa7 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
@@ -387,14 +387,14 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
}
TEvBlobStorage::TEvPut *put;
- TString buffer = item.Buffer;
- char *data = buffer.Detach();
+ TRope buffer = item.Buffer;
+ char* data = buffer.GetContiguousSpanMut().data();
Decrypt(data, data, 0, buffer.size(), item.BlobId, *Info);
ev->Bunch.emplace_back(new IEventHandle(
TActorId() /*recipient*/,
item.Recipient,
- put = new TEvBlobStorage::TEvPut(item.BlobId, buffer, Deadline, HandleClass, Tactic),
+ put = new TEvBlobStorage::TEvPut(item.BlobId, std::move(buffer), Deadline, HandleClass, Tactic),
0 /*flags*/,
item.Cookie,
nullptr /*forwardOnNondelivery*/,
@@ -545,7 +545,7 @@ public:
const ui64 partSize = Info->Type.PartSize(blobId);
ui64 bufferSize = PutImpl.Blobs[idx].BufferSize;
- char *data = PutImpl.Blobs[idx].Buffer.Detach();
+ char *data = PutImpl.Blobs[idx].Buffer.GetContiguousSpanMut().data();
Encrypt(data, data, 0, bufferSize, blobId, *Info);
TDataPartSet &partSet = resume->PartSets[idx];
@@ -630,9 +630,8 @@ public:
bool splitDone = true;
for (ui64 idx = 0; idx < PutImpl.Blobs.size(); ++idx) {
TDataPartSet &partSet = resume->PartSets[idx];
- TString &buffer = PutImpl.Blobs[idx].Buffer;
TLogoBlobID blobId = PutImpl.Blobs[idx].BlobId;
- Info->Type.IncrementalSplitData((TErasureType::ECrcMode)blobId.CrcMode(), buffer, partSet);
+ Info->Type.IncrementalSplitData((TErasureType::ECrcMode)blobId.CrcMode(), PutImpl.Blobs[idx].Buffer, partSet);
if (partSet.IsSplitDone()) {
ReportBytes(partSet.MemoryConsumed - PutImpl.Blobs[idx].BufferSize);
} else {
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h
index 2e3ab60fc30..5636a3daf14 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h
@@ -46,7 +46,7 @@ private:
struct TBlobInfo {
TLogoBlobID BlobId;
- TString Buffer;
+ TRope Buffer;
ui64 BufferSize;
TActorId Recipient;
ui64 Cookie;
@@ -55,7 +55,7 @@ private:
std::vector<std::pair<ui64, ui32>> ExtraBlockChecks;
NWilson::TSpan Span;
- TBlobInfo(TLogoBlobID id, TString buffer, TActorId recipient, ui64 cookie, NWilson::TTraceId traceId,
+ TBlobInfo(TLogoBlobID id, TRope&& buffer, TActorId recipient, ui64 cookie, NWilson::TTraceId traceId,
NLWTrace::TOrbit&& orbit, std::vector<std::pair<ui64, ui32>> extraBlockChecks, bool single)
: BlobId(id)
, Buffer(std::move(buffer))
@@ -115,7 +115,7 @@ public:
, EnableRequestMod3x3ForMinLatecy(enableRequestMod3x3ForMinLatecy)
, Tactic(ev->Tactic)
{
- Blobs.emplace_back(ev->Id, std::move(ev->Buffer), recipient, cookie, std::move(traceId), std::move(ev->Orbit),
+ Blobs.emplace_back(ev->Id, TRope(ev->Buffer), recipient, cookie, std::move(traceId), std::move(ev->Orbit),
std::move(ev->ExtraBlockChecks), true);
auto& blob = Blobs.back();
@@ -143,7 +143,7 @@ public:
auto& msg = *ev->Get();
Y_VERIFY(msg.HandleClass == putHandleClass);
Y_VERIFY(msg.Tactic == tactic);
- Blobs.emplace_back(msg.Id, std::move(msg.Buffer), ev->Sender, ev->Cookie, std::move(ev->TraceId),
+ Blobs.emplace_back(msg.Id, TRope(msg.Buffer), ev->Sender, ev->Cookie, std::move(ev->TraceId),
std::move(msg.Orbit), std::move(msg.ExtraBlockChecks), false);
Deadline = Max(Deadline, msg.Deadline);
@@ -496,7 +496,7 @@ protected:
} else if constexpr (isVMultiPut) {
// this request MUST originate from the TEvPut, so the Span field must be filled in
Y_VERIFY(put.Span);
- outVPutEvents.back()->AddVPut(put.Id, put.Buffer, &cookie, put.ExtraBlockChecks, put.Span->GetTraceId());
+ outVPutEvents.back()->AddVPut(put.Id, put.Buffer.ConvertToString(), &cookie, put.ExtraBlockChecks, put.Span->GetTraceId());
}
if (put.IsHandoff) {
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp
index efeedb94465..050b70846ef 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp
@@ -291,14 +291,15 @@ void TStrategyBase::PreparePutsForPartPlacement(TLogContext &logCtx, TBlobState
Y_VERIFY(fullInterval.IsSubsetOf(state.Whole.Here),
"Can't put unrestored blob! Unexpected blob state# %s", state.ToString().c_str());
- TString wholeBuffer = TString::Uninitialized(state.Id.BlobSize());
- state.Whole.Data.Read(0, const_cast<char*>(wholeBuffer.data()), state.Id.BlobSize());
+ TRope wholeBuffer(MakeIntrusive<TRopeSharedDataBackend>(TSharedData::Uninitialized(state.Id.BlobSize())));
+ state.Whole.Data.Read(0, wholeBuffer.UnsafeGetContiguousSpanMut().data(), state.Id.BlobSize());
TDataPartSet partSet;
info.Type.SplitData((TErasureType::ECrcMode)state.Id.CrcMode(), wholeBuffer, partSet);
Y_VERIFY(partSet.Parts.size() == state.Parts.size());
for (ui32 partIdx = 0; partIdx < partSet.Parts.size(); ++partIdx) {
- state.AddPartToPut(partIdx, partSet.Parts[partIdx].OwnedString);
+ TRope data = partSet.Parts[partIdx].OwnedString;
+ state.AddPartToPut(partIdx, data);
}
}
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_get_min_iops_block.h b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_get_min_iops_block.h
index d191f110060..9297db7a807 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_get_min_iops_block.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_get_min_iops_block.h
@@ -223,7 +223,7 @@ public:
}
}
partSet.FullDataSize = state.Id.BlobSize();
- TString whole;
+ TRope whole;
info.Type.RestoreData((TErasureType::ECrcMode)state.Id.CrcMode(), partSet, whole, false, true, false);
TIntervalSet<i32> missing(state.Whole.NotHere);
@@ -231,7 +231,7 @@ public:
auto [begin, end] = *it;
i32 offset = begin;
i32 size = end - begin;
- const char *source = whole.data() + offset;
+ const char *source = whole.GetContiguousSpan().data() + offset;
// Cerr << "LINE " << __LINE__ << " copy whole [" << offset << ", " << (offset + size) << ")" << Endl;
state.Whole.Data.Write(offset, source, size);
state.Whole.Here.Add(offset, offset + size);
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h
index 4442c48218b..0fca82976e0 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h
@@ -39,7 +39,7 @@ public:
}
protected:
- TString GetDataBuffer(TBlobState& state, const TBlobStorageGroupInfo& info) {
+ TRope GetDataBuffer(TBlobState& state, const TBlobStorageGroupInfo& info) {
for (ui32 i = 0; i < info.Type.TotalPartCount(); ++i) {
if (info.Type.PartSize(TLogoBlobID(state.Id, i + 1)) && state.Parts[i].Data.IsMonolith()) {
return state.Parts[i].Data.GetMonolith();
@@ -47,11 +47,11 @@ protected:
}
const TIntervalVec<i32> interval(0, state.Id.BlobSize());
Y_VERIFY(interval.IsSubsetOf(state.Whole.Here), "missing blob data State# %s", state.ToString().data());
- TString wholeBuffer = TString::Uninitialized(state.Id.BlobSize());
- state.Whole.Data.Read(0, const_cast<char*>(wholeBuffer.data()), state.Id.BlobSize());
+ TRope wholeBuffer(MakeIntrusive<TRopeSharedDataBackend>(TSharedData::Uninitialized(state.Id.BlobSize())));
+ state.Whole.Data.Read(0, wholeBuffer.UnsafeGetContiguousSpanMut().data(), state.Id.BlobSize());
TDataPartSet partSet;
info.Type.SplitData((TErasureType::ECrcMode)state.Id.CrcMode(), wholeBuffer, partSet);
- TString s = partSet.Parts[0].OwnedString;
+ TRope s = partSet.Parts[0].OwnedString;
state.Parts[0].Data.SetMonolith(s);
return s;
}
diff --git a/ydb/core/blobstorage/dsproxy/mock/model.h b/ydb/core/blobstorage/dsproxy/mock/model.h
index 16783993f28..ead4f90bb6b 100644
--- a/ydb/core/blobstorage/dsproxy/mock/model.h
+++ b/ydb/core/blobstorage/dsproxy/mock/model.h
@@ -13,11 +13,15 @@ namespace NFake {
using TStep = ui32;
struct TBlob {
+ TBlob(TRope buffer)
+ : Buffer(std::move(buffer))
+ {}
+
TBlob(TString buffer)
: Buffer(std::move(buffer))
{}
- TString Buffer;
+ TRope Buffer;
bool Keep = false;
bool DoNotKeep = false;
};
@@ -125,7 +129,7 @@ namespace NFake {
const ui32 size = Min<ui32>(maxSize, !query.Size ? Max<ui32>() : query.Size);
// calculate substring; use 0 instead of query.Shift because it may exceed the buffer
- response.Buffer = data.Buffer.substr(size ? query.Shift : 0, size);
+ response.Buffer = data.Buffer.ConvertToString().substr(size ? query.Shift : 0, size);
} else {
// ensure this blob is not under GC
Y_VERIFY(!IsCollectedByBarrier(id), "Id# %s", id.ToString().data());
@@ -173,7 +177,7 @@ namespace NFake {
lastBlobId.Generation() >= msg->MinGeneration) {
TString buffer;
if (msg->ReadBody) {
- buffer = it->second.Buffer;
+ buffer = it->second.Buffer.ConvertToString();
}
result = std::make_unique<TEvBlobStorage::TEvDiscoverResult>(lastBlobId, msg->MinGeneration, buffer,
@@ -206,7 +210,7 @@ namespace NFake {
if (from <= to) {
// forward scan
for (auto it = Blobs.lower_bound(from); it != Blobs.end() && it->first <= to; ++it) {
- process(it->first, it->second.Buffer);
+ process(it->first, it->second.Buffer.ConvertToString());
}
} else {
// reverse scan
@@ -215,7 +219,7 @@ namespace NFake {
if (it->first < to) {
break;
} else {
- process(it->first, it->second.Buffer);
+ process(it->first, it->second.Buffer.ConvertToString());
}
}
}
diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_get_hardened.h b/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_get_hardened.h
index 4c7bd5f1eef..fd7628ef398 100644
--- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_get_hardened.h
+++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_get_hardened.h
@@ -305,7 +305,7 @@ public:
layout.ForEachPartOfDisk(type, [&](ui32 partIdx, ui32 idxInSubgroup) {
const TLogoBlobID partId(id, partIdx + 1);
const NKikimrProto::EReplyStatus res = PutToVDisk(orderNums[idxInSubgroup], partId,
- parts.Parts[partIdx].OwnedString);
+ parts.Parts[partIdx].OwnedString.ConvertToString());
UNIT_ASSERT_VALUES_EQUAL(res, NKikimrProto::OK);
disksWrittenTo |= {topology, topology->GetVDiskId(idxInSubgroup)};
});
diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp b/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp
index 0e96d053298..deef7756d65 100644
--- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp
+++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp
@@ -336,7 +336,7 @@ void ConductPut(TTestBasicRuntime &runtime, const TTestArgs &args, ENaivePatchCa
UNIT_ASSERT_VALUES_EQUAL(put->Id, args.PatchedId);
UNIT_ASSERT_VALUES_EQUAL(handle->Cookie, args.OriginalId.Hash());
TString patchedBuffer = MakePatchedBuffer(args);
- UNIT_ASSERT_VALUES_EQUAL(put->Buffer, patchedBuffer);
+ UNIT_ASSERT_VALUES_EQUAL(put->Buffer.ConvertToString(), patchedBuffer);
std::unique_ptr<TEvBlobStorage::TEvPutResult> putResult = std::make_unique<TEvBlobStorage::TEvPutResult>(
resultStatus, args.PatchedId, args.StatusFlags, args.CurrentGroupId, args.ApproximateFreeSpaceShare);
diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_vdisk_mock_ut.h b/ydb/core/blobstorage/dsproxy/ut/dsproxy_vdisk_mock_ut.h
index 0a17c046e71..dc095900af2 100644
--- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_vdisk_mock_ut.h
+++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_vdisk_mock_ut.h
@@ -486,12 +486,12 @@ public:
Info->Type.SplitData((TErasureType::ECrcMode)id.CrcMode(), encryptedData, partSet);
for (ui32 i = 0; i < totalParts; ++i) {
TLogoBlobID pId(id, i + 1);
- TString pData = partSet.Parts[i].OwnedString;
+ TRope pData = partSet.Parts[i].OwnedString;
if (i < handoffsToUse) {
Y_VERIFY(totalParts + i < totalvd);
- GetVDisk(vDisksId[totalParts + i].FailDomain, vDisksId[totalParts + i].VDisk).Put(pId, pData);
+ GetVDisk(vDisksId[totalParts + i].FailDomain, vDisksId[totalParts + i].VDisk).Put(pId, pData.ConvertToString());
} else {
- GetVDisk(vDisksId[i].FailDomain, vDisksId[i].VDisk).Put(pId, pData);
+ GetVDisk(vDisksId[i].FailDomain, vDisksId[i].VDisk).Put(pId, pData.ConvertToString());
}
}
}
diff --git a/ydb/core/blobstorage/dsproxy/ut_fat/dsproxy_ut.cpp b/ydb/core/blobstorage/dsproxy/ut_fat/dsproxy_ut.cpp
index 43183478c37..07f5948f3ae 100644
--- a/ydb/core/blobstorage/dsproxy/ut_fat/dsproxy_ut.cpp
+++ b/ydb/core/blobstorage/dsproxy/ut_fat/dsproxy_ut.cpp
@@ -1420,7 +1420,7 @@ class TTestBlobStorageProxyVPutVGet : public TTestBlobStorageProxy {
TBlobStorageGroupType type(ErasureSpecies);
NKikimr::TDataPartSet partSet;
type.SplitData((TErasureType::ECrcMode)blobId.CrcMode(), encryptedTestData2, partSet);
- TEST_RESPONSE(MessageVGetResult, OK, 1, partSet.Parts[0].OwnedString);
+ TEST_RESPONSE(MessageVGetResult, OK, 1, partSet.Parts[0].OwnedString.ConvertToString());
VERBOSE_COUT("Done");
Env->DoneEvent.Signal();
@@ -1713,7 +1713,7 @@ class TTestBlobStorageProxyVGet : public TTestBlobStorageProxy {
TBlobStorageGroupType type(ErasureSpecies);
NKikimr::TDataPartSet partSet;
type.SplitData((TErasureType::ECrcMode)blobId.CrcMode(), encryptedTestData2, partSet);
- TEST_RESPONSE(MessageVGetResult, OK, 1, partSet.Parts[0].OwnedString);
+ TEST_RESPONSE(MessageVGetResult, OK, 1, partSet.Parts[0].OwnedString.ConvertToString());
isNoData = false;
VERBOSE_COUT("Done");
diff --git a/ydb/core/blobstorage/testload/test_load_vdisk_write.cpp b/ydb/core/blobstorage/testload/test_load_vdisk_write.cpp
index 6cf73e7276b..24e0ccca196 100644
--- a/ydb/core/blobstorage/testload/test_load_vdisk_write.cpp
+++ b/ydb/core/blobstorage/testload/test_load_vdisk_write.cpp
@@ -7,6 +7,7 @@
#include <ydb/core/base/appdata.h>
#include <ydb/core/base/interconnect_channels.h>
#include <library/cpp/monlib/service/pages/templates.h>
+#include <library/cpp/actors/util/rope.h>
namespace NKikimr {
@@ -184,7 +185,9 @@ namespace NKikimr {
}
void IssuePutRequest(const TLogoBlobID& logoBlobId, ui64 cookie, const TActorContext& ctx) {
- TString whole(logoBlobId.BlobSize(), 'X');
+ TSharedData data = TSharedData::Uninitialized(logoBlobId.BlobSize());
+ std::fill_n(data.mutable_data(), logoBlobId.BlobSize(), 'X');
+ TRope whole(MakeIntrusive<TRopeSharedDataBackend>(data));
TDataPartSet parts;
GType.SplitData((TErasureType::ECrcMode)logoBlobId.CrcMode(), whole, parts);
auto ev = std::make_unique<TEvBlobStorage::TEvVPut>(logoBlobId,
diff --git a/ydb/core/blobstorage/testload/test_load_write.cpp b/ydb/core/blobstorage/testload/test_load_write.cpp
index 01e6839b10e..37de3a35a00 100644
--- a/ydb/core/blobstorage/testload/test_load_write.cpp
+++ b/ydb/core/blobstorage/testload/test_load_write.cpp
@@ -291,7 +291,7 @@ class TLogWriterTestLoadActor : public TActorBootstrapped<TLogWriterTestLoadActo
const ui32 size = 1;
const ui32 lastStep = Max<ui32>();
const TLogoBlobID id(TabletId, Generation, lastStep, Channel, size, 0);
- const TString buffer = GenerateBuffer(id);
+ const TSharedData buffer = GenerateBuffer<TSharedData>(id);
auto ev = std::make_unique<TEvBlobStorage::TEvPut>(id, buffer, TInstant::Max(), PutHandleClass);
auto callback = [this] (IEventBase *event, const TActorContext& ctx) {
@@ -531,7 +531,7 @@ class TLogWriterTestLoadActor : public TActorBootstrapped<TLogWriterTestLoadActo
putHandleClass = PutHandleClass;
}
const TLogoBlobID id(TabletId, Generation, WriteStep, Channel, size, Cookie);
- const TString buffer = GenerateBuffer(id);
+ const TSharedData buffer = GenerateBuffer<TSharedData>(id);
auto ev = std::make_unique<TEvBlobStorage::TEvPut>(id, buffer, TInstant::Max(), putHandleClass);
const ui64 writeQueryId = ++WriteQueryId;
@@ -741,8 +741,9 @@ class TLogWriterTestLoadActor : public TActorBootstrapped<TLogWriterTestLoadActo
NextReadInQueue = false;
}
- static TString GenerateBuffer(const TLogoBlobID& id) {
- return GenDataForLZ4(id.BlobSize());
+ template <class ResultContainer = TString>
+ static ResultContainer GenerateBuffer(const TLogoBlobID& id) {
+ return GenDataForLZ4<ResultContainer>(id.BlobSize());
}
};
diff --git a/ydb/core/blobstorage/ut_blobstorage/main.cpp b/ydb/core/blobstorage/ut_blobstorage/main.cpp
index 1aa4999c8c8..73b302815fd 100644
--- a/ydb/core/blobstorage/ut_blobstorage/main.cpp
+++ b/ydb/core/blobstorage/ut_blobstorage/main.cpp
@@ -66,7 +66,7 @@ Y_UNIT_TEST_SUITE(DonorMode) {
break;
}
}
- TString part = partSet.Parts[blobId.PartId() - 1].OwnedString;
+ TRope part = partSet.Parts[blobId.PartId() - 1].OwnedString;
// scan through existing stored blobs and ensure they are intact
for (const auto& [blobId, part] : stored) {
@@ -75,19 +75,19 @@ Y_UNIT_TEST_SUITE(DonorMode) {
}
// add it to stored set
- stored.emplace(blobId, part);
+ stored.emplace(blobId, part.ConvertToString());
// get the blob location for this group
const auto& [vdiskActorId, vdiskId] = getBlobLocation(info, blobId);
// first, check that there is no such blob in the disk
- env.CheckBlob(vdiskActorId, vdiskId, blobId, part, NKikimrProto::NODATA);
+ env.CheckBlob(vdiskActorId, vdiskId, blobId, part.ConvertToString(), NKikimrProto::NODATA);
// put the blob to the disk
- env.PutBlob(vdiskId, blobId, part);
+ env.PutBlob(vdiskId, blobId, part.ConvertToString());
// check it appeared
- env.CheckBlob(vdiskActorId, vdiskId, blobId, part);
+ env.CheckBlob(vdiskActorId, vdiskId, blobId, part.ConvertToString());
// wait for sync
env.WaitForSync(info, blobId);
diff --git a/ydb/core/blobstorage/ut_blobstorage/osiris.cpp b/ydb/core/blobstorage/ut_blobstorage/osiris.cpp
index 989452e3087..432d078bf58 100644
--- a/ydb/core/blobstorage/ut_blobstorage/osiris.cpp
+++ b/ydb/core/blobstorage/ut_blobstorage/osiris.cpp
@@ -43,7 +43,7 @@ bool DoTestCase(TBlobStorageGroupType::EErasureSpecies erasure, const std::set<s
const TActorId& queueId = env.CreateQueueActor(info->GetVDiskId(orderNum), NKikimrBlobStorage::PutTabletLog, 0);
const TActorId& sender = env.Runtime->AllocateEdgeActor(queueId.NodeId());
const TLogoBlobID blobId(id, partIdx + 1);
- const TString& buffer = type.PartSize(blobId) ? parts.Parts[partIdx].OwnedString : TString();
+ TRope buffer = type.PartSize(blobId) ? parts.Parts[partIdx].OwnedString : TString();
env.Runtime->Send(new IEventHandle(queueId, sender, new TEvBlobStorage::TEvVPut(blobId, buffer,
info->GetVDiskId(orderNum), false, nullptr, TInstant::Max(), NKikimrBlobStorage::TabletLog)),
sender.NodeId());
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h
index 9568165e383..3c13238ad54 100644
--- a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h
+++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h
@@ -206,7 +206,7 @@ namespace NKikimr {
Y_VERIFY(partSet.PartSet.FullDataSize == id.BlobSize());
// PartSet contains some data, other data will be restored and written in the same PartSet
- TString recoveredData;
+ TRope recoveredData;
const ui32 incomingMask = partSet.PartSet.PartsMask;
if (canRestore && needToRestore) {
groupType.RestoreData((TErasureType::ECrcMode)id.CrcMode(), partSet.PartSet, recoveredData,
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp
index 02e8a077893..83d376047b0 100644
--- a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp
+++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp
@@ -40,7 +40,7 @@ namespace NKikimr {
for (ui32 i = 0; i < info->Type.TotalPartCount(); ++i) {
for (ui32 k = 0; k < vdisks.size(); ++k) {
if (varray[i] == vdisks[k]) {
- diskvec[k] = parts.Parts[i].OwnedString;
+ diskvec[k] = parts.Parts[i].OwnedString.ConvertToString();
break;
}
}
diff --git a/ydb/core/blobstorage/vdisk/scrub/blob_recovery.h b/ydb/core/blobstorage/vdisk/scrub/blob_recovery.h
index 2a06f7df1ae..57e892ada4d 100644
--- a/ydb/core/blobstorage/vdisk/scrub/blob_recovery.h
+++ b/ydb/core/blobstorage/vdisk/scrub/blob_recovery.h
@@ -48,7 +48,7 @@ namespace NKikimr {
Y_VERIFY(id.PartId());
const ui32 partIdx = id.PartId() - 1;
Y_VERIFY(PartSet.PartsMask & (1 << partIdx));
- return PartSet.Parts[partIdx].OwnedString;
+ return PartSet.Parts[partIdx].OwnedString.ConvertToString();
}
NMatrix::TVectorType GetAvailableParts() const {
diff --git a/ydb/core/blobstorage/vdisk/scrub/blob_recovery_process.cpp b/ydb/core/blobstorage/vdisk/scrub/blob_recovery_process.cpp
index 0348ada82de..d78757e78ac 100644
--- a/ydb/core/blobstorage/vdisk/scrub/blob_recovery_process.cpp
+++ b/ydb/core/blobstorage/vdisk/scrub/blob_recovery_process.cpp
@@ -123,7 +123,7 @@ namespace NKikimr {
}
const ui32 numParts = PopCount(item.PartSet.PartsMask);
if (numParts >= Info->Type.MinimalRestorablePartCount()) {
- TString buffer;
+ TRope buffer;
Info->Type.RestoreData((TErasureType::ECrcMode)item.BlobId.CrcMode(), item.PartSet, buffer, true,
false, true);
item.PartSet.PartsMask = (1u << item.PartSet.Parts.size()) - 1;
diff --git a/ydb/core/blobstorage/vdisk/scrub/scrub_actor_huge_blob_merger.h b/ydb/core/blobstorage/vdisk/scrub/scrub_actor_huge_blob_merger.h
index 2b04da97606..489a8e97a78 100644
--- a/ydb/core/blobstorage/vdisk/scrub/scrub_actor_huge_blob_merger.h
+++ b/ydb/core/blobstorage/vdisk/scrub/scrub_actor_huge_blob_merger.h
@@ -65,7 +65,7 @@ namespace NKikimr {
key.LogoBlobID());
TRope holder;
TRope part = blob.GetPart(i, &holder);
- PartSet.Parts[i].ReferenceTo(part.ConvertToString());
+ PartSet.Parts[i].ReferenceTo(part);
} else {
CorruptedParts.push_back(*part);
}
diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor_ut.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor_ut.cpp
index cb9f9708a11..7e8e7244756 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor_ut.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor_ut.cpp
@@ -229,6 +229,12 @@ namespace NKikimr {
{
}
+ TBlob(const TLogoBlobID &blob, ui8 partId, const TRope &buffer)
+ : BlobId(blob, partId)
+ , Buffer(buffer.ConvertToString())
+ {
+ }
+
TBlob(const TLogoBlobID &blob, ui8 partId, ui32 bufferSize)
: BlobId(blob, partId)
{
@@ -759,7 +765,7 @@ namespace NKikimr {
ui32 patchedPartId = LogoBlobIDFromLogoBlobID(record.GetPatchedPartBlobId()).PartId();
TVector<TDiff> xorDiffs;
- const ui8 *buffer = reinterpret_cast<const ui8*>(partSet.Parts[fromPartId - 1].OwnedString.data());
+ const ui8 *buffer = reinterpret_cast<const ui8*>(partSet.Parts[fromPartId - 1].OwnedString.GetContiguousSpan().data());
testData.GType.MakeXorDiff(TErasureType::CrcModeNone, data.size(), buffer, diffSet.PartDiffs[fromPartId - 1].Diffs, &xorDiffs);
UNIT_ASSERT_VALUES_EQUAL_C(xorDiffs.size(), record.DiffsSize(), "from# " << (ui32)fromPartId);
diff --git a/ydb/core/erasure/erasure.cpp b/ydb/core/erasure/erasure.cpp
index a41b0279328..fc1d8e8a206 100644
--- a/ydb/core/erasure/erasure.cpp
+++ b/ydb/core/erasure/erasure.cpp
@@ -44,7 +44,7 @@ static TString DebugFormatBits(ui64 value) {
namespace NKikimr {
-static void Refurbish(TString &str, ui64 size) {
+static void Refurbish(TRope &str, ui64 size) {
if (str.size() != size) {
str = TString::Uninitialized(size);
}
@@ -1378,12 +1378,12 @@ void PadAndCrcParts(TErasureType::ECrcMode crcMode, const TBlockParams &p, TData
}
template <bool isStripe>
-void StarBlockSplit(TErasureType::ECrcMode crcMode, const TErasureType &type, const TString &buffer,
+void StarBlockSplit(TErasureType::ECrcMode crcMode, const TErasureType &type, TRope &buffer,
TDataPartSet &outPartSet) {
TBlockParams p(crcMode, type, buffer.size());
// Prepare input data pointers
- p.PrepareInputDataPointers<isStripe>(const_cast<char*>(buffer.data()));
+ p.PrepareInputDataPointers<isStripe>(buffer.UnsafeGetContiguousSpanMut().data());
outPartSet.FullDataSize = buffer.size();
outPartSet.PartsMask = ~((~(ui32)0) << p.TotalParts);
@@ -1399,12 +1399,12 @@ void StarBlockSplit(TErasureType::ECrcMode crcMode, const TErasureType &type, co
}
template <bool isStripe>
-void EoBlockSplit(TErasureType::ECrcMode crcMode, const TErasureType &type, const TString &buffer,
+void EoBlockSplit(TErasureType::ECrcMode crcMode, const TErasureType &type, TRope &buffer,
TDataPartSet &outPartSet) {
TBlockParams p(crcMode, type, buffer.size());
// Prepare input data pointers
- p.PrepareInputDataPointers<isStripe>(const_cast<char*>(buffer.data()));
+ p.PrepareInputDataPointers<isStripe>(buffer.UnsafeGetContiguousSpanMut().data());
// Prepare if not yet
if (!outPartSet.IsSplitStarted()) {
@@ -1429,12 +1429,12 @@ void EoBlockSplit(TErasureType::ECrcMode crcMode, const TErasureType &type, cons
}
template <bool isStripe>
-void XorBlockSplit(TErasureType::ECrcMode crcMode, const TErasureType &type, const TString& buffer,
+void XorBlockSplit(TErasureType::ECrcMode crcMode, const TErasureType &type, TRope& buffer,
TDataPartSet& outPartSet) {
TBlockParams p(crcMode, type, buffer.size());
// Prepare input data pointers
- p.PrepareInputDataPointers<isStripe>(const_cast<char*>(buffer.data()));
+ p.PrepareInputDataPointers<isStripe>(buffer.UnsafeGetContiguousSpanMut().data());
outPartSet.FullDataSize = buffer.size();
outPartSet.PartsMask = ~((~(ui32)0) << p.TotalParts);
@@ -1451,7 +1451,7 @@ void XorBlockSplit(TErasureType::ECrcMode crcMode, const TErasureType &type, con
template <bool isStripe, bool restoreParts, bool restoreFullData, bool restoreParityParts>
void EoBlockRestore(TErasureType::ECrcMode crcMode, const TErasureType &type, TDataPartSet& partSet) {
- TString &outBuffer = partSet.FullDataFragment.OwnedString;
+ TRope &outBuffer = partSet.FullDataFragment.OwnedString;
ui32 totalParts = type.TotalPartCount();
Y_VERIFY(partSet.Parts.size() >= totalParts);
@@ -1515,17 +1515,17 @@ void EoBlockRestore(TErasureType::ECrcMode crcMode, const TErasureType &type, TD
(!restoreParts && missingDataPartIdxA >= p.TotalParts - 2)) {
VERBOSE_COUT(__LINE__ << " of " << __FILE__ << Endl);
if (isStripe) {
- p.PrepareInputDataPointers<isStripe>(outBuffer.Detach());
+ p.PrepareInputDataPointers<isStripe>(outBuffer.GetContiguousSpanMut().data());
p.XorRestorePart<isStripe, false, true, false>(partSet, p.DataParts);
} else {
- p.GlueBlockParts(outBuffer.Detach(), partSet);
+ p.GlueBlockParts(outBuffer.GetContiguousSpanMut().data(), partSet);
}
return;
}
// Prepare output data pointers
if (restoreFullData) {
- p.PrepareInputDataPointers<isStripe>(outBuffer.Detach());
+ p.PrepareInputDataPointers<isStripe>(outBuffer.GetContiguousSpanMut().data());
}
// Consider failed disk cases
@@ -1541,15 +1541,15 @@ void EoBlockRestore(TErasureType::ECrcMode crcMode, const TErasureType &type, TD
// TODO: use 2-nd part of 'eo-split' to restore m+1 part
// TODO: 1-pass
if (missingDataPartIdxA <= p.DataParts && missingDataPartIdxB >= p.TotalParts - 1) {
- TString temp;
- TString &buffer = restoreFullData ? outBuffer : temp;
+ TRope temp;
+ TRope &buffer = restoreFullData ? outBuffer : temp;
if (!restoreFullData && restoreParts && missingDataPartIdxB == p.TotalParts - 1) {
// The (f1) case, but no full data needed, only parts
TRACE("case# f1" << Endl);
VERBOSE_COUT(__LINE__ << " of " << __FILE__ << Endl);
if (isStripe) {
Refurbish(buffer, dataSize);
- p.PrepareInputDataPointers<isStripe>(buffer.Detach());
+ p.PrepareInputDataPointers<isStripe>(buffer.GetContiguousSpanMut().data());
}
p.XorRestorePart<isStripe, true, false, false>(partSet, missingDataPartIdxA);
TRACE("case# f1 split" << Endl);
@@ -1583,8 +1583,8 @@ void EoBlockRestore(TErasureType::ECrcMode crcMode, const TErasureType &type, TD
if (missingDataPartIdxA == p.TotalParts - 1 && missingDataPartIdxB == p.TotalParts) {
TRACE("case# c" << Endl);
VERBOSE_COUT(__LINE__ << " of " << __FILE__ << Endl);
- TString temp;
- TString &buffer = restoreFullData ? outBuffer : temp;
+ TRope temp;
+ TRope &buffer = restoreFullData ? outBuffer : temp;
if (!restoreFullData) {
TRACE(__LINE__ << Endl);
if (!restoreParityParts) {
@@ -1598,12 +1598,12 @@ void EoBlockRestore(TErasureType::ECrcMode crcMode, const TErasureType &type, TD
}
if (isStripe) {
TRACE(__LINE__ << Endl);
- p.PrepareInputDataPointers<isStripe>(buffer.Detach());
+ p.PrepareInputDataPointers<isStripe>(buffer.GetContiguousSpanMut().data());
p.XorRestorePart<isStripe, false, true, false>(partSet, p.DataParts);
} else {
TRACE(__LINE__ << Endl);
if (restoreFullData) {
- p.GlueBlockParts(buffer.Detach(), partSet);
+ p.GlueBlockParts(buffer.GetContiguousSpanMut().data(), partSet);
}
}
if (restoreParts) {
@@ -1650,7 +1650,7 @@ void EoBlockRestore(TErasureType::ECrcMode crcMode, const TErasureType &type, TD
// restorePartiyParts may be set only togehter with restore parts
template <bool isStripe, bool restoreParts, bool restoreFullData, bool restoreParityParts>
void StarBlockRestore(TErasureType::ECrcMode crcMode, const TErasureType &type, TDataPartSet& partSet) {
- TString &outBuffer = partSet.FullDataFragment.OwnedString;
+ TRope &outBuffer = partSet.FullDataFragment.OwnedString;
ui32 totalParts = type.TotalPartCount();
Y_VERIFY(partSet.Parts.size() == totalParts);
@@ -1728,7 +1728,7 @@ void StarBlockRestore(TErasureType::ECrcMode crcMode, const TErasureType &type,
TBlockParams p(crcMode, type, dataSize);
if (restoreFullData) {
Refurbish(outBuffer, dataSize);
- p.PrepareInputDataPointers<isStripe>(outBuffer.Detach());
+ p.PrepareInputDataPointers<isStripe>(outBuffer.GetContiguousSpanMut().data());
} else if (missingDataPartCount == 0) {
return;
}
@@ -1738,10 +1738,10 @@ void StarBlockRestore(TErasureType::ECrcMode crcMode, const TErasureType &type,
(!restoreParts && missingDataPartIdxA >= p.DataParts)) {
VERBOSE_COUT(__LINE__ << " of " << __FILE__ << Endl);
if (isStripe) {
- p.PrepareInputDataPointers<isStripe>(outBuffer.Detach());
+ p.PrepareInputDataPointers<isStripe>(outBuffer.GetContiguousSpanMut().data());
p.XorRestorePart<isStripe, false, true, false>(partSet, p.DataParts);
} else {
- p.GlueBlockParts(outBuffer.Detach(), partSet);
+ p.GlueBlockParts(outBuffer.GetContiguousSpanMut().data(), partSet);
}
return;
}
@@ -1880,7 +1880,7 @@ void StarBlockRestore(TErasureType::ECrcMode crcMode, const TErasureType &type,
template <bool isStripe, bool restoreParts, bool restoreFullData, bool restoreParityParts>
void XorBlockRestore(TErasureType::ECrcMode crcMode, const TErasureType &type, TDataPartSet &partSet) {
- TString &outBuffer = partSet.FullDataFragment.OwnedString;
+ TRope &outBuffer = partSet.FullDataFragment.OwnedString;
ui32 totalParts = type.TotalPartCount();
Y_VERIFY(partSet.Parts.size() == totalParts,
"partSet.Parts.size(): %" PRIu64 " totalParts: %" PRIu32 " erasure: %s",
@@ -1919,16 +1919,16 @@ void XorBlockRestore(TErasureType::ECrcMode crcMode, const TErasureType &type, T
if (missingDataPartCount == 0 ||
(missingDataPartCount == 1 && !restoreParts && missingDataPartIdx == p.TotalParts - 1)) {
if (isStripe) {
- p.PrepareInputDataPointers<isStripe>(outBuffer.Detach());
+ p.PrepareInputDataPointers<isStripe>(outBuffer.GetContiguousSpanMut().data());
p.XorRestorePart<isStripe, false, true, false>(partSet, p.DataParts);
} else {
- p.GlueBlockParts(outBuffer.Detach(), partSet);
+ p.GlueBlockParts(outBuffer.GetContiguousSpanMut().data(), partSet);
}
return;
}
// Prepare output data pointers
if (restoreFullData) {
- p.PrepareInputDataPointers<isStripe>(outBuffer.Detach());
+ p.PrepareInputDataPointers<isStripe>(outBuffer.GetContiguousSpanMut().data());
}
p.XorRestorePart<isStripe, restoreParts, restoreFullData, restoreParityParts>(partSet, missingDataPartIdx);
@@ -2502,7 +2502,7 @@ ui64 TErasureType::BlockSplitPartUsedSize(ui64 dataSize, ui32 partIdx) const {
return lastPartSize;
}
-void MirrorSplit(TErasureType::ECrcMode crcMode, const TErasureType &type, const TString& buffer,
+void MirrorSplit(TErasureType::ECrcMode crcMode, const TErasureType &type, TRope& buffer,
TDataPartSet& outPartSet) {
outPartSet.FullDataSize = buffer.size();
outPartSet.Parts.resize(type.TotalPartCount());
@@ -2519,12 +2519,12 @@ void MirrorSplit(TErasureType::ECrcMode crcMode, const TErasureType &type, const
case TErasureType::CrcModeWholePart:
{
ui64 partSize = type.PartSize(crcMode, buffer.size());
- TString part = TString::Uninitialized(partSize);
- char *dst = part.Detach();
+ TRope part = TString::Uninitialized(partSize);
+ char *dst = part.GetContiguousSpanMut().data();
if (buffer.size() || part.size()) {
Y_VERIFY(part.size() >= buffer.size() + sizeof(ui32), "Part size too small, buffer size# %" PRIu64
" partSize# %" PRIu64, (ui64)buffer.size(), (ui64)partSize);
- memcpy(dst, buffer.data(), buffer.size());
+ memcpy(dst, buffer.GetContiguousSpan().data(), buffer.size());
PadAndCrcAtTheEnd(dst, buffer.size(), part.size());
}
for (ui32 partIdx = 0; partIdx <= parityParts; ++partIdx) {
@@ -2558,11 +2558,15 @@ void MirrorRestore(TErasureType::ECrcMode crcMode, const TErasureType &type, TDa
partSet.FullDataFragment.ReferenceTo(partSet.Parts[partIdx].OwnedString);
return;
case TErasureType::CrcModeWholePart:
- TString outBuffer = partSet.Parts[partIdx].OwnedString;
- outBuffer.Detach();
- Y_VERIFY(outBuffer.size() >= partSet.FullDataSize, "Unexpected outBuffer.size# %" PRIu64
- " fullDataSize# %" PRIu64, (ui64)outBuffer.size(), (ui64)partSet.FullDataSize);
- outBuffer.resize(partSet.FullDataSize); // To pad with zeroes!
+ TRope outBuffer = partSet.Parts[partIdx].OwnedString;
+ outBuffer.GetContiguousSpanMut(); // Detach
+ if(outBuffer.size() != partSet.FullDataSize) {
+ TString newOutBuffer(outBuffer.GetContiguousSpan().data(), outBuffer.size()); //FIXME(innokentii) potentially redundant allocation for resize
+ Y_VERIFY(outBuffer.size() >= partSet.FullDataSize, "Unexpected outBuffer.size# %" PRIu64
+ " fullDataSize# %" PRIu64, (ui64)outBuffer.size(), (ui64)partSet.FullDataSize);
+ newOutBuffer.resize(partSet.FullDataSize); // To pad with zeroes!
+ outBuffer = newOutBuffer;
+ }
partSet.FullDataFragment.ReferenceTo(outBuffer);
return;
}
@@ -2586,14 +2590,14 @@ static void VerifyPartSizes(TDataPartSet& partSet, size_t definedPartEndIdx) {
}
}
-void TErasureType::SplitData(ECrcMode crcMode, const TString& buffer, TDataPartSet& outPartSet) const {
+void TErasureType::SplitData(ECrcMode crcMode, TRope& buffer, TDataPartSet& outPartSet) const {
outPartSet.ResetSplit();
do {
IncrementalSplitData(crcMode, buffer, outPartSet);
} while (!outPartSet.IsSplitDone());
}
-void TErasureType::IncrementalSplitData(ECrcMode crcMode, const TString& buffer, TDataPartSet& outPartSet) const {
+void TErasureType::IncrementalSplitData(ECrcMode crcMode, TRope& buffer, TDataPartSet& outPartSet) const {
const TErasureParameters& erasure = ErasureSpeciesParameters[ErasureSpecies];
switch (erasure.ErasureFamily) {
case TErasureType::ErasureMirror:
@@ -2968,7 +2972,7 @@ void TErasureType::ApplyXorDiff(ECrcMode crcMode, ui32 dataSize, ui8 *dst,
}
}
-void TErasureType::RestoreData(ECrcMode crcMode, TDataPartSet& partSet, TString& outBuffer, bool restoreParts,
+void TErasureType::RestoreData(ECrcMode crcMode, TDataPartSet& partSet, TRope& outBuffer, bool restoreParts,
bool restoreFullData, bool restoreParityParts) const {
partSet.FullDataFragment.ReferenceTo(outBuffer);
RestoreData(crcMode, partSet, restoreParts, restoreFullData, restoreParityParts);
diff --git a/ydb/core/erasure/erasure.h b/ydb/core/erasure/erasure.h
index 35bd68c2d2f..d7d4da114b8 100644
--- a/ydb/core/erasure/erasure.h
+++ b/ydb/core/erasure/erasure.h
@@ -12,6 +12,7 @@
#include <util/generic/list.h>
#include <library/cpp/containers/stack_vector/stack_vec.h>
+#include <library/cpp/actors/util/rope.h>
namespace NKikimr {
@@ -57,7 +58,7 @@ struct TPartDiffSet {
// Part fragment, contains only some data
struct TPartFragment {
- TString OwnedString; // Used for ownership only
+ TRope OwnedString; // Used for ownership only
char *Bytes = nullptr;
ui64 Offset = 0; // Relative to part beginning
ui64 Size = 0;
@@ -76,32 +77,34 @@ struct TPartFragment {
}
void UninitializedOwnedWhole(ui64 size) {
- OwnedString = TString::Uninitialized(size);
- Bytes = OwnedString.Detach();
+ OwnedString = TString::Uninitialized(size); //FIXME(innokentii)
+ Bytes = OwnedString.GetContiguousSpanMut().data();
Offset = 0;
Size = size;
PartSize = size;
}
- void ResetToWhole(const TString &whole) {
+ void ResetToWhole(const TRope &whole) {
OwnedString = whole;
- Bytes = OwnedString.Detach();
+ Bytes = OwnedString.GetContiguousSpanMut().data();
Offset = 0;
- Size = whole.size();
+ Size = OwnedString.size();
PartSize = Size;
}
- void ReferenceTo(const TString &whole) {
+ void ReferenceTo(const TRope &whole) {
+ Y_VERIFY(whole.IsContiguous());
OwnedString = whole;
- Bytes = const_cast<char*>(whole.data());
+ Bytes = OwnedString.UnsafeGetContiguousSpanMut().data();
Offset = 0;
- Size = whole.size();
+ Size = OwnedString.size();
PartSize = Size;
}
- void ReferenceTo(const TString &piece, ui64 offset, ui64 size, ui64 partSize) {
+ void ReferenceTo(const TRope &piece, ui64 offset, ui64 size, ui64 partSize) {
+ Y_VERIFY(piece.IsContiguous());
OwnedString = piece;
- Bytes = const_cast<char*>(piece.data());
+ Bytes = OwnedString.UnsafeGetContiguousSpanMut().data();
Offset = offset;
Y_VERIFY(size <= piece.size());
Size = size;
@@ -109,6 +112,17 @@ struct TPartFragment {
PartSize = partSize;
}
+ void ReferenceTo(const TString &whole) {
+ TRope rope(whole);
+ ReferenceTo(rope);
+ }
+
+ void ReferenceTo(const TString &piece, ui64 offset, ui64 size, ui64 partSize) {
+ TRope rope(piece);
+ ReferenceTo(rope, offset, size, partSize);
+ }
+
+
char *GetDataAt(ui64 get_offset) const {
Y_VERIFY_DEBUG(Size);
Y_VERIFY_DEBUG(get_offset >= Offset, "%s", (TStringBuilder() << "get_offset# " << get_offset
@@ -130,19 +144,12 @@ struct TPartFragment {
}
void Detach() {
- char *newBytes = nullptr;
- if (Bytes) {
+ if(Bytes) {
char *oldBytes = Bytes;
- char *oldData = const_cast<char*>(OwnedString.data());
+ char *oldData = OwnedString.UnsafeGetContiguousSpanMut().data();
intptr_t bytesOffset = oldBytes - oldData;
- OwnedString.Detach();
- if (OwnedString.data() != oldData) {
- newBytes = const_cast<char*>(OwnedString.data()) + bytesOffset;
- } else {
- newBytes = oldBytes;
- }
+ Bytes = OwnedString.GetContiguousSpanMut().data() + bytesOffset;
}
- Bytes = newBytes;
}
};
@@ -317,8 +324,16 @@ struct TErasureType {
ui64 SuggestDataSize(ECrcMode crcMode, ui64 partSize, bool roundDown) const;
ui32 Prime() const;
- void SplitData(ECrcMode crcMode, const TString& buffer, TDataPartSet& outPartSet) const;
- void IncrementalSplitData(ECrcMode crcMode, const TString& buffer, TDataPartSet& outPartSet) const;
+ void SplitData(ECrcMode crcMode, TRope& buffer, TDataPartSet& outPartSet) const;
+ void SplitData(ECrcMode crcMode, const TString& buffer, TDataPartSet& outPartSet) const {
+ TRope rope = buffer;
+ SplitData(crcMode, rope, outPartSet);
+ }
+ void IncrementalSplitData(ECrcMode crcMode, TRope& buffer, TDataPartSet& outPartSet) const;
+ void IncrementalSplitData(ECrcMode crcMode, const TString& buffer, TDataPartSet& outPartSet) const {
+ TRope rope = buffer;
+ IncrementalSplitData(crcMode, rope, outPartSet);
+ }
void SplitDiffs(ECrcMode crcMode, ui32 dataSize, const TVector<TDiff> &diffs, TPartDiffSet& outDiffSet) const;
void ApplyDiff(ECrcMode crcMode, ui8 *dst, const TVector<TDiff> &diffs) const;
@@ -327,7 +342,7 @@ struct TErasureType {
void ApplyXorDiff(ECrcMode crcMode, ui32 dataSize, ui8 *dst,
const TVector<TDiff> &diffs, ui8 fromPart, ui8 toPart) const;
- void RestoreData(ECrcMode crcMode, TDataPartSet& partSet, TString& outBuffer, bool restoreParts,
+ void RestoreData(ECrcMode crcMode, TDataPartSet& partSet, TRope& outBuffer, bool restoreParts,
bool restoreFullData, bool restoreParityParts) const;
void RestoreData(ECrcMode crcMode, TDataPartSet& partSet, bool restoreParts, bool restoreFullData,
bool restoreParityParts) const;
diff --git a/ydb/core/erasure/erasure_perf_test.cpp b/ydb/core/erasure/erasure_perf_test.cpp
index 37b1a3bab64..ae4c79c946f 100644
--- a/ydb/core/erasure/erasure_perf_test.cpp
+++ b/ydb/core/erasure/erasure_perf_test.cpp
@@ -99,7 +99,10 @@ std::pair<double, double> MeasureTime(TErasureType &type, TVector<ui32> &missedP
const ui32 partSize = type.PartSize(TErasureType::CrcModeNone, dataSize);
partSet.Parts.resize(type.TotalPartCount());
for (ui32 i = 0; i < type.TotalPartCount(); ++i) {
- partSet.Parts[i].ReferenceTo(partSet.Parts[i].OwnedString.resize(partSize));
+ auto data = partSet.Parts[i].OwnedString.GetContiguousSpan();
+ TString newOwnedString(partSize, '\0');
+ memcpy(newOwnedString.Detach(), data.data(), data.size());
+ partSet.Parts[i].ReferenceTo(partSet.Parts[i].OwnedString);
}
}
if (measureSplit) {
@@ -113,13 +116,10 @@ std::pair<double, double> MeasureTime(TErasureType &type, TVector<ui32> &missedP
time += timer.PassedReset() / attempts;
}
if (isRestoreFullData) {
- std::vector<TString> restoredData;
+ std::vector<TRope> restoredData;
restoredData.resize(attempts);
for (auto& restored : restoredData) {
- restored.resize(dataSize);
- for (ui64 i = 0; i < dataSize; ++i) {
- restored[i] = 0;
- }
+ restored = TString(dataSize, '\0');
}
// Remove the 'missing' parts
for (auto& partSet : partSets) {
@@ -136,7 +136,7 @@ std::pair<double, double> MeasureTime(TErasureType &type, TVector<ui32> &missedP
time += timer.PassedReset() / attempts;
}
for (size_t i = 0; i < attempts; ++i) {
- UNIT_ASSERT_EQUAL(originalData[i], restoredData[i]);
+ UNIT_ASSERT_EQUAL(TRope(originalData[i]), restoredData[i]);
}
}
times.push_back(time);
@@ -250,7 +250,7 @@ void RopeMeasureSplitTime(TErasureType &type, ui64 dataSize, const TString& buff
type.SplitData(TErasureType::CrcModeNone, rope, partSet);
if (convertToRope) {
for (int i = 0; i < 6; ++i) {
- ropes[i] = RopeFromStringReference(std::move(partSet.Parts[i].OwnedString));
+ ropes[i] = partSet.Parts[i].OwnedString;
}
}
#ifdef LONG_TEST
diff --git a/ydb/core/erasure/erasure_ut.cpp b/ydb/core/erasure/erasure_ut.cpp
index 2473bd16506..731230ff304 100644
--- a/ydb/core/erasure/erasure_ut.cpp
+++ b/ydb/core/erasure/erasure_ut.cpp
@@ -48,7 +48,7 @@ void TestMissingPartWithRandomData(TErasureType &groupType, ui32 *missingPartIdx
partSet.Parts[missingPartIdx[i]].clear();
}
// Restore the data
- TString restoredString;
+ TRope restoredString;
groupType.RestoreData(TErasureType::CrcModeNone, partSet, restoredString,
isRestoreParts, isRestoreFullData, isRestoreParts);
@@ -191,7 +191,7 @@ void RunTestDiff(TErasureType &groupType, ui32 dataSize, const TString &testStri
}
for (ui32 partIdx = 0; partIdx < dataParts + parityParts; ++partIdx) {
- UNIT_ASSERT_STRINGS_EQUAL(partSet.Parts[partIdx].OwnedString, resultPartSet.Parts[partIdx].OwnedString);
+ UNIT_ASSERT_STRINGS_EQUAL(partSet.Parts[partIdx].OwnedString.ConvertToString(), resultPartSet.Parts[partIdx].OwnedString.ConvertToString());
}
}
@@ -298,7 +298,7 @@ Y_UNIT_TEST_SUITE(TErasureTypeTest) {
UNIT_ASSERT_EQUAL_C(partSet.Parts[i].size(), expectedParts[i].size(), Sprintf("%lu == %lu",
partSet.Parts[i].size(), expectedParts[i].size()));
for (ui32 j = 0; j < partSet.Parts[i].size(); ++j) {
- UNIT_ASSERT_EQUAL( (ui8)partSet.Parts[i].OwnedString[j], expectedParts[i][j]);
+ UNIT_ASSERT_EQUAL( (ui8)partSet.Parts[i].OwnedString.GetContiguousSpan().data()[j], expectedParts[i][j]);
}
}
}
@@ -391,7 +391,7 @@ Y_UNIT_TEST_SUITE(TErasureTypeTest) {
(isRestoreParts ? "true" : "false"),
(isRestoreFullData ? "true" : "false"));
- TString restoredString;
+ TRope restoredString;
try {
groupType.RestoreData(TErasureType::CrcModeNone, partSet, restoredString,
isRestoreParts, isRestoreFullData, isRestoreParts);
@@ -404,9 +404,9 @@ Y_UNIT_TEST_SUITE(TErasureTypeTest) {
if (isRestoreFullData) {
UNIT_ASSERT_EQUAL_C(testString.size(), restoredString.size(), errorInfo);
for (ui32 i = 0; i < testString.size(); ++i) {
- UNIT_ASSERT_EQUAL_C(((char*)testString.data())[i], ((char*)restoredString.data())[i],
+ UNIT_ASSERT_EQUAL_C(((char*)testString.data())[i], (restoredString.UnsafeGetContiguousSpanMut().data())[i],
(errorInfo + mode + " (full data)"));
- if (((char*)testString.data())[i] != ((char*)restoredString.data())[i]) {
+ if (((char*)testString.data())[i] != (restoredString.UnsafeGetContiguousSpanMut().data())[i]) {
VERBOSE_COUT("mismatch " << errorInfo << mode << " (full data)" << Endl);
break;
}
@@ -711,7 +711,7 @@ Y_UNIT_TEST_SUITE(TErasureTypeTest) {
partSet.Parts[missingPartIdx[idx]].clear();
}
}
- partSet.FullDataFragment.UninitializedOwnedWhole(dataSize);;
+ partSet.FullDataFragment.UninitializedOwnedWhole(dataSize);
TString mode = Sprintf(" restoreParts=%s isRestoreParityParts=%s restoreFullData=%s ",
@@ -720,7 +720,7 @@ Y_UNIT_TEST_SUITE(TErasureTypeTest) {
(isRestoreFullData ? "true" : "false"));
VERBOSE_COUT("RestoreData " << errorInfo << Endl);
- TString restoredString;
+ TRope restoredString;
try {
groupType.RestoreData(crcMode, partSet, restoredString,
isRestoreParts, isRestoreFullData, isRestoreParityParts);
@@ -733,7 +733,7 @@ Y_UNIT_TEST_SUITE(TErasureTypeTest) {
if (isRestoreFullData) {
UNIT_ASSERT_EQUAL_C(testString.size(), restoredString.size(), errorInfo);
for (ui32 i = 0; i < testString.size(); ++i) {
- UNIT_ASSERT_EQUAL_C(((char*)testString.data())[i], ((char*)restoredString.data())[i],
+ UNIT_ASSERT_EQUAL_C(((char*)testString.data())[i], (restoredString.UnsafeGetContiguousSpanMut().data())[i],
(errorInfo + erasureName + mode + " (full data)"));
}
}
@@ -1036,7 +1036,7 @@ Y_UNIT_TEST_SUITE(TErasureTypeTest) {
for (ui32 idx = 0; idx < partSet.Parts.size(); ++idx) {
ui32 cutBegin = Min(partSize, needBegin);
ui32 cutSize = Min(partSize, needEnd) - cutBegin;
- partSet.Parts[idx].ReferenceTo(partSet.Parts[idx].OwnedString.substr(cutBegin, cutSize),
+ partSet.Parts[idx].ReferenceTo(partSet.Parts[idx].OwnedString.ConvertToString().substr(cutBegin, cutSize),
cutBegin, cutSize, partSize);
}
diff --git a/ydb/core/util/defs.h b/ydb/core/util/defs.h
index 04dab350864..b587a43d7db 100644
--- a/ydb/core/util/defs.h
+++ b/ydb/core/util/defs.h
@@ -17,3 +17,7 @@
#include <util/generic/vector.h>
#include <util/datetime/base.h>
#include <util/generic/ylimits.h>
+
+namespace NKikimr {
+using namespace NActors;
+} // namespace NKikimr
diff --git a/ydb/core/util/fragmented_buffer.cpp b/ydb/core/util/fragmented_buffer.cpp
index 6da04399b60..3ae32c380ef 100644
--- a/ydb/core/util/fragmented_buffer.cpp
+++ b/ydb/core/util/fragmented_buffer.cpp
@@ -1,6 +1,7 @@
#include "fragmented_buffer.h"
#include <util/stream/str.h>
+#include <library/cpp/actors/util/shared_data_rope_backend.h>
namespace NKikimr {
@@ -9,7 +10,8 @@ TFragmentedBuffer::TFragmentedBuffer() {
void TFragmentedBuffer::Insert(i32 begin, const char* source, i32 bytesToCopy) {
Y_VERIFY(bytesToCopy);
- BufferForOffset[begin].AssignNoAlias(source, bytesToCopy);
+ //FIXME(innokentii): can save allocation
+ BufferForOffset[begin] = TRope(MakeIntrusive<TRopeSharedDataBackend>(TSharedData::Copy(source, bytesToCopy)));
}
@@ -17,12 +19,12 @@ bool TFragmentedBuffer::IsMonolith() const {
return (BufferForOffset.size() == 1 && BufferForOffset.begin()->first == 0);
}
-TString TFragmentedBuffer::GetMonolith() {
+TRope TFragmentedBuffer::GetMonolith() {
Y_VERIFY(IsMonolith());
return BufferForOffset.begin()->second;
}
-void TFragmentedBuffer::SetMonolith(TString &data) {
+void TFragmentedBuffer::SetMonolith(TRope &data) {
Y_VERIFY(data);
BufferForOffset.clear();
BufferForOffset.emplace(0, data);
@@ -64,7 +66,7 @@ void TFragmentedBuffer::Write(i32 begin, const char* buffer, i32 size) {
Y_VERIFY(it->first + i32(it->second.size()) > offset);
i32 bytesToNext = it->first + it->second.size() - offset;
i32 bytesToInsert = Min(bytesToCopy, bytesToNext);
- char *destination = const_cast<char*>(it->second.data()) + offset - it->first;
+ char *destination = it->second.UnsafeGetContiguousSpanMut().data() + offset - it->first;
memcpy(destination, source, bytesToInsert);
source += bytesToInsert;
offset += bytesToInsert;
@@ -97,7 +99,7 @@ void TFragmentedBuffer::Read(i32 begin, char* buffer, i32 size) const {
Y_VERIFY(it->first + i32(it->second.size()) > offset);
i32 bytesToNext = it->first + it->second.size() - offset;
i32 bytesToInsert = Min(bytesToCopy, bytesToNext);
- const char *source = it->second.data() + offset - it->first;
+ const char *source = const_cast<TRope&>(it->second).GetContiguousSpan().data() + offset - it->first;
memcpy(destination, source, bytesToInsert);
destination += bytesToInsert;
offset += bytesToInsert;
@@ -125,7 +127,7 @@ std::pair<const char*, i32> TFragmentedBuffer::Get(i32 begin) const {
--it;
const i32 offset = begin - it->first;
Y_VERIFY(offset >= 0 && (size_t)offset < it->second.size());
- return std::make_pair(it->second.data() + offset, it->second.size() - offset);
+ return std::make_pair(const_cast<TRope&>(it->second).GetContiguousSpan().data() + offset, it->second.size() - offset);
}
void TFragmentedBuffer::CopyFrom(const TFragmentedBuffer& from, const TIntervalSet<i32>& range) {
diff --git a/ydb/core/util/fragmented_buffer.h b/ydb/core/util/fragmented_buffer.h
index 0e7ce9f8425..65088aa6b5e 100644
--- a/ydb/core/util/fragmented_buffer.h
+++ b/ydb/core/util/fragmented_buffer.h
@@ -1,21 +1,22 @@
#pragma once
#include "defs.h"
+#include <library/cpp/actors/util/rope.h>
#include <util/generic/map.h>
#include "interval_set.h"
namespace NKikimr {
class TFragmentedBuffer {
- TMap<i32, TString> BufferForOffset;
+ TMap<i32, TRope> BufferForOffset;
void Insert(i32 begin, const char* source, i32 bytesToCopy);
public:
TFragmentedBuffer();
bool IsMonolith() const;
- TString GetMonolith();
- void SetMonolith(TString &data);
+ TRope GetMonolith();
+ void SetMonolith(TRope &data);
void Write(i32 begin, const char* buffer, i32 size);
void Read(i32 begin, char* buffer, i32 size) const;
diff --git a/ydb/core/util/fragmented_buffer_ut.cpp b/ydb/core/util/fragmented_buffer_ut.cpp
index 7e7dee86df5..7b9dac72b7d 100644
--- a/ydb/core/util/fragmented_buffer_ut.cpp
+++ b/ydb/core/util/fragmented_buffer_ut.cpp
@@ -130,22 +130,22 @@ Y_UNIT_TEST_SUITE(TFragmentedBufferTest) {
fb.Read(0, buffer, 3);
UNIT_ASSERT_VALUES_EQUAL(buffer, data2);
UNIT_ASSERT_VALUES_EQUAL(fb.IsMonolith(), true);
- TString res = fb.GetMonolith();
+ TRope res = fb.GetMonolith();
UNIT_ASSERT_VALUES_EQUAL(res.size(), 3);
- UNIT_ASSERT_VALUES_EQUAL(memcmp(res.data(), data2, 3), 0);
+ UNIT_ASSERT_VALUES_EQUAL(memcmp(res.GetContiguousSpan().data(), data2, 3), 0);
}
Y_UNIT_TEST(TestSetMonolith) {
- TString inData = "123";
+ TRope inData = TString("123");
TFragmentedBuffer fb;
fb.SetMonolith(inData);
UNIT_ASSERT_VALUES_EQUAL(fb.IsMonolith(), true);
- TString res = fb.GetMonolith();
- UNIT_ASSERT_VALUES_EQUAL(inData, res);
+ TRope res = fb.GetMonolith();
+ UNIT_ASSERT_VALUES_EQUAL(inData.ConvertToString(), res.ConvertToString());
}
Y_UNIT_TEST(TestReplaceWithSetMonolith) {
- TString inData = "123";
+ TRope inData = TString("123");
const char *data3v2 = "5";
const char *data4 = "678";
TFragmentedBuffer fb;
@@ -156,8 +156,8 @@ Y_UNIT_TEST_SUITE(TFragmentedBufferTest) {
UNIT_ASSERT_VALUES_EQUAL(fb.IsMonolith(), false);
fb.SetMonolith(inData);
UNIT_ASSERT_VALUES_EQUAL(fb.IsMonolith(), true);
- TString res = fb.GetMonolith();
- UNIT_ASSERT_VALUES_EQUAL(inData, res);
+ TRope res = fb.GetMonolith();
+ UNIT_ASSERT_VALUES_EQUAL(inData.ConvertToString(), res.ConvertToString());
}
Y_UNIT_TEST(CopyFrom) {
diff --git a/ydb/core/util/lz4_data_generator.h b/ydb/core/util/lz4_data_generator.h
index 04ba9508f5a..dd0ecbecae1 100644
--- a/ydb/core/util/lz4_data_generator.h
+++ b/ydb/core/util/lz4_data_generator.h
@@ -5,13 +5,21 @@
namespace NKikimr {
-inline TString GenDataForLZ4(const ui64 size, const ui64 seed = 0) {
- TString data = TString::Uninitialized(size);
+template <class ResultContainer = TString>
+inline ResultContainer GenDataForLZ4(const ui64 size, const ui64 seed = 0) {
+ ResultContainer data = ResultContainer::Uninitialized(size);
const ui32 long_step = Max<ui32>(2027, size / 20);
const ui32 short_step = Min<ui32>(53, long_step / 400);
+ char *buffer = [&]() -> char * {
+ if constexpr(std::is_same<ResultContainer, TString>::value) {
+ return data.Detach();
+ } else {
+ return data.mutable_data();
+ }
+ }();
for (ui32 i = 0; i < data.size(); ++i) {
const ui32 j = i + seed;
- data[i] = 0xff & (j % short_step + j / long_step);
+ buffer[i] = 0xff & (j % short_step + j / long_step);
}
return data;
}