diff options
| author | alexvru <[email protected]> | 2023-12-13 15:33:45 +0300 |
|---|---|---|
| committer | alexvru <[email protected]> | 2023-12-13 17:19:47 +0300 |
| commit | 0ff91c3537fb83b599ab7ec0e9bb0dfb5e288dbb (patch) | |
| tree | 853db8a8c7beabd0f0747313556eab4b388a5d05 | |
| parent | bb147e7bfb3c509c1d6096bbc79babb02a9e9a33 (diff) | |
Handle long-lasting Put queries in DS proxy correctly -- part 3
16 files changed, 412 insertions, 665 deletions
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy.h b/ydb/core/blobstorage/dsproxy/dsproxy.h index d90fd233481..ad43358043d 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy.h @@ -216,7 +216,7 @@ public: } template<typename TEvent> - bool CheckForTermErrors(TAutoPtr<TEventHandle<TEvent>>& ev) { + bool CheckForTermErrors(TAutoPtr<TEventHandle<TEvent>>& ev, bool suppressCommonErrors) { auto& record = ev->Get()->Record; auto& self = Derived(); @@ -260,6 +260,9 @@ public: if (status != NKikimrProto::RACE && status != NKikimrProto::BLOCKED && status != NKikimrProto::DEADLINE) { return false; // these statuses are non-terminal } else if (status != NKikimrProto::RACE) { + if (suppressCommonErrors) { + return false; // these errors will be handled in host code + } // this status is terminal and we have nothing to do about it return done(status, TStringBuilder() << "status# " << NKikimrProto::EReplyStatus_Name(status) << " from# " << vdiskId.ToString()); @@ -314,9 +317,9 @@ public: return true; } - bool ProcessEvent(TAutoPtr<IEventHandle>& ev) { + bool ProcessEvent(TAutoPtr<IEventHandle>& ev, bool suppressCommonErrors = false) { switch (ev->GetTypeRewrite()) { -#define CHECK(T) case TEvBlobStorage::T::EventType: return CheckForTermErrors(reinterpret_cast<TEvBlobStorage::T::TPtr&>(ev)) +#define CHECK(T) case TEvBlobStorage::T::EventType: return CheckForTermErrors(reinterpret_cast<TEvBlobStorage::T::TPtr&>(ev), suppressCommonErrors) CHECK(TEvVPutResult); CHECK(TEvVMultiPutResult); CHECK(TEvVGetResult); @@ -351,17 +354,23 @@ public: return false; } - void CountPuts(const TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>>& q) { + template<typename TEv> + void CountPut(const std::unique_ptr<TEv>& ev) { + ++GeneratedSubrequests; + GeneratedSubrequestBytes += ev->GetBufferBytes(); + } + + template<typename TEv> + void CountPuts(const TDeque<std::unique_ptr<TEv>>& q) { for (const auto& item : q) { - ++GeneratedSubrequests; - GeneratedSubrequestBytes += item->GetBufferBytes(); + CountPut(item); } } - void CountPuts(const TDeque<std::unique_ptr<TEvBlobStorage::TEvVMultiPut>>& q) { + template<typename... TOptions> + void CountPuts(const TDeque<std::variant<TOptions...>>& q) { for (const auto& item : q) { - ++GeneratedSubrequests; - GeneratedSubrequestBytes += item->GetBufferBytes(); + std::visit([&](auto& item) { CountPut(item); }, item); } } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp index b7c8a9a51fd..09144e50e80 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp @@ -224,24 +224,17 @@ bool TBlobState::HasWrittenQuorum(const TBlobStorageGroupInfo& info, const TBlob TString TBlobState::ToString() const { TStringStream str; - str << "{Id# " << Id.ToString(); - str << Endl; - str << " Whole# " << Whole.ToString(); - str << Endl; - str << " WholeSituation# " << SituationToString(WholeSituation); - str << Endl; + str << "{Id# " << Id.ToString() << Endl; + str << " IsChanged# " << IsChanged << Endl; + str << " Whole# " << Whole.ToString() << Endl; + str << " WholeSituation# " << SituationToString(WholeSituation) << Endl; for (ui32 i = 0; i < Parts.size(); ++i) { - str << Endl; - str << " Parts[" << i << "]# " << Parts[i].ToString(); - str << Endl; + str << Endl << " Parts[" << i << "]# " << Parts[i].ToString() << Endl; } for (ui32 i = 0; i < Disks.size(); ++i) { - str << Endl; - str << " Disks[" << i << "]# " << Disks[i].ToString(); - str << Endl; + str << Endl << " Disks[" << i << "]# " << Disks[i].ToString() << Endl; } - str << " BlobIdx# " << (ui32)BlobIdx; - str << Endl; + str << " BlobIdx# " << (ui32)BlobIdx << Endl; str << "}"; return str.Str(); } @@ -326,11 +319,9 @@ void TGroupDiskRequests::AddGet(const ui32 diskOrderNumber, const TLogoBlobID &i } void TGroupDiskRequests::AddPut(const ui32 diskOrderNumber, const TLogoBlobID &id, TRope buffer, - TDiskPutRequest::EPutReason putReason, bool isHandoff, std::vector<std::pair<ui64, ui32>> *extraBlockChecks, - NWilson::TSpan *span, ui8 blobIdx) { + TDiskPutRequest::EPutReason putReason, bool isHandoff, ui8 blobIdx) { Y_ABORT_UNLESS(diskOrderNumber < DiskRequestsForOrderNumber.size()); - DiskRequestsForOrderNumber[diskOrderNumber].PutsToSend.emplace_back(id, buffer, putReason, isHandoff, - extraBlockChecks, span, blobIdx); + DiskRequestsForOrderNumber[diskOrderNumber].PutsToSend.emplace_back(id, buffer, putReason, isHandoff, blobIdx); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -446,16 +437,15 @@ EStrategyOutcome TBlackboard::RunStrategies(TLogContext &logCtx, const TStackVec break; case EStrategyOutcome::DONE: - if (expired && !blob.HasWrittenQuorum(*Info, *expired)) { - blob.IsChanged = true; - status = NKikimrProto::UNKNOWN; - } break; } if (status != NKikimrProto::OK) { break; } } + if (status == NKikimrProto::OK && expired && !blob.HasWrittenQuorum(*Info, *expired)) { + status = NKikimrProto::UNKNOWN; + } if (status != NKikimrProto::UNKNOWN) { const auto [doneIt, inserted, node] = DoneBlobStates.insert(BlobStates.extract(it++)); Y_ABORT_UNLESS(inserted); @@ -538,19 +528,8 @@ void TBlackboard::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, T } } -void TBlackboard::RegisterBlobForPut(const TLogoBlobID& id, std::vector<std::pair<ui64, ui32>> *extraBlockChecks, - NWilson::TSpan *span) { - TBlobState& state = (*this)[id]; - if (!state.ExtraBlockChecks) { - state.ExtraBlockChecks = extraBlockChecks; - } else { - Y_ABORT_UNLESS(state.ExtraBlockChecks == extraBlockChecks); - } - if (!state.Span) { - state.Span = span; - } else { - Y_ABORT_UNLESS(state.Span == span); - } +void TBlackboard::RegisterBlobForPut(const TLogoBlobID& id) { + (*this)[id]; } TBlobState& TBlackboard::operator [](const TLogoBlobID& id) { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h index 993e5d7d98e..003ba6dc819 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h @@ -81,11 +81,9 @@ struct TBlobState { TStackVec<TState, TypicalPartsInBlob> Parts; TStackVec<TDisk, TypicalDisksInSubring> Disks; TVector<TEvBlobStorage::TEvGetResult::TPartMapItem> PartMap; - ui8 BlobIdx; NKikimrProto::EReplyStatus Status = NKikimrProto::UNKNOWN; + ui8 BlobIdx; bool IsChanged = false; - std::vector<std::pair<ui64, ui32>> *ExtraBlockChecks = nullptr; - NWilson::TSpan *Span = nullptr; bool Keep = false; bool DoNotKeep = false; @@ -137,18 +135,13 @@ struct TDiskPutRequest { EPutReason Reason; bool IsHandoff; ui8 BlobIdx; - std::vector<std::pair<ui64, ui32>> *ExtraBlockChecks; - NWilson::TSpan *Span; - TDiskPutRequest(const TLogoBlobID &id, TRope buffer, EPutReason reason, bool isHandoff, - std::vector<std::pair<ui64, ui32>> *extraBlockChecks, NWilson::TSpan *span, ui8 blobIdx) + TDiskPutRequest(const TLogoBlobID &id, TRope buffer, EPutReason reason, bool isHandoff, ui8 blobIdx) : Id(id) , Buffer(std::move(buffer)) , Reason(reason) , IsHandoff(isHandoff) , BlobIdx(blobIdx) - , ExtraBlockChecks(extraBlockChecks) - , Span(span) {} }; @@ -166,8 +159,7 @@ struct TGroupDiskRequests { void AddGet(const ui32 diskOrderNumber, const TLogoBlobID &id, const TIntervalSet<i32> &intervalSet); void AddGet(const ui32 diskOrderNumber, const TLogoBlobID &id, const ui32 shift, const ui32 size); void AddPut(const ui32 diskOrderNumber, const TLogoBlobID &id, TRope buffer, - TDiskPutRequest::EPutReason putReason, bool isHandoff, std::vector<std::pair<ui64, ui32>> *extraBlockChecks, - NWilson::TSpan *span, ui8 blobIdx); + TDiskPutRequest::EPutReason putReason, bool isHandoff, ui8 blobIdx); }; struct TBlackboard; @@ -228,6 +220,7 @@ struct TBlackboard { NKikimrBlobStorage::EVDiskQueueId queueId, ui64 *outWorstNs, ui64 *outNextToWorstNs, i32 *outWorstOrderNumber) const; TString ToString() const; + void ChangeAll() { for (auto &[id, blob] : BlobStates) { blob.IsChanged = true; @@ -236,7 +229,7 @@ struct TBlackboard { void InvalidatePartStates(ui32 orderNumber); - void RegisterBlobForPut(const TLogoBlobID& id, std::vector<std::pair<ui64, ui32>> *extraBlockChecks, NWilson::TSpan *span); + void RegisterBlobForPut(const TLogoBlobID& id); TBlobState& operator [](const TLogoBlobID& id); }; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp index 6bb96f266a2..067090088c3 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp @@ -374,7 +374,7 @@ void TGetImpl::PrepareVPuts(TLogContext &logCtx, } bytes += put.Buffer.size(); lastItemCount++; - vMultiPut->AddVPut(put.Id, TRcBuf(TRope(put.Buffer)), &cookie, put.ExtraBlockChecks, NWilson::TTraceId()); + vMultiPut->AddVPut(put.Id, TRcBuf(TRope(put.Buffer)), &cookie, nullptr, NWilson::TTraceId()); } vMultiPut->Record.SetCookie(TVMultiPutCookie(diskOrderNumber, lastItemCount, VMultiPutRequests)); ++VMultiPutRequests; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.cpp index 6a189ea6c72..24bbbb04563 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.cpp @@ -87,6 +87,12 @@ TDsProxyNodeMon::TDsProxyNodeMon(TIntrusivePtr<::NMonitoring::TDynamicCounters> ConnectedMinus2 = group->GetCounter("ConnectedMinus2", false); ConnectedMinus3more = group->GetCounter("ConnectedMinus3more", false); } + // wipe monitoring counters + { + auto group = Group->GetSubgroup("subsystem", "wipemon"); + PutStatusQueries = group->GetCounter("StatusQueries", true); + IncarnationChanges = group->GetCounter("IncarnationChanges", true); + } } ui32 IdxForType(NPDisk::EDeviceType type) { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h b/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h index 2681b866817..9d8702f28bd 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h @@ -95,6 +95,10 @@ struct TDsProxyNodeMon : public TThrRefBase { ::NMonitoring::TDynamicCounters::TCounterPtr ConnectedMinus2; ::NMonitoring::TDynamicCounters::TCounterPtr ConnectedMinus3more; + // wipe monitoring counters + ::NMonitoring::TDynamicCounters::TCounterPtr PutStatusQueries; + ::NMonitoring::TDynamicCounters::TCounterPtr IncarnationChanges; + TDsProxyNodeMon(TIntrusivePtr<::NMonitoring::TDynamicCounters> &counters, bool initForAllDeviceTypes); void CountPutPesponseTime(NPDisk::EDeviceType type, NKikimrBlobStorage::EPutHandleClass cls, ui32 size, TDuration duration); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp index 7480e3bc428..a092e45872a 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp @@ -73,14 +73,16 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt const bool IsMultiPutMode; - bool RequireExtraBlockChecks = false; + bool Done = false; struct TIncarnationRecord { - ui64 IncarnationGuid; - TMonotonic ExpirationTimestamp; - TMonotonic StatusIssueTimestamp; + ui64 IncarnationGuid = 0; + TMonotonic ExpirationTimestamp = TMonotonic::Max(); + TMonotonic StatusIssueTimestamp = TMonotonic::Zero(); // zero means no message in flight }; - std::vector<std::optional<TIncarnationRecord>> IncarnationRecords; + std::vector<TIncarnationRecord> IncarnationRecords; + + TBlobStorageGroupInfo::TGroupVDisks ExpiredVDiskSet; void SanityCheck() { if (RequestsSent <= MaxSaneRequests) { @@ -101,33 +103,29 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt SanityCheck(); // May Die } - template<typename TEvent, typename TCookie> - void SendPutsImpl(auto&& callback, TPutImpl::TPutResultVec& putResults) { - TDeque<std::unique_ptr<TEvent>> v; - callback(v, putResults); - UpdatePengingVDiskResponseCount<TEvent, TCookie>(v); - RequestsSent += v.size(); - CountPuts(v); - SendToQueues(v, TimeStatsEnabled); - } + bool Action() { + UpdateExpiredVDiskSet(); - template<bool VPut = true, bool VMultiPut = true, typename T> - bool SendPuts(T&& callback) { TPutImpl::TPutResultVec putResults; - if (IsMultiPutMode) { - if constexpr (VMultiPut) { - SendPutsImpl<TEvBlobStorage::TEvVMultiPut, TVMultiPutCookie>(std::forward<T>(callback), putResults); - } else { - Y_ABORT(); - } - } else { - if constexpr (VPut) { - SendPutsImpl<TEvBlobStorage::TEvVPut, TBlobCookie>(std::forward<T>(callback), putResults); - } else { - Y_ABORT(); - } + PutImpl.Step(LogCtx, putResults, ExpiredVDiskSet); + if (ReplyAndDieWithLastResponse(putResults)) { + return true; } - return ReplyAndDieWithLastResponse(putResults); + + // Generate new VPut/VMultiPut events to disks. + auto events = PutImpl.GeneratePutRequests(); + + // Count them properly. + UpdatePengingVDiskResponseCount(events); + CountPuts(events); + + // Send to VDisks. + for (auto& ev : events) { + std::visit([&](auto& ev) { SendToQueue(std::move(ev), 0, TimeStatsEnabled); }, ev); + ++RequestsSent; + } + + return false; } void Accelerate() { @@ -136,65 +134,61 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt } IsAccelerated = true; - const TMonotonic now = TActivationContext::Monotonic(); - - auto callback = [&](auto& v, auto& /*putResults*/) { - PutImpl.Accelerate(LogCtx, v); - *(IsMultiPutMode ? Mon->NodeMon->AccelerateEvVMultiPutCount : Mon->NodeMon->AccelerateEvVPutCount) += v.size(); - }; - SendPuts(callback); - - IssueStatusForExpiredDisks(now); + PutImpl.Accelerate(LogCtx); + Action(); +// *(IsMultiPutMode ? Mon->NodeMon->AccelerateEvVMultiPutCount : Mon->NodeMon->AccelerateEvVPutCount) += v.size(); } void HandleIncarnation(TMonotonic timestamp, ui32 orderNumber, ui64 incarnationGuid) { timestamp += TDuration::Seconds(15); // TODO: cooldown timeout Y_ABORT_UNLESS(orderNumber < IncarnationRecords.size()); - if (auto& record = IncarnationRecords[orderNumber]; !record) { - record = TIncarnationRecord{ - .IncarnationGuid = incarnationGuid, - .ExpirationTimestamp = timestamp, - }; - } else if (record->IncarnationGuid != incarnationGuid) { + auto& record = IncarnationRecords[orderNumber]; + + if (record.IncarnationGuid == 0 && record.ExpirationTimestamp == TMonotonic::Max()) { // empty record + record.ExpirationTimestamp = TMonotonic::Zero(); + } else if (record.IncarnationGuid != incarnationGuid) { PutImpl.InvalidatePartStates(orderNumber); - record->IncarnationGuid = incarnationGuid; - record->ExpirationTimestamp = timestamp; - } else if (record->ExpirationTimestamp < timestamp) { - record->ExpirationTimestamp = timestamp; + ++*Mon->NodeMon->IncarnationChanges; } + + record.IncarnationGuid = incarnationGuid; + record.ExpirationTimestamp = Max(timestamp, record.ExpirationTimestamp); } - TBlobStorageGroupInfo::TGroupVDisks CreateExpiredVDiskSet(TMonotonic timestamp) const { - TBlobStorageGroupInfo::TGroupVDisks res(&Info->GetTopology()); + void UpdateExpiredVDiskSet() { + const TMonotonic now = TActivationContext::Monotonic(); + + TBlobStorageGroupInfo::TGroupVDisks expired(&Info->GetTopology()); for (ui32 i = 0; i < IncarnationRecords.size(); ++i) { - if (auto& record = IncarnationRecords[i]; record && record->ExpirationTimestamp <= timestamp) { - res |= {&Info->GetTopology(), Info->GetVDiskId(i)}; + if (IncarnationRecords[i].ExpirationTimestamp < now) { + expired |= {&Info->GetTopology(), Info->GetVDiskId(i)}; } } - return res; + if (expired != ExpiredVDiskSet) { // expired set has changed + PutImpl.ChangeAll(); + } + ExpiredVDiskSet = expired; } - void IssueStatusForExpiredDisks(TMonotonic timestamp) { - bool issue = false; + void IssueStatusForExpiredDisks() { + const TMonotonic now = TActivationContext::Monotonic(); for (ui32 i = 0; i < IncarnationRecords.size(); ++i) { - if (auto& record = IncarnationRecords[i]; record && record->ExpirationTimestamp <= timestamp && - record->StatusIssueTimestamp == TMonotonic()) { - issue = true; - break; + auto& record = IncarnationRecords[i]; + if (now + TDuration::Seconds(5) < record.ExpirationTimestamp) { + continue; // not expired yet } - } - if (!issue) { - return; - } - for (ui32 i = 0; i < IncarnationRecords.size(); ++i) { - if (auto& record = IncarnationRecords[i]; record && record->StatusIssueTimestamp == TMonotonic()) { - const TVDiskID vdiskId = Info->GetVDiskId(i); - A_LOG_INFO_S("BPP15", "sending TEvVStatus VDiskId# " << vdiskId); - SendToQueue(std::make_unique<TEvBlobStorage::TEvVStatus>(vdiskId), i); - ++StatusMsgsSent; - record->StatusIssueTimestamp = timestamp; + if (record.StatusIssueTimestamp != TMonotonic()) { + continue; // TEvVStatus is already in flight } + + const TVDiskID vdiskId = Info->GetVDiskId(i); + A_LOG_INFO_S("BPP15", "sending TEvVStatus VDiskId# " << vdiskId); + SendToQueue(std::make_unique<TEvBlobStorage::TEvVStatus>(vdiskId), i); + ++StatusMsgsSent; + record.StatusIssueTimestamp = now; + + ++*Mon->NodeMon->PutStatusQueries; } } @@ -204,134 +198,120 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt ProcessReplyFromQueue(ev); ++StatusResultMsgsReceived; - const TMonotonic now = TActivationContext::Monotonic(); - auto& record = ev->Get()->Record; const ui32 orderNumber = ev->Cookie; auto& incarnationRecord = IncarnationRecords[orderNumber]; - Y_ABORT_UNLESS(incarnationRecord); - const TMonotonic issue = std::exchange(incarnationRecord->StatusIssueTimestamp, TMonotonic()); + Y_ABORT_UNLESS(incarnationRecord.IncarnationGuid); + Y_ABORT_UNLESS(incarnationRecord.ExpirationTimestamp != TMonotonic::Max()); + const TMonotonic issue = std::exchange(incarnationRecord.StatusIssueTimestamp, TMonotonic()); Y_ABORT_UNLESS(issue != TMonotonic()); + if (record.HasIncarnationGuid()) { HandleIncarnation(issue, orderNumber, record.GetIncarnationGuid()); } - auto callback = [&](auto& v, auto& putResults) { - PutImpl.Step(LogCtx, v, putResults, CreateExpiredVDiskSet(now)); - }; - if (SendPuts(callback)) { + if (Action()) { return; } - - IssueStatusForExpiredDisks(now); } void Handle(TEvBlobStorage::TEvVPutResult::TPtr &ev) { - A_LOG_LOG_S(false, ev->Get()->Record.GetStatus() == NKikimrProto::OK ? NLog::PRI_DEBUG : NLog::PRI_NOTICE, + A_LOG_LOG_S(false, ev->Get()->Record.GetStatus() == NKikimrProto::OK ? NLog::PRI_DEBUG : NLog::PRI_INFO, "BPP01", "received " << ev->Get()->ToString() << " from# " << VDiskIDFromVDiskID(ev->Get()->Record.GetVDiskID())); ProcessReplyFromQueue(ev); ResponsesReceived++; - const TMonotonic now = TActivationContext::Monotonic(); const ui64 cyclesPerUs = NHPTimer::GetCyclesPerSecond() / 1000000; ev->Get()->Record.MutableTimestamps()->SetReceivedByDSProxyUs(GetCycleCountFast() / cyclesPerUs); const NKikimrBlobStorage::TEvVPutResult &record = ev->Get()->Record; - const TLogoBlobID blob = LogoBlobIDFromLogoBlobID(record.GetBlobID()); - const TLogoBlobID origBlobId = TLogoBlobID(blob, 0); - Y_ABORT_UNLESS(record.HasCookie()); - TBlobCookie cookie(record.GetCookie()); - const ui64 idx = cookie.GetBlobIdx(); - const ui64 vdisk = cookie.GetVDiskOrderNumber(); + const TLogoBlobID blobId = LogoBlobIDFromLogoBlobID(record.GetBlobID()); + Y_ABORT_UNLESS(record.HasVDiskID()); + TVDiskID vdiskId = VDiskIDFromVDiskID(record.GetVDiskID()); + const TVDiskIdShort shortId(vdiskId); + const ui32 vdisk = Info->GetOrderNumber(shortId); const NKikimrProto::EReplyStatus status = record.GetStatus(); + const size_t blobIdx = PutImpl.GetBlobIdx(blobId); - Y_ABORT_UNLESS(vdisk < WaitingVDiskResponseCount.size(), "blobIdx# %" PRIu64 " vdisk# %" PRIu64, idx, vdisk); + Y_ABORT_UNLESS(vdisk < WaitingVDiskResponseCount.size(), "blobIdx# %zu vdisk# %" PRIu32, blobIdx, vdisk); if (WaitingVDiskResponseCount[vdisk] == 1) { WaitingVDiskCount--; } WaitingVDiskResponseCount[vdisk]--; - Y_ABORT_UNLESS(idx < PutImpl.Blobs.size()); - Y_ABORT_UNLESS(origBlobId == PutImpl.Blobs[idx].BlobId); if (TimeStatsEnabled && record.GetMsgQoS().HasExecTimeStats()) { - TimeStats.ApplyPut(PutImpl.Blobs[idx].BufferSize, record.GetMsgQoS().GetExecTimeStats()); + TimeStats.ApplyPut(PutImpl.Blobs[blobIdx].BufferSize, record.GetMsgQoS().GetExecTimeStats()); } - Y_ABORT_UNLESS(record.HasVDiskID()); - TVDiskID vDiskId = VDiskIDFromVDiskID(record.GetVDiskID()); - const TVDiskIdShort shortId(vDiskId); - if (record.HasIncarnationGuid()) { // TODO: correct timestamp - HandleIncarnation(now, Info->GetOrderNumber(shortId), record.GetIncarnationGuid()); + HandleIncarnation(TActivationContext::Monotonic(), Info->GetOrderNumber(shortId), record.GetIncarnationGuid()); } - LWPROBE(DSProxyVDiskRequestDuration, TEvBlobStorage::EvVPut, blob.BlobSize(), blob.TabletID(), - Info->GroupID, blob.Channel(), Info->GetFailDomainOrderNumber(shortId), + LWPROBE(DSProxyVDiskRequestDuration, TEvBlobStorage::EvVPut, blobId.BlobSize(), blobId.TabletID(), + Info->GroupID, blobId.Channel(), Info->GetFailDomainOrderNumber(shortId), GetStartTime(record.GetTimestamps()), GetTotalTimeMs(record.GetTimestamps()), GetVDiskTimeMs(record.GetTimestamps()), GetTotalTimeMs(record.GetTimestamps()) - GetVDiskTimeMs(record.GetTimestamps()), NKikimrBlobStorage::EPutHandleClass_Name(PutImpl.GetPutHandleClass()), NKikimrProto::EReplyStatus_Name(status)); - if (RootCauseTrack.IsOn) { - RootCauseTrack.OnReply(cookie.GetCauseIdx(), - GetTotalTimeMs(record.GetTimestamps()) - GetVDiskTimeMs(record.GetTimestamps()), - GetVDiskTimeMs(record.GetTimestamps())); - } + //if (RootCauseTrack.IsOn) { + // RootCauseTrack.OnReply(cookie.GetCauseIdx(), + // GetTotalTimeMs(record.GetTimestamps()) - GetVDiskTimeMs(record.GetTimestamps()), + // GetVDiskTimeMs(record.GetTimestamps())); + //} - auto callback = [&](auto& v, auto& putResults) { - PutImpl.OnVPutEventResult(LogCtx, ev->Sender, *ev->Get(), v, putResults, CreateExpiredVDiskSet(now)); - }; - if (SendPuts<true, false>(callback)) { - return; + if (status == NKikimrProto::BLOCKED || status == NKikimrProto::DEADLINE) { + TString error = TStringBuilder() << "Got VPutResult status# " << status << " from VDiskId# " << vdiskId; + TPutImpl::TPutResultVec putResults; + PutImpl.PrepareOneReply(status, blobId.FullID(), blobIdx, LogCtx, std::move(error), putResults); + ReplyAndDieWithLastResponse(putResults); + } else { + PutImpl.ProcessResponse(*ev->Get()); + if (Action()) { + return; + } + AccelerateIfNeeded(); + SanityCheck(); // May Die } - AccelerateIfNeeded(); - SanityCheck(); // May Die - - IssueStatusForExpiredDisks(now); } void Handle(TEvBlobStorage::TEvVMultiPutResult::TPtr &ev) { ProcessReplyFromQueue(ev); ResponsesReceived++; - const TMonotonic now = TActivationContext::Monotonic(); const ui64 cyclesPerUs = NHPTimer::GetCyclesPerSecond() / 1000000; ev->Get()->Record.MutableTimestamps()->SetReceivedByDSProxyUs(GetCycleCountFast() / cyclesPerUs); const NKikimrBlobStorage::TEvVMultiPutResult &record = ev->Get()->Record; + Y_ABORT_UNLESS(record.HasVDiskID()); + const TVDiskID vdiskId = VDiskIDFromVDiskID(record.GetVDiskID()); + const TVDiskIdShort shortId(vdiskId); + const ui32 vdisk = Info->GetOrderNumber(shortId); + const NKikimrProto::EReplyStatus status = record.GetStatus(); if (TimeStatsEnabled && record.GetMsgQoS().HasExecTimeStats()) { TimeStats.ApplyPut(RequestBytes, record.GetMsgQoS().GetExecTimeStats()); } - Y_ABORT_UNLESS(record.HasVDiskID()); - TVDiskID vDiskId = VDiskIDFromVDiskID(record.GetVDiskID()); - const TVDiskIdShort shortId(vDiskId); - if (record.HasIncarnationGuid()) { // TODO: correct timestamp - HandleIncarnation(now, Info->GetOrderNumber(shortId), record.GetIncarnationGuid()); + HandleIncarnation(TActivationContext::Monotonic(), vdisk, record.GetIncarnationGuid()); } - Y_ABORT_UNLESS(record.HasCookie()); - TVMultiPutCookie cookie(record.GetCookie()); - const ui64 vdisk = cookie.GetVDiskOrderNumber(); - const NKikimrProto::EReplyStatus status = record.GetStatus(); - auto prio = NLog::PRI_DEBUG; if (status != NKikimrProto::OK) { prio = NLog::PRI_INFO; - } - for (const auto& item : record.GetItems()) { - if (item.GetStatus() != NKikimrProto::OK) { - prio = NLog::PRI_INFO; + } else { + for (const auto& item : record.GetItems()) { + if (item.GetStatus() != NKikimrProto::OK) { + prio = NLog::PRI_INFO; + } } } - A_LOG_LOG_S(false, prio, "BPP02", "received " << ev->Get()->ToString() - << " from# " << VDiskIDFromVDiskID(ev->Get()->Record.GetVDiskID())); + A_LOG_LOG_S(false, prio, "BPP02", "received " << ev->Get()->ToString() << " from# " << vdiskId); - Y_ABORT_UNLESS(vdisk < WaitingVDiskResponseCount.size(), " vdisk# %" PRIu64, vdisk); + Y_ABORT_UNLESS(vdisk < WaitingVDiskResponseCount.size(), " vdisk# %" PRIu32, vdisk); if (WaitingVDiskResponseCount[vdisk] == 1) { WaitingVDiskCount--; } @@ -354,43 +334,36 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt } // Handle put results - bool isCauseRegistered = !RootCauseTrack.IsOn; + //bool isCauseRegistered = !RootCauseTrack.IsOn; TPutImpl::TPutResultVec putResults; for (auto &item : record.GetItems()) { - if (!isCauseRegistered) { - isCauseRegistered = RootCauseTrack.OnReply(cookie.GetCauseIdx(), - GetTotalTimeMs(record.GetTimestamps()) - GetVDiskTimeMs(record.GetTimestamps()), - GetVDiskTimeMs(record.GetTimestamps())); - } + //if (!isCauseRegistered) { + // isCauseRegistered = RootCauseTrack.OnReply(cookie.GetCauseIdx(), + // GetTotalTimeMs(record.GetTimestamps()) - GetVDiskTimeMs(record.GetTimestamps()), + // GetVDiskTimeMs(record.GetTimestamps())); + //} Y_ABORT_UNLESS(item.HasStatus()); Y_ABORT_UNLESS(item.HasBlobID()); - Y_ABORT_UNLESS(item.HasCookie()); - ui64 blobIdx = TBlobCookie(item.GetCookie()).GetBlobIdx(); - NKikimrProto::EReplyStatus itemStatus = item.GetStatus(); - TLogoBlobID blobId = LogoBlobIDFromLogoBlobID(item.GetBlobID()); + const TLogoBlobID blobId = LogoBlobIDFromLogoBlobID(item.GetBlobID()); + size_t blobIdx = PutImpl.GetBlobIdx(blobId); + const NKikimrProto::EReplyStatus itemStatus = item.GetStatus(); Y_ABORT_UNLESS(itemStatus != NKikimrProto::RACE); // we should get RACE for the whole request and handle it in CheckForTermErrors if (itemStatus == NKikimrProto::BLOCKED || itemStatus == NKikimrProto::DEADLINE) { - TStringStream errorReason; - errorReason << "Got VMultiPutResult itemStatus# " << itemStatus << " from VDiskId# " << vDiskId; - ErrorReason = errorReason.Str(); - PutImpl.PrepareOneReply(itemStatus, TLogoBlobID(blobId, 0), blobIdx, LogCtx, ErrorReason, putResults); + ErrorReason = TStringBuilder() << "Got VMultiPutResult itemStatus# " << itemStatus << " from VDiskId# " << vdiskId; + PutImpl.PrepareOneReply(itemStatus, blobId.FullID(), blobIdx, LogCtx, ErrorReason, putResults); } } if (ReplyAndDieWithLastResponse(putResults)) { return; } - auto callback = [&](auto& v, auto& putResults) { - PutImpl.OnVPutEventResult(LogCtx, ev->Sender, *ev->Get(), v, putResults, CreateExpiredVDiskSet(now)); - }; - if (SendPuts<false, true>(callback)) { + PutImpl.ProcessResponse(*ev->Get()); + if (Action()) { return; } AccelerateIfNeeded(); SanityCheck(); // May Die - - IssueStatusForExpiredDisks(now); } void AccelerateIfNeeded() { @@ -415,16 +388,18 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt void ReplyAndDie(NKikimrProto::EReplyStatus status) { TPutImpl::TPutResultVec putResults; PutImpl.PrepareReply(status, LogCtx, ErrorReason, putResults); - Y_ABORT_UNLESS(ReplyAndDieWithLastResponse(putResults)); + const bool done = ReplyAndDieWithLastResponse(putResults); + Y_ABORT_UNLESS(done); } - bool ReplyAndDieWithLastResponse(TPutImpl::TPutResultVec &putResults) { + bool ReplyAndDieWithLastResponse(TPutImpl::TPutResultVec& putResults) { for (auto& [blobIdx, result] : putResults) { Y_ABORT_UNLESS(ResponsesSent != PutImpl.Blobs.size()); SendReply(std::move(result), blobIdx); } if (ResponsesSent == PutImpl.Blobs.size()) { PassAway(); + Done = true; return true; } return false; @@ -544,8 +519,8 @@ public: , IsAccelerated(false) , IsAccelerateScheduled(false) , IsMultiPutMode(false) - , RequireExtraBlockChecks(!ev->ExtraBlockChecks.empty()) , IncarnationRecords(info->GetTotalVDisksNum()) + , ExpiredVDiskSet(&info->GetTopology()) { if (ev->Orbit.HasShuttles()) { RootCauseTrack.IsOn = true; @@ -589,6 +564,7 @@ public: , IsAccelerateScheduled(false) , IsMultiPutMode(true) , IncarnationRecords(info->GetTotalVDisksNum()) + , ExpiredVDiskSet(&info->GetTopology()) { Y_DEBUG_ABORT_UNLESS(events.size() <= MaxBatchedPutRequests); for (auto &ev : events) { @@ -597,9 +573,6 @@ public: if (msg.Orbit.HasShuttles()) { RootCauseTrack.IsOn = true; } - if (!msg.ExtraBlockChecks.empty()) { - RequireExtraBlockChecks = true; - } } RequestBytes = 0; @@ -694,28 +667,26 @@ public: Schedule(TDuration::MilliSeconds(DsPutWakeupMs), new TKikimrEvents::TEvWakeup); } - template <typename TPutEvent, typename TCookie> - void UpdatePengingVDiskResponseCount(const TDeque<std::unique_ptr<TPutEvent>> &putEvents) { - for (auto &event : putEvents) { - Y_ABORT_UNLESS(event->Record.HasCookie()); - TCookie cookie(event->Record.GetCookie()); - if (RootCauseTrack.IsOn) { - cookie.SetCauseIdx(RootCauseTrack.RegisterCause()); - event->Record.SetCookie(cookie); - } - ui64 vdisk = cookie.GetVDiskOrderNumber(); - Y_ABORT_UNLESS(vdisk < WaitingVDiskResponseCount.size()); - if (!WaitingVDiskResponseCount[vdisk]) { - WaitingVDiskCount++; - } - WaitingVDiskResponseCount[vdisk]++; + void UpdatePengingVDiskResponseCount(const TDeque<TPutImpl::TPutEvent>& putEvents) { + for (auto& event : putEvents) { + std::visit([&](auto& event) { + //Y_ABORT_UNLESS(event->Record.HasCookie()); + //TCookie cookie(event->Record.GetCookie()); + //if (RootCauseTrack.IsOn) { + // cookie.SetCauseIdx(RootCauseTrack.RegisterCause()); + // event->Record.SetCookie(cookie); + //} + const ui32 orderNumber = Info->GetOrderNumber(VDiskIDFromVDiskID(event->Record.GetVDiskID())); + Y_ABORT_UNLESS(orderNumber < WaitingVDiskResponseCount.size()); + WaitingVDiskCount += !WaitingVDiskResponseCount[orderNumber]++; + }, event); } } void ResumeBootstrap() { if (EncodeQuantum()) { - auto callback = [this](auto& v, auto& /*putResults*/) { PutImpl.GenerateInitialRequests(LogCtx, PartSets, v); }; - SendPuts(callback); + PutImpl.GenerateInitialRequests(LogCtx, PartSets); + Action(); BootstrapInProgress = false; } else { TActivationContext::Send(new IEventHandle(TEvBlobStorage::EvResume, 0, SelfId(), {}, nullptr, 0)); @@ -732,15 +703,12 @@ public: s << ' '; } s << i; - if (auto& record = IncarnationRecords[i]) { - s << '{'; - s << "IncarnationGuid# " << record->IncarnationGuid; - s << " ExpirationTimestamp# " << record->ExpirationTimestamp; - s << " StatusIssueTimestamp# " << record->StatusIssueTimestamp; - s << '}'; - } else { - s << "<null>"; - } + auto& record = IncarnationRecords[i]; + s << '{'; + s << "IncarnationGuid# " << record.IncarnationGuid; + s << " ExpirationTimestamp# " << record.ExpirationTimestamp; + s << " StatusIssueTimestamp# " << record.StatusIssueTimestamp; + s << '}'; } s << '}'; return s.Str(); @@ -761,13 +729,13 @@ public: << " StatusResultMsgsReceived# " << StatusResultMsgsReceived << " Now# " << now << " Passed# " << (now - StartTime) - << " ExpiredVDiskSet# " << CreateExpiredVDiskSet(now).ToString() + << " ExpiredVDiskSet# " << ExpiredVDiskSet.ToString() << " IncarnationRecords# " << dumpIncarnationRecords() << " State# " << PutImpl.DumpFullState()); } STATEFN(StateWait) { - if (ProcessEvent(ev)) { + if (ProcessEvent(ev, IsManyPuts)) { return; } const ui32 type = ev->GetTypeRewrite(); @@ -778,6 +746,12 @@ public: hFunc(TEvAccelerate, Handle); cFunc(TEvBlobStorage::EvResume, ResumeBootstrap); hFunc(TKikimrEvents::TEvWakeup, Handle); + + default: + Y_DEBUG_ABORT_UNLESS(false, "unexpected event Type# 0x%08" PRIx32, type); + } + if (!Done) { + IssueStatusForExpiredDisks(); } CheckRequests(type); } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp index ebc79754c3e..81f56bed536 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp @@ -11,7 +11,7 @@ namespace NKikimr { using TPutResultVec = TPutImpl::TPutResultVec; -bool TPutImpl::RunStrategies(TLogContext &logCtx, TPutResultVec &outPutResults, const TBlobStorageGroupInfo::TGroupVDisks& expired) { +void TPutImpl::RunStrategies(TLogContext &logCtx, TPutResultVec &outPutResults, const TBlobStorageGroupInfo::TGroupVDisks& expired) { switch (Info->Type.GetErasure()) { case TBlobStorageGroupType::ErasureMirror3dc: return RunStrategy(logCtx, TPut3dcStrategy(Tactic, EnableRequestMod3x3ForMinLatecy), outPutResults, expired); @@ -22,15 +22,11 @@ bool TPutImpl::RunStrategies(TLogContext &logCtx, TPutResultVec &outPutResults, } } -bool TPutImpl::RunStrategy(TLogContext &logCtx, const IStrategy& strategy, TPutResultVec &outPutResults, +void TPutImpl::RunStrategy(TLogContext &logCtx, const IStrategy& strategy, TPutResultVec &outPutResults, const TBlobStorageGroupInfo::TGroupVDisks& expired) { TBatchedVec<TBlackboard::TBlobStates::value_type*> finished; const EStrategyOutcome outcome = Blackboard.RunStrategy(logCtx, strategy, &finished, &expired); - if (finished) { - PrepareReply(logCtx, outcome.ErrorReason, finished, outPutResults); - return true; - } - return false; + PrepareReply(logCtx, outcome.ErrorReason, finished, outPutResults); } NLog::EPriority GetPriorityForReply(TAtomicLogPriorityMuteChecker<NLog::PRI_ERROR, NLog::PRI_DEBUG> &checker, diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h index ceac56b525d..8f67e9ee6fb 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h @@ -20,6 +20,9 @@ class TPutImpl { public: using TPutResultVec = TBatchedVec<std::pair<ui64, std::unique_ptr<TEvBlobStorage::TEvPutResult>>>; + using TPutEvent = std::variant<std::unique_ptr<TEvBlobStorage::TEvVPut>, + std::unique_ptr<TEvBlobStorage::TEvVMultiPut>>; + private: TBlobStorageGroupInfo::TServiceIds VDisksSvc; TBlobStorageGroupInfo::TVDiskIds VDisksId; @@ -93,16 +96,12 @@ private: }; TBatchedVec<TBlobInfo> Blobs; + std::unordered_map<TLogoBlobID, size_t> BlobMap; friend class TBlobStorageGroupPutRequest; - TStackVec<bool, MaxBatchedPutRequests * TypicalDisksInSubring> ReceivedVPutResponses; - TStackVec<bool, MaxBatchedPutRequests * TypicalDisksInSubring> ReceivedVMultiPutResponses; - bool IsInitialized = false; - TString ErrorDescription; - friend void ::Out<TBlobInfo>(IOutputStream&, const TBlobInfo&); public: @@ -120,6 +119,7 @@ public: , EnableRequestMod3x3ForMinLatecy(enableRequestMod3x3ForMinLatecy) , Tactic(ev->Tactic) { + BlobMap.emplace(ev->Id, Blobs.size()); Blobs.emplace_back(ev->Id, TRope(ev->Buffer), recipient, cookie, std::move(traceId), std::move(ev->Orbit), std::move(ev->ExtraBlockChecks), true, std::move(ev->ExecutionRelay)); @@ -149,6 +149,7 @@ public: auto& msg = *ev->Get(); Y_ABORT_UNLESS(msg.HandleClass == putHandleClass); Y_ABORT_UNLESS(msg.Tactic == tactic); + BlobMap.emplace(msg.Id, Blobs.size()); Blobs.emplace_back(msg.Id, TRope(msg.Buffer), ev->Sender, ev->Cookie, std::move(ev->TraceId), std::move(msg.Orbit), std::move(msg.ExtraBlockChecks), false, std::move(msg.ExecutionRelay)); Deadline = Max(Deadline, msg.Deadline); @@ -170,16 +171,14 @@ public: return HandoffPartsSent; } - template <typename TVPutEvent> - void GenerateInitialRequests(TLogContext &logCtx, TBatchedVec<TStackVec<TRope, TypicalPartsInBlob>>& partSets, - TDeque<std::unique_ptr<TVPutEvent>> &outVPuts) { + void GenerateInitialRequests(TLogContext &logCtx, TBatchedVec<TStackVec<TRope, TypicalPartsInBlob>>& partSets) { Y_UNUSED(logCtx); Y_VERIFY_S(partSets.size() == Blobs.size(), "partSets.size# " << partSets.size() << " Blobs.size# " << Blobs.size()); const ui32 totalParts = Info->Type.TotalPartCount(); for (ui64 blobIdx = 0; blobIdx < Blobs.size(); ++blobIdx) { TBlobInfo& blob = Blobs[blobIdx]; - Blackboard.RegisterBlobForPut(blob.BlobId, &blob.ExtraBlockChecks, &blob.Span); + Blackboard.RegisterBlobForPut(blob.BlobId); for (ui32 i = 0; i < totalParts; ++i) { if (Info->Type.PartSize(TLogoBlobID(blob.BlobId, i + 1))) { Blackboard.AddPartToPut(blob.BlobId, i, TRope(partSets[blobIdx][i])); @@ -187,208 +186,7 @@ public: } Blackboard.MarkBlobReadyToPut(blob.BlobId, blobIdx); } - - TPutResultVec putResults; - Step(logCtx, outVPuts, putResults, {&Info->GetTopology()}); IsInitialized = true; - Y_ABORT_UNLESS(!outVPuts.empty()); - Y_ABORT_UNLESS(putResults.empty()); - } - - template <typename TEvent> - bool MarkRequest(const TEvent &event, ui32 orderNumber) { - constexpr bool isVPut = std::is_same_v<TEvent, TEvBlobStorage::TEvVPutResult>; - constexpr bool isVMultiPut = std::is_same_v<TEvent, TEvBlobStorage::TEvVMultiPutResult>; - static_assert(isVPut || isVMultiPut); - - using TCookie = std::conditional_t<isVPut, TBlobCookie, TVMultiPutCookie>; - - auto responses = isVPut ? ReceivedVPutResponses : ReceivedVMultiPutResponses; - auto putType = std::is_same_v<TCookie, TBlobCookie> ? "TEvVPut" : "TEvVMultiPut"; - - const auto &record = event.Record; - if (!record.HasCookie()) { - ErrorDescription = TStringBuilder() << putType << " doesn't have cookie"; - return true; - } - TCookie cookie(record.GetCookie()); - if (cookie.GetVDiskOrderNumber() != orderNumber) { - ErrorDescription = TStringBuilder() << putType << " has wrong cookie; unexpected orderNumber;" - << " expected# " << orderNumber - << " received# " << cookie.GetVDiskOrderNumber() - << " cookie# " << ui64(cookie); - return true; - } - - ui64 requestIdx = cookie.GetRequestIdx(); - if (responses[requestIdx]) { - ErrorDescription = TStringBuilder() << putType << "is recieved twice" - << " Event# " << event.ToString() - << " State# " << DumpFullState(); - return true; - } - responses[requestIdx] = true; - return false; - } - - void PackToStringImpl(TStringBuilder &) { - } - - template <typename TArg, typename... TArgs> - void PackToStringImpl(TStringBuilder &builder, TArg arg, TArgs... args) { - builder << arg; - PackToStringImpl(builder, args...); - } - - template <typename... TArgs> - TString PackToString(TArgs... args) { - TStringBuilder builder; - PackToStringImpl(builder, args...); - return builder; - } - - template <typename TPutRecord, typename ...TArgs> - bool ProcessOneVPutResult(TLogContext &logCtx, const TPutRecord &record, TVDiskID vDiskId, ui32 orderNumber, - TArgs ...resultTypeArgs) - { - Y_ABORT_UNLESS(record.HasStatus()); - Y_ABORT_UNLESS(record.HasBlobID()); - Y_ABORT_UNLESS(record.HasCookie()); - NKikimrProto::EReplyStatus status = record.GetStatus(); - TLogoBlobID blobId = LogoBlobIDFromLogoBlobID(record.GetBlobID()); - ui32 diskIdx = Info->GetTopology().GetIdxInSubgroup(vDiskId, blobId.Hash()); - - TBlobCookie cookie(record.GetCookie()); - if (cookie.GetPartId() != blobId.PartId()) { - ErrorDescription = TStringBuilder() - << PackToString(resultTypeArgs...) << " has wrong cookie; unexpected PartId;" - << " expected# " << blobId.PartId() - << " received# " << cookie.GetPartId() - << " cookie# " << ui64(cookie); - return true; - } - if (cookie.GetVDiskOrderNumber() != orderNumber) { - ErrorDescription = TStringBuilder() - << PackToString(resultTypeArgs...) << " has wrong cookie; unexpected orderNumber;" - << " expected# " << orderNumber - << " received# " << cookie.GetVDiskOrderNumber() - << " cookie# " << ui64(cookie); - return true; - } - - ui64 blobIdx = cookie.GetBlobIdx(); - ui32 partIdx = blobId.PartId() - 1; - - A_LOG_LOG_SX(logCtx, false, PriorityForStatusInbound(status), "BPP11", - "Got " << PackToString(resultTypeArgs...) - << " part# " << partIdx - << " diskIdx# " << diskIdx - << " vDiskId# " << vDiskId - << " blob# " << blobId - << " status# " << status); - - if (IsDone.size() <= blobIdx) { - ErrorDescription = TStringBuilder() - << PackToString(resultTypeArgs...) << " has wrong cookie; unexpected blobIdx;" - << " received# " << cookie.GetVDiskOrderNumber() - << " blobCount# " << IsDone.size() - << " cookie# " << ui64(cookie); - return true; - } - - if (IsDone[blobIdx]) { - return false; - } - switch (status) { - case NKikimrProto::ERROR: - case NKikimrProto::VDISK_ERROR_STATE: - case NKikimrProto::OUT_OF_SPACE: - Blackboard.AddErrorResponse(blobId, orderNumber); - AtLeastOneResponseWasNotOk = true; - break; - case NKikimrProto::OK: - case NKikimrProto::ALREADY: - Blackboard.AddPutOkResponse(blobId, orderNumber); - WrittenBeyondBarrier[blobIdx] = record.GetWrittenBeyondBarrier(); - break; - default: - ErrorDescription = TStringBuilder() << "Unexpected status# " << status; - return true; - } - return false; - } - - template <typename TVPutEvent, typename TVPutEventResult> - void OnVPutEventResult(TLogContext &logCtx, TActorId sender, TVPutEventResult &ev, - TDeque<std::unique_ptr<TVPutEvent>> &outVPutEvents, TPutResultVec &outPutResults, - const TBlobStorageGroupInfo::TGroupVDisks& expired) - { - constexpr bool isVPut = std::is_same_v<TVPutEvent, TEvBlobStorage::TEvVPut>; - constexpr bool isVMultiPut = std::is_same_v<TVPutEvent, TEvBlobStorage::TEvVMultiPut>; - static_assert(isVPut || isVMultiPut); - auto putType = isVPut ? "TEvVPut" : "TEvVMultiPut"; - - auto &record = ev.Record; - Y_ABORT_UNLESS(record.HasStatus()); - NKikimrProto::EReplyStatus status = record.GetStatus(); - Y_ABORT_UNLESS(status != NKikimrProto::BLOCKED && status != NKikimrProto::RACE && status != NKikimrProto::DEADLINE); - if (record.HasStatusFlags()) { - StatusFlags.Merge(record.GetStatusFlags()); - } - if (record.HasApproximateFreeSpaceShare()) { - float share = record.GetApproximateFreeSpaceShare(); - if (ApproximateFreeSpaceShare == 0.f || share < ApproximateFreeSpaceShare) { - ApproximateFreeSpaceShare = share; - } - } - - Y_ABORT_UNLESS(record.HasVDiskID()); - TVDiskID vDiskId = VDiskIDFromVDiskID(record.GetVDiskID()); - TVDiskIdShort shortId(vDiskId); - ui32 orderNumber = Info->GetOrderNumber(shortId); - - Y_VERIFY_S(!MarkRequest(ev, orderNumber), ErrorDescription); - - if constexpr (isVPut) { - VPutResponses++; - bool error = ProcessOneVPutResult(logCtx, ev.Record, vDiskId, orderNumber, "TEvVPutResult"); - Y_VERIFY_S(!error, ErrorDescription); - } else { - TVMultiPutCookie cookie(record.GetCookie()); - if (cookie.GetItemCount() != record.ItemsSize()) { - ErrorDescription = TStringBuilder() << putType << " has wrong cookie; unexpected ItemCount;" - << " expected# " << record.ItemsSize() - << " received# " << cookie.GetItemCount() - << " cookie# " << ui64(cookie); - Y_VERIFY_S(false, ErrorDescription); - return; - } - - VMultiPutResponses++; - for (ui32 itemIdx = 0; itemIdx < record.ItemsSize(); ++itemIdx) { - bool error = ProcessOneVPutResult(logCtx, ev.Record.GetItems(itemIdx), vDiskId, orderNumber, - "TEvVMultiPutResult item# ", itemIdx); - Y_VERIFY_S(!error, ErrorDescription); - } - A_LOG_LOG_SX(logCtx, false, PriorityForStatusInbound(status), "BPP23", "Got VMultiPutResult " - << " vDiskId# " << vDiskId - << " from# " << sender - << " status# " << status - << " vMultiPutResult# " << ev.ToString()); - } - - const auto &requests = isVPut ? VPutRequests : VMultiPutRequests; - const auto &responses = isVPut ? VPutResponses : VMultiPutResponses; - - if (Info->Type.GetErasure() == TBlobStorageGroupType::Erasure4Plus2Block - && !AtLeastOneResponseWasNotOk - && requests >= 6 && responses <= 4) { - // 6 == Info->Type.TotalPartCount() - // There is no need to run strategy since no new information is recieved - return; - } - - Step(logCtx, outVPutEvents, outPutResults, expired); } void PrepareReply(NKikimrProto::EReplyStatus status, TLogContext &logCtx, TString errorReason, @@ -400,8 +198,7 @@ public: ui64 GetTimeToAccelerateNs(TLogContext &logCtx); - template <typename TVPutEvent> - void Accelerate(TLogContext &logCtx, TDeque<std::unique_ptr<TVPutEvent>> &outVPuts) { + void Accelerate(TLogContext &logCtx) { Blackboard.ChangeAll(); switch (Info->Type.GetErasure()) { case TBlobStorageGroupType::ErasureMirror3dc: @@ -414,7 +211,6 @@ public: Blackboard.RunStrategy(logCtx, TAcceleratePutStrategy()); break; } - PrepareVPuts(logCtx, outVPuts); } TString DumpFullState() const; @@ -427,99 +223,139 @@ public: Blackboard.InvalidatePartStates(orderNumber); } - template <typename TVPutEvent> - void Step(TLogContext &logCtx, TDeque<std::unique_ptr<TVPutEvent>> &outVPuts, TPutResultVec &outPutResults, - const TBlobStorageGroupInfo::TGroupVDisks& expired) { - if (RunStrategies(logCtx, outPutResults, expired)) { - Y_ABORT_UNLESS(outPutResults.size()); - } - PrepareVPuts(logCtx, outVPuts); + void ChangeAll() { + Blackboard.ChangeAll(); } -protected: - bool RunStrategies(TLogContext &logCtx, TPutResultVec &outPutResults, const TBlobStorageGroupInfo::TGroupVDisks& expired); - bool RunStrategy(TLogContext &logCtx, const IStrategy& strategy, TPutResultVec &outPutResults, - const TBlobStorageGroupInfo::TGroupVDisks& expired); - - template <typename TVPutEvent> - void PrepareVPuts(TLogContext &logCtx, TDeque<std::unique_ptr<TVPutEvent>> &outVPutEvents) { - constexpr bool isVPut = std::is_same_v<TEvBlobStorage::TEvVPut, TVPutEvent>; - constexpr bool isVMultiPut = std::is_same_v<TEvBlobStorage::TEvVMultiPut, TVPutEvent>; - static_assert(isVPut || isVMultiPut); + void Step(TLogContext &logCtx, TPutResultVec& putResults, const TBlobStorageGroupInfo::TGroupVDisks& expired) { + RunStrategies(logCtx, putResults, expired); + } - const ui32 diskCount = Blackboard.GroupDiskRequests.DiskRequestsForOrderNumber.size(); - for (ui32 diskOrderNumber = 0; diskOrderNumber < diskCount; ++diskOrderNumber) { - const TDiskRequests &requests = Blackboard.GroupDiskRequests.DiskRequestsForOrderNumber[diskOrderNumber]; - ui32 endIdx = requests.PutsToSend.size(); - ui32 beginIdx = requests.FirstUnsentPutIdx; + TDeque<TPutEvent> GeneratePutRequests() { + TDeque<TPutEvent> events; - if (beginIdx >= endIdx) { - continue; + // Group put requests together by VDiskID. + std::unordered_multimap<ui32, TDiskPutRequest*> puts; + auto& perDiskRequests = Blackboard.GroupDiskRequests.DiskRequestsForOrderNumber; + for (ui32 orderNumber = 0, numDisks = perDiskRequests.size(); orderNumber < numDisks; ++orderNumber) { + TDiskRequests& requests = perDiskRequests[orderNumber]; + while (requests.FirstUnsentPutIdx < requests.PutsToSend.size()) { + auto& putRequest = requests.PutsToSend[requests.FirstUnsentPutIdx++]; + puts.emplace(orderNumber, &putRequest); } + } + + // Generate queries to VDisks. + for (auto it = puts.begin(); it != puts.end(); ) { + auto [begin, end] = puts.equal_range(it->first); + Y_ABORT_UNLESS(it == begin); + + if (std::next(it) == end) { // TEvVPut + auto [orderNumber, ptr] = *it++; + auto ev = std::make_unique<TEvBlobStorage::TEvVPut>(ptr->Id, ptr->Buffer, Info->GetVDiskId(orderNumber), + false, nullptr, Deadline, Blackboard.PutHandleClass); - TVDiskID vDiskId = Info->GetVDiskId(diskOrderNumber); + auto& record = ev->Record; + for (const auto& [tabletId, generation] : Blobs[ptr->BlobIdx].ExtraBlockChecks) { + auto *p = record.AddExtraBlockChecks(); + p->SetTabletId(tabletId); + p->SetGeneration(generation); + } - if constexpr (isVMultiPut) { - ui64 cookie = TVMultiPutCookie(diskOrderNumber, endIdx - beginIdx, VMultiPutRequests); - auto vMultiPut = std::make_unique<TEvBlobStorage::TEvVMultiPut>(vDiskId, Deadline, Blackboard.PutHandleClass, - false, &cookie); - vMultiPut->ReservePayload(endIdx - beginIdx); - outVPutEvents.push_back(std::move(vMultiPut)); + events.emplace_back(std::move(ev)); + HandoffPartsSent += ptr->IsHandoff; + ++VPutRequests; + } else { // TEvVMultiPut + auto ev = std::make_unique<TEvBlobStorage::TEvVMultiPut>(Info->GetVDiskId(it->first), Deadline, + Blackboard.PutHandleClass, false); + while (it != end) { + auto [orderNumber, ptr] = *it++; + ev->AddVPut(ptr->Id, TRcBuf(ptr->Buffer), nullptr, &Blobs[ptr->BlobIdx].ExtraBlockChecks, + Blobs[ptr->BlobIdx].Span.GetTraceId()); + HandoffPartsSent += ptr->IsHandoff; + } + events.emplace_back(std::move(ev)); ++VMultiPutRequests; - ReceivedVMultiPutResponses.push_back(false); } + } - for (ui32 idx = beginIdx; idx < endIdx; ++idx) { - const TDiskPutRequest &put = requests.PutsToSend[idx]; - ui32 counter = isVPut ? VPutRequests : VMultiPutRequests; - ui64 cookie = TBlobCookie(diskOrderNumber, put.BlobIdx, put.Id.PartId(), counter); + return events; + } - Y_DEBUG_ABORT_UNLESS(Info->Type.GetErasure() != TBlobStorageGroupType::ErasureMirror3of4 || - put.Id.PartId() != 3 || put.Buffer.IsEmpty()); + void ProcessResponse(TEvBlobStorage::TEvVPutResult& msg) { + ++VPutResponses; + ProcessResponseCommonPart(msg.Record); + ProcessResponseBlob(VDiskIDFromVDiskID(msg.Record.GetVDiskID()), msg.Record); + } - if constexpr (isVPut) { - auto vPut = std::make_unique<TEvBlobStorage::TEvVPut>(put.Id, put.Buffer, vDiskId, false, &cookie, - Deadline, Blackboard.PutHandleClass); - auto& record = vPut->Record; - if (put.ExtraBlockChecks) { - for (const auto& [tabletId, generation] : *put.ExtraBlockChecks) { - auto *p = record.AddExtraBlockChecks(); - p->SetTabletId(tabletId); - p->SetGeneration(generation); - } - } - R_LOG_DEBUG_SX(logCtx, "BPP20", "Send put to orderNumber# " << diskOrderNumber << " idx# " << idx - << " vPut# " << vPut->ToString()); - outVPutEvents.push_back(std::move(vPut)); - ++VPutRequests; - ReceivedVPutResponses.push_back(false); - } else if constexpr (isVMultiPut) { - // this request MUST originate from the TEvPut, so the Span field must be filled in - Y_ABORT_UNLESS(put.Span); - outVPutEvents.back()->AddVPut(put.Id, TRcBuf(TRope(put.Buffer)), &cookie, put.ExtraBlockChecks, put.Span->GetTraceId()); - } + void ProcessResponse(TEvBlobStorage::TEvVMultiPutResult& msg) { + ++VMultiPutResponses; + ProcessResponseCommonPart(msg.Record); + const TVDiskID vdiskId = VDiskIDFromVDiskID(msg.Record.GetVDiskID()); + for (const auto& item : msg.Record.GetItems()) { + ProcessResponseBlob(vdiskId, item); + } + } - if (put.IsHandoff) { - ++HandoffPartsSent; - } + size_t GetBlobIdx(const TLogoBlobID& id) const { + const auto it = BlobMap.find(id.FullID()); + Y_ABORT_UNLESS(it != BlobMap.end()); + return it->second; + } - LWPROBE(DSProxyPutVPut, put.Id.TabletID(), Info->GroupID, put.Id.Channel(), put.Id.PartId(), - put.Id.ToString(), Tactic, - NKikimrBlobStorage::EPutHandleClass_Name(Blackboard.PutHandleClass), - put.Id.BlobSize(), put.Buffer.size(), Info->GetFailDomainOrderNumber(vDiskId), - NKikimrBlobStorage::EVDiskQueueId_Name(TGroupQueues::TVDisk::TQueues::VDiskQueueId(*outVPutEvents.back())), - Blackboard.GroupQueues->GetPredictedDelayNsForEvent(*outVPutEvents.back(), Info->GetTopology()) * 0.000001); - } +protected: + void RunStrategies(TLogContext &logCtx, TPutResultVec &outPutResults, const TBlobStorageGroupInfo::TGroupVDisks& expired); + void RunStrategy(TLogContext &logCtx, const IStrategy& strategy, TPutResultVec &outPutResults, + const TBlobStorageGroupInfo::TGroupVDisks& expired); - if constexpr (isVMultiPut) { - R_LOG_DEBUG_SX(logCtx, "BPP39", "Send put to orderNumber# " << diskOrderNumber - << " count# " << outVPutEvents.back()->Record.ItemsSize() - << " vMultiPut# " << outVPutEvents.back()->ToString()); - } + template<typename TProtobuf> + void ProcessResponseBlob(TVDiskID vdiskId, TProtobuf& record) { + Y_ABORT_UNLESS(record.HasStatus()); + Y_ABORT_UNLESS(record.HasBlobID()); - Blackboard.GroupDiskRequests.DiskRequestsForOrderNumber[diskOrderNumber].FirstUnsentPutIdx = endIdx; + const NKikimrProto::EReplyStatus status = record.GetStatus(); + const TLogoBlobID blobId = LogoBlobIDFromLogoBlobID(record.GetBlobID()); + const ui32 orderNumber = Info->GetOrderNumber(TVDiskIdShort(vdiskId)); + + const size_t blobIdx = GetBlobIdx(blobId); + + if (IsDone[blobIdx]) { + return; + } + + switch (status) { + case NKikimrProto::ERROR: + case NKikimrProto::VDISK_ERROR_STATE: + case NKikimrProto::OUT_OF_SPACE: + Blackboard.AddErrorResponse(blobId, orderNumber); + AtLeastOneResponseWasNotOk = true; + break; + case NKikimrProto::OK: + case NKikimrProto::ALREADY: + Blackboard.AddPutOkResponse(blobId, orderNumber); + WrittenBeyondBarrier[blobIdx] = record.GetWrittenBeyondBarrier(); + break; + default: + Y_ABORT("unexpected status# %s", NKikimrProto::EReplyStatus_Name(status).data()); + } + } + + template<typename TProtobuf> + void ProcessResponseCommonPart(TProtobuf& record) { + Y_ABORT_UNLESS(record.HasStatus()); + const NKikimrProto::EReplyStatus status = record.GetStatus(); + Y_ABORT_UNLESS(status != NKikimrProto::BLOCKED && status != NKikimrProto::RACE && status != NKikimrProto::DEADLINE); + if (record.HasStatusFlags()) { + StatusFlags.Merge(record.GetStatusFlags()); + } + if (record.HasApproximateFreeSpaceShare()) { + float share = record.GetApproximateFreeSpaceShare(); + if (ApproximateFreeSpaceShare == 0.f || share < ApproximateFreeSpaceShare) { + ApproximateFreeSpaceShare = share; + } } } + }; //TPutImpl }//NKikimr diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp index 306e915a1af..bd3b1c73bba 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp @@ -319,8 +319,7 @@ void TStrategyBase::PreparePutsForPartPlacement(TLogContext &logCtx, TBlobState << " blob Id# " << partId.ToString()); Y_ABORT_UNLESS(state.Parts[record.PartIdx].Data.IsMonolith()); groupDiskRequests.AddPut(disk.OrderNumber, partId, state.Parts[record.PartIdx].Data.GetMonolith(), - TDiskPutRequest::ReasonInitial, info.Type.IsHandoffInSubgroup(record.VDiskIdx), - state.ExtraBlockChecks, state.Span, state.BlobIdx); + TDiskPutRequest::ReasonInitial, info.Type.IsHandoffInSubgroup(record.VDiskIdx), state.BlobIdx); disk.DiskParts[record.PartIdx].Situation = TBlobState::ESituation::Sent; } } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h index dd212701e11..3a3bfbefcd6 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h @@ -138,8 +138,6 @@ protected: GetDataBuffer(state, info), diskIdx == group.DiskIdx[0] ? TDiskPutRequest::ReasonInitial : TDiskPutRequest::ReasonError, diskIdx != group.DiskIdx[0], - state.ExtraBlockChecks, - state.Span, state.BlobIdx); s = TBlobState::ESituation::Sent; any |= {&info.GetTopology(), diskIdx}; @@ -172,8 +170,6 @@ protected: TRope(TString()), handoff ? TDiskPutRequest::ReasonError : TDiskPutRequest::ReasonInitial, handoff, - state.ExtraBlockChecks, - state.Span, state.BlobIdx); part.Situation = TBlobState::ESituation::Sent; any |= {&info.GetTopology(), (ui8)diskIdx}; diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_counters_ut.cpp b/ydb/core/blobstorage/dsproxy/ut/dsproxy_counters_ut.cpp index bb64b2711ae..67f272c552a 100644 --- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_counters_ut.cpp +++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_counters_ut.cpp @@ -84,6 +84,8 @@ Y_UNIT_TEST(PutGeneratedSubrequestBytes) { } Y_UNIT_TEST(MultiPutGeneratedSubrequestBytes) { + return; // KIKIMR-9016 + NKikimr::TBlobStorageGroupType erasure = TErasureType::Erasure4Plus2Block; TTestBasicRuntime runtime(1, false); SetLogPriorities(runtime); diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_put_ut.cpp b/ydb/core/blobstorage/dsproxy/ut/dsproxy_put_ut.cpp index a3748aa3347..09dca39bb5f 100644 --- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_put_ut.cpp +++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_put_ut.cpp @@ -73,12 +73,13 @@ void TestPutMaxPartCountOnHandoff(TErasureType::EErasureSpecies erasureSpecies) } group.SetPredictedDelayNs(7, 10); - TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts; - putImpl.GenerateInitialRequests(logCtx, partSetSingleton, vPuts); - group.SetError(0, NKikimrProto::ERROR); - TPutImpl::TPutResultVec putResults; + putImpl.GenerateInitialRequests(logCtx, partSetSingleton); + putImpl.Step(logCtx, putResults, {&group.GetInfo()->GetTopology()}); + auto vPuts = putImpl.GeneratePutRequests(); + group.SetError(0, NKikimrProto::ERROR); + TVector<ui32> diskSequence = {0, 7, 7, 7, 7, 6, 3, 4, 5, 1, 2}; TVector<ui32> slowDiskSequence = {3, 4, 5, 6, 1, 2}; @@ -86,7 +87,7 @@ void TestPutMaxPartCountOnHandoff(TErasureType::EErasureSpecies erasureSpecies) ui32 nextVPut = vPutIdx; ui32 diskPos = (ui32)-1; for (ui32 i = vPutIdx; i < vPuts.size(); ++i) { - auto& record = vPuts[i]->Record; + auto& record = std::get<0>(vPuts[i])->Record; TVDiskID vDiskId = VDiskIDFromVDiskID(record.GetVDiskID()); ui32 diskIdx = group.VDiskIdx(vDiskId); auto it = Find(diskSequence, diskIdx); @@ -98,7 +99,7 @@ void TestPutMaxPartCountOnHandoff(TErasureType::EErasureSpecies erasureSpecies) } } } - CTEST << "vdisk exp# " << (diskSequence.size() ? diskSequence.front() : -1) << " get# " << group.VDiskIdx(VDiskIDFromVDiskID(vPuts[nextVPut]->Record.GetVDiskID())) << Endl; + CTEST << "vdisk exp# " << (diskSequence.size() ? diskSequence.front() : -1) << " get# " << group.VDiskIdx(VDiskIDFromVDiskID(std::get<0>(vPuts[nextVPut])->Record.GetVDiskID())) << Endl; if (diskPos != (ui32)-1) { diskSequence.erase(diskSequence.begin() + diskPos); } @@ -111,15 +112,15 @@ void TestPutMaxPartCountOnHandoff(TErasureType::EErasureSpecies erasureSpecies) group.SetPredictedDelayNs(slowDiskSequence[vPutIdx], 10); } - TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> nextVPuts; - - TEvBlobStorage::TEvVPut& vPut = *vPuts[vPutIdx]; + TEvBlobStorage::TEvVPut& vPut = *std::get<0>(vPuts[vPutIdx]); TActorId sender; TEvBlobStorage::TEvVPutResult vPutResult; NKikimrProto::EReplyStatus status = group.OnVPut(vPut); vPutResult.MakeError(status, TString(), vPut.Record); - putImpl.OnVPutEventResult(logCtx, sender, vPutResult, nextVPuts, putResults, {&group.GetInfo()->GetTopology()}); + putImpl.ProcessResponse(vPutResult); + putImpl.Step(logCtx, putResults, {&group.GetInfo()->GetTopology()}); + auto nextVPuts = putImpl.GeneratePutRequests(); if (putResults.size()) { break; @@ -151,6 +152,9 @@ struct TTestPutAllOk { static constexpr ui64 BlobCount = IsVPut ? 1 : 2; static constexpr ui32 MaxIterations = 10000; + using TPutResultEvent = std::variant<std::unique_ptr<TEvBlobStorage::TEvVPutResult>, + std::unique_ptr<TEvBlobStorage::TEvVMultiPutResult>>; + TActorSystemStub ActorSystemStub; TBlobStorageGroupType GroupType; TGroupMock Group; @@ -204,45 +208,40 @@ struct TTestPutAllOk { } } - void InitVPutResults(TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> &vPuts, - TDeque<std::unique_ptr<TEvBlobStorage::TEvVPutResult>> &vPutResults) - { - vPutResults.resize(vPuts.size()); - for (ui32 vPutIdx = 0; vPutIdx < vPuts.size(); ++vPutIdx) { - TEvBlobStorage::TEvVPut &vPut = *vPuts[vPutIdx]; - NKikimrProto::EReplyStatus status = Group.OnVPut(vPut); + std::unique_ptr<TEvBlobStorage::TEvVPutResult> InitResult(TEvBlobStorage::TEvVPut& ev) { + NKikimrProto::EReplyStatus status = Group.OnVPut(ev); + UNIT_ASSERT_VALUES_EQUAL(status, NKikimrProto::OK); + auto vPutResult = std::make_unique<TEvBlobStorage::TEvVPutResult>(); + vPutResult->MakeError(status, TString(), ev.Record); + return vPutResult; + } + + std::unique_ptr<TEvBlobStorage::TEvVMultiPutResult> InitResult(TEvBlobStorage::TEvVMultiPut& ev) { + TVector<NKikimrProto::EReplyStatus> statuses = Group.OnVMultiPut(ev); + for (auto status : statuses) { UNIT_ASSERT_VALUES_EQUAL(status, NKikimrProto::OK); - vPutResults[vPutIdx].reset(new TEvBlobStorage::TEvVPutResult()); - TEvBlobStorage::TEvVPutResult &vPutResult = *vPutResults[vPutIdx]; - vPutResult.MakeError(status, TString(), vPut.Record); } + auto vMultiPutResult = std::make_unique<TEvBlobStorage::TEvVMultiPutResult>(); + Y_ABORT_UNLESS(ev.Record.ItemsSize() == statuses.size()); + vMultiPutResult->MakeError(NKikimrProto::OK, TString(), ev.Record); + for (ui64 itemIdx = 0; itemIdx < statuses.size(); ++itemIdx) { + NKikimrBlobStorage::TVMultiPutResultItem &item = *vMultiPutResult->Record.MutableItems(itemIdx); + NKikimrProto::EReplyStatus status = statuses[itemIdx]; + item.SetStatus(status); + } + Y_ABORT_UNLESS(vMultiPutResult->Record.ItemsSize() == statuses.size()); + return vMultiPutResult; } - void InitVPutResults(TDeque<std::unique_ptr<TEvBlobStorage::TEvVMultiPut>> &vMultiPuts, - TDeque<std::unique_ptr<TEvBlobStorage::TEvVMultiPutResult>> &vMultiPutResults) - { - vMultiPutResults.resize(vMultiPuts.size()); - for (ui32 vMultiPutIdx = 0; vMultiPutIdx < vMultiPuts.size(); ++vMultiPutIdx) { - TEvBlobStorage::TEvVMultiPut &vMultiPut = *vMultiPuts[vMultiPutIdx]; - TVector<NKikimrProto::EReplyStatus> statuses = Group.OnVMultiPut(vMultiPut); - for (auto status : statuses) { - UNIT_ASSERT_VALUES_EQUAL(status, NKikimrProto::OK); - } - vMultiPutResults[vMultiPutIdx].reset(new TEvBlobStorage::TEvVMultiPutResult()); - Y_ABORT_UNLESS(vMultiPut.Record.ItemsSize() == statuses.size()); - TEvBlobStorage::TEvVMultiPutResult &vMultiPutResult = *vMultiPutResults[vMultiPutIdx]; - vMultiPutResult.MakeError(NKikimrProto::OK, TString(), vMultiPut.Record); - for (ui64 itemIdx = 0; itemIdx < statuses.size(); ++itemIdx) { - NKikimrBlobStorage::TVMultiPutResultItem &item = *vMultiPutResult.Record.MutableItems(itemIdx); - NKikimrProto::EReplyStatus status = statuses[itemIdx]; - item.SetStatus(status); - } - Y_ABORT_UNLESS(vMultiPutResult.Record.ItemsSize() == statuses.size()); + void InitVPutResults(TDeque<TPutImpl::TPutEvent>& vPuts, TDeque<TPutResultEvent>& vPutResults) { + for (auto& ev : vPuts) { + std::visit([&](auto& ev) { + vPutResults.push_back(InitResult(*ev)); + }, ev); } } - template <typename TVPutResultEvent> - void PermutateVPutResults(ui64 resIdx, bool &isAborted, TDeque<std::unique_ptr<TVPutResultEvent>> &vPutResults) { + void PermutateVPutResults(ui64 resIdx, bool &isAborted, TDeque<TPutResultEvent> &vPutResults) { // select result in range [resIdx, vPutResults.size()) if (resIdx + 1 < CheckStack.size()) { ui32 tgt = CheckStack[resIdx]; @@ -263,7 +262,7 @@ struct TTestPutAllOk { } bool Step(TPutImpl &putImpl, - TDeque<std::unique_ptr<TEvBlobStorage::TEvVPutResult>> &vPutResults, + TDeque<TPutResultEvent> &vPutResults, TPutImpl::TPutResultVec &putResults) { bool isAborted = false; @@ -273,71 +272,15 @@ struct TTestPutAllOk { break; } - TActorId sender; - TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts2; - putImpl.OnVPutEventResult(LogCtx, sender, *vPutResults[resIdx], vPuts2, putResults, &Group.GetInfo()->GetTopology()); - if (putResults.size()) { - break; - } - - for (ui32 vPutIdx = 0; vPutIdx < vPuts2.size(); ++vPutIdx) { - TEvBlobStorage::TEvVPut &vPut = *vPuts2[vPutIdx]; - NKikimrProto::EReplyStatus status = Group.OnVPut(vPut); - UNIT_ASSERT_VALUES_EQUAL(status, NKikimrProto::OK); - vPutResults.emplace_back(new TEvBlobStorage::TEvVPutResult()); - TEvBlobStorage::TEvVPutResult &vPutResult = *vPutResults.back(); - vPutResult.MakeError(status, TString(), vPut.Record); - } - } - - return isAborted; - } - - bool Step(TPutImpl &putImpl, - TDeque<std::unique_ptr<TEvBlobStorage::TEvVMultiPutResult>> &vMultiPutResults, - TPutImpl::TPutResultVec &putResults) - { - bool isAborted = false; - for (ui64 resIdx = 0; resIdx < vMultiPutResults.size(); ++resIdx) { - PermutateVPutResults(resIdx, isAborted, vMultiPutResults); - if (isAborted) { - break; - } - - TActorId sender; - TDeque<std::unique_ptr<TEvBlobStorage::TEvVMultiPut>> vMultiPuts2; - putImpl.OnVPutEventResult(LogCtx, sender, *vMultiPutResults[resIdx], vMultiPuts2, putResults, - &Group.GetInfo()->GetTopology()); - if (putResults.size() == BlobIds.size()) { + std::visit([&](auto &ev) { putImpl.ProcessResponse(*ev); }, vPutResults[resIdx]); + putImpl.Step(LogCtx, putResults, &Group.GetInfo()->GetTopology()); + auto vPuts = putImpl.GeneratePutRequests(); + if (putResults.size() == BlobCount) { break; } - for (ui32 vMultiPutIdx = 0; vMultiPutIdx < vMultiPuts2.size(); ++vMultiPutIdx) { - TEvBlobStorage::TEvVMultiPut &vMultiPut = *vMultiPuts2[vMultiPutIdx]; - TVector<NKikimrProto::EReplyStatus> statuses = Group.OnVMultiPut(vMultiPut); - for (auto status : statuses) { - UNIT_ASSERT_VALUES_EQUAL(status, NKikimrProto::OK); - } - - vMultiPutResults[vMultiPutIdx].reset(new TEvBlobStorage::TEvVMultiPutResult()); - Y_ABORT_UNLESS(vMultiPut.Record.ItemsSize() == statuses.size()); - TEvBlobStorage::TEvVMultiPutResult &vMultiPutResult = *vMultiPutResults[vMultiPutIdx]; - vMultiPutResult.MakeError(NKikimrProto::OK, TString(), vMultiPut.Record); - - for (ui64 itemIdx = 0; itemIdx < statuses.size(); ++itemIdx) { - auto &item = vMultiPut.Record.GetItems(itemIdx); - NKikimrProto::EReplyStatus status = statuses[itemIdx]; - TLogoBlobID blobId = LogoBlobIDFromLogoBlobID(item.GetBlobID()); - - ui64 cookieValue = 0; - ui64 *cookie = nullptr; - if (item.HasCookie()) { - cookieValue = item.GetCookie(); - cookie = &cookieValue; - } - - vMultiPutResult.AddVPutResult(status, TString(), blobId, cookie); - } + for (auto& put : vPuts) { + std::visit([&](auto& ev) { vPutResults.push_back(InitResult(*ev)); }, put); } } @@ -345,10 +288,6 @@ struct TTestPutAllOk { } void Run() { - using TVPutEvent = std::conditional_t<IsVPut, TEvBlobStorage::TEvVPut, TEvBlobStorage::TEvVMultiPut>; - using TVPutResultEvent = std::conditional_t<IsVPut, TEvBlobStorage::TEvVPutResult, - TEvBlobStorage::TEvVMultiPutResult>; - ui64 i = 0; for (; i < MaxIterations; ++i) { Group.Wipe(); @@ -357,7 +296,7 @@ struct TTestPutAllOk { std::unique_ptr<TEvBlobStorage::TEvPut> vPut(new TEvBlobStorage::TEvPut(blobId, Data, TInstant::Max(), NKikimrBlobStorage::TabletLog, TEvBlobStorage::TEvPut::TacticDefault)); events.emplace_back(static_cast<TEventHandle<TEvBlobStorage::TEvPut> *>( - new IEventHandle(TActorId(), TActorId(), vPut.release()))); + new IEventHandle(TActorId(), TActorId(), vPut.release()))); } TMaybe<TPutImpl> putImpl; @@ -369,15 +308,16 @@ struct TTestPutAllOk { NKikimrBlobStorage::TabletLog, TEvBlobStorage::TEvPut::TacticDefault, false); } - TDeque<std::unique_ptr<TVPutEvent>> vPuts; - putImpl->GenerateInitialRequests(LogCtx, PartSets, vPuts); + putImpl->GenerateInitialRequests(LogCtx, PartSets); + putImpl->Step(LogCtx, putResults, &Group.GetInfo()->GetTopology()); + auto vPuts = putImpl->GeneratePutRequests(); UNIT_ASSERT(vPuts.size() == 6 || !IsVPut); - TDeque<std::unique_ptr<TVPutResultEvent>> vPutResults; + TDeque<TPutResultEvent> vPutResults; InitVPutResults(vPuts, vPutResults); bool isAborted = Step(*putImpl, vPutResults, putResults); if (!isAborted) { - UNIT_ASSERT(putResults.size() == BlobCount); + UNIT_ASSERT_VALUES_EQUAL(putResults.size(), BlobCount); for (auto& [blobIdx, result] : putResults) { UNIT_ASSERT(result->Status == NKikimrProto::OK); UNIT_ASSERT(result->Id == BlobIds[blobIdx]); @@ -413,7 +353,6 @@ Y_UNIT_TEST(TestMirror3dcWith3x3MinLatencyMod) { TEvBlobStorage::TEvPut ev(blobId, data, TInstant::Max(), NKikimrBlobStorage::TabletLog, TEvBlobStorage::TEvPut::TacticMinLatency); TPutImpl putImpl(env.Info, env.GroupQueues, &ev, env.Mon, true, TActorId(), 0, NWilson::TTraceId()); - TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts; TLogContext logCtx(NKikimrServices::BS_PROXY_PUT, false); logCtx.LogAcc.IsLogEnabled = false; @@ -426,13 +365,16 @@ Y_UNIT_TEST(TestMirror3dcWith3x3MinLatencyMod) { char *dataBytes = encryptedData.Detach(); Encrypt(dataBytes, dataBytes, 0, encryptedData.size(), blobId, *env.Info); ErasureSplit((TErasureType::ECrcMode)blobId.CrcMode(), env.Info->Type, TRope(encryptedData), partSetSingleton[0]); - putImpl.GenerateInitialRequests(logCtx, partSetSingleton, vPuts); + putImpl.GenerateInitialRequests(logCtx, partSetSingleton); + TPutImpl::TPutResultVec putResults; + putImpl.Step(logCtx, putResults, &env.Info->GetTopology()); + auto vPuts = putImpl.GeneratePutRequests(); UNIT_ASSERT_VALUES_EQUAL(vPuts.size(), 9); using TVDiskIDTuple = decltype(std::declval<TVDiskID>().ConvertToTuple()); THashSet<TVDiskIDTuple> vDiskIds; for (auto &vPut : vPuts) { - TVDiskID vDiskId = VDiskIDFromVDiskID(vPut->Record.GetVDiskID()); + TVDiskID vDiskId = VDiskIDFromVDiskID(std::get<0>(vPut)->Record.GetVDiskID()); bool inserted = vDiskIds.insert(vDiskId.ConvertToTuple()).second; UNIT_ASSERT(inserted); } diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_sequence_ut.cpp b/ydb/core/blobstorage/dsproxy/ut/dsproxy_sequence_ut.cpp index 7bde268aa80..64b8725e2d1 100644 --- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_sequence_ut.cpp +++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_sequence_ut.cpp @@ -547,6 +547,8 @@ void MakeTestMultiPutItemStatuses(TTestBasicRuntime &runtime, const TBlobStorage } Y_UNIT_TEST(TestGivenBlock42MultiPut2ItemsStatuses) { + return; // KIKIMR-9016 + TBlobStorageGroupType type = {TErasureType::Erasure4Plus2Block}; constexpr ui64 statusCount = 3; NKikimrProto::EReplyStatus maybeStatuses[statusCount] = { @@ -592,6 +594,8 @@ struct TDyingDecorator : public TTestDecorator { }; Y_UNIT_TEST(TestGivenBlock42GroupGenerationGreaterThanVDiskGenerations) { + return; // KIKIMR-9016 + TBlobStorageGroupType type = {TErasureType::Erasure4Plus2Block}; TTestBasicRuntime runtime(1, false); Setup(runtime, type); diff --git a/ydb/core/blobstorage/ut_blobstorage/counting_events.cpp b/ydb/core/blobstorage/ut_blobstorage/counting_events.cpp index cf31ea820e8..ed4c397db8c 100644 --- a/ydb/core/blobstorage/ut_blobstorage/counting_events.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/counting_events.cpp @@ -164,7 +164,7 @@ Y_UNIT_TEST_SUITE(CountingEvents) { } Y_UNIT_TEST(Put_Mirror3of4) { - CountingEventsTest("put", 115, TBlobStorageGroupType::ErasureMirror3of4, 114); + CountingEventsTest("put", 116, TBlobStorageGroupType::ErasureMirror3of4, 114); } Y_UNIT_TEST(Put_Mirror3dc) { diff --git a/ydb/core/blobstorage/ut_blobstorage/extra_block_checks.cpp b/ydb/core/blobstorage/ut_blobstorage/extra_block_checks.cpp index cac7bfd7f3f..7aa8ad42b09 100644 --- a/ydb/core/blobstorage/ut_blobstorage/extra_block_checks.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/extra_block_checks.cpp @@ -46,17 +46,24 @@ Y_UNIT_TEST_SUITE(ExtraBlockChecks) { UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK); } + const TLogoBlobID a(1, 11, 1, 0, data.size(), 3); + const TLogoBlobID b(1, 11, 1, 0, data.size(), 4); runtime->WrapInActorContext(edge, [&] { - auto ev = std::make_unique<TEvBlobStorage::TEvPut>(TLogoBlobID(1, 11, 1, 0, data.size(), 3), data, TInstant::Max()); + auto ev = std::make_unique<TEvBlobStorage::TEvPut>(a, data, TInstant::Max()); ev->ExtraBlockChecks.emplace_back(2, 10); SendToBSProxy(edge, info->GroupID, ev.release()); - SendToBSProxy(edge, info->GroupID, new TEvBlobStorage::TEvPut(TLogoBlobID(1, 11, 1, 0, data.size(), 4), data, TInstant::Max())); + SendToBSProxy(edge, info->GroupID, new TEvBlobStorage::TEvPut(b, data, TInstant::Max())); }); { + std::unordered_map<TLogoBlobID, NKikimrProto::EReplyStatus> map; auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(edge, false); - UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::BLOCKED); + map[res->Get()->Id] = res->Get()->Status; res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(edge, false); - UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK); + map[res->Get()->Id] = res->Get()->Status; + UNIT_ASSERT(map.contains(a)); + UNIT_ASSERT_VALUES_EQUAL(map[a], NKikimrProto::BLOCKED); + UNIT_ASSERT(map.contains(b)); + UNIT_ASSERT_VALUES_EQUAL(map[b], NKikimrProto::OK); } } } |
