diff options
author | alexvru <alexvru@ydb.tech> | 2022-08-16 22:02:35 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-08-16 22:02:35 +0300 |
commit | 33d3585006743476f5c5df72f192c47d141ea6f7 (patch) | |
tree | 7060cea3045cc622920faa0b38f54960ec4518b3 | |
parent | 54cb0f7a108ced72164520dee2637522fe2f7c81 (diff) | |
download | ydb-33d3585006743476f5c5df72f192c47d141ea6f7.tar.gz |
BlobDepot work in progress: data decommission and group deletion
-rw-r--r-- | ydb/core/base/blobstorage.h | 6 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/agent_impl.h | 24 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/blocks.cpp | 4 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_put.cpp | 2 | ||||
-rw-r--r-- | ydb/core/blob_depot/assimilator.cpp | 213 | ||||
-rw-r--r-- | ydb/core/blob_depot/assimilator.h | 28 | ||||
-rw-r--r-- | ydb/core/blob_depot/data.cpp | 15 | ||||
-rw-r--r-- | ydb/core/blob_depot/data.h | 3 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_resolve.cpp | 14 | ||||
-rw-r--r-- | ydb/core/blob_depot/op_commit_blob_seq.cpp | 14 | ||||
-rw-r--r-- | ydb/core/blob_depot/schema.h | 1 | ||||
-rw-r--r-- | ydb/core/blobstorage/base/blobstorage_events.h | 18 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp | 3 | ||||
-rw-r--r-- | ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp | 3 | ||||
-rw-r--r-- | ydb/core/mind/bscontroller/bsc.cpp | 1 | ||||
-rw-r--r-- | ydb/core/mind/bscontroller/config.cpp | 5 | ||||
-rw-r--r-- | ydb/core/mind/bscontroller/impl.h | 4 | ||||
-rw-r--r-- | ydb/core/mind/bscontroller/virtual_group.cpp | 63 | ||||
-rw-r--r-- | ydb/core/protos/blobstorage.proto | 9 |
19 files changed, 394 insertions, 36 deletions
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h index 50b464aaf5..6bb4523ac9 100644 --- a/ydb/core/base/blobstorage.h +++ b/ydb/core/base/blobstorage.h @@ -788,6 +788,8 @@ struct TEvBlobStorage { EvControllerScrubQuantumFinished, EvControllerScrubReportQuantumInProgress, EvControllerUpdateNodeDrives, + EvControllerGroupDecommittedNotify, + EvControllerGroupDecommittedResponse, // EvControllerReadSchemeStringResult = EvPut + 12 * 512, // EvControllerReadDataStringResult, @@ -1801,6 +1803,8 @@ struct TEvBlobStorage { bool IsMultiCollectAllowed; bool IsMonitored = true; + bool Decommission = false; + ui32 RestartCounter = 0; TEvCollectGarbage(ui64 tabletId, ui32 recordGeneration, ui32 perGenerationCounter, ui32 channel, @@ -2241,6 +2245,8 @@ struct TEvBlobStorage { struct TEvResponseControllerInfo; struct TEvTestLoadRequest; struct TEvTestLoadResponse; + struct TEvControllerGroupDecommittedNotify; + struct TEvControllerGroupDecommittedResponse; struct TEvMonStreamQuery; struct TEvMonStreamActorDeathNote; diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h index 40c4fe2283..623c474168 100644 --- a/ydb/core/blob_depot/agent/agent_impl.h +++ b/ydb/core/blob_depot/agent/agent_impl.h @@ -120,17 +120,21 @@ namespace NKikimr::NBlobDepot { } void Handle(TEvBlobStorage::TEvConfigureProxy::TPtr ev) { - const auto& info = ev->Get()->Info; - Y_VERIFY(info); - Y_VERIFY(info->BlobDepotId); - if (TabletId != *info->BlobDepotId) { - TabletId = *info->BlobDepotId; - if (TabletId) { - ConnectToBlobDepot(); + if (const auto& info = ev->Get()->Info) { + Y_VERIFY(info->BlobDepotId); + if (TabletId != *info->BlobDepotId) { + TabletId = *info->BlobDepotId; + if (TabletId) { + ConnectToBlobDepot(); + } + + for (auto& ev : std::exchange(PendingEventQ, {})) { + TActivationContext::Send(ev.release()); + } } - - for (auto& ev : std::exchange(PendingEventQ, {})) { - TActivationContext::Send(ev.release()); + if (!info->GetTotalVDisksNum()) { + TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, ProxyId, {}, nullptr, 0)); + return; } } diff --git a/ydb/core/blob_depot/agent/blocks.cpp b/ydb/core/blob_depot/agent/blocks.cpp index 864b5a1266..e07da12a97 100644 --- a/ydb/core/blob_depot/agent/blocks.cpp +++ b/ydb/core/blob_depot/agent/blocks.cpp @@ -8,14 +8,14 @@ namespace NKikimr::NBlobDepot { auto& block = Blocks[tabletId]; const TMonotonic now = TActivationContext::Monotonic(); if (generation <= block.BlockedGeneration) { - status = NKikimrProto::ALREADY; + status = NKikimrProto::BLOCKED; } else if (now < block.ExpirationTimestamp) { if (blockedGeneration) { *blockedGeneration = block.BlockedGeneration; } status = NKikimrProto::OK; } - if (status != NKikimrProto::ALREADY && now + block.TimeToLive / 2 >= block.ExpirationTimestamp && !block.RefreshInFlight) { + if (status != NKikimrProto::BLOCKED && now + block.TimeToLive / 2 >= block.ExpirationTimestamp && !block.RefreshInFlight) { NKikimrBlobDepot::TEvQueryBlocks queryBlocks; queryBlocks.AddTabletIds(tabletId); Agent.Issue(std::move(queryBlocks), this, std::make_shared<TQueryBlockContext>( diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp index 2ad5a5cae7..04d6a0af6c 100644 --- a/ydb/core/blob_depot/agent/storage_put.cpp +++ b/ydb/core/blob_depot/agent/storage_put.cpp @@ -26,7 +26,7 @@ namespace NKikimr::NBlobDepot { // zero step -- for decommission blobs just issue put immediately if (msg.Decommission) { - IssuePuts(); + return IssuePuts(); } // first step -- check blocks diff --git a/ydb/core/blob_depot/assimilator.cpp b/ydb/core/blob_depot/assimilator.cpp index 7f7e218cdb..ba66d03b68 100644 --- a/ydb/core/blob_depot/assimilator.cpp +++ b/ydb/core/blob_depot/assimilator.cpp @@ -37,8 +37,8 @@ namespace NKikimr::NBlobDepot { } } - SendRequest(); Become(&TThis::StateFunc); + Action(); } void TAssimilator::PassAway() { @@ -51,15 +51,34 @@ namespace NKikimr::NBlobDepot { switch (const ui32 type = ev->GetTypeRewrite()) { hFunc(TEvBlobStorage::TEvAssimilateResult, Handle); + hFunc(TEvBlobStorage::TEvGetResult, Handle); + hFunc(TEvBlobStorage::TEvPutResult, Handle); + hFunc(TEvBlobStorage::TEvCollectGarbageResult, Handle); + hFunc(TEvTabletPipe::TEvClientConnected, Handle); + hFunc(TEvTabletPipe::TEvClientDestroyed, Handle); + hFunc(TEvBlobStorage::TEvControllerGroupDecommittedResponse, Handle); + cFunc(TEvPrivate::EvResume, Action); default: Y_VERIFY_DEBUG(false, "unexpected event Type# %08" PRIx32, type); - STLOG(PRI_CRIT, BLOB_DEPOT, BDTxx, "unexpected event", (Id, Self->GetLogId()), (Type, type)); + STLOG(PRI_CRIT, BLOB_DEPOT, BDT00, "unexpected event", (Id, Self->GetLogId()), (Type, type)); break; } } - void TAssimilator::SendRequest() { + void TAssimilator::Action() { + if (Self->DecommitState < EDecommitState::BlobsFinished) { + SendAssimilateRequest(); + } else if (Self->DecommitState < EDecommitState::BlobsCopied) { + ScanDataForCopying(); + } else if (Self->DecommitState == EDecommitState::BlobsCopied) { + CreatePipe(); + } else if (Self->DecommitState != EDecommitState::Done) { + Y_UNREACHABLE(); + } + } + + void TAssimilator::SendAssimilateRequest() { SendToBSProxy(SelfId(), Self->Config.GetDecommitGroupId(), new TEvBlobStorage::TEvAssimilate(SkipBlocksUpTo, SkipBarriersUpTo, SkipBlobsUpTo)); } @@ -95,15 +114,15 @@ namespace NKikimr::NBlobDepot { } for (const auto& block : Ev->Blocks) { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDTxx, "assimilated block", (Id, Self->Self->GetLogId()), (Block, block)); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT31, "assimilated block", (Id, Self->Self->GetLogId()), (Block, block)); Self->Self->BlocksManager->AddBlockOnDecommit(block, txc); } for (const auto& barrier : Ev->Barriers) { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDTxx, "assimilated barrier", (Id, Self->Self->GetLogId()), (Barrier, barrier)); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT32, "assimilated barrier", (Id, Self->Self->GetLogId()), (Barrier, barrier)); Self->Self->BarrierServer->AddBarrierOnDecommit(barrier, txc); } for (const auto& blob : Ev->Blobs) { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDTxx, "assimilated blob", (Id, Self->Self->GetLogId()), (Blob, blob)); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT33, "assimilated blob", (Id, Self->Self->GetLogId()), (Blob, blob)); Self->Self->Data->AddDataOnDecommit(blob, txc); } @@ -134,17 +153,189 @@ namespace NKikimr::NBlobDepot { Self->Self->ProcessRegisterAgentQ(); } - if (EDecommitState::BlobsFinished <= Self->Self->DecommitState) { - // finished metadata replication - } else { - Self->SendRequest(); - } + Self->Action(); } }; Self->Execute(std::make_unique<TTxPutAssimilatedData>(this, ev)); } + void TAssimilator::ScanDataForCopying() { + const bool fromTheBeginning = !LastScannedKey; + + TData::TKey lastScannedKey; + if (LastScannedKey) { + lastScannedKey = TData::TKey(*LastScannedKey); + } + + struct TScanQueueItem { + TLogoBlobID Key; + TLogoBlobID OriginalBlobId; + }; + std::deque<TScanQueueItem> scanQ; + ui32 totalSize = 0; + + auto callback = [&](const TData::TKey& key, const TData::TValue& value) { + if (!value.OriginalBlobId) { + LastScannedKey.emplace(key.GetBlobId()); + return true; // keep scanning + } else if (const TLogoBlobID& id = *value.OriginalBlobId; scanQ.empty() || + scanQ.front().OriginalBlobId.TabletID() == id.TabletID()) { + LastScannedKey.emplace(key.GetBlobId()); + scanQ.push_back({.Key = *LastScannedKey, .OriginalBlobId = id}); + totalSize += id.BlobSize(); + NeedfulBlobs.insert(id); + return totalSize < MaxSizeToQuery; + } else { + return false; // a blob belonging to different tablet + } + }; + + // FIXME: reentrable as it shares mailbox with the BlobDepot tablet itself + Self->Data->ScanRange(LastScannedKey ? &lastScannedKey : nullptr, nullptr, {}, callback); + + if (!scanQ.empty()) { + using TQuery = TEvBlobStorage::TEvGet::TQuery; + const ui32 sz = scanQ.size(); + TArrayHolder<TQuery> queries(new TQuery[sz]); + TQuery *query = queries.Get(); + for (const TScanQueueItem& item : scanQ) { + query->Set(item.OriginalBlobId); + ++query; + } + auto ev = std::make_unique<TEvBlobStorage::TEvGet>(queries, sz, TInstant::Max(), NKikimrBlobStorage::EGetHandleClass::FastRead); + ev->Decommission = true; + SendToBSProxy(SelfId(), Self->Config.GetDecommitGroupId(), ev.release()); + } else if (fromTheBeginning) { + OnCopyDone(); + } else { + // restart the scan from the beginning and find other keys to copy or finish it + LastScannedKey.reset(); + TActivationContext::Send(new IEventHandle(TEvPrivate::EvResume, 0, SelfId(), {}, nullptr, 0)); + } + } + + void TAssimilator::Handle(TEvBlobStorage::TEvGetResult::TPtr ev) { + auto& msg = *ev->Get(); + for (ui32 i = 0; i < msg.ResponseSz; ++i) { + auto& resp = msg.Responses[i]; + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT34, "got TEvGetResult", (Id, Self->GetLogId()), (BlobId, resp.Id), + (Status, resp.Status)); + if (resp.Status == NKikimrProto::OK) { + auto ev = std::make_unique<TEvBlobStorage::TEvPut>(resp.Id, resp.Buffer, TInstant::Max()); + ev->Decommission = true; + SendToBSProxy(SelfId(), Self->Config.GetDecommitGroupId(), ev.release()); + ++NumPutsInFlight; + } + } + if (!NumPutsInFlight) { + Action(); + } + } + + void TAssimilator::Handle(TEvBlobStorage::TEvPutResult::TPtr ev) { + auto& msg = *ev->Get(); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT37, "got TEvPutResult", (Id, Self->GetLogId()), (Msg, msg)); + if (msg.Status == NKikimrProto::OK) { + const size_t numErased = NeedfulBlobs.erase(msg.Id); + Y_VERIFY(numErased == 1); + } + if (!--NumPutsInFlight) { + IssueCollects(); + } + } + + void TAssimilator::IssueCollects() { + // FIXME: do it + Action(); + } + + void TAssimilator::Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev) { + (void)ev; + } + + void TAssimilator::OnCopyDone() { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT38, "data copying is done", (Id, Self->GetLogId())); + + class TTxFinishCopying : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { + TAssimilator* const Self; + + public: + TTxFinishCopying(TAssimilator *self) + : TTransactionBase(self->Self) + , Self(self) + {} + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + NIceDb::TNiceDb db(txc.DB); + Self->Self->DecommitState = EDecommitState::BlobsCopied; + db.Table<Schema::Config>().Key(Schema::Config::Key::Value).Update( + NIceDb::TUpdate<Schema::Config::DecommitState>(Self->Self->DecommitState) + ); + return true; + } + + void Complete(const TActorContext&) override { + Self->Action(); + } + }; + + Self->Execute(std::make_unique<TTxFinishCopying>(this)); + } + + void TAssimilator::CreatePipe() { + const TGroupID groupId(Self->Config.GetDecommitGroupId()); + const ui64 tabletId = MakeBSControllerID(groupId.AvailabilityDomainID()); + PipeId = Register(NTabletPipe::CreateClient(SelfId(), tabletId, NTabletPipe::TClientRetryPolicy::WithRetries())); + NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobStorage::TEvControllerGroupDecommittedNotify(groupId.GetRaw())); + } + + void TAssimilator::Handle(TEvTabletPipe::TEvClientConnected::TPtr ev) { + auto& msg = *ev->Get(); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDTxx, "received TEvClientConnected", (Id, Self->GetLogId()), (Status, msg.Status)); + if (msg.Status != NKikimrProto::OK) { + CreatePipe(); + } + } + + void TAssimilator::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr /*ev*/) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDTxx, "received TEvClientDestroyed", (Id, Self->GetLogId())); + CreatePipe(); + } + + void TAssimilator::Handle(TEvBlobStorage::TEvControllerGroupDecommittedResponse::TPtr ev) { + auto& msg = *ev->Get(); + const NKikimrProto::EReplyStatus status = msg.Record.GetStatus(); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDTxx, "received TEvControllerGroupDecommittedResponse", (Id, Self->GetLogId()), + (Status, status)); + if (status == NKikimrProto::OK) { + class TTxFinishDecommission : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { + public: + TTxFinishDecommission(TAssimilator *self) + : TTransactionBase(self->Self) + {} + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + NIceDb::TNiceDb db(txc.DB); + Self->DecommitState = EDecommitState::Done; + db.Table<Schema::Config>().Key(Schema::Config::Key::Value).Update( + NIceDb::TUpdate<Schema::Config::DecommitState>(Self->DecommitState) + ); + return true; + } + + void Complete(const TActorContext&) override {} + }; + + Self->GroupAssimilatorId = {}; + Self->Execute(std::make_unique<TTxFinishDecommission>(this)); + PassAway(); + } else { + NTabletPipe::CloseAndForgetClient(SelfId(), PipeId); + Action(); + } + } + TString TAssimilator::SerializeAssimilatorState() const { TStringStream stream; diff --git a/ydb/core/blob_depot/assimilator.h b/ydb/core/blob_depot/assimilator.h index bab6d171bf..2a1a711728 100644 --- a/ydb/core/blob_depot/assimilator.h +++ b/ydb/core/blob_depot/assimilator.h @@ -6,6 +6,12 @@ namespace NKikimr::NBlobDepot { class TBlobDepot::TGroupAssimilator : public TActorBootstrapped<TGroupAssimilator> { + struct TEvPrivate { + enum { + EvResume = EventSpaceBegin(TEvents::ES_PRIVATE), + }; + }; + std::weak_ptr<TToken> Token; TBlobDepot *Self; @@ -13,6 +19,15 @@ namespace NKikimr::NBlobDepot { std::optional<std::tuple<ui64, ui8>> SkipBarriersUpTo; std::optional<TLogoBlobID> SkipBlobsUpTo; + std::optional<TLogoBlobID> LastScannedKey; + std::set<TLogoBlobID> NeedfulBlobs; // in current tablet, original blob ids + + static constexpr ui32 MaxSizeToQuery = 10'000'000; + + ui32 NumPutsInFlight = 0; + + TActorId PipeId; + public: TGroupAssimilator(TBlobDepot *self) : Token(self->Token) @@ -26,8 +41,19 @@ namespace NKikimr::NBlobDepot { STATEFN(StateFunc); private: - void SendRequest(); + void Action(); + void SendAssimilateRequest(); void Handle(TEvBlobStorage::TEvAssimilateResult::TPtr ev); + void ScanDataForCopying(); + void Handle(TEvBlobStorage::TEvGetResult::TPtr ev); + void Handle(TEvBlobStorage::TEvPutResult::TPtr ev); + void IssueCollects(); + void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev); + void OnCopyDone(); + void CreatePipe(); + void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev); + void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev); + void Handle(TEvBlobStorage::TEvControllerGroupDecommittedResponse::TPtr ev); TString SerializeAssimilatorState() const; }; diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp index 937daf5abe..cf92fccc7a 100644 --- a/ydb/core/blob_depot/data.cpp +++ b/ydb/core/blob_depot/data.cpp @@ -5,6 +5,11 @@ namespace NKikimr::NBlobDepot { using TData = TBlobDepot::TData; + NKikimrBlobDepot::EKeepState TData::GetKeepState(const TKey& key) const { + const auto it = Data.find(key); + return it != Data.end() ? it->second.KeepState : NKikimrBlobDepot::EKeepState::Default; + } + TData::TRecordsPerChannelGroup& TData::GetRecordsPerChannelGroup(TLogoBlobID id) { TTabletStorageInfo *info = Self->Info(); const ui32 groupId = info->GroupFor(id.Channel(), id.Generation()); @@ -35,10 +40,11 @@ namespace NKikimr::NBlobDepot { TKey key(blob.Id); - NKikimrBlobDepot::EKeepState keepState = blob.Keep ? NKikimrBlobDepot::EKeepState::Keep : NKikimrBlobDepot::EKeepState::Default; - if (const auto it = Data.find(key); it != Data.end()) { - keepState = Max(keepState, it->second.KeepState); - } + // calculate keep state for this blob + const auto it = Data.find(key); + const NKikimrBlobDepot::EKeepState keepState = Max(it != Data.end() ? it->second.KeepState : NKikimrBlobDepot::EKeepState::Default, + blob.DoNotKeep ? NKikimrBlobDepot::EKeepState::DoNotKeep : + blob.Keep ? NKikimrBlobDepot::EKeepState::Keep : NKikimrBlobDepot::EKeepState::Default); NKikimrBlobDepot::TValue value; value.SetKeepState(keepState); @@ -53,6 +59,7 @@ namespace NKikimr::NBlobDepot { db.Table<Schema::Data>().Key(key.MakeBinaryKey()).Update<Schema::Data::Value>(valueData); PutKey(key, TValue(std::move(value))); + LastAssimilatedKey = key; } void TData::AddTrashOnLoad(TLogoBlobID id) { diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h index e6f83d2056..95ef9aa44b 100644 --- a/ydb/core/blob_depot/data.h +++ b/ydb/core/blob_depot/data.h @@ -286,6 +286,7 @@ namespace NKikimr::NBlobDepot { THashMap<std::tuple<ui64, ui8, ui32>, TRecordsPerChannelGroup> RecordsPerChannelGroup; TIntrusiveList<TRecordsPerChannelGroup, TRecordWithTrash> RecordsWithTrash; std::optional<TKey> LastLoadedKey; // keys are being loaded in ascending order + std::optional<TKey> LastAssimilatedKey; THashMultiMap<void*, TLogoBlobID> InFlightTrash; // being committed, but not yet confirmed @@ -332,6 +333,8 @@ namespace NKikimr::NBlobDepot { } } + NKikimrBlobDepot::EKeepState GetKeepState(const TKey& key) const; + TRecordsPerChannelGroup& GetRecordsPerChannelGroup(TLogoBlobID id); void AddDataOnLoad(TKey key, TString value); diff --git a/ydb/core/blob_depot/data_resolve.cpp b/ydb/core/blob_depot/data_resolve.cpp index 56574ad072..6b14a919d5 100644 --- a/ydb/core/blob_depot/data_resolve.cpp +++ b/ydb/core/blob_depot/data_resolve.cpp @@ -258,6 +258,20 @@ namespace NKikimr::NBlobDepot { void TData::Handle(TEvBlobDepot::TEvResolve::TPtr ev) { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT21, "TEvResolve", (Id, Self->GetLogId()), (Msg, ev->Get()->ToString()), (Sender, ev->Sender), (Cookie, ev->Cookie)); + + if (Self->Config.HasDecommitGroupId() && Self->DecommitState <= EDecommitState::BlobsFinished) { + for (const auto& item : ev->Get()->Record.GetItems()) { + std::optional<TKey> end = item.HasEndingKey() + ? std::make_optional(TKey::FromBinaryKey(item.GetEndingKey(), Self->Config)) + : std::nullopt; + + if (!end || !LastAssimilatedKey || *LastAssimilatedKey < *end) { + // see if we have to query BS for this range and to apply items here + Y_VERIFY_DEBUG(false, "going to return corrupt data"); + } + } + } + Self->Execute(std::make_unique<TTxResolve>(Self, ev)); } diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp index 5a8524a267..714142fddf 100644 --- a/ydb/core/blob_depot/op_commit_blob_seq.cpp +++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp @@ -26,6 +26,8 @@ namespace NKikimr::NBlobDepot { const ui32 generation = Self->Executor()->Generation(); for (const auto& item : Request->Get()->Record.GetItems()) { + auto key = TData::TKey::FromBinaryKey(item.GetKey(), Self->Config); + auto *responseItem = responseRecord->AddItems(); responseItem->SetStatus(NKikimrProto::OK); @@ -33,6 +35,9 @@ namespace NKikimr::NBlobDepot { if (item.HasMeta()) { value.SetMeta(item.GetMeta()); } + if (const auto keepState = Self->Data->GetKeepState(key); keepState != NKikimrBlobDepot::EKeepState::Default) { + value.SetKeepState(keepState); + } auto *chain = value.AddValueChain(); auto *locator = chain->MutableLocator(); locator->CopyFrom(item.GetBlobLocator()); @@ -54,9 +59,7 @@ namespace NKikimr::NBlobDepot { continue; } - auto key = TData::TKey::FromBinaryKey(item.GetKey(), Self->Config); - - if (!CheckKeyAgainstBarrier(key)) { + if (!CheckKeyAgainstBarrier(key, value)) { responseItem->SetStatus(NKikimrProto::ERROR); responseItem->SetErrorReason(TStringBuilder() << "BlobId# " << key.ToString() << " is being put beyond the barrier"); @@ -93,10 +96,11 @@ namespace NKikimr::NBlobDepot { } } - bool CheckKeyAgainstBarrier(const TData::TKey& key) { + bool CheckKeyAgainstBarrier(const TData::TKey& key, const NKikimrBlobDepot::TValue& value) { const auto& v = key.AsVariant(); const auto *id = std::get_if<TLogoBlobID>(&v); - return !id || Self->BarrierServer->CheckBlobForBarrier(*id); + return !id || Self->BarrierServer->CheckBlobForBarrier(*id) || + value.GetKeepState() == NKikimrBlobDepot::EKeepState::Keep; } void Complete(const TActorContext&) override { diff --git a/ydb/core/blob_depot/schema.h b/ydb/core/blob_depot/schema.h index 0ff0ecd678..65248b2a46 100644 --- a/ydb/core/blob_depot/schema.h +++ b/ydb/core/blob_depot/schema.h @@ -11,6 +11,7 @@ namespace NKikimr::NBlobDepot { BarriersFinished = 2, BlobsFinished = 3, BlobsCopied = 4, + Done = 5, }; struct Schema : NIceDb::Schema { diff --git a/ydb/core/blobstorage/base/blobstorage_events.h b/ydb/core/blobstorage/base/blobstorage_events.h index bd36342fbc..7e7fa781ee 100644 --- a/ydb/core/blobstorage/base/blobstorage_events.h +++ b/ydb/core/blobstorage/base/blobstorage_events.h @@ -524,6 +524,24 @@ namespace NKikimr { } }; + struct TEvBlobStorage::TEvControllerGroupDecommittedNotify : TEventPB<TEvControllerGroupDecommittedNotify, + NKikimrBlobStorage::TEvControllerGroupDecommittedNotify, EvControllerGroupDecommittedNotify> { + TEvControllerGroupDecommittedNotify() = default; + + TEvControllerGroupDecommittedNotify(ui32 groupId) { + Record.SetGroupId(groupId); + } + }; + + struct TEvBlobStorage::TEvControllerGroupDecommittedResponse : TEventPB<TEvControllerGroupDecommittedResponse, + NKikimrBlobStorage::TEvControllerGroupDecommittedResponse, EvControllerGroupDecommittedResponse> { + TEvControllerGroupDecommittedResponse() = default; + + TEvControllerGroupDecommittedResponse(NKikimrProto::EReplyStatus status) { + Record.SetStatus(status); + } + }; + struct TEvNodeWardenQueryGroupInfo : TEventPB<TEvNodeWardenQueryGroupInfo, NKikimrBlobStorage::TEvNodeWardenQueryGroupInfo, TEvBlobStorage::EvNodeWardenQueryGroupInfo> { TEvNodeWardenQueryGroupInfo() = default; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp index 492d57bb0b..d5875e7095 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp @@ -24,6 +24,7 @@ class TBlobStorageGroupCollectGarbageRequest : public TBlobStorageGroupRequestAc const ui32 CollectStep; const bool Hard; const bool Collect; + const bool Decommission; TGroupQuorumTracker QuorumTracker; TInstant StartTime; @@ -127,6 +128,7 @@ class TBlobStorageGroupCollectGarbageRequest : public TBlobStorageGroupRequestAc auto ev = std::make_unique<TEvBlobStorage::TEvCollectGarbage>(TabletId, RecordGeneration, PerGenerationCounter, Channel, Collect, CollectGeneration, CollectStep, Keep.release(), DoNotKeep.release(), Deadline, false, Hard); ev->RestartCounter = counter; + ev->Decommission = Decommission; return ev; } @@ -157,6 +159,7 @@ public: , CollectStep(ev->CollectStep) , Hard(ev->Hard) , Collect(ev->Collect) + , Decommission(ev->Decommission) , QuorumTracker(Info.Get()) , StartTime(now) {} diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp index d338291c6f..28d623e7d7 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp @@ -28,6 +28,7 @@ class TBlobStorageGroupMultiCollectRequest const ui32 CollectStep; const bool Hard; const bool Collect; + const bool Decommission; ui64 FlagRequestsInFlight; ui64 CollectRequestsInFlight; @@ -108,6 +109,7 @@ public: , CollectStep(ev->CollectStep) , Hard(ev->Hard) , Collect(ev->Collect) + , Decommission(ev->Decommission) , FlagRequestsInFlight(0) , CollectRequestsInFlight(0) , StartTime(now) @@ -152,6 +154,7 @@ public: TabletId, RecordGeneration, PerGenerationCounter + idx, Channel, isCollect, CollectGeneration, CollectStep, keepPart.release(), doNotKeepPart.release(), Deadline, false, Hard)); + ev->Decommission = Decommission; // retain decommission flag R_LOG_DEBUG_S("BPMC3", "SendRequest idx# " << idx << " isLast# " << isLast << " ev# " << ev->ToString()); diff --git a/ydb/core/mind/bscontroller/bsc.cpp b/ydb/core/mind/bscontroller/bsc.cpp index 02c57a38f8..7393fb33bc 100644 --- a/ydb/core/mind/bscontroller/bsc.cpp +++ b/ydb/core/mind/bscontroller/bsc.cpp @@ -262,6 +262,7 @@ ui32 TBlobStorageController::GetEventPriority(IEventHandle *ev) { case TEvBlobStorage::EvControllerNodeReport: return 1; case TEvBlobStorage::EvControllerProposeGroupKey: return 1; case TEvBlobStorage::EvControllerGetGroup: return 1; + case TEvBlobStorage::EvControllerGroupDecommittedNotify: return 1; // auxiliary messages that are not usually urgent (also includes RW transactions in TConfigRequest and UpdateDiskStatus) case TEvBlobStorage::EvControllerGroupReconfigureWipe: return 2; diff --git a/ydb/core/mind/bscontroller/config.cpp b/ydb/core/mind/bscontroller/config.cpp index 46b20dc4e4..d3f8e6369f 100644 --- a/ydb/core/mind/bscontroller/config.cpp +++ b/ydb/core/mind/bscontroller/config.cpp @@ -237,8 +237,9 @@ namespace NKikimr::NBsController { meta->SetCurrentGeneration(cur.Generation); } } - Y_VERIFY(prev.VDisksInGroup.size() == cur.VDisksInGroup.size()); - for (size_t i = 0; i < prev.VDisksInGroup.size(); ++i) { + Y_VERIFY(prev.VDisksInGroup.size() == cur.VDisksInGroup.size() || + (cur.VDisksInGroup.empty() && cur.DecommitStatus == NKikimrBlobStorage::TGroupDecommitStatus::DONE)); + for (size_t i = 0; i < cur.VDisksInGroup.size(); ++i) { const TVSlotInfo& prevSlot = *prev.VDisksInGroup[i]; const TVSlotInfo& curSlot = *cur.VDisksInGroup[i]; if (prevSlot.VSlotId != curSlot.VSlotId) { diff --git a/ydb/core/mind/bscontroller/impl.h b/ydb/core/mind/bscontroller/impl.h index f1f5d66443..5afdba9c65 100644 --- a/ydb/core/mind/bscontroller/impl.h +++ b/ydb/core/mind/bscontroller/impl.h @@ -1857,6 +1857,7 @@ public: hFunc(TEvBlobStorage::TEvControllerScrubQueryStartQuantum, Handle); hFunc(TEvBlobStorage::TEvControllerScrubQuantumFinished, Handle); hFunc(TEvBlobStorage::TEvControllerScrubReportQuantumInProgress, Handle); + hFunc(TEvBlobStorage::TEvControllerGroupDecommittedNotify, Handle); cFunc(TEvPrivate::EvScrub, ScrubState.HandleTimer); cFunc(TEvPrivate::EvVSlotReadyUpdate, VSlotReadyUpdate); } @@ -1900,6 +1901,7 @@ public: fFunc(TEvBlobStorage::EvControllerScrubQueryStartQuantum, EnqueueIncomingEvent); fFunc(TEvBlobStorage::EvControllerScrubQuantumFinished, EnqueueIncomingEvent); fFunc(TEvBlobStorage::EvControllerScrubReportQuantumInProgress, EnqueueIncomingEvent); + fFunc(TEvBlobStorage::EvControllerGroupDecommittedNotify, EnqueueIncomingEvent); fFunc(TEvPrivate::EvScrub, EnqueueIncomingEvent); fFunc(TEvPrivate::EvVSlotReadyUpdate, EnqueueIncomingEvent); cFunc(TEvPrivate::EvVSlotNotReadyHistogramUpdate, VSlotNotReadyHistogramUpdate); @@ -1993,6 +1995,8 @@ public: void StartVirtualGroupSetupMachine(TGroupInfo *group); + void Handle(TEvBlobStorage::TEvControllerGroupDecommittedNotify::TPtr ev); + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // VSLOT READINESS EVALUATION diff --git a/ydb/core/mind/bscontroller/virtual_group.cpp b/ydb/core/mind/bscontroller/virtual_group.cpp index 91afd86726..b64acaee9a 100644 --- a/ydb/core/mind/bscontroller/virtual_group.cpp +++ b/ydb/core/mind/bscontroller/virtual_group.cpp @@ -442,4 +442,67 @@ namespace NKikimr::NBsController { group->VirtualGroupSetupMachineId = RegisterWithSameMailbox(new TVirtualGroupSetupMachine(this, *group)); } + void TBlobStorageController::Handle(TEvBlobStorage::TEvControllerGroupDecommittedNotify::TPtr ev) { + class TTxDecommitGroup : public TTransactionBase<TBlobStorageController> { + TEvBlobStorage::TEvControllerGroupDecommittedNotify::TPtr Ev; + std::optional<TConfigState> State; + NKikimrProto::EReplyStatus Status = NKikimrProto::OK; + TString ErrorReason; + + public: + TTxDecommitGroup(TBlobStorageController *controller, TEvBlobStorage::TEvControllerGroupDecommittedNotify::TPtr ev) + : TTransactionBase(controller) + , Ev(ev) + {} + +// TTxType GetTxType() const override { return NBlobStorageController::TXTYPE_DROP_DONOR; } + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + State.emplace(*Self, Self->HostRecords, TActivationContext::Now()); + State->CheckConsistency(); + Action(*State); + State->CheckConsistency(); + TString error; + if (State->Changed() && !Self->CommitConfigUpdates(*State, false, false, txc, &error)) { + State->Rollback(); + State.reset(); + } + return true; + } + + void Action(TConfigState& state) { + const ui32 groupId = Ev->Get()->Record.GetGroupId(); + TGroupInfo *group = state.Groups.FindForUpdate(groupId); + if (!group) { + std::tie(Status, ErrorReason) = std::make_tuple(NKikimrProto::ERROR, "group not found"); + return; + } else if (group->DecommitStatus == NKikimrBlobStorage::TGroupDecommitStatus::DONE) { + Status = NKikimrProto::ALREADY; + } else if (group->DecommitStatus != NKikimrBlobStorage::TGroupDecommitStatus::IN_PROGRESS) { + std::tie(Status, ErrorReason) = std::make_tuple(NKikimrProto::ERROR, "group is not being decommitted"); + } else { + for (const TVSlotInfo *vslot : group->VDisksInGroup) { + state.DestroyVSlot(vslot->VSlotId); + } + group->VDisksInGroup.clear(); + group->DecommitStatus = NKikimrBlobStorage::TGroupDecommitStatus::DONE; + group->ContentChanged = true; + } + } + + void Complete(const TActorContext&) override { + if (State) { + State->ApplyConfigUpdates(); + } + auto ev = std::make_unique<TEvBlobStorage::TEvControllerGroupDecommittedResponse>(Status); + if (ErrorReason) { + ev->Record.SetErrorReason(ErrorReason); + } + Self->Send(Ev->Sender, ev.release(), 0, Ev->Cookie); + } + }; + + Execute(new TTxDecommitGroup(this, ev)); + } + } // NKikimr::NBsController diff --git a/ydb/core/protos/blobstorage.proto b/ydb/core/protos/blobstorage.proto index d0009a446e..ec552aca45 100644 --- a/ydb/core/protos/blobstorage.proto +++ b/ydb/core/protos/blobstorage.proto @@ -1341,6 +1341,15 @@ message TEvControllerNodeReport { repeated TVDiskReport VDiskReports = 2; } +message TEvControllerGroupDecommittedNotify { + optional uint32 GroupId = 1; +} + +message TEvControllerGroupDecommittedResponse { + optional NKikimrProto.EReplyStatus Status = 1; + optional string ErrorReason = 2; +} + message TEvTestLoadRequest { // an item for interval distribution setting message TIntervalInfo { |