diff options
author | alexvru <alexvru@ydb.tech> | 2023-12-07 19:55:02 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-12-07 20:56:15 +0300 |
commit | e8437d81a04fbfae5c0de3e701a3c039205eaab1 (patch) | |
tree | 0913cdb7b274d69e6c338676151057cc4b83e53a | |
parent | 5ddac82c510633e79e4a48d13f708c85765cb5e9 (diff) | |
download | ydb-e8437d81a04fbfae5c0de3e701a3c039205eaab1.tar.gz |
Handle long-lasting Put queries in DS proxy correctly -- part 1 KIKIMR-9016
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp | 171 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h | 7 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy_put.cpp | 108 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h | 5 |
4 files changed, 167 insertions, 124 deletions
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp index 3dc9dba1b4..0de81447ee 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp @@ -92,7 +92,7 @@ void TBlobState::AddResponseData(const TBlobStorageGroupInfo &info, const TLogoB ui32 shift, TRope&& data, bool keep, bool doNotKeep) { // Add actual data to Parts Y_ABORT_UNLESS(id.PartId() != 0); - ui32 partIdx = id.PartId() - 1; + const ui32 partIdx = id.PartId() - 1; Y_ABORT_UNLESS(partIdx < Parts.size()); const ui32 partSize = info.Type.PartSize(id); const ui32 dataSize = data.size(); @@ -100,114 +100,89 @@ void TBlobState::AddResponseData(const TBlobStorageGroupInfo &info, const TLogoB Parts[partIdx].AddResponseData(partSize, shift, std::move(data)); } IsChanged = true; + + const ui32 diskIdx = info.GetIdxInSubgroup(info.GetVDiskId(orderNumber), id.Hash()); + Y_ABORT_UNLESS(diskIdx != info.Type.BlobSubgroupSize()); + TDisk& disk = Disks[diskIdx]; + Y_ABORT_UNLESS(disk.OrderNumber == orderNumber); + // Mark part as present for the disk - bool isFound = false; - for (ui32 diskIdx = 0; diskIdx < Disks.size(); ++diskIdx) { - TDisk &disk = Disks[diskIdx]; - if (disk.OrderNumber == orderNumber) { - isFound = true; - Y_ABORT_UNLESS(partIdx < disk.DiskParts.size()); - TDiskPart &diskPart = disk.DiskParts[partIdx]; - //Cerr << Endl << "present diskIdx# " << diskIdx << " partIdx# " << partIdx << Endl << Endl; - diskPart.Situation = ESituation::Present; - if (partSize) { - TIntervalVec<i32> responseInterval(shift, shift + dataSize); - diskPart.Requested.Subtract(responseInterval); - } - break; - } + Y_ABORT_UNLESS(partIdx < disk.DiskParts.size()); + TDiskPart &diskPart = disk.DiskParts[partIdx]; + diskPart.Situation = ESituation::Present; + if (partSize) { + TIntervalVec<i32> responseInterval(shift, shift + dataSize); + diskPart.Requested.Subtract(responseInterval); } - Y_ABORT_UNLESS(isFound); + Keep |= keep; DoNotKeep |= doNotKeep; } void TBlobState::AddNoDataResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber) { - Y_UNUSED(info); Y_ABORT_UNLESS(id.PartId() != 0); - ui32 partIdx = id.PartId() - 1; + const ui32 partIdx = id.PartId() - 1; IsChanged = true; - // Mark part as absent for the disk - bool isFound = false; - for (ui32 diskIdx = 0; diskIdx < Disks.size(); ++diskIdx) { - TDisk &disk = Disks[diskIdx]; - if (disk.OrderNumber == orderNumber) { - isFound = true; - Y_ABORT_UNLESS(partIdx < disk.DiskParts.size()); - TDiskPart &diskPart = disk.DiskParts[partIdx]; - //Cerr << Endl << "absent diskIdx# " << diskIdx << " partIdx# " << partIdx << Endl << Endl; - diskPart.Situation = ESituation::Absent; - diskPart.Requested.Clear(); - break; - } - } - Y_ABORT_UNLESS(isFound); + + const ui32 diskIdx = info.GetIdxInSubgroup(info.GetVDiskId(orderNumber), id.Hash()); + Y_ABORT_UNLESS(diskIdx != info.Type.BlobSubgroupSize()); + TDisk& disk = Disks[diskIdx]; + Y_ABORT_UNLESS(disk.OrderNumber == orderNumber); + + Y_ABORT_UNLESS(partIdx < disk.DiskParts.size()); + TDiskPart &diskPart = disk.DiskParts[partIdx]; + diskPart.Situation = ESituation::Absent; + diskPart.Requested.Clear(); } void TBlobState::AddPutOkResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber) { - Y_UNUSED(info); Y_ABORT_UNLESS(id.PartId() != 0); - ui32 partIdx = id.PartId() - 1; + const ui32 partIdx = id.PartId() - 1; IsChanged = true; - // Mark part as put ok for the disk - bool isFound = false; - for (ui32 diskIdx = 0; diskIdx < Disks.size(); ++diskIdx) { - TDisk &disk = Disks[diskIdx]; - if (disk.OrderNumber == orderNumber) { - isFound = true; - Y_ABORT_UNLESS(partIdx < disk.DiskParts.size()); - TDiskPart &diskPart = disk.DiskParts[partIdx]; - //Cerr << Endl << "put ok diskIdx# " << diskIdx << " partIdx# " << partIdx << Endl << Endl; - diskPart.Situation = ESituation::Present; - break; - } - } - Y_ABORT_UNLESS(isFound); + + const ui32 diskIdx = info.GetIdxInSubgroup(info.GetVDiskId(orderNumber), id.Hash()); + Y_ABORT_UNLESS(diskIdx != info.Type.BlobSubgroupSize()); + TDisk& disk = Disks[diskIdx]; + Y_ABORT_UNLESS(disk.OrderNumber == orderNumber); + + Y_ABORT_UNLESS(partIdx < disk.DiskParts.size()); + TDiskPart& diskPart = disk.DiskParts[partIdx]; + diskPart.Situation = ESituation::Present; } void TBlobState::AddErrorResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber) { - Y_UNUSED(info); Y_ABORT_UNLESS(id.PartId() != 0); ui32 partIdx = id.PartId() - 1; IsChanged = true; - // Mark part as error for the disk - bool isFound = false; - for (ui32 diskIdx = 0; diskIdx < Disks.size(); ++diskIdx) { - TDisk &disk = Disks[diskIdx]; - if (disk.OrderNumber == orderNumber) { - isFound = true; - Y_ABORT_UNLESS(partIdx < disk.DiskParts.size()); - TDiskPart &diskPart = disk.DiskParts[partIdx]; - //Cerr << Endl << "error diskIdx# " << diskIdx << " partIdx# " << partIdx << Endl << Endl; - diskPart.Situation = ESituation::Error; - diskPart.Requested.Clear(); - break; - } - } - Y_ABORT_UNLESS(isFound); + + const ui32 diskIdx = info.GetIdxInSubgroup(info.GetVDiskId(orderNumber), id.Hash()); + Y_ABORT_UNLESS(diskIdx != info.Type.BlobSubgroupSize()); + TDisk& disk = Disks[diskIdx]; + Y_ABORT_UNLESS(disk.OrderNumber == orderNumber); + + Y_ABORT_UNLESS(partIdx < disk.DiskParts.size()); + TDiskPart &diskPart = disk.DiskParts[partIdx]; + diskPart.Situation = ESituation::Error; + diskPart.Requested.Clear(); } void TBlobState::AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber, bool keep, bool doNotKeep) { - Y_UNUSED(info); Y_ABORT_UNLESS(id.PartId() != 0); - ui32 partIdx = id.PartId() - 1; + const ui32 partIdx = id.PartId() - 1; IsChanged = true; - // Mark part as error for the disk - bool isFound = false; - for (ui32 diskIdx = 0; diskIdx < Disks.size(); ++diskIdx) { - TDisk &disk = Disks[diskIdx]; - if (disk.OrderNumber == orderNumber) { - isFound = true; - Y_ABORT_UNLESS(partIdx < disk.DiskParts.size()); - TDiskPart &diskPart = disk.DiskParts[partIdx]; - //Cerr << Endl << "error diskIdx# " << diskIdx << " partIdx# " << partIdx << Endl << Endl; - diskPart.Situation = ESituation::Lost; - diskPart.Requested.Clear(); - break; - } - } - Y_ABORT_UNLESS(isFound); + + const ui32 diskIdx = info.GetIdxInSubgroup(info.GetVDiskId(orderNumber), id.Hash()); + Y_ABORT_UNLESS(diskIdx != info.Type.BlobSubgroupSize()); + TDisk& disk = Disks[diskIdx]; + Y_ABORT_UNLESS(disk.OrderNumber == orderNumber); + + Y_ABORT_UNLESS(partIdx < disk.DiskParts.size()); + TDiskPart &diskPart = disk.DiskParts[partIdx]; + //Cerr << Endl << "error diskIdx# " << diskIdx << " partIdx# " << partIdx << Endl << Endl; + diskPart.Situation = ESituation::Lost; + diskPart.Requested.Clear(); + Keep |= keep; DoNotKeep |= doNotKeep; } @@ -236,6 +211,20 @@ void TBlobState::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TG } } +bool TBlobState::HasWrittenQuorum(const TBlobStorageGroupInfo& info, const TBlobStorageGroupInfo::TGroupVDisks& expired) const { + TSubgroupPartLayout layout; + for (ui32 diskIdx = 0, numDisks = Disks.size(); diskIdx < numDisks; ++diskIdx) { + const TDisk& disk = Disks[diskIdx]; + for (ui32 partIdx = 0, numParts = disk.DiskParts.size(); partIdx < numParts; ++partIdx) { + const TDiskPart& part = disk.DiskParts[partIdx]; + if (part.Situation == ESituation::Present && !expired[disk.OrderNumber]) { + layout.AddItem(diskIdx, partIdx, info.Type); + } + } + } + return info.GetQuorumChecker().GetBlobState(layout, {&info.GetTopology()}) == TBlobStorageGroupInfo::EBS_FULL; +} + TString TBlobState::ToString() const { TStringStream str; str << "{Id# " << Id.ToString(); @@ -615,5 +604,21 @@ TString TBlackboard::ToString() const { return str.Str(); } +void TBlackboard::InvalidatePartStates(ui32 orderNumber) { + const TVDiskID vdiskId = Info->GetVDiskId(orderNumber); + for (auto& [id, state] : BlobStates) { + Y_ABORT_UNLESS(!state.IsDone); + if (const ui32 diskIdx = Info->GetIdxInSubgroup(vdiskId, id.Hash()); diskIdx != Info->Type.BlobSubgroupSize()) { + TBlobState::TDisk& disk = state.Disks[diskIdx]; + for (ui32 partIdx = 0; partIdx < disk.DiskParts.size(); ++partIdx) { + TBlobState::TDiskPart& part = disk.DiskParts[partIdx]; + if (part.Situation == TBlobState::ESituation::Sent || part.Situation == TBlobState::ESituation::Present) { + part.Situation = TBlobState::ESituation::Unknown; + state.IsChanged = true; + } + } + } + } +} }//NKikimr diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h index d61e250f32..f61e226dd6 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h @@ -5,6 +5,7 @@ #include <ydb/core/blobstorage/base/batched_vec.h> #include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h> +#include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo_partlayout.h> #include <ydb/core/util/fragmented_buffer.h> #include <ydb/core/util/interval_set.h> #include <library/cpp/containers/stack_vector/stack_vec.h> @@ -63,12 +64,14 @@ struct TBlobState { struct TDiskPart { TIntervalSet<i32> Requested; ESituation Situation = ESituation::Unknown; + TString ToString() const; }; struct TDisk { ui32 OrderNumber; bool IsSlow = false; TStackVec<TDiskPart, TypicalPartsInBlob> DiskParts; + TString ToString() const; }; @@ -105,6 +108,8 @@ struct TBlobState { NKikimrBlobStorage::EVDiskQueueId queueId, ui64 *outWorstNs, ui64 *outNextToWorstNs, i32 *outWorstSubgroupIdx) const; TString ToString() const; + bool HasWrittenQuorum(const TBlobStorageGroupInfo& info, const TBlobStorageGroupInfo::TGroupVDisks& expired) const; + static TString SituationToString(ESituation situation); }; @@ -228,6 +233,8 @@ struct TBlackboard { } } + void InvalidatePartStates(ui32 orderNumber); + void RegisterBlobForPut(const TLogoBlobID& id, std::vector<std::pair<ui64, ui32>> *extraBlockChecks, NWilson::TSpan *span); TBlobState& operator [](const TLogoBlobID& id); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp index 35d9cc681d..39112508ae 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp @@ -70,6 +70,12 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt bool RequireExtraBlockChecks = false; + struct TIncarnationRecord { + ui64 IncarnationGuid; + TMonotonic ExpirationTimestamp; + }; + std::vector<std::optional<TIncarnationRecord>> IncarnationRecords; + void SanityCheck() { if (RequestsSent <= MaxSaneRequests) { return; @@ -89,29 +95,58 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt SanityCheck(); // May Die } + template<typename TEvent, typename TCookie> + void SendPuts(auto&& callback) { + TDeque<std::unique_ptr<TEvent>> v; + callback(v); + UpdatePengingVDiskResponseCount<TEvent, TCookie>(v); + RequestsSent += v.size(); + CountPuts(v); + SendToQueues(v, TimeStatsEnabled); + } + void Accelerate() { if (IsAccelerated) { return; } IsAccelerated = true; + auto callback = [this](auto& v) { + PutImpl.Accelerate(LogCtx, v); + *(IsMultiPutMode ? Mon->NodeMon->AccelerateEvVMultiPutCount : Mon->NodeMon->AccelerateEvVPutCount) += v.size(); + }; + if (IsMultiPutMode) { - TDeque<std::unique_ptr<TEvBlobStorage::TEvVMultiPut>> vMultiPuts; - PutImpl.Accelerate(LogCtx, vMultiPuts); - UpdatePengingVDiskResponseCount<TEvBlobStorage::TEvVMultiPut, TVMultiPutCookie>(vMultiPuts); - RequestsSent += vMultiPuts.size(); - *Mon->NodeMon->AccelerateEvVMultiPutCount += vMultiPuts.size(); - CountPuts(vMultiPuts); - SendToQueues(vMultiPuts, TimeStatsEnabled); + SendPuts<TEvBlobStorage::TEvVMultiPut, TVMultiPutCookie>(callback); } else { - TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts; - PutImpl.Accelerate(LogCtx, vPuts); - UpdatePengingVDiskResponseCount<TEvBlobStorage::TEvVPut, TBlobCookie>(vPuts); - RequestsSent += vPuts.size(); - *Mon->NodeMon->AccelerateEvVPutCount += vPuts.size(); - CountPuts(vPuts); - SendToQueues(vPuts, TimeStatsEnabled); + SendPuts<TEvBlobStorage::TEvVPut, TBlobCookie>(callback); + } + } + + void HandleIncarnation(TMonotonic timestamp, ui32 orderNumber, ui64 incarnationGuid) { + Y_ABORT_UNLESS(orderNumber < IncarnationRecords.size()); + if (auto& record = IncarnationRecords[orderNumber]; !record) { + record = TIncarnationRecord{ + .IncarnationGuid = incarnationGuid, + .ExpirationTimestamp = timestamp, + }; + } else if (record->IncarnationGuid != incarnationGuid) { + PutImpl.InvalidatePartStates(orderNumber); + record->IncarnationGuid = incarnationGuid; + record->ExpirationTimestamp = timestamp; + } else if (record->ExpirationTimestamp < timestamp) { + record->ExpirationTimestamp = timestamp; + } + } + + TBlobStorageGroupInfo::TGroupVDisks CreateExpiredVDiskSet(TMonotonic timestamp) const { + TBlobStorageGroupInfo::TGroupVDisks res(&Info->GetTopology()); + for (ui32 i = 0; i < IncarnationRecords.size(); ++i) { + if (auto& record = IncarnationRecords[i]; record && record->ExpirationTimestamp <= timestamp) { + res |= {&Info->GetTopology(), Info->GetVDiskId(i)}; + } } + return res; } void Handle(TEvBlobStorage::TEvVPutResult::TPtr &ev) { @@ -148,6 +183,10 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt TVDiskID vDiskId = VDiskIDFromVDiskID(record.GetVDiskID()); const TVDiskIdShort shortId(vDiskId); + if (record.HasIncarnationGuid()) { + HandleIncarnation(TActivationContext::Monotonic(), Info->GetOrderNumber(shortId), record.GetIncarnationGuid()); + } + LWPROBE(DSProxyVDiskRequestDuration, TEvBlobStorage::EvVPut, blob.BlobSize(), blob.TabletID(), Info->GroupID, blob.Channel(), Info->GetFailDomainOrderNumber(shortId), GetStartTime(record.GetTimestamps()), @@ -162,13 +201,10 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt GetVDiskTimeMs(record.GetTimestamps())); } - TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts; TPutImpl::TPutResultVec putResults; - PutImpl.OnVPutEventResult(LogCtx, ev->Sender, *ev->Get(), vPuts, putResults); - UpdatePengingVDiskResponseCount<TEvBlobStorage::TEvVPut, TBlobCookie>(vPuts); - RequestsSent += vPuts.size(); - CountPuts(vPuts); - SendToQueues(vPuts, TimeStatsEnabled); + SendPuts<TEvBlobStorage::TEvVPut, TBlobCookie>([&](auto& v) { + PutImpl.OnVPutEventResult(LogCtx, ev->Sender, *ev->Get(), v, putResults); + }); if (ReplyAndDieWithLastResponse(putResults)) { return; } @@ -209,6 +245,10 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt TVDiskID vDiskId = VDiskIDFromVDiskID(record.GetVDiskID()); const TVDiskIdShort shortId(vDiskId); + if (record.HasIncarnationGuid()) { + HandleIncarnation(TActivationContext::Monotonic(), Info->GetOrderNumber(shortId), record.GetIncarnationGuid()); + } + Y_ABORT_UNLESS(record.HasCookie()); TVMultiPutCookie cookie(record.GetCookie()); const ui64 vdisk = cookie.GetVDiskOrderNumber(); @@ -277,12 +317,9 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt } putResults.clear(); - TDeque<std::unique_ptr<TEvBlobStorage::TEvVMultiPut>> vMultiPuts; - PutImpl.OnVPutEventResult(LogCtx, ev->Sender, *ev->Get(), vMultiPuts, putResults); - UpdatePengingVDiskResponseCount<TEvBlobStorage::TEvVMultiPut, TVMultiPutCookie>(vMultiPuts); - RequestsSent += vMultiPuts.size(); - CountPuts(vMultiPuts); - SendToQueues(vMultiPuts, TimeStatsEnabled); + SendPuts<TEvBlobStorage::TEvVMultiPut, TVMultiPutCookie>([&](auto& v) { + PutImpl.OnVPutEventResult(LogCtx, ev->Sender, *ev->Get(), v, putResults); + }); if (ReplyAndDieWithLastResponse(putResults)) { return; } @@ -443,6 +480,7 @@ public: , IsAccelerateScheduled(false) , IsMultiPutMode(false) , RequireExtraBlockChecks(!ev->ExtraBlockChecks.empty()) + , IncarnationRecords(info->GetTotalVDisksNum()) { if (ev->Orbit.HasShuttles()) { RootCauseTrack.IsOn = true; @@ -486,6 +524,7 @@ public: , IsAccelerated(false) , IsAccelerateScheduled(false) , IsMultiPutMode(true) + , IncarnationRecords(info->GetTotalVDisksNum()) { Y_DEBUG_ABORT_UNLESS(events.size() <= MaxBatchedPutRequests); for (auto &ev : events) { @@ -606,24 +645,13 @@ public: } } - template<typename TEvV, typename TCookie> - struct TIssue { - void operator ()(TThis& self) { - TDeque<std::unique_ptr<TEvV>> events; - self.PutImpl.GenerateInitialRequests(self.LogCtx, self.PartSets, events); - self.UpdatePengingVDiskResponseCount<TEvV, TCookie>(events); - self.RequestsSent += events.size(); - self.CountPuts(events); - self.SendToQueues(events, self.TimeStatsEnabled); - } - }; - void ResumeBootstrap() { if (EncodeQuantum()) { + auto callback = [this](auto& v) { PutImpl.GenerateInitialRequests(LogCtx, PartSets, v); }; if (IsMultiPutMode) { - TIssue<TEvBlobStorage::TEvVMultiPut, TVMultiPutCookie>()(*this); + SendPuts<TEvBlobStorage::TEvVMultiPut, TVMultiPutCookie>(callback); } else { - TIssue<TEvBlobStorage::TEvVPut, TBlobCookie>()(*this); + SendPuts<TEvBlobStorage::TEvVPut, TBlobCookie>(callback); } } else { TActivationContext::Send(new IEventHandle(TEvBlobStorage::EvResume, 0, SelfId(), {}, nullptr, 0)); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h index 53f2483a80..e883bb960c 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h @@ -427,10 +427,13 @@ public: TString DumpFullState() const; bool MarkBlobAsSent(ui64 blobIdx); - bool MarkBlobAsSent(TMap<TLogoBlobID, TBlobState>::iterator it); TString ToString() const; + void InvalidatePartStates(ui32 orderNumber) { + Blackboard.InvalidatePartStates(orderNumber); + } + protected: bool RunStrategies(TLogContext &logCtx, TPutResultVec &outPutResults); bool RunStrategy(TLogContext &logCtx, const IStrategy& strategy, TPutResultVec &outPutResults); |