aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-01-28 19:07:24 +0300
committeralexvru <alexvru@ydb.tech>2023-01-28 19:07:24 +0300
commitb4c07516d0e7248f1050c3cf3ad83057ab6d0593 (patch)
tree245097424f23e40178c28bf373f2f47c23091d66
parente9ca29e3dfc6f7601dd4037785c75d8d3b6620f3 (diff)
downloadydb-b4c07516d0e7248f1050c3cf3ad83057ab6d0593.tar.gz
Fix MustRestoreFirst logic in BlobDepot for decommitted groups
-rw-r--r--ydb/core/blob_depot/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/blob_depot/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/blob_depot/CMakeLists.linux.txt1
-rw-r--r--ydb/core/blob_depot/assimilator.cpp40
-rw-r--r--ydb/core/blob_depot/assimilator.h2
-rw-r--r--ydb/core/blob_depot/blob_depot.cpp7
-rw-r--r--ydb/core/blob_depot/data.cpp22
-rw-r--r--ydb/core/blob_depot/data.h26
-rw-r--r--ydb/core/blob_depot/data_decommit.cpp322
-rw-r--r--ydb/core/blob_depot/data_resolve.cpp184
10 files changed, 398 insertions, 208 deletions
diff --git a/ydb/core/blob_depot/CMakeLists.darwin.txt b/ydb/core/blob_depot/CMakeLists.darwin.txt
index b7013389cf6..e1ac07e603d 100644
--- a/ydb/core/blob_depot/CMakeLists.darwin.txt
+++ b/ydb/core/blob_depot/CMakeLists.darwin.txt
@@ -23,6 +23,7 @@ target_sources(ydb-core-blob_depot PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/assimilator.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blocks.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_decommit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_gc.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_load.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_mon.cpp
diff --git a/ydb/core/blob_depot/CMakeLists.linux-aarch64.txt b/ydb/core/blob_depot/CMakeLists.linux-aarch64.txt
index 04727d74a40..ae8c5e7a0d6 100644
--- a/ydb/core/blob_depot/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/blob_depot/CMakeLists.linux-aarch64.txt
@@ -24,6 +24,7 @@ target_sources(ydb-core-blob_depot PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/assimilator.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blocks.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_decommit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_gc.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_load.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_mon.cpp
diff --git a/ydb/core/blob_depot/CMakeLists.linux.txt b/ydb/core/blob_depot/CMakeLists.linux.txt
index 04727d74a40..ae8c5e7a0d6 100644
--- a/ydb/core/blob_depot/CMakeLists.linux.txt
+++ b/ydb/core/blob_depot/CMakeLists.linux.txt
@@ -24,6 +24,7 @@ target_sources(ydb-core-blob_depot PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/assimilator.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blocks.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_decommit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_gc.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_load.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_mon.cpp
diff --git a/ydb/core/blob_depot/assimilator.cpp b/ydb/core/blob_depot/assimilator.cpp
index 94f13cc13d2..2a31d58e964 100644
--- a/ydb/core/blob_depot/assimilator.cpp
+++ b/ydb/core/blob_depot/assimilator.cpp
@@ -16,30 +16,36 @@ namespace NKikimr::NBlobDepot {
};
};
- class TAssimilator::TTxCommitAssimilatedBlob : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
- const TActorId AssimilatorId;
+ class TBlobDepot::TData::TTxCommitAssimilatedBlob : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
const NKikimrProto::EReplyStatus Status;
const TBlobSeqId BlobSeqId;
const TData::TKey Key;
- const ui64 GetId;
+ const ui32 NotifyEventType;
+ const TActorId ParentId;
+ const ui64 Cookie;
+ const bool Keep;
+ const bool DoNotKeep;
public:
TTxType GetTxType() const override { return NKikimrBlobDepot::TXTYPE_COMMIT_ASSIMILATED_BLOB; }
- TTxCommitAssimilatedBlob(TAssimilator *self, NKikimrProto::EReplyStatus status, TBlobSeqId blobSeqId, TData::TKey key,
- ui64 getId)
- : TTransactionBase(self->Self)
- , AssimilatorId(self->SelfId())
+ TTxCommitAssimilatedBlob(TBlobDepot *self, NKikimrProto::EReplyStatus status, TBlobSeqId blobSeqId,
+ TData::TKey key, ui32 notifyEventType, TActorId parentId, ui64 cookie, bool keep, bool doNotKeep)
+ : TTransactionBase(self)
, Status(status)
, BlobSeqId(blobSeqId)
, Key(std::move(key))
- , GetId(getId)
+ , NotifyEventType(notifyEventType)
+ , ParentId(parentId)
+ , Cookie(cookie)
+ , Keep(keep)
+ , DoNotKeep(doNotKeep)
{}
bool Execute(TTransactionContext& txc, const TActorContext&) override {
if (Status == NKikimrProto::OK) {
Y_VERIFY(!Self->Data->CanBeCollected(BlobSeqId));
- Self->Data->BindToBlob(Key, BlobSeqId, txc, this);
+ Self->Data->BindToBlob(Key, BlobSeqId, Keep, DoNotKeep, txc, this);
} else if (Status == NKikimrProto::NODATA) {
if (const TData::TValue *value = Self->Data->FindKey(Key); value && value->GoingToAssimilate) {
Self->Data->DeleteKey(Key, txc, this);
@@ -60,7 +66,7 @@ namespace NKikimr::NBlobDepot {
}
}
Self->Data->CommitTrash(this);
- TActivationContext::Send(new IEventHandle(TEvPrivate::EvTxComplete, 0, AssimilatorId, {}, nullptr, GetId));
+ TActivationContext::Send(new IEventHandle(NotifyEventType, 0, ParentId, {}, nullptr, Cookie));
}
};
@@ -380,8 +386,8 @@ namespace NKikimr::NBlobDepot {
++it->second;
getBytes += id.BlobSize();
} else if (resp.Status == NKikimrProto::NODATA) {
- Self->Execute(std::make_unique<TTxCommitAssimilatedBlob>(this, NKikimrProto::NODATA, TBlobSeqId(),
- TData::TKey(resp.Id), it->first));
+ Self->Data->ExecuteTxCommitAssimilatedBlob(NKikimrProto::NODATA, TBlobSeqId(), TData::TKey(resp.Id),
+ TEvPrivate::EvTxComplete, SelfId(), it->first);
++it->second;
}
}
@@ -417,8 +423,8 @@ namespace NKikimr::NBlobDepot {
const auto& [key, getId] = it->second;
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT37, "got TEvPutResult", (Id, Self->GetLogId()), (Msg, msg),
(NumGetsUnprocessed, GetIdToUnprocessedPuts.size()), (Key, key));
- Self->Execute(std::make_unique<TTxCommitAssimilatedBlob>(this, msg.Status, TBlobSeqId::FromLogoBlobId(msg.Id),
- std::move(key), getId));
+ Self->Data->ExecuteTxCommitAssimilatedBlob(msg.Status, TBlobSeqId::FromLogoBlobId(msg.Id), std::move(key),
+ TEvPrivate::EvTxComplete, SelfId(), getId);
PutIdToKey.erase(it);
}
@@ -533,6 +539,12 @@ namespace NKikimr::NBlobDepot {
return stream.Str();
}
+ void TBlobDepot::TData::ExecuteTxCommitAssimilatedBlob(NKikimrProto::EReplyStatus status, TBlobSeqId blobSeqId,
+ TData::TKey key, ui32 notifyEventType, TActorId parentId, ui64 cookie, bool keep, bool doNotKeep) {
+ Self->Execute(std::make_unique<TTxCommitAssimilatedBlob>(Self, status, blobSeqId, std::move(key),
+ notifyEventType, parentId, cookie, keep, doNotKeep));
+ }
+
void TBlobDepot::StartGroupAssimilator() {
if (Config.GetIsDecommittingGroup() && DecommitState != EDecommitState::Done) {
Y_VERIFY(!GroupAssimilatorId);
diff --git a/ydb/core/blob_depot/assimilator.h b/ydb/core/blob_depot/assimilator.h
index c4717325b59..bb54cd14a0d 100644
--- a/ydb/core/blob_depot/assimilator.h
+++ b/ydb/core/blob_depot/assimilator.h
@@ -42,8 +42,6 @@ namespace NKikimr::NBlobDepot {
bool ActionInProgress = false;
bool ResumeScanDataForCopyingInFlight = false;
- class TTxCommitAssimilatedBlob;
-
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::BLOB_DEPOT_ASSIMILATOR_ACTOR;
diff --git a/ydb/core/blob_depot/blob_depot.cpp b/ydb/core/blob_depot/blob_depot.cpp
index 86661f95904..9a464fc5c13 100644
--- a/ydb/core/blob_depot/blob_depot.cpp
+++ b/ydb/core/blob_depot/blob_depot.cpp
@@ -48,7 +48,9 @@ namespace NKikimr::NBlobDepot {
try {
auto handleFromAgentPipe = [this](auto& ev) {
const auto it = PipeServers.find(ev->Recipient);
- Y_VERIFY(it != PipeServers.end());
+ if (it == PipeServers.end()) {
+ return; // this may be a race with TEvServerDisconnected and postpone queue; it's okay to have this
+ }
auto& info = it->second;
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT69, "HandleFromAgentPipe", (Id, GetLogId()), (RequestId, ev->Cookie),
@@ -87,8 +89,7 @@ namespace NKikimr::NBlobDepot {
cFunc(TEvPrivate::EvProcessRegisterAgentQ, ProcessRegisterAgentQ);
hFunc(TEvBlobStorage::TEvCollectGarbageResult, Data->Handle);
- hFunc(TEvBlobStorage::TEvRangeResult, Data->Handle);
- hFunc(TEvBlobStorage::TEvGetResult, Data->Handle);
+ hFunc(TEvBlobStorage::TEvGetResult, Data->UncertaintyResolver->Handle);
hFunc(TEvBlobStorage::TEvStatusResult, SpaceMonitor->Handle);
cFunc(TEvPrivate::EvKickSpaceMonitor, KickSpaceMonitor);
diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp
index 5c34c16cdfd..a19c348c383 100644
--- a/ydb/core/blob_depot/data.cpp
+++ b/ydb/core/blob_depot/data.cpp
@@ -195,6 +195,7 @@ namespace NKikimr::NBlobDepot {
}
const TData::TValue *TData::FindKey(const TKey& key) const {
+ Y_VERIFY(IsKeyLoaded(key));
const auto it = Data.find(key);
return it != Data.end() ? &it->second : nullptr;
}
@@ -223,11 +224,20 @@ namespace NKikimr::NBlobDepot {
}, item);
}
- void TData::BindToBlob(const TKey& key, TBlobSeqId blobSeqId, NTabletFlatExecutor::TTransactionContext& txc, void *cookie) {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT49, "BindToBlob", (Id, Self->GetLogId()), (Key, key), (BlobSeqId, blobSeqId));
+ void TData::BindToBlob(const TKey& key, TBlobSeqId blobSeqId, bool keep, bool doNotKeep, NTabletFlatExecutor::TTransactionContext& txc, void *cookie) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT49, "BindToBlob", (Id, Self->GetLogId()), (Key, key), (BlobSeqId, blobSeqId),
+ (Keep, keep), (DoNotKeep, doNotKeep));
Y_VERIFY(IsKeyLoaded(key));
- UpdateKey(key, txc, cookie, "BindToBlob", [&](TValue& value, bool inserted) {
- if (inserted || value.GoingToAssimilate) {
+ UpdateKey(key, txc, cookie, "BindToBlob", [&](TValue& value, bool /*inserted*/) {
+ EUpdateOutcome outcome = EUpdateOutcome::NO_CHANGE;
+ if (doNotKeep && value.KeepState < EKeepState::DoNotKeep) {
+ value.KeepState = EKeepState::DoNotKeep;
+ outcome = EUpdateOutcome::CHANGE;
+ } else if (keep && value.KeepState < EKeepState::Keep) {
+ value.KeepState = EKeepState::Keep;
+ outcome = EUpdateOutcome::CHANGE;
+ }
+ if (value.ValueChain.empty()) {
auto *chain = value.ValueChain.Add();
auto *locator = chain->MutableLocator();
locator->SetGroupId(Self->Info()->GroupFor(blobSeqId.Channel, blobSeqId.Generation));
@@ -235,9 +245,9 @@ namespace NKikimr::NBlobDepot {
locator->SetTotalDataLen(key.GetBlobId().BlobSize());
locator->SetFooterLen(0);
value.GoingToAssimilate = false;
- return inserted ? EUpdateOutcome::DROP : EUpdateOutcome::CHANGE;
+ outcome = EUpdateOutcome::CHANGE;
}
- return EUpdateOutcome::NO_CHANGE;
+ return outcome;
});
}
diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h
index 254293f2446..af9b1e3d143 100644
--- a/ydb/core/blob_depot/data.h
+++ b/ydb/core/blob_depot/data.h
@@ -398,13 +398,10 @@ namespace NKikimr::NBlobDepot {
struct TResolveDecommitContext {
TEvBlobDepot::TEvResolve::TPtr Ev; // original resolve request
- ui32 NumRangesInFlight;
+ TActorId ReturnAfterLoadingKeys;
std::deque<TEvBlobStorage::TEvAssimilateResult::TBlob> DecommitBlobs = {};
std::vector<TKey> ResolutionErrors = {};
- std::deque<TKey> DropNodataBlobs = {};
};
- ui64 LastResolveQueryId = 0;
- THashMap<ui64, TResolveDecommitContext> ResolveDecommitContexts;
class TTxIssueGC;
class TTxConfirmGC;
@@ -412,7 +409,6 @@ namespace NKikimr::NBlobDepot {
class TTxDataLoad;
class TTxLoadSpecificKeys;
- class TTxResolve;
class TResolveResultAccumulator;
class TUncertaintyResolver;
@@ -483,7 +479,8 @@ namespace NKikimr::NBlobDepot {
void UpdateKey(const TKey& key, const NKikimrBlobDepot::TEvCommitBlobSeq::TItem& item,
NTabletFlatExecutor::TTransactionContext& txc, void *cookie);
- void BindToBlob(const TKey& key, TBlobSeqId blobSeqId, NTabletFlatExecutor::TTransactionContext& txc, void *cookie);
+ void BindToBlob(const TKey& key, TBlobSeqId blobSeqId, bool keep, bool doNotKeep,
+ NTabletFlatExecutor::TTransactionContext& txc, void *cookie);
void MakeKeyCertain(const TKey& key);
void HandleCommitCertainKeys();
@@ -536,12 +533,21 @@ namespace NKikimr::NBlobDepot {
bool IsLoaded() const { return Loaded; }
bool IsKeyLoaded(const TKey& key) const { return Loaded || LoadedKeys[key]; }
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ class TResolveDecommitActor;
+ IActor *CreateResolveDecommitActor(TEvBlobDepot::TEvResolve::TPtr ev);
+
+ class TTxCommitAssimilatedBlob;
+ void ExecuteTxCommitAssimilatedBlob(NKikimrProto::EReplyStatus status, TBlobSeqId blobSeqId, TData::TKey key,
+ ui32 notifyEventType, TActorId parentId, ui64 cookie, bool keep = false, bool doNotKeep = false);
+
+ class TTxResolve;
+ void ExecuteTxResolve(TEvBlobDepot::TEvResolve::TPtr ev, TResolveDecommitContext&& context);
+
void Handle(TEvBlobDepot::TEvResolve::TPtr ev);
- void Handle(TEvBlobStorage::TEvRangeResult::TPtr ev);
- void Handle(TEvBlobStorage::TEvGetResult::TPtr ev);
- template<typename TCallback>
- void HandleResolutionResult(ui64 id, TCallback&& callback);
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
ui64 GetTotalStoredDataSize() const {
return TotalStoredDataSize;
diff --git a/ydb/core/blob_depot/data_decommit.cpp b/ydb/core/blob_depot/data_decommit.cpp
new file mode 100644
index 00000000000..f7df4860fda
--- /dev/null
+++ b/ydb/core/blob_depot/data_decommit.cpp
@@ -0,0 +1,322 @@
+#include "data.h"
+
+namespace NKikimr::NBlobDepot {
+
+ class TBlobDepot::TData::TResolveDecommitActor : public TActorBootstrapped<TResolveDecommitActor> {
+ struct TEvPrivate {
+ enum {
+ EvTxComplete = EventSpaceBegin(TEvents::ES_PRIVATE),
+ };
+ };
+
+ TBlobDepot* const Self;
+ std::weak_ptr<TToken> Token;
+ TResolveDecommitContext Context;
+ TEvBlobDepot::TEvResolve::TPtr Ev;
+
+ ui32 RangesInFlight = 0;
+
+ std::deque<std::tuple<TLogoBlobID, bool>> GetQ;
+ ui32 GetsInFlight = 0;
+ ui32 GetBytesInFlight = 0;
+ static constexpr ui32 MaxGetsInFlight = 10;
+ static constexpr ui32 MaxGetBytesInFlight = 10'000'000;
+
+ ui32 PutsInFlight = 0;
+
+ THashMap<TLogoBlobID, TKey> IdToKey;
+
+ public:
+ TResolveDecommitActor(TBlobDepot *self, TEvBlobDepot::TEvResolve::TPtr ev)
+ : Self(self)
+ , Token(self->Token)
+ , Ev(ev)
+ {}
+
+ void Bootstrap() {
+ if (Token.expired()) {
+ return;
+ }
+
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT42, "TResolveDecommitActor::Bootstrap", (Id, Self->GetLogId()),
+ (Sender, Ev->Sender), (Cookie, Ev->Cookie));
+
+ if (!Self->Data->Loaded) {
+ // issue request to a resolver to just load keys we are looking for
+ TResolveDecommitContext context;
+ context.ReturnAfterLoadingKeys = SelfId();
+ Self->Data->ExecuteTxResolve(Ev, std::move(context));
+ } else {
+ Handle(Ev);
+ }
+ }
+
+ void Handle(TEvBlobDepot::TEvResolve::TPtr ev) {
+ Ev = ev; // store event back
+
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT46, "keys loaded", (Id, Self->GetLogId()),
+ (Sender, Ev->Sender), (Cookie, Ev->Cookie));
+
+ for (const auto& item : Ev->Get()->Record.GetItems()) {
+ switch (item.GetKeyDesignatorCase()) {
+ case NKikimrBlobDepot::TEvResolve::TItem::kKeyRange: {
+ if (!item.HasTabletId()) {
+ return FinishWithError(NLog::PRI_CRIT, "incorrect request");
+ }
+
+ const ui64 tabletId = item.GetTabletId();
+ const auto& range = item.GetKeyRange();
+
+ TLogoBlobID minId = range.HasBeginningKey()
+ ? TKey::FromBinaryKey(range.GetBeginningKey(), Self->Config).GetBlobId()
+ : TLogoBlobID(tabletId, 0, 0, 0, 0, 0);
+
+ TLogoBlobID maxId = range.HasEndingKey()
+ ? TKey::FromBinaryKey(range.GetEndingKey(), Self->Config).GetBlobId()
+ : TLogoBlobID(tabletId, Max<ui32>(), Max<ui32>(), TLogoBlobID::MaxChannel,
+ TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie, TLogoBlobID::MaxPartId,
+ TLogoBlobID::MaxCrcMode);
+
+ Y_VERIFY(minId <= maxId);
+
+ if (Self->Data->LastAssimilatedBlobId < maxId) {
+ // adjust minId to skip already assimilated items in range query
+ if (minId < Self->Data->LastAssimilatedBlobId) {
+ if (item.GetMustRestoreFirst()) {
+ ScanRangeAndIssueGets(TKey(minId), TKey(*Self->Data->LastAssimilatedBlobId), EScanFlags::INCLUDE_BEGIN);
+ }
+ minId = *Self->Data->LastAssimilatedBlobId;
+ }
+
+ // issue scan query
+ IssueRange(tabletId, minId, maxId, item.GetMustRestoreFirst());
+ } else if (item.GetMustRestoreFirst()) {
+ ScanRangeAndIssueGets(TKey(minId), TKey(maxId), EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END);
+ }
+
+ break;
+ }
+
+ case NKikimrBlobDepot::TEvResolve::TItem::kExactKey: {
+ TData::TKey key = TKey::FromBinaryKey(item.GetExactKey(), Self->Config);
+ const TValue *value = Self->Data->FindKey(key);
+ const bool doGet = (!value && key.GetBlobId() < Self->Data->LastAssimilatedBlobId) // value not yet assimilated
+ || (value && value->GoingToAssimilate && item.GetMustRestoreFirst()); // value has no local data yet
+ if (doGet) {
+ IssueGet(key.GetBlobId(), item.GetMustRestoreFirst());
+ }
+ break;
+ }
+
+ case NKikimrBlobDepot::TEvResolve::TItem::KEYDESIGNATOR_NOT_SET:
+ Y_VERIFY_DEBUG(false);
+ break;
+ }
+ }
+
+ Become(&TThis::StateFunc);
+ CheckIfDone();
+ }
+
+ void ScanRangeAndIssueGets(TKey from, TKey to, TScanFlags flags) {
+ Self->Data->ScanRange(&from, &to, flags, [&](const TKey& key, const TValue& value) {
+ if (value.GoingToAssimilate) {
+ IssueGet(key.GetBlobId(), true /*mustRestoreFirst*/);
+ }
+ return true;
+ });
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // RANGE QUERIES are for metadata only -- they scan not yet assimilated parts of the original group and do not
+ // recover any data; thus they are IsIndexOnly and not MustRestoreFirst range queries
+
+ void IssueRange(ui64 tabletId, TLogoBlobID from, TLogoBlobID to, bool mustRestoreFirst) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT50, "going to TEvRange", (Id, Self->GetLogId()), (Sender, Ev->Sender),
+ (Cookie, Ev->Cookie), (TabletId, tabletId), (From, from), (To, to), (MustRestoreFirst, mustRestoreFirst));
+ auto ev = std::make_unique<TEvBlobStorage::TEvRange>(tabletId, from, to, false, TInstant::Max(), true);
+ ev->Decommission = true;
+ SendToBSProxy(SelfId(), Self->Config.GetVirtualGroupId(), ev.release(), mustRestoreFirst);
+ ++RangesInFlight;
+ }
+
+ void Handle(TEvBlobStorage::TEvRangeResult::TPtr ev) {
+ auto& msg = *ev->Get();
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT55, "TEvRangeResult", (Id, Self->GetLogId()), (Sender, Ev->Sender),
+ (Cookie, Ev->Cookie), (Msg, msg));
+
+ if (msg.Status == NKikimrProto::OK) {
+ for (const auto& r : msg.Responses) {
+ if (ev->Cookie) {
+ if (const TValue *value = Self->Data->FindKey(TKey(r.Id)); !value || value->GoingToAssimilate) {
+ IssueGet(r.Id, true /*mustRestoreFirst*/);
+ }
+ } else {
+ Context.DecommitBlobs.push_back({r.Id, r.Keep, r.DoNotKeep});
+ }
+ }
+ } else {
+ return FinishWithError(NLog::PRI_NOTICE, TStringBuilder() << "TEvRange query failed: " << msg.ErrorReason);
+ }
+
+ --RangesInFlight;
+ CheckIfDone();
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // GET QUERIES may contain request either just for metadata, or for the data too; in case we receive data, we
+ // have to put it to BlobDepot storage
+
+ void IssueGet(TLogoBlobID id, bool mustRestoreFirst) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT86, "going to TEvGet", (Id, Self->GetLogId()), (Sender, Ev->Sender),
+ (Cookie, Ev->Cookie), (BlobId, id), (MustRestoreFirst, mustRestoreFirst));
+ GetQ.emplace_back(id, mustRestoreFirst);
+ ProcessGetQueue();
+ }
+
+ static ui32 GetBytesFor(const std::tuple<TLogoBlobID, bool>& q) {
+ const auto& [id, mustRestoreFirst] = q;
+ return mustRestoreFirst ? id.BlobSize() : 0;
+ }
+
+ void ProcessGetQueue() {
+ while (!GetQ.empty() && GetsInFlight < MaxGetsInFlight && GetBytesInFlight + GetBytesFor(GetQ.front()) <= MaxGetBytesInFlight) {
+ const auto [id, mustRestoreFirst] = GetQ.front();
+ ++GetsInFlight;
+ const ui32 bytes = GetBytesFor(GetQ.front());
+ GetBytesInFlight += bytes;
+ GetQ.pop_front();
+ auto ev = std::make_unique<TEvBlobStorage::TEvGet>(id, 0, 0, TInstant::Max(),
+ NKikimrBlobStorage::EGetHandleClass::FastRead, false /*mustRestoreFirst*/,
+ !mustRestoreFirst /*isIndexOnly*/);
+ ev->Decommission = true;
+ SendToBSProxy(SelfId(), Self->Config.GetVirtualGroupId(), ev.release(), bytes);
+ }
+ }
+
+ void Handle(TEvBlobStorage::TEvGetResult::TPtr ev) {
+ auto& msg = *ev->Get();
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT87, "TEvGetResult", (Id, Self->GetLogId()), (Sender, Ev->Sender),
+ (Cookie, Ev->Cookie), (Msg, msg));
+
+ for (ui32 i = 0; i < msg.ResponseSz; ++i) {
+ auto& r = msg.Responses[i];
+ if (r.Status == NKikimrProto::OK) {
+ if (r.Buffer) { // wasn't index read
+ IssuePut(TKey(r.Id), std::move(r.Buffer), r.Keep, r.DoNotKeep);
+ } else {
+ Context.DecommitBlobs.push_back({r.Id, r.Keep, r.DoNotKeep});
+ }
+ } else if (r.Status == NKikimrProto::NODATA) {
+ Self->Data->ExecuteTxCommitAssimilatedBlob(NKikimrProto::NODATA, TBlobSeqId(), TData::TKey(r.Id),
+ TEvPrivate::EvTxComplete, SelfId(), 0);
+ ++PutsInFlight;
+ } else {
+ // mark this specific key as unresolvable
+ Context.ResolutionErrors.emplace_back(r.Id);
+ }
+ }
+
+ --GetsInFlight;
+ GetBytesInFlight -= ev->Cookie;
+
+ ProcessGetQueue();
+ CheckIfDone();
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // PUT QUERIES are used to store retrieved MustRestoreFirst blobs in local storage
+
+ void IssuePut(TKey key, TString&& buffer, bool keep, bool doNotKeep) {
+ std::vector<ui8> channels(1);
+ Self->PickChannels(NKikimrBlobDepot::TChannelKind::Data, channels);
+ TChannelInfo& channel = Self->Channels[channels.front()];
+ const ui64 value = channel.NextBlobSeqId++;
+ const auto blobSeqId = TBlobSeqId::FromSequentalNumber(channel.Index, Self->Executor()->Generation(), value);
+ const TLogoBlobID id = blobSeqId.MakeBlobId(Self->TabletID(), EBlobType::VG_DATA_BLOB, 0, buffer.size());
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT91, "going to TEvPut", (Id, Self->GetLogId()), (Sender, Ev->Sender),
+ (Cookie, Ev->Cookie), (Key, key), (BlobId, id));
+ SendToBSProxy(SelfId(), channel.GroupId, new TEvBlobStorage::TEvPut(id, std::move(buffer), TInstant::Max()),
+ (ui64)keep | (ui64)doNotKeep << 1);
+ const bool inserted = channel.AssimilatedBlobsInFlight.insert(value).second; // prevent from barrier advancing
+ Y_VERIFY(inserted);
+ const bool inserted1 = IdToKey.try_emplace(id, std::move(key)).second;
+ Y_VERIFY(inserted1);
+ ++PutsInFlight;
+ }
+
+ void Handle(TEvBlobStorage::TEvPutResult::TPtr ev) {
+ auto& msg = *ev->Get();
+
+ const auto it = IdToKey.find(msg.Id);
+ Y_VERIFY(it != IdToKey.end());
+ TKey key = std::move(it->second);
+ IdToKey.erase(it);
+
+ const bool keep = ev->Cookie & 1;
+ const bool doNotKeep = ev->Cookie >> 1 & 1;
+
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT88, "got TEvPutResult", (Id, Self->GetLogId()), (Sender, Ev->Sender),
+ (Cookie, Ev->Cookie), (Msg, msg), (Key, key), (Keep, keep), (DoNotKeep, doNotKeep));
+
+ Self->Data->ExecuteTxCommitAssimilatedBlob(msg.Status, TBlobSeqId::FromLogoBlobId(msg.Id), std::move(key),
+ TEvPrivate::EvTxComplete, SelfId(), 0, keep, doNotKeep);
+
+ if (msg.Status != NKikimrProto::OK) { // do not reply OK to this item
+ Context.ResolutionErrors.emplace_back(msg.Id);
+ }
+ }
+
+ void HandleTxComplete() {
+ --PutsInFlight;
+ CheckIfDone();
+ }
+
+ void CheckIfDone() {
+ if (RangesInFlight + GetsInFlight + GetQ.size() + PutsInFlight == 0) {
+ FinishWithSuccess();
+ }
+ }
+
+ void FinishWithSuccess() {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT92, "request succeeded", (Id, Self->GetLogId()), (Sender, Ev->Sender),
+ (Cookie, Ev->Cookie), (ResolutionErrors.size, Context.ResolutionErrors.size()),
+ (DecommitBlobs.size, Context.DecommitBlobs.size()));
+ std::sort(Context.ResolutionErrors.begin(), Context.ResolutionErrors.end());
+ Self->Data->ExecuteTxResolve(Ev, std::move(Context));
+ PassAway();
+ }
+
+ void FinishWithError(NLog::EPriority prio, TString errorReason) {
+ STLOG(prio, BLOB_DEPOT, BDT89, "request failed", (Id, Self->GetLogId()), (Sender, Ev->Sender),
+ (Cookie, Ev->Cookie), (ErrorReason, errorReason));
+ auto [response, record] = TEvBlobDepot::MakeResponseFor(*Ev, NKikimrProto::ERROR, std::move(errorReason));
+ TActivationContext::Send(response.release());
+ PassAway();
+ }
+
+ STATEFN(StateFunc) {
+ if (Token.expired()) {
+ return PassAway();
+ }
+
+ switch (const ui32 type = ev->GetTypeRewrite()) {
+ hFunc(TEvBlobDepot::TEvResolve, Handle);
+ hFunc(TEvBlobStorage::TEvGetResult, Handle);
+ hFunc(TEvBlobStorage::TEvRangeResult, Handle);
+ hFunc(TEvBlobStorage::TEvPutResult, Handle);
+ cFunc(TEvPrivate::EvTxComplete, HandleTxComplete);
+
+ default:
+ Y_VERIFY_DEBUG(false, "unexpected event Type# %08" PRIx32, type);
+ STLOG(PRI_CRIT, BLOB_DEPOT, BDT90, "unexpected event", (Id, Self->GetLogId()), (Type, type));
+ break;
+ }
+ }
+ };
+
+ IActor *TBlobDepot::TData::CreateResolveDecommitActor(TEvBlobDepot::TEvResolve::TPtr ev) {
+ return new TResolveDecommitActor(Self, ev);
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/data_resolve.cpp b/ydb/core/blob_depot/data_resolve.cpp
index 6dcfdd1512a..6570d5b71f3 100644
--- a/ydb/core/blob_depot/data_resolve.cpp
+++ b/ydb/core/blob_depot/data_resolve.cpp
@@ -168,6 +168,10 @@ namespace NKikimr::NBlobDepot {
KeysLoaded = true;
}
+ if (ResolveDecommitContext.ReturnAfterLoadingKeys) {
+ return true;
+ }
+
ui32 numItemsRemain = 10'000;
while (!ResolveDecommitContext.DecommitBlobs.empty()) {
@@ -183,20 +187,6 @@ namespace NKikimr::NBlobDepot {
ResolveDecommitContext.DecommitBlobs.pop_front();
}
- while (!ResolveDecommitContext.DropNodataBlobs.empty()) {
- if (!numItemsRemain) {
- SuccessorTx = true;
- return true;
- }
-
- const TKey& key = ResolveDecommitContext.DropNodataBlobs.front();
- const TValue *value = Self->Data->FindKey(key);
- if (value && value->GoingToAssimilate) {
- Self->Data->DeleteKey(key, txc, this);
- }
- ResolveDecommitContext.DropNodataBlobs.pop_front();
- }
-
const auto& record = Request->Get()->Record;
for (const auto& item : record.GetItems()) {
std::optional<ui64> cookie = item.HasCookie() ? std::make_optional(item.GetCookie()) : std::nullopt;
@@ -245,6 +235,8 @@ namespace NKikimr::NBlobDepot {
if (SuccessorTx) {
Self->Execute(std::make_unique<TTxResolve>(*this));
+ } else if (const TActorId recipient = ResolveDecommitContext.ReturnAfterLoadingKeys) {
+ TActivationContext::Send(Request->Forward(recipient));
} else if (Uncertainties.empty()) {
Result.Send(NKikimrProto::OK, std::nullopt);
} else {
@@ -429,168 +421,14 @@ namespace NKikimr::NBlobDepot {
(Sender, ev->Sender), (Cookie, ev->Cookie), (LastAssimilatedBlobId, LastAssimilatedBlobId));
if (Self->Config.GetIsDecommittingGroup() && Self->DecommitState <= EDecommitState::BlobsFinished) {
- std::vector<std::unique_ptr<IEventBase>> outbox;
- std::unordered_map<std::tuple<ui64, bool>, std::vector<TLogoBlobID>> getBlobIds;
- const ui64 id = LastResolveQueryId + 1;
-
- for (const auto& item : ev->Get()->Record.GetItems()) {
- switch (item.GetKeyDesignatorCase()) {
- case NKikimrBlobDepot::TEvResolve::TItem::kKeyRange: {
- if (!item.HasTabletId()) {
- STLOG(PRI_CRIT, BLOB_DEPOT, BDT42, "incorrect request", (Id, Self->GetLogId()), (Item, item));
- auto [response, record] = TEvBlobDepot::MakeResponseFor(*ev, NKikimrProto::ERROR,
- "incorrect request");
- TActivationContext::Send(response.release());
- return;
- }
-
- const ui64 tabletId = item.GetTabletId();
- if (LastAssimilatedBlobId && tabletId < LastAssimilatedBlobId->TabletID()) {
- continue; // fast path
- }
-
- const auto& range = item.GetKeyRange();
-
- TLogoBlobID minId = range.HasBeginningKey()
- ? TKey::FromBinaryKey(range.GetBeginningKey(), Self->Config).GetBlobId()
- : TLogoBlobID(tabletId, 0, 0, 0, 0, 0);
-
- TLogoBlobID maxId = range.HasEndingKey()
- ? TKey::FromBinaryKey(range.GetEndingKey(), Self->Config).GetBlobId()
- : TLogoBlobID(tabletId, Max<ui32>(), Max<ui32>(), TLogoBlobID::MaxChannel,
- TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie, TLogoBlobID::MaxPartId,
- TLogoBlobID::MaxCrcMode);
-
- Y_VERIFY(minId <= maxId);
-
- if (LastAssimilatedBlobId && maxId <= *LastAssimilatedBlobId) {
- continue; // we have this range fully assimilated, just skip
- }
-
- // adjust minId to skip already assimilated items
- minId = Max(minId, LastAssimilatedBlobId.value_or(minId));
-
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT46, "going to TEvRange", (Id, Self->GetLogId()), (TabletId, tabletId),
- (MinId, minId), (MaxId, maxId), (MustRestoreFirst, item.GetMustRestoreFirst()), (Cookie, id));
-
- auto ev = std::make_unique<TEvBlobStorage::TEvRange>(tabletId, minId, maxId,
- item.GetMustRestoreFirst(), TInstant::Max(), true);
- ev->Decommission = true;
- outbox.push_back(std::move(ev));
-
- break;
- }
-
- case NKikimrBlobDepot::TEvResolve::TItem::kExactKey: {
- const TLogoBlobID blobId = TKey::FromBinaryKey(item.GetExactKey(), Self->Config).GetBlobId();
- const auto key = std::make_tuple(blobId.TabletID(), item.GetMustRestoreFirst());
- getBlobIds[key].push_back(blobId);
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT86, "going to TEvGet", (Id, Self->GetLogId()),
- (BlobId, blobId), (MustRestoreFirst, item.GetMustRestoreFirst()), (Cookie, id));
- break;
- }
-
- case NKikimrBlobDepot::TEvResolve::TItem::KEYDESIGNATOR_NOT_SET:
- Y_VERIFY_DEBUG(false);
- break;
- }
-
- }
-
- // convert all gets to events in outbox
- for (auto& [key, blobIds] : getBlobIds) {
- const auto& [tabletId, mustRestoreFirst] = key;
- const ui32 sz = blobIds.size();
- TArrayHolder<TEvBlobStorage::TEvGet::TQuery> q(new TEvBlobStorage::TEvGet::TQuery[sz]);
- for (ui32 i = 0; i < sz; ++i) {
- q[i].Set(blobIds[i]);
- }
- auto ev = std::make_unique<TEvBlobStorage::TEvGet>(q, sz, TInstant::Max(),
- NKikimrBlobStorage::EGetHandleClass::FastRead, mustRestoreFirst, true);
- ev->Decommission = true;
- outbox.emplace_back(std::move(ev));
- }
-
- if (!outbox.empty()) {
- ResolveDecommitContexts[id] = {ev, (ui32)outbox.size()};
- ++LastResolveQueryId;
- Y_VERIFY(LastResolveQueryId == id);
-
- for (auto& ev : outbox) {
- SendToBSProxy(Self->SelfId(), Self->Config.GetVirtualGroupId(), ev.release(), id);
- }
-
- return;
- }
+ Self->RegisterWithSameMailbox(CreateResolveDecommitActor(ev));
+ } else {
+ Self->Execute(std::make_unique<TTxResolve>(Self, ev));
}
-
- Self->Execute(std::make_unique<TTxResolve>(Self, ev));
}
- template<typename TCallback>
- void TData::HandleResolutionResult(ui64 id, TCallback&& callback) {
- auto& contexts = Self->Data->ResolveDecommitContexts;
- if (const auto it = contexts.find(id); it != contexts.end()) {
- auto& context = it->second;
- if (!callback(context)) {
- STLOG(PRI_INFO, BLOB_DEPOT, BDT87, "failing TEvResolve", (Id, Self->GetLogId()), (Sender, context.Ev->Sender),
- (Cookie, context.Ev->Cookie));
- auto [response, record] = TEvBlobDepot::MakeResponseFor(*context.Ev, NKikimrProto::ERROR,
- "unassimilated key resolution error");
- TActivationContext::Send(response.release());
- } else if (!--context.NumRangesInFlight) {
- auto ev = context.Ev;
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT88, "actually starting TEvResolve", (Id, Self->GetLogId()), (Sender, ev->Sender),
- (Cookie, ev->Cookie));
- std::sort(context.ResolutionErrors.begin(), context.ResolutionErrors.end());
- Self->Execute(std::make_unique<TTxResolve>(Self, ev, std::move(context)));
- } else {
- return; // keep context running
- }
- contexts.erase(it);
- }
- }
-
- void TData::Handle(TEvBlobStorage::TEvRangeResult::TPtr ev) {
- auto& msg = *ev->Get();
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT50, "TEvRangeResult", (Id, Self->GetLogId()), (Msg, msg), (Cookie, ev->Cookie));
-
- HandleResolutionResult(ev->Cookie, [&](auto& context) {
- if (msg.Status == NKikimrProto::OK) {
- for (const auto& r : msg.Responses) {
- context.DecommitBlobs.push_back({r.Id, r.Keep, r.DoNotKeep});
- }
- return true;
- } else {
- // we can't be sure that there are no still unassimilated keys in this range, so we report ERROR to
- // the whole range query; TODO(alexvru): provide a way to mark only specific request item as faulty one
- return false;
- }
- });
- }
-
- void TData::Handle(TEvBlobStorage::TEvGetResult::TPtr ev) {
- if (!ev->Cookie) {
- return UncertaintyResolver->Handle(ev);
- }
-
- auto& msg = *ev->Get();
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT55, "TEvGetResult", (Id, Self->GetLogId()), (Msg, msg), (Cookie, ev->Cookie));
-
- HandleResolutionResult(ev->Cookie, [&](auto& context) {
- for (ui32 i = 0; i < msg.ResponseSz; ++i) {
- const auto& r = msg.Responses[i];
- if (r.Status == NKikimrProto::OK) {
- context.DecommitBlobs.push_back({r.Id, r.Keep, r.DoNotKeep});
- } else if (r.Status == NKikimrProto::NODATA) {
- context.DropNodataBlobs.push_back(TKey(r.Id));
- } else {
- // mark this specific key as unresolvable
- context.ResolutionErrors.push_back(TKey(r.Id));
- }
- }
- return true;
- });
+ void TData::ExecuteTxResolve(TEvBlobDepot::TEvResolve::TPtr ev, TResolveDecommitContext&& context) {
+ Self->Execute(std::make_unique<TTxResolve>(Self, ev, std::move(context)));
}
} // NKikimr::NBlobDepot