summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <[email protected]>2023-02-28 10:22:53 +0300
committeralexvru <[email protected]>2023-02-28 10:22:53 +0300
commit6340e68e824b903a970de91f1b9fbf142a9ecc48 (patch)
tree12baf6095d216fe9837b6bf551dd694dadbadadf
parent5a7fcf0461802ef2229d6611dd6ce4886fbb9993 (diff)
Handle out-of-space condition for BlobDepot
-rw-r--r--ydb/core/blob_depot/agent.cpp50
-rw-r--r--ydb/core/blob_depot/agent/agent_impl.h4
-rw-r--r--ydb/core/blob_depot/agent/channel_kind.cpp7
-rw-r--r--ydb/core/blob_depot/agent/comm.cpp4
-rw-r--r--ydb/core/blob_depot/agent/storage_put.cpp8
-rw-r--r--ydb/core/blob_depot/assimilator.cpp27
-rw-r--r--ydb/core/blob_depot/blob_depot.cpp29
-rw-r--r--ydb/core/blob_depot/blob_depot_tablet.h4
-rw-r--r--ydb/core/blob_depot/data_decommit.cpp35
-rw-r--r--ydb/core/blob_depot/group_metrics_exchange.cpp32
-rw-r--r--ydb/core/blob_depot/space_monitor.cpp32
-rw-r--r--ydb/core/blob_depot/space_monitor.h4
12 files changed, 144 insertions, 92 deletions
diff --git a/ydb/core/blob_depot/agent.cpp b/ydb/core/blob_depot/agent.cpp
index edc014ed8fc..5e59be28a0f 100644
--- a/ydb/core/blob_depot/agent.cpp
+++ b/ydb/core/blob_depot/agent.cpp
@@ -134,35 +134,35 @@ namespace NKikimr::NBlobDepot {
const ui32 generation = Executor()->Generation();
auto [response, record] = TEvBlobDepot::MakeResponseFor(*ev, ev->Get()->Record.GetChannelKind(), generation);
- auto *givenIdRange = record->MutableGivenIdRange();
-
std::vector<ui8> channels(ev->Get()->Record.GetCount());
- PickChannels(record->GetChannelKind(), channels);
-
- THashMap<ui8, NKikimrBlobDepot::TGivenIdRange::TChannelRange*> issuedRanges;
- for (ui8 channelIndex : channels) {
- TChannelInfo& channel = Channels[channelIndex];
- const ui64 value = channel.NextBlobSeqId++;
-
- // fill in range item
- auto& range = issuedRanges[channelIndex];
- if (!range || range->GetEnd() != value) {
- range = givenIdRange->AddChannelRanges();
- range->SetChannel(channelIndex);
- range->SetBegin(value);
+ if (PickChannels(record->GetChannelKind(), channels)) {
+ auto *givenIdRange = record->MutableGivenIdRange();
+
+ THashMap<ui8, NKikimrBlobDepot::TGivenIdRange::TChannelRange*> issuedRanges;
+ for (ui8 channelIndex : channels) {
+ TChannelInfo& channel = Channels[channelIndex];
+ const ui64 value = channel.NextBlobSeqId++;
+
+ // fill in range item
+ auto& range = issuedRanges[channelIndex];
+ if (!range || range->GetEnd() != value) {
+ range = givenIdRange->AddChannelRanges();
+ range->SetChannel(channelIndex);
+ range->SetBegin(value);
+ }
+ range->SetEnd(value + 1);
}
- range->SetEnd(value + 1);
- }
- // register issued ranges in agent and global records
- TAgent& agent = GetAgent(ev->Recipient);
- for (const auto& range : givenIdRange->GetChannelRanges()) {
- agent.GivenIdRanges[range.GetChannel()].IssueNewRange(range.GetBegin(), range.GetEnd());
- Channels[range.GetChannel()].GivenIdRanges.IssueNewRange(range.GetBegin(), range.GetEnd());
+ // register issued ranges in agent and global records
+ TAgent& agent = GetAgent(ev->Recipient);
+ for (const auto& range : givenIdRange->GetChannelRanges()) {
+ agent.GivenIdRanges[range.GetChannel()].IssueNewRange(range.GetBegin(), range.GetEnd());
+ Channels[range.GetChannel()].GivenIdRanges.IssueNewRange(range.GetBegin(), range.GetEnd());
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT05, "IssueNewRange", (Id, GetLogId()),
- (AgentId, agent.Connection->NodeId), (Channel, range.GetChannel()),
- (Begin, range.GetBegin()), (End, range.GetEnd()));
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT05, "IssueNewRange", (Id, GetLogId()),
+ (AgentId, agent.Connection->NodeId), (Channel, range.GetChannel()),
+ (Begin, range.GetBegin()), (End, range.GetEnd()));
+ }
}
TActivationContext::Send(response.release());
diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h
index c998e1dfb5c..ebb975bf883 100644
--- a/ydb/core/blob_depot/agent/agent_impl.h
+++ b/ydb/core/blob_depot/agent/agent_impl.h
@@ -343,7 +343,7 @@ namespace NKikimr::NBlobDepot {
virtual void OnUpdateBlock() {}
virtual void OnRead(ui64 /*tag*/, NKikimrProto::EReplyStatus /*status*/, TString /*dataOrErrorReason*/) {}
- virtual void OnIdAllocated() {}
+ virtual void OnIdAllocated(bool /*success*/) {}
virtual void OnDestroy(bool /*success*/) {}
protected: // reading logic
@@ -441,7 +441,7 @@ namespace NKikimr::NBlobDepot {
void RebuildHeap();
void EnqueueQueryWaitingForId(TQuery *query);
- void ProcessQueriesWaitingForId();
+ void ProcessQueriesWaitingForId(bool success);
};
THashMap<NKikimrBlobDepot::TChannelKind::E, TChannelKind> ChannelKinds;
diff --git a/ydb/core/blob_depot/agent/channel_kind.cpp b/ydb/core/blob_depot/agent/channel_kind.cpp
index d28fcde537f..3451ce6097f 100644
--- a/ydb/core/blob_depot/agent/channel_kind.cpp
+++ b/ydb/core/blob_depot/agent/channel_kind.cpp
@@ -8,7 +8,7 @@ namespace NKikimr::NBlobDepot {
NumAvailableItems += range.GetEnd() - range.GetBegin();
}
- ProcessQueriesWaitingForId();
+ ProcessQueriesWaitingForId(true);
}
ui32 TBlobDepotAgent::TChannelKind::GetNumAvailableItems() const {
@@ -32,6 +32,7 @@ namespace NKikimr::NBlobDepot {
}
}
if (options.empty()) {
+ agent.IssueAllocateIdsIfNeeded(*this);
return std::nullopt;
}
@@ -67,10 +68,10 @@ namespace NKikimr::NBlobDepot {
QueriesWaitingForId.PushBack(query);
}
- void TBlobDepotAgent::TChannelKind::ProcessQueriesWaitingForId() {
+ void TBlobDepotAgent::TChannelKind::ProcessQueriesWaitingForId(bool success) {
TIntrusiveList<TQuery, TPendingId> temp;
temp.Swap(QueriesWaitingForId);
- temp.ForEach([&](TQuery *query) { query->OnIdAllocated(); });
+ temp.ForEach([&](TQuery *query) { query->OnIdAllocated(success); });
}
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp
index 2682683f4e6..843bb4a0ba1 100644
--- a/ydb/core/blob_depot/agent/comm.cpp
+++ b/ydb/core/blob_depot/agent/comm.cpp
@@ -92,7 +92,7 @@ namespace NKikimr::NBlobDepot {
}
void TBlobDepotAgent::IssueAllocateIdsIfNeeded(TChannelKind& kind) {
- if (!kind.IdAllocInFlight && kind.GetNumAvailableItems() < 100 && PipeId) {
+ if (!kind.IdAllocInFlight && kind.GetNumAvailableItems() < 100 && IsConnected) {
const ui64 id = NextTabletRequestId++;
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA08, "IssueAllocateIdsIfNeeded", (AgentId, LogId),
(ChannelKind, NKikimrBlobDepot::TChannelKind::E_Name(kind.Kind)),
@@ -119,6 +119,8 @@ namespace NKikimr::NBlobDepot {
if (msg.HasGivenIdRange()) {
kind.IssueGivenIdRange(msg.GetGivenIdRange());
+ } else {
+ kind.ProcessQueriesWaitingForId(false);
}
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA09, "TEvAllocateIdsResult", (AgentId, LogId), (Msg, msg),
diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp
index 9efff1c64f8..856d4e90198 100644
--- a/ydb/core/blob_depot/agent/storage_put.cpp
+++ b/ydb/core/blob_depot/agent/storage_put.cpp
@@ -189,8 +189,12 @@ namespace NKikimr::NBlobDepot {
CheckBlocks(); // just restart request
}
- void OnIdAllocated() override {
- IssuePuts();
+ void OnIdAllocated(bool success) override {
+ if (success) {
+ IssuePuts();
+ } else {
+ EndWithError(NKikimrProto::ERROR, "out of space");
+ }
}
void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr context, TResponse response) override {
diff --git a/ydb/core/blob_depot/assimilator.cpp b/ydb/core/blob_depot/assimilator.cpp
index 41e91e49e2d..850b94c8f9e 100644
--- a/ydb/core/blob_depot/assimilator.cpp
+++ b/ydb/core/blob_depot/assimilator.cpp
@@ -372,19 +372,20 @@ namespace NKikimr::NBlobDepot {
(Status, resp.Status), (NumGetsUnprocessed, GetIdToUnprocessedPuts.size()));
if (resp.Status == NKikimrProto::OK) {
std::vector<ui8> channels(1);
- Self->PickChannels(NKikimrBlobDepot::TChannelKind::Data, channels);
- TChannelInfo& channel = Self->Channels[channels.front()];
- const ui64 value = channel.NextBlobSeqId++;
- const auto blobSeqId = TBlobSeqId::FromSequentalNumber(channel.Index, Self->Executor()->Generation(), value);
- const TLogoBlobID id = blobSeqId.MakeBlobId(Self->TabletID(), EBlobType::VG_DATA_BLOB, 0, resp.Id.BlobSize());
- const ui64 putId = NextPutId++;
- SendToBSProxy(SelfId(), channel.GroupId, new TEvBlobStorage::TEvPut(id, resp.Buffer, TInstant::Max()), putId);
- const bool inserted = channel.AssimilatedBlobsInFlight.insert(value).second; // prevent from barrier advancing
- Y_VERIFY(inserted);
- const bool inserted1 = PutIdToKey.try_emplace(putId, TData::TKey(resp.Id), it->first).second;
- Y_VERIFY(inserted1);
- ++it->second;
- getBytes += id.BlobSize();
+ if (!Self->PickChannels(NKikimrBlobDepot::TChannelKind::Data, channels)) {
+ TChannelInfo& channel = Self->Channels[channels.front()];
+ const ui64 value = channel.NextBlobSeqId++;
+ const auto blobSeqId = TBlobSeqId::FromSequentalNumber(channel.Index, Self->Executor()->Generation(), value);
+ const TLogoBlobID id = blobSeqId.MakeBlobId(Self->TabletID(), EBlobType::VG_DATA_BLOB, 0, resp.Id.BlobSize());
+ const ui64 putId = NextPutId++;
+ SendToBSProxy(SelfId(), channel.GroupId, new TEvBlobStorage::TEvPut(id, resp.Buffer, TInstant::Max()), putId);
+ const bool inserted = channel.AssimilatedBlobsInFlight.insert(value).second; // prevent from barrier advancing
+ Y_VERIFY(inserted);
+ const bool inserted1 = PutIdToKey.try_emplace(putId, TData::TKey(resp.Id), it->first).second;
+ Y_VERIFY(inserted1);
+ ++it->second;
+ }
+ getBytes += resp.Id.BlobSize();
} else if (resp.Status == NKikimrProto::NODATA) {
Self->Data->ExecuteTxCommitAssimilatedBlob(NKikimrProto::NODATA, TBlobSeqId(), TData::TKey(resp.Id),
TEvPrivate::EvTxComplete, SelfId(), it->first);
diff --git a/ydb/core/blob_depot/blob_depot.cpp b/ydb/core/blob_depot/blob_depot.cpp
index aa815386804..60aa159111e 100644
--- a/ydb/core/blob_depot/blob_depot.cpp
+++ b/ydb/core/blob_depot/blob_depot.cpp
@@ -183,22 +183,31 @@ namespace NKikimr::NBlobDepot {
}
}
- void TBlobDepot::PickChannels(NKikimrBlobDepot::TChannelKind::E kind, std::vector<ui8>& channels) {
+ bool TBlobDepot::PickChannels(NKikimrBlobDepot::TChannelKind::E kind, std::vector<ui8>& channels) {
const auto kindIt = ChannelKinds.find(kind);
Y_VERIFY(kindIt != ChannelKinds.end());
auto& kindv = kindIt->second;
if (kindv.GroupAccumWeights.empty()) {
- // recalculate group weights
- ui64 accum = 0;
- THashSet<ui32> seenGroups;
- for (const auto& [channel, groupId] : kindv.ChannelGroups) {
- if (const auto& [_, inserted] = seenGroups.insert(groupId); inserted) {
- accum += SpaceMonitor->GetGroupAllocationWeight(groupId);
- kindv.GroupAccumWeights.emplace_back(groupId, accum);
+ for (const bool stopOnLightYellow : {true, false}) {
+ // recalculate group weights
+ ui64 accum = 0;
+ THashSet<ui32> seenGroups;
+ for (const auto& [channel, groupId] : kindv.ChannelGroups) {
+ if (const auto& [_, inserted] = seenGroups.insert(groupId); inserted) {
+ if (const ui64 w = SpaceMonitor->GetGroupAllocationWeight(groupId, stopOnLightYellow)) {
+ accum += w;
+ kindv.GroupAccumWeights.emplace_back(groupId, accum);
+ }
+ }
+ }
+ if (!kindv.GroupAccumWeights.empty()) {
+ break;
}
}
- Y_VERIFY(!kindv.GroupAccumWeights.empty());
+ if (kindv.GroupAccumWeights.empty()) {
+ return false; // no allocation possible
+ }
}
const auto [_, accum] = kindv.GroupAccumWeights.back();
@@ -220,6 +229,8 @@ namespace NKikimr::NBlobDepot {
const size_t channelIndex = RandomNumber(channels.size());
channel = channels[channelIndex];
}
+
+ return true;
}
IActor *CreateBlobDepot(const TActorId& tablet, TTabletStorageInfo *info) {
diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h
index fc0ab0ea7f2..277257d211a 100644
--- a/ydb/core/blob_depot/blob_depot_tablet.h
+++ b/ydb/core/blob_depot/blob_depot_tablet.h
@@ -200,7 +200,7 @@ namespace NKikimr::NBlobDepot {
void InitChannelKinds();
void InvalidateGroupForAllocation(ui32 groupId);
- void PickChannels(NKikimrBlobDepot::TChannelKind::E kind, std::vector<ui8>& channels);
+ bool PickChannels(NKikimrBlobDepot::TChannelKind::E kind, std::vector<ui8>& channels);
TString GetLogId() const {
const auto *executor = Executor();
@@ -317,7 +317,7 @@ namespace NKikimr::NBlobDepot {
void DoGroupMetricsExchange();
void Handle(TEvBlobStorage::TEvControllerGroupMetricsExchange::TPtr ev);
void Handle(TEvBlobDepot::TEvPushMetrics::TPtr ev);
- void UpdateThroughputs();
+ void UpdateThroughputs(bool reschedule = true);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Validation
diff --git a/ydb/core/blob_depot/data_decommit.cpp b/ydb/core/blob_depot/data_decommit.cpp
index 4a90a0b60c0..bda6c479615 100644
--- a/ydb/core/blob_depot/data_decommit.cpp
+++ b/ydb/core/blob_depot/data_decommit.cpp
@@ -237,20 +237,25 @@ namespace NKikimr::NBlobDepot {
void IssuePut(TKey key, TString&& buffer, bool keep, bool doNotKeep) {
std::vector<ui8> channels(1);
- Self->PickChannels(NKikimrBlobDepot::TChannelKind::Data, channels);
- TChannelInfo& channel = Self->Channels[channels.front()];
- const ui64 value = channel.NextBlobSeqId++;
- const auto blobSeqId = TBlobSeqId::FromSequentalNumber(channel.Index, Self->Executor()->Generation(), value);
- const TLogoBlobID id = blobSeqId.MakeBlobId(Self->TabletID(), EBlobType::VG_DATA_BLOB, 0, buffer.size());
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT91, "going to TEvPut", (Id, Self->GetLogId()), (Sender, Ev->Sender),
- (Cookie, Ev->Cookie), (Key, key), (BlobId, id));
- SendToBSProxy(SelfId(), channel.GroupId, new TEvBlobStorage::TEvPut(id, std::move(buffer), TInstant::Max()),
- (ui64)keep | (ui64)doNotKeep << 1);
- const bool inserted = channel.AssimilatedBlobsInFlight.insert(value).second; // prevent from barrier advancing
- Y_VERIFY(inserted);
- const bool inserted1 = IdToKey.try_emplace(id, std::move(key)).second;
- Y_VERIFY(inserted1);
- ++PutsInFlight;
+ if (Self->PickChannels(NKikimrBlobDepot::TChannelKind::Data, channels)) {
+ TChannelInfo& channel = Self->Channels[channels.front()];
+ const ui64 value = channel.NextBlobSeqId++;
+ const auto blobSeqId = TBlobSeqId::FromSequentalNumber(channel.Index, Self->Executor()->Generation(), value);
+ const TLogoBlobID id = blobSeqId.MakeBlobId(Self->TabletID(), EBlobType::VG_DATA_BLOB, 0, buffer.size());
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT91, "going to TEvPut", (Id, Self->GetLogId()), (Sender, Ev->Sender),
+ (Cookie, Ev->Cookie), (Key, key), (BlobId, id));
+ SendToBSProxy(SelfId(), channel.GroupId, new TEvBlobStorage::TEvPut(id, std::move(buffer), TInstant::Max()),
+ (ui64)keep | (ui64)doNotKeep << 1);
+ const bool inserted = channel.AssimilatedBlobsInFlight.insert(value).second; // prevent from barrier advancing
+ Y_VERIFY(inserted);
+ const bool inserted1 = IdToKey.try_emplace(id, std::move(key)).second;
+ Y_VERIFY(inserted1);
+ ++PutsInFlight;
+ } else { // we couldn't restore this blob -- there was no place to write it to
+ ResolutionErrors.insert(key.GetBlobId());
+ ++PutsInFlight;
+ HandleTxComplete();
+ }
}
void Handle(TEvBlobStorage::TEvPutResult::TPtr ev) {
@@ -271,7 +276,7 @@ namespace NKikimr::NBlobDepot {
TEvPrivate::EvTxComplete, SelfId(), 0, keep, doNotKeep);
if (msg.Status != NKikimrProto::OK) { // do not reply OK to this item
- ResolutionErrors.insert(msg.Id);
+ ResolutionErrors.insert(key.GetBlobId());
}
}
diff --git a/ydb/core/blob_depot/group_metrics_exchange.cpp b/ydb/core/blob_depot/group_metrics_exchange.cpp
index f10696815a4..052e5833954 100644
--- a/ydb/core/blob_depot/group_metrics_exchange.cpp
+++ b/ydb/core/blob_depot/group_metrics_exchange.cpp
@@ -104,28 +104,28 @@ namespace NKikimr::NBlobDepot {
BytesRead += record.GetBytesRead();
BytesWritten += record.GetBytesWritten();
MetricsQ.emplace_back(TActivationContext::Monotonic(), BytesRead, BytesWritten);
+ UpdateThroughputs(false);
}
- void TBlobDepot::UpdateThroughputs() {
+ void TBlobDepot::UpdateThroughputs(bool reschedule) {
static constexpr TDuration Window = TDuration::Seconds(3);
if (Config.HasVirtualGroupId() && !MetricsQ.empty()) {
const TMonotonic now = TActivationContext::Monotonic();
const TMonotonic left = now - Window;
const auto comp = [](TMonotonic x, const auto& y) { return x < std::get<0>(y); };
- const auto it = std::upper_bound(MetricsQ.begin(), MetricsQ.end(), left, comp);
- if (it != MetricsQ.begin()) { // interpolate
+ if (const auto it = std::upper_bound(MetricsQ.begin(), MetricsQ.end(), left, comp); it != MetricsQ.begin()) {
MetricsQ.erase(MetricsQ.begin(), std::prev(it)); // remove all obsolete entries
- Y_VERIFY(MetricsQ.size() >= 2);
- const auto& [xTimestamp, xRead, xWritten] = MetricsQ[0];
- auto& [yTimestamp, yRead, yWritten] = MetricsQ[1];
- Y_VERIFY(xTimestamp <= left && left < yTimestamp);
- const ui64 scale = 1'000'000;
- const ui64 factor = (left - xTimestamp).MicroSeconds() * scale / (yTimestamp - xTimestamp).MicroSeconds();
- yTimestamp = left;
- yRead = xRead + (yRead - xRead) * factor / scale;
- yWritten = xWritten + (yWritten - xWritten) * factor / scale;
- MetricsQ.pop_front();
+ if (MetricsQ.size() >= 2) {
+ auto& [xTimestamp, xRead, xWritten] = MetricsQ[0];
+ const auto& [yTimestamp, yRead, yWritten] = MetricsQ[1];
+ Y_VERIFY(xTimestamp <= left && left < yTimestamp);
+ static constexpr ui64 scale = 1'000'000;
+ const ui64 factor = (left - xTimestamp).MicroSeconds() * scale / (yTimestamp - xTimestamp).MicroSeconds();
+ xTimestamp = left;
+ xRead += (yRead - xRead) * factor / scale;
+ xWritten += (yWritten - xWritten) * factor / scale;
+ }
}
ui64 readThroughput = 0;
@@ -144,8 +144,10 @@ namespace NKikimr::NBlobDepot {
Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(SelfId().NodeId()), ev.release());
}
- TActivationContext::Schedule(TDuration::Seconds(1), new IEventHandle(TEvPrivate::EvUpdateThroughputs, 0,
- SelfId(), {}, nullptr, 0));
+ if (reschedule) {
+ TActivationContext::Schedule(TDuration::Seconds(1), new IEventHandle(TEvPrivate::EvUpdateThroughputs, 0,
+ SelfId(), {}, nullptr, 0));
+ }
}
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/space_monitor.cpp b/ydb/core/blob_depot/space_monitor.cpp
index 2809d3a4c79..033fdcd6594 100644
--- a/ydb/core/blob_depot/space_monitor.cpp
+++ b/ydb/core/blob_depot/space_monitor.cpp
@@ -23,9 +23,31 @@ namespace NKikimr::NBlobDepot {
group.StatusFlags = msg.StatusFlags;
group.ApproximateFreeSpaceShare = msg.ApproximateFreeSpaceShare;
Self->InvalidateGroupForAllocation(groupId);
+
+ if (group.StatusFlags.Check(NKikimrBlobStorage::StatusDiskSpaceLightYellowMove)) {
+ HandleYellowChannels();
+ }
}
}
+ void TSpaceMonitor::HandleYellowChannels() {
+ TVector<ui32> yellowMove, yellowStop;
+
+ for (const auto& [groupId, group] : Groups) {
+ if (group.StatusFlags.Check(NKikimrBlobStorage::StatusDiskSpaceLightYellowMove)) {
+ yellowMove.insert(yellowMove.end(), group.Channels.begin(), group.Channels.end());
+ } else if (group.StatusFlags.Check(NKikimrBlobStorage::StatusDiskSpaceYellowStop)) {
+ yellowStop.insert(yellowMove.end(), group.Channels.begin(), group.Channels.end());
+ }
+ }
+
+ Y_VERIFY(yellowMove || yellowStop);
+ STLOG(PRI_INFO, BLOB_DEPOT, BDT28, "asking to reassign channels", (Id, Self->GetLogId()),
+ (YellowMove, FormatList(yellowMove)),
+ (YellowStop, FormatList(yellowStop)));
+ Self->Executor()->OnYellowChannels(std::move(yellowMove), std::move(yellowStop));
+ }
+
void TSpaceMonitor::Kick() {
if (Groups.empty()) {
Init();
@@ -49,11 +71,11 @@ namespace NKikimr::NBlobDepot {
}
const TChannelKind& kind = it->second;
for (const auto& [channel, group] : kind.ChannelGroups) {
- Groups.try_emplace(group);
+ Groups[group].Channels.push_back(channel);
}
}
- ui64 TSpaceMonitor::GetGroupAllocationWeight(ui32 groupId) const {
+ ui64 TSpaceMonitor::GetGroupAllocationWeight(ui32 groupId, bool stopOnLightYellow) const {
const auto it = Groups.find(groupId);
if (it == Groups.end()) {
Y_VERIFY_DEBUG(false);
@@ -61,8 +83,10 @@ namespace NKikimr::NBlobDepot {
}
const TGroupRecord& group = it->second;
- if (group.StatusFlags.Check(NKikimrBlobStorage::StatusDiskSpaceLightYellowMove)) {
- return 0; // do not write data to this group
+ if (group.StatusFlags.Check(stopOnLightYellow
+ ? NKikimrBlobStorage::StatusDiskSpaceLightYellowMove
+ : NKikimrBlobStorage::StatusDiskSpaceYellowStop)) {
+ return 0;
}
if (!group.ApproximateFreeSpaceShare) { // not collected yet?
diff --git a/ydb/core/blob_depot/space_monitor.h b/ydb/core/blob_depot/space_monitor.h
index 8f4330063ed..d3876a3db70 100644
--- a/ydb/core/blob_depot/space_monitor.h
+++ b/ydb/core/blob_depot/space_monitor.h
@@ -12,6 +12,7 @@ namespace NKikimr::NBlobDepot {
bool StatusRequestInFlight = false;
TStorageStatusFlags StatusFlags;
float ApproximateFreeSpaceShare = 0.0f;
+ std::vector<ui8> Channels;
};
std::unordered_map<ui32, TGroupRecord> Groups;
@@ -27,13 +28,14 @@ namespace NKikimr::NBlobDepot {
void Handle(TEvBlobStorage::TEvStatusResult::TPtr ev);
void Kick();
- ui64 GetGroupAllocationWeight(ui32 groupId) const;
+ ui64 GetGroupAllocationWeight(ui32 groupId, bool stopOnLightYellow) const;
void SetSpaceColor(NKikimrBlobStorage::TPDiskSpaceColor::E spaceColor, float approximateFreeSpaceShare);
NKikimrBlobStorage::TPDiskSpaceColor::E GetSpaceColor() const;
float GetApproximateFreeSpaceShare() const;
private:
void Init();
+ void HandleYellowChannels();
};
} // NKikimr::NBlobDepot