aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkruall <kruall@yandex-team.ru>2022-06-06 14:52:20 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-06-06 14:52:20 +0300
commit3ad62aa372aacec6af48bb3e8c2e82a768022f8b (patch)
tree9abd5c4135f10c2055d288d83cd7ad689a67c0f8
parentd664fb5c2f4a6a578c314d2ecee39c904d2b4418 (diff)
downloadydb-3ad62aa372aacec6af48bb3e8c2e82a768022f8b.tar.gz
revert: 1650efddce963ea571676ae4d9c3658ea4807264; merge from trunk: r9407142; Fix race during GC, KIKIMR-14680
REVIEW: 2510720 x-ydb-stable-ref: c674fe371bece621a42fc6616ff23ff2efe610bb
-rw-r--r--ydb/core/keyvalue/keyvalue_collector.cpp2
-rw-r--r--ydb/core/keyvalue/keyvalue_flat_impl.h24
-rw-r--r--ydb/core/keyvalue/keyvalue_state.cpp32
-rw-r--r--ydb/core/keyvalue/keyvalue_state.h6
-rw-r--r--ydb/core/keyvalue/keyvalue_state_collect.cpp2
-rw-r--r--ydb/core/keyvalue/keyvalue_ut.cpp230
6 files changed, 273 insertions, 23 deletions
diff --git a/ydb/core/keyvalue/keyvalue_collector.cpp b/ydb/core/keyvalue/keyvalue_collector.cpp
index 8b7696bcb2..6aa83dfd20 100644
--- a/ydb/core/keyvalue/keyvalue_collector.cpp
+++ b/ydb/core/keyvalue/keyvalue_collector.cpp
@@ -177,7 +177,7 @@ public:
new TEvBlobStorage::TEvCollectGarbage(TabletInfo->TabletID, RecordGeneration, PerGenerationCounter,
channelIdx, true,
CollectOperation->Header.CollectGeneration, CollectOperation->Header.CollectStep,
- keep.Release(), doNotKeep.Release(), TInstant::Max(), true), 0);
+ keep.Release(), doNotKeep.Release(), TInstant::Max(), true), (ui64)TKeyValueState::ECollectCookie::Soft);
}
STFUNC(StateWait) {
diff --git a/ydb/core/keyvalue/keyvalue_flat_impl.h b/ydb/core/keyvalue/keyvalue_flat_impl.h
index ae30344e14..859e07d4de 100644
--- a/ydb/core/keyvalue/keyvalue_flat_impl.h
+++ b/ydb/core/keyvalue/keyvalue_flat_impl.h
@@ -368,8 +368,28 @@ protected:
void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr &ev, const TActorContext &ctx) {
Y_UNUSED(ev);
LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID()
- << " Handle TEvCollectGarbageResult Marker# KV52");
- State.RegisterInitialCollectResult(ctx);
+ << " Handle TEvCollectGarbageResult Cookie# " << ev->Cookie << " Marker# KV52");
+ if (ev->Cookie == (ui64)TKeyValueState::ECollectCookie::Hard) {
+ return;
+ }
+
+ if (ev->Cookie == (ui64)TKeyValueState::ECollectCookie::SoftInitial) {
+ NKikimrProto::EReplyStatus status = ev->Get()->Status;
+ if (status == NKikimrProto::OK) {
+ State.RegisterInitialCollectResult(ctx);
+ } else {
+ LOG_ERROR_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID()
+ << " received not ok TEvCollectGarbageResult"
+ << " Status# " << NKikimrProto::EReplyStatus_Name(status)
+ << " ErrorReason# " << ev->Get()->ErrorReason);
+ Send(SelfId(), new TKikimrEvents::TEvPoisonPill);
+ }
+ return;
+ }
+
+ LOG_CRIT_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID()
+ << " received TEvCollectGarbageResult with unexpected Cookie# " << ev->Cookie);
+ Send(SelfId(), new TKikimrEvents::TEvPoisonPill);
}
void Handle(TEvKeyValue::TEvRequest::TPtr ev, const TActorContext &ctx) {
diff --git a/ydb/core/keyvalue/keyvalue_state.cpp b/ydb/core/keyvalue/keyvalue_state.cpp
index 710763d964..96c00e8c0b 100644
--- a/ydb/core/keyvalue/keyvalue_state.cpp
+++ b/ydb/core/keyvalue/keyvalue_state.cpp
@@ -563,7 +563,7 @@ void TKeyValueState::InitExecute(ui64 tabletId, TActorId keyValueActorId, ui32 e
const TActorId nodeWarden = MakeBlobStorageNodeWardenID(ctx.SelfID.NodeId());
const TActorId proxy = MakeBlobStorageProxyID(group);
ctx.ExecutorThread.Send(new IEventHandle(proxy, TActorId(), ev.Release(),
- IEventHandle::FlagForwardOnNondelivery, 0, &nodeWarden));
+ IEventHandle::FlagForwardOnNondelivery, (ui64)TKeyValueState::ECollectCookie::Hard, &nodeWarden));
}
}
@@ -628,7 +628,7 @@ void TKeyValueState::InitExecute(ui64 tabletId, TActorId keyValueActorId, ui32 e
const TActorId nodeWarden = MakeBlobStorageNodeWardenID(ctx.SelfID.NodeId());
const TActorId proxy = MakeBlobStorageProxyID(group);
ctx.ExecutorThread.Send(new IEventHandle(proxy, KeyValueActorId, ev.Release(),
- IEventHandle::FlagForwardOnNondelivery, 0, &nodeWarden));
+ IEventHandle::FlagForwardOnNondelivery, (ui64)TKeyValueState::ECollectCookie::SoftInitial, &nodeWarden));
}
}
@@ -698,25 +698,35 @@ void TKeyValueState::InitExecute(ui64 tabletId, TActorId keyValueActorId, ui32 e
}
THelpers::DbUpdateState(StoredState, db, ctx);
+
+ IsCollectEventSent = true;
// corner case, if no CollectGarbage events were sent
if (InitialCollectsSent == 0) {
SendCutHistory(ctx);
- }
- if (CollectOperation.Get()) {
- // finish collect operation from local base
- IsCollectEventSent = true;
- StoreCollectComplete(ctx);
- } else {
- // initiate collection if trash was loaded from local base
- PrepareCollectIfNeeded(ctx);
+ if (CollectOperation.Get()) {
+ // finish collect operation from local base
+ StoreCollectComplete(ctx);
+ } else {
+ // initiate collection if trash was loaded from local base
+ IsCollectEventSent = false;
+ PrepareCollectIfNeeded(ctx);
+ }
}
}
void TKeyValueState::RegisterInitialCollectResult(const TActorContext &ctx) {
- LOG_TRACE_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
+ LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
<< " InitialCollectsSent# " << InitialCollectsSent << " Marker# KV50");
if (--InitialCollectsSent == 0) {
SendCutHistory(ctx);
+ if (CollectOperation.Get()) {
+ // finish collect operation from local base
+ StoreCollectComplete(ctx);
+ } else {
+ IsCollectEventSent = false;
+ // initiate collection if trash was loaded from local base
+ PrepareCollectIfNeeded(ctx);
+ }
}
}
diff --git a/ydb/core/keyvalue/keyvalue_state.h b/ydb/core/keyvalue/keyvalue_state.h
index d179e1dfc6..ffdc1f7250 100644
--- a/ydb/core/keyvalue/keyvalue_state.h
+++ b/ydb/core/keyvalue/keyvalue_state.h
@@ -232,6 +232,12 @@ public:
}
};
+ enum class ECollectCookie {
+ Hard = 1,
+ SoftInitial = 2,
+ Soft = 3,
+ };
+
ui32 GetGeneration() const {
return StoredState.UserGeneration;
}
diff --git a/ydb/core/keyvalue/keyvalue_state_collect.cpp b/ydb/core/keyvalue/keyvalue_state_collect.cpp
index 604c1e20ff..248d62fd3e 100644
--- a/ydb/core/keyvalue/keyvalue_state_collect.cpp
+++ b/ydb/core/keyvalue/keyvalue_state_collect.cpp
@@ -6,7 +6,7 @@ namespace NKeyValue {
void TKeyValueState::PrepareCollectIfNeeded(const TActorContext &ctx) {
LOG_TRACE_S(ctx, NKikimrServices::KEYVALUE, "PrepareCollectIfNeeded KeyValue# " << TabletId << " Marker# KV61");
- if (CollectOperation.Get()) {
+ if (CollectOperation.Get() || InitialCollectsSent) {
// We already are trying to collect something, just pass this time.
return;
}
diff --git a/ydb/core/keyvalue/keyvalue_ut.cpp b/ydb/core/keyvalue/keyvalue_ut.cpp
index 093cd7d46c..a11c2f6b16 100644
--- a/ydb/core/keyvalue/keyvalue_ut.cpp
+++ b/ydb/core/keyvalue/keyvalue_ut.cpp
@@ -11,6 +11,13 @@ const bool ENABLE_DETAILED_KV_LOG = false;
namespace NKikimr {
namespace {
+template <typename... TArgs>
+void TestLog(TArgs&&... args) {
+ if constexpr (ENABLE_DETAILED_KV_LOG) {
+ Cerr << ((TStringBuilder() << ... << args) << Endl);
+ }
+}
+
void SetupLogging(TTestActorRuntime& runtime) {
NActors::NLog::EPriority priority = ENABLE_DETAILED_KV_LOG ? NLog::PRI_DEBUG : NLog::PRI_ERROR;
NActors::NLog::EPriority otherPriority = NLog::PRI_ERROR;
@@ -500,21 +507,53 @@ struct TKeyValuePair {
};
template <typename TRequestEvent>
-void ExecuteEvent(TDesiredPair<TRequestEvent> &dp, TTestContext &tc) {
+void SendRequest(const decltype(std::declval<TRequestEvent>().Record) &record, TTestContext &tc) {
+ auto request = std::make_unique<TRequestEvent>();
+ request->Record = record;
+ TestLog("Send event# ", TypeName(*request));
+ tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.release(), 0, GetPipeConfigWithRetries());
+}
+
+template <typename TRequestEvent>
+auto ReceiveResponse(TTestContext &tc) -> decltype(std::declval<typename TRequestEvent::TResponse>().Record) {
TAutoPtr<IEventHandle> handle;
- std::unique_ptr<TRequestEvent> request;
- typename TRequestEvent::TResponse *response;
+ typename TRequestEvent::TResponse *response = tc.Runtime->GrabEdgeEvent<typename TRequestEvent::TResponse>(handle);
+ TestLog("Received event# ", TypeName(*response));
+ return response->Record;
+}
+
+template <typename TRequestEvent>
+void ExecuteEvent(TDesiredPair<TRequestEvent> &dp, TTestContext &tc) {
DoWithRetry([&] {
tc.Runtime->ResetScheduledCount();
- request = std::make_unique<TRequestEvent>();
- request->Record = dp.Request;
- tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.release(), 0, GetPipeConfigWithRetries());
- response = tc.Runtime->GrabEdgeEvent<typename TRequestEvent::TResponse>(handle);
- dp.Response = response->Record;
+ SendRequest<TRequestEvent>(dp.Request, tc);
+ dp.Response = ReceiveResponse<TRequestEvent>(tc);
return true;
});
}
+
+void SendWrite(TTestContext &tc, const TDeque<TKeyValuePair> &pairs, ui64 lockedGeneration, ui64 storageChannel,
+ NKikimrKeyValue::Priorities::Priority priority)
+{
+ NKikimrKeyValue::ExecuteTransactionRequest et;
+
+ for (auto &[key, value] : pairs) {
+ NKikimrKeyValue::ExecuteTransactionRequest::Command *cmd = et.add_commands();
+ NKikimrKeyValue::ExecuteTransactionRequest::Command::Write *write = cmd->mutable_write();
+
+ write->set_key(key);
+ write->set_value(value);
+ write->set_storage_channel(storageChannel);
+ write->set_priority(priority);
+ }
+
+ et.set_tablet_id(tc.TabletId);
+ et.set_lock_generation(lockedGeneration);
+
+ SendRequest<TEvKeyValue::TEvExecuteTransaction>(et, tc);
+}
+
template <NKikimrKeyValue::Statuses::ReplyStatus ExpectedStatus = NKikimrKeyValue::Statuses::RSTATUS_OK>
void ExecuteWrite(TTestContext &tc, const TDeque<TKeyValuePair> &pairs, ui64 lockedGeneration, ui64 storageChannel,
NKikimrKeyValue::Priorities::Priority priority)
@@ -547,6 +586,46 @@ enum class EBorderKind {
Without
};
+
+void SendDeleteRange(TTestContext &tc,
+ const TString &from, EBorderKind fromKind,
+ const TString &to, EBorderKind toKind,
+ ui64 lock_generation)
+{
+ NKikimrKeyValue::ExecuteTransactionRequest record;
+ record.set_lock_generation(lock_generation);
+ record.set_tablet_id(tc.TabletId);
+
+ NKikimrKeyValue::ExecuteTransactionRequest::Command *cmd = record.add_commands();
+ NKikimrKeyValue::ExecuteTransactionRequest::Command::DeleteRange *deleteRange = cmd->mutable_delete_range();
+
+ auto *r = deleteRange->mutable_range();
+
+ switch (fromKind) {
+ case EBorderKind::Include:
+ r->set_from_key_inclusive(from);
+ break;
+ case EBorderKind::Exclude:
+ r->set_from_key_exclusive(from);
+ break;
+ case EBorderKind::Without:
+ break;
+ }
+
+ switch (toKind) {
+ case EBorderKind::Include:
+ r->set_to_key_inclusive(to);
+ break;
+ case EBorderKind::Exclude:
+ r->set_to_key_exclusive(to);
+ break;
+ case EBorderKind::Without:
+ break;
+ }
+
+ SendRequest<TEvKeyValue::TEvExecuteTransaction>(record, tc);
+}
+
template <bool IsSuccess = true>
void ExecuteDeleteRange(TTestContext &tc,
const TString &from, EBorderKind fromKind,
@@ -869,6 +948,141 @@ Y_UNIT_TEST(TestWriteReadDeleteWithRestartsThenResponseOk) {
}
+Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEvents) {
+ TTestContext tc;
+ TMaybe<TActorId> tabletActor;
+ bool firstCollect = true;
+ auto setup = [&] (TTestActorRuntime &runtime) {
+ runtime.SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
+ if (tabletActor && *tabletActor == event->Recipient && event->GetTypeRewrite() == TEvBlobStorage::TEvCollectGarbageResult::EventType) {
+ TestLog("CollectGarbageResult!!! ", event->Sender, "->", event->Recipient, " Cookie# ", event->Cookie);
+ }
+ if (tabletActor && *tabletActor == event->Sender && event->GetTypeRewrite() == TEvBlobStorage::TEvCollectGarbage::EventType) {
+ TestLog("CollectGarbage!!! ", event->Sender, "->", event->Recipient, " Cookie# ", event->Cookie);
+ if (!firstCollect) {
+ TestLog("Drop!");
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ }
+ if (tabletActor && *tabletActor == event->Recipient && event->GetTypeRewrite() == TEvKeyValue::TEvCollect::EventType) {
+ TestLog("Collect!!! ", event->Sender, "->", event->Recipient, " Cookie# ", event->Cookie);
+ if (firstCollect) {
+ TestLog("Drop!");
+ runtime.Send(new IEventHandle(event->Recipient, event->Recipient, new TKikimrEvents::TEvPoisonPill));
+ firstCollect = false;
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+ runtime.SetRegistrationObserverFunc([&](TTestActorRuntimeBase& runtime, const TActorId& /*parentId*/, const TActorId& actorId) {
+ if (TypeName(*runtime.FindActor(actorId)) == "NKikimr::NKeyValue::TKeyValueFlat") {
+ tabletActor = actorId;
+ TestLog("KV tablet was created ", actorId);
+ }
+ });
+ };
+ TFinalizer finalizer(tc);
+ bool activeZone = false;
+ tc.Prepare(INITIAL_TEST_DISPATCH_NAME, setup, activeZone);
+ ExecuteWrite(tc, {{"key", "value"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
+ tc.Runtime->Send(new IEventHandle(*tabletActor, *tabletActor, new TKikimrEvents::TEvPoisonPill));
+ ExecuteWrite(tc, {{"key1", "value1"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
+ ExecuteWrite(tc, {{"key2", "value2"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
+ ExecuteRead(tc, "key", "value", 0, 0, 0);
+ ExecuteDeleteRange(tc, "key", EBorderKind::Include, "key", EBorderKind::Include, 0);
+ TDispatchOptions options;
+ options.FinalEvents.push_back(TKikimrEvents::TEvPoisonPill::EventType);
+ tc.Runtime->DispatchEvents(options);
+ ExecuteRead<NKikimrKeyValue::Statuses::RSTATUS_NOT_FOUND>(tc, "key", "", 0, 0, 0);
+ ExecuteRead(tc, "key1", "value1", 0, 0, 0);
+}
+
+
+Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEventsWithSlowInitialGC) {
+ TTestContext tc;
+ TMaybe<TActorId> tabletActor;
+ TMaybe<TActorId> collectorActor;
+ ui32 collectStep = 1;
+ TQueue<TAutoPtr<IEventHandle>> savedInitialEvents;
+
+ auto setup = [&] (TTestActorRuntime &runtime) {
+ runtime.SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
+ //TestLog("Event ", (event && event->GetBase() ? TypeName(*event->GetBase()) : "unknown"), ' ', event->Sender, "->", event->Recipient);
+ if (tabletActor && *tabletActor == event->Recipient && event->GetTypeRewrite() == TEvBlobStorage::TEvCollectGarbageResult::EventType) {
+ if (collectStep == 2) {
+ savedInitialEvents.push(event);
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ }
+ if (tabletActor && *tabletActor == event->Sender && event->GetTypeRewrite() == TEvBlobStorage::TEvCollectGarbage::EventType) {
+ }
+ if (tabletActor && *tabletActor == event->Recipient && event->GetTypeRewrite() == TEvKeyValue::TEvCollect::EventType) {
+ switch (collectStep++) {
+ case 1: {
+ runtime.Send(new IEventHandle(event->Recipient, event->Recipient, new TKikimrEvents::TEvPoisonPill));
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+ runtime.SetRegistrationObserverFunc([&](TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) {
+ if (tabletActor && *tabletActor == parentId) {
+ TestLog("CreateActor by KV ", actorId, ' ', TypeName(*runtime.FindActor(actorId)));
+ }
+ if (TypeName(*runtime.FindActor(actorId)) == "NKikimr::NKeyValue::TKeyValueFlat") {
+ tabletActor = actorId;
+ }
+ if (tabletActor && *tabletActor == parentId && TypeName(*runtime.FindActor(actorId)) == "NKikimr::NKeyValue::TKeyValueCollector") {
+ collectorActor = actorId;
+ }
+ });
+ };
+ TFinalizer finalizer(tc);
+ bool activeZone = false;
+ tc.Prepare(INITIAL_TEST_DISPATCH_NAME, setup, activeZone);
+ ExecuteWrite(tc, {{"key", "value"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
+ tc.Runtime->Send(new IEventHandle(*tabletActor, *tabletActor, new TKikimrEvents::TEvPoisonPill));
+ ExecuteWrite(tc, {{"key1", "value1"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
+ ExecuteWrite(tc, {{"key2", "value2"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
+ ExecuteRead(tc, "key", "value", 0, 0, 0);
+ ExecuteDeleteRange(tc, "key", EBorderKind::Include, "key", EBorderKind::Include, 0);
+ TDispatchOptions options;
+ options.FinalEvents.push_back(TKikimrEvents::TEvPoisonPill::EventType);
+ tc.Runtime->DispatchEvents(options);
+ ExecuteWrite(tc, {{"key3", "value2"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
+
+
+ TDispatchOptions options2;
+ options2.FinalEvents.push_back(TEvKeyValue::TEvCollect::EventType);
+
+ bool onlyOneCollect = false;
+ try {
+ tc.Runtime->DispatchEvents(options2);
+ } catch (NActors::TSchedulingLimitReachedException) {
+ onlyOneCollect = true;
+ }
+
+ while (savedInitialEvents.size()) {
+ tc.Runtime->Send(savedInitialEvents.front().Release());
+ savedInitialEvents.pop();
+ }
+
+ TDispatchOptions options3;
+ options3.FinalEvents.push_back(TEvKeyValue::TEvEraseCollect::EventType);
+
+ tc.Runtime->DispatchEvents(options3);
+
+ try {
+ tc.Runtime->DispatchEvents(options3);
+ } catch (NActors::TSchedulingLimitReachedException) {
+ onlyOneCollect = true;
+ }
+ UNIT_ASSERT(onlyOneCollect);
+}
+
+
Y_UNIT_TEST(TestWriteReadDeleteWithRestartsThenResponseOkWithNewApi) {
TTestContext tc;
RunTestWithReboots(tc.TabletIds, [&]() {