diff options
author | Aleksandr Kriukov <[email protected]> | 2022-04-27 23:52:07 +0300 |
---|---|---|
committer | Aleksandr Kriukov <[email protected]> | 2022-04-27 23:52:07 +0300 |
commit | a447193ffaba945617c168539a78d6b911372369 (patch) | |
tree | 4fdf503609c9d4bf34bf6cd614af33cfb28a7c79 | |
parent | 4b54e6ccdd21efbbbe67818fa55d582a20394c58 (diff) |
Fix race during GC, KIKIMR-14680
ref:5f67b07b43501c719c827101e84f138f0cfc1e23
-rw-r--r-- | ydb/core/keyvalue/keyvalue_state.cpp | 8 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_ut.cpp | 196 |
2 files changed, 184 insertions, 20 deletions
diff --git a/ydb/core/keyvalue/keyvalue_state.cpp b/ydb/core/keyvalue/keyvalue_state.cpp index a1965116336..96c00e8c0be 100644 --- a/ydb/core/keyvalue/keyvalue_state.cpp +++ b/ydb/core/keyvalue/keyvalue_state.cpp @@ -698,30 +698,32 @@ void TKeyValueState::InitExecute(ui64 tabletId, TActorId keyValueActorId, ui32 e } THelpers::DbUpdateState(StoredState, db, ctx); + + IsCollectEventSent = true; // corner case, if no CollectGarbage events were sent if (InitialCollectsSent == 0) { SendCutHistory(ctx); if (CollectOperation.Get()) { // finish collect operation from local base - IsCollectEventSent = true; StoreCollectComplete(ctx); } else { // initiate collection if trash was loaded from local base + IsCollectEventSent = false; PrepareCollectIfNeeded(ctx); } } } void TKeyValueState::RegisterInitialCollectResult(const TActorContext &ctx) { - LOG_TRACE_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId + LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " InitialCollectsSent# " << InitialCollectsSent << " Marker# KV50"); if (--InitialCollectsSent == 0) { SendCutHistory(ctx); if (CollectOperation.Get()) { // finish collect operation from local base - IsCollectEventSent = true; StoreCollectComplete(ctx); } else { + IsCollectEventSent = false; // initiate collection if trash was loaded from local base PrepareCollectIfNeeded(ctx); } diff --git a/ydb/core/keyvalue/keyvalue_ut.cpp b/ydb/core/keyvalue/keyvalue_ut.cpp index 7b46ebec585..a11c2f6b16c 100644 --- a/ydb/core/keyvalue/keyvalue_ut.cpp +++ b/ydb/core/keyvalue/keyvalue_ut.cpp @@ -11,6 +11,13 @@ const bool ENABLE_DETAILED_KV_LOG = false; namespace NKikimr { namespace { +template <typename... TArgs> +void TestLog(TArgs&&... args) { + if constexpr (ENABLE_DETAILED_KV_LOG) { + Cerr << ((TStringBuilder() << ... << args) << Endl); + } +} + void SetupLogging(TTestActorRuntime& runtime) { NActors::NLog::EPriority priority = ENABLE_DETAILED_KV_LOG ? NLog::PRI_DEBUG : NLog::PRI_ERROR; NActors::NLog::EPriority otherPriority = NLog::PRI_ERROR; @@ -82,10 +89,9 @@ struct TTestContext { SetupLogging(*Runtime); SetupTabletServices(*Runtime); setup(*Runtime); - TActorId bootstrapId = CreateTestBootstrapper(*Runtime, + CreateTestBootstrapper(*Runtime, CreateTestTabletInfo(TabletId, TabletType, TErasureType::ErasureNone), &CreateKeyValueFlat); - Cerr << (TStringBuilder() << "BootstrapId# " << bootstrapId << Endl); TDispatchOptions options; options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot)); @@ -501,22 +507,53 @@ struct TKeyValuePair { }; template <typename TRequestEvent> -void ExecuteEvent(TDesiredPair<TRequestEvent> &dp, TTestContext &tc) { +void SendRequest(const decltype(std::declval<TRequestEvent>().Record) &record, TTestContext &tc) { + auto request = std::make_unique<TRequestEvent>(); + request->Record = record; + TestLog("Send event# ", TypeName(*request)); + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.release(), 0, GetPipeConfigWithRetries()); +} + +template <typename TRequestEvent> +auto ReceiveResponse(TTestContext &tc) -> decltype(std::declval<typename TRequestEvent::TResponse>().Record) { TAutoPtr<IEventHandle> handle; - std::unique_ptr<TRequestEvent> request; - typename TRequestEvent::TResponse *response; + typename TRequestEvent::TResponse *response = tc.Runtime->GrabEdgeEvent<typename TRequestEvent::TResponse>(handle); + TestLog("Received event# ", TypeName(*response)); + return response->Record; +} + +template <typename TRequestEvent> +void ExecuteEvent(TDesiredPair<TRequestEvent> &dp, TTestContext &tc) { DoWithRetry([&] { tc.Runtime->ResetScheduledCount(); - request = std::make_unique<TRequestEvent>(); - request->Record = dp.Request; - Cerr << (TStringBuilder() << "Execute event# " << TypeName(*request) << Endl); - tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.release(), 0, GetPipeConfigWithRetries()); - response = tc.Runtime->GrabEdgeEvent<typename TRequestEvent::TResponse>(handle); - dp.Response = response->Record; + SendRequest<TRequestEvent>(dp.Request, tc); + dp.Response = ReceiveResponse<TRequestEvent>(tc); return true; }); } + +void SendWrite(TTestContext &tc, const TDeque<TKeyValuePair> &pairs, ui64 lockedGeneration, ui64 storageChannel, + NKikimrKeyValue::Priorities::Priority priority) +{ + NKikimrKeyValue::ExecuteTransactionRequest et; + + for (auto &[key, value] : pairs) { + NKikimrKeyValue::ExecuteTransactionRequest::Command *cmd = et.add_commands(); + NKikimrKeyValue::ExecuteTransactionRequest::Command::Write *write = cmd->mutable_write(); + + write->set_key(key); + write->set_value(value); + write->set_storage_channel(storageChannel); + write->set_priority(priority); + } + + et.set_tablet_id(tc.TabletId); + et.set_lock_generation(lockedGeneration); + + SendRequest<TEvKeyValue::TEvExecuteTransaction>(et, tc); +} + template <NKikimrKeyValue::Statuses::ReplyStatus ExpectedStatus = NKikimrKeyValue::Statuses::RSTATUS_OK> void ExecuteWrite(TTestContext &tc, const TDeque<TKeyValuePair> &pairs, ui64 lockedGeneration, ui64 storageChannel, NKikimrKeyValue::Priorities::Priority priority) @@ -549,6 +586,46 @@ enum class EBorderKind { Without }; + +void SendDeleteRange(TTestContext &tc, + const TString &from, EBorderKind fromKind, + const TString &to, EBorderKind toKind, + ui64 lock_generation) +{ + NKikimrKeyValue::ExecuteTransactionRequest record; + record.set_lock_generation(lock_generation); + record.set_tablet_id(tc.TabletId); + + NKikimrKeyValue::ExecuteTransactionRequest::Command *cmd = record.add_commands(); + NKikimrKeyValue::ExecuteTransactionRequest::Command::DeleteRange *deleteRange = cmd->mutable_delete_range(); + + auto *r = deleteRange->mutable_range(); + + switch (fromKind) { + case EBorderKind::Include: + r->set_from_key_inclusive(from); + break; + case EBorderKind::Exclude: + r->set_from_key_exclusive(from); + break; + case EBorderKind::Without: + break; + } + + switch (toKind) { + case EBorderKind::Include: + r->set_to_key_inclusive(to); + break; + case EBorderKind::Exclude: + r->set_to_key_exclusive(to); + break; + case EBorderKind::Without: + break; + } + + SendRequest<TEvKeyValue::TEvExecuteTransaction>(record, tc); +} + template <bool IsSuccess = true> void ExecuteDeleteRange(TTestContext &tc, const TString &from, EBorderKind fromKind, @@ -878,19 +955,19 @@ Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEvents) { auto setup = [&] (TTestActorRuntime &runtime) { runtime.SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { if (tabletActor && *tabletActor == event->Recipient && event->GetTypeRewrite() == TEvBlobStorage::TEvCollectGarbageResult::EventType) { - Cerr << (TStringBuilder() << "CollectGarbageResult!!! " << event->Sender << "->" << event->Recipient << " Cookie# " << event->Cookie << Endl); + TestLog("CollectGarbageResult!!! ", event->Sender, "->", event->Recipient, " Cookie# ", event->Cookie); } if (tabletActor && *tabletActor == event->Sender && event->GetTypeRewrite() == TEvBlobStorage::TEvCollectGarbage::EventType) { - Cerr << (TStringBuilder() << "CollectGarbage!!! " << event->Sender << "->" << event->Recipient << " Cookie# " << event->Cookie << Endl); + TestLog("CollectGarbage!!! ", event->Sender, "->", event->Recipient, " Cookie# ", event->Cookie); if (!firstCollect) { - Cerr << (TStringBuilder() << "Drop!" << Endl); + TestLog("Drop!"); return TTestActorRuntime::EEventAction::DROP; } } if (tabletActor && *tabletActor == event->Recipient && event->GetTypeRewrite() == TEvKeyValue::TEvCollect::EventType) { - Cerr << (TStringBuilder() << "Collect!!! " << event->Sender << "->" << event->Recipient << " Cookie# " << event->Cookie << Endl); + TestLog("Collect!!! ", event->Sender, "->", event->Recipient, " Cookie# ", event->Cookie); if (firstCollect) { - Cerr << (TStringBuilder() << "Drop!" << Endl); + TestLog("Drop!"); runtime.Send(new IEventHandle(event->Recipient, event->Recipient, new TKikimrEvents::TEvPoisonPill)); firstCollect = false; return TTestActorRuntime::EEventAction::DROP; @@ -901,7 +978,7 @@ Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEvents) { runtime.SetRegistrationObserverFunc([&](TTestActorRuntimeBase& runtime, const TActorId& /*parentId*/, const TActorId& actorId) { if (TypeName(*runtime.FindActor(actorId)) == "NKikimr::NKeyValue::TKeyValueFlat") { tabletActor = actorId; - Cerr << (TStringBuilder() << "KV tablet was created " << actorId << Endl); + TestLog("KV tablet was created ", actorId); } }); }; @@ -921,6 +998,91 @@ Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEvents) { ExecuteRead(tc, "key1", "value1", 0, 0, 0); } + +Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEventsWithSlowInitialGC) { + TTestContext tc; + TMaybe<TActorId> tabletActor; + TMaybe<TActorId> collectorActor; + ui32 collectStep = 1; + TQueue<TAutoPtr<IEventHandle>> savedInitialEvents; + + auto setup = [&] (TTestActorRuntime &runtime) { + runtime.SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { + //TestLog("Event ", (event && event->GetBase() ? TypeName(*event->GetBase()) : "unknown"), ' ', event->Sender, "->", event->Recipient); + if (tabletActor && *tabletActor == event->Recipient && event->GetTypeRewrite() == TEvBlobStorage::TEvCollectGarbageResult::EventType) { + if (collectStep == 2) { + savedInitialEvents.push(event); + return TTestActorRuntime::EEventAction::DROP; + } + } + if (tabletActor && *tabletActor == event->Sender && event->GetTypeRewrite() == TEvBlobStorage::TEvCollectGarbage::EventType) { + } + if (tabletActor && *tabletActor == event->Recipient && event->GetTypeRewrite() == TEvKeyValue::TEvCollect::EventType) { + switch (collectStep++) { + case 1: { + runtime.Send(new IEventHandle(event->Recipient, event->Recipient, new TKikimrEvents::TEvPoisonPill)); + return TTestActorRuntime::EEventAction::DROP; + } + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }); + runtime.SetRegistrationObserverFunc([&](TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) { + if (tabletActor && *tabletActor == parentId) { + TestLog("CreateActor by KV ", actorId, ' ', TypeName(*runtime.FindActor(actorId))); + } + if (TypeName(*runtime.FindActor(actorId)) == "NKikimr::NKeyValue::TKeyValueFlat") { + tabletActor = actorId; + } + if (tabletActor && *tabletActor == parentId && TypeName(*runtime.FindActor(actorId)) == "NKikimr::NKeyValue::TKeyValueCollector") { + collectorActor = actorId; + } + }); + }; + TFinalizer finalizer(tc); + bool activeZone = false; + tc.Prepare(INITIAL_TEST_DISPATCH_NAME, setup, activeZone); + ExecuteWrite(tc, {{"key", "value"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); + tc.Runtime->Send(new IEventHandle(*tabletActor, *tabletActor, new TKikimrEvents::TEvPoisonPill)); + ExecuteWrite(tc, {{"key1", "value1"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); + ExecuteWrite(tc, {{"key2", "value2"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); + ExecuteRead(tc, "key", "value", 0, 0, 0); + ExecuteDeleteRange(tc, "key", EBorderKind::Include, "key", EBorderKind::Include, 0); + TDispatchOptions options; + options.FinalEvents.push_back(TKikimrEvents::TEvPoisonPill::EventType); + tc.Runtime->DispatchEvents(options); + ExecuteWrite(tc, {{"key3", "value2"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); + + + TDispatchOptions options2; + options2.FinalEvents.push_back(TEvKeyValue::TEvCollect::EventType); + + bool onlyOneCollect = false; + try { + tc.Runtime->DispatchEvents(options2); + } catch (NActors::TSchedulingLimitReachedException) { + onlyOneCollect = true; + } + + while (savedInitialEvents.size()) { + tc.Runtime->Send(savedInitialEvents.front().Release()); + savedInitialEvents.pop(); + } + + TDispatchOptions options3; + options3.FinalEvents.push_back(TEvKeyValue::TEvEraseCollect::EventType); + + tc.Runtime->DispatchEvents(options3); + + try { + tc.Runtime->DispatchEvents(options3); + } catch (NActors::TSchedulingLimitReachedException) { + onlyOneCollect = true; + } + UNIT_ASSERT(onlyOneCollect); +} + + Y_UNIT_TEST(TestWriteReadDeleteWithRestartsThenResponseOkWithNewApi) { TTestContext tc; RunTestWithReboots(tc.TabletIds, [&]() { |