aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Kriukov <kru.bash.all@gmail.com>2022-05-05 13:59:07 +0300
committerAleksandr Kriukov <kru.bash.all@gmail.com>2022-05-05 13:59:07 +0300
commitda89cd9e274efbfa5515ef1ebf67b617c4ef47ba (patch)
treea72a49318ef2b9c53f1f8111f69c0471ac2fb49a
parentef5630e88d8bf4b5ecc3854c7dc8a802adbad1a5 (diff)
downloadydb-da89cd9e274efbfa5515ef1ebf67b617c4ef47ba.tar.gz
Move blobs from collect operation to trash during initialization, KIKIMR-14854
ref:b0023d2f62ef703b753fa82b98475abf22c0028c
-rw-r--r--ydb/core/keyvalue/keyvalue_flat_impl.h54
-rw-r--r--ydb/core/keyvalue/keyvalue_state.cpp84
-rw-r--r--ydb/core/keyvalue/keyvalue_state.h4
-rw-r--r--ydb/core/keyvalue/keyvalue_ut.cpp26
4 files changed, 113 insertions, 55 deletions
diff --git a/ydb/core/keyvalue/keyvalue_flat_impl.h b/ydb/core/keyvalue/keyvalue_flat_impl.h
index 859e07d4de9..7ab1c7cd43c 100644
--- a/ydb/core/keyvalue/keyvalue_flat_impl.h
+++ b/ydb/core/keyvalue/keyvalue_flat_impl.h
@@ -246,6 +246,28 @@ protected:
}
};
+
+ struct TTxRegisterInitialGCCompletion : public NTabletFlatExecutor::ITransaction {
+ TKeyValueFlat *Self;
+
+ TTxRegisterInitialGCCompletion(TKeyValueFlat *keyValueFlat)
+ : Self(keyValueFlat)
+ {}
+
+ bool Execute(NTabletFlatExecutor::TTransactionContext &txc, const TActorContext &ctx) override {
+ LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << txc.Tablet << " TTxRegisterInitialGCCompletion Execute");
+ TSimpleDbFlat db(txc.DB);
+ Self->State.RegisterInitialGCCompletionExecute(db, ctx);
+ return true;
+ }
+
+ void Complete(const TActorContext &ctx) override {
+ LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << Self->TabletID()
+ << " TTxRegisterInitialGCCompletion Complete");
+ Self->State.RegisterInitialGCCompletionComplete(ctx);
+ }
+ };
+
TKeyValueState State;
TDeque<TAutoPtr<IEventHandle>> InitialEventsQueue;
TActorId CollectorActorId;
@@ -373,23 +395,27 @@ protected:
return;
}
- if (ev->Cookie == (ui64)TKeyValueState::ECollectCookie::SoftInitial) {
- NKikimrProto::EReplyStatus status = ev->Get()->Status;
- if (status == NKikimrProto::OK) {
- State.RegisterInitialCollectResult(ctx);
- } else {
- LOG_ERROR_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID()
- << " received not ok TEvCollectGarbageResult"
- << " Status# " << NKikimrProto::EReplyStatus_Name(status)
- << " ErrorReason# " << ev->Get()->ErrorReason);
- Send(SelfId(), new TKikimrEvents::TEvPoisonPill);
- }
+ if (ev->Cookie != (ui64)TKeyValueState::ECollectCookie::SoftInitial) {
+ LOG_CRIT_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID()
+ << " received TEvCollectGarbageResult with unexpected Cookie# " << ev->Cookie);
+ Send(SelfId(), new TKikimrEvents::TEvPoisonPill);
return;
}
- LOG_CRIT_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID()
- << " received TEvCollectGarbageResult with unexpected Cookie# " << ev->Cookie);
- Send(SelfId(), new TKikimrEvents::TEvPoisonPill);
+ NKikimrProto::EReplyStatus status = ev->Get()->Status;
+ if (status != NKikimrProto::OK) {
+ LOG_ERROR_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID()
+ << " received not ok TEvCollectGarbageResult"
+ << " Status# " << NKikimrProto::EReplyStatus_Name(status)
+ << " ErrorReason# " << ev->Get()->ErrorReason);
+ Send(SelfId(), new TKikimrEvents::TEvPoisonPill);
+ return;
+ }
+
+ bool isLast = State.RegisterInitialCollectResult(ctx);
+ if (isLast) {
+ Execute(new TTxRegisterInitialGCCompletion(this));
+ }
}
void Handle(TEvKeyValue::TEvRequest::TPtr ev, const TActorContext &ctx) {
diff --git a/ydb/core/keyvalue/keyvalue_state.cpp b/ydb/core/keyvalue/keyvalue_state.cpp
index 96c00e8c0be..997fd31d1bc 100644
--- a/ydb/core/keyvalue/keyvalue_state.cpp
+++ b/ydb/core/keyvalue/keyvalue_state.cpp
@@ -575,6 +575,19 @@ void TKeyValueState::InitExecute(ui64 tabletId, TActorId keyValueActorId, ui32 e
THashMap<TGroupChannel, THolder<TVector<TLogoBlobID>>> keepForGroupChannel;
const ui32 barrierGeneration = executorGeneration - 1;
const ui32 barrierStep = Max<ui32>();
+
+ auto addBlobToKeep = [&] (const TLogoBlobID &id) {
+ ui32 group = info->GroupFor(id.Channel(), id.Generation());
+ Y_VERIFY(group != Max<ui32>(), "RefBlob# %s is mapped to an invalid group (-1)!",
+ id.ToString().c_str());
+ TGroupChannel key(group, id.Channel());
+ THolder<TVector<TLogoBlobID>> &ptr = keepForGroupChannel[key];
+ if (!ptr) {
+ ptr = MakeHolder<TVector<TLogoBlobID>>();
+ }
+ ptr->emplace_back(id);
+ };
+
for (const auto &refInfo : RefCounts) {
// Extract blob id and validate its channel
const TLogoBlobID &id = refInfo.first;
@@ -584,19 +597,16 @@ void TKeyValueState::InitExecute(ui64 tabletId, TActorId keyValueActorId, ui32 e
const THelpers::TGenerationStep storedGenStep(StoredState.GetCollectGeneration(), StoredState.GetCollectStep());
// Mark with keep flag only new blobs
if (storedGenStep < blobGenStep) {
- const ui32 group = info->GroupFor(id.Channel(), id.Generation());
- Y_VERIFY(group != Max<ui32>(), "RefBlob# %s is mapped to an invalid group (-1)!",
- id.ToString().c_str());
- const TGroupChannel key(group, id.Channel());
- auto it = keepForGroupChannel.find(key);
- if (it == keepForGroupChannel.end()) {
- bool isInserted = false;
- std::tie(it, isInserted) = keepForGroupChannel.emplace(key, MakeHolder<TVector<TLogoBlobID>>());
- Y_VERIFY(isInserted);
- }
- it->second->emplace_back(id);
+ addBlobToKeep(id);
}
}
+
+ if (CollectOperation) {
+ for (const TLogoBlobID &id : CollectOperation->Keep) {
+ addBlobToKeep(id);
+ }
+ }
+
for (const auto &channelInfo : info->Channels) {
if (channelInfo.Channel < BLOB_CHANNEL) {
continue;
@@ -676,7 +686,7 @@ void TKeyValueState::InitExecute(ui64 tabletId, TActorId keyValueActorId, ui32 e
ChannelRangeSets[id.Channel()].Remove(id.Generation());
}
}
- if (CollectOperation.Get()) {
+ if (CollectOperation) {
auto &keep = CollectOperation->Keep;
for (auto it = keep.begin(); it != keep.end(); ++it) {
const TLogoBlobID &id = *it;
@@ -696,38 +706,52 @@ void TKeyValueState::InitExecute(ui64 tabletId, TActorId keyValueActorId, ui32 e
CollectOperation->Header.SetCollectGeneration(ExecutorGeneration);
CollectOperation->Header.SetCollectStep(0);
}
+
+ if (CollectOperation) {
+ for (const TLogoBlobID &id : CollectOperation->DoNotKeep) {
+ Trash.insert(id);
+ THelpers::DbUpdateTrash(id, db, ctx);
+ }
+ THelpers::DbEraseCollect(db, ctx);
+ CollectOperation = nullptr;
+ }
+
+
THelpers::DbUpdateState(StoredState, db, ctx);
- IsCollectEventSent = true;
// corner case, if no CollectGarbage events were sent
if (InitialCollectsSent == 0) {
SendCutHistory(ctx);
- if (CollectOperation.Get()) {
- // finish collect operation from local base
- StoreCollectComplete(ctx);
- } else {
- // initiate collection if trash was loaded from local base
- IsCollectEventSent = false;
- PrepareCollectIfNeeded(ctx);
- }
+ RegisterInitialGCCompletionComplete(ctx);
+ } else {
+ IsCollectEventSent = true;
}
}
-void TKeyValueState::RegisterInitialCollectResult(const TActorContext &ctx) {
+
+
+bool TKeyValueState::RegisterInitialCollectResult(const TActorContext &ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
<< " InitialCollectsSent# " << InitialCollectsSent << " Marker# KV50");
if (--InitialCollectsSent == 0) {
SendCutHistory(ctx);
- if (CollectOperation.Get()) {
- // finish collect operation from local base
- StoreCollectComplete(ctx);
- } else {
- IsCollectEventSent = false;
- // initiate collection if trash was loaded from local base
- PrepareCollectIfNeeded(ctx);
- }
+ return true;
}
+ return false;
+}
+
+
+void TKeyValueState::RegisterInitialGCCompletionExecute(ISimpleDb &db, const TActorContext &ctx) {
+ StoredState.SetCollectGeneration(ExecutorGeneration);
+ StoredState.SetCollectStep(0);
+ THelpers::DbUpdateState(StoredState, db, ctx);
+}
+
+void TKeyValueState::RegisterInitialGCCompletionComplete(const TActorContext &ctx) {
+ IsCollectEventSent = false;
+ // initiate collection if trash was loaded from local base
+ PrepareCollectIfNeeded(ctx);
}
void TKeyValueState::SendCutHistory(const TActorContext &ctx) {
diff --git a/ydb/core/keyvalue/keyvalue_state.h b/ydb/core/keyvalue/keyvalue_state.h
index ffdc1f72504..a14dee8f233 100644
--- a/ydb/core/keyvalue/keyvalue_state.h
+++ b/ydb/core/keyvalue/keyvalue_state.h
@@ -318,7 +318,9 @@ public:
void Load(const TString &key, const TString& value);
void InitExecute(ui64 tabletId, TActorId keyValueActorId, ui32 executorGeneration, ISimpleDb &db,
const TActorContext &ctx, const TTabletStorageInfo *info);
- void RegisterInitialCollectResult(const TActorContext &ctx);
+ bool RegisterInitialCollectResult(const TActorContext &ctx);
+ void RegisterInitialGCCompletionExecute(ISimpleDb &db, const TActorContext &ctx);
+ void RegisterInitialGCCompletionComplete(const TActorContext &ctx);
void SendCutHistory(const TActorContext &ctx);
void OnInitQueueEmpty(const TActorContext &ctx);
void OnStateWork(const TActorContext &ctx);
diff --git a/ydb/core/keyvalue/keyvalue_ut.cpp b/ydb/core/keyvalue/keyvalue_ut.cpp
index a11c2f6b16c..021e3353bdf 100644
--- a/ydb/core/keyvalue/keyvalue_ut.cpp
+++ b/ydb/core/keyvalue/keyvalue_ut.cpp
@@ -1,5 +1,6 @@
#include "defs.h"
#include "keyvalue.h"
+#include "keyvalue_flat_impl.h"
#include "keyvalue_state.h"
#include <ydb/public/lib/base/msgbus.h>
#include <ydb/core/testlib/tablet_helpers.h>
@@ -7,13 +8,14 @@
#include <util/random/fast.h>
const bool ENABLE_DETAILED_KV_LOG = false;
+const bool ENABLE_TESTLOG_OUTPUT = false;
namespace NKikimr {
namespace {
template <typename... TArgs>
void TestLog(TArgs&&... args) {
- if constexpr (ENABLE_DETAILED_KV_LOG) {
+ if constexpr (ENABLE_TESTLOG_OUTPUT || ENABLE_DETAILED_KV_LOG) {
Cerr << ((TStringBuilder() << ... << args) << Endl);
}
}
@@ -84,7 +86,7 @@ struct TTestContext {
Y_UNUSED(dispatchName);
outActiveZone = false;
Runtime.Reset(new TTestBasicRuntime);
- Runtime->SetScheduledLimit(100);
+ Runtime->SetScheduledLimit(200);
Runtime->SetLogPriority(NKikimrServices::KEYVALUE, NLog::PRI_DEBUG);
SetupLogging(*Runtime);
SetupTabletServices(*Runtime);
@@ -1012,15 +1014,15 @@ Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEventsWithSlowI
if (tabletActor && *tabletActor == event->Recipient && event->GetTypeRewrite() == TEvBlobStorage::TEvCollectGarbageResult::EventType) {
if (collectStep == 2) {
savedInitialEvents.push(event);
+ TestLog("Event drop; saved intial GCresult");
return TTestActorRuntime::EEventAction::DROP;
}
}
- if (tabletActor && *tabletActor == event->Sender && event->GetTypeRewrite() == TEvBlobStorage::TEvCollectGarbage::EventType) {
- }
if (tabletActor && *tabletActor == event->Recipient && event->GetTypeRewrite() == TEvKeyValue::TEvCollect::EventType) {
switch (collectStep++) {
case 1: {
runtime.Send(new IEventHandle(event->Recipient, event->Recipient, new TKikimrEvents::TEvPoisonPill));
+ TestLog("Event drop; Collect; Tablet was poisoned");
return TTestActorRuntime::EEventAction::DROP;
}
}
@@ -1050,6 +1052,7 @@ Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEventsWithSlowI
ExecuteDeleteRange(tc, "key", EBorderKind::Include, "key", EBorderKind::Include, 0);
TDispatchOptions options;
options.FinalEvents.push_back(TKikimrEvents::TEvPoisonPill::EventType);
+ TestLog("First dispatch");
tc.Runtime->DispatchEvents(options);
ExecuteWrite(tc, {{"key3", "value2"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
@@ -1059,9 +1062,11 @@ Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEventsWithSlowI
bool onlyOneCollect = false;
try {
+ TestLog("Second dispatch");
tc.Runtime->DispatchEvents(options2);
} catch (NActors::TSchedulingLimitReachedException) {
onlyOneCollect = true;
+ TestLog("Exception was catch");
}
while (savedInitialEvents.size()) {
@@ -1072,13 +1077,14 @@ Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEventsWithSlowI
TDispatchOptions options3;
options3.FinalEvents.push_back(TEvKeyValue::TEvEraseCollect::EventType);
- tc.Runtime->DispatchEvents(options3);
+ TestLog("Third dispatch ", collectStep);
+ UNIT_ASSERT_VALUES_EQUAL(collectStep, 2);
+
+ ExecuteWrite(tc, {{"key4", "value2"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
+ ExecuteWrite(tc, {{"key5", "value2"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
+ ExecuteWrite(tc, {{"key6", "value2"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
- try {
- tc.Runtime->DispatchEvents(options3);
- } catch (NActors::TSchedulingLimitReachedException) {
- onlyOneCollect = true;
- }
+ UNIT_ASSERT_VALUES_EQUAL(collectStep, 3);
UNIT_ASSERT(onlyOneCollect);
}