aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-06-28 12:17:41 +0300
committeralexvru <alexvru@ydb.tech>2023-06-28 12:17:41 +0300
commiteb92c027b4d7b9c94509d0b466f77e51ef3f6b6a (patch)
treea5a2fdae7a25edd96def12e7935e6d31dbb74b6a
parent7990574ff7313066d560591f279f2dd03e765ff4 (diff)
downloadydb-eb92c027b4d7b9c94509d0b466f77e51ef3f6b6a.tar.gz
Make ErasureSplit incremental again
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy.h3
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_encrypt.cpp5
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_put.cpp149
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h4
-rw-r--r--ydb/core/erasure/erasure.h12
-rw-r--r--ydb/core/erasure/erasure_new_ut.cpp7
-rw-r--r--ydb/core/erasure/erasure_split.cpp81
7 files changed, 145 insertions, 116 deletions
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy.h b/ydb/core/blobstorage/dsproxy/dsproxy.h
index f7983cf7c68..6d06f68f795 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy.h
@@ -591,7 +591,8 @@ private:
void Encrypt(char *destination, const char *source, size_t shift, size_t sizeBytes, const TLogoBlobID &id,
const TBlobStorageGroupInfo &info);
-void EncryptInplace(TRope& rope, const TLogoBlobID& id, const TBlobStorageGroupInfo& info);
+
+void EncryptInplace(TRope& rope, ui32 offset, ui32 size, const TLogoBlobID& id, const TBlobStorageGroupInfo& info);
void Decrypt(char *destination, const char *source, size_t shift, size_t sizeBytes, const TLogoBlobID &id,
const TBlobStorageGroupInfo &info);
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_encrypt.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_encrypt.cpp
index 5982fe4216f..62cf2a90fff 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_encrypt.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_encrypt.cpp
@@ -56,12 +56,13 @@ namespace NKikimr {
Y_VERIFY(false, "Unexpected Encryption Mode# %" PRIu64, (ui64)info.GetEncryptionMode());
}
- void EncryptInplace(TRope& rope, const TLogoBlobID& id, const TBlobStorageGroupInfo& info) {
+ void EncryptInplace(TRope& rope, ui32 offset, ui32 size, const TLogoBlobID& id, const TBlobStorageGroupInfo& info) {
if (info.GetEncryptionMode() == TBlobStorageGroupInfo::EEM_NONE) {
return;
}
auto span = rope.GetContiguousSpanMut();
- Encrypt(span.data(), span.data(), 0, span.size(), id, info);
+ Y_VERIFY(offset < span.size() && size <= span.size() - offset);
+ Encrypt(span.data() + offset, span.data() + offset, offset, size, id, info);
}
void Decrypt(char *destination, const char *source, size_t shift, size_t sizeBytes, const TLogoBlobID &id,
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
index 25bc9e153e3..6edf7d4d836 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
@@ -14,15 +14,6 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
namespace NKikimr {
-struct TEvResume : public TEventLocal<TEvResume, TEvBlobStorage::EvResume> {
- double WilsonSec;
- double AllocateSec;
- double WaitSec;
- double SplitSec;
- size_t Count;
- TBatchedVec<TStackVec<TRope, 8>> PartSets;
-};
-
struct TEvAccelerate : public TEventLocal<TEvAccelerate, TEvBlobStorage::EvAccelerate> {
ui64 CauseIdx;
@@ -39,6 +30,16 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
TPutImpl PutImpl;
TRootCause RootCauseTrack;
+ const TDuration MaxQuantumDuration = TDuration::MicroSeconds(500);
+ const ui32 MaxBytesToEncryptAtOnce = 256_KB;
+ const ui32 MaxBytesToSplitAtOnce = 256_KB;
+
+ ui32 BlobsEncrypted = 0;
+ ui32 CurrentEncryptionOffset = 0;
+ ui32 BlobsSplit = 0;
+ TErasureSplitContext ErasureSplitContext = TErasureSplitContext::Init(MaxBytesToSplitAtOnce);
+ TBatchedVec<TStackVec<TRope, 8>> PartSets;
+
TStackVec<ui64, TypicalDisksInGroup> WaitingVDiskResponseCount;
ui64 WaitingVDiskCount = 0;
@@ -53,7 +54,6 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
TInstant StartTime;
NKikimrBlobStorage::EPutHandleClass HandleClass;
- THPTimer Timer;
i64 ReportedBytes;
TBlobStorageGroupProxyTimeStats TimeStats;
@@ -529,52 +529,47 @@ public:
LWTRACK(DSProxyPutBootstrapStart, PutImpl.Blobs[blobIdx].Orbit);
}
- Become(&TThis::StateWait);
+ Become(&TThis::StateWait, TDuration::MilliSeconds(DsPutWakeupMs), new TKikimrEvents::TEvWakeup);
- Timer.Reset();
-
- double wilsonSec = Timer.PassedReset();
-
- const ui32 totalParts = Info->Type.TotalPartCount();
-
- TAutoPtr<TEvResume> resume(new TEvResume());
- resume->PartSets.resize(PutImpl.Blobs.size());
-
- for (ui64 idx = 0; idx < PutImpl.Blobs.size(); ++idx) {
- TLogoBlobID blobId = PutImpl.Blobs[idx].BlobId;
- EncryptInplace(PutImpl.Blobs[idx].Buffer, blobId, *Info);
- auto& parts = resume->PartSets[idx];
- parts.resize(totalParts);
+ PartSets.resize(PutImpl.Blobs.size());
+ for (auto& partSet : PartSets) {
+ partSet.resize(Info->Type.TotalPartCount());
}
- double allocateSec = Timer.PassedReset();
-
- resume->WilsonSec = wilsonSec;
- resume->AllocateSec = allocateSec;
- resume->WaitSec = 0.0;
- resume->SplitSec = 0.0;
- resume->Count = 0;
- if (RequestBytes < BufferSizeThreshold) {
- ResumeBootstrap(resume);
- } else {
- Send(SelfId(), resume.Release());
- for (ui64 blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
- LWTRACK(DSProxyPutPauseBootstrap, PutImpl.Blobs[blobIdx].Orbit);
- }
+ if (Info->GetEncryptionMode() == TBlobStorageGroupInfo::EEM_NONE) {
+ BlobsEncrypted = PartSets.size();
}
-
- Schedule(TDuration::MilliSeconds(DsPutWakeupMs), new TKikimrEvents::TEvWakeup);
- SanityCheck(); // May Die
+ ResumeBootstrap();
}
- void Handle(TEvResume::TPtr &ev) {
- if (ev->Get()->Count == 0) {
- // Record only the first resume to keep tracks structure simple
- for (ui64 blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
- LWTRACK(DSProxyPutResumeBootstrap, PutImpl.Blobs[blobIdx].Orbit);
+ bool EncodeQuantum() {
+ const ui64 endTime = GetCycleCountFast() + DurationToCycles(MaxQuantumDuration);
+ bool firstIteration = true;
+ while (Min(BlobsEncrypted, BlobsSplit) < PutImpl.Blobs.size()) {
+ if (!firstIteration && endTime <= GetCycleCountFast()) {
+ return false;
+ }
+ firstIteration = false;
+
+ if (BlobsEncrypted <= BlobsSplit) { // first we encrypt the blob (if encryption is enabled)
+ auto& blob = PutImpl.Blobs[BlobsEncrypted];
+ const ui32 size = Min<ui32>(blob.Buffer.size() - CurrentEncryptionOffset, MaxBytesToEncryptAtOnce);
+ EncryptInplace(blob.Buffer, CurrentEncryptionOffset, size, blob.BlobId, *Info);
+ CurrentEncryptionOffset += size;
+ if (CurrentEncryptionOffset == blob.Buffer.size()) {
+ ++BlobsEncrypted;
+ CurrentEncryptionOffset = 0;
+ }
+ } else { // BlobsSplit < BlobsEncrypted -- then we split it
+ auto& blob = PutImpl.Blobs[BlobsSplit];
+ const auto crcMode = static_cast<TErasureType::ECrcMode>(blob.BlobId.CrcMode());
+ if (ErasureSplit(crcMode, Info->Type, blob.Buffer, PartSets[BlobsSplit], &ErasureSplitContext)) {
+ ++BlobsSplit;
+ ErasureSplitContext = TErasureSplitContext::Init(MaxBytesToSplitAtOnce);
+ }
}
}
- ResumeBootstrap(ev->Release());
- SanityCheck(); // May Die
+
+ return true;
}
void Handle(TKikimrEvents::TEvWakeup::TPtr &ev) {
@@ -611,47 +606,29 @@ public:
}
}
- void ResumeBootstrap(TAutoPtr<TEvResume> resume) {
- double waitSec = Timer.PassedReset();
- resume->WaitSec += waitSec;
-
- Y_VERIFY(PutImpl.Blobs.size() == resume->PartSets.size());
- bool splitDone = true;
- for (ui64 idx = 0; idx < PutImpl.Blobs.size(); ++idx) {
- auto& parts = resume->PartSets[idx];
- TLogoBlobID blobId = PutImpl.Blobs[idx].BlobId;
- ErasureSplit((TErasureType::ECrcMode)blobId.CrcMode(), Info->Type, TRope(PutImpl.Blobs[idx].Buffer), parts);
- // TODO: ReportBytes(partSet.MemoryConsumed - PutImpl.Blobs[idx].BufferSize);
- }
- double splitSec = Timer.PassedReset();
- resume->SplitSec += splitSec;
- resume->Count++;
- LWPROBE(ProxyPutBootstrapPart, RequestBytes, waitSec * 1000.0, splitSec * 1000.0, resume->Count, resume->SplitSec * 1000.0);
-
- if (splitDone) {
- for (ui64 blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
- LWTRACK(DSProxyPutBootstrapDone, PutImpl.Blobs[blobIdx].Orbit,
- RequestBytes, resume->WilsonSec * 1000.0, resume->AllocateSec * 1000.0,
- resume->WaitSec * 1000.0, resume->SplitSec * 1000.0, resume->Count, blobIdx);
- }
+ template<typename TEvV, typename TCookie>
+ struct TIssue {
+ void operator ()(TThis& self) {
+ TDeque<std::unique_ptr<TEvV>> events;
+ self.PutImpl.GenerateInitialRequests(self.LogCtx, self.PartSets, events);
+ self.UpdatePengingVDiskResponseCount<TEvV, TCookie>(events);
+ self.RequestsSent += events.size();
+ self.CountPuts(events);
+ self.SendToQueues(events, self.TimeStatsEnabled);
+ }
+ };
+
+ void ResumeBootstrap() {
+ if (EncodeQuantum()) {
if (IsMultiPutMode) {
- TDeque<std::unique_ptr<TEvBlobStorage::TEvVMultiPut>> vMultiPuts;
- PutImpl.GenerateInitialRequests(LogCtx, resume->PartSets, vMultiPuts);
- UpdatePengingVDiskResponseCount<TEvBlobStorage::TEvVMultiPut, TVMultiPutCookie>(vMultiPuts);
- RequestsSent += vMultiPuts.size();
- CountPuts(vMultiPuts);
- SendToQueues(vMultiPuts, TimeStatsEnabled);
+ TIssue<TEvBlobStorage::TEvVMultiPut, TVMultiPutCookie>()(*this);
} else {
- TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts;
- PutImpl.GenerateInitialRequests(LogCtx, resume->PartSets, vPuts);
- UpdatePengingVDiskResponseCount<TEvBlobStorage::TEvVPut, TBlobCookie>(vPuts);
- RequestsSent += vPuts.size();
- CountPuts(vPuts);
- SendToQueues(vPuts, TimeStatsEnabled);
+ TIssue<TEvBlobStorage::TEvVPut, TBlobCookie>()(*this);
}
} else {
- Send(SelfId(), resume.Release());
+ TActivationContext::Send(new IEventHandle(TEvBlobStorage::EvResume, 0, SelfId(), {}, nullptr, 0));
}
+ SanityCheck();
}
STATEFN(StateWait) {
@@ -662,7 +639,7 @@ public:
hFunc(TEvBlobStorage::TEvVPutResult, Handle);
hFunc(TEvBlobStorage::TEvVMultiPutResult, Handle);
hFunc(TEvAccelerate, Handle);
- hFunc(TEvResume, Handle);
+ cFunc(TEvBlobStorage::EvResume, ResumeBootstrap);
hFunc(TKikimrEvents::TEvWakeup, Handle);
}
}
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h
index 65f913ca98a..4a48e10224a 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h
@@ -47,9 +47,9 @@ protected:
}
const TIntervalVec<i32> interval(0, state.Id.BlobSize());
Y_VERIFY(interval.IsSubsetOf(state.Whole.Here), "missing blob data State# %s", state.ToString().data());
- TRope whole = state.Whole.Data.Read(0, state.Id.BlobSize());
std::array<TRope, 3> parts;
- ErasureSplit((TErasureType::ECrcMode)state.Id.CrcMode(), info.Type, std::move(whole), parts);
+ ErasureSplit((TErasureType::ECrcMode)state.Id.CrcMode(), info.Type,
+ state.Whole.Data.Read(0, state.Id.BlobSize()), parts);
state.Parts[0].Data.SetMonolith(std::move(parts[0]));
return parts[1]; // must be the same as parts[0]
}
diff --git a/ydb/core/erasure/erasure.h b/ydb/core/erasure/erasure.h
index 5afa6ccb7bd..37b8c31b83b 100644
--- a/ydb/core/erasure/erasure.h
+++ b/ydb/core/erasure/erasure.h
@@ -380,9 +380,17 @@ protected:
bool CheckCrcAtTheEnd(TErasureType::ECrcMode crcMode, const TContiguousSpan& buf);
bool CheckCrcAtTheEnd(TErasureType::ECrcMode crcMode, const TRope& rope);
-void ErasureSplit(TErasureType::ECrcMode crcMode, TErasureType erasure, TRope&& whole, std::span<TRope> parts);
+struct TErasureSplitContext {
+ ui32 MaxSizeAtOnce = 0;
+ ui32 Offset = 0;
+
+ static TErasureSplitContext Init(ui32 maxSizeAtOnce) { return {maxSizeAtOnce, 0}; }
+};
+
+bool ErasureSplit(TErasureType::ECrcMode crcMode, TErasureType erasure, const TRope& whole, std::span<TRope> parts,
+ TErasureSplitContext *context = nullptr);
+
void ErasureRestore(TErasureType::ECrcMode crcMode, TErasureType erasure, ui32 fullSize, TRope *whole,
std::span<TRope> parts, ui32 restoreMask);
}
-
diff --git a/ydb/core/erasure/erasure_new_ut.cpp b/ydb/core/erasure/erasure_new_ut.cpp
index 2481c6491ea..727008b92fc 100644
--- a/ydb/core/erasure/erasure_new_ut.cpp
+++ b/ydb/core/erasure/erasure_new_ut.cpp
@@ -78,11 +78,16 @@ Y_UNIT_TEST_SUITE(ErasureBrandNew) {
erasure.SplitData(TErasureType::CrcModeNone, buffer, p1);
std::array<TRope, 6> p2;
- ErasureSplit(TErasureType::CrcModeNone, TErasureType::Erasure4Plus2Block, TRope(rope), p2);
+ ErasureSplit(TErasureType::CrcModeNone, TErasureType::Erasure4Plus2Block, rope, p2);
+
+ std::array<TRope, 6> p3;
+ TErasureSplitContext ctx = TErasureSplitContext::Init(32 * (1 + RandomNumber(1000u)));
+ while (!ErasureSplit(TErasureType::CrcModeNone, TErasureType::Erasure4Plus2Block, rope, p3, &ctx)) {}
for (ui32 i = 0; i < 6; ++i) {
UNIT_ASSERT_VALUES_EQUAL(p1.Parts[i].OwnedString.size(), p2[i].size());
UNIT_ASSERT_EQUAL(p1.Parts[i].OwnedString, p2[i]);
+ UNIT_ASSERT_EQUAL(p2[i], p3[i]);
}
}
}
diff --git a/ydb/core/erasure/erasure_split.cpp b/ydb/core/erasure/erasure_split.cpp
index 5d8c4e97f26..7073d2a0c5b 100644
--- a/ydb/core/erasure/erasure_split.cpp
+++ b/ydb/core/erasure/erasure_split.cpp
@@ -17,7 +17,7 @@ namespace NKikimr {
}
} ZeroBuffer;
- void ErasureSplitBlock42(TRope&& whole, std::span<TRope> parts) {
+ void ErasureSplitBlock42Prepare(const TRope& whole, std::span<TRope> parts) {
const ui32 blockSize = 32;
const ui32 fullBlockSize = 4 * blockSize;
const ui32 partSize = ((whole.size() + fullBlockSize - 1) & ~(fullBlockSize - 1)) / 4;
@@ -32,38 +32,61 @@ namespace NKikimr {
partLen += std::min(remains, blockSize);
remains -= blockSize;
}
- auto& r = parts[part];
+
auto nextIter = iter + partLen;
- r = {iter, nextIter};
- Y_VERIFY_DEBUG(r.size() == partLen);
- if (const ui32 padding = partSize - r.size()) {
- auto buffer = ZeroBuffer.GetBuffer();
- r.Insert(r.End(), TRcBuf(TRcBuf::Piece, buffer.data(), padding, buffer));
+ if (auto& r = parts[part]; !r) {
+ r = {iter, nextIter};
+ Y_VERIFY_DEBUG(r.size() == partLen);
+ if (const ui32 padding = partSize - r.size()) {
+ auto buffer = ZeroBuffer.GetBuffer();
+ r.Insert(r.End(), TRcBuf(TRcBuf::Piece, buffer.data(), padding, buffer));
+ }
}
iter = nextIter;
}
- TRcBuf xorPart = TRcBuf::Uninitialized(partSize);
- ui64 *ptr = reinterpret_cast<ui64*>(xorPart.GetDataMut());
- parts[4] = TRope(std::move(xorPart));
+ if (!parts[4]) {
+ TRcBuf xorPart = TRcBuf::Uninitialized(partSize);
+ parts[4] = TRope(std::move(xorPart));
+ }
+
+ if (!parts[5]) {
+ TRcBuf diagPart = TRcBuf::Uninitialized(partSize);
+ parts[5] = TRope(std::move(diagPart));
+ }
+ }
+
+ size_t ErasureSplitBlock42(std::span<TRope> parts, size_t offset, size_t size) {
+ const ui32 blockSize = 32;
+
+ Y_VERIFY_DEBUG(parts[0].size() == parts[1].size());
+ Y_VERIFY_DEBUG(parts[1].size() == parts[2].size());
+ Y_VERIFY_DEBUG(parts[2].size() == parts[3].size());
+
+ auto ptrSpan = parts[4].GetContiguousSpanMut();
+ Y_VERIFY_DEBUG(ptrSpan.size() == parts[0].size());
+ ui64 *ptr = reinterpret_cast<ui64*>(ptrSpan.data() + offset);
+ auto diagSpan = parts[5].GetContiguousSpanMut();
+ Y_VERIFY_DEBUG(diagSpan.size() == parts[0].size());
+ ui64 *diag = reinterpret_cast<ui64*>(diagSpan.data() + offset);
- TRcBuf diagPart = TRcBuf::Uninitialized(partSize);
- ui64 *diag = reinterpret_cast<ui64*>(diagPart.GetDataMut());
- parts[5] = TRope(std::move(diagPart));
+ auto iter0 = parts[0].begin() + offset;
+ auto iter1 = parts[1].begin() + offset;
+ auto iter2 = parts[2].begin() + offset;
+ auto iter3 = parts[3].begin() + offset;
- auto iter0 = parts[0].begin();
- auto iter1 = parts[1].begin();
- auto iter2 = parts[2].begin();
- auto iter3 = parts[3].begin();
+ size_t bytesProcessed = 0;
- while (iter0.Valid()) {
+ while (iter0.Valid() && size >= blockSize) {
const size_t size0 = iter0.ContiguousSize();
const size_t size1 = iter1.ContiguousSize();
const size_t size2 = iter2.ContiguousSize();
const size_t size3 = iter3.ContiguousSize();
- const size_t common = Min(size0, size1, size2, size3);
+ const size_t common = Min(size0, size1, size2, size3, size);
size_t numBlocks = Max<size_t>(1, common / blockSize);
const size_t numBytes = numBlocks * blockSize;
+ size -= numBytes;
+ bytesProcessed += numBytes;
#define FETCH_BLOCK(I) \
alignas(32) ui64 temp##I[4]; \
@@ -127,21 +150,35 @@ namespace NKikimr {
a3 += 4;
}
}
+
+ return bytesProcessed;
}
- void ErasureSplit(TErasureType::ECrcMode crcMode, TErasureType erasure, TRope&& whole, std::span<TRope> parts) {
+ bool ErasureSplit(TErasureType::ECrcMode crcMode, TErasureType erasure, const TRope& whole, std::span<TRope> parts,
+ TErasureSplitContext *context) {
Y_VERIFY(parts.size() == erasure.TotalPartCount());
if (erasure.GetErasure() == TErasureType::Erasure4Plus2Block && crcMode == TErasureType::CrcModeNone) {
- return ErasureSplitBlock42(std::move(whole), parts);
+ ErasureSplitBlock42Prepare(whole, parts);
+ if (context) {
+ Y_VERIFY(context->MaxSizeAtOnce % 32 == 0);
+ context->Offset += ErasureSplitBlock42(parts, context->Offset, context->MaxSizeAtOnce);
+ return context->Offset == parts[0].size();
+ } else {
+ ErasureSplitBlock42(parts, 0, Max<size_t>());
+ return true;
+ }
}
TDataPartSet p;
- erasure.SplitData(crcMode, whole, p);
+ TRope copy(whole);
+ erasure.SplitData(crcMode, copy, p);
Y_VERIFY(p.Parts.size() == parts.size());
for (ui32 i = 0; i < parts.size(); ++i) {
parts[i] = std::move(p.Parts[i].OwnedString);
}
+
+ return true;
}
} // NKikimr