aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2022-08-26 11:05:58 +0300
committeralexvru <alexvru@ydb.tech>2022-08-26 11:05:58 +0300
commit8c9b9f41b5ff58d0ba78d4e491bdb250b6758a8a (patch)
tree11d95af55816194779356db40e8658f2d1057fbf
parent89cae858a59ecae5378edcd229962b6d1be5e5f8 (diff)
downloadydb-8c9b9f41b5ff58d0ba78d4e491bdb250b6758a8a.tar.gz
BlobDepot work in progress -- TEvRange in decommission code
-rw-r--r--ydb/core/base/blobstorage.h5
-rw-r--r--ydb/core/blob_depot/agent/blob_mapping_cache.cpp14
-rw-r--r--ydb/core/blob_depot/agent/blob_mapping_cache.h2
-rw-r--r--ydb/core/blob_depot/agent/read.cpp7
-rw-r--r--ydb/core/blob_depot/agent/storage_discover.cpp2
-rw-r--r--ydb/core/blob_depot/agent/storage_range.cpp13
-rw-r--r--ydb/core/blob_depot/assimilator.cpp33
-rw-r--r--ydb/core/blob_depot/blob_depot.cpp1
-rw-r--r--ydb/core/blob_depot/data.cpp22
-rw-r--r--ydb/core/blob_depot/data.h29
-rw-r--r--ydb/core/blob_depot/data_resolve.cpp118
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp17
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h11
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp2
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h4
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp9
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_range.cpp3
-rw-r--r--ydb/core/blobstorage/vdisk/common/vdisk_events.h20
-rw-r--r--ydb/core/blobstorage/vdisk/query/query_extr.cpp24
-rw-r--r--ydb/core/blobstorage/vdisk/query/query_range.cpp7
-rw-r--r--ydb/core/protos/blob_depot.proto2
-rw-r--r--ydb/core/protos/blobstorage.proto3
-rw-r--r--ydb/core/test_tablet/load_actor_impl.cpp4
-rw-r--r--ydb/core/test_tablet/load_actor_impl.h4
-rw-r--r--ydb/core/test_tablet/load_actor_read_validate.cpp7
25 files changed, 309 insertions, 54 deletions
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h
index cde55cb5f95..6e4275e9c63 100644
--- a/ydb/core/base/blobstorage.h
+++ b/ydb/core/base/blobstorage.h
@@ -1147,6 +1147,8 @@ struct TEvBlobStorage {
ui32 RequestedSize;
TString Buffer;
TVector<TPartMapItem> PartMap;
+ bool Keep = false;
+ bool DoNotKeep = false;
TResponse()
: Status(NKikimrProto::UNKNOWN)
@@ -1687,6 +1689,7 @@ struct TEvBlobStorage {
bool IsIndexOnly;
ui32 ForceBlockedGeneration;
ui32 RestartCounter = 0;
+ bool Decommission = false;
TEvRange(ui64 tabletId, const TLogoBlobID &from, const TLogoBlobID &to, const bool mustRestoreFirst,
TInstant deadline, bool isIndexOnly = false, ui32 forceBlockedGeneration = 0)
@@ -1729,6 +1732,8 @@ struct TEvBlobStorage {
struct TResponse {
TLogoBlobID Id;
TString Buffer;
+ bool Keep = false;
+ bool DoNotKeep = false;
TResponse()
{}
diff --git a/ydb/core/blob_depot/agent/blob_mapping_cache.cpp b/ydb/core/blob_depot/agent/blob_mapping_cache.cpp
index 71b1d822d82..e6427a3ce87 100644
--- a/ydb/core/blob_depot/agent/blob_mapping_cache.cpp
+++ b/ydb/core/blob_depot/agent/blob_mapping_cache.cpp
@@ -10,13 +10,13 @@ namespace NKikimr::NBlobDepot {
if (inserted) {
entry.Key = it->first;
}
- entry.Values.emplace(item.GetValueChain());
+ entry.Values = item.GetValueChain();
Queue.PushBack(&entry);
entry.ResolveInFlight = false;
for (TQueryWaitingForKey& item : std::exchange(entry.QueriesWaitingForKey, {})) {
- Agent.OnRequestComplete(item.Id, TKeyResolved{&entry.Values.value()}, Agent.OtherRequestInFlight);
+ Agent.OnRequestComplete(item.Id, TKeyResolved{&entry.Values}, Agent.OtherRequestInFlight);
}
}
}
@@ -28,8 +28,8 @@ namespace NKikimr::NBlobDepot {
if (inserted) {
entry.Key = it->first;
}
- if (entry.Values) {
- return &entry.Values.value();
+ if (!entry.Values.empty()) {
+ return &entry.Values;
}
if (!entry.ResolveInFlight) {
entry.ResolveInFlight = true;
@@ -39,6 +39,12 @@ namespace NKikimr::NBlobDepot {
item->SetBeginningKey(it->first);
item->SetEndingKey(it->first);
item->SetIncludeEnding(true);
+
+ if (Agent.VirtualGroupId) {
+ const auto& id = TLogoBlobID::FromBinary(it->first);
+ item->SetTabletId(id.TabletID());
+ }
+
Agent.Issue(std::move(msg), this, nullptr);
}
diff --git a/ydb/core/blob_depot/agent/blob_mapping_cache.h b/ydb/core/blob_depot/agent/blob_mapping_cache.h
index 42d124b2bd0..3309bc97999 100644
--- a/ydb/core/blob_depot/agent/blob_mapping_cache.h
+++ b/ydb/core/blob_depot/agent/blob_mapping_cache.h
@@ -20,7 +20,7 @@ namespace NKikimr::NBlobDepot {
struct TCachedKeyItem : TIntrusiveListItem<TCachedKeyItem> {
TStringBuf Key;
- std::optional<TResolvedValueChain> Values;
+ TResolvedValueChain Values;
bool ResolveInFlight = false;
std::list<TQueryWaitingForKey> QueriesWaitingForKey;
};
diff --git a/ydb/core/blob_depot/agent/read.cpp b/ydb/core/blob_depot/agent/read.cpp
index 6849e254f40..a2afee30142 100644
--- a/ydb/core/blob_depot/agent/read.cpp
+++ b/ydb/core/blob_depot/agent/read.cpp
@@ -40,6 +40,9 @@ namespace NKikimr::NBlobDepot {
};
std::vector<TReadItem> items;
+ const ui64 offsetOnEntry = offset;
+ const ui64 sizeOnEntry = size;
+
for (const auto& value : values) {
const ui32 groupId = value.GetGroupId();
const auto blobId = LogoBlobIDFromLogoBlobID(value.GetBlobId());
@@ -48,6 +51,8 @@ namespace NKikimr::NBlobDepot {
if (end <= begin || blobId.BlobSize() < end) {
*error = "incorrect SubrangeBegin/SubrangeEnd pair";
+ STLOG(PRI_CRIT, BLOB_DEPOT_AGENT, BDA24, *error, (VirtualGroupId, VirtualGroupId), (TabletId, TabletId),
+ (Values, FormatList(values)));
return false;
}
@@ -78,6 +83,8 @@ namespace NKikimr::NBlobDepot {
if (size) {
*error = "incorrect offset/size provided";
+ STLOG(PRI_ERROR, BLOB_DEPOT_AGENT, BDA25, *error, (VirtualGroupId, VirtualGroupId), (TabletId, TabletId),
+ (Offset, offsetOnEntry), (Size, sizeOnEntry), (Values, FormatList(values)));
return false;
}
diff --git a/ydb/core/blob_depot/agent/storage_discover.cpp b/ydb/core/blob_depot/agent/storage_discover.cpp
index 51cbaa05c9b..1ba881e64b2 100644
--- a/ydb/core/blob_depot/agent/storage_discover.cpp
+++ b/ydb/core/blob_depot/agent/storage_discover.cpp
@@ -55,6 +55,8 @@ namespace NKikimr::NBlobDepot {
item->SetIncludeEnding(true);
item->SetMaxKeys(1);
item->SetReverse(true);
+ item->SetTabletId(TabletId);
+ item->SetMustRestoreFirst(true);
Agent.Issue(std::move(resolve), this, nullptr);
}
diff --git a/ydb/core/blob_depot/agent/storage_range.cpp b/ydb/core/blob_depot/agent/storage_range.cpp
index 670960401ea..7e0143414fd 100644
--- a/ydb/core/blob_depot/agent/storage_range.cpp
+++ b/ydb/core/blob_depot/agent/storage_range.cpp
@@ -22,6 +22,17 @@ namespace NKikimr::NBlobDepot {
void Initiate() override {
auto& msg = *Event->Get<TEvBlobStorage::TEvRange>();
+
+ if (msg.Decommission) {
+ Y_VERIFY(Agent.ProxyId);
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA26, "forwarding TEvRange", (VirtualGroupId, Agent.VirtualGroupId),
+ (TabletId, Agent.TabletId), (Msg, msg), (ProxyId, Agent.ProxyId));
+ const bool sent = TActivationContext::Send(Event->Forward(Agent.ProxyId));
+ Y_VERIFY(sent);
+ delete this;
+ return;
+ }
+
Response = std::make_unique<TEvBlobStorage::TEvRangeResult>(NKikimrProto::OK, msg.From, msg.To,
Agent.VirtualGroupId);
@@ -45,6 +56,8 @@ namespace NKikimr::NBlobDepot {
item->SetEndingKey(to);
item->SetIncludeEnding(true);
item->SetReverse(reverse);
+ item->SetTabletId(msg.TabletId);
+ item->SetMustRestoreFirst(msg.MustRestoreFirst);
Agent.Issue(std::move(resolve), this, nullptr);
++ResolvesInFlight;
diff --git a/ydb/core/blob_depot/assimilator.cpp b/ydb/core/blob_depot/assimilator.cpp
index ba66d03b68e..f7fc00806b1 100644
--- a/ydb/core/blob_depot/assimilator.cpp
+++ b/ydb/core/blob_depot/assimilator.cpp
@@ -99,9 +99,8 @@ namespace NKikimr::NBlobDepot {
bool Execute(TTransactionContext& txc, const TActorContext&) override {
NIceDb::TNiceDb db(txc.DB);
- const bool done = Ev->Blocks.empty() && Ev->Barriers.empty() && Ev->Blobs.empty();
- const bool blocksFinished = Ev->Blocks.empty() || !Ev->Barriers.empty() || !Ev->Blobs.empty() || done;
- const bool barriersFinished = Ev->Barriers.empty() || !Ev->Blobs.empty() || done;
+ bool blocksFinished = false;
+ bool barriersFinished = false;
if (const auto& blocks = Ev->Blocks; !blocks.empty()) {
Self->SkipBlocksUpTo = blocks.back().TabletId;
@@ -120,13 +119,16 @@ namespace NKikimr::NBlobDepot {
for (const auto& barrier : Ev->Barriers) {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT32, "assimilated barrier", (Id, Self->Self->GetLogId()), (Barrier, barrier));
Self->Self->BarrierServer->AddBarrierOnDecommit(barrier, txc);
+ blocksFinished = true; // there will be no blocks for sure
}
for (const auto& blob : Ev->Blobs) {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT33, "assimilated blob", (Id, Self->Self->GetLogId()), (Blob, blob));
Self->Self->Data->AddDataOnDecommit(blob, txc);
+ blocksFinished = barriersFinished = true; // no blocks and no more barriers
}
auto& decommitState = Self->Self->DecommitState;
+ const auto decommitStateOnEntry = decommitState;
if (blocksFinished && decommitState < EDecommitState::BlocksFinished) {
decommitState = EDecommitState::BlocksFinished;
UnblockRegisterActorQ = true;
@@ -134,6 +136,7 @@ namespace NKikimr::NBlobDepot {
if (barriersFinished && decommitState < EDecommitState::BarriersFinished) {
decommitState = EDecommitState::BarriersFinished;
}
+ const bool done = Ev->Blocks.empty() && Ev->Barriers.empty() && Ev->Blobs.empty();
if (done && decommitState < EDecommitState::BlobsFinished) {
decommitState = EDecommitState::BlobsFinished;
}
@@ -143,6 +146,24 @@ namespace NKikimr::NBlobDepot {
NIceDb::TUpdate<Schema::Config::AssimilatorState>(Self->SerializeAssimilatorState())
);
+ auto toString = [](EDecommitState state) {
+ switch (state) {
+ case EDecommitState::Default: return "Default";
+ case EDecommitState::BlocksFinished: return "BlocksFinished";
+ case EDecommitState::BarriersFinished: return "BarriersFinished";
+ case EDecommitState::BlobsFinished: return "BlobsFinished";
+ case EDecommitState::BlobsCopied: return "BlobsCopied";
+ case EDecommitState::Done: return "Done";
+ }
+ };
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT47, "decommit state change", (Id, Self->Self->GetLogId()),
+ (From, toString(decommitStateOnEntry)), (To, toString(decommitState)),
+ (UnblockRegisterActorQ, UnblockRegisterActorQ));
+
+ if (!Ev->Blobs.empty()) {
+ Self->Self->Data->LastAssimilatedBlobId = Ev->Blobs.back().Id;
+ }
+
return true;
}
@@ -292,21 +313,21 @@ namespace NKikimr::NBlobDepot {
void TAssimilator::Handle(TEvTabletPipe::TEvClientConnected::TPtr ev) {
auto& msg = *ev->Get();
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDTxx, "received TEvClientConnected", (Id, Self->GetLogId()), (Status, msg.Status));
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT39, "received TEvClientConnected", (Id, Self->GetLogId()), (Status, msg.Status));
if (msg.Status != NKikimrProto::OK) {
CreatePipe();
}
}
void TAssimilator::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr /*ev*/) {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDTxx, "received TEvClientDestroyed", (Id, Self->GetLogId()));
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT40, "received TEvClientDestroyed", (Id, Self->GetLogId()));
CreatePipe();
}
void TAssimilator::Handle(TEvBlobStorage::TEvControllerGroupDecommittedResponse::TPtr ev) {
auto& msg = *ev->Get();
const NKikimrProto::EReplyStatus status = msg.Record.GetStatus();
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDTxx, "received TEvControllerGroupDecommittedResponse", (Id, Self->GetLogId()),
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT41, "received TEvControllerGroupDecommittedResponse", (Id, Self->GetLogId()),
(Status, status));
if (status == NKikimrProto::OK) {
class TTxFinishDecommission : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
diff --git a/ydb/core/blob_depot/blob_depot.cpp b/ydb/core/blob_depot/blob_depot.cpp
index f1ca62ccd2a..daa068ea1ea 100644
--- a/ydb/core/blob_depot/blob_depot.cpp
+++ b/ydb/core/blob_depot/blob_depot.cpp
@@ -40,6 +40,7 @@ namespace NKikimr::NBlobDepot {
hFunc(TEvBlobDepot::TEvCollectGarbage, BarrierServer->Handle);
hFunc(TEvBlobStorage::TEvCollectGarbageResult, Data->Handle);
+ hFunc(TEvBlobStorage::TEvRangeResult, Data->Handle);
hFunc(TEvBlobDepot::TEvPushNotifyResult, Handle);
diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp
index cf92fccc7a1..a13ea999d60 100644
--- a/ydb/core/blob_depot/data.cpp
+++ b/ydb/core/blob_depot/data.cpp
@@ -32,13 +32,10 @@ namespace NKikimr::NBlobDepot {
void TData::AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob,
NTabletFlatExecutor::TTransactionContext& txc) {
+ TKey key(blob.Id);
+
bool underSoft, underHard;
Self->BarrierServer->GetBlobBarrierRelation(blob.Id, &underSoft, &underHard);
- if (underHard || (underSoft && !blob.Keep)) {
- return; // we can skip this blob as it is already being collected
- }
-
- TKey key(blob.Id);
// calculate keep state for this blob
const auto it = Data.find(key);
@@ -46,6 +43,13 @@ namespace NKikimr::NBlobDepot {
blob.DoNotKeep ? NKikimrBlobDepot::EKeepState::DoNotKeep :
blob.Keep ? NKikimrBlobDepot::EKeepState::Keep : NKikimrBlobDepot::EKeepState::Default);
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT49, "AddDataOnDecommit", (Id, Self->GetLogId()), (Blob, blob),
+ (UnderHard, underHard), (UnderSoft, underSoft), (KeepState, keepState));
+
+ if (underHard || (underSoft && keepState != NKikimrBlobDepot::EKeepState::Keep)) {
+ return; // we can skip this blob as it is already being collected
+ }
+
NKikimrBlobDepot::TValue value;
value.SetKeepState(keepState);
value.SetUnconfirmed(true);
@@ -59,7 +63,6 @@ namespace NKikimr::NBlobDepot {
db.Table<Schema::Data>().Key(key.MakeBinaryKey()).Update<Schema::Data::Value>(valueData);
PutKey(key, TValue(std::move(value)));
- LastAssimilatedKey = key;
}
void TData::AddTrashOnLoad(TLogoBlobID id) {
@@ -91,9 +94,7 @@ namespace NKikimr::NBlobDepot {
referencedBytes += id.BlobSize();
});
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT10, "PutKey", (Id, Self->GetLogId()), (Key, key),
- (ValueChain.size, data.ValueChain.size()), (ReferencedBytes, referencedBytes),
- (KeepState, NKikimrBlobDepot::EKeepState_Name(data.KeepState)));
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT10, "PutKey", (Id, Self->GetLogId()), (Key, key), (Value, data));
const auto [it, inserted] = Data.try_emplace(std::move(key), std::move(data));
if (!inserted) {
@@ -102,6 +103,9 @@ namespace NKikimr::NBlobDepot {
}
std::optional<TString> TData::UpdateKeepState(TKey key, NKikimrBlobDepot::EKeepState keepState) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT51, "UpdateKeepState", (Id, Self->GetLogId()), (Key, key),
+ (KeepState, keepState));
+
const auto [it, inserted] = Data.try_emplace(std::move(key), TValue(keepState));
if (!inserted) {
if (keepState <= it->second.KeepState) {
diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h
index 95ef9aa44b8..7a4c44c2cdb 100644
--- a/ydb/core/blob_depot/data.h
+++ b/ydb/core/blob_depot/data.h
@@ -238,6 +238,22 @@ namespace NKikimr::NBlobDepot {
, Public(false)
, Unconfirmed(false)
{}
+
+ TString ToString() const {
+ TStringStream s;
+ Output(s);
+ return s.Str();
+ }
+
+ void Output(IOutputStream& s) const {
+ s << "{Meta# '" << EscapeC(Meta) << "'"
+ << " ValueChain# " << FormatList(ValueChain)
+ << " KeepState# " << NKikimrBlobDepot::EKeepState_Name(KeepState)
+ << " Public# " << (Public ? "true" : "false")
+ << " Unconfirmed# " << (Unconfirmed ? "true" : "false")
+ << " OriginalBlobId# " << (OriginalBlobId ? OriginalBlobId->ToString() : "<none>")
+ << "}";
+ }
};
enum EScanFlags : ui32 {
@@ -286,10 +302,20 @@ namespace NKikimr::NBlobDepot {
THashMap<std::tuple<ui64, ui8, ui32>, TRecordsPerChannelGroup> RecordsPerChannelGroup;
TIntrusiveList<TRecordsPerChannelGroup, TRecordWithTrash> RecordsWithTrash;
std::optional<TKey> LastLoadedKey; // keys are being loaded in ascending order
- std::optional<TKey> LastAssimilatedKey;
+ std::optional<TLogoBlobID> LastAssimilatedBlobId;
+
+ friend class TGroupAssimilator;
THashMultiMap<void*, TLogoBlobID> InFlightTrash; // being committed, but not yet confirmed
+ struct TResolveDecommitContext {
+ TEvBlobDepot::TEvResolve::TPtr Ev; // original resolve request
+ ui32 NumRangesInFlight = 0;
+ bool Errors = false;
+ };
+ ui64 LastRangeId = 0;
+ THashMap<ui64, TResolveDecommitContext> ResolveDecommitContexts;
+
class TTxIssueGC;
class TTxConfirmGC;
@@ -382,6 +408,7 @@ namespace NKikimr::NBlobDepot {
bool IsLoaded() const { return Loaded; }
void Handle(TEvBlobDepot::TEvResolve::TPtr ev);
+ void Handle(TEvBlobStorage::TEvRangeResult::TPtr ev);
private:
void ExecuteIssueGC(ui8 channel, ui32 groupId, TGenStep issuedGenStep,
diff --git a/ydb/core/blob_depot/data_resolve.cpp b/ydb/core/blob_depot/data_resolve.cpp
index 97bd23e98a1..f8d9f897b7e 100644
--- a/ydb/core/blob_depot/data_resolve.cpp
+++ b/ydb/core/blob_depot/data_resolve.cpp
@@ -243,6 +243,11 @@ namespace NKikimr::NBlobDepot {
item.SetMeta(value.Meta.data(), value.Meta.size());
}
+ if (!item.ValueChainSize()) {
+ STLOG(PRI_WARN, BLOB_DEPOT, BDT48, "empty ValueChain on Resolve", (Id, Self->GetLogId()),
+ (Key, key), (Value, value), (Item, item), (Sender, Request->Sender), (Cookie, Request->Cookie));
+ }
+
size_t itemSize = item.ByteSizeLong();
if (Outbox.empty() || lastResponseSize + itemSize > EventMaxByteSize) {
if (!Outbox.empty()) {
@@ -262,22 +267,121 @@ namespace NKikimr::NBlobDepot {
void TData::Handle(TEvBlobDepot::TEvResolve::TPtr ev) {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT21, "TEvResolve", (Id, Self->GetLogId()), (Msg, ev->Get()->ToString()),
- (Sender, ev->Sender), (Cookie, ev->Cookie));
+ (Sender, ev->Sender), (Cookie, ev->Cookie), (LastAssimilatedBlobId, LastAssimilatedBlobId));
if (Self->Config.HasDecommitGroupId() && Self->DecommitState <= EDecommitState::BlobsFinished) {
+ std::vector<std::tuple<ui64, bool, TLogoBlobID, TLogoBlobID>> queries;
+
for (const auto& item : ev->Get()->Record.GetItems()) {
- std::optional<TKey> end = item.HasEndingKey()
- ? std::make_optional(TKey::FromBinaryKey(item.GetEndingKey(), Self->Config))
- : std::nullopt;
+ if (!item.HasTabletId()) {
+ STLOG(PRI_CRIT, BLOB_DEPOT, BDT42, "incorrect request", (Id, Self->GetLogId()), (Item, item));
+ auto [response, record] = TEvBlobDepot::MakeResponseFor(*ev, Self->SelfId(), NKikimrProto::ERROR,
+ "incorrect request");
+ TActivationContext::Send(response.release());
+ return;
+ }
+
+ const ui64 tabletId = item.GetTabletId();
+ if (LastAssimilatedBlobId && tabletId < LastAssimilatedBlobId->TabletID()) {
+ continue; // fast path
+ }
+
+ TLogoBlobID minId(tabletId, 0, 0, 0, 0, 0);
+ TLogoBlobID maxId(tabletId, Max<ui32>(), Max<ui32>(), TLogoBlobID::MaxChannel, TLogoBlobID::MaxBlobSize,
+ TLogoBlobID::MaxCookie, TLogoBlobID::MaxPartId, TLogoBlobID::MaxCrcMode);
+
+ if (item.HasBeginningKey()) {
+ minId = TKey::FromBinaryKey(item.GetBeginningKey(), Self->Config).GetBlobId();
+ }
+ if (item.HasEndingKey()) {
+ maxId = TKey::FromBinaryKey(item.GetEndingKey(), Self->Config).GetBlobId();
+ }
+
+ Y_VERIFY_DEBUG(minId.TabletID() == tabletId);
+ Y_VERIFY_DEBUG(maxId.TabletID() == tabletId);
+
+ if (!LastAssimilatedBlobId || *LastAssimilatedBlobId < maxId) {
+ if (LastAssimilatedBlobId && minId < *LastAssimilatedBlobId) {
+ minId = *LastAssimilatedBlobId;
+ }
+ if (minId == maxId) {
+ const auto it = Data.find(TKey(minId));
+ if (it != Data.end() && !it->second.ValueChain.empty() || it->second.OriginalBlobId) {
+ continue; // fast path for extreme queries
+ }
+ }
+ queries.emplace_back(tabletId, item.GetMustRestoreFirst(), minId, maxId);
+ }
+ }
- if (!end || !LastAssimilatedKey || *LastAssimilatedKey < *end) {
- // see if we have to query BS for this range and to apply items here
- Y_VERIFY_DEBUG(false, "going to return corrupt data");
+ if (!queries.empty()) {
+ const ui64 id = ++LastRangeId;
+ for (const auto& [tabletId, mustRestoreFirst, minId, maxId] : queries) {
+ auto ev = std::make_unique<TEvBlobStorage::TEvRange>(tabletId, minId, maxId, mustRestoreFirst,
+ TInstant::Max(), true);
+ ev->Decommission = true;
+
+ const auto& tabletId_ = tabletId;
+ const auto& minId_ = minId;
+ const auto& maxId_ = maxId;
+ const auto& mustRestoreFirst_ = mustRestoreFirst;
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT46, "going to TEvRange", (Id, Self->GetLogId()), (TabletId, tabletId_),
+ (MinId, minId_), (MaxId, maxId_), (MustRestoreFirst, mustRestoreFirst_));
+ SendToBSProxy(Self->SelfId(), Self->Config.GetDecommitGroupId(), ev.release(), id);
}
+ ResolveDecommitContexts[id] = {ev, (ui32)queries.size()};
+ return;
}
}
Self->Execute(std::make_unique<TTxResolve>(Self, ev));
}
+ void TData::Handle(TEvBlobStorage::TEvRangeResult::TPtr ev) {
+ class TTxCommitRange : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
+ TEvBlobStorage::TEvRangeResult::TPtr Ev;
+
+ public:
+ TTxCommitRange(TBlobDepot *self, TEvBlobStorage::TEvRangeResult::TPtr ev)
+ : TTransactionBase(self)
+ , Ev(ev)
+ {}
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ if (Ev->Get()->Status == NKikimrProto::OK) {
+ for (const auto& response : Ev->Get()->Responses) {
+ Self->Data->AddDataOnDecommit({
+ .Id = response.Id,
+ .Keep = response.Keep,
+ .DoNotKeep = response.DoNotKeep
+ }, txc);
+ }
+ }
+ return true;
+ }
+
+ void Complete(const TActorContext&) override {
+ auto& contexts = Self->Data->ResolveDecommitContexts;
+ if (const auto it = contexts.find(Ev->Cookie); it != contexts.end()) {
+ TResolveDecommitContext& context = it->second;
+ if (Ev->Get()->Status != NKikimrProto::OK) {
+ context.Errors = true;
+ }
+ if (!--context.NumRangesInFlight) {
+ if (context.Errors) {
+ auto [response, record] = TEvBlobDepot::MakeResponseFor(*context.Ev, Self->SelfId(),
+ NKikimrProto::ERROR, "errors in range queries");
+ TActivationContext::Send(response.release());
+ } else {
+ Self->Execute(std::make_unique<TTxResolve>(Self, context.Ev));
+ }
+ }
+ contexts.erase(it);
+ }
+ }
+ };
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT50, "TEvRangeResult", (Id, Self->GetLogId()), (Msg, *ev->Get()));
+ Self->Execute(std::make_unique<TTxCommitRange>(Self, ev));
+ }
+
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp
index b093cfb2e3a..a602caf8ff6 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp
@@ -101,7 +101,7 @@ bool TBlobState::Restore(const TBlobStorageGroupInfo &info) {
}
void TBlobState::AddResponseData(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber,
- ui32 shift, TString &data) {
+ ui32 shift, TString &data, bool keep, bool doNotKeep) {
// Add actual data to Parts
Y_VERIFY(id.PartId() != 0);
ui32 partIdx = id.PartId() - 1;
@@ -129,6 +129,8 @@ void TBlobState::AddResponseData(const TBlobStorageGroupInfo &info, const TLogoB
}
}
Y_VERIFY(isFound);
+ Keep |= keep;
+ DoNotKeep |= doNotKeep;
}
void TBlobState::AddNoDataResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber) {
@@ -196,7 +198,8 @@ void TBlobState::AddErrorResponse(const TBlobStorageGroupInfo &info, const TLogo
Y_VERIFY(isFound);
}
-void TBlobState::AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber) {
+void TBlobState::AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber,
+ bool keep, bool doNotKeep) {
Y_UNUSED(info);
Y_VERIFY(id.PartId() != 0);
ui32 partIdx = id.PartId() - 1;
@@ -216,6 +219,8 @@ void TBlobState::AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLog
}
}
Y_VERIFY(isFound);
+ Keep |= keep;
+ DoNotKeep |= doNotKeep;
}
ui64 TBlobState::GetPredictedDelayNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
@@ -418,11 +423,11 @@ void TBlackboard::AddPutOkResponse(const TLogoBlobID &id, ui32 orderNumber) {
state.AddPutOkResponse(*Info, id, orderNumber);
}
-void TBlackboard::AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TString &data) {
+void TBlackboard::AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TString &data, bool keep, bool doNotKeep) {
Y_VERIFY(bool(id));
Y_VERIFY(id.PartId() != 0);
TBlobState &state = GetState(id);
- state.AddResponseData(*Info, id, orderNumber, shift, data);
+ state.AddResponseData(*Info, id, orderNumber, shift, data, keep, doNotKeep);
}
void TBlackboard::AddNoDataResponse(const TLogoBlobID &id, ui32 orderNumber) {
@@ -432,11 +437,11 @@ void TBlackboard::AddNoDataResponse(const TLogoBlobID &id, ui32 orderNumber) {
state.AddNoDataResponse(*Info, id, orderNumber);
}
-void TBlackboard::AddNotYetResponse(const TLogoBlobID &id, ui32 orderNumber) {
+void TBlackboard::AddNotYetResponse(const TLogoBlobID &id, ui32 orderNumber, bool keep, bool doNotKeep) {
Y_VERIFY(bool(id));
Y_VERIFY(id.PartId() != 0);
TBlobState &state = GetState(id);
- state.AddNotYetResponse(*Info, id, orderNumber);
+ state.AddNotYetResponse(*Info, id, orderNumber, keep, doNotKeep);
}
void TBlackboard::AddErrorResponse(const TLogoBlobID &id, ui32 orderNumber) {
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h
index cfd0eb64999..72bfd73800c 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h
@@ -83,6 +83,8 @@ struct TBlobState {
bool IsDone = false;
std::vector<std::pair<ui64, ui32>> *ExtraBlockChecks = nullptr;
NWilson::TSpan *Span = nullptr;
+ bool Keep = false;
+ bool DoNotKeep = false;
void Init(const TLogoBlobID &id, const TBlobStorageGroupInfo &Info);
void AddNeeded(ui64 begin, ui64 size);
@@ -90,11 +92,12 @@ struct TBlobState {
void MarkBlobReadyToPut(ui8 blobIdx = 0);
bool Restore(const TBlobStorageGroupInfo &info);
void AddResponseData(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring,
- ui32 shift, TString &data);
+ ui32 shift, TString &data, bool keep, bool doNotKeep);
void AddPutOkResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber);
void AddNoDataResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring);
void AddErrorResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring);
- void AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring);
+ void AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring,
+ bool keep, bool doNotKeep);
ui64 GetPredictedDelayNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
ui32 diskIdxInSubring, NKikimrBlobStorage::EVDiskQueueId queueId) const;
void GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
@@ -205,11 +208,11 @@ struct TBlackboard {
void AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TString &partData);
void MarkBlobReadyToPut(const TLogoBlobID &id, ui8 blobIdx = 0);
void MoveBlobStateToDone(const TLogoBlobID &id);
- void AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TString &data);
+ void AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TString &data, bool keep, bool doNotKeep);
void AddPutOkResponse(const TLogoBlobID &id, ui32 orderNumber);
void AddNoDataResponse(const TLogoBlobID &id, ui32 orderNumber);
void AddErrorResponse(const TLogoBlobID &id, ui32 orderNumber);
- void AddNotYetResponse(const TLogoBlobID &id, ui32 orderNumber);
+ void AddNotYetResponse(const TLogoBlobID &id, ui32 orderNumber, bool keep, bool doNotKeep);
EStrategyOutcome RunStrategy(TLogContext &logCtx, const IStrategy& s, TBatchedVec<TBlobStates::value_type*> *finished = nullptr);
TBlobState& GetState(const TLogoBlobID &id);
ssize_t AddPartMap(const TLogoBlobID &id, ui32 diskOrderNumber, ui32 requestIndex);
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp
index 283f6049d5e..2640bb7c862 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp
@@ -40,6 +40,8 @@ void TGetImpl::PrepareReply(NKikimrProto::EReplyStatus status, TString errorReas
const TBlobState &blobState = Blackboard.GetState(query.Id);
outResponse.Id = query.Id;
outResponse.PartMap = blobState.PartMap;
+ outResponse.Keep = blobState.Keep;
+ outResponse.DoNotKeep = blobState.DoNotKeep;
if (blobState.WholeSituation == TBlobState::ESituation::Absent) {
bool okay = true;
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h
index 78f5f2a5231..c2d266b62ca 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h
@@ -217,7 +217,7 @@ public:
if (replyStatus == NKikimrProto::OK) {
// TODO(cthulhu): Verify shift and response size, and cookie
R_LOG_DEBUG_SX(logCtx, "BPG58", "Got# OK orderNumber# " << orderNumber << " vDiskId# " << vdisk.ToString());
- Blackboard.AddResponseData(blobId, orderNumber, resultShift, resultBuffer);
+ Blackboard.AddResponseData(blobId, orderNumber, resultShift, resultBuffer, result.GetKeep(), result.GetDoNotKeep());
} else if (replyStatus == NKikimrProto::NODATA) {
R_LOG_DEBUG_SX(logCtx, "BPG59", "Got# NODATA orderNumber# " << orderNumber
<< " vDiskId# " << vdisk.ToString());
@@ -231,7 +231,7 @@ public:
} else if (replyStatus == NKikimrProto::NOT_YET) {
R_LOG_DEBUG_SX(logCtx, "BPG67", "Got# NOT_YET orderNumber# " << orderNumber
<< " vDiskId# " << vdisk.ToString());
- Blackboard.AddNotYetResponse(blobId, orderNumber);
+ Blackboard.AddNotYetResponse(blobId, orderNumber, result.GetKeep(), result.GetDoNotKeep());
} else {
Y_VERIFY(false, "Unexpected reply status# %s", NKikimrProto::EReplyStatus_Name(replyStatus).data());
}
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp
index f3be4f5dd48..8aaaf7ea0ec 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp
@@ -31,6 +31,8 @@ class TBlobStorageGroupIndexRestoreGetRequest
std::unique_ptr<TEvBlobStorage::TEvGetResult> PendingResult;
+ THashMap<TLogoBlobID, std::pair<bool, bool>> KeepFlags;
+
void ReplyAndDie(NKikimrProto::EReplyStatus status) {
A_LOG_DEBUG_S("DSPI14", "ReplyAndDie"
<< " Reply with status# " << NKikimrProto::EReplyStatus_Name(status)
@@ -117,6 +119,9 @@ class TBlobStorageGroupIndexRestoreGetRequest
const ui32 queryIdx = result.GetCookie();
Y_VERIFY(queryIdx < QuerySize && blobId == Queries[queryIdx].Id);
BlobStatus[queryIdx].UpdateFromResponseData(result, vdisk, Info.Get());
+ auto& [keep, doNotKeep] = KeepFlags[blobId];
+ keep |= result.GetKeep();
+ doNotKeep |= result.GetDoNotKeep();
}
if (!VGetsInFlight) {
@@ -135,6 +140,10 @@ class TBlobStorageGroupIndexRestoreGetRequest
auto &a = PendingResult->Responses[idx];
a.Id = q.Id;
+ const auto it = KeepFlags.find(q.Id);
+ Y_VERIFY(it != KeepFlags.end());
+ std::tie(a.Keep, a.DoNotKeep) = it->second;
+
A_LOG_DEBUG_S("DSPI11", "OnEnoughVGetResults Id# " << q.Id << " BlobStatus# " << DumpBlobStatus(idx));
if (blobState == TBlobStorageGroupInfo::EBS_DISINTEGRATED) {
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp
index c329dbf4f87..3e636abb10a 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp
@@ -23,6 +23,7 @@ class TBlobStorageGroupRangeRequest : public TBlobStorageGroupRequestActor<TBlob
const bool MustRestoreFirst;
const bool IsIndexOnly;
const ui32 ForceBlockedGeneration;
+ const bool Decommission;
TInstant StartTime;
TAutoPtr<TEvBlobStorage::TEvRangeResult> Reply;
@@ -320,6 +321,7 @@ class TBlobStorageGroupRangeRequest : public TBlobStorageGroupRequestActor<TBlob
auto ev = std::make_unique<TEvBlobStorage::TEvRange>(TabletId, From, To, MustRestoreFirst, Deadline, IsIndexOnly,
ForceBlockedGeneration);
ev->RestartCounter = counter;
+ ev->Decommission = Decommission;
return ev;
}
@@ -351,6 +353,7 @@ public:
, MustRestoreFirst(ev->MustRestoreFirst)
, IsIndexOnly(ev->IsIndexOnly)
, ForceBlockedGeneration(ev->ForceBlockedGeneration)
+ , Decommission(ev->Decommission)
, StartTime(now)
, FailedDisks(&Info->GetTopology())
{}
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_events.h b/ydb/core/blobstorage/vdisk/common/vdisk_events.h
index 3393f2ac30e..b4757a8171d 100644
--- a/ydb/core/blobstorage/vdisk/common/vdisk_events.h
+++ b/ydb/core/blobstorage/vdisk/common/vdisk_events.h
@@ -1285,7 +1285,8 @@ namespace NKikimr {
}
void AddResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &logoBlobId, ui64 sh,
- const char *data, size_t size, const ui64 *cookie = nullptr, const ui64 *ingress = nullptr) {
+ const char *data, size_t size, const ui64 *cookie = nullptr, const ui64 *ingress = nullptr,
+ bool keep = false, bool doNotKeep = false) {
IncrementSize(size);
NKikimrBlobStorage::TQueryResult *r = Record.AddResult();
r->SetStatus(status);
@@ -1303,10 +1304,18 @@ namespace NKikimr {
}
if (ingress)
r->SetIngress(*ingress);
+ if (keep) {
+ r->SetKeep(true);
+ }
+ if (doNotKeep) {
+ r->SetDoNotKeep(true);
+ }
+ Y_VERIFY_DEBUG(keep + doNotKeep <= 1);
}
void AddResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &logoBlobId, const ui64 *cookie = nullptr,
- const ui64 *ingress = nullptr, const NMatrix::TVectorType *local = nullptr) {
+ const ui64 *ingress = nullptr, const NMatrix::TVectorType *local = nullptr, bool keep = false,
+ bool doNotKeep = false) {
NKikimrBlobStorage::TQueryResult *r = Record.AddResult();
r->SetStatus(status);
LogoBlobIDFromLogoBlobID(logoBlobId, r->MutableBlobID());
@@ -1319,6 +1328,13 @@ namespace NKikimr {
r->AddParts(i + 1);
}
}
+ if (keep) {
+ r->SetKeep(true);
+ }
+ if (doNotKeep) {
+ r->SetDoNotKeep(true);
+ }
+ Y_VERIFY_DEBUG(keep + doNotKeep <= 1);
}
TString ToString() const override {
diff --git a/ydb/core/blobstorage/vdisk/query/query_extr.cpp b/ydb/core/blobstorage/vdisk/query/query_extr.cpp
index 328f2f207a8..57be3046445 100644
--- a/ydb/core/blobstorage/vdisk/query/query_extr.cpp
+++ b/ydb/core/blobstorage/vdisk/query/query_extr.cpp
@@ -128,7 +128,12 @@ namespace NKikimr {
// Add result
ui64 ingressRaw = ingress.Raw();
ui64 *pingr = (ShowInternals ? &ingressRaw : nullptr);
- Result->AddResult(status, query->LogoBlobID, cookiePtr, pingr);
+
+ const int mode = ingress.GetCollectMode(TIngress::IngressMode(QueryCtx->HullCtx->VCtx->Top->GType));
+ const bool keep = (mode & CollectModeKeep) && !(mode & CollectModeDoNotKeep);
+ const bool doNotKeep = mode & CollectModeDoNotKeep;
+
+ Result->AddResult(status, query->LogoBlobID, cookiePtr, pingr, nullptr, keep, doNotKeep);
Merger.Clear();
}
}
@@ -194,6 +199,10 @@ namespace NKikimr {
ui64 ingr = it->Ingress.Raw();
ui64 *pingr = (ShowInternals ? &ingr : nullptr);
+ const int mode = it->Ingress.GetCollectMode(TIngress::IngressMode(QueryCtx->HullCtx->VCtx->Top->GType));
+ const bool keep = (mode & CollectModeKeep) && !(mode & CollectModeDoNotKeep);
+ const bool doNotKeep = mode & CollectModeDoNotKeep;
+
NReadBatcher::TDataItem::EType t = it->GetType();
switch (t) {
case NReadBatcher::TDataItem::ET_CLEAN:
@@ -210,7 +219,8 @@ namespace NKikimr {
case NReadBatcher::TDataItem::ET_NOT_YET:
// put NOT_YET
Y_VERIFY(it->Id.PartId() > 0);
- Result->AddResult(NKikimrProto::NOT_YET, it->Id, query->Shift, nullptr, query->Size, cookiePtr, pingr);
+ Result->AddResult(NKikimrProto::NOT_YET, it->Id, query->Shift, nullptr, query->Size, cookiePtr,
+ pingr, keep, doNotKeep);
break;
case NReadBatcher::TDataItem::ET_SETDISK:
case NReadBatcher::TDataItem::ET_SETMEM:
@@ -224,19 +234,23 @@ namespace NKikimr {
ui64 Size;
const ui64 *CookiePtr;
const ui64 *IngrPtr;
+ const bool Keep;
+ const bool DoNotKeep;
bool Success = true;
void operator()(NReadBatcher::TReadError) {
- Result->AddResult(NKikimrProto::CORRUPTED, Id, Shift, nullptr, Size, CookiePtr, IngrPtr);
+ Result->AddResult(NKikimrProto::CORRUPTED, Id, Shift, nullptr, Size, CookiePtr, IngrPtr,
+ Keep, DoNotKeep);
Success = false;
}
void operator()(const char *data, size_t size) const {
- Result->AddResult(NKikimrProto::OK, Id, Shift, data, size, CookiePtr, IngrPtr);
+ Result->AddResult(NKikimrProto::OK, Id, Shift, data, size, CookiePtr, IngrPtr, Keep,
+ DoNotKeep);
}
void operator()(const TRope& data) const {
const TString s = data.ConvertToString();
(*this)(s.data(), s.size());
}
- } processor{Result, it->Id, query->Shift, query->Size, cookiePtr, pingr};
+ } processor{Result, it->Id, query->Shift, query->Size, cookiePtr, pingr, keep, doNotKeep};
rit.GetData(processor);
if (!processor.Success) {
NMatrix::TVectorType& v = neededParts[it->Id.FullID()];
diff --git a/ydb/core/blobstorage/vdisk/query/query_range.cpp b/ydb/core/blobstorage/vdisk/query/query_range.cpp
index 4513187b550..5abc2cc7a0f 100644
--- a/ydb/core/blobstorage/vdisk/query/query_range.cpp
+++ b/ydb/core/blobstorage/vdisk/query/query_range.cpp
@@ -130,7 +130,12 @@ namespace NKikimr {
ui64 *pingr = (ShowInternals ? &ingr : nullptr);
Y_VERIFY(logoBlobId.PartId() == 0); // Index-only response must contain a single record for the blob
const NMatrix::TVectorType local = ingress.LocalParts(QueryCtx->HullCtx->VCtx->Top->GType);
- Result->AddResult(NKikimrProto::OK, logoBlobId, CookiePtr, pingr, &local);
+
+ const int mode = ingress.GetCollectMode(TIngress::IngressMode(QueryCtx->HullCtx->VCtx->Top->GType));
+ const bool keep = (mode & CollectModeKeep) && !(mode & CollectModeDoNotKeep);
+ const bool doNotKeep = mode & CollectModeDoNotKeep;
+
+ Result->AddResult(NKikimrProto::OK, logoBlobId, CookiePtr, pingr, &local, keep, doNotKeep);
--Counter;
ResultSize.AddLogoBlobIndex();
}
diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto
index a3749d1a22e..ce621373296 100644
--- a/ydb/core/protos/blob_depot.proto
+++ b/ydb/core/protos/blob_depot.proto
@@ -188,6 +188,8 @@ message TEvResolve {
optional bool ReturnOwners = 7 [default = false];
optional bool Reverse = 8 [default = false]; // reverse output
optional uint64 Cookie = 9; // request cookie to match response item
+ optional fixed64 TabletId = 10; // used in virtual group mode to resolve keys of specific tablet
+ optional bool MustRestoreFirst = 11;
}
repeated TItem Items = 1;
diff --git a/ydb/core/protos/blobstorage.proto b/ydb/core/protos/blobstorage.proto
index c7aebd4ef7e..72f4f32e177 100644
--- a/ydb/core/protos/blobstorage.proto
+++ b/ydb/core/protos/blobstorage.proto
@@ -507,6 +507,9 @@ message TQueryResult {
optional uint64 Ingress = 8;
repeated uint32 Parts = 9; // part id's (>0) residing on this disk; returned only through index range queries
+
+ optional bool Keep = 10;
+ optional bool DoNotKeep = 11;
}
message TEvVGetResult {
diff --git a/ydb/core/test_tablet/load_actor_impl.cpp b/ydb/core/test_tablet/load_actor_impl.cpp
index 5556dad5e28..e0dea392639 100644
--- a/ydb/core/test_tablet/load_actor_impl.cpp
+++ b/ydb/core/test_tablet/load_actor_impl.cpp
@@ -18,10 +18,6 @@ namespace NKikimr::NTestShard {
Settings.GetStorageServerPort()));
Send(parentId, new TTestShard::TEvSwitchMode(TTestShard::EMode::STATE_SERVER_CONNECT));
Become(&TThis::StateFunc);
- if (Settings.RestartPeriodsSize()) {
- TActivationContext::Schedule(GenerateRandomInterval(Settings.GetRestartPeriods()), new IEventHandle(
- TEvents::TSystem::Wakeup, 0, SelfId(), {}, nullptr, 0));
- }
}
void TLoadActor::PassAway() {
diff --git a/ydb/core/test_tablet/load_actor_impl.h b/ydb/core/test_tablet/load_actor_impl.h
index a89e4b03bcf..2da3b1f07ac 100644
--- a/ydb/core/test_tablet/load_actor_impl.h
+++ b/ydb/core/test_tablet/load_actor_impl.h
@@ -34,9 +34,11 @@ namespace NKikimr::NTestShard {
struct TEvValidationFinished : TEventLocal<TEvValidationFinished, EvValidationFinished> {
std::unordered_map<TString, TKeyInfo> Keys;
+ bool InitialCheck;
- TEvValidationFinished(std::unordered_map<TString, TKeyInfo> keys)
+ TEvValidationFinished(std::unordered_map<TString, TKeyInfo> keys, bool initialCheck)
: Keys(std::move(keys))
+ , InitialCheck(initialCheck)
{}
};
diff --git a/ydb/core/test_tablet/load_actor_read_validate.cpp b/ydb/core/test_tablet/load_actor_read_validate.cpp
index b727144ac37..4b45f033338 100644
--- a/ydb/core/test_tablet/load_actor_read_validate.cpp
+++ b/ydb/core/test_tablet/load_actor_read_validate.cpp
@@ -348,7 +348,7 @@ namespace NKikimr::NTestShard {
Y_VERIFY(info.ConfirmedState == info.PendingState);
Y_VERIFY(info.ConfirmedState == ::NTestShard::TStateServer::CONFIRMED);
}
- Send(ParentId, new TEvValidationFinished(std::move(Keys)));
+ Send(ParentId, new TEvValidationFinished(std::move(Keys), InitialCheck));
PassAway();
}
}
@@ -595,6 +595,11 @@ namespace NKikimr::NTestShard {
BytesOfData += info.Len;
}
Action();
+
+ if (Settings.RestartPeriodsSize() && ev->Get()->InitialCheck) {
+ TActivationContext::Schedule(GenerateRandomInterval(Settings.GetRestartPeriods()), new IEventHandle(
+ TEvents::TSystem::Wakeup, 0, SelfId(), {}, nullptr, 0));
+ }
}
} // NKikimr::NTestShard