aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexander Rutkovsky <alexvru@ydb.tech>2024-01-25 14:13:50 +0300
committerGitHub <noreply@github.com>2024-01-25 14:13:50 +0300
commitff65c22f774717ca02ca65f03cac5b7e16a9d82b (patch)
tree2128e356084ef75590101c7df4fd486fd76595c3
parent7eb75fa787d8f4813a7bba7029086c5cb2c3cdaa (diff)
downloadydb-ff65c22f774717ca02ca65f03cac5b7e16a9d82b.tar.gz
Refactor GroupDiskRequests and remove unused stuff (#1272)
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy.h7
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp23
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h33
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_cookies.h155
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_get.cpp156
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp205
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h45
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h11
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_request.cpp2
-rw-r--r--ydb/core/blobstorage/dsproxy/ut/dsproxy_env_mock_ut.h4
-rw-r--r--ydb/core/blobstorage/dsproxy/ut/dsproxy_get_ut.cpp180
-rw-r--r--ydb/core/blobstorage/dsproxy/ut/dsproxy_sequence_ut.cpp88
-rw-r--r--ydb/core/blobstorage/dsproxy/ut/dsproxy_vdisk_mock_ut.h1
-rw-r--r--ydb/core/blobstorage/dsproxy/ut_strategy/strategy_ut.cpp105
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/counting_events.cpp6
15 files changed, 169 insertions, 852 deletions
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy.h b/ydb/core/blobstorage/dsproxy/dsproxy.h
index ad43358043d..9160d88de59 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy.h
@@ -405,8 +405,7 @@ public:
void SendToQueues(TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &vGets, bool timeStatsEnabled) {
for (auto& request : vGets) {
- Y_ABORT_UNLESS(request->Record.HasCookie());
- ui64 messageCookie = request->Record.GetCookie();
+ const ui64 messageCookie = request->Record.GetCookie();
CountEvent(*request);
const ui64 cyclesPerUs = NHPTimer::GetCyclesPerSecond() / 1000000;
request->Record.MutableTimestamps()->SetSentByDSProxyUs(GetCycleCountFast() / cyclesPerUs);
@@ -442,7 +441,6 @@ public:
template <typename TEvent>
void SendToQueues(TDeque<std::unique_ptr<TEvent>> &events, bool timeStatsEnabled) {
for (auto& request : events) {
- Y_ABORT_UNLESS(request->Record.HasCookie());
ui64 messageCookie = request->Record.GetCookie();
CountEvent(*request);
const ui64 cyclesPerUs = NHPTimer::GetCyclesPerSecond() / 1000000;
@@ -650,8 +648,7 @@ IActor* CreateBlobStorageGroupGetRequest(const TIntrusivePtr<TBlobStorageGroupIn
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvGet *ev,
ui64 cookie, NWilson::TTraceId traceId, TNodeLayoutInfoPtr&& nodeLayout,
- TMaybe<TGroupStat::EKind> latencyQueueKind, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters,
- bool isVMultiPutMode);
+ TMaybe<TGroupStat::EKind> latencyQueueKind, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters);
IActor* CreateBlobStorageGroupPatchRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp
index 9b49630b4f8..97706a1b6f1 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp
@@ -293,28 +293,19 @@ TString TBlobState::TWholeState::ToString() const {
// TGroupDiskRequests
//
-TGroupDiskRequests::TGroupDiskRequests(ui32 disks) {
- DiskRequestsForOrderNumber.resize(disks);
-}
-
-void TGroupDiskRequests::AddGet(const ui32 diskOrderNumber, const TLogoBlobID &id, const TIntervalSet<i32> &intervalSet) {
- Y_ABORT_UNLESS(diskOrderNumber < DiskRequestsForOrderNumber.size());
- auto &requestsToSend = DiskRequestsForOrderNumber[diskOrderNumber].GetsToSend;
- for (auto pair: intervalSet) {
- requestsToSend.emplace_back(id, pair.first, pair.second - pair.first);
+void TGroupDiskRequests::AddGet(ui32 diskOrderNumber, const TLogoBlobID &id, const TIntervalSet<i32> &intervalSet) {
+ for (const auto& pair : intervalSet) {
+ GetsPending.emplace_back(diskOrderNumber, id, pair.first, pair.second - pair.first);
}
}
-void TGroupDiskRequests::AddGet(const ui32 diskOrderNumber, const TLogoBlobID &id, const ui32 shift,
- const ui32 size) {
- Y_ABORT_UNLESS(diskOrderNumber < DiskRequestsForOrderNumber.size());
- DiskRequestsForOrderNumber[diskOrderNumber].GetsToSend.emplace_back(id, shift, size);
+void TGroupDiskRequests::AddGet(ui32 diskOrderNumber, const TLogoBlobID &id, ui32 shift, ui32 size) {
+ GetsPending.emplace_back(diskOrderNumber, id, shift, size);
}
-void TGroupDiskRequests::AddPut(const ui32 diskOrderNumber, const TLogoBlobID &id, TRope buffer,
+void TGroupDiskRequests::AddPut(ui32 diskOrderNumber, const TLogoBlobID &id, TRope buffer,
TDiskPutRequest::EPutReason putReason, bool isHandoff, ui8 blobIdx) {
- Y_ABORT_UNLESS(diskOrderNumber < DiskRequestsForOrderNumber.size());
- DiskRequestsForOrderNumber[diskOrderNumber].PutsToSend.emplace_back(id, buffer, putReason, isHandoff, blobIdx);
+ PutsPending.emplace_back(diskOrderNumber, id, buffer, putReason, isHandoff, blobIdx);
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h
index 8083e17fd41..c128a013e50 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h
@@ -108,13 +108,14 @@ struct TBlobState {
};
struct TDiskGetRequest {
+ ui32 OrderNumber;
TLogoBlobID Id;
ui32 Shift;
ui32 Size;
- ssize_t PartMapIndex = -1;
- TDiskGetRequest(const TLogoBlobID &id, const ui32 shift, const ui32 size)
- : Id(id)
+ TDiskGetRequest(ui32 orderNumber, const TLogoBlobID &id, ui32 shift, ui32 size)
+ : OrderNumber(orderNumber)
+ , Id(id)
, Shift(shift)
, Size(size)
{}
@@ -127,14 +128,16 @@ struct TDiskPutRequest {
ReasonInitial,
ReasonAccelerate
};
+ ui32 OrderNumber;
TLogoBlobID Id;
TRope Buffer;
EPutReason Reason;
bool IsHandoff;
ui8 BlobIdx;
- TDiskPutRequest(const TLogoBlobID &id, TRope buffer, EPutReason reason, bool isHandoff, ui8 blobIdx)
- : Id(id)
+ TDiskPutRequest(ui32 orderNumber, const TLogoBlobID &id, TRope buffer, EPutReason reason, bool isHandoff, ui8 blobIdx)
+ : OrderNumber(orderNumber)
+ , Id(id)
, Buffer(std::move(buffer))
, Reason(reason)
, IsHandoff(isHandoff)
@@ -142,20 +145,13 @@ struct TDiskPutRequest {
{}
};
-struct TDiskRequests {
- TDeque<TDiskGetRequest> GetsToSend;
- TStackVec<TDiskPutRequest, TypicalPartsInBlob> PutsToSend;
- ui32 FirstUnsentRequestIdx = 0;
- ui32 FirstUnsentPutIdx = 0;
-};
-
struct TGroupDiskRequests {
- TStackVec<TDiskRequests, TypicalDisksInGroup> DiskRequestsForOrderNumber;
+ std::vector<TDiskGetRequest> GetsPending;
+ std::vector<TDiskPutRequest> PutsPending;
- TGroupDiskRequests(ui32 disks);
- void AddGet(const ui32 diskOrderNumber, const TLogoBlobID &id, const TIntervalSet<i32> &intervalSet);
- void AddGet(const ui32 diskOrderNumber, const TLogoBlobID &id, const ui32 shift, const ui32 size);
- void AddPut(const ui32 diskOrderNumber, const TLogoBlobID &id, TRope buffer,
+ void AddGet(ui32 diskOrderNumber, const TLogoBlobID &id, const TIntervalSet<i32> &intervalSet);
+ void AddGet(ui32 diskOrderNumber, const TLogoBlobID &id, ui32 shift, ui32 size);
+ void AddPut(ui32 diskOrderNumber, const TLogoBlobID &id, TRope buffer,
TDiskPutRequest::EPutReason putReason, bool isHandoff, ui8 blobIdx);
};
@@ -188,8 +184,7 @@ struct TBlackboard {
TBlackboard(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &groupQueues,
NKikimrBlobStorage::EPutHandleClass putHandleClass, NKikimrBlobStorage::EGetHandleClass getHandleClass,
bool isAllRequestsTogether = true)
- : GroupDiskRequests(info->GetTotalVDisksNum())
- , Info(info)
+ : Info(info)
, GroupQueues(groupQueues)
, AccelerationMode(AccelerationModeSkipOneSlowest)
, PutHandleClass(putHandleClass)
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_cookies.h b/ydb/core/blobstorage/dsproxy/dsproxy_cookies.h
deleted file mode 100644
index d9eb26f9483..00000000000
--- a/ydb/core/blobstorage/dsproxy/dsproxy_cookies.h
+++ /dev/null
@@ -1,155 +0,0 @@
-#pragma once
-
-#include "dsproxy.h"
-
-namespace NKikimr {
-
-struct TVGetCookie {
- ui64 Raw;
- // [0, 27] 28bit queryBeginIdx
- // [28, 55] 28bit queryEndIdx
- // [56, 63] 8bit causeIdx
-
- TVGetCookie(ui64 raw)
- : Raw(raw)
- {}
-
- TVGetCookie(ui64 queryBeginIdx, ui64 queryEndIdx)
- : Raw(queryBeginIdx | (queryEndIdx << 28))
- {
- Y_ABORT_UNLESS(queryBeginIdx < (1LL << 28));
- Y_ABORT_UNLESS(queryEndIdx < (1LL << 28));
- }
-
- ui64 GetQueryBeginIdx() const {
- return Raw & 0xFFFFFFF;
- }
-
- ui8 GetQueryEndIdx() const {
- return (Raw >> 28) & 0xFFFFFFF;
- }
-
- ui64 GetCauseIdx() const {
- return (Raw >> 56) & 0xFF;
- }
-
- void SetCauseIdx(ui64 causeIdx) {
- // It's a debug feature support, it's a bad idea to VERIFY here
- if (causeIdx > 255) {
- causeIdx = 255;
- }
- ui64 prevCauseIdx = GetCauseIdx();
- Raw = Raw ^ ((causeIdx ^ prevCauseIdx) << 56);
- }
-
- operator ui64 () const {
- return Raw;
- }
-};
-
-struct TBlobCookie {
- ui64 Raw;
- // [0, 7 ] 8bit vDiskOrderNumber
- // [8, 15] 8bit partId .
- // [16, 31] 16bit blobIdx
- // [32, 55] 24bit requestIdx
- // [56, 63] 8bit causeIdx
-
- TBlobCookie(ui64 raw)
- : Raw(raw)
- {}
-
- TBlobCookie(ui64 vDiskOrderNumber, ui64 blobIdx, ui64 partId, ui64 requestIdx)
- : Raw(vDiskOrderNumber | (partId << 8) | (blobIdx << 16) | (requestIdx << 32))
- {
- Y_ABORT_UNLESS(vDiskOrderNumber < 256);
- Y_ABORT_UNLESS(blobIdx < (1LL << 16));
- Y_ABORT_UNLESS(partId < 256);
- Y_ABORT_UNLESS(requestIdx < (1LL << 24));
- }
-
- ui64 GetVDiskOrderNumber() const {
- return Raw & 0xFF;
- }
-
- ui8 GetPartId() const {
- return (Raw >> 8) & 0xFF;
- }
-
- ui64 GetBlobIdx() const {
- return (Raw >> 16) & 0xFFFF;
- }
-
- ui64 GetCauseIdx() const {
- return (Raw >> 56) & 0xFF;
- }
-
- void SetCauseIdx(ui64 causeIdx) {
- // It's a debug feature support, it's a bad idea to VERIFY here
- if (causeIdx > 255) {
- causeIdx = 255;
- }
- ui64 prevCauseIdx = GetCauseIdx();
- Raw = Raw ^ ((causeIdx ^ prevCauseIdx) << 56);
- }
-
- ui64 GetRequestIdx() const {
- return (Raw >> 32) & 0xFFFFFF;
- }
-
- operator ui64 () const {
- return Raw;
- }
-};
-
-struct TVMultiPutCookie {
- ui64 Raw;
- // [0, 7 ] 8bit vDiskOrderNumber
- // [8, 15] 8bit itemCount
- // [16, 31] 16bit ---
- // [32, 55] 24bit requestIdx
- // [56, 63] 8bit causeIdx
-
- TVMultiPutCookie(ui64 raw)
- : Raw(raw)
- {}
-
- TVMultiPutCookie(ui64 vDiskOrderNumber, ui64 itemCount, ui64 requestIdx)
- : Raw(vDiskOrderNumber | (itemCount << 8) | (requestIdx << 32))
- {
- Y_ABORT_UNLESS(vDiskOrderNumber < 256);
- Y_ABORT_UNLESS(itemCount < 256);
- Y_ABORT_UNLESS(requestIdx < (1LL << 24));
- }
-
- ui64 GetVDiskOrderNumber() const {
- return Raw & 0xFF;
- }
-
- ui64 GetItemCount() const {
- return (Raw >> 8) & 0xFF;
- }
-
- ui64 GetCauseIdx() const {
- return (Raw >> 56) & 0xFF;
- }
-
- void SetCauseIdx(ui64 causeIdx) {
- // It's a debug feature support, it's a bad idea to VERIFY here
- if (causeIdx > 255) {
- causeIdx = 255;
- }
- ui64 prevCauseIdx = GetCauseIdx();
- Raw = Raw ^ ((causeIdx ^ prevCauseIdx) << 56);
- }
-
- ui64 GetRequestIdx() const {
- return (Raw >> 32) & 0xFFFFFF;
- }
-
- operator ui64 () const {
- return Raw;
- }
-};
-
-}//NKikimr
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp
index 56161a130b9..918d31812bf 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp
@@ -56,8 +56,6 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt
bool IsPutAccelerated = false;
bool IsPutAccelerateScheduled = false;
- const bool IsVMultiPutMode = false;
-
void Handle(TEvAccelerateGet::TPtr &ev) {
RootCauseTrack.OnAccelerate(ev->Get()->CauseIdx);
AccelerateGet();
@@ -75,19 +73,11 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt
IsGetAccelerated = true;
TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> vGets;
- if (IsVMultiPutMode) {
- TDeque<std::unique_ptr<TEvBlobStorage::TEvVMultiPut>> vMultiPuts;
- GetImpl.AccelerateGet(LogCtx, GetUnresponsiveDiskOrderNumber(), vGets, vMultiPuts);
- *Mon->NodeMon->AccelerateEvVMultiPutCount += vMultiPuts.size();
- *Mon->NodeMon->AccelerateEvVGetCount += vGets.size();
- SendVGetsAndVPuts(vGets, vMultiPuts);
- } else {
- TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts;
- GetImpl.AccelerateGet(LogCtx, GetUnresponsiveDiskOrderNumber(), vGets, vPuts);
- *Mon->NodeMon->AccelerateEvVPutCount += vPuts.size();
- *Mon->NodeMon->AccelerateEvVGetCount += vGets.size();
- SendVGetsAndVPuts(vGets, vPuts);
- }
+ TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts;
+ GetImpl.AccelerateGet(LogCtx, GetUnresponsiveDiskOrderNumber(), vGets, vPuts);
+ *Mon->NodeMon->AccelerateEvVPutCount += vPuts.size();
+ *Mon->NodeMon->AccelerateEvVGetCount += vGets.size();
+ SendVGetsAndVPuts(vGets, vPuts);
}
void AcceleratePut() {
@@ -97,23 +87,15 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt
IsPutAccelerated = true;
TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> vGets;
- if (IsVMultiPutMode) {
- TDeque<std::unique_ptr<TEvBlobStorage::TEvVMultiPut>> vMultiPuts;
- GetImpl.AcceleratePut(LogCtx, GetUnresponsiveDiskOrderNumber(), vGets, vMultiPuts);
- *Mon->NodeMon->AccelerateEvVMultiPutCount += vMultiPuts.size();
- *Mon->NodeMon->AccelerateEvVGetCount += vGets.size();
- SendVGetsAndVPuts(vGets, vMultiPuts);
- } else {
- TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts;
- GetImpl.AcceleratePut(LogCtx, GetUnresponsiveDiskOrderNumber(), vGets, vPuts);
- *Mon->NodeMon->AccelerateEvVPutCount += vPuts.size();
- *Mon->NodeMon->AccelerateEvVGetCount += vGets.size();
- SendVGetsAndVPuts(vGets, vPuts);
- }
+ TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts;
+ GetImpl.AcceleratePut(LogCtx, GetUnresponsiveDiskOrderNumber(), vGets, vPuts);
+ *Mon->NodeMon->AccelerateEvVPutCount += vPuts.size();
+ *Mon->NodeMon->AccelerateEvVGetCount += vGets.size();
+ SendVGetsAndVPuts(vGets, vPuts);
}
- template <typename TPutEvent>
- void SendVGetsAndVPuts(TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &vGets, TDeque<std::unique_ptr<TPutEvent>> &vPuts) {
+ void SendVGetsAndVPuts(TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &vGets,
+ TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> &vPuts) {
ReportBytes(GetImpl.GrabBytesToReport());
RequestsSent += vGets.size();
RequestsSent += vPuts.size();
@@ -125,11 +107,8 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt
}
}
for (size_t i = 0; i < vGets.size(); ++i) {
- Y_ABORT_UNLESS(vGets[i]->Record.HasCookie());
- TVGetCookie cookie(vGets[i]->Record.GetCookie());
if (RootCauseTrack.IsOn) {
- cookie.SetCauseIdx(RootCauseTrack.RegisterCause());
- vGets[i]->Record.SetCookie(cookie);
+ vGets[i]->Record.SetCookie(RootCauseTrack.RegisterCause());
}
Y_ABORT_UNLESS(vGets[i]->Record.HasVDiskID());
TVDiskID vDiskId = VDiskIDFromVDiskID(vGets[i]->Record.GetVDiskID());
@@ -141,11 +120,8 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt
DiskCounters[orderNumber].Sent++;
}
for (size_t i = 0; i < vPuts.size(); ++i) {
- Y_ABORT_UNLESS(vPuts[i]->Record.HasCookie());
- TBlobCookie cookie(vPuts[i]->Record.GetCookie());
if (RootCauseTrack.IsOn) {
- cookie.SetCauseIdx(RootCauseTrack.RegisterCause());
- vPuts[i]->Record.SetCookie(cookie);
+ vPuts[i]->Record.SetCookie(RootCauseTrack.RegisterCause());
}
Y_ABORT_UNLESS(vPuts[i]->Record.HasVDiskID());
TVDiskID vDiskId = VDiskIDFromVDiskID(vPuts[i]->Record.GetVDiskID());
@@ -216,10 +192,8 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt
GetTotalTimeMs(record.GetTimestamps()) - GetVDiskTimeMs(record.GetTimestamps()),
NKikimrBlobStorage::EGetHandleClass_Name(GetImpl.GetHandleClass()),
NKikimrProto::EReplyStatus_Name(record.GetStatus()));
- Y_ABORT_UNLESS(record.HasCookie());
- TVGetCookie cookie(record.GetCookie());
- if (RootCauseTrack.IsOn) {
- RootCauseTrack.OnReply(cookie.GetCauseIdx(),
+ if (RootCauseTrack.IsOn && record.HasCookie()) {
+ RootCauseTrack.OnReply(record.GetCookie(),
GetTotalTimeMs(record.GetTimestamps()) - GetVDiskTimeMs(record.GetTimestamps()),
GetVDiskTimeMs(record.GetTimestamps()));
}
@@ -233,15 +207,9 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt
TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> vGets;
TAutoPtr<TEvBlobStorage::TEvGetResult> getResult;
ResponsesReceived++;
- if (IsVMultiPutMode) {
- TDeque<std::unique_ptr<TEvBlobStorage::TEvVMultiPut>> vMultiPuts;
- GetImpl.OnVGetResult(LogCtx, *ev->Get(), vGets, vMultiPuts, getResult);
- SendVGetsAndVPuts(vGets, vMultiPuts);
- } else {
- TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts;
- GetImpl.OnVGetResult(LogCtx, *ev->Get(), vGets, vPuts, getResult);
- SendVGetsAndVPuts(vGets, vPuts);
- }
+ TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts;
+ GetImpl.OnVGetResult(LogCtx, *ev->Get(), vGets, vPuts, getResult);
+ SendVGetsAndVPuts(vGets, vPuts);
if (getResult) {
SendReplyAndDie(getResult);
@@ -274,77 +242,16 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt
return LogoBlobIDFromLogoBlobID(ev->Get()->Record.GetBlobID());
}
- TLogoBlobID GetFirstBlobId(TEvBlobStorage::TEvVMultiPutResult::TPtr &ev) {
- Y_ABORT_UNLESS(ev->Get()->Record.ItemsSize());
- return LogoBlobIDFromLogoBlobID(ev->Get()->Record.GetItems(0).GetBlobID());
- }
-
ui64 SumBlobSize(TEvBlobStorage::TEvVPutResult::TPtr &ev) {
return GetFirstBlobId(ev).BlobSize();
}
- ui64 SumBlobSize(TEvBlobStorage::TEvVMultiPutResult::TPtr &ev) {
- ui64 sum = 0;
- for (auto &item : ev->Get()->Record.GetItems()) {
- TLogoBlobID blobId = LogoBlobIDFromLogoBlobID(item.GetBlobID());
- sum += blobId.BlobSize();
- }
- return sum;
- }
-
void Handle(TEvBlobStorage::TEvVPutResult::TPtr &ev) {
ProcessReplyFromQueue(ev);
- HandleVPutResult<TEvBlobStorage::TEvVPut, TEvBlobStorage::TEvVPutResult>(ev);
- }
-
- void Handle(TEvBlobStorage::TEvVMultiPutResult::TPtr &ev) {
- ProcessReplyFromQueue(ev);
- HandleVPutResult<TEvBlobStorage::TEvVMultiPut, TEvBlobStorage::TEvVMultiPutResult>(ev);
+ HandleVPutResult(ev);
}
- bool HandleVPutInnerErrorStatuses(TEvBlobStorage::TEvVPutResult::TPtr &ev,
- TAutoPtr<TEvBlobStorage::TEvGetResult> &outGetResult)
- {
- Y_UNUSED(ev, outGetResult);
- return false;
- }
-
- bool HandleVPutInnerErrorStatuses(TEvBlobStorage::TEvVMultiPutResult::TPtr &ev,
- TAutoPtr<TEvBlobStorage::TEvGetResult> &outGetResult)
- {
- const auto &record = ev->Get()->Record;
- const NKikimrProto::EReplyStatus status = record.GetStatus();
- const TVDiskID vDiskId = VDiskIDFromVDiskID(record.GetVDiskID());
-
- for (auto &item : record.GetItems()) {
- Y_ABORT_UNLESS(item.HasStatus());
- Y_ABORT_UNLESS(item.HasBlobID());
- Y_ABORT_UNLESS(item.HasCookie());
- NKikimrProto::EReplyStatus itemStatus = item.GetStatus();
- TLogoBlobID blobId = LogoBlobIDFromLogoBlobID(item.GetBlobID());
- Y_ABORT_UNLESS(itemStatus != NKikimrProto::RACE); // we should get RACE for the whole request and handle it in CheckForTermErrors
- if (itemStatus == NKikimrProto::BLOCKED || itemStatus == NKikimrProto::DEADLINE) {
- R_LOG_ERROR_S("BPG26", "Handle TEvVMultiPutResultItem"
- << " blobId# " << blobId.ToString()
- << " status# " << NKikimrProto::EReplyStatus_Name(status)
- << " itemStatus# " << NKikimrProto::EReplyStatus_Name(itemStatus));
- TStringStream errorReason;
- errorReason << "Got VMultiPutResult itemStatus# " << itemStatus << " from VDiskId# " << vDiskId;
- ErrorReason = errorReason.Str();
- GetImpl.PrepareReply(itemStatus, ErrorReason, LogCtx, outGetResult);
- return true;
- } else {
- R_LOG_DEBUG_S("BPG27", "Handle TEvVMultiPutResultItem"
- << " blobId# " << blobId.ToString()
- << " status# " << NKikimrProto::EReplyStatus_Name(status)
- << " itemStatus# " << NKikimrProto::EReplyStatus_Name(itemStatus));
- }
- }
- return false;
- }
-
- template <typename TPutEvent, typename TPutEventResult>
- void HandleVPutResult(typename TPutEventResult::TPtr &ev) {
+ void HandleVPutResult(TEvBlobStorage::TEvVPutResult::TPtr &ev) {
Y_ABORT_UNLESS(ev->Get()->Record.HasStatus());
const ui64 cyclesPerUs = NHPTimer::GetCyclesPerSecond() / 1000000;
@@ -369,10 +276,8 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt
NKikimrBlobStorage::EPutHandleClass_Name(GetImpl.GetPutHandleClass()),
NKikimrProto::EReplyStatus_Name(status));
- Y_ABORT_UNLESS(record.HasCookie());
- TBlobCookie cookie(record.GetCookie());
- if (RootCauseTrack.IsOn) {
- RootCauseTrack.OnReply(cookie.GetCauseIdx(),
+ if (RootCauseTrack.IsOn && record.HasCookie()) {
+ RootCauseTrack.OnReply(record.GetCookie(),
GetTotalTimeMs(record.GetTimestamps()) - GetVDiskTimeMs(record.GetTimestamps()),
GetVDiskTimeMs(record.GetTimestamps()));
}
@@ -384,15 +289,10 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt
DiskCounters[orderNumber].Received++;
TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> vGets;
- TDeque<std::unique_ptr<TPutEvent>> vPuts;
+ TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts;
TAutoPtr<TEvBlobStorage::TEvGetResult> getResult;
ResponsesReceived++;
- if (HandleVPutInnerErrorStatuses(ev, getResult)) {
- Y_ABORT_UNLESS(getResult);
- SendReplyAndDie(getResult);
- return;
- }
GetImpl.OnVPutResult(LogCtx, *ev->Get(), vGets, vPuts, getResult);
SendVGetsAndVPuts(vGets, vPuts);
if (getResult) {
@@ -491,7 +391,7 @@ public:
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvGet *ev, ui64 cookie,
NWilson::TTraceId traceId, TNodeLayoutInfoPtr&& nodeLayout, TMaybe<TGroupStat::EKind> latencyQueueKind,
- TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters, bool isVMultiPutMode)
+ TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
: TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId),
NKikimrServices::BS_PROXY_GET, ev->IsVerboseNoDataEnabled || ev->CollectDebugInfo,
latencyQueueKind, now, storagePoolCounters, ev->RestartCounter, "DSProxy.Get",
@@ -504,7 +404,6 @@ public:
, RequestsSent(0)
, ResponsesReceived(0)
, ReportedBytes(0)
- , IsVMultiPutMode(isVMultiPutMode)
{
ReportBytes(sizeof(*this));
MaxSaneRequests = ev->QuerySize * info->Type.TotalPartCount() * (1 + info->Type.Handoff()) * 3;
@@ -557,7 +456,6 @@ public:
switch (ev->GetTypeRewrite()) {
hFunc(TEvBlobStorage::TEvVGetResult, Handle);
hFunc(TEvBlobStorage::TEvVPutResult, Handle);
- hFunc(TEvBlobStorage::TEvVMultiPutResult, Handle);
hFunc(TEvAccelerateGet, Handle);
hFunc(TEvAcceleratePut, Handle);
}
@@ -569,9 +467,9 @@ IActor* CreateBlobStorageGroupGetRequest(const TIntrusivePtr<TBlobStorageGroupIn
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvGet *ev,
ui64 cookie, NWilson::TTraceId traceId, TNodeLayoutInfoPtr&& nodeLayout,
TMaybe<TGroupStat::EKind> latencyQueueKind, TInstant now,
- TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters, bool isVMultiPutMode) {
+ TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) {
return new TBlobStorageGroupGetRequest(info, state, source, mon, ev, cookie, std::move(traceId),
- std::move(nodeLayout), latencyQueueKind, now, storagePoolCounters, isVMultiPutMode);
+ std::move(nodeLayout), latencyQueueKind, now, storagePoolCounters);
}
}//NKikimr
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp
index 3c9538fc244..003a4e1a777 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp
@@ -226,10 +226,6 @@ TString TGetImpl::DumpFullState() const {
str << Endl;
str << " VPutResponses# " << VPutResponses;
str << Endl;
- str << " VMultiPutRequests# " << VMultiPutRequests;
- str << Endl;
- str << " VMultiPutResponses# " << VMultiPutResponses;
- str << Endl;
str << " IsNoData# " << IsNoData;
str << Endl;
@@ -275,118 +271,54 @@ void TGetImpl::GenerateInitialRequests(TLogContext &logCtx, TDeque<std::unique_p
}
void TGetImpl::PrepareRequests(TLogContext &logCtx, TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &outVGets) {
- for (ui32 diskOrderNumber = 0; diskOrderNumber < Blackboard.GroupDiskRequests.DiskRequestsForOrderNumber.size();
- ++diskOrderNumber) {
- TDiskRequests &requests = Blackboard.GroupDiskRequests.DiskRequestsForOrderNumber[diskOrderNumber];
- ui32 endIdx = requests.GetsToSend.size();
- ui32 beginIdx = requests.FirstUnsentRequestIdx;
-
- if (beginIdx < endIdx) {
- TVDiskID vDiskId = Info->GetVDiskId(diskOrderNumber);
- auto vGet = TEvBlobStorage::TEvVGet::CreateExtremeDataQuery(vDiskId, Deadline, Blackboard.GetHandleClass,
- TEvBlobStorage::TEvVGet::EFlags::None, TVGetCookie(beginIdx, endIdx), {}, ForceBlockTabletData);
- for (ui32 idx = beginIdx; idx < endIdx; ++idx) {
- TDiskGetRequest &get = requests.GetsToSend[idx];
- ui64 cookie = idx;
- vGet->AddExtremeQuery(get.Id, get.Shift, get.Size, &cookie);
- if (ReportDetailedPartMap) {
- get.PartMapIndex = Blackboard.AddPartMap(get.Id, diskOrderNumber, RequestIndex);
- }
- }
- vGet->Record.SetSuppressBarrierCheck(IsInternal);
- vGet->Record.SetTabletId(TabletId);
- vGet->Record.SetAcquireBlockedGeneration(AcquireBlockedGeneration);
-
- if (ReaderTabletData) {
- auto msg = vGet->Record.MutableReaderTabletData();
- msg->SetId(ReaderTabletData->Id);
- msg->SetGeneration(ReaderTabletData->Generation);
- }
-
- R_LOG_DEBUG_SX(logCtx, "BPG14", "Send get to orderNumber# " << diskOrderNumber
- << " beginIdx# " << beginIdx
- << " endIdx# " << endIdx
- << " vGet# " << vGet->ToString());
- outVGets.push_back(std::move(vGet));
- ++RequestIndex;
- Blackboard.GroupDiskRequests.DiskRequestsForOrderNumber[diskOrderNumber].FirstUnsentRequestIdx = endIdx;
+ TStackVec<std::unique_ptr<TEvBlobStorage::TEvVGet>, TypicalDisksInGroup> gets(Info->GetTotalVDisksNum());
+
+ for (auto& get : Blackboard.GroupDiskRequests.GetsPending) {
+ auto& vget = gets[get.OrderNumber];
+ if (!vget) {
+ const TVDiskID vdiskId = Info->GetVDiskId(get.OrderNumber);
+ vget = TEvBlobStorage::TEvVGet::CreateExtremeDataQuery(vdiskId, Deadline, Blackboard.GetHandleClass,
+ TEvBlobStorage::TEvVGet::EFlags::None, {}, {}, ForceBlockTabletData);
}
+ std::optional<ui64> cookie;
+ if (ReportDetailedPartMap) {
+ cookie = Blackboard.AddPartMap(get.Id, get.OrderNumber, RequestIndex);
+ }
+ vget->AddExtremeQuery(get.Id, get.Shift, get.Size, cookie ? &cookie.value() : nullptr);
+ vget->Record.SetSuppressBarrierCheck(IsInternal);
+ vget->Record.SetTabletId(TabletId);
+ vget->Record.SetAcquireBlockedGeneration(AcquireBlockedGeneration);
+ if (ReaderTabletData) {
+ auto msg = vget->Record.MutableReaderTabletData();
+ msg->SetId(ReaderTabletData->Id);
+ msg->SetGeneration(ReaderTabletData->Generation);
+ }
+ R_LOG_DEBUG_SX(logCtx, "BPG14", "Send get to orderNumber# " << get.OrderNumber
+ << " vget# " << vget->ToString());
}
-}
-void TGetImpl::PrepareVPuts(TLogContext &logCtx,
- TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> &outVPuts) {
- for (ui32 diskOrderNumber = 0; diskOrderNumber < Blackboard.GroupDiskRequests.DiskRequestsForOrderNumber.size();
- ++diskOrderNumber) {
- const TDiskRequests &requests = Blackboard.GroupDiskRequests.DiskRequestsForOrderNumber[diskOrderNumber];
- ui32 endIdx = requests.PutsToSend.size();
- ui32 beginIdx = requests.FirstUnsentPutIdx;
-
- if (beginIdx < endIdx) {
- TVDiskID vDiskId = Info->GetVDiskId(diskOrderNumber);
- for (ui32 idx = beginIdx; idx < endIdx; ++idx) {
- const TDiskPutRequest &put = requests.PutsToSend[idx];
- ui64 cookie = TBlobCookie(diskOrderNumber, put.BlobIdx, put.Id.PartId(),
- VPutRequests);
- Y_DEBUG_ABORT_UNLESS(Info->Type.GetErasure() != TBlobStorageGroupType::ErasureMirror3of4 ||
- put.Id.PartId() != 3 || put.Buffer.IsEmpty());
- auto vPut = std::make_unique<TEvBlobStorage::TEvVPut>(put.Id, put.Buffer, vDiskId, true, &cookie,
- Deadline, Blackboard.PutHandleClass);
- R_LOG_DEBUG_SX(logCtx, "BPG15", "Send put to orderNumber# " << diskOrderNumber << " idx# " << idx
- << " vPut# " << vPut->ToString());
- outVPuts.push_back(std::move(vPut));
- ++VPutRequests;
- ReceivedVPutResponses.push_back(false);
- }
- Blackboard.GroupDiskRequests.DiskRequestsForOrderNumber[diskOrderNumber].FirstUnsentPutIdx = endIdx;
+ for (auto& vget : gets) {
+ if (vget) {
+ outVGets.push_back(std::move(vget));
+ ++RequestIndex;
}
}
+
+ Blackboard.GroupDiskRequests.GetsPending.clear();
}
-void TGetImpl::PrepareVPuts(TLogContext &logCtx,
- TDeque<std::unique_ptr<TEvBlobStorage::TEvVMultiPut>> &outVMultiPuts) {
- for (ui32 diskOrderNumber = 0; diskOrderNumber < Blackboard.GroupDiskRequests.DiskRequestsForOrderNumber.size();
- ++diskOrderNumber) {
- const TDiskRequests &requests = Blackboard.GroupDiskRequests.DiskRequestsForOrderNumber[diskOrderNumber];
- ui32 endIdx = requests.PutsToSend.size();
- ui32 beginIdx = requests.FirstUnsentPutIdx;
- if (beginIdx < endIdx) {
- TVDiskID vDiskId = Info->GetVDiskId(diskOrderNumber);
- // set cookie after adding items
- auto vMultiPut = std::make_unique<TEvBlobStorage::TEvVMultiPut>(vDiskId, Deadline, Blackboard.PutHandleClass,
- true, nullptr);
- ui64 bytes = 0;
- ui64 lastItemCount = 0;
- for (ui32 idx = beginIdx; idx < endIdx; ++idx) {
- const TDiskPutRequest &put = requests.PutsToSend[idx];
- ui64 cookie = TBlobCookie(diskOrderNumber, put.BlobIdx, put.Id.PartId(), VMultiPutRequests);
- ui64 itemSize = vMultiPut->Record.ItemsSize();
- if (itemSize == MaxBatchedPutRequests || bytes + put.Buffer.size() > MaxBatchedPutSize) {
- vMultiPut->Record.SetCookie(TVMultiPutCookie(diskOrderNumber, lastItemCount, VMultiPutRequests));
- ++VMultiPutRequests;
- ReceivedVMultiPutResponses.push_back(false);
- R_LOG_DEBUG_SX(logCtx, "BPG16", "Send multiPut to orderNumber# " << diskOrderNumber << " count# "
- << vMultiPut->Record.ItemsSize() << " vMultiPut# " << vMultiPut->ToString());
- outVMultiPuts.push_back(std::move(vMultiPut));
- // set cookie after adding items
- vMultiPut = std::make_unique<TEvBlobStorage::TEvVMultiPut>(vDiskId, Deadline,
- Blackboard.PutHandleClass, true, nullptr);
- bytes = 0;
- lastItemCount = 0;
- }
- bytes += put.Buffer.size();
- lastItemCount++;
- vMultiPut->AddVPut(put.Id, TRcBuf(TRope(put.Buffer)), &cookie, nullptr, NWilson::TTraceId());
- }
- vMultiPut->Record.SetCookie(TVMultiPutCookie(diskOrderNumber, lastItemCount, VMultiPutRequests));
- ++VMultiPutRequests;
- ReceivedVMultiPutResponses.push_back(false);
- R_LOG_DEBUG_SX(logCtx, "BPG17", "Send multiPut to orderNumber# " << diskOrderNumber << " count# "
- << vMultiPut->Record.ItemsSize() << " vMultiPut# " << vMultiPut->ToString());
- outVMultiPuts.push_back(std::move(vMultiPut));
- Blackboard.GroupDiskRequests.DiskRequestsForOrderNumber[diskOrderNumber].FirstUnsentPutIdx = endIdx;
- }
+void TGetImpl::PrepareVPuts(TLogContext &logCtx, TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> &outVPuts) {
+ for (auto& put : Blackboard.GroupDiskRequests.PutsPending) {
+ const TVDiskID vdiskId = Info->GetVDiskId(put.OrderNumber);
+ Y_DEBUG_ABORT_UNLESS(Info->Type.GetErasure() != TBlobStorageGroupType::ErasureMirror3of4 ||
+ put.Id.PartId() != 3 || put.Buffer.IsEmpty());
+ auto vput = std::make_unique<TEvBlobStorage::TEvVPut>(put.Id, put.Buffer, vdiskId, true, nullptr, Deadline,
+ Blackboard.PutHandleClass);
+ R_LOG_DEBUG_SX(logCtx, "BPG15", "Send put to orderNumber# " << put.OrderNumber << " vput# " << vput->ToString());
+ outVPuts.push_back(std::move(vput));
+ ++VPutRequests;
}
+ Blackboard.GroupDiskRequests.PutsPending.clear();
}
EStrategyOutcome TGetImpl::RunBoldStrategy(TLogContext &logCtx) {
@@ -443,17 +375,6 @@ void TGetImpl::OnVPutResult(TLogContext &logCtx, TEvBlobStorage::TEvVPutResult &
ui32 orderNumber = Info->GetOrderNumber(shortId);
const TLogoBlobID blob = LogoBlobIDFromLogoBlobID(record.GetBlobID());
- Y_ABORT_UNLESS(record.HasCookie());
- TBlobCookie cookie(record.GetCookie());
- Y_ABORT_UNLESS(cookie.GetVDiskOrderNumber() == orderNumber);
- Y_ABORT_UNLESS(cookie.GetPartId() == blob.PartId());
-
- ui64 requestIdx = cookie.GetRequestIdx();
- Y_VERIFY_S(!ReceivedVPutResponses[requestIdx], "the response is received twice"
- << " Event# " << ev.ToString()
- << " State# " << DumpFullState());
- ReceivedVPutResponses[requestIdx] = true;
-
const NKikimrProto::EReplyStatus status = record.GetStatus();
++VPutResponses;
switch (status) {
@@ -472,50 +393,4 @@ void TGetImpl::OnVPutResult(TLogContext &logCtx, TEvBlobStorage::TEvVPutResult &
Step(logCtx, outVGets, outVPuts, outGetResult);
}
-void TGetImpl::OnVPutResult(TLogContext &logCtx, TEvBlobStorage::TEvVMultiPutResult &ev,
- TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &outVGets,
- TDeque<std::unique_ptr<TEvBlobStorage::TEvVMultiPut>> &outVMultiPuts,
- TAutoPtr<TEvBlobStorage::TEvGetResult> &outGetResult) {
- const NKikimrBlobStorage::TEvVMultiPutResult &record = ev.Record;
- Y_ABORT_UNLESS(record.HasVDiskID());
- TVDiskID vdisk = VDiskIDFromVDiskID(record.GetVDiskID());
- TVDiskIdShort shortId(vdisk);
- ui32 orderNumber = Info->GetOrderNumber(shortId);
-
- Y_ABORT_UNLESS(record.HasCookie());
- TVMultiPutCookie cookie(record.GetCookie());
- Y_ABORT_UNLESS(cookie.GetVDiskOrderNumber() == orderNumber);
- Y_ABORT_UNLESS(cookie.GetItemCount() == record.ItemsSize());
-
- ui64 requestIdx = cookie.GetRequestIdx();
- Y_VERIFY_S(!ReceivedVMultiPutResponses[requestIdx], "the response is received twice"
- << " Event# " << ev.ToString()
- << " State# " << DumpFullState());
- ReceivedVMultiPutResponses[requestIdx] = true;
-
- ++VMultiPutResponses;
- for (auto &item : record.GetItems()) {
- const NKikimrProto::EReplyStatus status = item.GetStatus();
- const TLogoBlobID blob = LogoBlobIDFromLogoBlobID(item.GetBlobID());
- Y_ABORT_UNLESS(item.HasCookie());
- TBlobCookie itemCookie(item.GetCookie());
- Y_ABORT_UNLESS(itemCookie.GetVDiskOrderNumber() == orderNumber);
- Y_ABORT_UNLESS(itemCookie.GetPartId() == blob.PartId());
- switch (status) {
- case NKikimrProto::ERROR:
- case NKikimrProto::VDISK_ERROR_STATE:
- case NKikimrProto::OUT_OF_SPACE:
- Blackboard.AddErrorResponse(blob, orderNumber);
- break;
- case NKikimrProto::OK:
- case NKikimrProto::ALREADY:
- Blackboard.AddPutOkResponse(blob, orderNumber);
- break;
- default:
- Y_ABORT("Unexpected status# %s", NKikimrProto::EReplyStatus_Name(status).data());
- }
- }
- Step(logCtx, outVGets, outVMultiPuts, outGetResult);
-}
-
}//NKikimr
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h
index 24ae81b8476..13cb325251c 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h
@@ -2,7 +2,6 @@
#include "dsproxy.h"
#include "dsproxy_blackboard.h"
-#include "dsproxy_cookies.h"
#include "dsproxy_mon.h"
#include <ydb/core/blobstorage/vdisk/common/vdisk_events.h>
#include <util/generic/set.h>
@@ -31,8 +30,6 @@ class TGetImpl {
ui32 BlockedGeneration = 0;
ui32 VPutRequests = 0;
ui32 VPutResponses = 0;
- ui32 VMultiPutRequests = 0;
- ui32 VMultiPutResponses = 0;
bool IsNoData = false;
bool IsReplied = false;
@@ -43,9 +40,6 @@ class TGetImpl {
ui32 RequestIndex = 0;
ui32 ResponseIndex = 0;
- TStackVec<bool, MaxBatchedPutRequests * TypicalDisksInSubring> ReceivedVPutResponses;
- TStackVec<bool, MaxBatchedPutRequests * TypicalDisksInSubring> ReceivedVMultiPutResponses;
-
const TString RequestPrefix;
const bool PhantomCheck;
@@ -146,14 +140,6 @@ public:
return VPutResponses;
}
- ui64 GetVMultiPutRequests() const {
- return VMultiPutRequests;
- }
-
- ui64 GetVMultiPutResponses() const {
- return VMultiPutResponses;
- }
-
ui64 GetRequestIndex() const {
return RequestIndex;
}
@@ -162,12 +148,11 @@ public:
return ResponseIndex;
}
-
void GenerateInitialRequests(TLogContext &logCtx, TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &outVGets);
- template <typename TVPutEvent>
void OnVGetResult(TLogContext &logCtx, TEvBlobStorage::TEvVGetResult &ev,
- TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &outVGets, TDeque<std::unique_ptr<TVPutEvent>> &outVPuts,
+ TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &outVGets,
+ TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> &outVPuts,
TAutoPtr<TEvBlobStorage::TEvGetResult> &outGetResult) {
const NKikimrBlobStorage::TEvVGetResult &record = ev.Record;
Y_ABORT_UNLESS(record.HasStatus());
@@ -195,16 +180,11 @@ public:
const NKikimrBlobStorage::TQueryResult &result = record.GetResult(i);
Y_ABORT_UNLESS(result.HasStatus());
const NKikimrProto::EReplyStatus replyStatus = result.GetStatus();
- Y_ABORT_UNLESS(result.HasCookie());
- const ui64 cookie = result.GetCookie();
Y_ABORT_UNLESS(result.HasBlobID());
const TLogoBlobID blobId = LogoBlobIDFromLogoBlobID(result.GetBlobID());
if (ReportDetailedPartMap) {
- Blackboard.ReportPartMapStatus(blobId,
- Blackboard.GroupDiskRequests.DiskRequestsForOrderNumber[orderNumber].GetsToSend[cookie].PartMapIndex,
- ResponseIndex,
- replyStatus);
+ Blackboard.ReportPartMapStatus(blobId, result.GetCookie(), ResponseIndex, replyStatus);
}
TRope resultBuffer = ev.GetBlobData(result);
@@ -266,17 +246,12 @@ public:
TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> &outVPuts,
TAutoPtr<TEvBlobStorage::TEvGetResult> &outGetResult);
- void OnVPutResult(TLogContext &logCtx, TEvBlobStorage::TEvVMultiPutResult &ev,
- TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &outVGets,
- TDeque<std::unique_ptr<TEvBlobStorage::TEvVMultiPut>> &outVMultiPuts,
- TAutoPtr<TEvBlobStorage::TEvGetResult> &outGetResult);
-
void PrepareReply(NKikimrProto::EReplyStatus status, TString errorReason, TLogContext &logCtx,
TAutoPtr<TEvBlobStorage::TEvGetResult> &outGetResult);
- template <typename TVPutEvent>
void AccelerateGet(TLogContext &logCtx, i32 slowDiskOrderNumber,
- TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &outVGets, TDeque<std::unique_ptr<TVPutEvent>> &outVPuts) {
+ TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &outVGets,
+ TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> &outVPuts) {
TAutoPtr<TEvBlobStorage::TEvGetResult> outGetResult;
TBlackboard::EAccelerationMode prevMode = Blackboard.AccelerationMode;
Blackboard.AccelerationMode = TBlackboard::AccelerationModeSkipMarked;
@@ -294,9 +269,9 @@ public:
RequestPrefix.data(), outGetResult->Print(false).c_str(), DumpFullState().c_str());
}
- template <typename TVPutEvent>
void AcceleratePut(TLogContext &logCtx, i32 slowDiskOrderNumber,
- TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &outVGets, TDeque<std::unique_ptr<TVPutEvent>> &outVPuts) {
+ TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &outVGets,
+ TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> &outVPuts) {
AccelerateGet(logCtx, slowDiskOrderNumber, outVGets, outVPuts);
}
@@ -312,9 +287,9 @@ protected:
EStrategyOutcome RunStrategies(TLogContext &logCtx);
// Returns true if there are additional requests to send
- template <typename TVPutEvent>
bool Step(TLogContext &logCtx, TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &outVGets,
- TDeque<std::unique_ptr<TVPutEvent>> &outVPuts, TAutoPtr<TEvBlobStorage::TEvGetResult> &outGetResult) {
+ TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> &outVPuts,
+ TAutoPtr<TEvBlobStorage::TEvGetResult> &outGetResult) {
switch (auto outcome = RunStrategies(logCtx)) {
case EStrategyOutcome::IN_PROGRESS: {
const ui32 numRequests = outVGets.size() + outVPuts.size();
@@ -337,8 +312,6 @@ protected:
TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &outVGets);
void PrepareVPuts(TLogContext &logCtx,
TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> &outVPuts);
- void PrepareVPuts(TLogContext &logCtx,
- TDeque<std::unique_ptr<TEvBlobStorage::TEvVMultiPut>> &outVMultiPuts);
ui64 GetTimeToAccelerateNs(TLogContext &logCtx, NKikimrBlobStorage::EVDiskQueueId queueId);
}; //TGetImpl
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h
index 8f67e9ee6fb..5c499eda22a 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h
@@ -2,7 +2,6 @@
#include "dsproxy.h"
#include "dsproxy_blackboard.h"
-#include "dsproxy_cookies.h"
#include "dsproxy_mon.h"
#include "dsproxy_strategy_accelerate_put.h"
#include "dsproxy_strategy_accelerate_put_m3dc.h"
@@ -236,13 +235,8 @@ public:
// Group put requests together by VDiskID.
std::unordered_multimap<ui32, TDiskPutRequest*> puts;
- auto& perDiskRequests = Blackboard.GroupDiskRequests.DiskRequestsForOrderNumber;
- for (ui32 orderNumber = 0, numDisks = perDiskRequests.size(); orderNumber < numDisks; ++orderNumber) {
- TDiskRequests& requests = perDiskRequests[orderNumber];
- while (requests.FirstUnsentPutIdx < requests.PutsToSend.size()) {
- auto& putRequest = requests.PutsToSend[requests.FirstUnsentPutIdx++];
- puts.emplace(orderNumber, &putRequest);
- }
+ for (auto& put : Blackboard.GroupDiskRequests.PutsPending) {
+ puts.emplace(put.OrderNumber, &put);
}
// Generate queries to VDisks.
@@ -279,6 +273,7 @@ public:
}
}
+ Blackboard.GroupDiskRequests.PutsPending.clear();
return events;
}
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp
index 50a016384c8..2a7344105c7 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp
@@ -61,7 +61,7 @@ namespace NKikimr {
reqID = Register(
CreateBlobStorageGroupGetRequest(Info, Sessions->GroupQueues, ev->Sender, Mon,
ev->Get(), ev->Cookie, std::move(ev->TraceId), TNodeLayoutInfoPtr(NodeLayoutInfo),
- kind, TActivationContext::Now(), StoragePoolCounters, false));
+ kind, TActivationContext::Now(), StoragePoolCounters));
ActiveRequests.insert(reqID);
} else {
Mon->EventMultiGet->Inc();
diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_env_mock_ut.h b/ydb/core/blobstorage/dsproxy/ut/dsproxy_env_mock_ut.h
index f196cc92561..c74ea2d3c32 100644
--- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_env_mock_ut.h
+++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_env_mock_ut.h
@@ -118,12 +118,12 @@ struct TDSProxyEnv {
}
std::unique_ptr<IActor> CreateGetRequestActor(TEvBlobStorage::TEvGet::TPtr &ev,
- NKikimrBlobStorage::EPutHandleClass handleClass, bool withMultiPut)
+ NKikimrBlobStorage::EPutHandleClass handleClass)
{
TMaybe<TGroupStat::EKind> kind = PutHandleClassToGroupStatKind(handleClass);
return std::unique_ptr<IActor>(CreateBlobStorageGroupGetRequest(Info, GroupQueues, ev->Sender, Mon,
ev->Get(), ev->Cookie, std::move(ev->TraceId), TNodeLayoutInfoPtr(NodeLayoutInfo),
- kind, TInstant::Now(), StoragePoolCounters, withMultiPut));
+ kind, TInstant::Now(), StoragePoolCounters));
}
std::unique_ptr<IActor> CreatePatchRequestActor(TEvBlobStorage::TEvPatch::TPtr &ev, bool useVPatch = false) {
diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_get_ut.cpp b/ydb/core/blobstorage/dsproxy/ut/dsproxy_get_ut.cpp
index 141cca4d851..730ef996b5f 100644
--- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_get_ut.cpp
+++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_get_ut.cpp
@@ -85,8 +85,6 @@ void TestIntervalsAndCrcAllOk(TErasureType::EErasureSpecies erasureSpecies, bool
TAutoPtr<TEvBlobStorage::TEvGetResult> getResult;
for (ui64 vGetIdx = 0; vGetIdx < vGets.size(); ++vGetIdx) {
bool isLast = (vGetIdx == vGets.size() - 1);
- auto &request = vGets[vGetIdx]->Record;
- Y_ABORT_UNLESS(request.HasCookie());
//ui64 messageCookie = request->Record.GetCookie();
TEvBlobStorage::TEvVGetResult vGetResult;
group.OnVGet(*vGets[vGetIdx], vGetResult);
@@ -154,11 +152,6 @@ Y_UNIT_TEST(TestMirror32GetBlobCrcCheck) {
}
class TTestWipedAllOkStep {
- enum ETVPutEventKind {
- TVPEK_VPUT,
- TVPEK_VMULTIPUT
- };
-
public:
struct TVPutInfo {
TVDiskID VDiskId;
@@ -195,8 +188,6 @@ private:
ui64 VPutRequests = 0;
ui64 VPutResponses = 0;
- ui64 VMultiPutRequests = 0;
- ui64 VMultiPutResponses = 0;
ui64 RequestIndex = 0;
ui64 ResponseIndex = 0;
@@ -241,16 +232,12 @@ public:
}
}
- void Run(bool useVMultiPut) {
+ void Run() {
for (ui64 qci = 0; qci < QueryCounts.size(); ++qci) {
ui64 queryCount = QueryCounts[qci];
for (ui64 bci = 0; bci < QueryCounts.size(); ++bci) {
ui64 blobCount = QueryCounts[bci];
- if (useVMultiPut) {
- SubStep<TVPEK_VMULTIPUT>(queryCount, blobCount);
- } else {
- SubStep<TVPEK_VPUT>(queryCount, blobCount);
- }
+ SubStep(queryCount, blobCount);
}
}
}
@@ -274,8 +261,6 @@ private:
void ClearCounters() {
VPutRequests = 0;
VPutResponses = 0;
- VMultiPutRequests = 0;
- VMultiPutResponses = 0;
RequestIndex = 0;
ResponseIndex = 0;
}
@@ -283,8 +268,6 @@ private:
void AssertCounters(TGetImpl &getImpl) {
UNIT_ASSERT_C(VPutResponses == getImpl.GetVPutResponses()
&& VPutRequests == getImpl.GetVPutRequests()
- && VMultiPutResponses == getImpl.GetVMultiPutResponses()
- && VMultiPutRequests == getImpl.GetVMultiPutRequests()
&& RequestIndex == getImpl.GetRequestIndex()
&& ResponseIndex == getImpl.GetResponseIndex(),
"Not equal expected VPutRequest and VPutResponse with given"
@@ -292,10 +275,6 @@ private:
<< " VPutResponses# " << VPutResponses
<< " getImpl.VPutRequests# " << getImpl.GetVPutRequests()
<< " getImpl.VPutResponses# " << getImpl.GetVPutResponses()
- << " VMultiPutRequests# " << VMultiPutRequests
- << " VMultiPutResponses# " << VMultiPutResponses
- << " getImpl.VMultiPutRequests# " << getImpl.GetVMultiPutRequests()
- << " getImpl.VMultiPutResponses# " << getImpl.GetVMultiPutResponses()
<< " RequestIndex# " << RequestIndex
<< " ResponseIndex# " << ResponseIndex
<< " getImpl.RequestIndex# " << getImpl.GetRequestIndex()
@@ -307,11 +286,9 @@ private:
TAutoPtr<TEvBlobStorage::TEvGetResult> &getResult) {
for (ui64 vPutIdx = 0; vPutIdx < vPuts.size(); ++vPutIdx) {
auto &putRequest = vPuts[vPutIdx]->Record;
- Y_ABORT_UNLESS(putRequest.HasCookie());
auto vdisk = VDiskIDFromVDiskID(putRequest.GetVDiskID());
auto blobId = LogoBlobIDFromLogoBlobID(putRequest.GetBlobID());
- TBlobCookie cookie(putRequest.GetCookie());
- SendVPuts.push_back({vdisk, blobId, cookie.GetBlobIdx()});
+ SendVPuts.push_back({vdisk, blobId, 0});
TEvBlobStorage::TEvVPutResult vPutResult;
vPutResult.MakeError(NKikimrProto::OK, TString(), putRequest);
@@ -332,51 +309,7 @@ private:
vPuts.clear();
}
- void ProcessVPuts(TLogContext &logCtx, TGetImpl &getImpl,
- TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &vGets, TDeque<std::unique_ptr<TEvBlobStorage::TEvVMultiPut>> &vPuts,
- TAutoPtr<TEvBlobStorage::TEvGetResult> &getResult) {
- for (ui64 vPutIdx = 0; vPutIdx < vPuts.size(); ++vPutIdx) {
- auto &multiPutRequest = vPuts[vPutIdx]->Record;
- Y_ABORT_UNLESS(multiPutRequest.HasCookie());
-
- auto vdisk = VDiskIDFromVDiskID(multiPutRequest.GetVDiskID());
- UNIT_ASSERT(multiPutRequest.ItemsSize() <= MaxBatchedPutRequests);
- ui64 sendBytes = 0;
- for (auto &item : multiPutRequest.GetItems()) {
- Y_ABORT_UNLESS(item.HasCookie());
- TLogoBlobID blobId = LogoBlobIDFromLogoBlobID(item.GetBlobID());
- TString buffer = item.GetBuffer();
- sendBytes += buffer.size();
- TBlobCookie cookie(item.GetCookie());
- SendVPuts.push_back({vdisk, blobId, cookie.GetBlobIdx()});
- }
- UNIT_ASSERT(sendBytes <= MaxBatchedPutSize);
-
- TEvBlobStorage::TEvVMultiPutResult vMultiPutResult;
- vMultiPutResult.MakeError(NKikimrProto::OK, TString(), multiPutRequest);
-
- TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> nextVGets;
- TDeque<std::unique_ptr<TEvBlobStorage::TEvVMultiPut>> nextVPuts;
- getImpl.OnVPutResult(logCtx, vMultiPutResult,
- nextVGets, nextVPuts, getResult);
- VMultiPutResponses++;
- RequestIndex += nextVGets.size();
- VMultiPutRequests += nextVPuts.size();
- AssertCounters(getImpl);
-
- std::move(nextVGets.begin(), nextVGets.end(), std::back_inserter(vGets));
- std::move(nextVPuts.begin(), nextVPuts.end(), std::back_inserter(vPuts));
- if (getResult) {
- break;
- }
- }
- vPuts.clear();
- }
-
- template <ETVPutEventKind TVPutEventKind>
void SubStep(ui64 queryCount, ui64 blobCount) {
- using TVPutEvent = std::conditional_t<TVPutEventKind == TVPEK_VPUT, TEvBlobStorage::TEvVPut,
- TEvBlobStorage::TEvVMultiPut>;
TArrayHolder<TEvBlobStorage::TEvGet::TQuery> queriesA(
new TEvBlobStorage::TEvGet::TQuery[MaxQueryCount]);
TArrayHolder<TEvBlobStorage::TEvGet::TQuery> queriesB(
@@ -394,7 +327,7 @@ private:
TGetImpl getImpl(Group->GetInfo(), GroupQueues, &ev, nullptr);
ClearCounters();
TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> vGets;
- TDeque<std::unique_ptr<TVPutEvent>> vPuts;
+ TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts;
TLogContext logCtx(NKikimrServices::BS_PROXY_GET, false);
logCtx.LogAcc.IsLogEnabled = false;
getImpl.GenerateInitialRequests(logCtx, vGets);
@@ -405,23 +338,17 @@ private:
for (ui64 vGetIdx = 0; vGetIdx < vGets.size(); ++vGetIdx) {
bool isLast = (vGetIdx == vGets.size() - 1);
- auto &request = vGets[vGetIdx]->Record;
- Y_ABORT_UNLESS(request.HasCookie());
//ui64 messageCookie = request->Record.GetCookie();
TEvBlobStorage::TEvVGetResult vGetResult;
Group->OnVGet(*vGets[vGetIdx], vGetResult);
// TODO: generate result
TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> nextVGets;
- TDeque<std::unique_ptr<TVPutEvent>> nextVPuts;
+ TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> nextVPuts;
getImpl.OnVGetResult(logCtx, vGetResult, nextVGets, nextVPuts, getResult);
ResponseIndex++;
RequestIndex += nextVGets.size();
- if constexpr (TVPutEventKind == TVPEK_VPUT) {
- VPutRequests += nextVPuts.size();
- } else {
- VMultiPutRequests += nextVPuts.size();
- }
+ VPutRequests += nextVPuts.size();
AssertCounters(getImpl);
std::move(nextVGets.begin(), nextVGets.end(), std::back_inserter(vGets));
@@ -456,7 +383,7 @@ private:
}
};
-void TestIntervalsWipedAllOk(TErasureType::EErasureSpecies erasureSpecies, bool isVerboseNoDataEnabled, bool multiput) {
+void TestIntervalsWipedAllOk(TErasureType::EErasureSpecies erasureSpecies, bool isVerboseNoDataEnabled) {
TActorSystemStub actorSystemStub;
const ui32 groupId = 0;
@@ -480,7 +407,7 @@ void TestIntervalsWipedAllOk(TErasureType::EErasureSpecies erasureSpecies, bool
testStep.Init();
testStep.AddWipedVDisk(wiped1, error1);
testStep.AddWipedVDisk(wiped2, error2);
- testStep.Run(multiput);
+ testStep.Run();
}
}
}
@@ -488,61 +415,6 @@ void TestIntervalsWipedAllOk(TErasureType::EErasureSpecies erasureSpecies, bool
}
}
-void TestIntervalsWipedAllOkComparisonVMultiPutAndVPut(TErasureType::EErasureSpecies erasureSpecies,
- bool isVerboseNoDataEnabled = false) {
- TActorSystemStub actorSystemStub;
-
- const ui32 groupId = 0;
- TBlobStorageGroupType groupType(erasureSpecies);
- const ui32 domainCount = groupType.BlobSubgroupSize();
-
- const TVector<ui64> queryCounts = {1, 2};
-
- TVector<TLogoBlobID> blobIDs = {
- TLogoBlobID(72075186224047637, 1, 863, 1, 786, 24576),
- TLogoBlobID(72075186224047637, 1, 2194, 1, 142, 12288)
- };
-
- TVector<TBlobTestSet::TBlob> blobs;
- for (const auto& id : blobIDs) {
- TStringBuilder builder;
- for (size_t i = 0; i < id.BlobSize(); ++i) {
- builder << 'a';
- }
- blobs.emplace_back(id, builder);
- }
-
- for (ui64 wiped1 = 0; wiped1 < domainCount; ++wiped1) {
- for (ui64 wiped2 = 0; wiped2 <= wiped1; ++wiped2) {
- ui64 maxErrorMask = (wiped1 == wiped2 ? 4 : 24);
- for (ui64 errorMask = 0; errorMask <= maxErrorMask; ++errorMask) {
- ui64 error1 = errorMask % 5;
- ui64 error2 = errorMask / 5;
- TTestWipedAllOkStep testStep(
- groupId, erasureSpecies, domainCount, queryCounts,
- isVerboseNoDataEnabled, true);
- testStep.SetBlobs(blobs);
-
- testStep.Init();
- testStep.AddWipedVDisk(wiped1, error1);
- testStep.AddWipedVDisk(wiped2, error2);
- testStep.Run(false);
- auto sendVPuts = std::move(testStep.SendVPuts);
-
- testStep.Init();
- testStep.AddWipedVDisk(wiped1, error1);
- testStep.AddWipedVDisk(wiped2, error2);
- testStep.Run(true);
- auto sendVMultiPuts = std::move(testStep.SendVPuts);
-
- Sort(sendVPuts.begin(), sendVPuts.end());
- Sort(sendVMultiPuts.begin(), sendVMultiPuts.end());
- UNIT_ASSERT(sendVPuts == sendVMultiPuts);
- }
- }
- }
-}
-
class TGetSimulator {
TGroupMock Group;
TIntrusivePtr<TGroupQueues> GroupQueues;
@@ -590,8 +462,6 @@ public:
for (ui64 vGetIdx = 0; vGetIdx < vGets.size(); ++vGetIdx) {
bool isLast = (vGetIdx == vGets.size() - 1);
- auto &request = vGets[vGetIdx]->Record;
- Y_ABORT_UNLESS(request.HasCookie());
TEvBlobStorage::TEvVGetResult vGetResult;
Group.OnVGet(*vGets[vGetIdx], vGetResult);
@@ -610,7 +480,6 @@ public:
}
for (ui64 vPutIdx = 0; vPutIdx < vPuts.size(); ++vPutIdx) {
auto &putRequest = vPuts[vPutIdx]->Record;
- Y_ABORT_UNLESS(putRequest.HasCookie());
TEvBlobStorage::TEvVPutResult vPutResult;
vPutResult.MakeError(NKikimrProto::OK, TString(), putRequest);
@@ -707,9 +576,7 @@ Y_UNIT_TEST(TestBlock42VGetCountWithErasure) {
continue;
}
bool isLast = (vGetIdx == vGets.size() - 1);
- auto &request = vGets[vGetIdx]->Record;
- Y_ABORT_UNLESS(request.HasCookie());
TEvBlobStorage::TEvVGetResult vGetResult;
group.OnVGet(*vGets[vGetIdx], vGetResult);
@@ -730,7 +597,6 @@ Y_UNIT_TEST(TestBlock42VGetCountWithErasure) {
}
for (ui64 vPutIdx = 0; vPutIdx < vPuts.size(); ++vPutIdx) {
auto &putRequest = vPuts[vPutIdx]->Record;
- Y_ABORT_UNLESS(putRequest.HasCookie());
TEvBlobStorage::TEvVPutResult vPutResult;
vPutResult.MakeError(NKikimrProto::OK, TString(), putRequest);
@@ -854,9 +720,7 @@ Y_UNIT_TEST(TestBlock42WipedOneDiskAndErrorDurringGet) {
group.SetPredictedDelayNs(7, 1);
}
bool isLast = (vGetIdx == vGets.size() - 1);
- auto &request = vGets[vGetIdx]->Record;
- Y_ABORT_UNLESS(request.HasCookie());
TEvBlobStorage::TEvVGetResult vGetResult;
group.OnVGet(*vGets[vGetIdx], vGetResult);
@@ -876,7 +740,6 @@ Y_UNIT_TEST(TestBlock42WipedOneDiskAndErrorDurringGet) {
}
for (ui64 vPutIdx = 0; vPutIdx < vPuts.size(); ++vPutIdx) {
auto &putRequest = vPuts[vPutIdx]->Record;
- Y_ABORT_UNLESS(putRequest.HasCookie());
TEvBlobStorage::TEvVPutResult vPutResult;
vPutResult.MakeError(NKikimrProto::OK, TString(), putRequest);
@@ -1123,13 +986,11 @@ void TestWipedErrorWithTwoBlobs(TErasureType::EErasureSpecies erasureSpecies, bo
std::swap(vGets[vGetIdx], vGets[vGetIdx + needIdx]);
bool isLast = (vGetIdx == vGets.size() - 1);
- auto &request = vGets[vGetIdx]->Record;
if ((ui32)errorIteration == vGetIdx) {
group.SetError(errorDisk, NKikimrProto::ERROR);
}
- Y_ABORT_UNLESS(request.HasCookie());
TEvBlobStorage::TEvVGetResult vGetResult;
group.OnVGet(*vGets[vGetIdx], vGetResult);
@@ -1150,7 +1011,6 @@ void TestWipedErrorWithTwoBlobs(TErasureType::EErasureSpecies erasureSpecies, bo
}
for (ui64 vPutIdx = 0; vPutIdx < vPuts.size(); ++vPutIdx) {
auto &putRequest = vPuts[vPutIdx]->Record;
- Y_ABORT_UNLESS(putRequest.HasCookie());
TEvBlobStorage::TEvVPutResult vPutResult;
vPutResult.MakeError(NKikimrProto::OK, TString(), putRequest);
@@ -1201,15 +1061,7 @@ void TestWipedErrorWithTwoBlobs(TErasureType::EErasureSpecies erasureSpecies, bo
}
Y_UNIT_TEST(TestBlock42GetIntervalsWipedAllOk) {
- TestIntervalsWipedAllOk(TErasureType::Erasure4Plus2Block, false, false);
-}
-
-Y_UNIT_TEST(TestBlock42GetIntervalsWipedAllOkVMultiPut) {
- TestIntervalsWipedAllOk(TErasureType::Erasure4Plus2Block, false, true);
-}
-
-Y_UNIT_TEST(TestBlock42GetIntervalsWipedAllOkComparisonVMultiPutAndVPut) {
- TestIntervalsWipedAllOkComparisonVMultiPutAndVPut(TErasureType::Erasure4Plus2Block);
+ TestIntervalsWipedAllOk(TErasureType::Erasure4Plus2Block, false);
}
Y_UNIT_TEST(TestBlock42GetIntervalsWipedError) {
@@ -1221,15 +1073,7 @@ Y_UNIT_TEST(TestBlock42WipedErrorWithTwoBlobs) {
}
Y_UNIT_TEST(TestMirror32GetIntervalsWipedAllOk) {
- TestIntervalsWipedAllOk(TErasureType::ErasureMirror3Plus2, false, false);
-}
-
-Y_UNIT_TEST(TestMirror32GetIntervalsWipedAllOkVMultiPut) {
- TestIntervalsWipedAllOk(TErasureType::ErasureMirror3Plus2, false, true);
-}
-
-Y_UNIT_TEST(TestMirror32GetIntervalsWipedAllOkComparisonVMultiPutAndVPut) {
- TestIntervalsWipedAllOkComparisonVMultiPutAndVPut(TErasureType::ErasureMirror3Plus2);
+ TestIntervalsWipedAllOk(TErasureType::ErasureMirror3Plus2, false);
}
void SpecificTest(ui32 badA, ui32 badB, ui32 blobSize, TMap<i64, i64> sizeForOffset) {
@@ -1431,7 +1275,6 @@ public:
auto &request = vGets[vGetIdx]->Record;
VERBOSE("vGetIdx# " << vGetIdx);
VERBOSE("Send TEvVGet to VDiskID# " << VDiskIDFromVDiskID(request.GetVDiskID()));
- Y_ABORT_UNLESS(request.HasCookie());
//ui64 messageCookie = request->Record.GetCookie();
TEvBlobStorage::TEvVGetResult vGetResult;
Group.OnVGet(*vGets[vGetIdx], vGetResult);
@@ -1447,7 +1290,6 @@ public:
}
for (ui64 vPutIdx = 0; vPutIdx < vPuts.size(); ++vPutIdx) {
auto &putRequest = vPuts[vPutIdx]->Record;
- Y_ABORT_UNLESS(putRequest.HasCookie());
TEvBlobStorage::TEvVPutResult vPutResult;
vPutResult.MakeError(NKikimrProto::OK, TString(), putRequest);
@@ -1643,7 +1485,6 @@ public:
auto &request = vGets[vGetIdx]->Record;
VERBOSE("vGetIdx# " << vGetIdx << " request# " << vDIdx << " to domainIdx# " << domainIdx);
VERBOSE("Send TEvVGet to VDiskID# " << VDiskIDFromVDiskID(request.GetVDiskID()));
- Y_ABORT_UNLESS(request.HasCookie());
//ui64 messageCookie = request->Record.GetCookie();
TEvBlobStorage::TEvVGetResult vGetResult;
Group.OnVGet(*vGets[vGetIdx], vGetResult);
@@ -1684,7 +1525,6 @@ public:
if (!getResult) {
for (ui64 vPutIdx = 0; vPutIdx < vPuts.size(); ++vPutIdx) {
auto &putRequest = vPuts[vPutIdx]->Record;
- Y_ABORT_UNLESS(putRequest.HasCookie());
TEvBlobStorage::TEvVPutResult vPutResult;
if (Mode == ReadAndWriteErrors && vPutIdx == 0) {
vPutResult.MakeError(NKikimrProto::ERROR, TString(), putRequest);
diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_sequence_ut.cpp b/ydb/core/blobstorage/dsproxy/ut/dsproxy_sequence_ut.cpp
index 64b8725e2d1..34bed8826bc 100644
--- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_sequence_ut.cpp
+++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_sequence_ut.cpp
@@ -644,92 +644,6 @@ Y_UNIT_TEST(TestGivenBlock42GroupGenerationGreaterThanVDiskGenerations) {
testState.GrabEventPtr<TEvRequestEnd>();
}
-void MakeTestGivenBlock42GetRecoverMultiPutStatuses(NKikimrProto::EReplyStatus expectedStatus,
- const TVector<NKikimrProto::EReplyStatus> &statuses = {}) {
- Y_UNUSED(expectedStatus);
- TBlobStorageGroupType type = {TErasureType::Erasure4Plus2Block};
- TTestBasicRuntime runtime(1, false);
- Setup(runtime, type);
-
- constexpr ui64 blobCount = 2;
- TVector<TLogoBlobID> blobIds = {
- TLogoBlobID(72075186224047637, 1, 863, 1, 786, 24576),
- TLogoBlobID(72075186224047637, 1, 2194, 1, 142, 12288)
- };
- Y_ABORT_UNLESS(blobIds.size() == blobCount);
- Y_ABORT_UNLESS(statuses.empty() || statuses.size() == blobCount);
-
- TVector<TBlobTestSet::TBlob> blobs;
- for (const auto& id : blobIds) {
- TStringBuilder builder;
- for (size_t i = 0; i < id.BlobSize(); ++i) {
- builder << 'a';
- }
- blobs.emplace_back(id, builder);
- }
-
- TTestState testState(runtime, type, DSProxyEnv.Info);
-
- TGroupMock &groupMock = testState.GetGroupMock();
- testState.PutBlobsToGroupMock(blobs);
- groupMock.Wipe(3);
- groupMock.Wipe(4);
-
- TEvBlobStorage::TEvGet::TPtr ev = testState.CreateGetRequest(blobIds, true);
- runtime.Register(DSProxyEnv.CreateGetRequestActor(ev, NKikimrBlobStorage::TabletLog, true).release());
-
- testState.HandleVGetsWithMock(type.BlobSubgroupSize());
-
- if (statuses.empty()) {
- groupMock.SetError(3, expectedStatus);
- groupMock.SetError(4, expectedStatus);
- } else {
- TMap<TPartLocation, NKikimrProto::EReplyStatus> specialStatuses;
- for (ui64 idx = 0; idx < blobIds.size(); ++idx) {
- for (ui64 part = 3; part <= 4; ++part) {
- TLogoBlobID partBlobId(blobIds[idx], part - 2 * idx);
- TPartLocation id = testState.PrimaryVDiskForBlobPart(partBlobId);
- specialStatuses[id] = statuses[idx];
- }
- }
- groupMock.SetSpecialStatuses(specialStatuses);
- }
-
- testState.HandleVMultiPutsWithMock(3);
-
- TAutoPtr<IEventHandle> handle;
- auto getResult = runtime.GrabEdgeEventRethrow<TEvBlobStorage::TEvGetResult>(handle);
- UNIT_ASSERT(getResult);
- UNIT_ASSERT(getResult->Status == expectedStatus);
-}
-
-Y_UNIT_TEST(TestGivenBlock42GetRecoverMultiPutStatuses) {
- constexpr ui64 statusCount = 3;
- NKikimrProto::EReplyStatus maybeStatuses[statusCount] = {
- NKikimrProto::OK,
- NKikimrProto::BLOCKED,
- NKikimrProto::DEADLINE
- };
- Y_ABORT_UNLESS(maybeStatuses[statusCount - 1] == NKikimrProto::DEADLINE);
- for (ui64 idx = 0; idx < statusCount; ++idx) {
- MakeTestGivenBlock42GetRecoverMultiPutStatuses(maybeStatuses[idx]);
- }
-}
-
-Y_UNIT_TEST(TestGivenBlock42GetRecoverMultiPut2ItemsStatuses) {
- constexpr ui64 statusCount = 3;
- NKikimrProto::EReplyStatus maybeStatuses[statusCount] = {
- NKikimrProto::OK,
- NKikimrProto::BLOCKED,
- NKikimrProto::DEADLINE
- };
- Y_ABORT_UNLESS(maybeStatuses[statusCount - 1] == NKikimrProto::DEADLINE);
- for (ui64 idx = 1; idx < statusCount; ++idx) {
- MakeTestGivenBlock42GetRecoverMultiPutStatuses(maybeStatuses[idx], {maybeStatuses[idx], maybeStatuses[0]});
- MakeTestGivenBlock42GetRecoverMultiPutStatuses(maybeStatuses[idx], {maybeStatuses[0], maybeStatuses[idx]});
- }
-}
-
Y_UNIT_TEST(TestGivenMirror3DCGetWithFirstSlowDisk) {
TBlobStorageGroupType type = {TErasureType::ErasureMirror3dc};
TTestBasicRuntime runtime(1, false);
@@ -741,7 +655,7 @@ Y_UNIT_TEST(TestGivenMirror3DCGetWithFirstSlowDisk) {
TEvBlobStorage::TEvGet::TPtr ev = testState.CreateGetRequest({blobId}, false);
- TActorId getActorId = runtime.Register(DSProxyEnv.CreateGetRequestActor(ev, NKikimrBlobStorage::TabletLog, false).release());
+ TActorId getActorId = runtime.Register(DSProxyEnv.CreateGetRequestActor(ev, NKikimrBlobStorage::TabletLog).release());
runtime.EnableScheduleForActor(getActorId);
testState.GrabEventPtr<TEvBlobStorage::TEvVGet>();
diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_vdisk_mock_ut.h b/ydb/core/blobstorage/dsproxy/ut/dsproxy_vdisk_mock_ut.h
index e4058925975..2eac0a80e84 100644
--- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_vdisk_mock_ut.h
+++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_vdisk_mock_ut.h
@@ -99,7 +99,6 @@ public:
void OnVGet(const TEvBlobStorage::TEvVGet &vGet, TEvBlobStorage::TEvVGetResult &outVGetResult) {
auto &request = vGet.Record;
- Y_ABORT_UNLESS(request.HasCookie());
if (IsError) {
outVGetResult.MakeError(Status, TString(), request);
return;
diff --git a/ydb/core/blobstorage/dsproxy/ut_strategy/strategy_ut.cpp b/ydb/core/blobstorage/dsproxy/ut_strategy/strategy_ut.cpp
index 17ba3f7a16b..8b77cc4ee6b 100644
--- a/ydb/core/blobstorage/dsproxy/ut_strategy/strategy_ut.cpp
+++ b/ydb/core/blobstorage/dsproxy/ut_strategy/strategy_ut.cpp
@@ -43,50 +43,48 @@ public:
}
void ProcessBlackboardRequests(TBlackboard& blackboard) {
- for (ui32 i = 0; i < blackboard.GroupDiskRequests.DiskRequestsForOrderNumber.size(); ++i) {
- auto& r = blackboard.GroupDiskRequests.DiskRequestsForOrderNumber[i];
- Y_ABORT_UNLESS(i < DiskStates.size());
- auto& disk = DiskStates[i];
- for (auto& get : r.GetsToSend) {
- Ctest << "orderNumber# " << i << " get Id# " << get.Id;
- if (disk.InErrorState) {
- Ctest << " ERROR";
- blackboard.AddErrorResponse(get.Id, i);
- } else if (auto it = disk.Blobs.find(get.Id); it == disk.Blobs.end()) {
- Ctest << " NODATA";
- blackboard.AddNoDataResponse(get.Id, i);
- } else {
- std::visit(TOverloaded{
- [&](TNotYet&) {
- Ctest << " NOT_YET";
- blackboard.AddNotYetResponse(get.Id, i);
- },
- [&](TRope& buffer) {
- Ctest << " OK";
- size_t begin = Min<size_t>(get.Shift, buffer.size());
- size_t end = Min<size_t>(buffer.size(), begin + get.Size);
- TRope data(buffer.begin() + begin, buffer.begin() + end);
- blackboard.AddResponseData(get.Id, i, get.Shift, std::move(data));
- }
- }, it->second);
- }
- Ctest << Endl;
+ for (auto& get : blackboard.GroupDiskRequests.GetsPending) {
+ auto& disk = DiskStates[get.OrderNumber];
+ Ctest << "orderNumber# " << get.OrderNumber << " get Id# " << get.Id;
+ if (disk.InErrorState) {
+ Ctest << " ERROR";
+ blackboard.AddErrorResponse(get.Id, get.OrderNumber);
+ } else if (auto it = disk.Blobs.find(get.Id); it == disk.Blobs.end()) {
+ Ctest << " NODATA";
+ blackboard.AddNoDataResponse(get.Id, get.OrderNumber);
+ } else {
+ std::visit(TOverloaded{
+ [&](TNotYet&) {
+ Ctest << " NOT_YET";
+ blackboard.AddNotYetResponse(get.Id, get.OrderNumber);
+ },
+ [&](TRope& buffer) {
+ Ctest << " OK";
+ size_t begin = Min<size_t>(get.Shift, buffer.size());
+ size_t end = Min<size_t>(buffer.size(), begin + get.Size);
+ TRope data(buffer.begin() + begin, buffer.begin() + end);
+ blackboard.AddResponseData(get.Id, get.OrderNumber, get.Shift, std::move(data));
+ }
+ }, it->second);
}
- r.GetsToSend.clear();
- for (auto& put : r.PutsToSend) {
- Ctest << "orderNumber# " << i << " put Id# " << put.Id;
- if (disk.InErrorState) {
- Ctest << " ERROR";
- blackboard.AddErrorResponse(put.Id, i);
- } else {
- Ctest << " OK";
- disk.Blobs[put.Id] = std::move(put.Buffer);
- blackboard.AddPutOkResponse(put.Id, i);
- }
- Ctest << Endl;
+ Ctest << Endl;
+ }
+ blackboard.GroupDiskRequests.GetsPending.clear();
+
+ for (auto& put : blackboard.GroupDiskRequests.PutsPending) {
+ auto& disk = DiskStates[put.OrderNumber];
+ Ctest << "orderNumber# " << put.OrderNumber << " put Id# " << put.Id;
+ if (disk.InErrorState) {
+ Ctest << " ERROR";
+ blackboard.AddErrorResponse(put.Id, put.OrderNumber);
+ } else {
+ Ctest << " OK";
+ disk.Blobs[put.Id] = std::move(put.Buffer);
+ blackboard.AddPutOkResponse(put.Id, put.OrderNumber);
}
- r.PutsToSend.clear();
+ Ctest << Endl;
}
+ blackboard.GroupDiskRequests.PutsPending.clear();
}
};
@@ -223,21 +221,18 @@ void RunTestLevel(const TBlobStorageGroupInfo& info, TBlackboard& blackboard,
std::set<TOperation>& context, ui32& terminals) {
// see which operations we can add to the stock
const size_t stockSizeOnEntry = stock.size();
- auto& requests = blackboard.GroupDiskRequests.DiskRequestsForOrderNumber;
- for (ui32 i = 0; i < info.GetTotalVDisksNum(); ++i) {
- for (auto& j = requests[i].FirstUnsentRequestIdx; j < requests[i].GetsToSend.size(); ++j) {
- auto& get = requests[i].GetsToSend[j];
- stock.push_back(TGetQuery{i, get.Id, get.Shift, get.Size});
- const bool inserted = context.insert(stock.back()).second;
- UNIT_ASSERT(inserted);
- }
- for (auto& j = requests[i].FirstUnsentPutIdx; j < requests[i].PutsToSend.size(); ++j) {
- auto& put = requests[i].PutsToSend[j];
- stock.push_back(TPutQuery{i, put.Id});
- const bool inserted = context.insert(stock.back()).second;
- UNIT_ASSERT(inserted);
- }
+ for (auto& get : blackboard.GroupDiskRequests.GetsPending) {
+ stock.push_back(TGetQuery{get.OrderNumber, get.Id, get.Shift, get.Size});
+ const bool inserted = context.insert(stock.back()).second;
+ UNIT_ASSERT(inserted);
+ }
+ blackboard.GroupDiskRequests.GetsPending.clear();
+ for (auto& put : blackboard.GroupDiskRequests.PutsPending) {
+ stock.push_back(TPutQuery{put.OrderNumber, put.Id});
+ const bool inserted = context.insert(stock.back()).second;
+ UNIT_ASSERT(inserted);
}
+ blackboard.GroupDiskRequests.PutsPending.clear();
UNIT_ASSERT(!stock.empty());
bool canIssuePuts = true;
diff --git a/ydb/core/blobstorage/ut_blobstorage/counting_events.cpp b/ydb/core/blobstorage/ut_blobstorage/counting_events.cpp
index ed4c397db8c..e9de1c0fab9 100644
--- a/ydb/core/blobstorage/ut_blobstorage/counting_events.cpp
+++ b/ydb/core/blobstorage/ut_blobstorage/counting_events.cpp
@@ -84,7 +84,7 @@ Y_UNIT_TEST_SUITE(CountingEvents) {
}
- void CountingEventsTest(TString typeOperation, ui32 eventsCount, TBlobStorageGroupType groupType, ui32 alternative = 0)
+ void CountingEventsTest(TString typeOperation, ui32 eventsCount, TBlobStorageGroupType groupType)
{
TEnvironmentSetup env(true, groupType);
auto& runtime = env.Runtime;
@@ -127,7 +127,7 @@ Y_UNIT_TEST_SUITE(CountingEvents) {
SendPut(test, originalBlobId3, data, NKikimrProto::OK);
finishEventsCount = test.Runtime->GetEventsProcessed();
- UNIT_ASSERT_VALUES_EQUAL(finishEventsCount - startEventsCount, alternative ? alternative : eventsCount);
+ UNIT_ASSERT_VALUES_EQUAL(finishEventsCount - startEventsCount, eventsCount);
} else if (typeOperation == "get") {
TLogoBlobID originalBlobId(tabletId, 1, 0, 0, size, 0);
NormalizePredictedDelays(queues);
@@ -164,7 +164,7 @@ Y_UNIT_TEST_SUITE(CountingEvents) {
}
Y_UNIT_TEST(Put_Mirror3of4) {
- CountingEventsTest("put", 116, TBlobStorageGroupType::ErasureMirror3of4, 114);
+ CountingEventsTest("put", 116, TBlobStorageGroupType::ErasureMirror3of4);
}
Y_UNIT_TEST(Put_Mirror3dc) {