diff options
author | alexvru <alexvru@ydb.tech> | 2023-01-28 19:07:24 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-01-28 19:07:24 +0300 |
commit | b4c07516d0e7248f1050c3cf3ad83057ab6d0593 (patch) | |
tree | 245097424f23e40178c28bf373f2f47c23091d66 | |
parent | e9ca29e3dfc6f7601dd4037785c75d8d3b6620f3 (diff) | |
download | ydb-b4c07516d0e7248f1050c3cf3ad83057ab6d0593.tar.gz |
Fix MustRestoreFirst logic in BlobDepot for decommitted groups
-rw-r--r-- | ydb/core/blob_depot/CMakeLists.darwin.txt | 1 | ||||
-rw-r--r-- | ydb/core/blob_depot/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/core/blob_depot/CMakeLists.linux.txt | 1 | ||||
-rw-r--r-- | ydb/core/blob_depot/assimilator.cpp | 40 | ||||
-rw-r--r-- | ydb/core/blob_depot/assimilator.h | 2 | ||||
-rw-r--r-- | ydb/core/blob_depot/blob_depot.cpp | 7 | ||||
-rw-r--r-- | ydb/core/blob_depot/data.cpp | 22 | ||||
-rw-r--r-- | ydb/core/blob_depot/data.h | 26 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_decommit.cpp | 322 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_resolve.cpp | 184 |
10 files changed, 398 insertions, 208 deletions
diff --git a/ydb/core/blob_depot/CMakeLists.darwin.txt b/ydb/core/blob_depot/CMakeLists.darwin.txt index b7013389cf6..e1ac07e603d 100644 --- a/ydb/core/blob_depot/CMakeLists.darwin.txt +++ b/ydb/core/blob_depot/CMakeLists.darwin.txt @@ -23,6 +23,7 @@ target_sources(ydb-core-blob_depot PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/assimilator.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blocks.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_decommit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_gc.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_load.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_mon.cpp diff --git a/ydb/core/blob_depot/CMakeLists.linux-aarch64.txt b/ydb/core/blob_depot/CMakeLists.linux-aarch64.txt index 04727d74a40..ae8c5e7a0d6 100644 --- a/ydb/core/blob_depot/CMakeLists.linux-aarch64.txt +++ b/ydb/core/blob_depot/CMakeLists.linux-aarch64.txt @@ -24,6 +24,7 @@ target_sources(ydb-core-blob_depot PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/assimilator.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blocks.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_decommit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_gc.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_load.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_mon.cpp diff --git a/ydb/core/blob_depot/CMakeLists.linux.txt b/ydb/core/blob_depot/CMakeLists.linux.txt index 04727d74a40..ae8c5e7a0d6 100644 --- a/ydb/core/blob_depot/CMakeLists.linux.txt +++ b/ydb/core/blob_depot/CMakeLists.linux.txt @@ -24,6 +24,7 @@ target_sources(ydb-core-blob_depot PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/assimilator.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blocks.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_decommit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_gc.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_load.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_mon.cpp diff --git a/ydb/core/blob_depot/assimilator.cpp b/ydb/core/blob_depot/assimilator.cpp index 94f13cc13d2..2a31d58e964 100644 --- a/ydb/core/blob_depot/assimilator.cpp +++ b/ydb/core/blob_depot/assimilator.cpp @@ -16,30 +16,36 @@ namespace NKikimr::NBlobDepot { }; }; - class TAssimilator::TTxCommitAssimilatedBlob : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { - const TActorId AssimilatorId; + class TBlobDepot::TData::TTxCommitAssimilatedBlob : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { const NKikimrProto::EReplyStatus Status; const TBlobSeqId BlobSeqId; const TData::TKey Key; - const ui64 GetId; + const ui32 NotifyEventType; + const TActorId ParentId; + const ui64 Cookie; + const bool Keep; + const bool DoNotKeep; public: TTxType GetTxType() const override { return NKikimrBlobDepot::TXTYPE_COMMIT_ASSIMILATED_BLOB; } - TTxCommitAssimilatedBlob(TAssimilator *self, NKikimrProto::EReplyStatus status, TBlobSeqId blobSeqId, TData::TKey key, - ui64 getId) - : TTransactionBase(self->Self) - , AssimilatorId(self->SelfId()) + TTxCommitAssimilatedBlob(TBlobDepot *self, NKikimrProto::EReplyStatus status, TBlobSeqId blobSeqId, + TData::TKey key, ui32 notifyEventType, TActorId parentId, ui64 cookie, bool keep, bool doNotKeep) + : TTransactionBase(self) , Status(status) , BlobSeqId(blobSeqId) , Key(std::move(key)) - , GetId(getId) + , NotifyEventType(notifyEventType) + , ParentId(parentId) + , Cookie(cookie) + , Keep(keep) + , DoNotKeep(doNotKeep) {} bool Execute(TTransactionContext& txc, const TActorContext&) override { if (Status == NKikimrProto::OK) { Y_VERIFY(!Self->Data->CanBeCollected(BlobSeqId)); - Self->Data->BindToBlob(Key, BlobSeqId, txc, this); + Self->Data->BindToBlob(Key, BlobSeqId, Keep, DoNotKeep, txc, this); } else if (Status == NKikimrProto::NODATA) { if (const TData::TValue *value = Self->Data->FindKey(Key); value && value->GoingToAssimilate) { Self->Data->DeleteKey(Key, txc, this); @@ -60,7 +66,7 @@ namespace NKikimr::NBlobDepot { } } Self->Data->CommitTrash(this); - TActivationContext::Send(new IEventHandle(TEvPrivate::EvTxComplete, 0, AssimilatorId, {}, nullptr, GetId)); + TActivationContext::Send(new IEventHandle(NotifyEventType, 0, ParentId, {}, nullptr, Cookie)); } }; @@ -380,8 +386,8 @@ namespace NKikimr::NBlobDepot { ++it->second; getBytes += id.BlobSize(); } else if (resp.Status == NKikimrProto::NODATA) { - Self->Execute(std::make_unique<TTxCommitAssimilatedBlob>(this, NKikimrProto::NODATA, TBlobSeqId(), - TData::TKey(resp.Id), it->first)); + Self->Data->ExecuteTxCommitAssimilatedBlob(NKikimrProto::NODATA, TBlobSeqId(), TData::TKey(resp.Id), + TEvPrivate::EvTxComplete, SelfId(), it->first); ++it->second; } } @@ -417,8 +423,8 @@ namespace NKikimr::NBlobDepot { const auto& [key, getId] = it->second; STLOG(PRI_DEBUG, BLOB_DEPOT, BDT37, "got TEvPutResult", (Id, Self->GetLogId()), (Msg, msg), (NumGetsUnprocessed, GetIdToUnprocessedPuts.size()), (Key, key)); - Self->Execute(std::make_unique<TTxCommitAssimilatedBlob>(this, msg.Status, TBlobSeqId::FromLogoBlobId(msg.Id), - std::move(key), getId)); + Self->Data->ExecuteTxCommitAssimilatedBlob(msg.Status, TBlobSeqId::FromLogoBlobId(msg.Id), std::move(key), + TEvPrivate::EvTxComplete, SelfId(), getId); PutIdToKey.erase(it); } @@ -533,6 +539,12 @@ namespace NKikimr::NBlobDepot { return stream.Str(); } + void TBlobDepot::TData::ExecuteTxCommitAssimilatedBlob(NKikimrProto::EReplyStatus status, TBlobSeqId blobSeqId, + TData::TKey key, ui32 notifyEventType, TActorId parentId, ui64 cookie, bool keep, bool doNotKeep) { + Self->Execute(std::make_unique<TTxCommitAssimilatedBlob>(Self, status, blobSeqId, std::move(key), + notifyEventType, parentId, cookie, keep, doNotKeep)); + } + void TBlobDepot::StartGroupAssimilator() { if (Config.GetIsDecommittingGroup() && DecommitState != EDecommitState::Done) { Y_VERIFY(!GroupAssimilatorId); diff --git a/ydb/core/blob_depot/assimilator.h b/ydb/core/blob_depot/assimilator.h index c4717325b59..bb54cd14a0d 100644 --- a/ydb/core/blob_depot/assimilator.h +++ b/ydb/core/blob_depot/assimilator.h @@ -42,8 +42,6 @@ namespace NKikimr::NBlobDepot { bool ActionInProgress = false; bool ResumeScanDataForCopyingInFlight = false; - class TTxCommitAssimilatedBlob; - public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::BLOB_DEPOT_ASSIMILATOR_ACTOR; diff --git a/ydb/core/blob_depot/blob_depot.cpp b/ydb/core/blob_depot/blob_depot.cpp index 86661f95904..9a464fc5c13 100644 --- a/ydb/core/blob_depot/blob_depot.cpp +++ b/ydb/core/blob_depot/blob_depot.cpp @@ -48,7 +48,9 @@ namespace NKikimr::NBlobDepot { try { auto handleFromAgentPipe = [this](auto& ev) { const auto it = PipeServers.find(ev->Recipient); - Y_VERIFY(it != PipeServers.end()); + if (it == PipeServers.end()) { + return; // this may be a race with TEvServerDisconnected and postpone queue; it's okay to have this + } auto& info = it->second; STLOG(PRI_DEBUG, BLOB_DEPOT, BDT69, "HandleFromAgentPipe", (Id, GetLogId()), (RequestId, ev->Cookie), @@ -87,8 +89,7 @@ namespace NKikimr::NBlobDepot { cFunc(TEvPrivate::EvProcessRegisterAgentQ, ProcessRegisterAgentQ); hFunc(TEvBlobStorage::TEvCollectGarbageResult, Data->Handle); - hFunc(TEvBlobStorage::TEvRangeResult, Data->Handle); - hFunc(TEvBlobStorage::TEvGetResult, Data->Handle); + hFunc(TEvBlobStorage::TEvGetResult, Data->UncertaintyResolver->Handle); hFunc(TEvBlobStorage::TEvStatusResult, SpaceMonitor->Handle); cFunc(TEvPrivate::EvKickSpaceMonitor, KickSpaceMonitor); diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp index 5c34c16cdfd..a19c348c383 100644 --- a/ydb/core/blob_depot/data.cpp +++ b/ydb/core/blob_depot/data.cpp @@ -195,6 +195,7 @@ namespace NKikimr::NBlobDepot { } const TData::TValue *TData::FindKey(const TKey& key) const { + Y_VERIFY(IsKeyLoaded(key)); const auto it = Data.find(key); return it != Data.end() ? &it->second : nullptr; } @@ -223,11 +224,20 @@ namespace NKikimr::NBlobDepot { }, item); } - void TData::BindToBlob(const TKey& key, TBlobSeqId blobSeqId, NTabletFlatExecutor::TTransactionContext& txc, void *cookie) { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT49, "BindToBlob", (Id, Self->GetLogId()), (Key, key), (BlobSeqId, blobSeqId)); + void TData::BindToBlob(const TKey& key, TBlobSeqId blobSeqId, bool keep, bool doNotKeep, NTabletFlatExecutor::TTransactionContext& txc, void *cookie) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT49, "BindToBlob", (Id, Self->GetLogId()), (Key, key), (BlobSeqId, blobSeqId), + (Keep, keep), (DoNotKeep, doNotKeep)); Y_VERIFY(IsKeyLoaded(key)); - UpdateKey(key, txc, cookie, "BindToBlob", [&](TValue& value, bool inserted) { - if (inserted || value.GoingToAssimilate) { + UpdateKey(key, txc, cookie, "BindToBlob", [&](TValue& value, bool /*inserted*/) { + EUpdateOutcome outcome = EUpdateOutcome::NO_CHANGE; + if (doNotKeep && value.KeepState < EKeepState::DoNotKeep) { + value.KeepState = EKeepState::DoNotKeep; + outcome = EUpdateOutcome::CHANGE; + } else if (keep && value.KeepState < EKeepState::Keep) { + value.KeepState = EKeepState::Keep; + outcome = EUpdateOutcome::CHANGE; + } + if (value.ValueChain.empty()) { auto *chain = value.ValueChain.Add(); auto *locator = chain->MutableLocator(); locator->SetGroupId(Self->Info()->GroupFor(blobSeqId.Channel, blobSeqId.Generation)); @@ -235,9 +245,9 @@ namespace NKikimr::NBlobDepot { locator->SetTotalDataLen(key.GetBlobId().BlobSize()); locator->SetFooterLen(0); value.GoingToAssimilate = false; - return inserted ? EUpdateOutcome::DROP : EUpdateOutcome::CHANGE; + outcome = EUpdateOutcome::CHANGE; } - return EUpdateOutcome::NO_CHANGE; + return outcome; }); } diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h index 254293f2446..af9b1e3d143 100644 --- a/ydb/core/blob_depot/data.h +++ b/ydb/core/blob_depot/data.h @@ -398,13 +398,10 @@ namespace NKikimr::NBlobDepot { struct TResolveDecommitContext { TEvBlobDepot::TEvResolve::TPtr Ev; // original resolve request - ui32 NumRangesInFlight; + TActorId ReturnAfterLoadingKeys; std::deque<TEvBlobStorage::TEvAssimilateResult::TBlob> DecommitBlobs = {}; std::vector<TKey> ResolutionErrors = {}; - std::deque<TKey> DropNodataBlobs = {}; }; - ui64 LastResolveQueryId = 0; - THashMap<ui64, TResolveDecommitContext> ResolveDecommitContexts; class TTxIssueGC; class TTxConfirmGC; @@ -412,7 +409,6 @@ namespace NKikimr::NBlobDepot { class TTxDataLoad; class TTxLoadSpecificKeys; - class TTxResolve; class TResolveResultAccumulator; class TUncertaintyResolver; @@ -483,7 +479,8 @@ namespace NKikimr::NBlobDepot { void UpdateKey(const TKey& key, const NKikimrBlobDepot::TEvCommitBlobSeq::TItem& item, NTabletFlatExecutor::TTransactionContext& txc, void *cookie); - void BindToBlob(const TKey& key, TBlobSeqId blobSeqId, NTabletFlatExecutor::TTransactionContext& txc, void *cookie); + void BindToBlob(const TKey& key, TBlobSeqId blobSeqId, bool keep, bool doNotKeep, + NTabletFlatExecutor::TTransactionContext& txc, void *cookie); void MakeKeyCertain(const TKey& key); void HandleCommitCertainKeys(); @@ -536,12 +533,21 @@ namespace NKikimr::NBlobDepot { bool IsLoaded() const { return Loaded; } bool IsKeyLoaded(const TKey& key) const { return Loaded || LoadedKeys[key]; } + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + class TResolveDecommitActor; + IActor *CreateResolveDecommitActor(TEvBlobDepot::TEvResolve::TPtr ev); + + class TTxCommitAssimilatedBlob; + void ExecuteTxCommitAssimilatedBlob(NKikimrProto::EReplyStatus status, TBlobSeqId blobSeqId, TData::TKey key, + ui32 notifyEventType, TActorId parentId, ui64 cookie, bool keep = false, bool doNotKeep = false); + + class TTxResolve; + void ExecuteTxResolve(TEvBlobDepot::TEvResolve::TPtr ev, TResolveDecommitContext&& context); + void Handle(TEvBlobDepot::TEvResolve::TPtr ev); - void Handle(TEvBlobStorage::TEvRangeResult::TPtr ev); - void Handle(TEvBlobStorage::TEvGetResult::TPtr ev); - template<typename TCallback> - void HandleResolutionResult(ui64 id, TCallback&& callback); + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// ui64 GetTotalStoredDataSize() const { return TotalStoredDataSize; diff --git a/ydb/core/blob_depot/data_decommit.cpp b/ydb/core/blob_depot/data_decommit.cpp new file mode 100644 index 00000000000..f7df4860fda --- /dev/null +++ b/ydb/core/blob_depot/data_decommit.cpp @@ -0,0 +1,322 @@ +#include "data.h" + +namespace NKikimr::NBlobDepot { + + class TBlobDepot::TData::TResolveDecommitActor : public TActorBootstrapped<TResolveDecommitActor> { + struct TEvPrivate { + enum { + EvTxComplete = EventSpaceBegin(TEvents::ES_PRIVATE), + }; + }; + + TBlobDepot* const Self; + std::weak_ptr<TToken> Token; + TResolveDecommitContext Context; + TEvBlobDepot::TEvResolve::TPtr Ev; + + ui32 RangesInFlight = 0; + + std::deque<std::tuple<TLogoBlobID, bool>> GetQ; + ui32 GetsInFlight = 0; + ui32 GetBytesInFlight = 0; + static constexpr ui32 MaxGetsInFlight = 10; + static constexpr ui32 MaxGetBytesInFlight = 10'000'000; + + ui32 PutsInFlight = 0; + + THashMap<TLogoBlobID, TKey> IdToKey; + + public: + TResolveDecommitActor(TBlobDepot *self, TEvBlobDepot::TEvResolve::TPtr ev) + : Self(self) + , Token(self->Token) + , Ev(ev) + {} + + void Bootstrap() { + if (Token.expired()) { + return; + } + + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT42, "TResolveDecommitActor::Bootstrap", (Id, Self->GetLogId()), + (Sender, Ev->Sender), (Cookie, Ev->Cookie)); + + if (!Self->Data->Loaded) { + // issue request to a resolver to just load keys we are looking for + TResolveDecommitContext context; + context.ReturnAfterLoadingKeys = SelfId(); + Self->Data->ExecuteTxResolve(Ev, std::move(context)); + } else { + Handle(Ev); + } + } + + void Handle(TEvBlobDepot::TEvResolve::TPtr ev) { + Ev = ev; // store event back + + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT46, "keys loaded", (Id, Self->GetLogId()), + (Sender, Ev->Sender), (Cookie, Ev->Cookie)); + + for (const auto& item : Ev->Get()->Record.GetItems()) { + switch (item.GetKeyDesignatorCase()) { + case NKikimrBlobDepot::TEvResolve::TItem::kKeyRange: { + if (!item.HasTabletId()) { + return FinishWithError(NLog::PRI_CRIT, "incorrect request"); + } + + const ui64 tabletId = item.GetTabletId(); + const auto& range = item.GetKeyRange(); + + TLogoBlobID minId = range.HasBeginningKey() + ? TKey::FromBinaryKey(range.GetBeginningKey(), Self->Config).GetBlobId() + : TLogoBlobID(tabletId, 0, 0, 0, 0, 0); + + TLogoBlobID maxId = range.HasEndingKey() + ? TKey::FromBinaryKey(range.GetEndingKey(), Self->Config).GetBlobId() + : TLogoBlobID(tabletId, Max<ui32>(), Max<ui32>(), TLogoBlobID::MaxChannel, + TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie, TLogoBlobID::MaxPartId, + TLogoBlobID::MaxCrcMode); + + Y_VERIFY(minId <= maxId); + + if (Self->Data->LastAssimilatedBlobId < maxId) { + // adjust minId to skip already assimilated items in range query + if (minId < Self->Data->LastAssimilatedBlobId) { + if (item.GetMustRestoreFirst()) { + ScanRangeAndIssueGets(TKey(minId), TKey(*Self->Data->LastAssimilatedBlobId), EScanFlags::INCLUDE_BEGIN); + } + minId = *Self->Data->LastAssimilatedBlobId; + } + + // issue scan query + IssueRange(tabletId, minId, maxId, item.GetMustRestoreFirst()); + } else if (item.GetMustRestoreFirst()) { + ScanRangeAndIssueGets(TKey(minId), TKey(maxId), EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END); + } + + break; + } + + case NKikimrBlobDepot::TEvResolve::TItem::kExactKey: { + TData::TKey key = TKey::FromBinaryKey(item.GetExactKey(), Self->Config); + const TValue *value = Self->Data->FindKey(key); + const bool doGet = (!value && key.GetBlobId() < Self->Data->LastAssimilatedBlobId) // value not yet assimilated + || (value && value->GoingToAssimilate && item.GetMustRestoreFirst()); // value has no local data yet + if (doGet) { + IssueGet(key.GetBlobId(), item.GetMustRestoreFirst()); + } + break; + } + + case NKikimrBlobDepot::TEvResolve::TItem::KEYDESIGNATOR_NOT_SET: + Y_VERIFY_DEBUG(false); + break; + } + } + + Become(&TThis::StateFunc); + CheckIfDone(); + } + + void ScanRangeAndIssueGets(TKey from, TKey to, TScanFlags flags) { + Self->Data->ScanRange(&from, &to, flags, [&](const TKey& key, const TValue& value) { + if (value.GoingToAssimilate) { + IssueGet(key.GetBlobId(), true /*mustRestoreFirst*/); + } + return true; + }); + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // RANGE QUERIES are for metadata only -- they scan not yet assimilated parts of the original group and do not + // recover any data; thus they are IsIndexOnly and not MustRestoreFirst range queries + + void IssueRange(ui64 tabletId, TLogoBlobID from, TLogoBlobID to, bool mustRestoreFirst) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT50, "going to TEvRange", (Id, Self->GetLogId()), (Sender, Ev->Sender), + (Cookie, Ev->Cookie), (TabletId, tabletId), (From, from), (To, to), (MustRestoreFirst, mustRestoreFirst)); + auto ev = std::make_unique<TEvBlobStorage::TEvRange>(tabletId, from, to, false, TInstant::Max(), true); + ev->Decommission = true; + SendToBSProxy(SelfId(), Self->Config.GetVirtualGroupId(), ev.release(), mustRestoreFirst); + ++RangesInFlight; + } + + void Handle(TEvBlobStorage::TEvRangeResult::TPtr ev) { + auto& msg = *ev->Get(); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT55, "TEvRangeResult", (Id, Self->GetLogId()), (Sender, Ev->Sender), + (Cookie, Ev->Cookie), (Msg, msg)); + + if (msg.Status == NKikimrProto::OK) { + for (const auto& r : msg.Responses) { + if (ev->Cookie) { + if (const TValue *value = Self->Data->FindKey(TKey(r.Id)); !value || value->GoingToAssimilate) { + IssueGet(r.Id, true /*mustRestoreFirst*/); + } + } else { + Context.DecommitBlobs.push_back({r.Id, r.Keep, r.DoNotKeep}); + } + } + } else { + return FinishWithError(NLog::PRI_NOTICE, TStringBuilder() << "TEvRange query failed: " << msg.ErrorReason); + } + + --RangesInFlight; + CheckIfDone(); + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // GET QUERIES may contain request either just for metadata, or for the data too; in case we receive data, we + // have to put it to BlobDepot storage + + void IssueGet(TLogoBlobID id, bool mustRestoreFirst) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT86, "going to TEvGet", (Id, Self->GetLogId()), (Sender, Ev->Sender), + (Cookie, Ev->Cookie), (BlobId, id), (MustRestoreFirst, mustRestoreFirst)); + GetQ.emplace_back(id, mustRestoreFirst); + ProcessGetQueue(); + } + + static ui32 GetBytesFor(const std::tuple<TLogoBlobID, bool>& q) { + const auto& [id, mustRestoreFirst] = q; + return mustRestoreFirst ? id.BlobSize() : 0; + } + + void ProcessGetQueue() { + while (!GetQ.empty() && GetsInFlight < MaxGetsInFlight && GetBytesInFlight + GetBytesFor(GetQ.front()) <= MaxGetBytesInFlight) { + const auto [id, mustRestoreFirst] = GetQ.front(); + ++GetsInFlight; + const ui32 bytes = GetBytesFor(GetQ.front()); + GetBytesInFlight += bytes; + GetQ.pop_front(); + auto ev = std::make_unique<TEvBlobStorage::TEvGet>(id, 0, 0, TInstant::Max(), + NKikimrBlobStorage::EGetHandleClass::FastRead, false /*mustRestoreFirst*/, + !mustRestoreFirst /*isIndexOnly*/); + ev->Decommission = true; + SendToBSProxy(SelfId(), Self->Config.GetVirtualGroupId(), ev.release(), bytes); + } + } + + void Handle(TEvBlobStorage::TEvGetResult::TPtr ev) { + auto& msg = *ev->Get(); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT87, "TEvGetResult", (Id, Self->GetLogId()), (Sender, Ev->Sender), + (Cookie, Ev->Cookie), (Msg, msg)); + + for (ui32 i = 0; i < msg.ResponseSz; ++i) { + auto& r = msg.Responses[i]; + if (r.Status == NKikimrProto::OK) { + if (r.Buffer) { // wasn't index read + IssuePut(TKey(r.Id), std::move(r.Buffer), r.Keep, r.DoNotKeep); + } else { + Context.DecommitBlobs.push_back({r.Id, r.Keep, r.DoNotKeep}); + } + } else if (r.Status == NKikimrProto::NODATA) { + Self->Data->ExecuteTxCommitAssimilatedBlob(NKikimrProto::NODATA, TBlobSeqId(), TData::TKey(r.Id), + TEvPrivate::EvTxComplete, SelfId(), 0); + ++PutsInFlight; + } else { + // mark this specific key as unresolvable + Context.ResolutionErrors.emplace_back(r.Id); + } + } + + --GetsInFlight; + GetBytesInFlight -= ev->Cookie; + + ProcessGetQueue(); + CheckIfDone(); + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // PUT QUERIES are used to store retrieved MustRestoreFirst blobs in local storage + + void IssuePut(TKey key, TString&& buffer, bool keep, bool doNotKeep) { + std::vector<ui8> channels(1); + Self->PickChannels(NKikimrBlobDepot::TChannelKind::Data, channels); + TChannelInfo& channel = Self->Channels[channels.front()]; + const ui64 value = channel.NextBlobSeqId++; + const auto blobSeqId = TBlobSeqId::FromSequentalNumber(channel.Index, Self->Executor()->Generation(), value); + const TLogoBlobID id = blobSeqId.MakeBlobId(Self->TabletID(), EBlobType::VG_DATA_BLOB, 0, buffer.size()); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT91, "going to TEvPut", (Id, Self->GetLogId()), (Sender, Ev->Sender), + (Cookie, Ev->Cookie), (Key, key), (BlobId, id)); + SendToBSProxy(SelfId(), channel.GroupId, new TEvBlobStorage::TEvPut(id, std::move(buffer), TInstant::Max()), + (ui64)keep | (ui64)doNotKeep << 1); + const bool inserted = channel.AssimilatedBlobsInFlight.insert(value).second; // prevent from barrier advancing + Y_VERIFY(inserted); + const bool inserted1 = IdToKey.try_emplace(id, std::move(key)).second; + Y_VERIFY(inserted1); + ++PutsInFlight; + } + + void Handle(TEvBlobStorage::TEvPutResult::TPtr ev) { + auto& msg = *ev->Get(); + + const auto it = IdToKey.find(msg.Id); + Y_VERIFY(it != IdToKey.end()); + TKey key = std::move(it->second); + IdToKey.erase(it); + + const bool keep = ev->Cookie & 1; + const bool doNotKeep = ev->Cookie >> 1 & 1; + + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT88, "got TEvPutResult", (Id, Self->GetLogId()), (Sender, Ev->Sender), + (Cookie, Ev->Cookie), (Msg, msg), (Key, key), (Keep, keep), (DoNotKeep, doNotKeep)); + + Self->Data->ExecuteTxCommitAssimilatedBlob(msg.Status, TBlobSeqId::FromLogoBlobId(msg.Id), std::move(key), + TEvPrivate::EvTxComplete, SelfId(), 0, keep, doNotKeep); + + if (msg.Status != NKikimrProto::OK) { // do not reply OK to this item + Context.ResolutionErrors.emplace_back(msg.Id); + } + } + + void HandleTxComplete() { + --PutsInFlight; + CheckIfDone(); + } + + void CheckIfDone() { + if (RangesInFlight + GetsInFlight + GetQ.size() + PutsInFlight == 0) { + FinishWithSuccess(); + } + } + + void FinishWithSuccess() { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT92, "request succeeded", (Id, Self->GetLogId()), (Sender, Ev->Sender), + (Cookie, Ev->Cookie), (ResolutionErrors.size, Context.ResolutionErrors.size()), + (DecommitBlobs.size, Context.DecommitBlobs.size())); + std::sort(Context.ResolutionErrors.begin(), Context.ResolutionErrors.end()); + Self->Data->ExecuteTxResolve(Ev, std::move(Context)); + PassAway(); + } + + void FinishWithError(NLog::EPriority prio, TString errorReason) { + STLOG(prio, BLOB_DEPOT, BDT89, "request failed", (Id, Self->GetLogId()), (Sender, Ev->Sender), + (Cookie, Ev->Cookie), (ErrorReason, errorReason)); + auto [response, record] = TEvBlobDepot::MakeResponseFor(*Ev, NKikimrProto::ERROR, std::move(errorReason)); + TActivationContext::Send(response.release()); + PassAway(); + } + + STATEFN(StateFunc) { + if (Token.expired()) { + return PassAway(); + } + + switch (const ui32 type = ev->GetTypeRewrite()) { + hFunc(TEvBlobDepot::TEvResolve, Handle); + hFunc(TEvBlobStorage::TEvGetResult, Handle); + hFunc(TEvBlobStorage::TEvRangeResult, Handle); + hFunc(TEvBlobStorage::TEvPutResult, Handle); + cFunc(TEvPrivate::EvTxComplete, HandleTxComplete); + + default: + Y_VERIFY_DEBUG(false, "unexpected event Type# %08" PRIx32, type); + STLOG(PRI_CRIT, BLOB_DEPOT, BDT90, "unexpected event", (Id, Self->GetLogId()), (Type, type)); + break; + } + } + }; + + IActor *TBlobDepot::TData::CreateResolveDecommitActor(TEvBlobDepot::TEvResolve::TPtr ev) { + return new TResolveDecommitActor(Self, ev); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/data_resolve.cpp b/ydb/core/blob_depot/data_resolve.cpp index 6dcfdd1512a..6570d5b71f3 100644 --- a/ydb/core/blob_depot/data_resolve.cpp +++ b/ydb/core/blob_depot/data_resolve.cpp @@ -168,6 +168,10 @@ namespace NKikimr::NBlobDepot { KeysLoaded = true; } + if (ResolveDecommitContext.ReturnAfterLoadingKeys) { + return true; + } + ui32 numItemsRemain = 10'000; while (!ResolveDecommitContext.DecommitBlobs.empty()) { @@ -183,20 +187,6 @@ namespace NKikimr::NBlobDepot { ResolveDecommitContext.DecommitBlobs.pop_front(); } - while (!ResolveDecommitContext.DropNodataBlobs.empty()) { - if (!numItemsRemain) { - SuccessorTx = true; - return true; - } - - const TKey& key = ResolveDecommitContext.DropNodataBlobs.front(); - const TValue *value = Self->Data->FindKey(key); - if (value && value->GoingToAssimilate) { - Self->Data->DeleteKey(key, txc, this); - } - ResolveDecommitContext.DropNodataBlobs.pop_front(); - } - const auto& record = Request->Get()->Record; for (const auto& item : record.GetItems()) { std::optional<ui64> cookie = item.HasCookie() ? std::make_optional(item.GetCookie()) : std::nullopt; @@ -245,6 +235,8 @@ namespace NKikimr::NBlobDepot { if (SuccessorTx) { Self->Execute(std::make_unique<TTxResolve>(*this)); + } else if (const TActorId recipient = ResolveDecommitContext.ReturnAfterLoadingKeys) { + TActivationContext::Send(Request->Forward(recipient)); } else if (Uncertainties.empty()) { Result.Send(NKikimrProto::OK, std::nullopt); } else { @@ -429,168 +421,14 @@ namespace NKikimr::NBlobDepot { (Sender, ev->Sender), (Cookie, ev->Cookie), (LastAssimilatedBlobId, LastAssimilatedBlobId)); if (Self->Config.GetIsDecommittingGroup() && Self->DecommitState <= EDecommitState::BlobsFinished) { - std::vector<std::unique_ptr<IEventBase>> outbox; - std::unordered_map<std::tuple<ui64, bool>, std::vector<TLogoBlobID>> getBlobIds; - const ui64 id = LastResolveQueryId + 1; - - for (const auto& item : ev->Get()->Record.GetItems()) { - switch (item.GetKeyDesignatorCase()) { - case NKikimrBlobDepot::TEvResolve::TItem::kKeyRange: { - if (!item.HasTabletId()) { - STLOG(PRI_CRIT, BLOB_DEPOT, BDT42, "incorrect request", (Id, Self->GetLogId()), (Item, item)); - auto [response, record] = TEvBlobDepot::MakeResponseFor(*ev, NKikimrProto::ERROR, - "incorrect request"); - TActivationContext::Send(response.release()); - return; - } - - const ui64 tabletId = item.GetTabletId(); - if (LastAssimilatedBlobId && tabletId < LastAssimilatedBlobId->TabletID()) { - continue; // fast path - } - - const auto& range = item.GetKeyRange(); - - TLogoBlobID minId = range.HasBeginningKey() - ? TKey::FromBinaryKey(range.GetBeginningKey(), Self->Config).GetBlobId() - : TLogoBlobID(tabletId, 0, 0, 0, 0, 0); - - TLogoBlobID maxId = range.HasEndingKey() - ? TKey::FromBinaryKey(range.GetEndingKey(), Self->Config).GetBlobId() - : TLogoBlobID(tabletId, Max<ui32>(), Max<ui32>(), TLogoBlobID::MaxChannel, - TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie, TLogoBlobID::MaxPartId, - TLogoBlobID::MaxCrcMode); - - Y_VERIFY(minId <= maxId); - - if (LastAssimilatedBlobId && maxId <= *LastAssimilatedBlobId) { - continue; // we have this range fully assimilated, just skip - } - - // adjust minId to skip already assimilated items - minId = Max(minId, LastAssimilatedBlobId.value_or(minId)); - - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT46, "going to TEvRange", (Id, Self->GetLogId()), (TabletId, tabletId), - (MinId, minId), (MaxId, maxId), (MustRestoreFirst, item.GetMustRestoreFirst()), (Cookie, id)); - - auto ev = std::make_unique<TEvBlobStorage::TEvRange>(tabletId, minId, maxId, - item.GetMustRestoreFirst(), TInstant::Max(), true); - ev->Decommission = true; - outbox.push_back(std::move(ev)); - - break; - } - - case NKikimrBlobDepot::TEvResolve::TItem::kExactKey: { - const TLogoBlobID blobId = TKey::FromBinaryKey(item.GetExactKey(), Self->Config).GetBlobId(); - const auto key = std::make_tuple(blobId.TabletID(), item.GetMustRestoreFirst()); - getBlobIds[key].push_back(blobId); - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT86, "going to TEvGet", (Id, Self->GetLogId()), - (BlobId, blobId), (MustRestoreFirst, item.GetMustRestoreFirst()), (Cookie, id)); - break; - } - - case NKikimrBlobDepot::TEvResolve::TItem::KEYDESIGNATOR_NOT_SET: - Y_VERIFY_DEBUG(false); - break; - } - - } - - // convert all gets to events in outbox - for (auto& [key, blobIds] : getBlobIds) { - const auto& [tabletId, mustRestoreFirst] = key; - const ui32 sz = blobIds.size(); - TArrayHolder<TEvBlobStorage::TEvGet::TQuery> q(new TEvBlobStorage::TEvGet::TQuery[sz]); - for (ui32 i = 0; i < sz; ++i) { - q[i].Set(blobIds[i]); - } - auto ev = std::make_unique<TEvBlobStorage::TEvGet>(q, sz, TInstant::Max(), - NKikimrBlobStorage::EGetHandleClass::FastRead, mustRestoreFirst, true); - ev->Decommission = true; - outbox.emplace_back(std::move(ev)); - } - - if (!outbox.empty()) { - ResolveDecommitContexts[id] = {ev, (ui32)outbox.size()}; - ++LastResolveQueryId; - Y_VERIFY(LastResolveQueryId == id); - - for (auto& ev : outbox) { - SendToBSProxy(Self->SelfId(), Self->Config.GetVirtualGroupId(), ev.release(), id); - } - - return; - } + Self->RegisterWithSameMailbox(CreateResolveDecommitActor(ev)); + } else { + Self->Execute(std::make_unique<TTxResolve>(Self, ev)); } - - Self->Execute(std::make_unique<TTxResolve>(Self, ev)); } - template<typename TCallback> - void TData::HandleResolutionResult(ui64 id, TCallback&& callback) { - auto& contexts = Self->Data->ResolveDecommitContexts; - if (const auto it = contexts.find(id); it != contexts.end()) { - auto& context = it->second; - if (!callback(context)) { - STLOG(PRI_INFO, BLOB_DEPOT, BDT87, "failing TEvResolve", (Id, Self->GetLogId()), (Sender, context.Ev->Sender), - (Cookie, context.Ev->Cookie)); - auto [response, record] = TEvBlobDepot::MakeResponseFor(*context.Ev, NKikimrProto::ERROR, - "unassimilated key resolution error"); - TActivationContext::Send(response.release()); - } else if (!--context.NumRangesInFlight) { - auto ev = context.Ev; - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT88, "actually starting TEvResolve", (Id, Self->GetLogId()), (Sender, ev->Sender), - (Cookie, ev->Cookie)); - std::sort(context.ResolutionErrors.begin(), context.ResolutionErrors.end()); - Self->Execute(std::make_unique<TTxResolve>(Self, ev, std::move(context))); - } else { - return; // keep context running - } - contexts.erase(it); - } - } - - void TData::Handle(TEvBlobStorage::TEvRangeResult::TPtr ev) { - auto& msg = *ev->Get(); - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT50, "TEvRangeResult", (Id, Self->GetLogId()), (Msg, msg), (Cookie, ev->Cookie)); - - HandleResolutionResult(ev->Cookie, [&](auto& context) { - if (msg.Status == NKikimrProto::OK) { - for (const auto& r : msg.Responses) { - context.DecommitBlobs.push_back({r.Id, r.Keep, r.DoNotKeep}); - } - return true; - } else { - // we can't be sure that there are no still unassimilated keys in this range, so we report ERROR to - // the whole range query; TODO(alexvru): provide a way to mark only specific request item as faulty one - return false; - } - }); - } - - void TData::Handle(TEvBlobStorage::TEvGetResult::TPtr ev) { - if (!ev->Cookie) { - return UncertaintyResolver->Handle(ev); - } - - auto& msg = *ev->Get(); - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT55, "TEvGetResult", (Id, Self->GetLogId()), (Msg, msg), (Cookie, ev->Cookie)); - - HandleResolutionResult(ev->Cookie, [&](auto& context) { - for (ui32 i = 0; i < msg.ResponseSz; ++i) { - const auto& r = msg.Responses[i]; - if (r.Status == NKikimrProto::OK) { - context.DecommitBlobs.push_back({r.Id, r.Keep, r.DoNotKeep}); - } else if (r.Status == NKikimrProto::NODATA) { - context.DropNodataBlobs.push_back(TKey(r.Id)); - } else { - // mark this specific key as unresolvable - context.ResolutionErrors.push_back(TKey(r.Id)); - } - } - return true; - }); + void TData::ExecuteTxResolve(TEvBlobDepot::TEvResolve::TPtr ev, TResolveDecommitContext&& context) { + Self->Execute(std::make_unique<TTxResolve>(Self, ev, std::move(context))); } } // NKikimr::NBlobDepot |