diff options
author | alexvru <alexvru@ydb.tech> | 2023-06-28 12:17:41 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-06-28 12:17:41 +0300 |
commit | eb92c027b4d7b9c94509d0b466f77e51ef3f6b6a (patch) | |
tree | a5a2fdae7a25edd96def12e7935e6d31dbb74b6a | |
parent | 7990574ff7313066d560591f279f2dd03e765ff4 (diff) | |
download | ydb-eb92c027b4d7b9c94509d0b466f77e51ef3f6b6a.tar.gz |
Make ErasureSplit incremental again
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy.h | 3 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy_encrypt.cpp | 5 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy_put.cpp | 149 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h | 4 | ||||
-rw-r--r-- | ydb/core/erasure/erasure.h | 12 | ||||
-rw-r--r-- | ydb/core/erasure/erasure_new_ut.cpp | 7 | ||||
-rw-r--r-- | ydb/core/erasure/erasure_split.cpp | 81 |
7 files changed, 145 insertions, 116 deletions
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy.h b/ydb/core/blobstorage/dsproxy/dsproxy.h index f7983cf7c68..6d06f68f795 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy.h @@ -591,7 +591,8 @@ private: void Encrypt(char *destination, const char *source, size_t shift, size_t sizeBytes, const TLogoBlobID &id, const TBlobStorageGroupInfo &info); -void EncryptInplace(TRope& rope, const TLogoBlobID& id, const TBlobStorageGroupInfo& info); + +void EncryptInplace(TRope& rope, ui32 offset, ui32 size, const TLogoBlobID& id, const TBlobStorageGroupInfo& info); void Decrypt(char *destination, const char *source, size_t shift, size_t sizeBytes, const TLogoBlobID &id, const TBlobStorageGroupInfo &info); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_encrypt.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_encrypt.cpp index 5982fe4216f..62cf2a90fff 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_encrypt.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_encrypt.cpp @@ -56,12 +56,13 @@ namespace NKikimr { Y_VERIFY(false, "Unexpected Encryption Mode# %" PRIu64, (ui64)info.GetEncryptionMode()); } - void EncryptInplace(TRope& rope, const TLogoBlobID& id, const TBlobStorageGroupInfo& info) { + void EncryptInplace(TRope& rope, ui32 offset, ui32 size, const TLogoBlobID& id, const TBlobStorageGroupInfo& info) { if (info.GetEncryptionMode() == TBlobStorageGroupInfo::EEM_NONE) { return; } auto span = rope.GetContiguousSpanMut(); - Encrypt(span.data(), span.data(), 0, span.size(), id, info); + Y_VERIFY(offset < span.size() && size <= span.size() - offset); + Encrypt(span.data() + offset, span.data() + offset, offset, size, id, info); } void Decrypt(char *destination, const char *source, size_t shift, size_t sizeBytes, const TLogoBlobID &id, diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp index 25bc9e153e3..6edf7d4d836 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp @@ -14,15 +14,6 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER); namespace NKikimr { -struct TEvResume : public TEventLocal<TEvResume, TEvBlobStorage::EvResume> { - double WilsonSec; - double AllocateSec; - double WaitSec; - double SplitSec; - size_t Count; - TBatchedVec<TStackVec<TRope, 8>> PartSets; -}; - struct TEvAccelerate : public TEventLocal<TEvAccelerate, TEvBlobStorage::EvAccelerate> { ui64 CauseIdx; @@ -39,6 +30,16 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt TPutImpl PutImpl; TRootCause RootCauseTrack; + const TDuration MaxQuantumDuration = TDuration::MicroSeconds(500); + const ui32 MaxBytesToEncryptAtOnce = 256_KB; + const ui32 MaxBytesToSplitAtOnce = 256_KB; + + ui32 BlobsEncrypted = 0; + ui32 CurrentEncryptionOffset = 0; + ui32 BlobsSplit = 0; + TErasureSplitContext ErasureSplitContext = TErasureSplitContext::Init(MaxBytesToSplitAtOnce); + TBatchedVec<TStackVec<TRope, 8>> PartSets; + TStackVec<ui64, TypicalDisksInGroup> WaitingVDiskResponseCount; ui64 WaitingVDiskCount = 0; @@ -53,7 +54,6 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt TInstant StartTime; NKikimrBlobStorage::EPutHandleClass HandleClass; - THPTimer Timer; i64 ReportedBytes; TBlobStorageGroupProxyTimeStats TimeStats; @@ -529,52 +529,47 @@ public: LWTRACK(DSProxyPutBootstrapStart, PutImpl.Blobs[blobIdx].Orbit); } - Become(&TThis::StateWait); + Become(&TThis::StateWait, TDuration::MilliSeconds(DsPutWakeupMs), new TKikimrEvents::TEvWakeup); - Timer.Reset(); - - double wilsonSec = Timer.PassedReset(); - - const ui32 totalParts = Info->Type.TotalPartCount(); - - TAutoPtr<TEvResume> resume(new TEvResume()); - resume->PartSets.resize(PutImpl.Blobs.size()); - - for (ui64 idx = 0; idx < PutImpl.Blobs.size(); ++idx) { - TLogoBlobID blobId = PutImpl.Blobs[idx].BlobId; - EncryptInplace(PutImpl.Blobs[idx].Buffer, blobId, *Info); - auto& parts = resume->PartSets[idx]; - parts.resize(totalParts); + PartSets.resize(PutImpl.Blobs.size()); + for (auto& partSet : PartSets) { + partSet.resize(Info->Type.TotalPartCount()); } - double allocateSec = Timer.PassedReset(); - - resume->WilsonSec = wilsonSec; - resume->AllocateSec = allocateSec; - resume->WaitSec = 0.0; - resume->SplitSec = 0.0; - resume->Count = 0; - if (RequestBytes < BufferSizeThreshold) { - ResumeBootstrap(resume); - } else { - Send(SelfId(), resume.Release()); - for (ui64 blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) { - LWTRACK(DSProxyPutPauseBootstrap, PutImpl.Blobs[blobIdx].Orbit); - } + if (Info->GetEncryptionMode() == TBlobStorageGroupInfo::EEM_NONE) { + BlobsEncrypted = PartSets.size(); } - - Schedule(TDuration::MilliSeconds(DsPutWakeupMs), new TKikimrEvents::TEvWakeup); - SanityCheck(); // May Die + ResumeBootstrap(); } - void Handle(TEvResume::TPtr &ev) { - if (ev->Get()->Count == 0) { - // Record only the first resume to keep tracks structure simple - for (ui64 blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) { - LWTRACK(DSProxyPutResumeBootstrap, PutImpl.Blobs[blobIdx].Orbit); + bool EncodeQuantum() { + const ui64 endTime = GetCycleCountFast() + DurationToCycles(MaxQuantumDuration); + bool firstIteration = true; + while (Min(BlobsEncrypted, BlobsSplit) < PutImpl.Blobs.size()) { + if (!firstIteration && endTime <= GetCycleCountFast()) { + return false; + } + firstIteration = false; + + if (BlobsEncrypted <= BlobsSplit) { // first we encrypt the blob (if encryption is enabled) + auto& blob = PutImpl.Blobs[BlobsEncrypted]; + const ui32 size = Min<ui32>(blob.Buffer.size() - CurrentEncryptionOffset, MaxBytesToEncryptAtOnce); + EncryptInplace(blob.Buffer, CurrentEncryptionOffset, size, blob.BlobId, *Info); + CurrentEncryptionOffset += size; + if (CurrentEncryptionOffset == blob.Buffer.size()) { + ++BlobsEncrypted; + CurrentEncryptionOffset = 0; + } + } else { // BlobsSplit < BlobsEncrypted -- then we split it + auto& blob = PutImpl.Blobs[BlobsSplit]; + const auto crcMode = static_cast<TErasureType::ECrcMode>(blob.BlobId.CrcMode()); + if (ErasureSplit(crcMode, Info->Type, blob.Buffer, PartSets[BlobsSplit], &ErasureSplitContext)) { + ++BlobsSplit; + ErasureSplitContext = TErasureSplitContext::Init(MaxBytesToSplitAtOnce); + } } } - ResumeBootstrap(ev->Release()); - SanityCheck(); // May Die + + return true; } void Handle(TKikimrEvents::TEvWakeup::TPtr &ev) { @@ -611,47 +606,29 @@ public: } } - void ResumeBootstrap(TAutoPtr<TEvResume> resume) { - double waitSec = Timer.PassedReset(); - resume->WaitSec += waitSec; - - Y_VERIFY(PutImpl.Blobs.size() == resume->PartSets.size()); - bool splitDone = true; - for (ui64 idx = 0; idx < PutImpl.Blobs.size(); ++idx) { - auto& parts = resume->PartSets[idx]; - TLogoBlobID blobId = PutImpl.Blobs[idx].BlobId; - ErasureSplit((TErasureType::ECrcMode)blobId.CrcMode(), Info->Type, TRope(PutImpl.Blobs[idx].Buffer), parts); - // TODO: ReportBytes(partSet.MemoryConsumed - PutImpl.Blobs[idx].BufferSize); - } - double splitSec = Timer.PassedReset(); - resume->SplitSec += splitSec; - resume->Count++; - LWPROBE(ProxyPutBootstrapPart, RequestBytes, waitSec * 1000.0, splitSec * 1000.0, resume->Count, resume->SplitSec * 1000.0); - - if (splitDone) { - for (ui64 blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) { - LWTRACK(DSProxyPutBootstrapDone, PutImpl.Blobs[blobIdx].Orbit, - RequestBytes, resume->WilsonSec * 1000.0, resume->AllocateSec * 1000.0, - resume->WaitSec * 1000.0, resume->SplitSec * 1000.0, resume->Count, blobIdx); - } + template<typename TEvV, typename TCookie> + struct TIssue { + void operator ()(TThis& self) { + TDeque<std::unique_ptr<TEvV>> events; + self.PutImpl.GenerateInitialRequests(self.LogCtx, self.PartSets, events); + self.UpdatePengingVDiskResponseCount<TEvV, TCookie>(events); + self.RequestsSent += events.size(); + self.CountPuts(events); + self.SendToQueues(events, self.TimeStatsEnabled); + } + }; + + void ResumeBootstrap() { + if (EncodeQuantum()) { if (IsMultiPutMode) { - TDeque<std::unique_ptr<TEvBlobStorage::TEvVMultiPut>> vMultiPuts; - PutImpl.GenerateInitialRequests(LogCtx, resume->PartSets, vMultiPuts); - UpdatePengingVDiskResponseCount<TEvBlobStorage::TEvVMultiPut, TVMultiPutCookie>(vMultiPuts); - RequestsSent += vMultiPuts.size(); - CountPuts(vMultiPuts); - SendToQueues(vMultiPuts, TimeStatsEnabled); + TIssue<TEvBlobStorage::TEvVMultiPut, TVMultiPutCookie>()(*this); } else { - TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts; - PutImpl.GenerateInitialRequests(LogCtx, resume->PartSets, vPuts); - UpdatePengingVDiskResponseCount<TEvBlobStorage::TEvVPut, TBlobCookie>(vPuts); - RequestsSent += vPuts.size(); - CountPuts(vPuts); - SendToQueues(vPuts, TimeStatsEnabled); + TIssue<TEvBlobStorage::TEvVPut, TBlobCookie>()(*this); } } else { - Send(SelfId(), resume.Release()); + TActivationContext::Send(new IEventHandle(TEvBlobStorage::EvResume, 0, SelfId(), {}, nullptr, 0)); } + SanityCheck(); } STATEFN(StateWait) { @@ -662,7 +639,7 @@ public: hFunc(TEvBlobStorage::TEvVPutResult, Handle); hFunc(TEvBlobStorage::TEvVMultiPutResult, Handle); hFunc(TEvAccelerate, Handle); - hFunc(TEvResume, Handle); + cFunc(TEvBlobStorage::EvResume, ResumeBootstrap); hFunc(TKikimrEvents::TEvWakeup, Handle); } } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h index 65f913ca98a..4a48e10224a 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h @@ -47,9 +47,9 @@ protected: } const TIntervalVec<i32> interval(0, state.Id.BlobSize()); Y_VERIFY(interval.IsSubsetOf(state.Whole.Here), "missing blob data State# %s", state.ToString().data()); - TRope whole = state.Whole.Data.Read(0, state.Id.BlobSize()); std::array<TRope, 3> parts; - ErasureSplit((TErasureType::ECrcMode)state.Id.CrcMode(), info.Type, std::move(whole), parts); + ErasureSplit((TErasureType::ECrcMode)state.Id.CrcMode(), info.Type, + state.Whole.Data.Read(0, state.Id.BlobSize()), parts); state.Parts[0].Data.SetMonolith(std::move(parts[0])); return parts[1]; // must be the same as parts[0] } diff --git a/ydb/core/erasure/erasure.h b/ydb/core/erasure/erasure.h index 5afa6ccb7bd..37b8c31b83b 100644 --- a/ydb/core/erasure/erasure.h +++ b/ydb/core/erasure/erasure.h @@ -380,9 +380,17 @@ protected: bool CheckCrcAtTheEnd(TErasureType::ECrcMode crcMode, const TContiguousSpan& buf); bool CheckCrcAtTheEnd(TErasureType::ECrcMode crcMode, const TRope& rope); -void ErasureSplit(TErasureType::ECrcMode crcMode, TErasureType erasure, TRope&& whole, std::span<TRope> parts); +struct TErasureSplitContext { + ui32 MaxSizeAtOnce = 0; + ui32 Offset = 0; + + static TErasureSplitContext Init(ui32 maxSizeAtOnce) { return {maxSizeAtOnce, 0}; } +}; + +bool ErasureSplit(TErasureType::ECrcMode crcMode, TErasureType erasure, const TRope& whole, std::span<TRope> parts, + TErasureSplitContext *context = nullptr); + void ErasureRestore(TErasureType::ECrcMode crcMode, TErasureType erasure, ui32 fullSize, TRope *whole, std::span<TRope> parts, ui32 restoreMask); } - diff --git a/ydb/core/erasure/erasure_new_ut.cpp b/ydb/core/erasure/erasure_new_ut.cpp index 2481c6491ea..727008b92fc 100644 --- a/ydb/core/erasure/erasure_new_ut.cpp +++ b/ydb/core/erasure/erasure_new_ut.cpp @@ -78,11 +78,16 @@ Y_UNIT_TEST_SUITE(ErasureBrandNew) { erasure.SplitData(TErasureType::CrcModeNone, buffer, p1); std::array<TRope, 6> p2; - ErasureSplit(TErasureType::CrcModeNone, TErasureType::Erasure4Plus2Block, TRope(rope), p2); + ErasureSplit(TErasureType::CrcModeNone, TErasureType::Erasure4Plus2Block, rope, p2); + + std::array<TRope, 6> p3; + TErasureSplitContext ctx = TErasureSplitContext::Init(32 * (1 + RandomNumber(1000u))); + while (!ErasureSplit(TErasureType::CrcModeNone, TErasureType::Erasure4Plus2Block, rope, p3, &ctx)) {} for (ui32 i = 0; i < 6; ++i) { UNIT_ASSERT_VALUES_EQUAL(p1.Parts[i].OwnedString.size(), p2[i].size()); UNIT_ASSERT_EQUAL(p1.Parts[i].OwnedString, p2[i]); + UNIT_ASSERT_EQUAL(p2[i], p3[i]); } } } diff --git a/ydb/core/erasure/erasure_split.cpp b/ydb/core/erasure/erasure_split.cpp index 5d8c4e97f26..7073d2a0c5b 100644 --- a/ydb/core/erasure/erasure_split.cpp +++ b/ydb/core/erasure/erasure_split.cpp @@ -17,7 +17,7 @@ namespace NKikimr { } } ZeroBuffer; - void ErasureSplitBlock42(TRope&& whole, std::span<TRope> parts) { + void ErasureSplitBlock42Prepare(const TRope& whole, std::span<TRope> parts) { const ui32 blockSize = 32; const ui32 fullBlockSize = 4 * blockSize; const ui32 partSize = ((whole.size() + fullBlockSize - 1) & ~(fullBlockSize - 1)) / 4; @@ -32,38 +32,61 @@ namespace NKikimr { partLen += std::min(remains, blockSize); remains -= blockSize; } - auto& r = parts[part]; + auto nextIter = iter + partLen; - r = {iter, nextIter}; - Y_VERIFY_DEBUG(r.size() == partLen); - if (const ui32 padding = partSize - r.size()) { - auto buffer = ZeroBuffer.GetBuffer(); - r.Insert(r.End(), TRcBuf(TRcBuf::Piece, buffer.data(), padding, buffer)); + if (auto& r = parts[part]; !r) { + r = {iter, nextIter}; + Y_VERIFY_DEBUG(r.size() == partLen); + if (const ui32 padding = partSize - r.size()) { + auto buffer = ZeroBuffer.GetBuffer(); + r.Insert(r.End(), TRcBuf(TRcBuf::Piece, buffer.data(), padding, buffer)); + } } iter = nextIter; } - TRcBuf xorPart = TRcBuf::Uninitialized(partSize); - ui64 *ptr = reinterpret_cast<ui64*>(xorPart.GetDataMut()); - parts[4] = TRope(std::move(xorPart)); + if (!parts[4]) { + TRcBuf xorPart = TRcBuf::Uninitialized(partSize); + parts[4] = TRope(std::move(xorPart)); + } + + if (!parts[5]) { + TRcBuf diagPart = TRcBuf::Uninitialized(partSize); + parts[5] = TRope(std::move(diagPart)); + } + } + + size_t ErasureSplitBlock42(std::span<TRope> parts, size_t offset, size_t size) { + const ui32 blockSize = 32; + + Y_VERIFY_DEBUG(parts[0].size() == parts[1].size()); + Y_VERIFY_DEBUG(parts[1].size() == parts[2].size()); + Y_VERIFY_DEBUG(parts[2].size() == parts[3].size()); + + auto ptrSpan = parts[4].GetContiguousSpanMut(); + Y_VERIFY_DEBUG(ptrSpan.size() == parts[0].size()); + ui64 *ptr = reinterpret_cast<ui64*>(ptrSpan.data() + offset); + auto diagSpan = parts[5].GetContiguousSpanMut(); + Y_VERIFY_DEBUG(diagSpan.size() == parts[0].size()); + ui64 *diag = reinterpret_cast<ui64*>(diagSpan.data() + offset); - TRcBuf diagPart = TRcBuf::Uninitialized(partSize); - ui64 *diag = reinterpret_cast<ui64*>(diagPart.GetDataMut()); - parts[5] = TRope(std::move(diagPart)); + auto iter0 = parts[0].begin() + offset; + auto iter1 = parts[1].begin() + offset; + auto iter2 = parts[2].begin() + offset; + auto iter3 = parts[3].begin() + offset; - auto iter0 = parts[0].begin(); - auto iter1 = parts[1].begin(); - auto iter2 = parts[2].begin(); - auto iter3 = parts[3].begin(); + size_t bytesProcessed = 0; - while (iter0.Valid()) { + while (iter0.Valid() && size >= blockSize) { const size_t size0 = iter0.ContiguousSize(); const size_t size1 = iter1.ContiguousSize(); const size_t size2 = iter2.ContiguousSize(); const size_t size3 = iter3.ContiguousSize(); - const size_t common = Min(size0, size1, size2, size3); + const size_t common = Min(size0, size1, size2, size3, size); size_t numBlocks = Max<size_t>(1, common / blockSize); const size_t numBytes = numBlocks * blockSize; + size -= numBytes; + bytesProcessed += numBytes; #define FETCH_BLOCK(I) \ alignas(32) ui64 temp##I[4]; \ @@ -127,21 +150,35 @@ namespace NKikimr { a3 += 4; } } + + return bytesProcessed; } - void ErasureSplit(TErasureType::ECrcMode crcMode, TErasureType erasure, TRope&& whole, std::span<TRope> parts) { + bool ErasureSplit(TErasureType::ECrcMode crcMode, TErasureType erasure, const TRope& whole, std::span<TRope> parts, + TErasureSplitContext *context) { Y_VERIFY(parts.size() == erasure.TotalPartCount()); if (erasure.GetErasure() == TErasureType::Erasure4Plus2Block && crcMode == TErasureType::CrcModeNone) { - return ErasureSplitBlock42(std::move(whole), parts); + ErasureSplitBlock42Prepare(whole, parts); + if (context) { + Y_VERIFY(context->MaxSizeAtOnce % 32 == 0); + context->Offset += ErasureSplitBlock42(parts, context->Offset, context->MaxSizeAtOnce); + return context->Offset == parts[0].size(); + } else { + ErasureSplitBlock42(parts, 0, Max<size_t>()); + return true; + } } TDataPartSet p; - erasure.SplitData(crcMode, whole, p); + TRope copy(whole); + erasure.SplitData(crcMode, copy, p); Y_VERIFY(p.Parts.size() == parts.size()); for (ui32 i = 0; i < parts.size(); ++i) { parts[i] = std::move(p.Parts[i].OwnedString); } + + return true; } } // NKikimr |