diff options
author | innokentii <innokentii@yandex-team.com> | 2022-09-10 22:49:57 +0300 |
---|---|---|
committer | innokentii <innokentii@yandex-team.com> | 2022-09-10 22:49:57 +0300 |
commit | a8945929eb162f6910b228697c3d2db3b7cb2ca9 (patch) | |
tree | 4f6d161bffc1e27ec7bfe54827b31fa2365b7f96 | |
parent | 5a89de48da3bd0e557702e048c37f690f4263f3b (diff) | |
download | ydb-a8945929eb162f6910b228697c3d2db3b7cb2ca9.tar.gz |
Replace TString with TRope in TEvPut
replace TString with TRope in TEvPut
35 files changed, 251 insertions, 184 deletions
diff --git a/library/cpp/actors/util/rope.h b/library/cpp/actors/util/rope.h index 54b8e344124..fd4a5acdba1 100644 --- a/library/cpp/actors/util/rope.h +++ b/library/cpp/actors/util/rope.h @@ -1060,7 +1060,7 @@ public: Erase(Begin(), End()); } - bool IsContiguous() { + bool IsContiguous() const { if(Begin() == End() || (++Begin() == End())) { return true; } diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h index 0748eb6f780..d8bc5b3529c 100644 --- a/ydb/core/base/blobstorage.h +++ b/ydb/core/base/blobstorage.h @@ -16,6 +16,8 @@ #include <library/cpp/actors/wilson/wilson_trace.h> #include <library/cpp/lwtrace/shuttle.h> +#include <library/cpp/actors/util/rope.h> +#include <library/cpp/actors/util/shared_data_rope_backend.h> #include <util/stream/str.h> #include <util/generic/xrange.h> @@ -903,7 +905,7 @@ struct TEvBlobStorage { }; const TLogoBlobID Id; - const TString Buffer; + const TRope Buffer; //FIXME(innokentii) const members prevent usage of move-semantics elsewhere const TInstant Deadline; const NKikimrBlobStorage::EPutHandleClass HandleClass; const ETactic Tactic; @@ -912,35 +914,49 @@ struct TEvBlobStorage { std::vector<std::pair<ui64, ui32>> ExtraBlockChecks; // (TabletId, Generation) pairs bool Decommission = false; // is it generated by decommission and should bypass all block checks? - TEvPut(const TLogoBlobID &id, const TString &buffer, TInstant deadline, + TEvPut(const TLogoBlobID &id, TRope &&buffer, TInstant deadline, NKikimrBlobStorage::EPutHandleClass handleClass = NKikimrBlobStorage::TabletLog, ETactic tactic = TacticDefault) : Id(id) - , Buffer(buffer) + , Buffer(std::move(buffer)) , Deadline(deadline) , HandleClass(handleClass) , Tactic(tactic) { Y_VERIFY(Id, "EvPut invalid: LogoBlobId must have non-zero tablet field, id# %s", Id.ToString().c_str()); - Y_VERIFY(buffer.size() < (40 * 1024 * 1024), + Y_VERIFY(Buffer.size() < (40 * 1024 * 1024), "EvPut invalid: LogoBlobId# %s buffer.Size# %zu", - id.ToString().data(), buffer.size()); - Y_VERIFY(buffer.size() == id.BlobSize(), + id.ToString().data(), Buffer.size()); + Y_VERIFY(Buffer.size() == id.BlobSize(), "EvPut invalid: LogoBlobId# %s buffer.Size# %zu", - id.ToString().data(), buffer.size()); + id.ToString().data(), Buffer.size()); + Y_VERIFY(Buffer.IsContiguous(), "EvPut invalid: buffer must be contiguous"); REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(&id, sizeof(id)); - REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(buffer.Data(), buffer.size()); + REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(Buffer.GetContiguousSpan.Data(), Buffer.size()); REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(&deadline, sizeof(deadline)); REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(&handleClass, sizeof(handleClass)); REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(&tactic, sizeof(tactic)); } + TEvPut(const TLogoBlobID &id, const TString &buffer, TInstant deadline, + NKikimrBlobStorage::EPutHandleClass handleClass = NKikimrBlobStorage::TabletLog, + ETactic tactic = TacticDefault) + : TEvPut(id, TRope(buffer), deadline, handleClass, tactic) + {} + + + TEvPut(const TLogoBlobID &id, const TSharedData &buffer, TInstant deadline, + NKikimrBlobStorage::EPutHandleClass handleClass = NKikimrBlobStorage::TabletLog, + ETactic tactic = TacticDefault) + : TEvPut(id, TRope(MakeIntrusive<TRopeSharedDataBackend>(buffer)), deadline, handleClass, tactic) + {} + TString Print(bool isFull) const { TStringStream str; str << "TEvPut {Id# " << Id.ToString(); - str << " Size# " << Buffer.size(); + str << " Size# " << Buffer.GetSize(); if (isFull) { - str << " Buffer# " << Buffer.Quote(); + str << " Buffer# " << Buffer.ConvertToString().Quote(); } str << " Deadline# " << Deadline.MilliSeconds(); str << " HandleClass# " << HandleClass; @@ -954,7 +970,7 @@ struct TEvBlobStorage { } ui32 CalculateSize() const { - return sizeof(*this) + Buffer.capacity(); + return sizeof(*this) + Buffer.GetSize(); } std::unique_ptr<TEvPutResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason, diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp index 875af36a284..3c7c79cc466 100644 --- a/ydb/core/blob_depot/agent/storage_put.cpp +++ b/ydb/core/blob_depot/agent/storage_put.cpp @@ -77,11 +77,11 @@ namespace NKikimr::NBlobDepot { footer.StoredBlobId = msg.Id; TStringBuf footerData(reinterpret_cast<const char*>(&footer), sizeof(footer)); - auto put = [&](EBlobType type, const TString& buffer) { + auto put = [&](EBlobType type, TRope&& buffer) { const auto& [id, groupId] = kind.MakeBlobId(Agent, BlobSeqId, type, 0, buffer.size()); Y_VERIFY(!locator->HasGroupId() || locator->GetGroupId() == groupId); locator->SetGroupId(groupId); - auto ev = std::make_unique<TEvBlobStorage::TEvPut>(id, buffer, msg.Deadline, msg.HandleClass, msg.Tactic); + auto ev = std::make_unique<TEvBlobStorage::TEvPut>(id, std::move(buffer), msg.Deadline, msg.HandleClass, msg.Tactic); ev->ExtraBlockChecks = msg.ExtraBlockChecks; if (!msg.Decommission) { // do not check original blob against blocks when writing decommission copy ev->ExtraBlockChecks.emplace_back(msg.Id.TabletID(), msg.Id.Generation()); @@ -92,15 +92,18 @@ namespace NKikimr::NBlobDepot { if (SuppressFooter) { // write the blob as is, we don't need footer for this kind - put(EBlobType::VG_DATA_BLOB, msg.Buffer); + put(EBlobType::VG_DATA_BLOB, TRope(msg.Buffer)); } else if (msg.Buffer.size() + sizeof(TVirtualGroupBlobFooter) <= MaxBlobSize) { // write single blob with footer - put(EBlobType::VG_COMPOSITE_BLOB, msg.Buffer + footerData); + TRope buffer = msg.Buffer; + buffer.Insert(buffer.End(), TRope(TString(footerData))); //FIXME(innokentii) + buffer.Compact(); + put(EBlobType::VG_COMPOSITE_BLOB, std::move(buffer)); } else { // write data blob and blob with footer - put(EBlobType::VG_DATA_BLOB, msg.Buffer); - put(EBlobType::VG_FOOTER_BLOB, TString(footerData)); - } + put(EBlobType::VG_DATA_BLOB, TRope(msg.Buffer)); + put(EBlobType::VG_FOOTER_BLOB, TRope(TString(footerData))); + } } void OnUpdateBlock(bool success) override { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp index a602caf8ff6..8b7af89d97d 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp @@ -15,7 +15,7 @@ void TBlobState::TState::AddResponseData(ui32 fullSize, ui32 shift, TString &dat Here.Add(shift, shift + data.size()); } -void TBlobState::TState::AddPartToPut(TString &data) { +void TBlobState::TState::AddPartToPut(TRope &data) { Y_VERIFY(data.size()); Data.SetMonolith(data); Here.Assign(0, data.size()); @@ -45,7 +45,7 @@ void TBlobState::AddNeeded(ui64 begin, ui64 size) { IsChanged = true; } -void TBlobState::AddPartToPut(ui32 partIdx, TString &partData) { +void TBlobState::AddPartToPut(ui32 partIdx, TRope &partData) { Y_VERIFY(bool(Id)); Y_VERIFY(partIdx < Parts.size()); Parts[partIdx].AddPartToPut(partData); @@ -84,17 +84,17 @@ bool TBlobState::Restore(const TBlobStorageGroupInfo &info) { if (const ui32 partSize = info.Type.PartSize(TLogoBlobID(Id, i + 1))) { if (TIntervalVec<i32>(0, partSize).IsSubsetOf(Parts[i].Here)) { partSet.PartsMask |= (1 << i); - TString tmp = TString::Uninitialized(partSize); - Parts[i].Data.Read(0, const_cast<char*>(tmp.data()), partSize); - partSet.Parts[i].ReferenceTo(tmp); + TRope data(MakeIntrusive<TRopeSharedDataBackend>(TSharedData::Uninitialized(partSize))); + Parts[i].Data.Read(0, data.UnsafeGetContiguousSpanMut().data(), partSize); + partSet.Parts[i].ReferenceTo(data); } } } partSet.FullDataSize = Id.BlobSize(); - TString whole; + TRope whole; info.Type.RestoreData((TErasureType::ECrcMode)Id.CrcMode(), partSet, whole, false, true, false); - Whole.Data.Write(0, whole.data(), Id.BlobSize()); + Whole.Data.Write(0, whole.GetContiguousSpan().data(), Id.BlobSize()); Whole.Here.Add(fullBlobInterval); Whole.NotHere.Subtract(fullBlobInterval); return true; @@ -350,7 +350,7 @@ void TGroupDiskRequests::AddGet(const ui32 diskOrderNumber, const TLogoBlobID &i DiskRequestsForOrderNumber[diskOrderNumber].GetsToSend.emplace_back(id, shift, size); } -void TGroupDiskRequests::AddPut(const ui32 diskOrderNumber, const TLogoBlobID &id, TString buffer, +void TGroupDiskRequests::AddPut(const ui32 diskOrderNumber, const TLogoBlobID &id, TRope buffer, TDiskPutRequest::EPutReason putReason, bool isHandoff, std::vector<std::pair<ui64, ui32>> *extraBlockChecks, NWilson::TSpan *span, ui8 blobIdx) { Y_VERIFY(diskOrderNumber < DiskRequestsForOrderNumber.size()); @@ -381,7 +381,7 @@ void TBlackboard::AddNeeded(const TLogoBlobID &id, ui32 inShift, ui32 inSize) { } } -void TBlackboard::AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TString &partData) { +void TBlackboard::AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TRope &partData) { Y_VERIFY(bool(id)); Y_VERIFY(id.PartId() == 0); Y_VERIFY(id.BlobSize() != 0); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h index 72bfd73800c..3108cf0b691 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h @@ -51,7 +51,7 @@ struct TBlobState { TIntervalSet<i32> Here; // Present in the Data buffer void AddResponseData(ui32 fullSize, ui32 shift, TString &data); - void AddPartToPut(TString &data); + void AddPartToPut(TRope &data); TString ToString() const; }; struct TWholeState : TState { @@ -88,7 +88,7 @@ struct TBlobState { void Init(const TLogoBlobID &id, const TBlobStorageGroupInfo &Info); void AddNeeded(ui64 begin, ui64 size); - void AddPartToPut(ui32 partIdx, TString &partData); + void AddPartToPut(ui32 partIdx, TRope &partData); void MarkBlobReadyToPut(ui8 blobIdx = 0); bool Restore(const TBlobStorageGroupInfo &info); void AddResponseData(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring, @@ -128,14 +128,14 @@ struct TDiskPutRequest { ReasonAccelerate }; const TLogoBlobID Id; - TString Buffer; + TRope Buffer; EPutReason Reason; bool IsHandoff; ui8 BlobIdx; std::vector<std::pair<ui64, ui32>> *ExtraBlockChecks; NWilson::TSpan *Span; - TDiskPutRequest(const TLogoBlobID &id, TString buffer, EPutReason reason, bool isHandoff, + TDiskPutRequest(const TLogoBlobID &id, TRope buffer, EPutReason reason, bool isHandoff, std::vector<std::pair<ui64, ui32>> *extraBlockChecks, NWilson::TSpan *span, ui8 blobIdx) : Id(id) , Buffer(std::move(buffer)) @@ -160,7 +160,7 @@ struct TGroupDiskRequests { TGroupDiskRequests(ui32 disks); void AddGet(const ui32 diskOrderNumber, const TLogoBlobID &id, const TIntervalSet<i32> &intervalSet); void AddGet(const ui32 diskOrderNumber, const TLogoBlobID &id, const ui32 shift, const ui32 size); - void AddPut(const ui32 diskOrderNumber, const TLogoBlobID &id, TString buffer, + void AddPut(const ui32 diskOrderNumber, const TLogoBlobID &id, TRope buffer, TDiskPutRequest::EPutReason putReason, bool isHandoff, std::vector<std::pair<ui64, ui32>> *extraBlockChecks, NWilson::TSpan *span, ui8 blobIdx); }; @@ -205,7 +205,7 @@ struct TBlackboard { {} void AddNeeded(const TLogoBlobID &id, ui32 inShift, ui32 inSize); - void AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TString &partData); + void AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TRope &partData); void MarkBlobReadyToPut(const TLogoBlobID &id, ui8 blobIdx = 0); void MoveBlobStateToDone(const TLogoBlobID &id); void AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TString &data, bool keep, bool doNotKeep); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp index cb66db83460..8122e89e909 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp @@ -364,7 +364,7 @@ void TGetImpl::PrepareVPuts(TLogContext &logCtx, } bytes += put.Buffer.size(); lastItemCount++; - vMultiPut->AddVPut(put.Id, put.Buffer, &cookie, put.ExtraBlockChecks, NWilson::TTraceId()); // FIXME: trace + vMultiPut->AddVPut(put.Id, put.Buffer.ConvertToString(), &cookie, put.ExtraBlockChecks, NWilson::TTraceId()); // FIXME: trace } vMultiPut->Record.SetCookie(TVMultiPutCookie(diskOrderNumber, lastItemCount, VMultiPutRequests)); ++VMultiPutRequests; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp index db13013f02b..01dd0a34fa7 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp @@ -387,14 +387,14 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt } TEvBlobStorage::TEvPut *put; - TString buffer = item.Buffer; - char *data = buffer.Detach(); + TRope buffer = item.Buffer; + char* data = buffer.GetContiguousSpanMut().data(); Decrypt(data, data, 0, buffer.size(), item.BlobId, *Info); ev->Bunch.emplace_back(new IEventHandle( TActorId() /*recipient*/, item.Recipient, - put = new TEvBlobStorage::TEvPut(item.BlobId, buffer, Deadline, HandleClass, Tactic), + put = new TEvBlobStorage::TEvPut(item.BlobId, std::move(buffer), Deadline, HandleClass, Tactic), 0 /*flags*/, item.Cookie, nullptr /*forwardOnNondelivery*/, @@ -545,7 +545,7 @@ public: const ui64 partSize = Info->Type.PartSize(blobId); ui64 bufferSize = PutImpl.Blobs[idx].BufferSize; - char *data = PutImpl.Blobs[idx].Buffer.Detach(); + char *data = PutImpl.Blobs[idx].Buffer.GetContiguousSpanMut().data(); Encrypt(data, data, 0, bufferSize, blobId, *Info); TDataPartSet &partSet = resume->PartSets[idx]; @@ -630,9 +630,8 @@ public: bool splitDone = true; for (ui64 idx = 0; idx < PutImpl.Blobs.size(); ++idx) { TDataPartSet &partSet = resume->PartSets[idx]; - TString &buffer = PutImpl.Blobs[idx].Buffer; TLogoBlobID blobId = PutImpl.Blobs[idx].BlobId; - Info->Type.IncrementalSplitData((TErasureType::ECrcMode)blobId.CrcMode(), buffer, partSet); + Info->Type.IncrementalSplitData((TErasureType::ECrcMode)blobId.CrcMode(), PutImpl.Blobs[idx].Buffer, partSet); if (partSet.IsSplitDone()) { ReportBytes(partSet.MemoryConsumed - PutImpl.Blobs[idx].BufferSize); } else { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h index 2e3ab60fc30..5636a3daf14 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h @@ -46,7 +46,7 @@ private: struct TBlobInfo { TLogoBlobID BlobId; - TString Buffer; + TRope Buffer; ui64 BufferSize; TActorId Recipient; ui64 Cookie; @@ -55,7 +55,7 @@ private: std::vector<std::pair<ui64, ui32>> ExtraBlockChecks; NWilson::TSpan Span; - TBlobInfo(TLogoBlobID id, TString buffer, TActorId recipient, ui64 cookie, NWilson::TTraceId traceId, + TBlobInfo(TLogoBlobID id, TRope&& buffer, TActorId recipient, ui64 cookie, NWilson::TTraceId traceId, NLWTrace::TOrbit&& orbit, std::vector<std::pair<ui64, ui32>> extraBlockChecks, bool single) : BlobId(id) , Buffer(std::move(buffer)) @@ -115,7 +115,7 @@ public: , EnableRequestMod3x3ForMinLatecy(enableRequestMod3x3ForMinLatecy) , Tactic(ev->Tactic) { - Blobs.emplace_back(ev->Id, std::move(ev->Buffer), recipient, cookie, std::move(traceId), std::move(ev->Orbit), + Blobs.emplace_back(ev->Id, TRope(ev->Buffer), recipient, cookie, std::move(traceId), std::move(ev->Orbit), std::move(ev->ExtraBlockChecks), true); auto& blob = Blobs.back(); @@ -143,7 +143,7 @@ public: auto& msg = *ev->Get(); Y_VERIFY(msg.HandleClass == putHandleClass); Y_VERIFY(msg.Tactic == tactic); - Blobs.emplace_back(msg.Id, std::move(msg.Buffer), ev->Sender, ev->Cookie, std::move(ev->TraceId), + Blobs.emplace_back(msg.Id, TRope(msg.Buffer), ev->Sender, ev->Cookie, std::move(ev->TraceId), std::move(msg.Orbit), std::move(msg.ExtraBlockChecks), false); Deadline = Max(Deadline, msg.Deadline); @@ -496,7 +496,7 @@ protected: } else if constexpr (isVMultiPut) { // this request MUST originate from the TEvPut, so the Span field must be filled in Y_VERIFY(put.Span); - outVPutEvents.back()->AddVPut(put.Id, put.Buffer, &cookie, put.ExtraBlockChecks, put.Span->GetTraceId()); + outVPutEvents.back()->AddVPut(put.Id, put.Buffer.ConvertToString(), &cookie, put.ExtraBlockChecks, put.Span->GetTraceId()); } if (put.IsHandoff) { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp index efeedb94465..050b70846ef 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp @@ -291,14 +291,15 @@ void TStrategyBase::PreparePutsForPartPlacement(TLogContext &logCtx, TBlobState Y_VERIFY(fullInterval.IsSubsetOf(state.Whole.Here), "Can't put unrestored blob! Unexpected blob state# %s", state.ToString().c_str()); - TString wholeBuffer = TString::Uninitialized(state.Id.BlobSize()); - state.Whole.Data.Read(0, const_cast<char*>(wholeBuffer.data()), state.Id.BlobSize()); + TRope wholeBuffer(MakeIntrusive<TRopeSharedDataBackend>(TSharedData::Uninitialized(state.Id.BlobSize()))); + state.Whole.Data.Read(0, wholeBuffer.UnsafeGetContiguousSpanMut().data(), state.Id.BlobSize()); TDataPartSet partSet; info.Type.SplitData((TErasureType::ECrcMode)state.Id.CrcMode(), wholeBuffer, partSet); Y_VERIFY(partSet.Parts.size() == state.Parts.size()); for (ui32 partIdx = 0; partIdx < partSet.Parts.size(); ++partIdx) { - state.AddPartToPut(partIdx, partSet.Parts[partIdx].OwnedString); + TRope data = partSet.Parts[partIdx].OwnedString; + state.AddPartToPut(partIdx, data); } } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_get_min_iops_block.h b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_get_min_iops_block.h index d191f110060..9297db7a807 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_get_min_iops_block.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_get_min_iops_block.h @@ -223,7 +223,7 @@ public: } } partSet.FullDataSize = state.Id.BlobSize(); - TString whole; + TRope whole; info.Type.RestoreData((TErasureType::ECrcMode)state.Id.CrcMode(), partSet, whole, false, true, false); TIntervalSet<i32> missing(state.Whole.NotHere); @@ -231,7 +231,7 @@ public: auto [begin, end] = *it; i32 offset = begin; i32 size = end - begin; - const char *source = whole.data() + offset; + const char *source = whole.GetContiguousSpan().data() + offset; // Cerr << "LINE " << __LINE__ << " copy whole [" << offset << ", " << (offset + size) << ")" << Endl; state.Whole.Data.Write(offset, source, size); state.Whole.Here.Add(offset, offset + size); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h index 4442c48218b..0fca82976e0 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h @@ -39,7 +39,7 @@ public: } protected: - TString GetDataBuffer(TBlobState& state, const TBlobStorageGroupInfo& info) { + TRope GetDataBuffer(TBlobState& state, const TBlobStorageGroupInfo& info) { for (ui32 i = 0; i < info.Type.TotalPartCount(); ++i) { if (info.Type.PartSize(TLogoBlobID(state.Id, i + 1)) && state.Parts[i].Data.IsMonolith()) { return state.Parts[i].Data.GetMonolith(); @@ -47,11 +47,11 @@ protected: } const TIntervalVec<i32> interval(0, state.Id.BlobSize()); Y_VERIFY(interval.IsSubsetOf(state.Whole.Here), "missing blob data State# %s", state.ToString().data()); - TString wholeBuffer = TString::Uninitialized(state.Id.BlobSize()); - state.Whole.Data.Read(0, const_cast<char*>(wholeBuffer.data()), state.Id.BlobSize()); + TRope wholeBuffer(MakeIntrusive<TRopeSharedDataBackend>(TSharedData::Uninitialized(state.Id.BlobSize()))); + state.Whole.Data.Read(0, wholeBuffer.UnsafeGetContiguousSpanMut().data(), state.Id.BlobSize()); TDataPartSet partSet; info.Type.SplitData((TErasureType::ECrcMode)state.Id.CrcMode(), wholeBuffer, partSet); - TString s = partSet.Parts[0].OwnedString; + TRope s = partSet.Parts[0].OwnedString; state.Parts[0].Data.SetMonolith(s); return s; } diff --git a/ydb/core/blobstorage/dsproxy/mock/model.h b/ydb/core/blobstorage/dsproxy/mock/model.h index 16783993f28..ead4f90bb6b 100644 --- a/ydb/core/blobstorage/dsproxy/mock/model.h +++ b/ydb/core/blobstorage/dsproxy/mock/model.h @@ -13,11 +13,15 @@ namespace NFake { using TStep = ui32; struct TBlob { + TBlob(TRope buffer) + : Buffer(std::move(buffer)) + {} + TBlob(TString buffer) : Buffer(std::move(buffer)) {} - TString Buffer; + TRope Buffer; bool Keep = false; bool DoNotKeep = false; }; @@ -125,7 +129,7 @@ namespace NFake { const ui32 size = Min<ui32>(maxSize, !query.Size ? Max<ui32>() : query.Size); // calculate substring; use 0 instead of query.Shift because it may exceed the buffer - response.Buffer = data.Buffer.substr(size ? query.Shift : 0, size); + response.Buffer = data.Buffer.ConvertToString().substr(size ? query.Shift : 0, size); } else { // ensure this blob is not under GC Y_VERIFY(!IsCollectedByBarrier(id), "Id# %s", id.ToString().data()); @@ -173,7 +177,7 @@ namespace NFake { lastBlobId.Generation() >= msg->MinGeneration) { TString buffer; if (msg->ReadBody) { - buffer = it->second.Buffer; + buffer = it->second.Buffer.ConvertToString(); } result = std::make_unique<TEvBlobStorage::TEvDiscoverResult>(lastBlobId, msg->MinGeneration, buffer, @@ -206,7 +210,7 @@ namespace NFake { if (from <= to) { // forward scan for (auto it = Blobs.lower_bound(from); it != Blobs.end() && it->first <= to; ++it) { - process(it->first, it->second.Buffer); + process(it->first, it->second.Buffer.ConvertToString()); } } else { // reverse scan @@ -215,7 +219,7 @@ namespace NFake { if (it->first < to) { break; } else { - process(it->first, it->second.Buffer); + process(it->first, it->second.Buffer.ConvertToString()); } } } diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_get_hardened.h b/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_get_hardened.h index 4c7bd5f1eef..fd7628ef398 100644 --- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_get_hardened.h +++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_fault_tolerance_ut_get_hardened.h @@ -305,7 +305,7 @@ public: layout.ForEachPartOfDisk(type, [&](ui32 partIdx, ui32 idxInSubgroup) { const TLogoBlobID partId(id, partIdx + 1); const NKikimrProto::EReplyStatus res = PutToVDisk(orderNums[idxInSubgroup], partId, - parts.Parts[partIdx].OwnedString); + parts.Parts[partIdx].OwnedString.ConvertToString()); UNIT_ASSERT_VALUES_EQUAL(res, NKikimrProto::OK); disksWrittenTo |= {topology, topology->GetVDiskId(idxInSubgroup)}; }); diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp b/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp index 0e96d053298..deef7756d65 100644 --- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp +++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp @@ -336,7 +336,7 @@ void ConductPut(TTestBasicRuntime &runtime, const TTestArgs &args, ENaivePatchCa UNIT_ASSERT_VALUES_EQUAL(put->Id, args.PatchedId); UNIT_ASSERT_VALUES_EQUAL(handle->Cookie, args.OriginalId.Hash()); TString patchedBuffer = MakePatchedBuffer(args); - UNIT_ASSERT_VALUES_EQUAL(put->Buffer, patchedBuffer); + UNIT_ASSERT_VALUES_EQUAL(put->Buffer.ConvertToString(), patchedBuffer); std::unique_ptr<TEvBlobStorage::TEvPutResult> putResult = std::make_unique<TEvBlobStorage::TEvPutResult>( resultStatus, args.PatchedId, args.StatusFlags, args.CurrentGroupId, args.ApproximateFreeSpaceShare); diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_vdisk_mock_ut.h b/ydb/core/blobstorage/dsproxy/ut/dsproxy_vdisk_mock_ut.h index 0a17c046e71..dc095900af2 100644 --- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_vdisk_mock_ut.h +++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_vdisk_mock_ut.h @@ -486,12 +486,12 @@ public: Info->Type.SplitData((TErasureType::ECrcMode)id.CrcMode(), encryptedData, partSet); for (ui32 i = 0; i < totalParts; ++i) { TLogoBlobID pId(id, i + 1); - TString pData = partSet.Parts[i].OwnedString; + TRope pData = partSet.Parts[i].OwnedString; if (i < handoffsToUse) { Y_VERIFY(totalParts + i < totalvd); - GetVDisk(vDisksId[totalParts + i].FailDomain, vDisksId[totalParts + i].VDisk).Put(pId, pData); + GetVDisk(vDisksId[totalParts + i].FailDomain, vDisksId[totalParts + i].VDisk).Put(pId, pData.ConvertToString()); } else { - GetVDisk(vDisksId[i].FailDomain, vDisksId[i].VDisk).Put(pId, pData); + GetVDisk(vDisksId[i].FailDomain, vDisksId[i].VDisk).Put(pId, pData.ConvertToString()); } } } diff --git a/ydb/core/blobstorage/dsproxy/ut_fat/dsproxy_ut.cpp b/ydb/core/blobstorage/dsproxy/ut_fat/dsproxy_ut.cpp index 43183478c37..07f5948f3ae 100644 --- a/ydb/core/blobstorage/dsproxy/ut_fat/dsproxy_ut.cpp +++ b/ydb/core/blobstorage/dsproxy/ut_fat/dsproxy_ut.cpp @@ -1420,7 +1420,7 @@ class TTestBlobStorageProxyVPutVGet : public TTestBlobStorageProxy { TBlobStorageGroupType type(ErasureSpecies); NKikimr::TDataPartSet partSet; type.SplitData((TErasureType::ECrcMode)blobId.CrcMode(), encryptedTestData2, partSet); - TEST_RESPONSE(MessageVGetResult, OK, 1, partSet.Parts[0].OwnedString); + TEST_RESPONSE(MessageVGetResult, OK, 1, partSet.Parts[0].OwnedString.ConvertToString()); VERBOSE_COUT("Done"); Env->DoneEvent.Signal(); @@ -1713,7 +1713,7 @@ class TTestBlobStorageProxyVGet : public TTestBlobStorageProxy { TBlobStorageGroupType type(ErasureSpecies); NKikimr::TDataPartSet partSet; type.SplitData((TErasureType::ECrcMode)blobId.CrcMode(), encryptedTestData2, partSet); - TEST_RESPONSE(MessageVGetResult, OK, 1, partSet.Parts[0].OwnedString); + TEST_RESPONSE(MessageVGetResult, OK, 1, partSet.Parts[0].OwnedString.ConvertToString()); isNoData = false; VERBOSE_COUT("Done"); diff --git a/ydb/core/blobstorage/testload/test_load_vdisk_write.cpp b/ydb/core/blobstorage/testload/test_load_vdisk_write.cpp index 6cf73e7276b..24e0ccca196 100644 --- a/ydb/core/blobstorage/testload/test_load_vdisk_write.cpp +++ b/ydb/core/blobstorage/testload/test_load_vdisk_write.cpp @@ -7,6 +7,7 @@ #include <ydb/core/base/appdata.h> #include <ydb/core/base/interconnect_channels.h> #include <library/cpp/monlib/service/pages/templates.h> +#include <library/cpp/actors/util/rope.h> namespace NKikimr { @@ -184,7 +185,9 @@ namespace NKikimr { } void IssuePutRequest(const TLogoBlobID& logoBlobId, ui64 cookie, const TActorContext& ctx) { - TString whole(logoBlobId.BlobSize(), 'X'); + TSharedData data = TSharedData::Uninitialized(logoBlobId.BlobSize()); + std::fill_n(data.mutable_data(), logoBlobId.BlobSize(), 'X'); + TRope whole(MakeIntrusive<TRopeSharedDataBackend>(data)); TDataPartSet parts; GType.SplitData((TErasureType::ECrcMode)logoBlobId.CrcMode(), whole, parts); auto ev = std::make_unique<TEvBlobStorage::TEvVPut>(logoBlobId, diff --git a/ydb/core/blobstorage/testload/test_load_write.cpp b/ydb/core/blobstorage/testload/test_load_write.cpp index 01e6839b10e..37de3a35a00 100644 --- a/ydb/core/blobstorage/testload/test_load_write.cpp +++ b/ydb/core/blobstorage/testload/test_load_write.cpp @@ -291,7 +291,7 @@ class TLogWriterTestLoadActor : public TActorBootstrapped<TLogWriterTestLoadActo const ui32 size = 1; const ui32 lastStep = Max<ui32>(); const TLogoBlobID id(TabletId, Generation, lastStep, Channel, size, 0); - const TString buffer = GenerateBuffer(id); + const TSharedData buffer = GenerateBuffer<TSharedData>(id); auto ev = std::make_unique<TEvBlobStorage::TEvPut>(id, buffer, TInstant::Max(), PutHandleClass); auto callback = [this] (IEventBase *event, const TActorContext& ctx) { @@ -531,7 +531,7 @@ class TLogWriterTestLoadActor : public TActorBootstrapped<TLogWriterTestLoadActo putHandleClass = PutHandleClass; } const TLogoBlobID id(TabletId, Generation, WriteStep, Channel, size, Cookie); - const TString buffer = GenerateBuffer(id); + const TSharedData buffer = GenerateBuffer<TSharedData>(id); auto ev = std::make_unique<TEvBlobStorage::TEvPut>(id, buffer, TInstant::Max(), putHandleClass); const ui64 writeQueryId = ++WriteQueryId; @@ -741,8 +741,9 @@ class TLogWriterTestLoadActor : public TActorBootstrapped<TLogWriterTestLoadActo NextReadInQueue = false; } - static TString GenerateBuffer(const TLogoBlobID& id) { - return GenDataForLZ4(id.BlobSize()); + template <class ResultContainer = TString> + static ResultContainer GenerateBuffer(const TLogoBlobID& id) { + return GenDataForLZ4<ResultContainer>(id.BlobSize()); } }; diff --git a/ydb/core/blobstorage/ut_blobstorage/main.cpp b/ydb/core/blobstorage/ut_blobstorage/main.cpp index 1aa4999c8c8..73b302815fd 100644 --- a/ydb/core/blobstorage/ut_blobstorage/main.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/main.cpp @@ -66,7 +66,7 @@ Y_UNIT_TEST_SUITE(DonorMode) { break; } } - TString part = partSet.Parts[blobId.PartId() - 1].OwnedString; + TRope part = partSet.Parts[blobId.PartId() - 1].OwnedString; // scan through existing stored blobs and ensure they are intact for (const auto& [blobId, part] : stored) { @@ -75,19 +75,19 @@ Y_UNIT_TEST_SUITE(DonorMode) { } // add it to stored set - stored.emplace(blobId, part); + stored.emplace(blobId, part.ConvertToString()); // get the blob location for this group const auto& [vdiskActorId, vdiskId] = getBlobLocation(info, blobId); // first, check that there is no such blob in the disk - env.CheckBlob(vdiskActorId, vdiskId, blobId, part, NKikimrProto::NODATA); + env.CheckBlob(vdiskActorId, vdiskId, blobId, part.ConvertToString(), NKikimrProto::NODATA); // put the blob to the disk - env.PutBlob(vdiskId, blobId, part); + env.PutBlob(vdiskId, blobId, part.ConvertToString()); // check it appeared - env.CheckBlob(vdiskActorId, vdiskId, blobId, part); + env.CheckBlob(vdiskActorId, vdiskId, blobId, part.ConvertToString()); // wait for sync env.WaitForSync(info, blobId); diff --git a/ydb/core/blobstorage/ut_blobstorage/osiris.cpp b/ydb/core/blobstorage/ut_blobstorage/osiris.cpp index 989452e3087..432d078bf58 100644 --- a/ydb/core/blobstorage/ut_blobstorage/osiris.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/osiris.cpp @@ -43,7 +43,7 @@ bool DoTestCase(TBlobStorageGroupType::EErasureSpecies erasure, const std::set<s const TActorId& queueId = env.CreateQueueActor(info->GetVDiskId(orderNum), NKikimrBlobStorage::PutTabletLog, 0); const TActorId& sender = env.Runtime->AllocateEdgeActor(queueId.NodeId()); const TLogoBlobID blobId(id, partIdx + 1); - const TString& buffer = type.PartSize(blobId) ? parts.Parts[partIdx].OwnedString : TString(); + TRope buffer = type.PartSize(blobId) ? parts.Parts[partIdx].OwnedString : TString(); env.Runtime->Send(new IEventHandle(queueId, sender, new TEvBlobStorage::TEvVPut(blobId, buffer, info->GetVDiskId(orderNum), false, nullptr, TInstant::Max(), NKikimrBlobStorage::TabletLog)), sender.NodeId()); diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h index 9568165e383..3c13238ad54 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h @@ -206,7 +206,7 @@ namespace NKikimr { Y_VERIFY(partSet.PartSet.FullDataSize == id.BlobSize()); // PartSet contains some data, other data will be restored and written in the same PartSet - TString recoveredData; + TRope recoveredData; const ui32 incomingMask = partSet.PartSet.PartsMask; if (canRestore && needToRestore) { groupType.RestoreData((TErasureType::ECrcMode)id.CrcMode(), partSet.PartSet, recoveredData, diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp index 02e8a077893..83d376047b0 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp @@ -40,7 +40,7 @@ namespace NKikimr { for (ui32 i = 0; i < info->Type.TotalPartCount(); ++i) { for (ui32 k = 0; k < vdisks.size(); ++k) { if (varray[i] == vdisks[k]) { - diskvec[k] = parts.Parts[i].OwnedString; + diskvec[k] = parts.Parts[i].OwnedString.ConvertToString(); break; } } diff --git a/ydb/core/blobstorage/vdisk/scrub/blob_recovery.h b/ydb/core/blobstorage/vdisk/scrub/blob_recovery.h index 2a06f7df1ae..57e892ada4d 100644 --- a/ydb/core/blobstorage/vdisk/scrub/blob_recovery.h +++ b/ydb/core/blobstorage/vdisk/scrub/blob_recovery.h @@ -48,7 +48,7 @@ namespace NKikimr { Y_VERIFY(id.PartId()); const ui32 partIdx = id.PartId() - 1; Y_VERIFY(PartSet.PartsMask & (1 << partIdx)); - return PartSet.Parts[partIdx].OwnedString; + return PartSet.Parts[partIdx].OwnedString.ConvertToString(); } NMatrix::TVectorType GetAvailableParts() const { diff --git a/ydb/core/blobstorage/vdisk/scrub/blob_recovery_process.cpp b/ydb/core/blobstorage/vdisk/scrub/blob_recovery_process.cpp index 0348ada82de..d78757e78ac 100644 --- a/ydb/core/blobstorage/vdisk/scrub/blob_recovery_process.cpp +++ b/ydb/core/blobstorage/vdisk/scrub/blob_recovery_process.cpp @@ -123,7 +123,7 @@ namespace NKikimr { } const ui32 numParts = PopCount(item.PartSet.PartsMask); if (numParts >= Info->Type.MinimalRestorablePartCount()) { - TString buffer; + TRope buffer; Info->Type.RestoreData((TErasureType::ECrcMode)item.BlobId.CrcMode(), item.PartSet, buffer, true, false, true); item.PartSet.PartsMask = (1u << item.PartSet.Parts.size()) - 1; diff --git a/ydb/core/blobstorage/vdisk/scrub/scrub_actor_huge_blob_merger.h b/ydb/core/blobstorage/vdisk/scrub/scrub_actor_huge_blob_merger.h index 2b04da97606..489a8e97a78 100644 --- a/ydb/core/blobstorage/vdisk/scrub/scrub_actor_huge_blob_merger.h +++ b/ydb/core/blobstorage/vdisk/scrub/scrub_actor_huge_blob_merger.h @@ -65,7 +65,7 @@ namespace NKikimr { key.LogoBlobID()); TRope holder; TRope part = blob.GetPart(i, &holder); - PartSet.Parts[i].ReferenceTo(part.ConvertToString()); + PartSet.Parts[i].ReferenceTo(part); } else { CorruptedParts.push_back(*part); } diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor_ut.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor_ut.cpp index cb9f9708a11..7e8e7244756 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor_ut.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor_ut.cpp @@ -229,6 +229,12 @@ namespace NKikimr { { } + TBlob(const TLogoBlobID &blob, ui8 partId, const TRope &buffer) + : BlobId(blob, partId) + , Buffer(buffer.ConvertToString()) + { + } + TBlob(const TLogoBlobID &blob, ui8 partId, ui32 bufferSize) : BlobId(blob, partId) { @@ -759,7 +765,7 @@ namespace NKikimr { ui32 patchedPartId = LogoBlobIDFromLogoBlobID(record.GetPatchedPartBlobId()).PartId(); TVector<TDiff> xorDiffs; - const ui8 *buffer = reinterpret_cast<const ui8*>(partSet.Parts[fromPartId - 1].OwnedString.data()); + const ui8 *buffer = reinterpret_cast<const ui8*>(partSet.Parts[fromPartId - 1].OwnedString.GetContiguousSpan().data()); testData.GType.MakeXorDiff(TErasureType::CrcModeNone, data.size(), buffer, diffSet.PartDiffs[fromPartId - 1].Diffs, &xorDiffs); UNIT_ASSERT_VALUES_EQUAL_C(xorDiffs.size(), record.DiffsSize(), "from# " << (ui32)fromPartId); diff --git a/ydb/core/erasure/erasure.cpp b/ydb/core/erasure/erasure.cpp index a41b0279328..fc1d8e8a206 100644 --- a/ydb/core/erasure/erasure.cpp +++ b/ydb/core/erasure/erasure.cpp @@ -44,7 +44,7 @@ static TString DebugFormatBits(ui64 value) { namespace NKikimr { -static void Refurbish(TString &str, ui64 size) { +static void Refurbish(TRope &str, ui64 size) { if (str.size() != size) { str = TString::Uninitialized(size); } @@ -1378,12 +1378,12 @@ void PadAndCrcParts(TErasureType::ECrcMode crcMode, const TBlockParams &p, TData } template <bool isStripe> -void StarBlockSplit(TErasureType::ECrcMode crcMode, const TErasureType &type, const TString &buffer, +void StarBlockSplit(TErasureType::ECrcMode crcMode, const TErasureType &type, TRope &buffer, TDataPartSet &outPartSet) { TBlockParams p(crcMode, type, buffer.size()); // Prepare input data pointers - p.PrepareInputDataPointers<isStripe>(const_cast<char*>(buffer.data())); + p.PrepareInputDataPointers<isStripe>(buffer.UnsafeGetContiguousSpanMut().data()); outPartSet.FullDataSize = buffer.size(); outPartSet.PartsMask = ~((~(ui32)0) << p.TotalParts); @@ -1399,12 +1399,12 @@ void StarBlockSplit(TErasureType::ECrcMode crcMode, const TErasureType &type, co } template <bool isStripe> -void EoBlockSplit(TErasureType::ECrcMode crcMode, const TErasureType &type, const TString &buffer, +void EoBlockSplit(TErasureType::ECrcMode crcMode, const TErasureType &type, TRope &buffer, TDataPartSet &outPartSet) { TBlockParams p(crcMode, type, buffer.size()); // Prepare input data pointers - p.PrepareInputDataPointers<isStripe>(const_cast<char*>(buffer.data())); + p.PrepareInputDataPointers<isStripe>(buffer.UnsafeGetContiguousSpanMut().data()); // Prepare if not yet if (!outPartSet.IsSplitStarted()) { @@ -1429,12 +1429,12 @@ void EoBlockSplit(TErasureType::ECrcMode crcMode, const TErasureType &type, cons } template <bool isStripe> -void XorBlockSplit(TErasureType::ECrcMode crcMode, const TErasureType &type, const TString& buffer, +void XorBlockSplit(TErasureType::ECrcMode crcMode, const TErasureType &type, TRope& buffer, TDataPartSet& outPartSet) { TBlockParams p(crcMode, type, buffer.size()); // Prepare input data pointers - p.PrepareInputDataPointers<isStripe>(const_cast<char*>(buffer.data())); + p.PrepareInputDataPointers<isStripe>(buffer.UnsafeGetContiguousSpanMut().data()); outPartSet.FullDataSize = buffer.size(); outPartSet.PartsMask = ~((~(ui32)0) << p.TotalParts); @@ -1451,7 +1451,7 @@ void XorBlockSplit(TErasureType::ECrcMode crcMode, const TErasureType &type, con template <bool isStripe, bool restoreParts, bool restoreFullData, bool restoreParityParts> void EoBlockRestore(TErasureType::ECrcMode crcMode, const TErasureType &type, TDataPartSet& partSet) { - TString &outBuffer = partSet.FullDataFragment.OwnedString; + TRope &outBuffer = partSet.FullDataFragment.OwnedString; ui32 totalParts = type.TotalPartCount(); Y_VERIFY(partSet.Parts.size() >= totalParts); @@ -1515,17 +1515,17 @@ void EoBlockRestore(TErasureType::ECrcMode crcMode, const TErasureType &type, TD (!restoreParts && missingDataPartIdxA >= p.TotalParts - 2)) { VERBOSE_COUT(__LINE__ << " of " << __FILE__ << Endl); if (isStripe) { - p.PrepareInputDataPointers<isStripe>(outBuffer.Detach()); + p.PrepareInputDataPointers<isStripe>(outBuffer.GetContiguousSpanMut().data()); p.XorRestorePart<isStripe, false, true, false>(partSet, p.DataParts); } else { - p.GlueBlockParts(outBuffer.Detach(), partSet); + p.GlueBlockParts(outBuffer.GetContiguousSpanMut().data(), partSet); } return; } // Prepare output data pointers if (restoreFullData) { - p.PrepareInputDataPointers<isStripe>(outBuffer.Detach()); + p.PrepareInputDataPointers<isStripe>(outBuffer.GetContiguousSpanMut().data()); } // Consider failed disk cases @@ -1541,15 +1541,15 @@ void EoBlockRestore(TErasureType::ECrcMode crcMode, const TErasureType &type, TD // TODO: use 2-nd part of 'eo-split' to restore m+1 part // TODO: 1-pass if (missingDataPartIdxA <= p.DataParts && missingDataPartIdxB >= p.TotalParts - 1) { - TString temp; - TString &buffer = restoreFullData ? outBuffer : temp; + TRope temp; + TRope &buffer = restoreFullData ? outBuffer : temp; if (!restoreFullData && restoreParts && missingDataPartIdxB == p.TotalParts - 1) { // The (f1) case, but no full data needed, only parts TRACE("case# f1" << Endl); VERBOSE_COUT(__LINE__ << " of " << __FILE__ << Endl); if (isStripe) { Refurbish(buffer, dataSize); - p.PrepareInputDataPointers<isStripe>(buffer.Detach()); + p.PrepareInputDataPointers<isStripe>(buffer.GetContiguousSpanMut().data()); } p.XorRestorePart<isStripe, true, false, false>(partSet, missingDataPartIdxA); TRACE("case# f1 split" << Endl); @@ -1583,8 +1583,8 @@ void EoBlockRestore(TErasureType::ECrcMode crcMode, const TErasureType &type, TD if (missingDataPartIdxA == p.TotalParts - 1 && missingDataPartIdxB == p.TotalParts) { TRACE("case# c" << Endl); VERBOSE_COUT(__LINE__ << " of " << __FILE__ << Endl); - TString temp; - TString &buffer = restoreFullData ? outBuffer : temp; + TRope temp; + TRope &buffer = restoreFullData ? outBuffer : temp; if (!restoreFullData) { TRACE(__LINE__ << Endl); if (!restoreParityParts) { @@ -1598,12 +1598,12 @@ void EoBlockRestore(TErasureType::ECrcMode crcMode, const TErasureType &type, TD } if (isStripe) { TRACE(__LINE__ << Endl); - p.PrepareInputDataPointers<isStripe>(buffer.Detach()); + p.PrepareInputDataPointers<isStripe>(buffer.GetContiguousSpanMut().data()); p.XorRestorePart<isStripe, false, true, false>(partSet, p.DataParts); } else { TRACE(__LINE__ << Endl); if (restoreFullData) { - p.GlueBlockParts(buffer.Detach(), partSet); + p.GlueBlockParts(buffer.GetContiguousSpanMut().data(), partSet); } } if (restoreParts) { @@ -1650,7 +1650,7 @@ void EoBlockRestore(TErasureType::ECrcMode crcMode, const TErasureType &type, TD // restorePartiyParts may be set only togehter with restore parts template <bool isStripe, bool restoreParts, bool restoreFullData, bool restoreParityParts> void StarBlockRestore(TErasureType::ECrcMode crcMode, const TErasureType &type, TDataPartSet& partSet) { - TString &outBuffer = partSet.FullDataFragment.OwnedString; + TRope &outBuffer = partSet.FullDataFragment.OwnedString; ui32 totalParts = type.TotalPartCount(); Y_VERIFY(partSet.Parts.size() == totalParts); @@ -1728,7 +1728,7 @@ void StarBlockRestore(TErasureType::ECrcMode crcMode, const TErasureType &type, TBlockParams p(crcMode, type, dataSize); if (restoreFullData) { Refurbish(outBuffer, dataSize); - p.PrepareInputDataPointers<isStripe>(outBuffer.Detach()); + p.PrepareInputDataPointers<isStripe>(outBuffer.GetContiguousSpanMut().data()); } else if (missingDataPartCount == 0) { return; } @@ -1738,10 +1738,10 @@ void StarBlockRestore(TErasureType::ECrcMode crcMode, const TErasureType &type, (!restoreParts && missingDataPartIdxA >= p.DataParts)) { VERBOSE_COUT(__LINE__ << " of " << __FILE__ << Endl); if (isStripe) { - p.PrepareInputDataPointers<isStripe>(outBuffer.Detach()); + p.PrepareInputDataPointers<isStripe>(outBuffer.GetContiguousSpanMut().data()); p.XorRestorePart<isStripe, false, true, false>(partSet, p.DataParts); } else { - p.GlueBlockParts(outBuffer.Detach(), partSet); + p.GlueBlockParts(outBuffer.GetContiguousSpanMut().data(), partSet); } return; } @@ -1880,7 +1880,7 @@ void StarBlockRestore(TErasureType::ECrcMode crcMode, const TErasureType &type, template <bool isStripe, bool restoreParts, bool restoreFullData, bool restoreParityParts> void XorBlockRestore(TErasureType::ECrcMode crcMode, const TErasureType &type, TDataPartSet &partSet) { - TString &outBuffer = partSet.FullDataFragment.OwnedString; + TRope &outBuffer = partSet.FullDataFragment.OwnedString; ui32 totalParts = type.TotalPartCount(); Y_VERIFY(partSet.Parts.size() == totalParts, "partSet.Parts.size(): %" PRIu64 " totalParts: %" PRIu32 " erasure: %s", @@ -1919,16 +1919,16 @@ void XorBlockRestore(TErasureType::ECrcMode crcMode, const TErasureType &type, T if (missingDataPartCount == 0 || (missingDataPartCount == 1 && !restoreParts && missingDataPartIdx == p.TotalParts - 1)) { if (isStripe) { - p.PrepareInputDataPointers<isStripe>(outBuffer.Detach()); + p.PrepareInputDataPointers<isStripe>(outBuffer.GetContiguousSpanMut().data()); p.XorRestorePart<isStripe, false, true, false>(partSet, p.DataParts); } else { - p.GlueBlockParts(outBuffer.Detach(), partSet); + p.GlueBlockParts(outBuffer.GetContiguousSpanMut().data(), partSet); } return; } // Prepare output data pointers if (restoreFullData) { - p.PrepareInputDataPointers<isStripe>(outBuffer.Detach()); + p.PrepareInputDataPointers<isStripe>(outBuffer.GetContiguousSpanMut().data()); } p.XorRestorePart<isStripe, restoreParts, restoreFullData, restoreParityParts>(partSet, missingDataPartIdx); @@ -2502,7 +2502,7 @@ ui64 TErasureType::BlockSplitPartUsedSize(ui64 dataSize, ui32 partIdx) const { return lastPartSize; } -void MirrorSplit(TErasureType::ECrcMode crcMode, const TErasureType &type, const TString& buffer, +void MirrorSplit(TErasureType::ECrcMode crcMode, const TErasureType &type, TRope& buffer, TDataPartSet& outPartSet) { outPartSet.FullDataSize = buffer.size(); outPartSet.Parts.resize(type.TotalPartCount()); @@ -2519,12 +2519,12 @@ void MirrorSplit(TErasureType::ECrcMode crcMode, const TErasureType &type, const case TErasureType::CrcModeWholePart: { ui64 partSize = type.PartSize(crcMode, buffer.size()); - TString part = TString::Uninitialized(partSize); - char *dst = part.Detach(); + TRope part = TString::Uninitialized(partSize); + char *dst = part.GetContiguousSpanMut().data(); if (buffer.size() || part.size()) { Y_VERIFY(part.size() >= buffer.size() + sizeof(ui32), "Part size too small, buffer size# %" PRIu64 " partSize# %" PRIu64, (ui64)buffer.size(), (ui64)partSize); - memcpy(dst, buffer.data(), buffer.size()); + memcpy(dst, buffer.GetContiguousSpan().data(), buffer.size()); PadAndCrcAtTheEnd(dst, buffer.size(), part.size()); } for (ui32 partIdx = 0; partIdx <= parityParts; ++partIdx) { @@ -2558,11 +2558,15 @@ void MirrorRestore(TErasureType::ECrcMode crcMode, const TErasureType &type, TDa partSet.FullDataFragment.ReferenceTo(partSet.Parts[partIdx].OwnedString); return; case TErasureType::CrcModeWholePart: - TString outBuffer = partSet.Parts[partIdx].OwnedString; - outBuffer.Detach(); - Y_VERIFY(outBuffer.size() >= partSet.FullDataSize, "Unexpected outBuffer.size# %" PRIu64 - " fullDataSize# %" PRIu64, (ui64)outBuffer.size(), (ui64)partSet.FullDataSize); - outBuffer.resize(partSet.FullDataSize); // To pad with zeroes! + TRope outBuffer = partSet.Parts[partIdx].OwnedString; + outBuffer.GetContiguousSpanMut(); // Detach + if(outBuffer.size() != partSet.FullDataSize) { + TString newOutBuffer(outBuffer.GetContiguousSpan().data(), outBuffer.size()); //FIXME(innokentii) potentially redundant allocation for resize + Y_VERIFY(outBuffer.size() >= partSet.FullDataSize, "Unexpected outBuffer.size# %" PRIu64 + " fullDataSize# %" PRIu64, (ui64)outBuffer.size(), (ui64)partSet.FullDataSize); + newOutBuffer.resize(partSet.FullDataSize); // To pad with zeroes! + outBuffer = newOutBuffer; + } partSet.FullDataFragment.ReferenceTo(outBuffer); return; } @@ -2586,14 +2590,14 @@ static void VerifyPartSizes(TDataPartSet& partSet, size_t definedPartEndIdx) { } } -void TErasureType::SplitData(ECrcMode crcMode, const TString& buffer, TDataPartSet& outPartSet) const { +void TErasureType::SplitData(ECrcMode crcMode, TRope& buffer, TDataPartSet& outPartSet) const { outPartSet.ResetSplit(); do { IncrementalSplitData(crcMode, buffer, outPartSet); } while (!outPartSet.IsSplitDone()); } -void TErasureType::IncrementalSplitData(ECrcMode crcMode, const TString& buffer, TDataPartSet& outPartSet) const { +void TErasureType::IncrementalSplitData(ECrcMode crcMode, TRope& buffer, TDataPartSet& outPartSet) const { const TErasureParameters& erasure = ErasureSpeciesParameters[ErasureSpecies]; switch (erasure.ErasureFamily) { case TErasureType::ErasureMirror: @@ -2968,7 +2972,7 @@ void TErasureType::ApplyXorDiff(ECrcMode crcMode, ui32 dataSize, ui8 *dst, } } -void TErasureType::RestoreData(ECrcMode crcMode, TDataPartSet& partSet, TString& outBuffer, bool restoreParts, +void TErasureType::RestoreData(ECrcMode crcMode, TDataPartSet& partSet, TRope& outBuffer, bool restoreParts, bool restoreFullData, bool restoreParityParts) const { partSet.FullDataFragment.ReferenceTo(outBuffer); RestoreData(crcMode, partSet, restoreParts, restoreFullData, restoreParityParts); diff --git a/ydb/core/erasure/erasure.h b/ydb/core/erasure/erasure.h index 35bd68c2d2f..d7d4da114b8 100644 --- a/ydb/core/erasure/erasure.h +++ b/ydb/core/erasure/erasure.h @@ -12,6 +12,7 @@ #include <util/generic/list.h> #include <library/cpp/containers/stack_vector/stack_vec.h> +#include <library/cpp/actors/util/rope.h> namespace NKikimr { @@ -57,7 +58,7 @@ struct TPartDiffSet { // Part fragment, contains only some data struct TPartFragment { - TString OwnedString; // Used for ownership only + TRope OwnedString; // Used for ownership only char *Bytes = nullptr; ui64 Offset = 0; // Relative to part beginning ui64 Size = 0; @@ -76,32 +77,34 @@ struct TPartFragment { } void UninitializedOwnedWhole(ui64 size) { - OwnedString = TString::Uninitialized(size); - Bytes = OwnedString.Detach(); + OwnedString = TString::Uninitialized(size); //FIXME(innokentii) + Bytes = OwnedString.GetContiguousSpanMut().data(); Offset = 0; Size = size; PartSize = size; } - void ResetToWhole(const TString &whole) { + void ResetToWhole(const TRope &whole) { OwnedString = whole; - Bytes = OwnedString.Detach(); + Bytes = OwnedString.GetContiguousSpanMut().data(); Offset = 0; - Size = whole.size(); + Size = OwnedString.size(); PartSize = Size; } - void ReferenceTo(const TString &whole) { + void ReferenceTo(const TRope &whole) { + Y_VERIFY(whole.IsContiguous()); OwnedString = whole; - Bytes = const_cast<char*>(whole.data()); + Bytes = OwnedString.UnsafeGetContiguousSpanMut().data(); Offset = 0; - Size = whole.size(); + Size = OwnedString.size(); PartSize = Size; } - void ReferenceTo(const TString &piece, ui64 offset, ui64 size, ui64 partSize) { + void ReferenceTo(const TRope &piece, ui64 offset, ui64 size, ui64 partSize) { + Y_VERIFY(piece.IsContiguous()); OwnedString = piece; - Bytes = const_cast<char*>(piece.data()); + Bytes = OwnedString.UnsafeGetContiguousSpanMut().data(); Offset = offset; Y_VERIFY(size <= piece.size()); Size = size; @@ -109,6 +112,17 @@ struct TPartFragment { PartSize = partSize; } + void ReferenceTo(const TString &whole) { + TRope rope(whole); + ReferenceTo(rope); + } + + void ReferenceTo(const TString &piece, ui64 offset, ui64 size, ui64 partSize) { + TRope rope(piece); + ReferenceTo(rope, offset, size, partSize); + } + + char *GetDataAt(ui64 get_offset) const { Y_VERIFY_DEBUG(Size); Y_VERIFY_DEBUG(get_offset >= Offset, "%s", (TStringBuilder() << "get_offset# " << get_offset @@ -130,19 +144,12 @@ struct TPartFragment { } void Detach() { - char *newBytes = nullptr; - if (Bytes) { + if(Bytes) { char *oldBytes = Bytes; - char *oldData = const_cast<char*>(OwnedString.data()); + char *oldData = OwnedString.UnsafeGetContiguousSpanMut().data(); intptr_t bytesOffset = oldBytes - oldData; - OwnedString.Detach(); - if (OwnedString.data() != oldData) { - newBytes = const_cast<char*>(OwnedString.data()) + bytesOffset; - } else { - newBytes = oldBytes; - } + Bytes = OwnedString.GetContiguousSpanMut().data() + bytesOffset; } - Bytes = newBytes; } }; @@ -317,8 +324,16 @@ struct TErasureType { ui64 SuggestDataSize(ECrcMode crcMode, ui64 partSize, bool roundDown) const; ui32 Prime() const; - void SplitData(ECrcMode crcMode, const TString& buffer, TDataPartSet& outPartSet) const; - void IncrementalSplitData(ECrcMode crcMode, const TString& buffer, TDataPartSet& outPartSet) const; + void SplitData(ECrcMode crcMode, TRope& buffer, TDataPartSet& outPartSet) const; + void SplitData(ECrcMode crcMode, const TString& buffer, TDataPartSet& outPartSet) const { + TRope rope = buffer; + SplitData(crcMode, rope, outPartSet); + } + void IncrementalSplitData(ECrcMode crcMode, TRope& buffer, TDataPartSet& outPartSet) const; + void IncrementalSplitData(ECrcMode crcMode, const TString& buffer, TDataPartSet& outPartSet) const { + TRope rope = buffer; + IncrementalSplitData(crcMode, rope, outPartSet); + } void SplitDiffs(ECrcMode crcMode, ui32 dataSize, const TVector<TDiff> &diffs, TPartDiffSet& outDiffSet) const; void ApplyDiff(ECrcMode crcMode, ui8 *dst, const TVector<TDiff> &diffs) const; @@ -327,7 +342,7 @@ struct TErasureType { void ApplyXorDiff(ECrcMode crcMode, ui32 dataSize, ui8 *dst, const TVector<TDiff> &diffs, ui8 fromPart, ui8 toPart) const; - void RestoreData(ECrcMode crcMode, TDataPartSet& partSet, TString& outBuffer, bool restoreParts, + void RestoreData(ECrcMode crcMode, TDataPartSet& partSet, TRope& outBuffer, bool restoreParts, bool restoreFullData, bool restoreParityParts) const; void RestoreData(ECrcMode crcMode, TDataPartSet& partSet, bool restoreParts, bool restoreFullData, bool restoreParityParts) const; diff --git a/ydb/core/erasure/erasure_perf_test.cpp b/ydb/core/erasure/erasure_perf_test.cpp index 37b1a3bab64..ae4c79c946f 100644 --- a/ydb/core/erasure/erasure_perf_test.cpp +++ b/ydb/core/erasure/erasure_perf_test.cpp @@ -99,7 +99,10 @@ std::pair<double, double> MeasureTime(TErasureType &type, TVector<ui32> &missedP const ui32 partSize = type.PartSize(TErasureType::CrcModeNone, dataSize); partSet.Parts.resize(type.TotalPartCount()); for (ui32 i = 0; i < type.TotalPartCount(); ++i) { - partSet.Parts[i].ReferenceTo(partSet.Parts[i].OwnedString.resize(partSize)); + auto data = partSet.Parts[i].OwnedString.GetContiguousSpan(); + TString newOwnedString(partSize, '\0'); + memcpy(newOwnedString.Detach(), data.data(), data.size()); + partSet.Parts[i].ReferenceTo(partSet.Parts[i].OwnedString); } } if (measureSplit) { @@ -113,13 +116,10 @@ std::pair<double, double> MeasureTime(TErasureType &type, TVector<ui32> &missedP time += timer.PassedReset() / attempts; } if (isRestoreFullData) { - std::vector<TString> restoredData; + std::vector<TRope> restoredData; restoredData.resize(attempts); for (auto& restored : restoredData) { - restored.resize(dataSize); - for (ui64 i = 0; i < dataSize; ++i) { - restored[i] = 0; - } + restored = TString(dataSize, '\0'); } // Remove the 'missing' parts for (auto& partSet : partSets) { @@ -136,7 +136,7 @@ std::pair<double, double> MeasureTime(TErasureType &type, TVector<ui32> &missedP time += timer.PassedReset() / attempts; } for (size_t i = 0; i < attempts; ++i) { - UNIT_ASSERT_EQUAL(originalData[i], restoredData[i]); + UNIT_ASSERT_EQUAL(TRope(originalData[i]), restoredData[i]); } } times.push_back(time); @@ -250,7 +250,7 @@ void RopeMeasureSplitTime(TErasureType &type, ui64 dataSize, const TString& buff type.SplitData(TErasureType::CrcModeNone, rope, partSet); if (convertToRope) { for (int i = 0; i < 6; ++i) { - ropes[i] = RopeFromStringReference(std::move(partSet.Parts[i].OwnedString)); + ropes[i] = partSet.Parts[i].OwnedString; } } #ifdef LONG_TEST diff --git a/ydb/core/erasure/erasure_ut.cpp b/ydb/core/erasure/erasure_ut.cpp index 2473bd16506..731230ff304 100644 --- a/ydb/core/erasure/erasure_ut.cpp +++ b/ydb/core/erasure/erasure_ut.cpp @@ -48,7 +48,7 @@ void TestMissingPartWithRandomData(TErasureType &groupType, ui32 *missingPartIdx partSet.Parts[missingPartIdx[i]].clear(); } // Restore the data - TString restoredString; + TRope restoredString; groupType.RestoreData(TErasureType::CrcModeNone, partSet, restoredString, isRestoreParts, isRestoreFullData, isRestoreParts); @@ -191,7 +191,7 @@ void RunTestDiff(TErasureType &groupType, ui32 dataSize, const TString &testStri } for (ui32 partIdx = 0; partIdx < dataParts + parityParts; ++partIdx) { - UNIT_ASSERT_STRINGS_EQUAL(partSet.Parts[partIdx].OwnedString, resultPartSet.Parts[partIdx].OwnedString); + UNIT_ASSERT_STRINGS_EQUAL(partSet.Parts[partIdx].OwnedString.ConvertToString(), resultPartSet.Parts[partIdx].OwnedString.ConvertToString()); } } @@ -298,7 +298,7 @@ Y_UNIT_TEST_SUITE(TErasureTypeTest) { UNIT_ASSERT_EQUAL_C(partSet.Parts[i].size(), expectedParts[i].size(), Sprintf("%lu == %lu", partSet.Parts[i].size(), expectedParts[i].size())); for (ui32 j = 0; j < partSet.Parts[i].size(); ++j) { - UNIT_ASSERT_EQUAL( (ui8)partSet.Parts[i].OwnedString[j], expectedParts[i][j]); + UNIT_ASSERT_EQUAL( (ui8)partSet.Parts[i].OwnedString.GetContiguousSpan().data()[j], expectedParts[i][j]); } } } @@ -391,7 +391,7 @@ Y_UNIT_TEST_SUITE(TErasureTypeTest) { (isRestoreParts ? "true" : "false"), (isRestoreFullData ? "true" : "false")); - TString restoredString; + TRope restoredString; try { groupType.RestoreData(TErasureType::CrcModeNone, partSet, restoredString, isRestoreParts, isRestoreFullData, isRestoreParts); @@ -404,9 +404,9 @@ Y_UNIT_TEST_SUITE(TErasureTypeTest) { if (isRestoreFullData) { UNIT_ASSERT_EQUAL_C(testString.size(), restoredString.size(), errorInfo); for (ui32 i = 0; i < testString.size(); ++i) { - UNIT_ASSERT_EQUAL_C(((char*)testString.data())[i], ((char*)restoredString.data())[i], + UNIT_ASSERT_EQUAL_C(((char*)testString.data())[i], (restoredString.UnsafeGetContiguousSpanMut().data())[i], (errorInfo + mode + " (full data)")); - if (((char*)testString.data())[i] != ((char*)restoredString.data())[i]) { + if (((char*)testString.data())[i] != (restoredString.UnsafeGetContiguousSpanMut().data())[i]) { VERBOSE_COUT("mismatch " << errorInfo << mode << " (full data)" << Endl); break; } @@ -711,7 +711,7 @@ Y_UNIT_TEST_SUITE(TErasureTypeTest) { partSet.Parts[missingPartIdx[idx]].clear(); } } - partSet.FullDataFragment.UninitializedOwnedWhole(dataSize);; + partSet.FullDataFragment.UninitializedOwnedWhole(dataSize); TString mode = Sprintf(" restoreParts=%s isRestoreParityParts=%s restoreFullData=%s ", @@ -720,7 +720,7 @@ Y_UNIT_TEST_SUITE(TErasureTypeTest) { (isRestoreFullData ? "true" : "false")); VERBOSE_COUT("RestoreData " << errorInfo << Endl); - TString restoredString; + TRope restoredString; try { groupType.RestoreData(crcMode, partSet, restoredString, isRestoreParts, isRestoreFullData, isRestoreParityParts); @@ -733,7 +733,7 @@ Y_UNIT_TEST_SUITE(TErasureTypeTest) { if (isRestoreFullData) { UNIT_ASSERT_EQUAL_C(testString.size(), restoredString.size(), errorInfo); for (ui32 i = 0; i < testString.size(); ++i) { - UNIT_ASSERT_EQUAL_C(((char*)testString.data())[i], ((char*)restoredString.data())[i], + UNIT_ASSERT_EQUAL_C(((char*)testString.data())[i], (restoredString.UnsafeGetContiguousSpanMut().data())[i], (errorInfo + erasureName + mode + " (full data)")); } } @@ -1036,7 +1036,7 @@ Y_UNIT_TEST_SUITE(TErasureTypeTest) { for (ui32 idx = 0; idx < partSet.Parts.size(); ++idx) { ui32 cutBegin = Min(partSize, needBegin); ui32 cutSize = Min(partSize, needEnd) - cutBegin; - partSet.Parts[idx].ReferenceTo(partSet.Parts[idx].OwnedString.substr(cutBegin, cutSize), + partSet.Parts[idx].ReferenceTo(partSet.Parts[idx].OwnedString.ConvertToString().substr(cutBegin, cutSize), cutBegin, cutSize, partSize); } diff --git a/ydb/core/util/defs.h b/ydb/core/util/defs.h index 04dab350864..b587a43d7db 100644 --- a/ydb/core/util/defs.h +++ b/ydb/core/util/defs.h @@ -17,3 +17,7 @@ #include <util/generic/vector.h> #include <util/datetime/base.h> #include <util/generic/ylimits.h> + +namespace NKikimr { +using namespace NActors; +} // namespace NKikimr diff --git a/ydb/core/util/fragmented_buffer.cpp b/ydb/core/util/fragmented_buffer.cpp index 6da04399b60..3ae32c380ef 100644 --- a/ydb/core/util/fragmented_buffer.cpp +++ b/ydb/core/util/fragmented_buffer.cpp @@ -1,6 +1,7 @@ #include "fragmented_buffer.h" #include <util/stream/str.h> +#include <library/cpp/actors/util/shared_data_rope_backend.h> namespace NKikimr { @@ -9,7 +10,8 @@ TFragmentedBuffer::TFragmentedBuffer() { void TFragmentedBuffer::Insert(i32 begin, const char* source, i32 bytesToCopy) { Y_VERIFY(bytesToCopy); - BufferForOffset[begin].AssignNoAlias(source, bytesToCopy); + //FIXME(innokentii): can save allocation + BufferForOffset[begin] = TRope(MakeIntrusive<TRopeSharedDataBackend>(TSharedData::Copy(source, bytesToCopy))); } @@ -17,12 +19,12 @@ bool TFragmentedBuffer::IsMonolith() const { return (BufferForOffset.size() == 1 && BufferForOffset.begin()->first == 0); } -TString TFragmentedBuffer::GetMonolith() { +TRope TFragmentedBuffer::GetMonolith() { Y_VERIFY(IsMonolith()); return BufferForOffset.begin()->second; } -void TFragmentedBuffer::SetMonolith(TString &data) { +void TFragmentedBuffer::SetMonolith(TRope &data) { Y_VERIFY(data); BufferForOffset.clear(); BufferForOffset.emplace(0, data); @@ -64,7 +66,7 @@ void TFragmentedBuffer::Write(i32 begin, const char* buffer, i32 size) { Y_VERIFY(it->first + i32(it->second.size()) > offset); i32 bytesToNext = it->first + it->second.size() - offset; i32 bytesToInsert = Min(bytesToCopy, bytesToNext); - char *destination = const_cast<char*>(it->second.data()) + offset - it->first; + char *destination = it->second.UnsafeGetContiguousSpanMut().data() + offset - it->first; memcpy(destination, source, bytesToInsert); source += bytesToInsert; offset += bytesToInsert; @@ -97,7 +99,7 @@ void TFragmentedBuffer::Read(i32 begin, char* buffer, i32 size) const { Y_VERIFY(it->first + i32(it->second.size()) > offset); i32 bytesToNext = it->first + it->second.size() - offset; i32 bytesToInsert = Min(bytesToCopy, bytesToNext); - const char *source = it->second.data() + offset - it->first; + const char *source = const_cast<TRope&>(it->second).GetContiguousSpan().data() + offset - it->first; memcpy(destination, source, bytesToInsert); destination += bytesToInsert; offset += bytesToInsert; @@ -125,7 +127,7 @@ std::pair<const char*, i32> TFragmentedBuffer::Get(i32 begin) const { --it; const i32 offset = begin - it->first; Y_VERIFY(offset >= 0 && (size_t)offset < it->second.size()); - return std::make_pair(it->second.data() + offset, it->second.size() - offset); + return std::make_pair(const_cast<TRope&>(it->second).GetContiguousSpan().data() + offset, it->second.size() - offset); } void TFragmentedBuffer::CopyFrom(const TFragmentedBuffer& from, const TIntervalSet<i32>& range) { diff --git a/ydb/core/util/fragmented_buffer.h b/ydb/core/util/fragmented_buffer.h index 0e7ce9f8425..65088aa6b5e 100644 --- a/ydb/core/util/fragmented_buffer.h +++ b/ydb/core/util/fragmented_buffer.h @@ -1,21 +1,22 @@ #pragma once #include "defs.h" +#include <library/cpp/actors/util/rope.h> #include <util/generic/map.h> #include "interval_set.h" namespace NKikimr { class TFragmentedBuffer { - TMap<i32, TString> BufferForOffset; + TMap<i32, TRope> BufferForOffset; void Insert(i32 begin, const char* source, i32 bytesToCopy); public: TFragmentedBuffer(); bool IsMonolith() const; - TString GetMonolith(); - void SetMonolith(TString &data); + TRope GetMonolith(); + void SetMonolith(TRope &data); void Write(i32 begin, const char* buffer, i32 size); void Read(i32 begin, char* buffer, i32 size) const; diff --git a/ydb/core/util/fragmented_buffer_ut.cpp b/ydb/core/util/fragmented_buffer_ut.cpp index 7e7dee86df5..7b9dac72b7d 100644 --- a/ydb/core/util/fragmented_buffer_ut.cpp +++ b/ydb/core/util/fragmented_buffer_ut.cpp @@ -130,22 +130,22 @@ Y_UNIT_TEST_SUITE(TFragmentedBufferTest) { fb.Read(0, buffer, 3); UNIT_ASSERT_VALUES_EQUAL(buffer, data2); UNIT_ASSERT_VALUES_EQUAL(fb.IsMonolith(), true); - TString res = fb.GetMonolith(); + TRope res = fb.GetMonolith(); UNIT_ASSERT_VALUES_EQUAL(res.size(), 3); - UNIT_ASSERT_VALUES_EQUAL(memcmp(res.data(), data2, 3), 0); + UNIT_ASSERT_VALUES_EQUAL(memcmp(res.GetContiguousSpan().data(), data2, 3), 0); } Y_UNIT_TEST(TestSetMonolith) { - TString inData = "123"; + TRope inData = TString("123"); TFragmentedBuffer fb; fb.SetMonolith(inData); UNIT_ASSERT_VALUES_EQUAL(fb.IsMonolith(), true); - TString res = fb.GetMonolith(); - UNIT_ASSERT_VALUES_EQUAL(inData, res); + TRope res = fb.GetMonolith(); + UNIT_ASSERT_VALUES_EQUAL(inData.ConvertToString(), res.ConvertToString()); } Y_UNIT_TEST(TestReplaceWithSetMonolith) { - TString inData = "123"; + TRope inData = TString("123"); const char *data3v2 = "5"; const char *data4 = "678"; TFragmentedBuffer fb; @@ -156,8 +156,8 @@ Y_UNIT_TEST_SUITE(TFragmentedBufferTest) { UNIT_ASSERT_VALUES_EQUAL(fb.IsMonolith(), false); fb.SetMonolith(inData); UNIT_ASSERT_VALUES_EQUAL(fb.IsMonolith(), true); - TString res = fb.GetMonolith(); - UNIT_ASSERT_VALUES_EQUAL(inData, res); + TRope res = fb.GetMonolith(); + UNIT_ASSERT_VALUES_EQUAL(inData.ConvertToString(), res.ConvertToString()); } Y_UNIT_TEST(CopyFrom) { diff --git a/ydb/core/util/lz4_data_generator.h b/ydb/core/util/lz4_data_generator.h index 04ba9508f5a..dd0ecbecae1 100644 --- a/ydb/core/util/lz4_data_generator.h +++ b/ydb/core/util/lz4_data_generator.h @@ -5,13 +5,21 @@ namespace NKikimr { -inline TString GenDataForLZ4(const ui64 size, const ui64 seed = 0) { - TString data = TString::Uninitialized(size); +template <class ResultContainer = TString> +inline ResultContainer GenDataForLZ4(const ui64 size, const ui64 seed = 0) { + ResultContainer data = ResultContainer::Uninitialized(size); const ui32 long_step = Max<ui32>(2027, size / 20); const ui32 short_step = Min<ui32>(53, long_step / 400); + char *buffer = [&]() -> char * { + if constexpr(std::is_same<ResultContainer, TString>::value) { + return data.Detach(); + } else { + return data.mutable_data(); + } + }(); for (ui32 i = 0; i < data.size(); ++i) { const ui32 j = i + seed; - data[i] = 0xff & (j % short_step + j / long_step); + buffer[i] = 0xff & (j % short_step + j / long_step); } return data; } |