diff options
author | Alexander Rutkovsky <alexvru@ydb.tech> | 2024-01-25 14:13:50 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-25 14:13:50 +0300 |
commit | ff65c22f774717ca02ca65f03cac5b7e16a9d82b (patch) | |
tree | 2128e356084ef75590101c7df4fd486fd76595c3 | |
parent | 7eb75fa787d8f4813a7bba7029086c5cb2c3cdaa (diff) | |
download | ydb-ff65c22f774717ca02ca65f03cac5b7e16a9d82b.tar.gz |
Refactor GroupDiskRequests and remove unused stuff (#1272)
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy.h | 7 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp | 23 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h | 33 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy_cookies.h | 155 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy_get.cpp | 156 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp | 205 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h | 45 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h | 11 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy_request.cpp | 2 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/ut/dsproxy_env_mock_ut.h | 4 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/ut/dsproxy_get_ut.cpp | 180 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/ut/dsproxy_sequence_ut.cpp | 88 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/ut/dsproxy_vdisk_mock_ut.h | 1 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/ut_strategy/strategy_ut.cpp | 105 | ||||
-rw-r--r-- | ydb/core/blobstorage/ut_blobstorage/counting_events.cpp | 6 |
15 files changed, 169 insertions, 852 deletions
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy.h b/ydb/core/blobstorage/dsproxy/dsproxy.h index ad43358043d..9160d88de59 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy.h @@ -405,8 +405,7 @@ public: void SendToQueues(TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &vGets, bool timeStatsEnabled) { for (auto& request : vGets) { - Y_ABORT_UNLESS(request->Record.HasCookie()); - ui64 messageCookie = request->Record.GetCookie(); + const ui64 messageCookie = request->Record.GetCookie(); CountEvent(*request); const ui64 cyclesPerUs = NHPTimer::GetCyclesPerSecond() / 1000000; request->Record.MutableTimestamps()->SetSentByDSProxyUs(GetCycleCountFast() / cyclesPerUs); @@ -442,7 +441,6 @@ public: template <typename TEvent> void SendToQueues(TDeque<std::unique_ptr<TEvent>> &events, bool timeStatsEnabled) { for (auto& request : events) { - Y_ABORT_UNLESS(request->Record.HasCookie()); ui64 messageCookie = request->Record.GetCookie(); CountEvent(*request); const ui64 cyclesPerUs = NHPTimer::GetCyclesPerSecond() / 1000000; @@ -650,8 +648,7 @@ IActor* CreateBlobStorageGroupGetRequest(const TIntrusivePtr<TBlobStorageGroupIn const TIntrusivePtr<TGroupQueues> &state, const TActorId &source, const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvGet *ev, ui64 cookie, NWilson::TTraceId traceId, TNodeLayoutInfoPtr&& nodeLayout, - TMaybe<TGroupStat::EKind> latencyQueueKind, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters, - bool isVMultiPutMode); + TMaybe<TGroupStat::EKind> latencyQueueKind, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters); IActor* CreateBlobStorageGroupPatchRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &state, const TActorId &source, diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp index 9b49630b4f8..97706a1b6f1 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp @@ -293,28 +293,19 @@ TString TBlobState::TWholeState::ToString() const { // TGroupDiskRequests // -TGroupDiskRequests::TGroupDiskRequests(ui32 disks) { - DiskRequestsForOrderNumber.resize(disks); -} - -void TGroupDiskRequests::AddGet(const ui32 diskOrderNumber, const TLogoBlobID &id, const TIntervalSet<i32> &intervalSet) { - Y_ABORT_UNLESS(diskOrderNumber < DiskRequestsForOrderNumber.size()); - auto &requestsToSend = DiskRequestsForOrderNumber[diskOrderNumber].GetsToSend; - for (auto pair: intervalSet) { - requestsToSend.emplace_back(id, pair.first, pair.second - pair.first); +void TGroupDiskRequests::AddGet(ui32 diskOrderNumber, const TLogoBlobID &id, const TIntervalSet<i32> &intervalSet) { + for (const auto& pair : intervalSet) { + GetsPending.emplace_back(diskOrderNumber, id, pair.first, pair.second - pair.first); } } -void TGroupDiskRequests::AddGet(const ui32 diskOrderNumber, const TLogoBlobID &id, const ui32 shift, - const ui32 size) { - Y_ABORT_UNLESS(diskOrderNumber < DiskRequestsForOrderNumber.size()); - DiskRequestsForOrderNumber[diskOrderNumber].GetsToSend.emplace_back(id, shift, size); +void TGroupDiskRequests::AddGet(ui32 diskOrderNumber, const TLogoBlobID &id, ui32 shift, ui32 size) { + GetsPending.emplace_back(diskOrderNumber, id, shift, size); } -void TGroupDiskRequests::AddPut(const ui32 diskOrderNumber, const TLogoBlobID &id, TRope buffer, +void TGroupDiskRequests::AddPut(ui32 diskOrderNumber, const TLogoBlobID &id, TRope buffer, TDiskPutRequest::EPutReason putReason, bool isHandoff, ui8 blobIdx) { - Y_ABORT_UNLESS(diskOrderNumber < DiskRequestsForOrderNumber.size()); - DiskRequestsForOrderNumber[diskOrderNumber].PutsToSend.emplace_back(id, buffer, putReason, isHandoff, blobIdx); + PutsPending.emplace_back(diskOrderNumber, id, buffer, putReason, isHandoff, blobIdx); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h index 8083e17fd41..c128a013e50 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h @@ -108,13 +108,14 @@ struct TBlobState { }; struct TDiskGetRequest { + ui32 OrderNumber; TLogoBlobID Id; ui32 Shift; ui32 Size; - ssize_t PartMapIndex = -1; - TDiskGetRequest(const TLogoBlobID &id, const ui32 shift, const ui32 size) - : Id(id) + TDiskGetRequest(ui32 orderNumber, const TLogoBlobID &id, ui32 shift, ui32 size) + : OrderNumber(orderNumber) + , Id(id) , Shift(shift) , Size(size) {} @@ -127,14 +128,16 @@ struct TDiskPutRequest { ReasonInitial, ReasonAccelerate }; + ui32 OrderNumber; TLogoBlobID Id; TRope Buffer; EPutReason Reason; bool IsHandoff; ui8 BlobIdx; - TDiskPutRequest(const TLogoBlobID &id, TRope buffer, EPutReason reason, bool isHandoff, ui8 blobIdx) - : Id(id) + TDiskPutRequest(ui32 orderNumber, const TLogoBlobID &id, TRope buffer, EPutReason reason, bool isHandoff, ui8 blobIdx) + : OrderNumber(orderNumber) + , Id(id) , Buffer(std::move(buffer)) , Reason(reason) , IsHandoff(isHandoff) @@ -142,20 +145,13 @@ struct TDiskPutRequest { {} }; -struct TDiskRequests { - TDeque<TDiskGetRequest> GetsToSend; - TStackVec<TDiskPutRequest, TypicalPartsInBlob> PutsToSend; - ui32 FirstUnsentRequestIdx = 0; - ui32 FirstUnsentPutIdx = 0; -}; - struct TGroupDiskRequests { - TStackVec<TDiskRequests, TypicalDisksInGroup> DiskRequestsForOrderNumber; + std::vector<TDiskGetRequest> GetsPending; + std::vector<TDiskPutRequest> PutsPending; - TGroupDiskRequests(ui32 disks); - void AddGet(const ui32 diskOrderNumber, const TLogoBlobID &id, const TIntervalSet<i32> &intervalSet); - void AddGet(const ui32 diskOrderNumber, const TLogoBlobID &id, const ui32 shift, const ui32 size); - void AddPut(const ui32 diskOrderNumber, const TLogoBlobID &id, TRope buffer, + void AddGet(ui32 diskOrderNumber, const TLogoBlobID &id, const TIntervalSet<i32> &intervalSet); + void AddGet(ui32 diskOrderNumber, const TLogoBlobID &id, ui32 shift, ui32 size); + void AddPut(ui32 diskOrderNumber, const TLogoBlobID &id, TRope buffer, TDiskPutRequest::EPutReason putReason, bool isHandoff, ui8 blobIdx); }; @@ -188,8 +184,7 @@ struct TBlackboard { TBlackboard(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &groupQueues, NKikimrBlobStorage::EPutHandleClass putHandleClass, NKikimrBlobStorage::EGetHandleClass getHandleClass, bool isAllRequestsTogether = true) - : GroupDiskRequests(info->GetTotalVDisksNum()) - , Info(info) + : Info(info) , GroupQueues(groupQueues) , AccelerationMode(AccelerationModeSkipOneSlowest) , PutHandleClass(putHandleClass) diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_cookies.h b/ydb/core/blobstorage/dsproxy/dsproxy_cookies.h deleted file mode 100644 index d9eb26f9483..00000000000 --- a/ydb/core/blobstorage/dsproxy/dsproxy_cookies.h +++ /dev/null @@ -1,155 +0,0 @@ -#pragma once - -#include "dsproxy.h" - -namespace NKikimr { - -struct TVGetCookie { - ui64 Raw; - // [0, 27] 28bit queryBeginIdx - // [28, 55] 28bit queryEndIdx - // [56, 63] 8bit causeIdx - - TVGetCookie(ui64 raw) - : Raw(raw) - {} - - TVGetCookie(ui64 queryBeginIdx, ui64 queryEndIdx) - : Raw(queryBeginIdx | (queryEndIdx << 28)) - { - Y_ABORT_UNLESS(queryBeginIdx < (1LL << 28)); - Y_ABORT_UNLESS(queryEndIdx < (1LL << 28)); - } - - ui64 GetQueryBeginIdx() const { - return Raw & 0xFFFFFFF; - } - - ui8 GetQueryEndIdx() const { - return (Raw >> 28) & 0xFFFFFFF; - } - - ui64 GetCauseIdx() const { - return (Raw >> 56) & 0xFF; - } - - void SetCauseIdx(ui64 causeIdx) { - // It's a debug feature support, it's a bad idea to VERIFY here - if (causeIdx > 255) { - causeIdx = 255; - } - ui64 prevCauseIdx = GetCauseIdx(); - Raw = Raw ^ ((causeIdx ^ prevCauseIdx) << 56); - } - - operator ui64 () const { - return Raw; - } -}; - -struct TBlobCookie { - ui64 Raw; - // [0, 7 ] 8bit vDiskOrderNumber - // [8, 15] 8bit partId . - // [16, 31] 16bit blobIdx - // [32, 55] 24bit requestIdx - // [56, 63] 8bit causeIdx - - TBlobCookie(ui64 raw) - : Raw(raw) - {} - - TBlobCookie(ui64 vDiskOrderNumber, ui64 blobIdx, ui64 partId, ui64 requestIdx) - : Raw(vDiskOrderNumber | (partId << 8) | (blobIdx << 16) | (requestIdx << 32)) - { - Y_ABORT_UNLESS(vDiskOrderNumber < 256); - Y_ABORT_UNLESS(blobIdx < (1LL << 16)); - Y_ABORT_UNLESS(partId < 256); - Y_ABORT_UNLESS(requestIdx < (1LL << 24)); - } - - ui64 GetVDiskOrderNumber() const { - return Raw & 0xFF; - } - - ui8 GetPartId() const { - return (Raw >> 8) & 0xFF; - } - - ui64 GetBlobIdx() const { - return (Raw >> 16) & 0xFFFF; - } - - ui64 GetCauseIdx() const { - return (Raw >> 56) & 0xFF; - } - - void SetCauseIdx(ui64 causeIdx) { - // It's a debug feature support, it's a bad idea to VERIFY here - if (causeIdx > 255) { - causeIdx = 255; - } - ui64 prevCauseIdx = GetCauseIdx(); - Raw = Raw ^ ((causeIdx ^ prevCauseIdx) << 56); - } - - ui64 GetRequestIdx() const { - return (Raw >> 32) & 0xFFFFFF; - } - - operator ui64 () const { - return Raw; - } -}; - -struct TVMultiPutCookie { - ui64 Raw; - // [0, 7 ] 8bit vDiskOrderNumber - // [8, 15] 8bit itemCount - // [16, 31] 16bit --- - // [32, 55] 24bit requestIdx - // [56, 63] 8bit causeIdx - - TVMultiPutCookie(ui64 raw) - : Raw(raw) - {} - - TVMultiPutCookie(ui64 vDiskOrderNumber, ui64 itemCount, ui64 requestIdx) - : Raw(vDiskOrderNumber | (itemCount << 8) | (requestIdx << 32)) - { - Y_ABORT_UNLESS(vDiskOrderNumber < 256); - Y_ABORT_UNLESS(itemCount < 256); - Y_ABORT_UNLESS(requestIdx < (1LL << 24)); - } - - ui64 GetVDiskOrderNumber() const { - return Raw & 0xFF; - } - - ui64 GetItemCount() const { - return (Raw >> 8) & 0xFF; - } - - ui64 GetCauseIdx() const { - return (Raw >> 56) & 0xFF; - } - - void SetCauseIdx(ui64 causeIdx) { - // It's a debug feature support, it's a bad idea to VERIFY here - if (causeIdx > 255) { - causeIdx = 255; - } - ui64 prevCauseIdx = GetCauseIdx(); - Raw = Raw ^ ((causeIdx ^ prevCauseIdx) << 56); - } - - ui64 GetRequestIdx() const { - return (Raw >> 32) & 0xFFFFFF; - } - - operator ui64 () const { - return Raw; - } -}; - -}//NKikimr diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp index 56161a130b9..918d31812bf 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp @@ -56,8 +56,6 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt bool IsPutAccelerated = false; bool IsPutAccelerateScheduled = false; - const bool IsVMultiPutMode = false; - void Handle(TEvAccelerateGet::TPtr &ev) { RootCauseTrack.OnAccelerate(ev->Get()->CauseIdx); AccelerateGet(); @@ -75,19 +73,11 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt IsGetAccelerated = true; TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> vGets; - if (IsVMultiPutMode) { - TDeque<std::unique_ptr<TEvBlobStorage::TEvVMultiPut>> vMultiPuts; - GetImpl.AccelerateGet(LogCtx, GetUnresponsiveDiskOrderNumber(), vGets, vMultiPuts); - *Mon->NodeMon->AccelerateEvVMultiPutCount += vMultiPuts.size(); - *Mon->NodeMon->AccelerateEvVGetCount += vGets.size(); - SendVGetsAndVPuts(vGets, vMultiPuts); - } else { - TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts; - GetImpl.AccelerateGet(LogCtx, GetUnresponsiveDiskOrderNumber(), vGets, vPuts); - *Mon->NodeMon->AccelerateEvVPutCount += vPuts.size(); - *Mon->NodeMon->AccelerateEvVGetCount += vGets.size(); - SendVGetsAndVPuts(vGets, vPuts); - } + TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts; + GetImpl.AccelerateGet(LogCtx, GetUnresponsiveDiskOrderNumber(), vGets, vPuts); + *Mon->NodeMon->AccelerateEvVPutCount += vPuts.size(); + *Mon->NodeMon->AccelerateEvVGetCount += vGets.size(); + SendVGetsAndVPuts(vGets, vPuts); } void AcceleratePut() { @@ -97,23 +87,15 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt IsPutAccelerated = true; TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> vGets; - if (IsVMultiPutMode) { - TDeque<std::unique_ptr<TEvBlobStorage::TEvVMultiPut>> vMultiPuts; - GetImpl.AcceleratePut(LogCtx, GetUnresponsiveDiskOrderNumber(), vGets, vMultiPuts); - *Mon->NodeMon->AccelerateEvVMultiPutCount += vMultiPuts.size(); - *Mon->NodeMon->AccelerateEvVGetCount += vGets.size(); - SendVGetsAndVPuts(vGets, vMultiPuts); - } else { - TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts; - GetImpl.AcceleratePut(LogCtx, GetUnresponsiveDiskOrderNumber(), vGets, vPuts); - *Mon->NodeMon->AccelerateEvVPutCount += vPuts.size(); - *Mon->NodeMon->AccelerateEvVGetCount += vGets.size(); - SendVGetsAndVPuts(vGets, vPuts); - } + TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts; + GetImpl.AcceleratePut(LogCtx, GetUnresponsiveDiskOrderNumber(), vGets, vPuts); + *Mon->NodeMon->AccelerateEvVPutCount += vPuts.size(); + *Mon->NodeMon->AccelerateEvVGetCount += vGets.size(); + SendVGetsAndVPuts(vGets, vPuts); } - template <typename TPutEvent> - void SendVGetsAndVPuts(TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &vGets, TDeque<std::unique_ptr<TPutEvent>> &vPuts) { + void SendVGetsAndVPuts(TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &vGets, + TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> &vPuts) { ReportBytes(GetImpl.GrabBytesToReport()); RequestsSent += vGets.size(); RequestsSent += vPuts.size(); @@ -125,11 +107,8 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt } } for (size_t i = 0; i < vGets.size(); ++i) { - Y_ABORT_UNLESS(vGets[i]->Record.HasCookie()); - TVGetCookie cookie(vGets[i]->Record.GetCookie()); if (RootCauseTrack.IsOn) { - cookie.SetCauseIdx(RootCauseTrack.RegisterCause()); - vGets[i]->Record.SetCookie(cookie); + vGets[i]->Record.SetCookie(RootCauseTrack.RegisterCause()); } Y_ABORT_UNLESS(vGets[i]->Record.HasVDiskID()); TVDiskID vDiskId = VDiskIDFromVDiskID(vGets[i]->Record.GetVDiskID()); @@ -141,11 +120,8 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt DiskCounters[orderNumber].Sent++; } for (size_t i = 0; i < vPuts.size(); ++i) { - Y_ABORT_UNLESS(vPuts[i]->Record.HasCookie()); - TBlobCookie cookie(vPuts[i]->Record.GetCookie()); if (RootCauseTrack.IsOn) { - cookie.SetCauseIdx(RootCauseTrack.RegisterCause()); - vPuts[i]->Record.SetCookie(cookie); + vPuts[i]->Record.SetCookie(RootCauseTrack.RegisterCause()); } Y_ABORT_UNLESS(vPuts[i]->Record.HasVDiskID()); TVDiskID vDiskId = VDiskIDFromVDiskID(vPuts[i]->Record.GetVDiskID()); @@ -216,10 +192,8 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt GetTotalTimeMs(record.GetTimestamps()) - GetVDiskTimeMs(record.GetTimestamps()), NKikimrBlobStorage::EGetHandleClass_Name(GetImpl.GetHandleClass()), NKikimrProto::EReplyStatus_Name(record.GetStatus())); - Y_ABORT_UNLESS(record.HasCookie()); - TVGetCookie cookie(record.GetCookie()); - if (RootCauseTrack.IsOn) { - RootCauseTrack.OnReply(cookie.GetCauseIdx(), + if (RootCauseTrack.IsOn && record.HasCookie()) { + RootCauseTrack.OnReply(record.GetCookie(), GetTotalTimeMs(record.GetTimestamps()) - GetVDiskTimeMs(record.GetTimestamps()), GetVDiskTimeMs(record.GetTimestamps())); } @@ -233,15 +207,9 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> vGets; TAutoPtr<TEvBlobStorage::TEvGetResult> getResult; ResponsesReceived++; - if (IsVMultiPutMode) { - TDeque<std::unique_ptr<TEvBlobStorage::TEvVMultiPut>> vMultiPuts; - GetImpl.OnVGetResult(LogCtx, *ev->Get(), vGets, vMultiPuts, getResult); - SendVGetsAndVPuts(vGets, vMultiPuts); - } else { - TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts; - GetImpl.OnVGetResult(LogCtx, *ev->Get(), vGets, vPuts, getResult); - SendVGetsAndVPuts(vGets, vPuts); - } + TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts; + GetImpl.OnVGetResult(LogCtx, *ev->Get(), vGets, vPuts, getResult); + SendVGetsAndVPuts(vGets, vPuts); if (getResult) { SendReplyAndDie(getResult); @@ -274,77 +242,16 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt return LogoBlobIDFromLogoBlobID(ev->Get()->Record.GetBlobID()); } - TLogoBlobID GetFirstBlobId(TEvBlobStorage::TEvVMultiPutResult::TPtr &ev) { - Y_ABORT_UNLESS(ev->Get()->Record.ItemsSize()); - return LogoBlobIDFromLogoBlobID(ev->Get()->Record.GetItems(0).GetBlobID()); - } - ui64 SumBlobSize(TEvBlobStorage::TEvVPutResult::TPtr &ev) { return GetFirstBlobId(ev).BlobSize(); } - ui64 SumBlobSize(TEvBlobStorage::TEvVMultiPutResult::TPtr &ev) { - ui64 sum = 0; - for (auto &item : ev->Get()->Record.GetItems()) { - TLogoBlobID blobId = LogoBlobIDFromLogoBlobID(item.GetBlobID()); - sum += blobId.BlobSize(); - } - return sum; - } - void Handle(TEvBlobStorage::TEvVPutResult::TPtr &ev) { ProcessReplyFromQueue(ev); - HandleVPutResult<TEvBlobStorage::TEvVPut, TEvBlobStorage::TEvVPutResult>(ev); - } - - void Handle(TEvBlobStorage::TEvVMultiPutResult::TPtr &ev) { - ProcessReplyFromQueue(ev); - HandleVPutResult<TEvBlobStorage::TEvVMultiPut, TEvBlobStorage::TEvVMultiPutResult>(ev); + HandleVPutResult(ev); } - bool HandleVPutInnerErrorStatuses(TEvBlobStorage::TEvVPutResult::TPtr &ev, - TAutoPtr<TEvBlobStorage::TEvGetResult> &outGetResult) - { - Y_UNUSED(ev, outGetResult); - return false; - } - - bool HandleVPutInnerErrorStatuses(TEvBlobStorage::TEvVMultiPutResult::TPtr &ev, - TAutoPtr<TEvBlobStorage::TEvGetResult> &outGetResult) - { - const auto &record = ev->Get()->Record; - const NKikimrProto::EReplyStatus status = record.GetStatus(); - const TVDiskID vDiskId = VDiskIDFromVDiskID(record.GetVDiskID()); - - for (auto &item : record.GetItems()) { - Y_ABORT_UNLESS(item.HasStatus()); - Y_ABORT_UNLESS(item.HasBlobID()); - Y_ABORT_UNLESS(item.HasCookie()); - NKikimrProto::EReplyStatus itemStatus = item.GetStatus(); - TLogoBlobID blobId = LogoBlobIDFromLogoBlobID(item.GetBlobID()); - Y_ABORT_UNLESS(itemStatus != NKikimrProto::RACE); // we should get RACE for the whole request and handle it in CheckForTermErrors - if (itemStatus == NKikimrProto::BLOCKED || itemStatus == NKikimrProto::DEADLINE) { - R_LOG_ERROR_S("BPG26", "Handle TEvVMultiPutResultItem" - << " blobId# " << blobId.ToString() - << " status# " << NKikimrProto::EReplyStatus_Name(status) - << " itemStatus# " << NKikimrProto::EReplyStatus_Name(itemStatus)); - TStringStream errorReason; - errorReason << "Got VMultiPutResult itemStatus# " << itemStatus << " from VDiskId# " << vDiskId; - ErrorReason = errorReason.Str(); - GetImpl.PrepareReply(itemStatus, ErrorReason, LogCtx, outGetResult); - return true; - } else { - R_LOG_DEBUG_S("BPG27", "Handle TEvVMultiPutResultItem" - << " blobId# " << blobId.ToString() - << " status# " << NKikimrProto::EReplyStatus_Name(status) - << " itemStatus# " << NKikimrProto::EReplyStatus_Name(itemStatus)); - } - } - return false; - } - - template <typename TPutEvent, typename TPutEventResult> - void HandleVPutResult(typename TPutEventResult::TPtr &ev) { + void HandleVPutResult(TEvBlobStorage::TEvVPutResult::TPtr &ev) { Y_ABORT_UNLESS(ev->Get()->Record.HasStatus()); const ui64 cyclesPerUs = NHPTimer::GetCyclesPerSecond() / 1000000; @@ -369,10 +276,8 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt NKikimrBlobStorage::EPutHandleClass_Name(GetImpl.GetPutHandleClass()), NKikimrProto::EReplyStatus_Name(status)); - Y_ABORT_UNLESS(record.HasCookie()); - TBlobCookie cookie(record.GetCookie()); - if (RootCauseTrack.IsOn) { - RootCauseTrack.OnReply(cookie.GetCauseIdx(), + if (RootCauseTrack.IsOn && record.HasCookie()) { + RootCauseTrack.OnReply(record.GetCookie(), GetTotalTimeMs(record.GetTimestamps()) - GetVDiskTimeMs(record.GetTimestamps()), GetVDiskTimeMs(record.GetTimestamps())); } @@ -384,15 +289,10 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt DiskCounters[orderNumber].Received++; TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> vGets; - TDeque<std::unique_ptr<TPutEvent>> vPuts; + TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts; TAutoPtr<TEvBlobStorage::TEvGetResult> getResult; ResponsesReceived++; - if (HandleVPutInnerErrorStatuses(ev, getResult)) { - Y_ABORT_UNLESS(getResult); - SendReplyAndDie(getResult); - return; - } GetImpl.OnVPutResult(LogCtx, *ev->Get(), vGets, vPuts, getResult); SendVGetsAndVPuts(vGets, vPuts); if (getResult) { @@ -491,7 +391,7 @@ public: const TIntrusivePtr<TGroupQueues> &state, const TActorId &source, const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvGet *ev, ui64 cookie, NWilson::TTraceId traceId, TNodeLayoutInfoPtr&& nodeLayout, TMaybe<TGroupStat::EKind> latencyQueueKind, - TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters, bool isVMultiPutMode) + TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId), NKikimrServices::BS_PROXY_GET, ev->IsVerboseNoDataEnabled || ev->CollectDebugInfo, latencyQueueKind, now, storagePoolCounters, ev->RestartCounter, "DSProxy.Get", @@ -504,7 +404,6 @@ public: , RequestsSent(0) , ResponsesReceived(0) , ReportedBytes(0) - , IsVMultiPutMode(isVMultiPutMode) { ReportBytes(sizeof(*this)); MaxSaneRequests = ev->QuerySize * info->Type.TotalPartCount() * (1 + info->Type.Handoff()) * 3; @@ -557,7 +456,6 @@ public: switch (ev->GetTypeRewrite()) { hFunc(TEvBlobStorage::TEvVGetResult, Handle); hFunc(TEvBlobStorage::TEvVPutResult, Handle); - hFunc(TEvBlobStorage::TEvVMultiPutResult, Handle); hFunc(TEvAccelerateGet, Handle); hFunc(TEvAcceleratePut, Handle); } @@ -569,9 +467,9 @@ IActor* CreateBlobStorageGroupGetRequest(const TIntrusivePtr<TBlobStorageGroupIn const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvGet *ev, ui64 cookie, NWilson::TTraceId traceId, TNodeLayoutInfoPtr&& nodeLayout, TMaybe<TGroupStat::EKind> latencyQueueKind, TInstant now, - TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters, bool isVMultiPutMode) { + TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) { return new TBlobStorageGroupGetRequest(info, state, source, mon, ev, cookie, std::move(traceId), - std::move(nodeLayout), latencyQueueKind, now, storagePoolCounters, isVMultiPutMode); + std::move(nodeLayout), latencyQueueKind, now, storagePoolCounters); } }//NKikimr diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp index 3c9538fc244..003a4e1a777 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp @@ -226,10 +226,6 @@ TString TGetImpl::DumpFullState() const { str << Endl; str << " VPutResponses# " << VPutResponses; str << Endl; - str << " VMultiPutRequests# " << VMultiPutRequests; - str << Endl; - str << " VMultiPutResponses# " << VMultiPutResponses; - str << Endl; str << " IsNoData# " << IsNoData; str << Endl; @@ -275,118 +271,54 @@ void TGetImpl::GenerateInitialRequests(TLogContext &logCtx, TDeque<std::unique_p } void TGetImpl::PrepareRequests(TLogContext &logCtx, TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &outVGets) { - for (ui32 diskOrderNumber = 0; diskOrderNumber < Blackboard.GroupDiskRequests.DiskRequestsForOrderNumber.size(); - ++diskOrderNumber) { - TDiskRequests &requests = Blackboard.GroupDiskRequests.DiskRequestsForOrderNumber[diskOrderNumber]; - ui32 endIdx = requests.GetsToSend.size(); - ui32 beginIdx = requests.FirstUnsentRequestIdx; - - if (beginIdx < endIdx) { - TVDiskID vDiskId = Info->GetVDiskId(diskOrderNumber); - auto vGet = TEvBlobStorage::TEvVGet::CreateExtremeDataQuery(vDiskId, Deadline, Blackboard.GetHandleClass, - TEvBlobStorage::TEvVGet::EFlags::None, TVGetCookie(beginIdx, endIdx), {}, ForceBlockTabletData); - for (ui32 idx = beginIdx; idx < endIdx; ++idx) { - TDiskGetRequest &get = requests.GetsToSend[idx]; - ui64 cookie = idx; - vGet->AddExtremeQuery(get.Id, get.Shift, get.Size, &cookie); - if (ReportDetailedPartMap) { - get.PartMapIndex = Blackboard.AddPartMap(get.Id, diskOrderNumber, RequestIndex); - } - } - vGet->Record.SetSuppressBarrierCheck(IsInternal); - vGet->Record.SetTabletId(TabletId); - vGet->Record.SetAcquireBlockedGeneration(AcquireBlockedGeneration); - - if (ReaderTabletData) { - auto msg = vGet->Record.MutableReaderTabletData(); - msg->SetId(ReaderTabletData->Id); - msg->SetGeneration(ReaderTabletData->Generation); - } - - R_LOG_DEBUG_SX(logCtx, "BPG14", "Send get to orderNumber# " << diskOrderNumber - << " beginIdx# " << beginIdx - << " endIdx# " << endIdx - << " vGet# " << vGet->ToString()); - outVGets.push_back(std::move(vGet)); - ++RequestIndex; - Blackboard.GroupDiskRequests.DiskRequestsForOrderNumber[diskOrderNumber].FirstUnsentRequestIdx = endIdx; + TStackVec<std::unique_ptr<TEvBlobStorage::TEvVGet>, TypicalDisksInGroup> gets(Info->GetTotalVDisksNum()); + + for (auto& get : Blackboard.GroupDiskRequests.GetsPending) { + auto& vget = gets[get.OrderNumber]; + if (!vget) { + const TVDiskID vdiskId = Info->GetVDiskId(get.OrderNumber); + vget = TEvBlobStorage::TEvVGet::CreateExtremeDataQuery(vdiskId, Deadline, Blackboard.GetHandleClass, + TEvBlobStorage::TEvVGet::EFlags::None, {}, {}, ForceBlockTabletData); } + std::optional<ui64> cookie; + if (ReportDetailedPartMap) { + cookie = Blackboard.AddPartMap(get.Id, get.OrderNumber, RequestIndex); + } + vget->AddExtremeQuery(get.Id, get.Shift, get.Size, cookie ? &cookie.value() : nullptr); + vget->Record.SetSuppressBarrierCheck(IsInternal); + vget->Record.SetTabletId(TabletId); + vget->Record.SetAcquireBlockedGeneration(AcquireBlockedGeneration); + if (ReaderTabletData) { + auto msg = vget->Record.MutableReaderTabletData(); + msg->SetId(ReaderTabletData->Id); + msg->SetGeneration(ReaderTabletData->Generation); + } + R_LOG_DEBUG_SX(logCtx, "BPG14", "Send get to orderNumber# " << get.OrderNumber + << " vget# " << vget->ToString()); } -} -void TGetImpl::PrepareVPuts(TLogContext &logCtx, - TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> &outVPuts) { - for (ui32 diskOrderNumber = 0; diskOrderNumber < Blackboard.GroupDiskRequests.DiskRequestsForOrderNumber.size(); - ++diskOrderNumber) { - const TDiskRequests &requests = Blackboard.GroupDiskRequests.DiskRequestsForOrderNumber[diskOrderNumber]; - ui32 endIdx = requests.PutsToSend.size(); - ui32 beginIdx = requests.FirstUnsentPutIdx; - - if (beginIdx < endIdx) { - TVDiskID vDiskId = Info->GetVDiskId(diskOrderNumber); - for (ui32 idx = beginIdx; idx < endIdx; ++idx) { - const TDiskPutRequest &put = requests.PutsToSend[idx]; - ui64 cookie = TBlobCookie(diskOrderNumber, put.BlobIdx, put.Id.PartId(), - VPutRequests); - Y_DEBUG_ABORT_UNLESS(Info->Type.GetErasure() != TBlobStorageGroupType::ErasureMirror3of4 || - put.Id.PartId() != 3 || put.Buffer.IsEmpty()); - auto vPut = std::make_unique<TEvBlobStorage::TEvVPut>(put.Id, put.Buffer, vDiskId, true, &cookie, - Deadline, Blackboard.PutHandleClass); - R_LOG_DEBUG_SX(logCtx, "BPG15", "Send put to orderNumber# " << diskOrderNumber << " idx# " << idx - << " vPut# " << vPut->ToString()); - outVPuts.push_back(std::move(vPut)); - ++VPutRequests; - ReceivedVPutResponses.push_back(false); - } - Blackboard.GroupDiskRequests.DiskRequestsForOrderNumber[diskOrderNumber].FirstUnsentPutIdx = endIdx; + for (auto& vget : gets) { + if (vget) { + outVGets.push_back(std::move(vget)); + ++RequestIndex; } } + + Blackboard.GroupDiskRequests.GetsPending.clear(); } -void TGetImpl::PrepareVPuts(TLogContext &logCtx, - TDeque<std::unique_ptr<TEvBlobStorage::TEvVMultiPut>> &outVMultiPuts) { - for (ui32 diskOrderNumber = 0; diskOrderNumber < Blackboard.GroupDiskRequests.DiskRequestsForOrderNumber.size(); - ++diskOrderNumber) { - const TDiskRequests &requests = Blackboard.GroupDiskRequests.DiskRequestsForOrderNumber[diskOrderNumber]; - ui32 endIdx = requests.PutsToSend.size(); - ui32 beginIdx = requests.FirstUnsentPutIdx; - if (beginIdx < endIdx) { - TVDiskID vDiskId = Info->GetVDiskId(diskOrderNumber); - // set cookie after adding items - auto vMultiPut = std::make_unique<TEvBlobStorage::TEvVMultiPut>(vDiskId, Deadline, Blackboard.PutHandleClass, - true, nullptr); - ui64 bytes = 0; - ui64 lastItemCount = 0; - for (ui32 idx = beginIdx; idx < endIdx; ++idx) { - const TDiskPutRequest &put = requests.PutsToSend[idx]; - ui64 cookie = TBlobCookie(diskOrderNumber, put.BlobIdx, put.Id.PartId(), VMultiPutRequests); - ui64 itemSize = vMultiPut->Record.ItemsSize(); - if (itemSize == MaxBatchedPutRequests || bytes + put.Buffer.size() > MaxBatchedPutSize) { - vMultiPut->Record.SetCookie(TVMultiPutCookie(diskOrderNumber, lastItemCount, VMultiPutRequests)); - ++VMultiPutRequests; - ReceivedVMultiPutResponses.push_back(false); - R_LOG_DEBUG_SX(logCtx, "BPG16", "Send multiPut to orderNumber# " << diskOrderNumber << " count# " - << vMultiPut->Record.ItemsSize() << " vMultiPut# " << vMultiPut->ToString()); - outVMultiPuts.push_back(std::move(vMultiPut)); - // set cookie after adding items - vMultiPut = std::make_unique<TEvBlobStorage::TEvVMultiPut>(vDiskId, Deadline, - Blackboard.PutHandleClass, true, nullptr); - bytes = 0; - lastItemCount = 0; - } - bytes += put.Buffer.size(); - lastItemCount++; - vMultiPut->AddVPut(put.Id, TRcBuf(TRope(put.Buffer)), &cookie, nullptr, NWilson::TTraceId()); - } - vMultiPut->Record.SetCookie(TVMultiPutCookie(diskOrderNumber, lastItemCount, VMultiPutRequests)); - ++VMultiPutRequests; - ReceivedVMultiPutResponses.push_back(false); - R_LOG_DEBUG_SX(logCtx, "BPG17", "Send multiPut to orderNumber# " << diskOrderNumber << " count# " - << vMultiPut->Record.ItemsSize() << " vMultiPut# " << vMultiPut->ToString()); - outVMultiPuts.push_back(std::move(vMultiPut)); - Blackboard.GroupDiskRequests.DiskRequestsForOrderNumber[diskOrderNumber].FirstUnsentPutIdx = endIdx; - } +void TGetImpl::PrepareVPuts(TLogContext &logCtx, TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> &outVPuts) { + for (auto& put : Blackboard.GroupDiskRequests.PutsPending) { + const TVDiskID vdiskId = Info->GetVDiskId(put.OrderNumber); + Y_DEBUG_ABORT_UNLESS(Info->Type.GetErasure() != TBlobStorageGroupType::ErasureMirror3of4 || + put.Id.PartId() != 3 || put.Buffer.IsEmpty()); + auto vput = std::make_unique<TEvBlobStorage::TEvVPut>(put.Id, put.Buffer, vdiskId, true, nullptr, Deadline, + Blackboard.PutHandleClass); + R_LOG_DEBUG_SX(logCtx, "BPG15", "Send put to orderNumber# " << put.OrderNumber << " vput# " << vput->ToString()); + outVPuts.push_back(std::move(vput)); + ++VPutRequests; } + Blackboard.GroupDiskRequests.PutsPending.clear(); } EStrategyOutcome TGetImpl::RunBoldStrategy(TLogContext &logCtx) { @@ -443,17 +375,6 @@ void TGetImpl::OnVPutResult(TLogContext &logCtx, TEvBlobStorage::TEvVPutResult & ui32 orderNumber = Info->GetOrderNumber(shortId); const TLogoBlobID blob = LogoBlobIDFromLogoBlobID(record.GetBlobID()); - Y_ABORT_UNLESS(record.HasCookie()); - TBlobCookie cookie(record.GetCookie()); - Y_ABORT_UNLESS(cookie.GetVDiskOrderNumber() == orderNumber); - Y_ABORT_UNLESS(cookie.GetPartId() == blob.PartId()); - - ui64 requestIdx = cookie.GetRequestIdx(); - Y_VERIFY_S(!ReceivedVPutResponses[requestIdx], "the response is received twice" - << " Event# " << ev.ToString() - << " State# " << DumpFullState()); - ReceivedVPutResponses[requestIdx] = true; - const NKikimrProto::EReplyStatus status = record.GetStatus(); ++VPutResponses; switch (status) { @@ -472,50 +393,4 @@ void TGetImpl::OnVPutResult(TLogContext &logCtx, TEvBlobStorage::TEvVPutResult & Step(logCtx, outVGets, outVPuts, outGetResult); } -void TGetImpl::OnVPutResult(TLogContext &logCtx, TEvBlobStorage::TEvVMultiPutResult &ev, - TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &outVGets, - TDeque<std::unique_ptr<TEvBlobStorage::TEvVMultiPut>> &outVMultiPuts, - TAutoPtr<TEvBlobStorage::TEvGetResult> &outGetResult) { - const NKikimrBlobStorage::TEvVMultiPutResult &record = ev.Record; - Y_ABORT_UNLESS(record.HasVDiskID()); - TVDiskID vdisk = VDiskIDFromVDiskID(record.GetVDiskID()); - TVDiskIdShort shortId(vdisk); - ui32 orderNumber = Info->GetOrderNumber(shortId); - - Y_ABORT_UNLESS(record.HasCookie()); - TVMultiPutCookie cookie(record.GetCookie()); - Y_ABORT_UNLESS(cookie.GetVDiskOrderNumber() == orderNumber); - Y_ABORT_UNLESS(cookie.GetItemCount() == record.ItemsSize()); - - ui64 requestIdx = cookie.GetRequestIdx(); - Y_VERIFY_S(!ReceivedVMultiPutResponses[requestIdx], "the response is received twice" - << " Event# " << ev.ToString() - << " State# " << DumpFullState()); - ReceivedVMultiPutResponses[requestIdx] = true; - - ++VMultiPutResponses; - for (auto &item : record.GetItems()) { - const NKikimrProto::EReplyStatus status = item.GetStatus(); - const TLogoBlobID blob = LogoBlobIDFromLogoBlobID(item.GetBlobID()); - Y_ABORT_UNLESS(item.HasCookie()); - TBlobCookie itemCookie(item.GetCookie()); - Y_ABORT_UNLESS(itemCookie.GetVDiskOrderNumber() == orderNumber); - Y_ABORT_UNLESS(itemCookie.GetPartId() == blob.PartId()); - switch (status) { - case NKikimrProto::ERROR: - case NKikimrProto::VDISK_ERROR_STATE: - case NKikimrProto::OUT_OF_SPACE: - Blackboard.AddErrorResponse(blob, orderNumber); - break; - case NKikimrProto::OK: - case NKikimrProto::ALREADY: - Blackboard.AddPutOkResponse(blob, orderNumber); - break; - default: - Y_ABORT("Unexpected status# %s", NKikimrProto::EReplyStatus_Name(status).data()); - } - } - Step(logCtx, outVGets, outVMultiPuts, outGetResult); -} - }//NKikimr diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h index 24ae81b8476..13cb325251c 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h @@ -2,7 +2,6 @@ #include "dsproxy.h" #include "dsproxy_blackboard.h" -#include "dsproxy_cookies.h" #include "dsproxy_mon.h" #include <ydb/core/blobstorage/vdisk/common/vdisk_events.h> #include <util/generic/set.h> @@ -31,8 +30,6 @@ class TGetImpl { ui32 BlockedGeneration = 0; ui32 VPutRequests = 0; ui32 VPutResponses = 0; - ui32 VMultiPutRequests = 0; - ui32 VMultiPutResponses = 0; bool IsNoData = false; bool IsReplied = false; @@ -43,9 +40,6 @@ class TGetImpl { ui32 RequestIndex = 0; ui32 ResponseIndex = 0; - TStackVec<bool, MaxBatchedPutRequests * TypicalDisksInSubring> ReceivedVPutResponses; - TStackVec<bool, MaxBatchedPutRequests * TypicalDisksInSubring> ReceivedVMultiPutResponses; - const TString RequestPrefix; const bool PhantomCheck; @@ -146,14 +140,6 @@ public: return VPutResponses; } - ui64 GetVMultiPutRequests() const { - return VMultiPutRequests; - } - - ui64 GetVMultiPutResponses() const { - return VMultiPutResponses; - } - ui64 GetRequestIndex() const { return RequestIndex; } @@ -162,12 +148,11 @@ public: return ResponseIndex; } - void GenerateInitialRequests(TLogContext &logCtx, TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &outVGets); - template <typename TVPutEvent> void OnVGetResult(TLogContext &logCtx, TEvBlobStorage::TEvVGetResult &ev, - TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &outVGets, TDeque<std::unique_ptr<TVPutEvent>> &outVPuts, + TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &outVGets, + TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> &outVPuts, TAutoPtr<TEvBlobStorage::TEvGetResult> &outGetResult) { const NKikimrBlobStorage::TEvVGetResult &record = ev.Record; Y_ABORT_UNLESS(record.HasStatus()); @@ -195,16 +180,11 @@ public: const NKikimrBlobStorage::TQueryResult &result = record.GetResult(i); Y_ABORT_UNLESS(result.HasStatus()); const NKikimrProto::EReplyStatus replyStatus = result.GetStatus(); - Y_ABORT_UNLESS(result.HasCookie()); - const ui64 cookie = result.GetCookie(); Y_ABORT_UNLESS(result.HasBlobID()); const TLogoBlobID blobId = LogoBlobIDFromLogoBlobID(result.GetBlobID()); if (ReportDetailedPartMap) { - Blackboard.ReportPartMapStatus(blobId, - Blackboard.GroupDiskRequests.DiskRequestsForOrderNumber[orderNumber].GetsToSend[cookie].PartMapIndex, - ResponseIndex, - replyStatus); + Blackboard.ReportPartMapStatus(blobId, result.GetCookie(), ResponseIndex, replyStatus); } TRope resultBuffer = ev.GetBlobData(result); @@ -266,17 +246,12 @@ public: TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> &outVPuts, TAutoPtr<TEvBlobStorage::TEvGetResult> &outGetResult); - void OnVPutResult(TLogContext &logCtx, TEvBlobStorage::TEvVMultiPutResult &ev, - TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &outVGets, - TDeque<std::unique_ptr<TEvBlobStorage::TEvVMultiPut>> &outVMultiPuts, - TAutoPtr<TEvBlobStorage::TEvGetResult> &outGetResult); - void PrepareReply(NKikimrProto::EReplyStatus status, TString errorReason, TLogContext &logCtx, TAutoPtr<TEvBlobStorage::TEvGetResult> &outGetResult); - template <typename TVPutEvent> void AccelerateGet(TLogContext &logCtx, i32 slowDiskOrderNumber, - TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &outVGets, TDeque<std::unique_ptr<TVPutEvent>> &outVPuts) { + TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &outVGets, + TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> &outVPuts) { TAutoPtr<TEvBlobStorage::TEvGetResult> outGetResult; TBlackboard::EAccelerationMode prevMode = Blackboard.AccelerationMode; Blackboard.AccelerationMode = TBlackboard::AccelerationModeSkipMarked; @@ -294,9 +269,9 @@ public: RequestPrefix.data(), outGetResult->Print(false).c_str(), DumpFullState().c_str()); } - template <typename TVPutEvent> void AcceleratePut(TLogContext &logCtx, i32 slowDiskOrderNumber, - TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &outVGets, TDeque<std::unique_ptr<TVPutEvent>> &outVPuts) { + TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &outVGets, + TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> &outVPuts) { AccelerateGet(logCtx, slowDiskOrderNumber, outVGets, outVPuts); } @@ -312,9 +287,9 @@ protected: EStrategyOutcome RunStrategies(TLogContext &logCtx); // Returns true if there are additional requests to send - template <typename TVPutEvent> bool Step(TLogContext &logCtx, TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &outVGets, - TDeque<std::unique_ptr<TVPutEvent>> &outVPuts, TAutoPtr<TEvBlobStorage::TEvGetResult> &outGetResult) { + TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> &outVPuts, + TAutoPtr<TEvBlobStorage::TEvGetResult> &outGetResult) { switch (auto outcome = RunStrategies(logCtx)) { case EStrategyOutcome::IN_PROGRESS: { const ui32 numRequests = outVGets.size() + outVPuts.size(); @@ -337,8 +312,6 @@ protected: TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &outVGets); void PrepareVPuts(TLogContext &logCtx, TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> &outVPuts); - void PrepareVPuts(TLogContext &logCtx, - TDeque<std::unique_ptr<TEvBlobStorage::TEvVMultiPut>> &outVMultiPuts); ui64 GetTimeToAccelerateNs(TLogContext &logCtx, NKikimrBlobStorage::EVDiskQueueId queueId); }; //TGetImpl diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h index 8f67e9ee6fb..5c499eda22a 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h @@ -2,7 +2,6 @@ #include "dsproxy.h" #include "dsproxy_blackboard.h" -#include "dsproxy_cookies.h" #include "dsproxy_mon.h" #include "dsproxy_strategy_accelerate_put.h" #include "dsproxy_strategy_accelerate_put_m3dc.h" @@ -236,13 +235,8 @@ public: // Group put requests together by VDiskID. std::unordered_multimap<ui32, TDiskPutRequest*> puts; - auto& perDiskRequests = Blackboard.GroupDiskRequests.DiskRequestsForOrderNumber; - for (ui32 orderNumber = 0, numDisks = perDiskRequests.size(); orderNumber < numDisks; ++orderNumber) { - TDiskRequests& requests = perDiskRequests[orderNumber]; - while (requests.FirstUnsentPutIdx < requests.PutsToSend.size()) { - auto& putRequest = requests.PutsToSend[requests.FirstUnsentPutIdx++]; - puts.emplace(orderNumber, &putRequest); - } + for (auto& put : Blackboard.GroupDiskRequests.PutsPending) { + puts.emplace(put.OrderNumber, &put); } // Generate queries to VDisks. @@ -279,6 +273,7 @@ public: } } + Blackboard.GroupDiskRequests.PutsPending.clear(); return events; } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp index 50a016384c8..2a7344105c7 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp @@ -61,7 +61,7 @@ namespace NKikimr { reqID = Register( CreateBlobStorageGroupGetRequest(Info, Sessions->GroupQueues, ev->Sender, Mon, ev->Get(), ev->Cookie, std::move(ev->TraceId), TNodeLayoutInfoPtr(NodeLayoutInfo), - kind, TActivationContext::Now(), StoragePoolCounters, false)); + kind, TActivationContext::Now(), StoragePoolCounters)); ActiveRequests.insert(reqID); } else { Mon->EventMultiGet->Inc(); diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_env_mock_ut.h b/ydb/core/blobstorage/dsproxy/ut/dsproxy_env_mock_ut.h index f196cc92561..c74ea2d3c32 100644 --- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_env_mock_ut.h +++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_env_mock_ut.h @@ -118,12 +118,12 @@ struct TDSProxyEnv { } std::unique_ptr<IActor> CreateGetRequestActor(TEvBlobStorage::TEvGet::TPtr &ev, - NKikimrBlobStorage::EPutHandleClass handleClass, bool withMultiPut) + NKikimrBlobStorage::EPutHandleClass handleClass) { TMaybe<TGroupStat::EKind> kind = PutHandleClassToGroupStatKind(handleClass); return std::unique_ptr<IActor>(CreateBlobStorageGroupGetRequest(Info, GroupQueues, ev->Sender, Mon, ev->Get(), ev->Cookie, std::move(ev->TraceId), TNodeLayoutInfoPtr(NodeLayoutInfo), - kind, TInstant::Now(), StoragePoolCounters, withMultiPut)); + kind, TInstant::Now(), StoragePoolCounters)); } std::unique_ptr<IActor> CreatePatchRequestActor(TEvBlobStorage::TEvPatch::TPtr &ev, bool useVPatch = false) { diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_get_ut.cpp b/ydb/core/blobstorage/dsproxy/ut/dsproxy_get_ut.cpp index 141cca4d851..730ef996b5f 100644 --- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_get_ut.cpp +++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_get_ut.cpp @@ -85,8 +85,6 @@ void TestIntervalsAndCrcAllOk(TErasureType::EErasureSpecies erasureSpecies, bool TAutoPtr<TEvBlobStorage::TEvGetResult> getResult; for (ui64 vGetIdx = 0; vGetIdx < vGets.size(); ++vGetIdx) { bool isLast = (vGetIdx == vGets.size() - 1); - auto &request = vGets[vGetIdx]->Record; - Y_ABORT_UNLESS(request.HasCookie()); //ui64 messageCookie = request->Record.GetCookie(); TEvBlobStorage::TEvVGetResult vGetResult; group.OnVGet(*vGets[vGetIdx], vGetResult); @@ -154,11 +152,6 @@ Y_UNIT_TEST(TestMirror32GetBlobCrcCheck) { } class TTestWipedAllOkStep { - enum ETVPutEventKind { - TVPEK_VPUT, - TVPEK_VMULTIPUT - }; - public: struct TVPutInfo { TVDiskID VDiskId; @@ -195,8 +188,6 @@ private: ui64 VPutRequests = 0; ui64 VPutResponses = 0; - ui64 VMultiPutRequests = 0; - ui64 VMultiPutResponses = 0; ui64 RequestIndex = 0; ui64 ResponseIndex = 0; @@ -241,16 +232,12 @@ public: } } - void Run(bool useVMultiPut) { + void Run() { for (ui64 qci = 0; qci < QueryCounts.size(); ++qci) { ui64 queryCount = QueryCounts[qci]; for (ui64 bci = 0; bci < QueryCounts.size(); ++bci) { ui64 blobCount = QueryCounts[bci]; - if (useVMultiPut) { - SubStep<TVPEK_VMULTIPUT>(queryCount, blobCount); - } else { - SubStep<TVPEK_VPUT>(queryCount, blobCount); - } + SubStep(queryCount, blobCount); } } } @@ -274,8 +261,6 @@ private: void ClearCounters() { VPutRequests = 0; VPutResponses = 0; - VMultiPutRequests = 0; - VMultiPutResponses = 0; RequestIndex = 0; ResponseIndex = 0; } @@ -283,8 +268,6 @@ private: void AssertCounters(TGetImpl &getImpl) { UNIT_ASSERT_C(VPutResponses == getImpl.GetVPutResponses() && VPutRequests == getImpl.GetVPutRequests() - && VMultiPutResponses == getImpl.GetVMultiPutResponses() - && VMultiPutRequests == getImpl.GetVMultiPutRequests() && RequestIndex == getImpl.GetRequestIndex() && ResponseIndex == getImpl.GetResponseIndex(), "Not equal expected VPutRequest and VPutResponse with given" @@ -292,10 +275,6 @@ private: << " VPutResponses# " << VPutResponses << " getImpl.VPutRequests# " << getImpl.GetVPutRequests() << " getImpl.VPutResponses# " << getImpl.GetVPutResponses() - << " VMultiPutRequests# " << VMultiPutRequests - << " VMultiPutResponses# " << VMultiPutResponses - << " getImpl.VMultiPutRequests# " << getImpl.GetVMultiPutRequests() - << " getImpl.VMultiPutResponses# " << getImpl.GetVMultiPutResponses() << " RequestIndex# " << RequestIndex << " ResponseIndex# " << ResponseIndex << " getImpl.RequestIndex# " << getImpl.GetRequestIndex() @@ -307,11 +286,9 @@ private: TAutoPtr<TEvBlobStorage::TEvGetResult> &getResult) { for (ui64 vPutIdx = 0; vPutIdx < vPuts.size(); ++vPutIdx) { auto &putRequest = vPuts[vPutIdx]->Record; - Y_ABORT_UNLESS(putRequest.HasCookie()); auto vdisk = VDiskIDFromVDiskID(putRequest.GetVDiskID()); auto blobId = LogoBlobIDFromLogoBlobID(putRequest.GetBlobID()); - TBlobCookie cookie(putRequest.GetCookie()); - SendVPuts.push_back({vdisk, blobId, cookie.GetBlobIdx()}); + SendVPuts.push_back({vdisk, blobId, 0}); TEvBlobStorage::TEvVPutResult vPutResult; vPutResult.MakeError(NKikimrProto::OK, TString(), putRequest); @@ -332,51 +309,7 @@ private: vPuts.clear(); } - void ProcessVPuts(TLogContext &logCtx, TGetImpl &getImpl, - TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &vGets, TDeque<std::unique_ptr<TEvBlobStorage::TEvVMultiPut>> &vPuts, - TAutoPtr<TEvBlobStorage::TEvGetResult> &getResult) { - for (ui64 vPutIdx = 0; vPutIdx < vPuts.size(); ++vPutIdx) { - auto &multiPutRequest = vPuts[vPutIdx]->Record; - Y_ABORT_UNLESS(multiPutRequest.HasCookie()); - - auto vdisk = VDiskIDFromVDiskID(multiPutRequest.GetVDiskID()); - UNIT_ASSERT(multiPutRequest.ItemsSize() <= MaxBatchedPutRequests); - ui64 sendBytes = 0; - for (auto &item : multiPutRequest.GetItems()) { - Y_ABORT_UNLESS(item.HasCookie()); - TLogoBlobID blobId = LogoBlobIDFromLogoBlobID(item.GetBlobID()); - TString buffer = item.GetBuffer(); - sendBytes += buffer.size(); - TBlobCookie cookie(item.GetCookie()); - SendVPuts.push_back({vdisk, blobId, cookie.GetBlobIdx()}); - } - UNIT_ASSERT(sendBytes <= MaxBatchedPutSize); - - TEvBlobStorage::TEvVMultiPutResult vMultiPutResult; - vMultiPutResult.MakeError(NKikimrProto::OK, TString(), multiPutRequest); - - TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> nextVGets; - TDeque<std::unique_ptr<TEvBlobStorage::TEvVMultiPut>> nextVPuts; - getImpl.OnVPutResult(logCtx, vMultiPutResult, - nextVGets, nextVPuts, getResult); - VMultiPutResponses++; - RequestIndex += nextVGets.size(); - VMultiPutRequests += nextVPuts.size(); - AssertCounters(getImpl); - - std::move(nextVGets.begin(), nextVGets.end(), std::back_inserter(vGets)); - std::move(nextVPuts.begin(), nextVPuts.end(), std::back_inserter(vPuts)); - if (getResult) { - break; - } - } - vPuts.clear(); - } - - template <ETVPutEventKind TVPutEventKind> void SubStep(ui64 queryCount, ui64 blobCount) { - using TVPutEvent = std::conditional_t<TVPutEventKind == TVPEK_VPUT, TEvBlobStorage::TEvVPut, - TEvBlobStorage::TEvVMultiPut>; TArrayHolder<TEvBlobStorage::TEvGet::TQuery> queriesA( new TEvBlobStorage::TEvGet::TQuery[MaxQueryCount]); TArrayHolder<TEvBlobStorage::TEvGet::TQuery> queriesB( @@ -394,7 +327,7 @@ private: TGetImpl getImpl(Group->GetInfo(), GroupQueues, &ev, nullptr); ClearCounters(); TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> vGets; - TDeque<std::unique_ptr<TVPutEvent>> vPuts; + TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts; TLogContext logCtx(NKikimrServices::BS_PROXY_GET, false); logCtx.LogAcc.IsLogEnabled = false; getImpl.GenerateInitialRequests(logCtx, vGets); @@ -405,23 +338,17 @@ private: for (ui64 vGetIdx = 0; vGetIdx < vGets.size(); ++vGetIdx) { bool isLast = (vGetIdx == vGets.size() - 1); - auto &request = vGets[vGetIdx]->Record; - Y_ABORT_UNLESS(request.HasCookie()); //ui64 messageCookie = request->Record.GetCookie(); TEvBlobStorage::TEvVGetResult vGetResult; Group->OnVGet(*vGets[vGetIdx], vGetResult); // TODO: generate result TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> nextVGets; - TDeque<std::unique_ptr<TVPutEvent>> nextVPuts; + TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> nextVPuts; getImpl.OnVGetResult(logCtx, vGetResult, nextVGets, nextVPuts, getResult); ResponseIndex++; RequestIndex += nextVGets.size(); - if constexpr (TVPutEventKind == TVPEK_VPUT) { - VPutRequests += nextVPuts.size(); - } else { - VMultiPutRequests += nextVPuts.size(); - } + VPutRequests += nextVPuts.size(); AssertCounters(getImpl); std::move(nextVGets.begin(), nextVGets.end(), std::back_inserter(vGets)); @@ -456,7 +383,7 @@ private: } }; -void TestIntervalsWipedAllOk(TErasureType::EErasureSpecies erasureSpecies, bool isVerboseNoDataEnabled, bool multiput) { +void TestIntervalsWipedAllOk(TErasureType::EErasureSpecies erasureSpecies, bool isVerboseNoDataEnabled) { TActorSystemStub actorSystemStub; const ui32 groupId = 0; @@ -480,7 +407,7 @@ void TestIntervalsWipedAllOk(TErasureType::EErasureSpecies erasureSpecies, bool testStep.Init(); testStep.AddWipedVDisk(wiped1, error1); testStep.AddWipedVDisk(wiped2, error2); - testStep.Run(multiput); + testStep.Run(); } } } @@ -488,61 +415,6 @@ void TestIntervalsWipedAllOk(TErasureType::EErasureSpecies erasureSpecies, bool } } -void TestIntervalsWipedAllOkComparisonVMultiPutAndVPut(TErasureType::EErasureSpecies erasureSpecies, - bool isVerboseNoDataEnabled = false) { - TActorSystemStub actorSystemStub; - - const ui32 groupId = 0; - TBlobStorageGroupType groupType(erasureSpecies); - const ui32 domainCount = groupType.BlobSubgroupSize(); - - const TVector<ui64> queryCounts = {1, 2}; - - TVector<TLogoBlobID> blobIDs = { - TLogoBlobID(72075186224047637, 1, 863, 1, 786, 24576), - TLogoBlobID(72075186224047637, 1, 2194, 1, 142, 12288) - }; - - TVector<TBlobTestSet::TBlob> blobs; - for (const auto& id : blobIDs) { - TStringBuilder builder; - for (size_t i = 0; i < id.BlobSize(); ++i) { - builder << 'a'; - } - blobs.emplace_back(id, builder); - } - - for (ui64 wiped1 = 0; wiped1 < domainCount; ++wiped1) { - for (ui64 wiped2 = 0; wiped2 <= wiped1; ++wiped2) { - ui64 maxErrorMask = (wiped1 == wiped2 ? 4 : 24); - for (ui64 errorMask = 0; errorMask <= maxErrorMask; ++errorMask) { - ui64 error1 = errorMask % 5; - ui64 error2 = errorMask / 5; - TTestWipedAllOkStep testStep( - groupId, erasureSpecies, domainCount, queryCounts, - isVerboseNoDataEnabled, true); - testStep.SetBlobs(blobs); - - testStep.Init(); - testStep.AddWipedVDisk(wiped1, error1); - testStep.AddWipedVDisk(wiped2, error2); - testStep.Run(false); - auto sendVPuts = std::move(testStep.SendVPuts); - - testStep.Init(); - testStep.AddWipedVDisk(wiped1, error1); - testStep.AddWipedVDisk(wiped2, error2); - testStep.Run(true); - auto sendVMultiPuts = std::move(testStep.SendVPuts); - - Sort(sendVPuts.begin(), sendVPuts.end()); - Sort(sendVMultiPuts.begin(), sendVMultiPuts.end()); - UNIT_ASSERT(sendVPuts == sendVMultiPuts); - } - } - } -} - class TGetSimulator { TGroupMock Group; TIntrusivePtr<TGroupQueues> GroupQueues; @@ -590,8 +462,6 @@ public: for (ui64 vGetIdx = 0; vGetIdx < vGets.size(); ++vGetIdx) { bool isLast = (vGetIdx == vGets.size() - 1); - auto &request = vGets[vGetIdx]->Record; - Y_ABORT_UNLESS(request.HasCookie()); TEvBlobStorage::TEvVGetResult vGetResult; Group.OnVGet(*vGets[vGetIdx], vGetResult); @@ -610,7 +480,6 @@ public: } for (ui64 vPutIdx = 0; vPutIdx < vPuts.size(); ++vPutIdx) { auto &putRequest = vPuts[vPutIdx]->Record; - Y_ABORT_UNLESS(putRequest.HasCookie()); TEvBlobStorage::TEvVPutResult vPutResult; vPutResult.MakeError(NKikimrProto::OK, TString(), putRequest); @@ -707,9 +576,7 @@ Y_UNIT_TEST(TestBlock42VGetCountWithErasure) { continue; } bool isLast = (vGetIdx == vGets.size() - 1); - auto &request = vGets[vGetIdx]->Record; - Y_ABORT_UNLESS(request.HasCookie()); TEvBlobStorage::TEvVGetResult vGetResult; group.OnVGet(*vGets[vGetIdx], vGetResult); @@ -730,7 +597,6 @@ Y_UNIT_TEST(TestBlock42VGetCountWithErasure) { } for (ui64 vPutIdx = 0; vPutIdx < vPuts.size(); ++vPutIdx) { auto &putRequest = vPuts[vPutIdx]->Record; - Y_ABORT_UNLESS(putRequest.HasCookie()); TEvBlobStorage::TEvVPutResult vPutResult; vPutResult.MakeError(NKikimrProto::OK, TString(), putRequest); @@ -854,9 +720,7 @@ Y_UNIT_TEST(TestBlock42WipedOneDiskAndErrorDurringGet) { group.SetPredictedDelayNs(7, 1); } bool isLast = (vGetIdx == vGets.size() - 1); - auto &request = vGets[vGetIdx]->Record; - Y_ABORT_UNLESS(request.HasCookie()); TEvBlobStorage::TEvVGetResult vGetResult; group.OnVGet(*vGets[vGetIdx], vGetResult); @@ -876,7 +740,6 @@ Y_UNIT_TEST(TestBlock42WipedOneDiskAndErrorDurringGet) { } for (ui64 vPutIdx = 0; vPutIdx < vPuts.size(); ++vPutIdx) { auto &putRequest = vPuts[vPutIdx]->Record; - Y_ABORT_UNLESS(putRequest.HasCookie()); TEvBlobStorage::TEvVPutResult vPutResult; vPutResult.MakeError(NKikimrProto::OK, TString(), putRequest); @@ -1123,13 +986,11 @@ void TestWipedErrorWithTwoBlobs(TErasureType::EErasureSpecies erasureSpecies, bo std::swap(vGets[vGetIdx], vGets[vGetIdx + needIdx]); bool isLast = (vGetIdx == vGets.size() - 1); - auto &request = vGets[vGetIdx]->Record; if ((ui32)errorIteration == vGetIdx) { group.SetError(errorDisk, NKikimrProto::ERROR); } - Y_ABORT_UNLESS(request.HasCookie()); TEvBlobStorage::TEvVGetResult vGetResult; group.OnVGet(*vGets[vGetIdx], vGetResult); @@ -1150,7 +1011,6 @@ void TestWipedErrorWithTwoBlobs(TErasureType::EErasureSpecies erasureSpecies, bo } for (ui64 vPutIdx = 0; vPutIdx < vPuts.size(); ++vPutIdx) { auto &putRequest = vPuts[vPutIdx]->Record; - Y_ABORT_UNLESS(putRequest.HasCookie()); TEvBlobStorage::TEvVPutResult vPutResult; vPutResult.MakeError(NKikimrProto::OK, TString(), putRequest); @@ -1201,15 +1061,7 @@ void TestWipedErrorWithTwoBlobs(TErasureType::EErasureSpecies erasureSpecies, bo } Y_UNIT_TEST(TestBlock42GetIntervalsWipedAllOk) { - TestIntervalsWipedAllOk(TErasureType::Erasure4Plus2Block, false, false); -} - -Y_UNIT_TEST(TestBlock42GetIntervalsWipedAllOkVMultiPut) { - TestIntervalsWipedAllOk(TErasureType::Erasure4Plus2Block, false, true); -} - -Y_UNIT_TEST(TestBlock42GetIntervalsWipedAllOkComparisonVMultiPutAndVPut) { - TestIntervalsWipedAllOkComparisonVMultiPutAndVPut(TErasureType::Erasure4Plus2Block); + TestIntervalsWipedAllOk(TErasureType::Erasure4Plus2Block, false); } Y_UNIT_TEST(TestBlock42GetIntervalsWipedError) { @@ -1221,15 +1073,7 @@ Y_UNIT_TEST(TestBlock42WipedErrorWithTwoBlobs) { } Y_UNIT_TEST(TestMirror32GetIntervalsWipedAllOk) { - TestIntervalsWipedAllOk(TErasureType::ErasureMirror3Plus2, false, false); -} - -Y_UNIT_TEST(TestMirror32GetIntervalsWipedAllOkVMultiPut) { - TestIntervalsWipedAllOk(TErasureType::ErasureMirror3Plus2, false, true); -} - -Y_UNIT_TEST(TestMirror32GetIntervalsWipedAllOkComparisonVMultiPutAndVPut) { - TestIntervalsWipedAllOkComparisonVMultiPutAndVPut(TErasureType::ErasureMirror3Plus2); + TestIntervalsWipedAllOk(TErasureType::ErasureMirror3Plus2, false); } void SpecificTest(ui32 badA, ui32 badB, ui32 blobSize, TMap<i64, i64> sizeForOffset) { @@ -1431,7 +1275,6 @@ public: auto &request = vGets[vGetIdx]->Record; VERBOSE("vGetIdx# " << vGetIdx); VERBOSE("Send TEvVGet to VDiskID# " << VDiskIDFromVDiskID(request.GetVDiskID())); - Y_ABORT_UNLESS(request.HasCookie()); //ui64 messageCookie = request->Record.GetCookie(); TEvBlobStorage::TEvVGetResult vGetResult; Group.OnVGet(*vGets[vGetIdx], vGetResult); @@ -1447,7 +1290,6 @@ public: } for (ui64 vPutIdx = 0; vPutIdx < vPuts.size(); ++vPutIdx) { auto &putRequest = vPuts[vPutIdx]->Record; - Y_ABORT_UNLESS(putRequest.HasCookie()); TEvBlobStorage::TEvVPutResult vPutResult; vPutResult.MakeError(NKikimrProto::OK, TString(), putRequest); @@ -1643,7 +1485,6 @@ public: auto &request = vGets[vGetIdx]->Record; VERBOSE("vGetIdx# " << vGetIdx << " request# " << vDIdx << " to domainIdx# " << domainIdx); VERBOSE("Send TEvVGet to VDiskID# " << VDiskIDFromVDiskID(request.GetVDiskID())); - Y_ABORT_UNLESS(request.HasCookie()); //ui64 messageCookie = request->Record.GetCookie(); TEvBlobStorage::TEvVGetResult vGetResult; Group.OnVGet(*vGets[vGetIdx], vGetResult); @@ -1684,7 +1525,6 @@ public: if (!getResult) { for (ui64 vPutIdx = 0; vPutIdx < vPuts.size(); ++vPutIdx) { auto &putRequest = vPuts[vPutIdx]->Record; - Y_ABORT_UNLESS(putRequest.HasCookie()); TEvBlobStorage::TEvVPutResult vPutResult; if (Mode == ReadAndWriteErrors && vPutIdx == 0) { vPutResult.MakeError(NKikimrProto::ERROR, TString(), putRequest); diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_sequence_ut.cpp b/ydb/core/blobstorage/dsproxy/ut/dsproxy_sequence_ut.cpp index 64b8725e2d1..34bed8826bc 100644 --- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_sequence_ut.cpp +++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_sequence_ut.cpp @@ -644,92 +644,6 @@ Y_UNIT_TEST(TestGivenBlock42GroupGenerationGreaterThanVDiskGenerations) { testState.GrabEventPtr<TEvRequestEnd>(); } -void MakeTestGivenBlock42GetRecoverMultiPutStatuses(NKikimrProto::EReplyStatus expectedStatus, - const TVector<NKikimrProto::EReplyStatus> &statuses = {}) { - Y_UNUSED(expectedStatus); - TBlobStorageGroupType type = {TErasureType::Erasure4Plus2Block}; - TTestBasicRuntime runtime(1, false); - Setup(runtime, type); - - constexpr ui64 blobCount = 2; - TVector<TLogoBlobID> blobIds = { - TLogoBlobID(72075186224047637, 1, 863, 1, 786, 24576), - TLogoBlobID(72075186224047637, 1, 2194, 1, 142, 12288) - }; - Y_ABORT_UNLESS(blobIds.size() == blobCount); - Y_ABORT_UNLESS(statuses.empty() || statuses.size() == blobCount); - - TVector<TBlobTestSet::TBlob> blobs; - for (const auto& id : blobIds) { - TStringBuilder builder; - for (size_t i = 0; i < id.BlobSize(); ++i) { - builder << 'a'; - } - blobs.emplace_back(id, builder); - } - - TTestState testState(runtime, type, DSProxyEnv.Info); - - TGroupMock &groupMock = testState.GetGroupMock(); - testState.PutBlobsToGroupMock(blobs); - groupMock.Wipe(3); - groupMock.Wipe(4); - - TEvBlobStorage::TEvGet::TPtr ev = testState.CreateGetRequest(blobIds, true); - runtime.Register(DSProxyEnv.CreateGetRequestActor(ev, NKikimrBlobStorage::TabletLog, true).release()); - - testState.HandleVGetsWithMock(type.BlobSubgroupSize()); - - if (statuses.empty()) { - groupMock.SetError(3, expectedStatus); - groupMock.SetError(4, expectedStatus); - } else { - TMap<TPartLocation, NKikimrProto::EReplyStatus> specialStatuses; - for (ui64 idx = 0; idx < blobIds.size(); ++idx) { - for (ui64 part = 3; part <= 4; ++part) { - TLogoBlobID partBlobId(blobIds[idx], part - 2 * idx); - TPartLocation id = testState.PrimaryVDiskForBlobPart(partBlobId); - specialStatuses[id] = statuses[idx]; - } - } - groupMock.SetSpecialStatuses(specialStatuses); - } - - testState.HandleVMultiPutsWithMock(3); - - TAutoPtr<IEventHandle> handle; - auto getResult = runtime.GrabEdgeEventRethrow<TEvBlobStorage::TEvGetResult>(handle); - UNIT_ASSERT(getResult); - UNIT_ASSERT(getResult->Status == expectedStatus); -} - -Y_UNIT_TEST(TestGivenBlock42GetRecoverMultiPutStatuses) { - constexpr ui64 statusCount = 3; - NKikimrProto::EReplyStatus maybeStatuses[statusCount] = { - NKikimrProto::OK, - NKikimrProto::BLOCKED, - NKikimrProto::DEADLINE - }; - Y_ABORT_UNLESS(maybeStatuses[statusCount - 1] == NKikimrProto::DEADLINE); - for (ui64 idx = 0; idx < statusCount; ++idx) { - MakeTestGivenBlock42GetRecoverMultiPutStatuses(maybeStatuses[idx]); - } -} - -Y_UNIT_TEST(TestGivenBlock42GetRecoverMultiPut2ItemsStatuses) { - constexpr ui64 statusCount = 3; - NKikimrProto::EReplyStatus maybeStatuses[statusCount] = { - NKikimrProto::OK, - NKikimrProto::BLOCKED, - NKikimrProto::DEADLINE - }; - Y_ABORT_UNLESS(maybeStatuses[statusCount - 1] == NKikimrProto::DEADLINE); - for (ui64 idx = 1; idx < statusCount; ++idx) { - MakeTestGivenBlock42GetRecoverMultiPutStatuses(maybeStatuses[idx], {maybeStatuses[idx], maybeStatuses[0]}); - MakeTestGivenBlock42GetRecoverMultiPutStatuses(maybeStatuses[idx], {maybeStatuses[0], maybeStatuses[idx]}); - } -} - Y_UNIT_TEST(TestGivenMirror3DCGetWithFirstSlowDisk) { TBlobStorageGroupType type = {TErasureType::ErasureMirror3dc}; TTestBasicRuntime runtime(1, false); @@ -741,7 +655,7 @@ Y_UNIT_TEST(TestGivenMirror3DCGetWithFirstSlowDisk) { TEvBlobStorage::TEvGet::TPtr ev = testState.CreateGetRequest({blobId}, false); - TActorId getActorId = runtime.Register(DSProxyEnv.CreateGetRequestActor(ev, NKikimrBlobStorage::TabletLog, false).release()); + TActorId getActorId = runtime.Register(DSProxyEnv.CreateGetRequestActor(ev, NKikimrBlobStorage::TabletLog).release()); runtime.EnableScheduleForActor(getActorId); testState.GrabEventPtr<TEvBlobStorage::TEvVGet>(); diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_vdisk_mock_ut.h b/ydb/core/blobstorage/dsproxy/ut/dsproxy_vdisk_mock_ut.h index e4058925975..2eac0a80e84 100644 --- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_vdisk_mock_ut.h +++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_vdisk_mock_ut.h @@ -99,7 +99,6 @@ public: void OnVGet(const TEvBlobStorage::TEvVGet &vGet, TEvBlobStorage::TEvVGetResult &outVGetResult) { auto &request = vGet.Record; - Y_ABORT_UNLESS(request.HasCookie()); if (IsError) { outVGetResult.MakeError(Status, TString(), request); return; diff --git a/ydb/core/blobstorage/dsproxy/ut_strategy/strategy_ut.cpp b/ydb/core/blobstorage/dsproxy/ut_strategy/strategy_ut.cpp index 17ba3f7a16b..8b77cc4ee6b 100644 --- a/ydb/core/blobstorage/dsproxy/ut_strategy/strategy_ut.cpp +++ b/ydb/core/blobstorage/dsproxy/ut_strategy/strategy_ut.cpp @@ -43,50 +43,48 @@ public: } void ProcessBlackboardRequests(TBlackboard& blackboard) { - for (ui32 i = 0; i < blackboard.GroupDiskRequests.DiskRequestsForOrderNumber.size(); ++i) { - auto& r = blackboard.GroupDiskRequests.DiskRequestsForOrderNumber[i]; - Y_ABORT_UNLESS(i < DiskStates.size()); - auto& disk = DiskStates[i]; - for (auto& get : r.GetsToSend) { - Ctest << "orderNumber# " << i << " get Id# " << get.Id; - if (disk.InErrorState) { - Ctest << " ERROR"; - blackboard.AddErrorResponse(get.Id, i); - } else if (auto it = disk.Blobs.find(get.Id); it == disk.Blobs.end()) { - Ctest << " NODATA"; - blackboard.AddNoDataResponse(get.Id, i); - } else { - std::visit(TOverloaded{ - [&](TNotYet&) { - Ctest << " NOT_YET"; - blackboard.AddNotYetResponse(get.Id, i); - }, - [&](TRope& buffer) { - Ctest << " OK"; - size_t begin = Min<size_t>(get.Shift, buffer.size()); - size_t end = Min<size_t>(buffer.size(), begin + get.Size); - TRope data(buffer.begin() + begin, buffer.begin() + end); - blackboard.AddResponseData(get.Id, i, get.Shift, std::move(data)); - } - }, it->second); - } - Ctest << Endl; + for (auto& get : blackboard.GroupDiskRequests.GetsPending) { + auto& disk = DiskStates[get.OrderNumber]; + Ctest << "orderNumber# " << get.OrderNumber << " get Id# " << get.Id; + if (disk.InErrorState) { + Ctest << " ERROR"; + blackboard.AddErrorResponse(get.Id, get.OrderNumber); + } else if (auto it = disk.Blobs.find(get.Id); it == disk.Blobs.end()) { + Ctest << " NODATA"; + blackboard.AddNoDataResponse(get.Id, get.OrderNumber); + } else { + std::visit(TOverloaded{ + [&](TNotYet&) { + Ctest << " NOT_YET"; + blackboard.AddNotYetResponse(get.Id, get.OrderNumber); + }, + [&](TRope& buffer) { + Ctest << " OK"; + size_t begin = Min<size_t>(get.Shift, buffer.size()); + size_t end = Min<size_t>(buffer.size(), begin + get.Size); + TRope data(buffer.begin() + begin, buffer.begin() + end); + blackboard.AddResponseData(get.Id, get.OrderNumber, get.Shift, std::move(data)); + } + }, it->second); } - r.GetsToSend.clear(); - for (auto& put : r.PutsToSend) { - Ctest << "orderNumber# " << i << " put Id# " << put.Id; - if (disk.InErrorState) { - Ctest << " ERROR"; - blackboard.AddErrorResponse(put.Id, i); - } else { - Ctest << " OK"; - disk.Blobs[put.Id] = std::move(put.Buffer); - blackboard.AddPutOkResponse(put.Id, i); - } - Ctest << Endl; + Ctest << Endl; + } + blackboard.GroupDiskRequests.GetsPending.clear(); + + for (auto& put : blackboard.GroupDiskRequests.PutsPending) { + auto& disk = DiskStates[put.OrderNumber]; + Ctest << "orderNumber# " << put.OrderNumber << " put Id# " << put.Id; + if (disk.InErrorState) { + Ctest << " ERROR"; + blackboard.AddErrorResponse(put.Id, put.OrderNumber); + } else { + Ctest << " OK"; + disk.Blobs[put.Id] = std::move(put.Buffer); + blackboard.AddPutOkResponse(put.Id, put.OrderNumber); } - r.PutsToSend.clear(); + Ctest << Endl; } + blackboard.GroupDiskRequests.PutsPending.clear(); } }; @@ -223,21 +221,18 @@ void RunTestLevel(const TBlobStorageGroupInfo& info, TBlackboard& blackboard, std::set<TOperation>& context, ui32& terminals) { // see which operations we can add to the stock const size_t stockSizeOnEntry = stock.size(); - auto& requests = blackboard.GroupDiskRequests.DiskRequestsForOrderNumber; - for (ui32 i = 0; i < info.GetTotalVDisksNum(); ++i) { - for (auto& j = requests[i].FirstUnsentRequestIdx; j < requests[i].GetsToSend.size(); ++j) { - auto& get = requests[i].GetsToSend[j]; - stock.push_back(TGetQuery{i, get.Id, get.Shift, get.Size}); - const bool inserted = context.insert(stock.back()).second; - UNIT_ASSERT(inserted); - } - for (auto& j = requests[i].FirstUnsentPutIdx; j < requests[i].PutsToSend.size(); ++j) { - auto& put = requests[i].PutsToSend[j]; - stock.push_back(TPutQuery{i, put.Id}); - const bool inserted = context.insert(stock.back()).second; - UNIT_ASSERT(inserted); - } + for (auto& get : blackboard.GroupDiskRequests.GetsPending) { + stock.push_back(TGetQuery{get.OrderNumber, get.Id, get.Shift, get.Size}); + const bool inserted = context.insert(stock.back()).second; + UNIT_ASSERT(inserted); + } + blackboard.GroupDiskRequests.GetsPending.clear(); + for (auto& put : blackboard.GroupDiskRequests.PutsPending) { + stock.push_back(TPutQuery{put.OrderNumber, put.Id}); + const bool inserted = context.insert(stock.back()).second; + UNIT_ASSERT(inserted); } + blackboard.GroupDiskRequests.PutsPending.clear(); UNIT_ASSERT(!stock.empty()); bool canIssuePuts = true; diff --git a/ydb/core/blobstorage/ut_blobstorage/counting_events.cpp b/ydb/core/blobstorage/ut_blobstorage/counting_events.cpp index ed4c397db8c..e9de1c0fab9 100644 --- a/ydb/core/blobstorage/ut_blobstorage/counting_events.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/counting_events.cpp @@ -84,7 +84,7 @@ Y_UNIT_TEST_SUITE(CountingEvents) { } - void CountingEventsTest(TString typeOperation, ui32 eventsCount, TBlobStorageGroupType groupType, ui32 alternative = 0) + void CountingEventsTest(TString typeOperation, ui32 eventsCount, TBlobStorageGroupType groupType) { TEnvironmentSetup env(true, groupType); auto& runtime = env.Runtime; @@ -127,7 +127,7 @@ Y_UNIT_TEST_SUITE(CountingEvents) { SendPut(test, originalBlobId3, data, NKikimrProto::OK); finishEventsCount = test.Runtime->GetEventsProcessed(); - UNIT_ASSERT_VALUES_EQUAL(finishEventsCount - startEventsCount, alternative ? alternative : eventsCount); + UNIT_ASSERT_VALUES_EQUAL(finishEventsCount - startEventsCount, eventsCount); } else if (typeOperation == "get") { TLogoBlobID originalBlobId(tabletId, 1, 0, 0, size, 0); NormalizePredictedDelays(queues); @@ -164,7 +164,7 @@ Y_UNIT_TEST_SUITE(CountingEvents) { } Y_UNIT_TEST(Put_Mirror3of4) { - CountingEventsTest("put", 116, TBlobStorageGroupType::ErasureMirror3of4, 114); + CountingEventsTest("put", 116, TBlobStorageGroupType::ErasureMirror3of4); } Y_UNIT_TEST(Put_Mirror3dc) { |