aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2022-08-16 22:02:35 +0300
committeralexvru <alexvru@ydb.tech>2022-08-16 22:02:35 +0300
commit33d3585006743476f5c5df72f192c47d141ea6f7 (patch)
tree7060cea3045cc622920faa0b38f54960ec4518b3
parent54cb0f7a108ced72164520dee2637522fe2f7c81 (diff)
downloadydb-33d3585006743476f5c5df72f192c47d141ea6f7.tar.gz
BlobDepot work in progress: data decommission and group deletion
-rw-r--r--ydb/core/base/blobstorage.h6
-rw-r--r--ydb/core/blob_depot/agent/agent_impl.h24
-rw-r--r--ydb/core/blob_depot/agent/blocks.cpp4
-rw-r--r--ydb/core/blob_depot/agent/storage_put.cpp2
-rw-r--r--ydb/core/blob_depot/assimilator.cpp213
-rw-r--r--ydb/core/blob_depot/assimilator.h28
-rw-r--r--ydb/core/blob_depot/data.cpp15
-rw-r--r--ydb/core/blob_depot/data.h3
-rw-r--r--ydb/core/blob_depot/data_resolve.cpp14
-rw-r--r--ydb/core/blob_depot/op_commit_blob_seq.cpp14
-rw-r--r--ydb/core/blob_depot/schema.h1
-rw-r--r--ydb/core/blobstorage/base/blobstorage_events.h18
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp3
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp3
-rw-r--r--ydb/core/mind/bscontroller/bsc.cpp1
-rw-r--r--ydb/core/mind/bscontroller/config.cpp5
-rw-r--r--ydb/core/mind/bscontroller/impl.h4
-rw-r--r--ydb/core/mind/bscontroller/virtual_group.cpp63
-rw-r--r--ydb/core/protos/blobstorage.proto9
19 files changed, 394 insertions, 36 deletions
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h
index 50b464aaf5..6bb4523ac9 100644
--- a/ydb/core/base/blobstorage.h
+++ b/ydb/core/base/blobstorage.h
@@ -788,6 +788,8 @@ struct TEvBlobStorage {
EvControllerScrubQuantumFinished,
EvControllerScrubReportQuantumInProgress,
EvControllerUpdateNodeDrives,
+ EvControllerGroupDecommittedNotify,
+ EvControllerGroupDecommittedResponse,
// EvControllerReadSchemeStringResult = EvPut + 12 * 512,
// EvControllerReadDataStringResult,
@@ -1801,6 +1803,8 @@ struct TEvBlobStorage {
bool IsMultiCollectAllowed;
bool IsMonitored = true;
+ bool Decommission = false;
+
ui32 RestartCounter = 0;
TEvCollectGarbage(ui64 tabletId, ui32 recordGeneration, ui32 perGenerationCounter, ui32 channel,
@@ -2241,6 +2245,8 @@ struct TEvBlobStorage {
struct TEvResponseControllerInfo;
struct TEvTestLoadRequest;
struct TEvTestLoadResponse;
+ struct TEvControllerGroupDecommittedNotify;
+ struct TEvControllerGroupDecommittedResponse;
struct TEvMonStreamQuery;
struct TEvMonStreamActorDeathNote;
diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h
index 40c4fe2283..623c474168 100644
--- a/ydb/core/blob_depot/agent/agent_impl.h
+++ b/ydb/core/blob_depot/agent/agent_impl.h
@@ -120,17 +120,21 @@ namespace NKikimr::NBlobDepot {
}
void Handle(TEvBlobStorage::TEvConfigureProxy::TPtr ev) {
- const auto& info = ev->Get()->Info;
- Y_VERIFY(info);
- Y_VERIFY(info->BlobDepotId);
- if (TabletId != *info->BlobDepotId) {
- TabletId = *info->BlobDepotId;
- if (TabletId) {
- ConnectToBlobDepot();
+ if (const auto& info = ev->Get()->Info) {
+ Y_VERIFY(info->BlobDepotId);
+ if (TabletId != *info->BlobDepotId) {
+ TabletId = *info->BlobDepotId;
+ if (TabletId) {
+ ConnectToBlobDepot();
+ }
+
+ for (auto& ev : std::exchange(PendingEventQ, {})) {
+ TActivationContext::Send(ev.release());
+ }
}
-
- for (auto& ev : std::exchange(PendingEventQ, {})) {
- TActivationContext::Send(ev.release());
+ if (!info->GetTotalVDisksNum()) {
+ TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, ProxyId, {}, nullptr, 0));
+ return;
}
}
diff --git a/ydb/core/blob_depot/agent/blocks.cpp b/ydb/core/blob_depot/agent/blocks.cpp
index 864b5a1266..e07da12a97 100644
--- a/ydb/core/blob_depot/agent/blocks.cpp
+++ b/ydb/core/blob_depot/agent/blocks.cpp
@@ -8,14 +8,14 @@ namespace NKikimr::NBlobDepot {
auto& block = Blocks[tabletId];
const TMonotonic now = TActivationContext::Monotonic();
if (generation <= block.BlockedGeneration) {
- status = NKikimrProto::ALREADY;
+ status = NKikimrProto::BLOCKED;
} else if (now < block.ExpirationTimestamp) {
if (blockedGeneration) {
*blockedGeneration = block.BlockedGeneration;
}
status = NKikimrProto::OK;
}
- if (status != NKikimrProto::ALREADY && now + block.TimeToLive / 2 >= block.ExpirationTimestamp && !block.RefreshInFlight) {
+ if (status != NKikimrProto::BLOCKED && now + block.TimeToLive / 2 >= block.ExpirationTimestamp && !block.RefreshInFlight) {
NKikimrBlobDepot::TEvQueryBlocks queryBlocks;
queryBlocks.AddTabletIds(tabletId);
Agent.Issue(std::move(queryBlocks), this, std::make_shared<TQueryBlockContext>(
diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp
index 2ad5a5cae7..04d6a0af6c 100644
--- a/ydb/core/blob_depot/agent/storage_put.cpp
+++ b/ydb/core/blob_depot/agent/storage_put.cpp
@@ -26,7 +26,7 @@ namespace NKikimr::NBlobDepot {
// zero step -- for decommission blobs just issue put immediately
if (msg.Decommission) {
- IssuePuts();
+ return IssuePuts();
}
// first step -- check blocks
diff --git a/ydb/core/blob_depot/assimilator.cpp b/ydb/core/blob_depot/assimilator.cpp
index 7f7e218cdb..ba66d03b68 100644
--- a/ydb/core/blob_depot/assimilator.cpp
+++ b/ydb/core/blob_depot/assimilator.cpp
@@ -37,8 +37,8 @@ namespace NKikimr::NBlobDepot {
}
}
- SendRequest();
Become(&TThis::StateFunc);
+ Action();
}
void TAssimilator::PassAway() {
@@ -51,15 +51,34 @@ namespace NKikimr::NBlobDepot {
switch (const ui32 type = ev->GetTypeRewrite()) {
hFunc(TEvBlobStorage::TEvAssimilateResult, Handle);
+ hFunc(TEvBlobStorage::TEvGetResult, Handle);
+ hFunc(TEvBlobStorage::TEvPutResult, Handle);
+ hFunc(TEvBlobStorage::TEvCollectGarbageResult, Handle);
+ hFunc(TEvTabletPipe::TEvClientConnected, Handle);
+ hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
+ hFunc(TEvBlobStorage::TEvControllerGroupDecommittedResponse, Handle);
+ cFunc(TEvPrivate::EvResume, Action);
default:
Y_VERIFY_DEBUG(false, "unexpected event Type# %08" PRIx32, type);
- STLOG(PRI_CRIT, BLOB_DEPOT, BDTxx, "unexpected event", (Id, Self->GetLogId()), (Type, type));
+ STLOG(PRI_CRIT, BLOB_DEPOT, BDT00, "unexpected event", (Id, Self->GetLogId()), (Type, type));
break;
}
}
- void TAssimilator::SendRequest() {
+ void TAssimilator::Action() {
+ if (Self->DecommitState < EDecommitState::BlobsFinished) {
+ SendAssimilateRequest();
+ } else if (Self->DecommitState < EDecommitState::BlobsCopied) {
+ ScanDataForCopying();
+ } else if (Self->DecommitState == EDecommitState::BlobsCopied) {
+ CreatePipe();
+ } else if (Self->DecommitState != EDecommitState::Done) {
+ Y_UNREACHABLE();
+ }
+ }
+
+ void TAssimilator::SendAssimilateRequest() {
SendToBSProxy(SelfId(), Self->Config.GetDecommitGroupId(), new TEvBlobStorage::TEvAssimilate(SkipBlocksUpTo,
SkipBarriersUpTo, SkipBlobsUpTo));
}
@@ -95,15 +114,15 @@ namespace NKikimr::NBlobDepot {
}
for (const auto& block : Ev->Blocks) {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDTxx, "assimilated block", (Id, Self->Self->GetLogId()), (Block, block));
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT31, "assimilated block", (Id, Self->Self->GetLogId()), (Block, block));
Self->Self->BlocksManager->AddBlockOnDecommit(block, txc);
}
for (const auto& barrier : Ev->Barriers) {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDTxx, "assimilated barrier", (Id, Self->Self->GetLogId()), (Barrier, barrier));
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT32, "assimilated barrier", (Id, Self->Self->GetLogId()), (Barrier, barrier));
Self->Self->BarrierServer->AddBarrierOnDecommit(barrier, txc);
}
for (const auto& blob : Ev->Blobs) {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDTxx, "assimilated blob", (Id, Self->Self->GetLogId()), (Blob, blob));
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT33, "assimilated blob", (Id, Self->Self->GetLogId()), (Blob, blob));
Self->Self->Data->AddDataOnDecommit(blob, txc);
}
@@ -134,17 +153,189 @@ namespace NKikimr::NBlobDepot {
Self->Self->ProcessRegisterAgentQ();
}
- if (EDecommitState::BlobsFinished <= Self->Self->DecommitState) {
- // finished metadata replication
- } else {
- Self->SendRequest();
- }
+ Self->Action();
}
};
Self->Execute(std::make_unique<TTxPutAssimilatedData>(this, ev));
}
+ void TAssimilator::ScanDataForCopying() {
+ const bool fromTheBeginning = !LastScannedKey;
+
+ TData::TKey lastScannedKey;
+ if (LastScannedKey) {
+ lastScannedKey = TData::TKey(*LastScannedKey);
+ }
+
+ struct TScanQueueItem {
+ TLogoBlobID Key;
+ TLogoBlobID OriginalBlobId;
+ };
+ std::deque<TScanQueueItem> scanQ;
+ ui32 totalSize = 0;
+
+ auto callback = [&](const TData::TKey& key, const TData::TValue& value) {
+ if (!value.OriginalBlobId) {
+ LastScannedKey.emplace(key.GetBlobId());
+ return true; // keep scanning
+ } else if (const TLogoBlobID& id = *value.OriginalBlobId; scanQ.empty() ||
+ scanQ.front().OriginalBlobId.TabletID() == id.TabletID()) {
+ LastScannedKey.emplace(key.GetBlobId());
+ scanQ.push_back({.Key = *LastScannedKey, .OriginalBlobId = id});
+ totalSize += id.BlobSize();
+ NeedfulBlobs.insert(id);
+ return totalSize < MaxSizeToQuery;
+ } else {
+ return false; // a blob belonging to different tablet
+ }
+ };
+
+ // FIXME: reentrable as it shares mailbox with the BlobDepot tablet itself
+ Self->Data->ScanRange(LastScannedKey ? &lastScannedKey : nullptr, nullptr, {}, callback);
+
+ if (!scanQ.empty()) {
+ using TQuery = TEvBlobStorage::TEvGet::TQuery;
+ const ui32 sz = scanQ.size();
+ TArrayHolder<TQuery> queries(new TQuery[sz]);
+ TQuery *query = queries.Get();
+ for (const TScanQueueItem& item : scanQ) {
+ query->Set(item.OriginalBlobId);
+ ++query;
+ }
+ auto ev = std::make_unique<TEvBlobStorage::TEvGet>(queries, sz, TInstant::Max(), NKikimrBlobStorage::EGetHandleClass::FastRead);
+ ev->Decommission = true;
+ SendToBSProxy(SelfId(), Self->Config.GetDecommitGroupId(), ev.release());
+ } else if (fromTheBeginning) {
+ OnCopyDone();
+ } else {
+ // restart the scan from the beginning and find other keys to copy or finish it
+ LastScannedKey.reset();
+ TActivationContext::Send(new IEventHandle(TEvPrivate::EvResume, 0, SelfId(), {}, nullptr, 0));
+ }
+ }
+
+ void TAssimilator::Handle(TEvBlobStorage::TEvGetResult::TPtr ev) {
+ auto& msg = *ev->Get();
+ for (ui32 i = 0; i < msg.ResponseSz; ++i) {
+ auto& resp = msg.Responses[i];
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT34, "got TEvGetResult", (Id, Self->GetLogId()), (BlobId, resp.Id),
+ (Status, resp.Status));
+ if (resp.Status == NKikimrProto::OK) {
+ auto ev = std::make_unique<TEvBlobStorage::TEvPut>(resp.Id, resp.Buffer, TInstant::Max());
+ ev->Decommission = true;
+ SendToBSProxy(SelfId(), Self->Config.GetDecommitGroupId(), ev.release());
+ ++NumPutsInFlight;
+ }
+ }
+ if (!NumPutsInFlight) {
+ Action();
+ }
+ }
+
+ void TAssimilator::Handle(TEvBlobStorage::TEvPutResult::TPtr ev) {
+ auto& msg = *ev->Get();
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT37, "got TEvPutResult", (Id, Self->GetLogId()), (Msg, msg));
+ if (msg.Status == NKikimrProto::OK) {
+ const size_t numErased = NeedfulBlobs.erase(msg.Id);
+ Y_VERIFY(numErased == 1);
+ }
+ if (!--NumPutsInFlight) {
+ IssueCollects();
+ }
+ }
+
+ void TAssimilator::IssueCollects() {
+ // FIXME: do it
+ Action();
+ }
+
+ void TAssimilator::Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev) {
+ (void)ev;
+ }
+
+ void TAssimilator::OnCopyDone() {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT38, "data copying is done", (Id, Self->GetLogId()));
+
+ class TTxFinishCopying : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
+ TAssimilator* const Self;
+
+ public:
+ TTxFinishCopying(TAssimilator *self)
+ : TTransactionBase(self->Self)
+ , Self(self)
+ {}
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ NIceDb::TNiceDb db(txc.DB);
+ Self->Self->DecommitState = EDecommitState::BlobsCopied;
+ db.Table<Schema::Config>().Key(Schema::Config::Key::Value).Update(
+ NIceDb::TUpdate<Schema::Config::DecommitState>(Self->Self->DecommitState)
+ );
+ return true;
+ }
+
+ void Complete(const TActorContext&) override {
+ Self->Action();
+ }
+ };
+
+ Self->Execute(std::make_unique<TTxFinishCopying>(this));
+ }
+
+ void TAssimilator::CreatePipe() {
+ const TGroupID groupId(Self->Config.GetDecommitGroupId());
+ const ui64 tabletId = MakeBSControllerID(groupId.AvailabilityDomainID());
+ PipeId = Register(NTabletPipe::CreateClient(SelfId(), tabletId, NTabletPipe::TClientRetryPolicy::WithRetries()));
+ NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobStorage::TEvControllerGroupDecommittedNotify(groupId.GetRaw()));
+ }
+
+ void TAssimilator::Handle(TEvTabletPipe::TEvClientConnected::TPtr ev) {
+ auto& msg = *ev->Get();
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDTxx, "received TEvClientConnected", (Id, Self->GetLogId()), (Status, msg.Status));
+ if (msg.Status != NKikimrProto::OK) {
+ CreatePipe();
+ }
+ }
+
+ void TAssimilator::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr /*ev*/) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDTxx, "received TEvClientDestroyed", (Id, Self->GetLogId()));
+ CreatePipe();
+ }
+
+ void TAssimilator::Handle(TEvBlobStorage::TEvControllerGroupDecommittedResponse::TPtr ev) {
+ auto& msg = *ev->Get();
+ const NKikimrProto::EReplyStatus status = msg.Record.GetStatus();
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDTxx, "received TEvControllerGroupDecommittedResponse", (Id, Self->GetLogId()),
+ (Status, status));
+ if (status == NKikimrProto::OK) {
+ class TTxFinishDecommission : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
+ public:
+ TTxFinishDecommission(TAssimilator *self)
+ : TTransactionBase(self->Self)
+ {}
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ NIceDb::TNiceDb db(txc.DB);
+ Self->DecommitState = EDecommitState::Done;
+ db.Table<Schema::Config>().Key(Schema::Config::Key::Value).Update(
+ NIceDb::TUpdate<Schema::Config::DecommitState>(Self->DecommitState)
+ );
+ return true;
+ }
+
+ void Complete(const TActorContext&) override {}
+ };
+
+ Self->GroupAssimilatorId = {};
+ Self->Execute(std::make_unique<TTxFinishDecommission>(this));
+ PassAway();
+ } else {
+ NTabletPipe::CloseAndForgetClient(SelfId(), PipeId);
+ Action();
+ }
+ }
+
TString TAssimilator::SerializeAssimilatorState() const {
TStringStream stream;
diff --git a/ydb/core/blob_depot/assimilator.h b/ydb/core/blob_depot/assimilator.h
index bab6d171bf..2a1a711728 100644
--- a/ydb/core/blob_depot/assimilator.h
+++ b/ydb/core/blob_depot/assimilator.h
@@ -6,6 +6,12 @@
namespace NKikimr::NBlobDepot {
class TBlobDepot::TGroupAssimilator : public TActorBootstrapped<TGroupAssimilator> {
+ struct TEvPrivate {
+ enum {
+ EvResume = EventSpaceBegin(TEvents::ES_PRIVATE),
+ };
+ };
+
std::weak_ptr<TToken> Token;
TBlobDepot *Self;
@@ -13,6 +19,15 @@ namespace NKikimr::NBlobDepot {
std::optional<std::tuple<ui64, ui8>> SkipBarriersUpTo;
std::optional<TLogoBlobID> SkipBlobsUpTo;
+ std::optional<TLogoBlobID> LastScannedKey;
+ std::set<TLogoBlobID> NeedfulBlobs; // in current tablet, original blob ids
+
+ static constexpr ui32 MaxSizeToQuery = 10'000'000;
+
+ ui32 NumPutsInFlight = 0;
+
+ TActorId PipeId;
+
public:
TGroupAssimilator(TBlobDepot *self)
: Token(self->Token)
@@ -26,8 +41,19 @@ namespace NKikimr::NBlobDepot {
STATEFN(StateFunc);
private:
- void SendRequest();
+ void Action();
+ void SendAssimilateRequest();
void Handle(TEvBlobStorage::TEvAssimilateResult::TPtr ev);
+ void ScanDataForCopying();
+ void Handle(TEvBlobStorage::TEvGetResult::TPtr ev);
+ void Handle(TEvBlobStorage::TEvPutResult::TPtr ev);
+ void IssueCollects();
+ void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev);
+ void OnCopyDone();
+ void CreatePipe();
+ void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev);
+ void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev);
+ void Handle(TEvBlobStorage::TEvControllerGroupDecommittedResponse::TPtr ev);
TString SerializeAssimilatorState() const;
};
diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp
index 937daf5abe..cf92fccc7a 100644
--- a/ydb/core/blob_depot/data.cpp
+++ b/ydb/core/blob_depot/data.cpp
@@ -5,6 +5,11 @@ namespace NKikimr::NBlobDepot {
using TData = TBlobDepot::TData;
+ NKikimrBlobDepot::EKeepState TData::GetKeepState(const TKey& key) const {
+ const auto it = Data.find(key);
+ return it != Data.end() ? it->second.KeepState : NKikimrBlobDepot::EKeepState::Default;
+ }
+
TData::TRecordsPerChannelGroup& TData::GetRecordsPerChannelGroup(TLogoBlobID id) {
TTabletStorageInfo *info = Self->Info();
const ui32 groupId = info->GroupFor(id.Channel(), id.Generation());
@@ -35,10 +40,11 @@ namespace NKikimr::NBlobDepot {
TKey key(blob.Id);
- NKikimrBlobDepot::EKeepState keepState = blob.Keep ? NKikimrBlobDepot::EKeepState::Keep : NKikimrBlobDepot::EKeepState::Default;
- if (const auto it = Data.find(key); it != Data.end()) {
- keepState = Max(keepState, it->second.KeepState);
- }
+ // calculate keep state for this blob
+ const auto it = Data.find(key);
+ const NKikimrBlobDepot::EKeepState keepState = Max(it != Data.end() ? it->second.KeepState : NKikimrBlobDepot::EKeepState::Default,
+ blob.DoNotKeep ? NKikimrBlobDepot::EKeepState::DoNotKeep :
+ blob.Keep ? NKikimrBlobDepot::EKeepState::Keep : NKikimrBlobDepot::EKeepState::Default);
NKikimrBlobDepot::TValue value;
value.SetKeepState(keepState);
@@ -53,6 +59,7 @@ namespace NKikimr::NBlobDepot {
db.Table<Schema::Data>().Key(key.MakeBinaryKey()).Update<Schema::Data::Value>(valueData);
PutKey(key, TValue(std::move(value)));
+ LastAssimilatedKey = key;
}
void TData::AddTrashOnLoad(TLogoBlobID id) {
diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h
index e6f83d2056..95ef9aa44b 100644
--- a/ydb/core/blob_depot/data.h
+++ b/ydb/core/blob_depot/data.h
@@ -286,6 +286,7 @@ namespace NKikimr::NBlobDepot {
THashMap<std::tuple<ui64, ui8, ui32>, TRecordsPerChannelGroup> RecordsPerChannelGroup;
TIntrusiveList<TRecordsPerChannelGroup, TRecordWithTrash> RecordsWithTrash;
std::optional<TKey> LastLoadedKey; // keys are being loaded in ascending order
+ std::optional<TKey> LastAssimilatedKey;
THashMultiMap<void*, TLogoBlobID> InFlightTrash; // being committed, but not yet confirmed
@@ -332,6 +333,8 @@ namespace NKikimr::NBlobDepot {
}
}
+ NKikimrBlobDepot::EKeepState GetKeepState(const TKey& key) const;
+
TRecordsPerChannelGroup& GetRecordsPerChannelGroup(TLogoBlobID id);
void AddDataOnLoad(TKey key, TString value);
diff --git a/ydb/core/blob_depot/data_resolve.cpp b/ydb/core/blob_depot/data_resolve.cpp
index 56574ad072..6b14a919d5 100644
--- a/ydb/core/blob_depot/data_resolve.cpp
+++ b/ydb/core/blob_depot/data_resolve.cpp
@@ -258,6 +258,20 @@ namespace NKikimr::NBlobDepot {
void TData::Handle(TEvBlobDepot::TEvResolve::TPtr ev) {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT21, "TEvResolve", (Id, Self->GetLogId()), (Msg, ev->Get()->ToString()),
(Sender, ev->Sender), (Cookie, ev->Cookie));
+
+ if (Self->Config.HasDecommitGroupId() && Self->DecommitState <= EDecommitState::BlobsFinished) {
+ for (const auto& item : ev->Get()->Record.GetItems()) {
+ std::optional<TKey> end = item.HasEndingKey()
+ ? std::make_optional(TKey::FromBinaryKey(item.GetEndingKey(), Self->Config))
+ : std::nullopt;
+
+ if (!end || !LastAssimilatedKey || *LastAssimilatedKey < *end) {
+ // see if we have to query BS for this range and to apply items here
+ Y_VERIFY_DEBUG(false, "going to return corrupt data");
+ }
+ }
+ }
+
Self->Execute(std::make_unique<TTxResolve>(Self, ev));
}
diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp
index 5a8524a267..714142fddf 100644
--- a/ydb/core/blob_depot/op_commit_blob_seq.cpp
+++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp
@@ -26,6 +26,8 @@ namespace NKikimr::NBlobDepot {
const ui32 generation = Self->Executor()->Generation();
for (const auto& item : Request->Get()->Record.GetItems()) {
+ auto key = TData::TKey::FromBinaryKey(item.GetKey(), Self->Config);
+
auto *responseItem = responseRecord->AddItems();
responseItem->SetStatus(NKikimrProto::OK);
@@ -33,6 +35,9 @@ namespace NKikimr::NBlobDepot {
if (item.HasMeta()) {
value.SetMeta(item.GetMeta());
}
+ if (const auto keepState = Self->Data->GetKeepState(key); keepState != NKikimrBlobDepot::EKeepState::Default) {
+ value.SetKeepState(keepState);
+ }
auto *chain = value.AddValueChain();
auto *locator = chain->MutableLocator();
locator->CopyFrom(item.GetBlobLocator());
@@ -54,9 +59,7 @@ namespace NKikimr::NBlobDepot {
continue;
}
- auto key = TData::TKey::FromBinaryKey(item.GetKey(), Self->Config);
-
- if (!CheckKeyAgainstBarrier(key)) {
+ if (!CheckKeyAgainstBarrier(key, value)) {
responseItem->SetStatus(NKikimrProto::ERROR);
responseItem->SetErrorReason(TStringBuilder() << "BlobId# " << key.ToString()
<< " is being put beyond the barrier");
@@ -93,10 +96,11 @@ namespace NKikimr::NBlobDepot {
}
}
- bool CheckKeyAgainstBarrier(const TData::TKey& key) {
+ bool CheckKeyAgainstBarrier(const TData::TKey& key, const NKikimrBlobDepot::TValue& value) {
const auto& v = key.AsVariant();
const auto *id = std::get_if<TLogoBlobID>(&v);
- return !id || Self->BarrierServer->CheckBlobForBarrier(*id);
+ return !id || Self->BarrierServer->CheckBlobForBarrier(*id) ||
+ value.GetKeepState() == NKikimrBlobDepot::EKeepState::Keep;
}
void Complete(const TActorContext&) override {
diff --git a/ydb/core/blob_depot/schema.h b/ydb/core/blob_depot/schema.h
index 0ff0ecd678..65248b2a46 100644
--- a/ydb/core/blob_depot/schema.h
+++ b/ydb/core/blob_depot/schema.h
@@ -11,6 +11,7 @@ namespace NKikimr::NBlobDepot {
BarriersFinished = 2,
BlobsFinished = 3,
BlobsCopied = 4,
+ Done = 5,
};
struct Schema : NIceDb::Schema {
diff --git a/ydb/core/blobstorage/base/blobstorage_events.h b/ydb/core/blobstorage/base/blobstorage_events.h
index bd36342fbc..7e7fa781ee 100644
--- a/ydb/core/blobstorage/base/blobstorage_events.h
+++ b/ydb/core/blobstorage/base/blobstorage_events.h
@@ -524,6 +524,24 @@ namespace NKikimr {
}
};
+ struct TEvBlobStorage::TEvControllerGroupDecommittedNotify : TEventPB<TEvControllerGroupDecommittedNotify,
+ NKikimrBlobStorage::TEvControllerGroupDecommittedNotify, EvControllerGroupDecommittedNotify> {
+ TEvControllerGroupDecommittedNotify() = default;
+
+ TEvControllerGroupDecommittedNotify(ui32 groupId) {
+ Record.SetGroupId(groupId);
+ }
+ };
+
+ struct TEvBlobStorage::TEvControllerGroupDecommittedResponse : TEventPB<TEvControllerGroupDecommittedResponse,
+ NKikimrBlobStorage::TEvControllerGroupDecommittedResponse, EvControllerGroupDecommittedResponse> {
+ TEvControllerGroupDecommittedResponse() = default;
+
+ TEvControllerGroupDecommittedResponse(NKikimrProto::EReplyStatus status) {
+ Record.SetStatus(status);
+ }
+ };
+
struct TEvNodeWardenQueryGroupInfo : TEventPB<TEvNodeWardenQueryGroupInfo, NKikimrBlobStorage::TEvNodeWardenQueryGroupInfo,
TEvBlobStorage::EvNodeWardenQueryGroupInfo> {
TEvNodeWardenQueryGroupInfo() = default;
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp
index 492d57bb0b..d5875e7095 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp
@@ -24,6 +24,7 @@ class TBlobStorageGroupCollectGarbageRequest : public TBlobStorageGroupRequestAc
const ui32 CollectStep;
const bool Hard;
const bool Collect;
+ const bool Decommission;
TGroupQuorumTracker QuorumTracker;
TInstant StartTime;
@@ -127,6 +128,7 @@ class TBlobStorageGroupCollectGarbageRequest : public TBlobStorageGroupRequestAc
auto ev = std::make_unique<TEvBlobStorage::TEvCollectGarbage>(TabletId, RecordGeneration, PerGenerationCounter,
Channel, Collect, CollectGeneration, CollectStep, Keep.release(), DoNotKeep.release(), Deadline, false, Hard);
ev->RestartCounter = counter;
+ ev->Decommission = Decommission;
return ev;
}
@@ -157,6 +159,7 @@ public:
, CollectStep(ev->CollectStep)
, Hard(ev->Hard)
, Collect(ev->Collect)
+ , Decommission(ev->Decommission)
, QuorumTracker(Info.Get())
, StartTime(now)
{}
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp
index d338291c6f..28d623e7d7 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp
@@ -28,6 +28,7 @@ class TBlobStorageGroupMultiCollectRequest
const ui32 CollectStep;
const bool Hard;
const bool Collect;
+ const bool Decommission;
ui64 FlagRequestsInFlight;
ui64 CollectRequestsInFlight;
@@ -108,6 +109,7 @@ public:
, CollectStep(ev->CollectStep)
, Hard(ev->Hard)
, Collect(ev->Collect)
+ , Decommission(ev->Decommission)
, FlagRequestsInFlight(0)
, CollectRequestsInFlight(0)
, StartTime(now)
@@ -152,6 +154,7 @@ public:
TabletId, RecordGeneration, PerGenerationCounter + idx, Channel,
isCollect, CollectGeneration, CollectStep, keepPart.release(), doNotKeepPart.release(), Deadline, false,
Hard));
+ ev->Decommission = Decommission; // retain decommission flag
R_LOG_DEBUG_S("BPMC3", "SendRequest idx# " << idx
<< " isLast# " << isLast
<< " ev# " << ev->ToString());
diff --git a/ydb/core/mind/bscontroller/bsc.cpp b/ydb/core/mind/bscontroller/bsc.cpp
index 02c57a38f8..7393fb33bc 100644
--- a/ydb/core/mind/bscontroller/bsc.cpp
+++ b/ydb/core/mind/bscontroller/bsc.cpp
@@ -262,6 +262,7 @@ ui32 TBlobStorageController::GetEventPriority(IEventHandle *ev) {
case TEvBlobStorage::EvControllerNodeReport: return 1;
case TEvBlobStorage::EvControllerProposeGroupKey: return 1;
case TEvBlobStorage::EvControllerGetGroup: return 1;
+ case TEvBlobStorage::EvControllerGroupDecommittedNotify: return 1;
// auxiliary messages that are not usually urgent (also includes RW transactions in TConfigRequest and UpdateDiskStatus)
case TEvBlobStorage::EvControllerGroupReconfigureWipe: return 2;
diff --git a/ydb/core/mind/bscontroller/config.cpp b/ydb/core/mind/bscontroller/config.cpp
index 46b20dc4e4..d3f8e6369f 100644
--- a/ydb/core/mind/bscontroller/config.cpp
+++ b/ydb/core/mind/bscontroller/config.cpp
@@ -237,8 +237,9 @@ namespace NKikimr::NBsController {
meta->SetCurrentGeneration(cur.Generation);
}
}
- Y_VERIFY(prev.VDisksInGroup.size() == cur.VDisksInGroup.size());
- for (size_t i = 0; i < prev.VDisksInGroup.size(); ++i) {
+ Y_VERIFY(prev.VDisksInGroup.size() == cur.VDisksInGroup.size() ||
+ (cur.VDisksInGroup.empty() && cur.DecommitStatus == NKikimrBlobStorage::TGroupDecommitStatus::DONE));
+ for (size_t i = 0; i < cur.VDisksInGroup.size(); ++i) {
const TVSlotInfo& prevSlot = *prev.VDisksInGroup[i];
const TVSlotInfo& curSlot = *cur.VDisksInGroup[i];
if (prevSlot.VSlotId != curSlot.VSlotId) {
diff --git a/ydb/core/mind/bscontroller/impl.h b/ydb/core/mind/bscontroller/impl.h
index f1f5d66443..5afdba9c65 100644
--- a/ydb/core/mind/bscontroller/impl.h
+++ b/ydb/core/mind/bscontroller/impl.h
@@ -1857,6 +1857,7 @@ public:
hFunc(TEvBlobStorage::TEvControllerScrubQueryStartQuantum, Handle);
hFunc(TEvBlobStorage::TEvControllerScrubQuantumFinished, Handle);
hFunc(TEvBlobStorage::TEvControllerScrubReportQuantumInProgress, Handle);
+ hFunc(TEvBlobStorage::TEvControllerGroupDecommittedNotify, Handle);
cFunc(TEvPrivate::EvScrub, ScrubState.HandleTimer);
cFunc(TEvPrivate::EvVSlotReadyUpdate, VSlotReadyUpdate);
}
@@ -1900,6 +1901,7 @@ public:
fFunc(TEvBlobStorage::EvControllerScrubQueryStartQuantum, EnqueueIncomingEvent);
fFunc(TEvBlobStorage::EvControllerScrubQuantumFinished, EnqueueIncomingEvent);
fFunc(TEvBlobStorage::EvControllerScrubReportQuantumInProgress, EnqueueIncomingEvent);
+ fFunc(TEvBlobStorage::EvControllerGroupDecommittedNotify, EnqueueIncomingEvent);
fFunc(TEvPrivate::EvScrub, EnqueueIncomingEvent);
fFunc(TEvPrivate::EvVSlotReadyUpdate, EnqueueIncomingEvent);
cFunc(TEvPrivate::EvVSlotNotReadyHistogramUpdate, VSlotNotReadyHistogramUpdate);
@@ -1993,6 +1995,8 @@ public:
void StartVirtualGroupSetupMachine(TGroupInfo *group);
+ void Handle(TEvBlobStorage::TEvControllerGroupDecommittedNotify::TPtr ev);
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// VSLOT READINESS EVALUATION
diff --git a/ydb/core/mind/bscontroller/virtual_group.cpp b/ydb/core/mind/bscontroller/virtual_group.cpp
index 91afd86726..b64acaee9a 100644
--- a/ydb/core/mind/bscontroller/virtual_group.cpp
+++ b/ydb/core/mind/bscontroller/virtual_group.cpp
@@ -442,4 +442,67 @@ namespace NKikimr::NBsController {
group->VirtualGroupSetupMachineId = RegisterWithSameMailbox(new TVirtualGroupSetupMachine(this, *group));
}
+ void TBlobStorageController::Handle(TEvBlobStorage::TEvControllerGroupDecommittedNotify::TPtr ev) {
+ class TTxDecommitGroup : public TTransactionBase<TBlobStorageController> {
+ TEvBlobStorage::TEvControllerGroupDecommittedNotify::TPtr Ev;
+ std::optional<TConfigState> State;
+ NKikimrProto::EReplyStatus Status = NKikimrProto::OK;
+ TString ErrorReason;
+
+ public:
+ TTxDecommitGroup(TBlobStorageController *controller, TEvBlobStorage::TEvControllerGroupDecommittedNotify::TPtr ev)
+ : TTransactionBase(controller)
+ , Ev(ev)
+ {}
+
+// TTxType GetTxType() const override { return NBlobStorageController::TXTYPE_DROP_DONOR; }
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ State.emplace(*Self, Self->HostRecords, TActivationContext::Now());
+ State->CheckConsistency();
+ Action(*State);
+ State->CheckConsistency();
+ TString error;
+ if (State->Changed() && !Self->CommitConfigUpdates(*State, false, false, txc, &error)) {
+ State->Rollback();
+ State.reset();
+ }
+ return true;
+ }
+
+ void Action(TConfigState& state) {
+ const ui32 groupId = Ev->Get()->Record.GetGroupId();
+ TGroupInfo *group = state.Groups.FindForUpdate(groupId);
+ if (!group) {
+ std::tie(Status, ErrorReason) = std::make_tuple(NKikimrProto::ERROR, "group not found");
+ return;
+ } else if (group->DecommitStatus == NKikimrBlobStorage::TGroupDecommitStatus::DONE) {
+ Status = NKikimrProto::ALREADY;
+ } else if (group->DecommitStatus != NKikimrBlobStorage::TGroupDecommitStatus::IN_PROGRESS) {
+ std::tie(Status, ErrorReason) = std::make_tuple(NKikimrProto::ERROR, "group is not being decommitted");
+ } else {
+ for (const TVSlotInfo *vslot : group->VDisksInGroup) {
+ state.DestroyVSlot(vslot->VSlotId);
+ }
+ group->VDisksInGroup.clear();
+ group->DecommitStatus = NKikimrBlobStorage::TGroupDecommitStatus::DONE;
+ group->ContentChanged = true;
+ }
+ }
+
+ void Complete(const TActorContext&) override {
+ if (State) {
+ State->ApplyConfigUpdates();
+ }
+ auto ev = std::make_unique<TEvBlobStorage::TEvControllerGroupDecommittedResponse>(Status);
+ if (ErrorReason) {
+ ev->Record.SetErrorReason(ErrorReason);
+ }
+ Self->Send(Ev->Sender, ev.release(), 0, Ev->Cookie);
+ }
+ };
+
+ Execute(new TTxDecommitGroup(this, ev));
+ }
+
} // NKikimr::NBsController
diff --git a/ydb/core/protos/blobstorage.proto b/ydb/core/protos/blobstorage.proto
index d0009a446e..ec552aca45 100644
--- a/ydb/core/protos/blobstorage.proto
+++ b/ydb/core/protos/blobstorage.proto
@@ -1341,6 +1341,15 @@ message TEvControllerNodeReport {
repeated TVDiskReport VDiskReports = 2;
}
+message TEvControllerGroupDecommittedNotify {
+ optional uint32 GroupId = 1;
+}
+
+message TEvControllerGroupDecommittedResponse {
+ optional NKikimrProto.EReplyStatus Status = 1;
+ optional string ErrorReason = 2;
+}
+
message TEvTestLoadRequest {
// an item for interval distribution setting
message TIntervalInfo {