diff options
author | alexvru <alexvru@ydb.tech> | 2022-08-26 11:05:58 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-08-26 11:05:58 +0300 |
commit | 8c9b9f41b5ff58d0ba78d4e491bdb250b6758a8a (patch) | |
tree | 11d95af55816194779356db40e8658f2d1057fbf | |
parent | 89cae858a59ecae5378edcd229962b6d1be5e5f8 (diff) | |
download | ydb-8c9b9f41b5ff58d0ba78d4e491bdb250b6758a8a.tar.gz |
BlobDepot work in progress -- TEvRange in decommission code
25 files changed, 309 insertions, 54 deletions
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h index cde55cb5f95..6e4275e9c63 100644 --- a/ydb/core/base/blobstorage.h +++ b/ydb/core/base/blobstorage.h @@ -1147,6 +1147,8 @@ struct TEvBlobStorage { ui32 RequestedSize; TString Buffer; TVector<TPartMapItem> PartMap; + bool Keep = false; + bool DoNotKeep = false; TResponse() : Status(NKikimrProto::UNKNOWN) @@ -1687,6 +1689,7 @@ struct TEvBlobStorage { bool IsIndexOnly; ui32 ForceBlockedGeneration; ui32 RestartCounter = 0; + bool Decommission = false; TEvRange(ui64 tabletId, const TLogoBlobID &from, const TLogoBlobID &to, const bool mustRestoreFirst, TInstant deadline, bool isIndexOnly = false, ui32 forceBlockedGeneration = 0) @@ -1729,6 +1732,8 @@ struct TEvBlobStorage { struct TResponse { TLogoBlobID Id; TString Buffer; + bool Keep = false; + bool DoNotKeep = false; TResponse() {} diff --git a/ydb/core/blob_depot/agent/blob_mapping_cache.cpp b/ydb/core/blob_depot/agent/blob_mapping_cache.cpp index 71b1d822d82..e6427a3ce87 100644 --- a/ydb/core/blob_depot/agent/blob_mapping_cache.cpp +++ b/ydb/core/blob_depot/agent/blob_mapping_cache.cpp @@ -10,13 +10,13 @@ namespace NKikimr::NBlobDepot { if (inserted) { entry.Key = it->first; } - entry.Values.emplace(item.GetValueChain()); + entry.Values = item.GetValueChain(); Queue.PushBack(&entry); entry.ResolveInFlight = false; for (TQueryWaitingForKey& item : std::exchange(entry.QueriesWaitingForKey, {})) { - Agent.OnRequestComplete(item.Id, TKeyResolved{&entry.Values.value()}, Agent.OtherRequestInFlight); + Agent.OnRequestComplete(item.Id, TKeyResolved{&entry.Values}, Agent.OtherRequestInFlight); } } } @@ -28,8 +28,8 @@ namespace NKikimr::NBlobDepot { if (inserted) { entry.Key = it->first; } - if (entry.Values) { - return &entry.Values.value(); + if (!entry.Values.empty()) { + return &entry.Values; } if (!entry.ResolveInFlight) { entry.ResolveInFlight = true; @@ -39,6 +39,12 @@ namespace NKikimr::NBlobDepot { item->SetBeginningKey(it->first); item->SetEndingKey(it->first); item->SetIncludeEnding(true); + + if (Agent.VirtualGroupId) { + const auto& id = TLogoBlobID::FromBinary(it->first); + item->SetTabletId(id.TabletID()); + } + Agent.Issue(std::move(msg), this, nullptr); } diff --git a/ydb/core/blob_depot/agent/blob_mapping_cache.h b/ydb/core/blob_depot/agent/blob_mapping_cache.h index 42d124b2bd0..3309bc97999 100644 --- a/ydb/core/blob_depot/agent/blob_mapping_cache.h +++ b/ydb/core/blob_depot/agent/blob_mapping_cache.h @@ -20,7 +20,7 @@ namespace NKikimr::NBlobDepot { struct TCachedKeyItem : TIntrusiveListItem<TCachedKeyItem> { TStringBuf Key; - std::optional<TResolvedValueChain> Values; + TResolvedValueChain Values; bool ResolveInFlight = false; std::list<TQueryWaitingForKey> QueriesWaitingForKey; }; diff --git a/ydb/core/blob_depot/agent/read.cpp b/ydb/core/blob_depot/agent/read.cpp index 6849e254f40..a2afee30142 100644 --- a/ydb/core/blob_depot/agent/read.cpp +++ b/ydb/core/blob_depot/agent/read.cpp @@ -40,6 +40,9 @@ namespace NKikimr::NBlobDepot { }; std::vector<TReadItem> items; + const ui64 offsetOnEntry = offset; + const ui64 sizeOnEntry = size; + for (const auto& value : values) { const ui32 groupId = value.GetGroupId(); const auto blobId = LogoBlobIDFromLogoBlobID(value.GetBlobId()); @@ -48,6 +51,8 @@ namespace NKikimr::NBlobDepot { if (end <= begin || blobId.BlobSize() < end) { *error = "incorrect SubrangeBegin/SubrangeEnd pair"; + STLOG(PRI_CRIT, BLOB_DEPOT_AGENT, BDA24, *error, (VirtualGroupId, VirtualGroupId), (TabletId, TabletId), + (Values, FormatList(values))); return false; } @@ -78,6 +83,8 @@ namespace NKikimr::NBlobDepot { if (size) { *error = "incorrect offset/size provided"; + STLOG(PRI_ERROR, BLOB_DEPOT_AGENT, BDA25, *error, (VirtualGroupId, VirtualGroupId), (TabletId, TabletId), + (Offset, offsetOnEntry), (Size, sizeOnEntry), (Values, FormatList(values))); return false; } diff --git a/ydb/core/blob_depot/agent/storage_discover.cpp b/ydb/core/blob_depot/agent/storage_discover.cpp index 51cbaa05c9b..1ba881e64b2 100644 --- a/ydb/core/blob_depot/agent/storage_discover.cpp +++ b/ydb/core/blob_depot/agent/storage_discover.cpp @@ -55,6 +55,8 @@ namespace NKikimr::NBlobDepot { item->SetIncludeEnding(true); item->SetMaxKeys(1); item->SetReverse(true); + item->SetTabletId(TabletId); + item->SetMustRestoreFirst(true); Agent.Issue(std::move(resolve), this, nullptr); } diff --git a/ydb/core/blob_depot/agent/storage_range.cpp b/ydb/core/blob_depot/agent/storage_range.cpp index 670960401ea..7e0143414fd 100644 --- a/ydb/core/blob_depot/agent/storage_range.cpp +++ b/ydb/core/blob_depot/agent/storage_range.cpp @@ -22,6 +22,17 @@ namespace NKikimr::NBlobDepot { void Initiate() override { auto& msg = *Event->Get<TEvBlobStorage::TEvRange>(); + + if (msg.Decommission) { + Y_VERIFY(Agent.ProxyId); + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA26, "forwarding TEvRange", (VirtualGroupId, Agent.VirtualGroupId), + (TabletId, Agent.TabletId), (Msg, msg), (ProxyId, Agent.ProxyId)); + const bool sent = TActivationContext::Send(Event->Forward(Agent.ProxyId)); + Y_VERIFY(sent); + delete this; + return; + } + Response = std::make_unique<TEvBlobStorage::TEvRangeResult>(NKikimrProto::OK, msg.From, msg.To, Agent.VirtualGroupId); @@ -45,6 +56,8 @@ namespace NKikimr::NBlobDepot { item->SetEndingKey(to); item->SetIncludeEnding(true); item->SetReverse(reverse); + item->SetTabletId(msg.TabletId); + item->SetMustRestoreFirst(msg.MustRestoreFirst); Agent.Issue(std::move(resolve), this, nullptr); ++ResolvesInFlight; diff --git a/ydb/core/blob_depot/assimilator.cpp b/ydb/core/blob_depot/assimilator.cpp index ba66d03b68e..f7fc00806b1 100644 --- a/ydb/core/blob_depot/assimilator.cpp +++ b/ydb/core/blob_depot/assimilator.cpp @@ -99,9 +99,8 @@ namespace NKikimr::NBlobDepot { bool Execute(TTransactionContext& txc, const TActorContext&) override { NIceDb::TNiceDb db(txc.DB); - const bool done = Ev->Blocks.empty() && Ev->Barriers.empty() && Ev->Blobs.empty(); - const bool blocksFinished = Ev->Blocks.empty() || !Ev->Barriers.empty() || !Ev->Blobs.empty() || done; - const bool barriersFinished = Ev->Barriers.empty() || !Ev->Blobs.empty() || done; + bool blocksFinished = false; + bool barriersFinished = false; if (const auto& blocks = Ev->Blocks; !blocks.empty()) { Self->SkipBlocksUpTo = blocks.back().TabletId; @@ -120,13 +119,16 @@ namespace NKikimr::NBlobDepot { for (const auto& barrier : Ev->Barriers) { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT32, "assimilated barrier", (Id, Self->Self->GetLogId()), (Barrier, barrier)); Self->Self->BarrierServer->AddBarrierOnDecommit(barrier, txc); + blocksFinished = true; // there will be no blocks for sure } for (const auto& blob : Ev->Blobs) { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT33, "assimilated blob", (Id, Self->Self->GetLogId()), (Blob, blob)); Self->Self->Data->AddDataOnDecommit(blob, txc); + blocksFinished = barriersFinished = true; // no blocks and no more barriers } auto& decommitState = Self->Self->DecommitState; + const auto decommitStateOnEntry = decommitState; if (blocksFinished && decommitState < EDecommitState::BlocksFinished) { decommitState = EDecommitState::BlocksFinished; UnblockRegisterActorQ = true; @@ -134,6 +136,7 @@ namespace NKikimr::NBlobDepot { if (barriersFinished && decommitState < EDecommitState::BarriersFinished) { decommitState = EDecommitState::BarriersFinished; } + const bool done = Ev->Blocks.empty() && Ev->Barriers.empty() && Ev->Blobs.empty(); if (done && decommitState < EDecommitState::BlobsFinished) { decommitState = EDecommitState::BlobsFinished; } @@ -143,6 +146,24 @@ namespace NKikimr::NBlobDepot { NIceDb::TUpdate<Schema::Config::AssimilatorState>(Self->SerializeAssimilatorState()) ); + auto toString = [](EDecommitState state) { + switch (state) { + case EDecommitState::Default: return "Default"; + case EDecommitState::BlocksFinished: return "BlocksFinished"; + case EDecommitState::BarriersFinished: return "BarriersFinished"; + case EDecommitState::BlobsFinished: return "BlobsFinished"; + case EDecommitState::BlobsCopied: return "BlobsCopied"; + case EDecommitState::Done: return "Done"; + } + }; + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT47, "decommit state change", (Id, Self->Self->GetLogId()), + (From, toString(decommitStateOnEntry)), (To, toString(decommitState)), + (UnblockRegisterActorQ, UnblockRegisterActorQ)); + + if (!Ev->Blobs.empty()) { + Self->Self->Data->LastAssimilatedBlobId = Ev->Blobs.back().Id; + } + return true; } @@ -292,21 +313,21 @@ namespace NKikimr::NBlobDepot { void TAssimilator::Handle(TEvTabletPipe::TEvClientConnected::TPtr ev) { auto& msg = *ev->Get(); - STLOG(PRI_DEBUG, BLOB_DEPOT, BDTxx, "received TEvClientConnected", (Id, Self->GetLogId()), (Status, msg.Status)); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT39, "received TEvClientConnected", (Id, Self->GetLogId()), (Status, msg.Status)); if (msg.Status != NKikimrProto::OK) { CreatePipe(); } } void TAssimilator::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr /*ev*/) { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDTxx, "received TEvClientDestroyed", (Id, Self->GetLogId())); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT40, "received TEvClientDestroyed", (Id, Self->GetLogId())); CreatePipe(); } void TAssimilator::Handle(TEvBlobStorage::TEvControllerGroupDecommittedResponse::TPtr ev) { auto& msg = *ev->Get(); const NKikimrProto::EReplyStatus status = msg.Record.GetStatus(); - STLOG(PRI_DEBUG, BLOB_DEPOT, BDTxx, "received TEvControllerGroupDecommittedResponse", (Id, Self->GetLogId()), + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT41, "received TEvControllerGroupDecommittedResponse", (Id, Self->GetLogId()), (Status, status)); if (status == NKikimrProto::OK) { class TTxFinishDecommission : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { diff --git a/ydb/core/blob_depot/blob_depot.cpp b/ydb/core/blob_depot/blob_depot.cpp index f1ca62ccd2a..daa068ea1ea 100644 --- a/ydb/core/blob_depot/blob_depot.cpp +++ b/ydb/core/blob_depot/blob_depot.cpp @@ -40,6 +40,7 @@ namespace NKikimr::NBlobDepot { hFunc(TEvBlobDepot::TEvCollectGarbage, BarrierServer->Handle); hFunc(TEvBlobStorage::TEvCollectGarbageResult, Data->Handle); + hFunc(TEvBlobStorage::TEvRangeResult, Data->Handle); hFunc(TEvBlobDepot::TEvPushNotifyResult, Handle); diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp index cf92fccc7a1..a13ea999d60 100644 --- a/ydb/core/blob_depot/data.cpp +++ b/ydb/core/blob_depot/data.cpp @@ -32,13 +32,10 @@ namespace NKikimr::NBlobDepot { void TData::AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob, NTabletFlatExecutor::TTransactionContext& txc) { + TKey key(blob.Id); + bool underSoft, underHard; Self->BarrierServer->GetBlobBarrierRelation(blob.Id, &underSoft, &underHard); - if (underHard || (underSoft && !blob.Keep)) { - return; // we can skip this blob as it is already being collected - } - - TKey key(blob.Id); // calculate keep state for this blob const auto it = Data.find(key); @@ -46,6 +43,13 @@ namespace NKikimr::NBlobDepot { blob.DoNotKeep ? NKikimrBlobDepot::EKeepState::DoNotKeep : blob.Keep ? NKikimrBlobDepot::EKeepState::Keep : NKikimrBlobDepot::EKeepState::Default); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT49, "AddDataOnDecommit", (Id, Self->GetLogId()), (Blob, blob), + (UnderHard, underHard), (UnderSoft, underSoft), (KeepState, keepState)); + + if (underHard || (underSoft && keepState != NKikimrBlobDepot::EKeepState::Keep)) { + return; // we can skip this blob as it is already being collected + } + NKikimrBlobDepot::TValue value; value.SetKeepState(keepState); value.SetUnconfirmed(true); @@ -59,7 +63,6 @@ namespace NKikimr::NBlobDepot { db.Table<Schema::Data>().Key(key.MakeBinaryKey()).Update<Schema::Data::Value>(valueData); PutKey(key, TValue(std::move(value))); - LastAssimilatedKey = key; } void TData::AddTrashOnLoad(TLogoBlobID id) { @@ -91,9 +94,7 @@ namespace NKikimr::NBlobDepot { referencedBytes += id.BlobSize(); }); - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT10, "PutKey", (Id, Self->GetLogId()), (Key, key), - (ValueChain.size, data.ValueChain.size()), (ReferencedBytes, referencedBytes), - (KeepState, NKikimrBlobDepot::EKeepState_Name(data.KeepState))); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT10, "PutKey", (Id, Self->GetLogId()), (Key, key), (Value, data)); const auto [it, inserted] = Data.try_emplace(std::move(key), std::move(data)); if (!inserted) { @@ -102,6 +103,9 @@ namespace NKikimr::NBlobDepot { } std::optional<TString> TData::UpdateKeepState(TKey key, NKikimrBlobDepot::EKeepState keepState) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT51, "UpdateKeepState", (Id, Self->GetLogId()), (Key, key), + (KeepState, keepState)); + const auto [it, inserted] = Data.try_emplace(std::move(key), TValue(keepState)); if (!inserted) { if (keepState <= it->second.KeepState) { diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h index 95ef9aa44b8..7a4c44c2cdb 100644 --- a/ydb/core/blob_depot/data.h +++ b/ydb/core/blob_depot/data.h @@ -238,6 +238,22 @@ namespace NKikimr::NBlobDepot { , Public(false) , Unconfirmed(false) {} + + TString ToString() const { + TStringStream s; + Output(s); + return s.Str(); + } + + void Output(IOutputStream& s) const { + s << "{Meta# '" << EscapeC(Meta) << "'" + << " ValueChain# " << FormatList(ValueChain) + << " KeepState# " << NKikimrBlobDepot::EKeepState_Name(KeepState) + << " Public# " << (Public ? "true" : "false") + << " Unconfirmed# " << (Unconfirmed ? "true" : "false") + << " OriginalBlobId# " << (OriginalBlobId ? OriginalBlobId->ToString() : "<none>") + << "}"; + } }; enum EScanFlags : ui32 { @@ -286,10 +302,20 @@ namespace NKikimr::NBlobDepot { THashMap<std::tuple<ui64, ui8, ui32>, TRecordsPerChannelGroup> RecordsPerChannelGroup; TIntrusiveList<TRecordsPerChannelGroup, TRecordWithTrash> RecordsWithTrash; std::optional<TKey> LastLoadedKey; // keys are being loaded in ascending order - std::optional<TKey> LastAssimilatedKey; + std::optional<TLogoBlobID> LastAssimilatedBlobId; + + friend class TGroupAssimilator; THashMultiMap<void*, TLogoBlobID> InFlightTrash; // being committed, but not yet confirmed + struct TResolveDecommitContext { + TEvBlobDepot::TEvResolve::TPtr Ev; // original resolve request + ui32 NumRangesInFlight = 0; + bool Errors = false; + }; + ui64 LastRangeId = 0; + THashMap<ui64, TResolveDecommitContext> ResolveDecommitContexts; + class TTxIssueGC; class TTxConfirmGC; @@ -382,6 +408,7 @@ namespace NKikimr::NBlobDepot { bool IsLoaded() const { return Loaded; } void Handle(TEvBlobDepot::TEvResolve::TPtr ev); + void Handle(TEvBlobStorage::TEvRangeResult::TPtr ev); private: void ExecuteIssueGC(ui8 channel, ui32 groupId, TGenStep issuedGenStep, diff --git a/ydb/core/blob_depot/data_resolve.cpp b/ydb/core/blob_depot/data_resolve.cpp index 97bd23e98a1..f8d9f897b7e 100644 --- a/ydb/core/blob_depot/data_resolve.cpp +++ b/ydb/core/blob_depot/data_resolve.cpp @@ -243,6 +243,11 @@ namespace NKikimr::NBlobDepot { item.SetMeta(value.Meta.data(), value.Meta.size()); } + if (!item.ValueChainSize()) { + STLOG(PRI_WARN, BLOB_DEPOT, BDT48, "empty ValueChain on Resolve", (Id, Self->GetLogId()), + (Key, key), (Value, value), (Item, item), (Sender, Request->Sender), (Cookie, Request->Cookie)); + } + size_t itemSize = item.ByteSizeLong(); if (Outbox.empty() || lastResponseSize + itemSize > EventMaxByteSize) { if (!Outbox.empty()) { @@ -262,22 +267,121 @@ namespace NKikimr::NBlobDepot { void TData::Handle(TEvBlobDepot::TEvResolve::TPtr ev) { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT21, "TEvResolve", (Id, Self->GetLogId()), (Msg, ev->Get()->ToString()), - (Sender, ev->Sender), (Cookie, ev->Cookie)); + (Sender, ev->Sender), (Cookie, ev->Cookie), (LastAssimilatedBlobId, LastAssimilatedBlobId)); if (Self->Config.HasDecommitGroupId() && Self->DecommitState <= EDecommitState::BlobsFinished) { + std::vector<std::tuple<ui64, bool, TLogoBlobID, TLogoBlobID>> queries; + for (const auto& item : ev->Get()->Record.GetItems()) { - std::optional<TKey> end = item.HasEndingKey() - ? std::make_optional(TKey::FromBinaryKey(item.GetEndingKey(), Self->Config)) - : std::nullopt; + if (!item.HasTabletId()) { + STLOG(PRI_CRIT, BLOB_DEPOT, BDT42, "incorrect request", (Id, Self->GetLogId()), (Item, item)); + auto [response, record] = TEvBlobDepot::MakeResponseFor(*ev, Self->SelfId(), NKikimrProto::ERROR, + "incorrect request"); + TActivationContext::Send(response.release()); + return; + } + + const ui64 tabletId = item.GetTabletId(); + if (LastAssimilatedBlobId && tabletId < LastAssimilatedBlobId->TabletID()) { + continue; // fast path + } + + TLogoBlobID minId(tabletId, 0, 0, 0, 0, 0); + TLogoBlobID maxId(tabletId, Max<ui32>(), Max<ui32>(), TLogoBlobID::MaxChannel, TLogoBlobID::MaxBlobSize, + TLogoBlobID::MaxCookie, TLogoBlobID::MaxPartId, TLogoBlobID::MaxCrcMode); + + if (item.HasBeginningKey()) { + minId = TKey::FromBinaryKey(item.GetBeginningKey(), Self->Config).GetBlobId(); + } + if (item.HasEndingKey()) { + maxId = TKey::FromBinaryKey(item.GetEndingKey(), Self->Config).GetBlobId(); + } + + Y_VERIFY_DEBUG(minId.TabletID() == tabletId); + Y_VERIFY_DEBUG(maxId.TabletID() == tabletId); + + if (!LastAssimilatedBlobId || *LastAssimilatedBlobId < maxId) { + if (LastAssimilatedBlobId && minId < *LastAssimilatedBlobId) { + minId = *LastAssimilatedBlobId; + } + if (minId == maxId) { + const auto it = Data.find(TKey(minId)); + if (it != Data.end() && !it->second.ValueChain.empty() || it->second.OriginalBlobId) { + continue; // fast path for extreme queries + } + } + queries.emplace_back(tabletId, item.GetMustRestoreFirst(), minId, maxId); + } + } - if (!end || !LastAssimilatedKey || *LastAssimilatedKey < *end) { - // see if we have to query BS for this range and to apply items here - Y_VERIFY_DEBUG(false, "going to return corrupt data"); + if (!queries.empty()) { + const ui64 id = ++LastRangeId; + for (const auto& [tabletId, mustRestoreFirst, minId, maxId] : queries) { + auto ev = std::make_unique<TEvBlobStorage::TEvRange>(tabletId, minId, maxId, mustRestoreFirst, + TInstant::Max(), true); + ev->Decommission = true; + + const auto& tabletId_ = tabletId; + const auto& minId_ = minId; + const auto& maxId_ = maxId; + const auto& mustRestoreFirst_ = mustRestoreFirst; + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT46, "going to TEvRange", (Id, Self->GetLogId()), (TabletId, tabletId_), + (MinId, minId_), (MaxId, maxId_), (MustRestoreFirst, mustRestoreFirst_)); + SendToBSProxy(Self->SelfId(), Self->Config.GetDecommitGroupId(), ev.release(), id); } + ResolveDecommitContexts[id] = {ev, (ui32)queries.size()}; + return; } } Self->Execute(std::make_unique<TTxResolve>(Self, ev)); } + void TData::Handle(TEvBlobStorage::TEvRangeResult::TPtr ev) { + class TTxCommitRange : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { + TEvBlobStorage::TEvRangeResult::TPtr Ev; + + public: + TTxCommitRange(TBlobDepot *self, TEvBlobStorage::TEvRangeResult::TPtr ev) + : TTransactionBase(self) + , Ev(ev) + {} + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + if (Ev->Get()->Status == NKikimrProto::OK) { + for (const auto& response : Ev->Get()->Responses) { + Self->Data->AddDataOnDecommit({ + .Id = response.Id, + .Keep = response.Keep, + .DoNotKeep = response.DoNotKeep + }, txc); + } + } + return true; + } + + void Complete(const TActorContext&) override { + auto& contexts = Self->Data->ResolveDecommitContexts; + if (const auto it = contexts.find(Ev->Cookie); it != contexts.end()) { + TResolveDecommitContext& context = it->second; + if (Ev->Get()->Status != NKikimrProto::OK) { + context.Errors = true; + } + if (!--context.NumRangesInFlight) { + if (context.Errors) { + auto [response, record] = TEvBlobDepot::MakeResponseFor(*context.Ev, Self->SelfId(), + NKikimrProto::ERROR, "errors in range queries"); + TActivationContext::Send(response.release()); + } else { + Self->Execute(std::make_unique<TTxResolve>(Self, context.Ev)); + } + } + contexts.erase(it); + } + } + }; + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT50, "TEvRangeResult", (Id, Self->GetLogId()), (Msg, *ev->Get())); + Self->Execute(std::make_unique<TTxCommitRange>(Self, ev)); + } + } // NKikimr::NBlobDepot diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp index b093cfb2e3a..a602caf8ff6 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp @@ -101,7 +101,7 @@ bool TBlobState::Restore(const TBlobStorageGroupInfo &info) { } void TBlobState::AddResponseData(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber, - ui32 shift, TString &data) { + ui32 shift, TString &data, bool keep, bool doNotKeep) { // Add actual data to Parts Y_VERIFY(id.PartId() != 0); ui32 partIdx = id.PartId() - 1; @@ -129,6 +129,8 @@ void TBlobState::AddResponseData(const TBlobStorageGroupInfo &info, const TLogoB } } Y_VERIFY(isFound); + Keep |= keep; + DoNotKeep |= doNotKeep; } void TBlobState::AddNoDataResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber) { @@ -196,7 +198,8 @@ void TBlobState::AddErrorResponse(const TBlobStorageGroupInfo &info, const TLogo Y_VERIFY(isFound); } -void TBlobState::AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber) { +void TBlobState::AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber, + bool keep, bool doNotKeep) { Y_UNUSED(info); Y_VERIFY(id.PartId() != 0); ui32 partIdx = id.PartId() - 1; @@ -216,6 +219,8 @@ void TBlobState::AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLog } } Y_VERIFY(isFound); + Keep |= keep; + DoNotKeep |= doNotKeep; } ui64 TBlobState::GetPredictedDelayNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues, @@ -418,11 +423,11 @@ void TBlackboard::AddPutOkResponse(const TLogoBlobID &id, ui32 orderNumber) { state.AddPutOkResponse(*Info, id, orderNumber); } -void TBlackboard::AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TString &data) { +void TBlackboard::AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TString &data, bool keep, bool doNotKeep) { Y_VERIFY(bool(id)); Y_VERIFY(id.PartId() != 0); TBlobState &state = GetState(id); - state.AddResponseData(*Info, id, orderNumber, shift, data); + state.AddResponseData(*Info, id, orderNumber, shift, data, keep, doNotKeep); } void TBlackboard::AddNoDataResponse(const TLogoBlobID &id, ui32 orderNumber) { @@ -432,11 +437,11 @@ void TBlackboard::AddNoDataResponse(const TLogoBlobID &id, ui32 orderNumber) { state.AddNoDataResponse(*Info, id, orderNumber); } -void TBlackboard::AddNotYetResponse(const TLogoBlobID &id, ui32 orderNumber) { +void TBlackboard::AddNotYetResponse(const TLogoBlobID &id, ui32 orderNumber, bool keep, bool doNotKeep) { Y_VERIFY(bool(id)); Y_VERIFY(id.PartId() != 0); TBlobState &state = GetState(id); - state.AddNotYetResponse(*Info, id, orderNumber); + state.AddNotYetResponse(*Info, id, orderNumber, keep, doNotKeep); } void TBlackboard::AddErrorResponse(const TLogoBlobID &id, ui32 orderNumber) { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h index cfd0eb64999..72bfd73800c 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h @@ -83,6 +83,8 @@ struct TBlobState { bool IsDone = false; std::vector<std::pair<ui64, ui32>> *ExtraBlockChecks = nullptr; NWilson::TSpan *Span = nullptr; + bool Keep = false; + bool DoNotKeep = false; void Init(const TLogoBlobID &id, const TBlobStorageGroupInfo &Info); void AddNeeded(ui64 begin, ui64 size); @@ -90,11 +92,12 @@ struct TBlobState { void MarkBlobReadyToPut(ui8 blobIdx = 0); bool Restore(const TBlobStorageGroupInfo &info); void AddResponseData(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring, - ui32 shift, TString &data); + ui32 shift, TString &data, bool keep, bool doNotKeep); void AddPutOkResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber); void AddNoDataResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring); void AddErrorResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring); - void AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring); + void AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring, + bool keep, bool doNotKeep); ui64 GetPredictedDelayNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues, ui32 diskIdxInSubring, NKikimrBlobStorage::EVDiskQueueId queueId) const; void GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues, @@ -205,11 +208,11 @@ struct TBlackboard { void AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TString &partData); void MarkBlobReadyToPut(const TLogoBlobID &id, ui8 blobIdx = 0); void MoveBlobStateToDone(const TLogoBlobID &id); - void AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TString &data); + void AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TString &data, bool keep, bool doNotKeep); void AddPutOkResponse(const TLogoBlobID &id, ui32 orderNumber); void AddNoDataResponse(const TLogoBlobID &id, ui32 orderNumber); void AddErrorResponse(const TLogoBlobID &id, ui32 orderNumber); - void AddNotYetResponse(const TLogoBlobID &id, ui32 orderNumber); + void AddNotYetResponse(const TLogoBlobID &id, ui32 orderNumber, bool keep, bool doNotKeep); EStrategyOutcome RunStrategy(TLogContext &logCtx, const IStrategy& s, TBatchedVec<TBlobStates::value_type*> *finished = nullptr); TBlobState& GetState(const TLogoBlobID &id); ssize_t AddPartMap(const TLogoBlobID &id, ui32 diskOrderNumber, ui32 requestIndex); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp index 283f6049d5e..2640bb7c862 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp @@ -40,6 +40,8 @@ void TGetImpl::PrepareReply(NKikimrProto::EReplyStatus status, TString errorReas const TBlobState &blobState = Blackboard.GetState(query.Id); outResponse.Id = query.Id; outResponse.PartMap = blobState.PartMap; + outResponse.Keep = blobState.Keep; + outResponse.DoNotKeep = blobState.DoNotKeep; if (blobState.WholeSituation == TBlobState::ESituation::Absent) { bool okay = true; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h index 78f5f2a5231..c2d266b62ca 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h @@ -217,7 +217,7 @@ public: if (replyStatus == NKikimrProto::OK) { // TODO(cthulhu): Verify shift and response size, and cookie R_LOG_DEBUG_SX(logCtx, "BPG58", "Got# OK orderNumber# " << orderNumber << " vDiskId# " << vdisk.ToString()); - Blackboard.AddResponseData(blobId, orderNumber, resultShift, resultBuffer); + Blackboard.AddResponseData(blobId, orderNumber, resultShift, resultBuffer, result.GetKeep(), result.GetDoNotKeep()); } else if (replyStatus == NKikimrProto::NODATA) { R_LOG_DEBUG_SX(logCtx, "BPG59", "Got# NODATA orderNumber# " << orderNumber << " vDiskId# " << vdisk.ToString()); @@ -231,7 +231,7 @@ public: } else if (replyStatus == NKikimrProto::NOT_YET) { R_LOG_DEBUG_SX(logCtx, "BPG67", "Got# NOT_YET orderNumber# " << orderNumber << " vDiskId# " << vdisk.ToString()); - Blackboard.AddNotYetResponse(blobId, orderNumber); + Blackboard.AddNotYetResponse(blobId, orderNumber, result.GetKeep(), result.GetDoNotKeep()); } else { Y_VERIFY(false, "Unexpected reply status# %s", NKikimrProto::EReplyStatus_Name(replyStatus).data()); } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp index f3be4f5dd48..8aaaf7ea0ec 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp @@ -31,6 +31,8 @@ class TBlobStorageGroupIndexRestoreGetRequest std::unique_ptr<TEvBlobStorage::TEvGetResult> PendingResult; + THashMap<TLogoBlobID, std::pair<bool, bool>> KeepFlags; + void ReplyAndDie(NKikimrProto::EReplyStatus status) { A_LOG_DEBUG_S("DSPI14", "ReplyAndDie" << " Reply with status# " << NKikimrProto::EReplyStatus_Name(status) @@ -117,6 +119,9 @@ class TBlobStorageGroupIndexRestoreGetRequest const ui32 queryIdx = result.GetCookie(); Y_VERIFY(queryIdx < QuerySize && blobId == Queries[queryIdx].Id); BlobStatus[queryIdx].UpdateFromResponseData(result, vdisk, Info.Get()); + auto& [keep, doNotKeep] = KeepFlags[blobId]; + keep |= result.GetKeep(); + doNotKeep |= result.GetDoNotKeep(); } if (!VGetsInFlight) { @@ -135,6 +140,10 @@ class TBlobStorageGroupIndexRestoreGetRequest auto &a = PendingResult->Responses[idx]; a.Id = q.Id; + const auto it = KeepFlags.find(q.Id); + Y_VERIFY(it != KeepFlags.end()); + std::tie(a.Keep, a.DoNotKeep) = it->second; + A_LOG_DEBUG_S("DSPI11", "OnEnoughVGetResults Id# " << q.Id << " BlobStatus# " << DumpBlobStatus(idx)); if (blobState == TBlobStorageGroupInfo::EBS_DISINTEGRATED) { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp index c329dbf4f87..3e636abb10a 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp @@ -23,6 +23,7 @@ class TBlobStorageGroupRangeRequest : public TBlobStorageGroupRequestActor<TBlob const bool MustRestoreFirst; const bool IsIndexOnly; const ui32 ForceBlockedGeneration; + const bool Decommission; TInstant StartTime; TAutoPtr<TEvBlobStorage::TEvRangeResult> Reply; @@ -320,6 +321,7 @@ class TBlobStorageGroupRangeRequest : public TBlobStorageGroupRequestActor<TBlob auto ev = std::make_unique<TEvBlobStorage::TEvRange>(TabletId, From, To, MustRestoreFirst, Deadline, IsIndexOnly, ForceBlockedGeneration); ev->RestartCounter = counter; + ev->Decommission = Decommission; return ev; } @@ -351,6 +353,7 @@ public: , MustRestoreFirst(ev->MustRestoreFirst) , IsIndexOnly(ev->IsIndexOnly) , ForceBlockedGeneration(ev->ForceBlockedGeneration) + , Decommission(ev->Decommission) , StartTime(now) , FailedDisks(&Info->GetTopology()) {} diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_events.h b/ydb/core/blobstorage/vdisk/common/vdisk_events.h index 3393f2ac30e..b4757a8171d 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_events.h +++ b/ydb/core/blobstorage/vdisk/common/vdisk_events.h @@ -1285,7 +1285,8 @@ namespace NKikimr { } void AddResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &logoBlobId, ui64 sh, - const char *data, size_t size, const ui64 *cookie = nullptr, const ui64 *ingress = nullptr) { + const char *data, size_t size, const ui64 *cookie = nullptr, const ui64 *ingress = nullptr, + bool keep = false, bool doNotKeep = false) { IncrementSize(size); NKikimrBlobStorage::TQueryResult *r = Record.AddResult(); r->SetStatus(status); @@ -1303,10 +1304,18 @@ namespace NKikimr { } if (ingress) r->SetIngress(*ingress); + if (keep) { + r->SetKeep(true); + } + if (doNotKeep) { + r->SetDoNotKeep(true); + } + Y_VERIFY_DEBUG(keep + doNotKeep <= 1); } void AddResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &logoBlobId, const ui64 *cookie = nullptr, - const ui64 *ingress = nullptr, const NMatrix::TVectorType *local = nullptr) { + const ui64 *ingress = nullptr, const NMatrix::TVectorType *local = nullptr, bool keep = false, + bool doNotKeep = false) { NKikimrBlobStorage::TQueryResult *r = Record.AddResult(); r->SetStatus(status); LogoBlobIDFromLogoBlobID(logoBlobId, r->MutableBlobID()); @@ -1319,6 +1328,13 @@ namespace NKikimr { r->AddParts(i + 1); } } + if (keep) { + r->SetKeep(true); + } + if (doNotKeep) { + r->SetDoNotKeep(true); + } + Y_VERIFY_DEBUG(keep + doNotKeep <= 1); } TString ToString() const override { diff --git a/ydb/core/blobstorage/vdisk/query/query_extr.cpp b/ydb/core/blobstorage/vdisk/query/query_extr.cpp index 328f2f207a8..57be3046445 100644 --- a/ydb/core/blobstorage/vdisk/query/query_extr.cpp +++ b/ydb/core/blobstorage/vdisk/query/query_extr.cpp @@ -128,7 +128,12 @@ namespace NKikimr { // Add result ui64 ingressRaw = ingress.Raw(); ui64 *pingr = (ShowInternals ? &ingressRaw : nullptr); - Result->AddResult(status, query->LogoBlobID, cookiePtr, pingr); + + const int mode = ingress.GetCollectMode(TIngress::IngressMode(QueryCtx->HullCtx->VCtx->Top->GType)); + const bool keep = (mode & CollectModeKeep) && !(mode & CollectModeDoNotKeep); + const bool doNotKeep = mode & CollectModeDoNotKeep; + + Result->AddResult(status, query->LogoBlobID, cookiePtr, pingr, nullptr, keep, doNotKeep); Merger.Clear(); } } @@ -194,6 +199,10 @@ namespace NKikimr { ui64 ingr = it->Ingress.Raw(); ui64 *pingr = (ShowInternals ? &ingr : nullptr); + const int mode = it->Ingress.GetCollectMode(TIngress::IngressMode(QueryCtx->HullCtx->VCtx->Top->GType)); + const bool keep = (mode & CollectModeKeep) && !(mode & CollectModeDoNotKeep); + const bool doNotKeep = mode & CollectModeDoNotKeep; + NReadBatcher::TDataItem::EType t = it->GetType(); switch (t) { case NReadBatcher::TDataItem::ET_CLEAN: @@ -210,7 +219,8 @@ namespace NKikimr { case NReadBatcher::TDataItem::ET_NOT_YET: // put NOT_YET Y_VERIFY(it->Id.PartId() > 0); - Result->AddResult(NKikimrProto::NOT_YET, it->Id, query->Shift, nullptr, query->Size, cookiePtr, pingr); + Result->AddResult(NKikimrProto::NOT_YET, it->Id, query->Shift, nullptr, query->Size, cookiePtr, + pingr, keep, doNotKeep); break; case NReadBatcher::TDataItem::ET_SETDISK: case NReadBatcher::TDataItem::ET_SETMEM: @@ -224,19 +234,23 @@ namespace NKikimr { ui64 Size; const ui64 *CookiePtr; const ui64 *IngrPtr; + const bool Keep; + const bool DoNotKeep; bool Success = true; void operator()(NReadBatcher::TReadError) { - Result->AddResult(NKikimrProto::CORRUPTED, Id, Shift, nullptr, Size, CookiePtr, IngrPtr); + Result->AddResult(NKikimrProto::CORRUPTED, Id, Shift, nullptr, Size, CookiePtr, IngrPtr, + Keep, DoNotKeep); Success = false; } void operator()(const char *data, size_t size) const { - Result->AddResult(NKikimrProto::OK, Id, Shift, data, size, CookiePtr, IngrPtr); + Result->AddResult(NKikimrProto::OK, Id, Shift, data, size, CookiePtr, IngrPtr, Keep, + DoNotKeep); } void operator()(const TRope& data) const { const TString s = data.ConvertToString(); (*this)(s.data(), s.size()); } - } processor{Result, it->Id, query->Shift, query->Size, cookiePtr, pingr}; + } processor{Result, it->Id, query->Shift, query->Size, cookiePtr, pingr, keep, doNotKeep}; rit.GetData(processor); if (!processor.Success) { NMatrix::TVectorType& v = neededParts[it->Id.FullID()]; diff --git a/ydb/core/blobstorage/vdisk/query/query_range.cpp b/ydb/core/blobstorage/vdisk/query/query_range.cpp index 4513187b550..5abc2cc7a0f 100644 --- a/ydb/core/blobstorage/vdisk/query/query_range.cpp +++ b/ydb/core/blobstorage/vdisk/query/query_range.cpp @@ -130,7 +130,12 @@ namespace NKikimr { ui64 *pingr = (ShowInternals ? &ingr : nullptr); Y_VERIFY(logoBlobId.PartId() == 0); // Index-only response must contain a single record for the blob const NMatrix::TVectorType local = ingress.LocalParts(QueryCtx->HullCtx->VCtx->Top->GType); - Result->AddResult(NKikimrProto::OK, logoBlobId, CookiePtr, pingr, &local); + + const int mode = ingress.GetCollectMode(TIngress::IngressMode(QueryCtx->HullCtx->VCtx->Top->GType)); + const bool keep = (mode & CollectModeKeep) && !(mode & CollectModeDoNotKeep); + const bool doNotKeep = mode & CollectModeDoNotKeep; + + Result->AddResult(NKikimrProto::OK, logoBlobId, CookiePtr, pingr, &local, keep, doNotKeep); --Counter; ResultSize.AddLogoBlobIndex(); } diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto index a3749d1a22e..ce621373296 100644 --- a/ydb/core/protos/blob_depot.proto +++ b/ydb/core/protos/blob_depot.proto @@ -188,6 +188,8 @@ message TEvResolve { optional bool ReturnOwners = 7 [default = false]; optional bool Reverse = 8 [default = false]; // reverse output optional uint64 Cookie = 9; // request cookie to match response item + optional fixed64 TabletId = 10; // used in virtual group mode to resolve keys of specific tablet + optional bool MustRestoreFirst = 11; } repeated TItem Items = 1; diff --git a/ydb/core/protos/blobstorage.proto b/ydb/core/protos/blobstorage.proto index c7aebd4ef7e..72f4f32e177 100644 --- a/ydb/core/protos/blobstorage.proto +++ b/ydb/core/protos/blobstorage.proto @@ -507,6 +507,9 @@ message TQueryResult { optional uint64 Ingress = 8; repeated uint32 Parts = 9; // part id's (>0) residing on this disk; returned only through index range queries + + optional bool Keep = 10; + optional bool DoNotKeep = 11; } message TEvVGetResult { diff --git a/ydb/core/test_tablet/load_actor_impl.cpp b/ydb/core/test_tablet/load_actor_impl.cpp index 5556dad5e28..e0dea392639 100644 --- a/ydb/core/test_tablet/load_actor_impl.cpp +++ b/ydb/core/test_tablet/load_actor_impl.cpp @@ -18,10 +18,6 @@ namespace NKikimr::NTestShard { Settings.GetStorageServerPort())); Send(parentId, new TTestShard::TEvSwitchMode(TTestShard::EMode::STATE_SERVER_CONNECT)); Become(&TThis::StateFunc); - if (Settings.RestartPeriodsSize()) { - TActivationContext::Schedule(GenerateRandomInterval(Settings.GetRestartPeriods()), new IEventHandle( - TEvents::TSystem::Wakeup, 0, SelfId(), {}, nullptr, 0)); - } } void TLoadActor::PassAway() { diff --git a/ydb/core/test_tablet/load_actor_impl.h b/ydb/core/test_tablet/load_actor_impl.h index a89e4b03bcf..2da3b1f07ac 100644 --- a/ydb/core/test_tablet/load_actor_impl.h +++ b/ydb/core/test_tablet/load_actor_impl.h @@ -34,9 +34,11 @@ namespace NKikimr::NTestShard { struct TEvValidationFinished : TEventLocal<TEvValidationFinished, EvValidationFinished> { std::unordered_map<TString, TKeyInfo> Keys; + bool InitialCheck; - TEvValidationFinished(std::unordered_map<TString, TKeyInfo> keys) + TEvValidationFinished(std::unordered_map<TString, TKeyInfo> keys, bool initialCheck) : Keys(std::move(keys)) + , InitialCheck(initialCheck) {} }; diff --git a/ydb/core/test_tablet/load_actor_read_validate.cpp b/ydb/core/test_tablet/load_actor_read_validate.cpp index b727144ac37..4b45f033338 100644 --- a/ydb/core/test_tablet/load_actor_read_validate.cpp +++ b/ydb/core/test_tablet/load_actor_read_validate.cpp @@ -348,7 +348,7 @@ namespace NKikimr::NTestShard { Y_VERIFY(info.ConfirmedState == info.PendingState); Y_VERIFY(info.ConfirmedState == ::NTestShard::TStateServer::CONFIRMED); } - Send(ParentId, new TEvValidationFinished(std::move(Keys))); + Send(ParentId, new TEvValidationFinished(std::move(Keys), InitialCheck)); PassAway(); } } @@ -595,6 +595,11 @@ namespace NKikimr::NTestShard { BytesOfData += info.Len; } Action(); + + if (Settings.RestartPeriodsSize() && ev->Get()->InitialCheck) { + TActivationContext::Schedule(GenerateRandomInterval(Settings.GetRestartPeriods()), new IEventHandle( + TEvents::TSystem::Wakeup, 0, SelfId(), {}, nullptr, 0)); + } } } // NKikimr::NTestShard |