summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Kriukov <[email protected]>2022-04-27 23:52:07 +0300
committerAleksandr Kriukov <[email protected]>2022-04-27 23:52:07 +0300
commita447193ffaba945617c168539a78d6b911372369 (patch)
tree4fdf503609c9d4bf34bf6cd614af33cfb28a7c79
parent4b54e6ccdd21efbbbe67818fa55d582a20394c58 (diff)
Fix race during GC, KIKIMR-14680
ref:5f67b07b43501c719c827101e84f138f0cfc1e23
-rw-r--r--ydb/core/keyvalue/keyvalue_state.cpp8
-rw-r--r--ydb/core/keyvalue/keyvalue_ut.cpp196
2 files changed, 184 insertions, 20 deletions
diff --git a/ydb/core/keyvalue/keyvalue_state.cpp b/ydb/core/keyvalue/keyvalue_state.cpp
index a1965116336..96c00e8c0be 100644
--- a/ydb/core/keyvalue/keyvalue_state.cpp
+++ b/ydb/core/keyvalue/keyvalue_state.cpp
@@ -698,30 +698,32 @@ void TKeyValueState::InitExecute(ui64 tabletId, TActorId keyValueActorId, ui32 e
}
THelpers::DbUpdateState(StoredState, db, ctx);
+
+ IsCollectEventSent = true;
// corner case, if no CollectGarbage events were sent
if (InitialCollectsSent == 0) {
SendCutHistory(ctx);
if (CollectOperation.Get()) {
// finish collect operation from local base
- IsCollectEventSent = true;
StoreCollectComplete(ctx);
} else {
// initiate collection if trash was loaded from local base
+ IsCollectEventSent = false;
PrepareCollectIfNeeded(ctx);
}
}
}
void TKeyValueState::RegisterInitialCollectResult(const TActorContext &ctx) {
- LOG_TRACE_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
+ LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
<< " InitialCollectsSent# " << InitialCollectsSent << " Marker# KV50");
if (--InitialCollectsSent == 0) {
SendCutHistory(ctx);
if (CollectOperation.Get()) {
// finish collect operation from local base
- IsCollectEventSent = true;
StoreCollectComplete(ctx);
} else {
+ IsCollectEventSent = false;
// initiate collection if trash was loaded from local base
PrepareCollectIfNeeded(ctx);
}
diff --git a/ydb/core/keyvalue/keyvalue_ut.cpp b/ydb/core/keyvalue/keyvalue_ut.cpp
index 7b46ebec585..a11c2f6b16c 100644
--- a/ydb/core/keyvalue/keyvalue_ut.cpp
+++ b/ydb/core/keyvalue/keyvalue_ut.cpp
@@ -11,6 +11,13 @@ const bool ENABLE_DETAILED_KV_LOG = false;
namespace NKikimr {
namespace {
+template <typename... TArgs>
+void TestLog(TArgs&&... args) {
+ if constexpr (ENABLE_DETAILED_KV_LOG) {
+ Cerr << ((TStringBuilder() << ... << args) << Endl);
+ }
+}
+
void SetupLogging(TTestActorRuntime& runtime) {
NActors::NLog::EPriority priority = ENABLE_DETAILED_KV_LOG ? NLog::PRI_DEBUG : NLog::PRI_ERROR;
NActors::NLog::EPriority otherPriority = NLog::PRI_ERROR;
@@ -82,10 +89,9 @@ struct TTestContext {
SetupLogging(*Runtime);
SetupTabletServices(*Runtime);
setup(*Runtime);
- TActorId bootstrapId = CreateTestBootstrapper(*Runtime,
+ CreateTestBootstrapper(*Runtime,
CreateTestTabletInfo(TabletId, TabletType, TErasureType::ErasureNone),
&CreateKeyValueFlat);
- Cerr << (TStringBuilder() << "BootstrapId# " << bootstrapId << Endl);
TDispatchOptions options;
options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
@@ -501,22 +507,53 @@ struct TKeyValuePair {
};
template <typename TRequestEvent>
-void ExecuteEvent(TDesiredPair<TRequestEvent> &dp, TTestContext &tc) {
+void SendRequest(const decltype(std::declval<TRequestEvent>().Record) &record, TTestContext &tc) {
+ auto request = std::make_unique<TRequestEvent>();
+ request->Record = record;
+ TestLog("Send event# ", TypeName(*request));
+ tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.release(), 0, GetPipeConfigWithRetries());
+}
+
+template <typename TRequestEvent>
+auto ReceiveResponse(TTestContext &tc) -> decltype(std::declval<typename TRequestEvent::TResponse>().Record) {
TAutoPtr<IEventHandle> handle;
- std::unique_ptr<TRequestEvent> request;
- typename TRequestEvent::TResponse *response;
+ typename TRequestEvent::TResponse *response = tc.Runtime->GrabEdgeEvent<typename TRequestEvent::TResponse>(handle);
+ TestLog("Received event# ", TypeName(*response));
+ return response->Record;
+}
+
+template <typename TRequestEvent>
+void ExecuteEvent(TDesiredPair<TRequestEvent> &dp, TTestContext &tc) {
DoWithRetry([&] {
tc.Runtime->ResetScheduledCount();
- request = std::make_unique<TRequestEvent>();
- request->Record = dp.Request;
- Cerr << (TStringBuilder() << "Execute event# " << TypeName(*request) << Endl);
- tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.release(), 0, GetPipeConfigWithRetries());
- response = tc.Runtime->GrabEdgeEvent<typename TRequestEvent::TResponse>(handle);
- dp.Response = response->Record;
+ SendRequest<TRequestEvent>(dp.Request, tc);
+ dp.Response = ReceiveResponse<TRequestEvent>(tc);
return true;
});
}
+
+void SendWrite(TTestContext &tc, const TDeque<TKeyValuePair> &pairs, ui64 lockedGeneration, ui64 storageChannel,
+ NKikimrKeyValue::Priorities::Priority priority)
+{
+ NKikimrKeyValue::ExecuteTransactionRequest et;
+
+ for (auto &[key, value] : pairs) {
+ NKikimrKeyValue::ExecuteTransactionRequest::Command *cmd = et.add_commands();
+ NKikimrKeyValue::ExecuteTransactionRequest::Command::Write *write = cmd->mutable_write();
+
+ write->set_key(key);
+ write->set_value(value);
+ write->set_storage_channel(storageChannel);
+ write->set_priority(priority);
+ }
+
+ et.set_tablet_id(tc.TabletId);
+ et.set_lock_generation(lockedGeneration);
+
+ SendRequest<TEvKeyValue::TEvExecuteTransaction>(et, tc);
+}
+
template <NKikimrKeyValue::Statuses::ReplyStatus ExpectedStatus = NKikimrKeyValue::Statuses::RSTATUS_OK>
void ExecuteWrite(TTestContext &tc, const TDeque<TKeyValuePair> &pairs, ui64 lockedGeneration, ui64 storageChannel,
NKikimrKeyValue::Priorities::Priority priority)
@@ -549,6 +586,46 @@ enum class EBorderKind {
Without
};
+
+void SendDeleteRange(TTestContext &tc,
+ const TString &from, EBorderKind fromKind,
+ const TString &to, EBorderKind toKind,
+ ui64 lock_generation)
+{
+ NKikimrKeyValue::ExecuteTransactionRequest record;
+ record.set_lock_generation(lock_generation);
+ record.set_tablet_id(tc.TabletId);
+
+ NKikimrKeyValue::ExecuteTransactionRequest::Command *cmd = record.add_commands();
+ NKikimrKeyValue::ExecuteTransactionRequest::Command::DeleteRange *deleteRange = cmd->mutable_delete_range();
+
+ auto *r = deleteRange->mutable_range();
+
+ switch (fromKind) {
+ case EBorderKind::Include:
+ r->set_from_key_inclusive(from);
+ break;
+ case EBorderKind::Exclude:
+ r->set_from_key_exclusive(from);
+ break;
+ case EBorderKind::Without:
+ break;
+ }
+
+ switch (toKind) {
+ case EBorderKind::Include:
+ r->set_to_key_inclusive(to);
+ break;
+ case EBorderKind::Exclude:
+ r->set_to_key_exclusive(to);
+ break;
+ case EBorderKind::Without:
+ break;
+ }
+
+ SendRequest<TEvKeyValue::TEvExecuteTransaction>(record, tc);
+}
+
template <bool IsSuccess = true>
void ExecuteDeleteRange(TTestContext &tc,
const TString &from, EBorderKind fromKind,
@@ -878,19 +955,19 @@ Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEvents) {
auto setup = [&] (TTestActorRuntime &runtime) {
runtime.SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
if (tabletActor && *tabletActor == event->Recipient && event->GetTypeRewrite() == TEvBlobStorage::TEvCollectGarbageResult::EventType) {
- Cerr << (TStringBuilder() << "CollectGarbageResult!!! " << event->Sender << "->" << event->Recipient << " Cookie# " << event->Cookie << Endl);
+ TestLog("CollectGarbageResult!!! ", event->Sender, "->", event->Recipient, " Cookie# ", event->Cookie);
}
if (tabletActor && *tabletActor == event->Sender && event->GetTypeRewrite() == TEvBlobStorage::TEvCollectGarbage::EventType) {
- Cerr << (TStringBuilder() << "CollectGarbage!!! " << event->Sender << "->" << event->Recipient << " Cookie# " << event->Cookie << Endl);
+ TestLog("CollectGarbage!!! ", event->Sender, "->", event->Recipient, " Cookie# ", event->Cookie);
if (!firstCollect) {
- Cerr << (TStringBuilder() << "Drop!" << Endl);
+ TestLog("Drop!");
return TTestActorRuntime::EEventAction::DROP;
}
}
if (tabletActor && *tabletActor == event->Recipient && event->GetTypeRewrite() == TEvKeyValue::TEvCollect::EventType) {
- Cerr << (TStringBuilder() << "Collect!!! " << event->Sender << "->" << event->Recipient << " Cookie# " << event->Cookie << Endl);
+ TestLog("Collect!!! ", event->Sender, "->", event->Recipient, " Cookie# ", event->Cookie);
if (firstCollect) {
- Cerr << (TStringBuilder() << "Drop!" << Endl);
+ TestLog("Drop!");
runtime.Send(new IEventHandle(event->Recipient, event->Recipient, new TKikimrEvents::TEvPoisonPill));
firstCollect = false;
return TTestActorRuntime::EEventAction::DROP;
@@ -901,7 +978,7 @@ Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEvents) {
runtime.SetRegistrationObserverFunc([&](TTestActorRuntimeBase& runtime, const TActorId& /*parentId*/, const TActorId& actorId) {
if (TypeName(*runtime.FindActor(actorId)) == "NKikimr::NKeyValue::TKeyValueFlat") {
tabletActor = actorId;
- Cerr << (TStringBuilder() << "KV tablet was created " << actorId << Endl);
+ TestLog("KV tablet was created ", actorId);
}
});
};
@@ -921,6 +998,91 @@ Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEvents) {
ExecuteRead(tc, "key1", "value1", 0, 0, 0);
}
+
+Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEventsWithSlowInitialGC) {
+ TTestContext tc;
+ TMaybe<TActorId> tabletActor;
+ TMaybe<TActorId> collectorActor;
+ ui32 collectStep = 1;
+ TQueue<TAutoPtr<IEventHandle>> savedInitialEvents;
+
+ auto setup = [&] (TTestActorRuntime &runtime) {
+ runtime.SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) {
+ //TestLog("Event ", (event && event->GetBase() ? TypeName(*event->GetBase()) : "unknown"), ' ', event->Sender, "->", event->Recipient);
+ if (tabletActor && *tabletActor == event->Recipient && event->GetTypeRewrite() == TEvBlobStorage::TEvCollectGarbageResult::EventType) {
+ if (collectStep == 2) {
+ savedInitialEvents.push(event);
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ }
+ if (tabletActor && *tabletActor == event->Sender && event->GetTypeRewrite() == TEvBlobStorage::TEvCollectGarbage::EventType) {
+ }
+ if (tabletActor && *tabletActor == event->Recipient && event->GetTypeRewrite() == TEvKeyValue::TEvCollect::EventType) {
+ switch (collectStep++) {
+ case 1: {
+ runtime.Send(new IEventHandle(event->Recipient, event->Recipient, new TKikimrEvents::TEvPoisonPill));
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+ runtime.SetRegistrationObserverFunc([&](TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) {
+ if (tabletActor && *tabletActor == parentId) {
+ TestLog("CreateActor by KV ", actorId, ' ', TypeName(*runtime.FindActor(actorId)));
+ }
+ if (TypeName(*runtime.FindActor(actorId)) == "NKikimr::NKeyValue::TKeyValueFlat") {
+ tabletActor = actorId;
+ }
+ if (tabletActor && *tabletActor == parentId && TypeName(*runtime.FindActor(actorId)) == "NKikimr::NKeyValue::TKeyValueCollector") {
+ collectorActor = actorId;
+ }
+ });
+ };
+ TFinalizer finalizer(tc);
+ bool activeZone = false;
+ tc.Prepare(INITIAL_TEST_DISPATCH_NAME, setup, activeZone);
+ ExecuteWrite(tc, {{"key", "value"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
+ tc.Runtime->Send(new IEventHandle(*tabletActor, *tabletActor, new TKikimrEvents::TEvPoisonPill));
+ ExecuteWrite(tc, {{"key1", "value1"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
+ ExecuteWrite(tc, {{"key2", "value2"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
+ ExecuteRead(tc, "key", "value", 0, 0, 0);
+ ExecuteDeleteRange(tc, "key", EBorderKind::Include, "key", EBorderKind::Include, 0);
+ TDispatchOptions options;
+ options.FinalEvents.push_back(TKikimrEvents::TEvPoisonPill::EventType);
+ tc.Runtime->DispatchEvents(options);
+ ExecuteWrite(tc, {{"key3", "value2"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME);
+
+
+ TDispatchOptions options2;
+ options2.FinalEvents.push_back(TEvKeyValue::TEvCollect::EventType);
+
+ bool onlyOneCollect = false;
+ try {
+ tc.Runtime->DispatchEvents(options2);
+ } catch (NActors::TSchedulingLimitReachedException) {
+ onlyOneCollect = true;
+ }
+
+ while (savedInitialEvents.size()) {
+ tc.Runtime->Send(savedInitialEvents.front().Release());
+ savedInitialEvents.pop();
+ }
+
+ TDispatchOptions options3;
+ options3.FinalEvents.push_back(TEvKeyValue::TEvEraseCollect::EventType);
+
+ tc.Runtime->DispatchEvents(options3);
+
+ try {
+ tc.Runtime->DispatchEvents(options3);
+ } catch (NActors::TSchedulingLimitReachedException) {
+ onlyOneCollect = true;
+ }
+ UNIT_ASSERT(onlyOneCollect);
+}
+
+
Y_UNIT_TEST(TestWriteReadDeleteWithRestartsThenResponseOkWithNewApi) {
TTestContext tc;
RunTestWithReboots(tc.TabletIds, [&]() {