diff options
author | alexvru <[email protected]> | 2023-02-28 10:22:53 +0300 |
---|---|---|
committer | alexvru <[email protected]> | 2023-02-28 10:22:53 +0300 |
commit | 6340e68e824b903a970de91f1b9fbf142a9ecc48 (patch) | |
tree | 12baf6095d216fe9837b6bf551dd694dadbadadf | |
parent | 5a7fcf0461802ef2229d6611dd6ce4886fbb9993 (diff) |
Handle out-of-space condition for BlobDepot
-rw-r--r-- | ydb/core/blob_depot/agent.cpp | 50 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/agent_impl.h | 4 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/channel_kind.cpp | 7 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/comm.cpp | 4 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_put.cpp | 8 | ||||
-rw-r--r-- | ydb/core/blob_depot/assimilator.cpp | 27 | ||||
-rw-r--r-- | ydb/core/blob_depot/blob_depot.cpp | 29 | ||||
-rw-r--r-- | ydb/core/blob_depot/blob_depot_tablet.h | 4 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_decommit.cpp | 35 | ||||
-rw-r--r-- | ydb/core/blob_depot/group_metrics_exchange.cpp | 32 | ||||
-rw-r--r-- | ydb/core/blob_depot/space_monitor.cpp | 32 | ||||
-rw-r--r-- | ydb/core/blob_depot/space_monitor.h | 4 |
12 files changed, 144 insertions, 92 deletions
diff --git a/ydb/core/blob_depot/agent.cpp b/ydb/core/blob_depot/agent.cpp index edc014ed8fc..5e59be28a0f 100644 --- a/ydb/core/blob_depot/agent.cpp +++ b/ydb/core/blob_depot/agent.cpp @@ -134,35 +134,35 @@ namespace NKikimr::NBlobDepot { const ui32 generation = Executor()->Generation(); auto [response, record] = TEvBlobDepot::MakeResponseFor(*ev, ev->Get()->Record.GetChannelKind(), generation); - auto *givenIdRange = record->MutableGivenIdRange(); - std::vector<ui8> channels(ev->Get()->Record.GetCount()); - PickChannels(record->GetChannelKind(), channels); - - THashMap<ui8, NKikimrBlobDepot::TGivenIdRange::TChannelRange*> issuedRanges; - for (ui8 channelIndex : channels) { - TChannelInfo& channel = Channels[channelIndex]; - const ui64 value = channel.NextBlobSeqId++; - - // fill in range item - auto& range = issuedRanges[channelIndex]; - if (!range || range->GetEnd() != value) { - range = givenIdRange->AddChannelRanges(); - range->SetChannel(channelIndex); - range->SetBegin(value); + if (PickChannels(record->GetChannelKind(), channels)) { + auto *givenIdRange = record->MutableGivenIdRange(); + + THashMap<ui8, NKikimrBlobDepot::TGivenIdRange::TChannelRange*> issuedRanges; + for (ui8 channelIndex : channels) { + TChannelInfo& channel = Channels[channelIndex]; + const ui64 value = channel.NextBlobSeqId++; + + // fill in range item + auto& range = issuedRanges[channelIndex]; + if (!range || range->GetEnd() != value) { + range = givenIdRange->AddChannelRanges(); + range->SetChannel(channelIndex); + range->SetBegin(value); + } + range->SetEnd(value + 1); } - range->SetEnd(value + 1); - } - // register issued ranges in agent and global records - TAgent& agent = GetAgent(ev->Recipient); - for (const auto& range : givenIdRange->GetChannelRanges()) { - agent.GivenIdRanges[range.GetChannel()].IssueNewRange(range.GetBegin(), range.GetEnd()); - Channels[range.GetChannel()].GivenIdRanges.IssueNewRange(range.GetBegin(), range.GetEnd()); + // register issued ranges in agent and global records + TAgent& agent = GetAgent(ev->Recipient); + for (const auto& range : givenIdRange->GetChannelRanges()) { + agent.GivenIdRanges[range.GetChannel()].IssueNewRange(range.GetBegin(), range.GetEnd()); + Channels[range.GetChannel()].GivenIdRanges.IssueNewRange(range.GetBegin(), range.GetEnd()); - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT05, "IssueNewRange", (Id, GetLogId()), - (AgentId, agent.Connection->NodeId), (Channel, range.GetChannel()), - (Begin, range.GetBegin()), (End, range.GetEnd())); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT05, "IssueNewRange", (Id, GetLogId()), + (AgentId, agent.Connection->NodeId), (Channel, range.GetChannel()), + (Begin, range.GetBegin()), (End, range.GetEnd())); + } } TActivationContext::Send(response.release()); diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h index c998e1dfb5c..ebb975bf883 100644 --- a/ydb/core/blob_depot/agent/agent_impl.h +++ b/ydb/core/blob_depot/agent/agent_impl.h @@ -343,7 +343,7 @@ namespace NKikimr::NBlobDepot { virtual void OnUpdateBlock() {} virtual void OnRead(ui64 /*tag*/, NKikimrProto::EReplyStatus /*status*/, TString /*dataOrErrorReason*/) {} - virtual void OnIdAllocated() {} + virtual void OnIdAllocated(bool /*success*/) {} virtual void OnDestroy(bool /*success*/) {} protected: // reading logic @@ -441,7 +441,7 @@ namespace NKikimr::NBlobDepot { void RebuildHeap(); void EnqueueQueryWaitingForId(TQuery *query); - void ProcessQueriesWaitingForId(); + void ProcessQueriesWaitingForId(bool success); }; THashMap<NKikimrBlobDepot::TChannelKind::E, TChannelKind> ChannelKinds; diff --git a/ydb/core/blob_depot/agent/channel_kind.cpp b/ydb/core/blob_depot/agent/channel_kind.cpp index d28fcde537f..3451ce6097f 100644 --- a/ydb/core/blob_depot/agent/channel_kind.cpp +++ b/ydb/core/blob_depot/agent/channel_kind.cpp @@ -8,7 +8,7 @@ namespace NKikimr::NBlobDepot { NumAvailableItems += range.GetEnd() - range.GetBegin(); } - ProcessQueriesWaitingForId(); + ProcessQueriesWaitingForId(true); } ui32 TBlobDepotAgent::TChannelKind::GetNumAvailableItems() const { @@ -32,6 +32,7 @@ namespace NKikimr::NBlobDepot { } } if (options.empty()) { + agent.IssueAllocateIdsIfNeeded(*this); return std::nullopt; } @@ -67,10 +68,10 @@ namespace NKikimr::NBlobDepot { QueriesWaitingForId.PushBack(query); } - void TBlobDepotAgent::TChannelKind::ProcessQueriesWaitingForId() { + void TBlobDepotAgent::TChannelKind::ProcessQueriesWaitingForId(bool success) { TIntrusiveList<TQuery, TPendingId> temp; temp.Swap(QueriesWaitingForId); - temp.ForEach([&](TQuery *query) { query->OnIdAllocated(); }); + temp.ForEach([&](TQuery *query) { query->OnIdAllocated(success); }); } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp index 2682683f4e6..843bb4a0ba1 100644 --- a/ydb/core/blob_depot/agent/comm.cpp +++ b/ydb/core/blob_depot/agent/comm.cpp @@ -92,7 +92,7 @@ namespace NKikimr::NBlobDepot { } void TBlobDepotAgent::IssueAllocateIdsIfNeeded(TChannelKind& kind) { - if (!kind.IdAllocInFlight && kind.GetNumAvailableItems() < 100 && PipeId) { + if (!kind.IdAllocInFlight && kind.GetNumAvailableItems() < 100 && IsConnected) { const ui64 id = NextTabletRequestId++; STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA08, "IssueAllocateIdsIfNeeded", (AgentId, LogId), (ChannelKind, NKikimrBlobDepot::TChannelKind::E_Name(kind.Kind)), @@ -119,6 +119,8 @@ namespace NKikimr::NBlobDepot { if (msg.HasGivenIdRange()) { kind.IssueGivenIdRange(msg.GetGivenIdRange()); + } else { + kind.ProcessQueriesWaitingForId(false); } STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA09, "TEvAllocateIdsResult", (AgentId, LogId), (Msg, msg), diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp index 9efff1c64f8..856d4e90198 100644 --- a/ydb/core/blob_depot/agent/storage_put.cpp +++ b/ydb/core/blob_depot/agent/storage_put.cpp @@ -189,8 +189,12 @@ namespace NKikimr::NBlobDepot { CheckBlocks(); // just restart request } - void OnIdAllocated() override { - IssuePuts(); + void OnIdAllocated(bool success) override { + if (success) { + IssuePuts(); + } else { + EndWithError(NKikimrProto::ERROR, "out of space"); + } } void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr context, TResponse response) override { diff --git a/ydb/core/blob_depot/assimilator.cpp b/ydb/core/blob_depot/assimilator.cpp index 41e91e49e2d..850b94c8f9e 100644 --- a/ydb/core/blob_depot/assimilator.cpp +++ b/ydb/core/blob_depot/assimilator.cpp @@ -372,19 +372,20 @@ namespace NKikimr::NBlobDepot { (Status, resp.Status), (NumGetsUnprocessed, GetIdToUnprocessedPuts.size())); if (resp.Status == NKikimrProto::OK) { std::vector<ui8> channels(1); - Self->PickChannels(NKikimrBlobDepot::TChannelKind::Data, channels); - TChannelInfo& channel = Self->Channels[channels.front()]; - const ui64 value = channel.NextBlobSeqId++; - const auto blobSeqId = TBlobSeqId::FromSequentalNumber(channel.Index, Self->Executor()->Generation(), value); - const TLogoBlobID id = blobSeqId.MakeBlobId(Self->TabletID(), EBlobType::VG_DATA_BLOB, 0, resp.Id.BlobSize()); - const ui64 putId = NextPutId++; - SendToBSProxy(SelfId(), channel.GroupId, new TEvBlobStorage::TEvPut(id, resp.Buffer, TInstant::Max()), putId); - const bool inserted = channel.AssimilatedBlobsInFlight.insert(value).second; // prevent from barrier advancing - Y_VERIFY(inserted); - const bool inserted1 = PutIdToKey.try_emplace(putId, TData::TKey(resp.Id), it->first).second; - Y_VERIFY(inserted1); - ++it->second; - getBytes += id.BlobSize(); + if (!Self->PickChannels(NKikimrBlobDepot::TChannelKind::Data, channels)) { + TChannelInfo& channel = Self->Channels[channels.front()]; + const ui64 value = channel.NextBlobSeqId++; + const auto blobSeqId = TBlobSeqId::FromSequentalNumber(channel.Index, Self->Executor()->Generation(), value); + const TLogoBlobID id = blobSeqId.MakeBlobId(Self->TabletID(), EBlobType::VG_DATA_BLOB, 0, resp.Id.BlobSize()); + const ui64 putId = NextPutId++; + SendToBSProxy(SelfId(), channel.GroupId, new TEvBlobStorage::TEvPut(id, resp.Buffer, TInstant::Max()), putId); + const bool inserted = channel.AssimilatedBlobsInFlight.insert(value).second; // prevent from barrier advancing + Y_VERIFY(inserted); + const bool inserted1 = PutIdToKey.try_emplace(putId, TData::TKey(resp.Id), it->first).second; + Y_VERIFY(inserted1); + ++it->second; + } + getBytes += resp.Id.BlobSize(); } else if (resp.Status == NKikimrProto::NODATA) { Self->Data->ExecuteTxCommitAssimilatedBlob(NKikimrProto::NODATA, TBlobSeqId(), TData::TKey(resp.Id), TEvPrivate::EvTxComplete, SelfId(), it->first); diff --git a/ydb/core/blob_depot/blob_depot.cpp b/ydb/core/blob_depot/blob_depot.cpp index aa815386804..60aa159111e 100644 --- a/ydb/core/blob_depot/blob_depot.cpp +++ b/ydb/core/blob_depot/blob_depot.cpp @@ -183,22 +183,31 @@ namespace NKikimr::NBlobDepot { } } - void TBlobDepot::PickChannels(NKikimrBlobDepot::TChannelKind::E kind, std::vector<ui8>& channels) { + bool TBlobDepot::PickChannels(NKikimrBlobDepot::TChannelKind::E kind, std::vector<ui8>& channels) { const auto kindIt = ChannelKinds.find(kind); Y_VERIFY(kindIt != ChannelKinds.end()); auto& kindv = kindIt->second; if (kindv.GroupAccumWeights.empty()) { - // recalculate group weights - ui64 accum = 0; - THashSet<ui32> seenGroups; - for (const auto& [channel, groupId] : kindv.ChannelGroups) { - if (const auto& [_, inserted] = seenGroups.insert(groupId); inserted) { - accum += SpaceMonitor->GetGroupAllocationWeight(groupId); - kindv.GroupAccumWeights.emplace_back(groupId, accum); + for (const bool stopOnLightYellow : {true, false}) { + // recalculate group weights + ui64 accum = 0; + THashSet<ui32> seenGroups; + for (const auto& [channel, groupId] : kindv.ChannelGroups) { + if (const auto& [_, inserted] = seenGroups.insert(groupId); inserted) { + if (const ui64 w = SpaceMonitor->GetGroupAllocationWeight(groupId, stopOnLightYellow)) { + accum += w; + kindv.GroupAccumWeights.emplace_back(groupId, accum); + } + } + } + if (!kindv.GroupAccumWeights.empty()) { + break; } } - Y_VERIFY(!kindv.GroupAccumWeights.empty()); + if (kindv.GroupAccumWeights.empty()) { + return false; // no allocation possible + } } const auto [_, accum] = kindv.GroupAccumWeights.back(); @@ -220,6 +229,8 @@ namespace NKikimr::NBlobDepot { const size_t channelIndex = RandomNumber(channels.size()); channel = channels[channelIndex]; } + + return true; } IActor *CreateBlobDepot(const TActorId& tablet, TTabletStorageInfo *info) { diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h index fc0ab0ea7f2..277257d211a 100644 --- a/ydb/core/blob_depot/blob_depot_tablet.h +++ b/ydb/core/blob_depot/blob_depot_tablet.h @@ -200,7 +200,7 @@ namespace NKikimr::NBlobDepot { void InitChannelKinds(); void InvalidateGroupForAllocation(ui32 groupId); - void PickChannels(NKikimrBlobDepot::TChannelKind::E kind, std::vector<ui8>& channels); + bool PickChannels(NKikimrBlobDepot::TChannelKind::E kind, std::vector<ui8>& channels); TString GetLogId() const { const auto *executor = Executor(); @@ -317,7 +317,7 @@ namespace NKikimr::NBlobDepot { void DoGroupMetricsExchange(); void Handle(TEvBlobStorage::TEvControllerGroupMetricsExchange::TPtr ev); void Handle(TEvBlobDepot::TEvPushMetrics::TPtr ev); - void UpdateThroughputs(); + void UpdateThroughputs(bool reschedule = true); //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Validation diff --git a/ydb/core/blob_depot/data_decommit.cpp b/ydb/core/blob_depot/data_decommit.cpp index 4a90a0b60c0..bda6c479615 100644 --- a/ydb/core/blob_depot/data_decommit.cpp +++ b/ydb/core/blob_depot/data_decommit.cpp @@ -237,20 +237,25 @@ namespace NKikimr::NBlobDepot { void IssuePut(TKey key, TString&& buffer, bool keep, bool doNotKeep) { std::vector<ui8> channels(1); - Self->PickChannels(NKikimrBlobDepot::TChannelKind::Data, channels); - TChannelInfo& channel = Self->Channels[channels.front()]; - const ui64 value = channel.NextBlobSeqId++; - const auto blobSeqId = TBlobSeqId::FromSequentalNumber(channel.Index, Self->Executor()->Generation(), value); - const TLogoBlobID id = blobSeqId.MakeBlobId(Self->TabletID(), EBlobType::VG_DATA_BLOB, 0, buffer.size()); - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT91, "going to TEvPut", (Id, Self->GetLogId()), (Sender, Ev->Sender), - (Cookie, Ev->Cookie), (Key, key), (BlobId, id)); - SendToBSProxy(SelfId(), channel.GroupId, new TEvBlobStorage::TEvPut(id, std::move(buffer), TInstant::Max()), - (ui64)keep | (ui64)doNotKeep << 1); - const bool inserted = channel.AssimilatedBlobsInFlight.insert(value).second; // prevent from barrier advancing - Y_VERIFY(inserted); - const bool inserted1 = IdToKey.try_emplace(id, std::move(key)).second; - Y_VERIFY(inserted1); - ++PutsInFlight; + if (Self->PickChannels(NKikimrBlobDepot::TChannelKind::Data, channels)) { + TChannelInfo& channel = Self->Channels[channels.front()]; + const ui64 value = channel.NextBlobSeqId++; + const auto blobSeqId = TBlobSeqId::FromSequentalNumber(channel.Index, Self->Executor()->Generation(), value); + const TLogoBlobID id = blobSeqId.MakeBlobId(Self->TabletID(), EBlobType::VG_DATA_BLOB, 0, buffer.size()); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT91, "going to TEvPut", (Id, Self->GetLogId()), (Sender, Ev->Sender), + (Cookie, Ev->Cookie), (Key, key), (BlobId, id)); + SendToBSProxy(SelfId(), channel.GroupId, new TEvBlobStorage::TEvPut(id, std::move(buffer), TInstant::Max()), + (ui64)keep | (ui64)doNotKeep << 1); + const bool inserted = channel.AssimilatedBlobsInFlight.insert(value).second; // prevent from barrier advancing + Y_VERIFY(inserted); + const bool inserted1 = IdToKey.try_emplace(id, std::move(key)).second; + Y_VERIFY(inserted1); + ++PutsInFlight; + } else { // we couldn't restore this blob -- there was no place to write it to + ResolutionErrors.insert(key.GetBlobId()); + ++PutsInFlight; + HandleTxComplete(); + } } void Handle(TEvBlobStorage::TEvPutResult::TPtr ev) { @@ -271,7 +276,7 @@ namespace NKikimr::NBlobDepot { TEvPrivate::EvTxComplete, SelfId(), 0, keep, doNotKeep); if (msg.Status != NKikimrProto::OK) { // do not reply OK to this item - ResolutionErrors.insert(msg.Id); + ResolutionErrors.insert(key.GetBlobId()); } } diff --git a/ydb/core/blob_depot/group_metrics_exchange.cpp b/ydb/core/blob_depot/group_metrics_exchange.cpp index f10696815a4..052e5833954 100644 --- a/ydb/core/blob_depot/group_metrics_exchange.cpp +++ b/ydb/core/blob_depot/group_metrics_exchange.cpp @@ -104,28 +104,28 @@ namespace NKikimr::NBlobDepot { BytesRead += record.GetBytesRead(); BytesWritten += record.GetBytesWritten(); MetricsQ.emplace_back(TActivationContext::Monotonic(), BytesRead, BytesWritten); + UpdateThroughputs(false); } - void TBlobDepot::UpdateThroughputs() { + void TBlobDepot::UpdateThroughputs(bool reschedule) { static constexpr TDuration Window = TDuration::Seconds(3); if (Config.HasVirtualGroupId() && !MetricsQ.empty()) { const TMonotonic now = TActivationContext::Monotonic(); const TMonotonic left = now - Window; const auto comp = [](TMonotonic x, const auto& y) { return x < std::get<0>(y); }; - const auto it = std::upper_bound(MetricsQ.begin(), MetricsQ.end(), left, comp); - if (it != MetricsQ.begin()) { // interpolate + if (const auto it = std::upper_bound(MetricsQ.begin(), MetricsQ.end(), left, comp); it != MetricsQ.begin()) { MetricsQ.erase(MetricsQ.begin(), std::prev(it)); // remove all obsolete entries - Y_VERIFY(MetricsQ.size() >= 2); - const auto& [xTimestamp, xRead, xWritten] = MetricsQ[0]; - auto& [yTimestamp, yRead, yWritten] = MetricsQ[1]; - Y_VERIFY(xTimestamp <= left && left < yTimestamp); - const ui64 scale = 1'000'000; - const ui64 factor = (left - xTimestamp).MicroSeconds() * scale / (yTimestamp - xTimestamp).MicroSeconds(); - yTimestamp = left; - yRead = xRead + (yRead - xRead) * factor / scale; - yWritten = xWritten + (yWritten - xWritten) * factor / scale; - MetricsQ.pop_front(); + if (MetricsQ.size() >= 2) { + auto& [xTimestamp, xRead, xWritten] = MetricsQ[0]; + const auto& [yTimestamp, yRead, yWritten] = MetricsQ[1]; + Y_VERIFY(xTimestamp <= left && left < yTimestamp); + static constexpr ui64 scale = 1'000'000; + const ui64 factor = (left - xTimestamp).MicroSeconds() * scale / (yTimestamp - xTimestamp).MicroSeconds(); + xTimestamp = left; + xRead += (yRead - xRead) * factor / scale; + xWritten += (yWritten - xWritten) * factor / scale; + } } ui64 readThroughput = 0; @@ -144,8 +144,10 @@ namespace NKikimr::NBlobDepot { Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(SelfId().NodeId()), ev.release()); } - TActivationContext::Schedule(TDuration::Seconds(1), new IEventHandle(TEvPrivate::EvUpdateThroughputs, 0, - SelfId(), {}, nullptr, 0)); + if (reschedule) { + TActivationContext::Schedule(TDuration::Seconds(1), new IEventHandle(TEvPrivate::EvUpdateThroughputs, 0, + SelfId(), {}, nullptr, 0)); + } } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/space_monitor.cpp b/ydb/core/blob_depot/space_monitor.cpp index 2809d3a4c79..033fdcd6594 100644 --- a/ydb/core/blob_depot/space_monitor.cpp +++ b/ydb/core/blob_depot/space_monitor.cpp @@ -23,9 +23,31 @@ namespace NKikimr::NBlobDepot { group.StatusFlags = msg.StatusFlags; group.ApproximateFreeSpaceShare = msg.ApproximateFreeSpaceShare; Self->InvalidateGroupForAllocation(groupId); + + if (group.StatusFlags.Check(NKikimrBlobStorage::StatusDiskSpaceLightYellowMove)) { + HandleYellowChannels(); + } } } + void TSpaceMonitor::HandleYellowChannels() { + TVector<ui32> yellowMove, yellowStop; + + for (const auto& [groupId, group] : Groups) { + if (group.StatusFlags.Check(NKikimrBlobStorage::StatusDiskSpaceLightYellowMove)) { + yellowMove.insert(yellowMove.end(), group.Channels.begin(), group.Channels.end()); + } else if (group.StatusFlags.Check(NKikimrBlobStorage::StatusDiskSpaceYellowStop)) { + yellowStop.insert(yellowMove.end(), group.Channels.begin(), group.Channels.end()); + } + } + + Y_VERIFY(yellowMove || yellowStop); + STLOG(PRI_INFO, BLOB_DEPOT, BDT28, "asking to reassign channels", (Id, Self->GetLogId()), + (YellowMove, FormatList(yellowMove)), + (YellowStop, FormatList(yellowStop))); + Self->Executor()->OnYellowChannels(std::move(yellowMove), std::move(yellowStop)); + } + void TSpaceMonitor::Kick() { if (Groups.empty()) { Init(); @@ -49,11 +71,11 @@ namespace NKikimr::NBlobDepot { } const TChannelKind& kind = it->second; for (const auto& [channel, group] : kind.ChannelGroups) { - Groups.try_emplace(group); + Groups[group].Channels.push_back(channel); } } - ui64 TSpaceMonitor::GetGroupAllocationWeight(ui32 groupId) const { + ui64 TSpaceMonitor::GetGroupAllocationWeight(ui32 groupId, bool stopOnLightYellow) const { const auto it = Groups.find(groupId); if (it == Groups.end()) { Y_VERIFY_DEBUG(false); @@ -61,8 +83,10 @@ namespace NKikimr::NBlobDepot { } const TGroupRecord& group = it->second; - if (group.StatusFlags.Check(NKikimrBlobStorage::StatusDiskSpaceLightYellowMove)) { - return 0; // do not write data to this group + if (group.StatusFlags.Check(stopOnLightYellow + ? NKikimrBlobStorage::StatusDiskSpaceLightYellowMove + : NKikimrBlobStorage::StatusDiskSpaceYellowStop)) { + return 0; } if (!group.ApproximateFreeSpaceShare) { // not collected yet? diff --git a/ydb/core/blob_depot/space_monitor.h b/ydb/core/blob_depot/space_monitor.h index 8f4330063ed..d3876a3db70 100644 --- a/ydb/core/blob_depot/space_monitor.h +++ b/ydb/core/blob_depot/space_monitor.h @@ -12,6 +12,7 @@ namespace NKikimr::NBlobDepot { bool StatusRequestInFlight = false; TStorageStatusFlags StatusFlags; float ApproximateFreeSpaceShare = 0.0f; + std::vector<ui8> Channels; }; std::unordered_map<ui32, TGroupRecord> Groups; @@ -27,13 +28,14 @@ namespace NKikimr::NBlobDepot { void Handle(TEvBlobStorage::TEvStatusResult::TPtr ev); void Kick(); - ui64 GetGroupAllocationWeight(ui32 groupId) const; + ui64 GetGroupAllocationWeight(ui32 groupId, bool stopOnLightYellow) const; void SetSpaceColor(NKikimrBlobStorage::TPDiskSpaceColor::E spaceColor, float approximateFreeSpaceShare); NKikimrBlobStorage::TPDiskSpaceColor::E GetSpaceColor() const; float GetApproximateFreeSpaceShare() const; private: void Init(); + void HandleYellowChannels(); }; } // NKikimr::NBlobDepot |