diff options
author | kruall <kruall@yandex-team.ru> | 2022-06-06 14:52:20 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-06-06 14:52:20 +0300 |
commit | 3ad62aa372aacec6af48bb3e8c2e82a768022f8b (patch) | |
tree | 9abd5c4135f10c2055d288d83cd7ad689a67c0f8 | |
parent | d664fb5c2f4a6a578c314d2ecee39c904d2b4418 (diff) | |
download | ydb-3ad62aa372aacec6af48bb3e8c2e82a768022f8b.tar.gz |
revert: 1650efddce963ea571676ae4d9c3658ea4807264; merge from trunk: r9407142; Fix race during GC, KIKIMR-14680
REVIEW: 2510720
x-ydb-stable-ref: c674fe371bece621a42fc6616ff23ff2efe610bb
-rw-r--r-- | ydb/core/keyvalue/keyvalue_collector.cpp | 2 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_flat_impl.h | 24 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_state.cpp | 32 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_state.h | 6 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_state_collect.cpp | 2 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_ut.cpp | 230 |
6 files changed, 273 insertions, 23 deletions
diff --git a/ydb/core/keyvalue/keyvalue_collector.cpp b/ydb/core/keyvalue/keyvalue_collector.cpp index 8b7696bcb2..6aa83dfd20 100644 --- a/ydb/core/keyvalue/keyvalue_collector.cpp +++ b/ydb/core/keyvalue/keyvalue_collector.cpp @@ -177,7 +177,7 @@ public: new TEvBlobStorage::TEvCollectGarbage(TabletInfo->TabletID, RecordGeneration, PerGenerationCounter, channelIdx, true, CollectOperation->Header.CollectGeneration, CollectOperation->Header.CollectStep, - keep.Release(), doNotKeep.Release(), TInstant::Max(), true), 0); + keep.Release(), doNotKeep.Release(), TInstant::Max(), true), (ui64)TKeyValueState::ECollectCookie::Soft); } STFUNC(StateWait) { diff --git a/ydb/core/keyvalue/keyvalue_flat_impl.h b/ydb/core/keyvalue/keyvalue_flat_impl.h index ae30344e14..859e07d4de 100644 --- a/ydb/core/keyvalue/keyvalue_flat_impl.h +++ b/ydb/core/keyvalue/keyvalue_flat_impl.h @@ -368,8 +368,28 @@ protected: void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr &ev, const TActorContext &ctx) { Y_UNUSED(ev); LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID() - << " Handle TEvCollectGarbageResult Marker# KV52"); - State.RegisterInitialCollectResult(ctx); + << " Handle TEvCollectGarbageResult Cookie# " << ev->Cookie << " Marker# KV52"); + if (ev->Cookie == (ui64)TKeyValueState::ECollectCookie::Hard) { + return; + } + + if (ev->Cookie == (ui64)TKeyValueState::ECollectCookie::SoftInitial) { + NKikimrProto::EReplyStatus status = ev->Get()->Status; + if (status == NKikimrProto::OK) { + State.RegisterInitialCollectResult(ctx); + } else { + LOG_ERROR_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID() + << " received not ok TEvCollectGarbageResult" + << " Status# " << NKikimrProto::EReplyStatus_Name(status) + << " ErrorReason# " << ev->Get()->ErrorReason); + Send(SelfId(), new TKikimrEvents::TEvPoisonPill); + } + return; + } + + LOG_CRIT_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletID() + << " received TEvCollectGarbageResult with unexpected Cookie# " << ev->Cookie); + Send(SelfId(), new TKikimrEvents::TEvPoisonPill); } void Handle(TEvKeyValue::TEvRequest::TPtr ev, const TActorContext &ctx) { diff --git a/ydb/core/keyvalue/keyvalue_state.cpp b/ydb/core/keyvalue/keyvalue_state.cpp index 710763d964..96c00e8c0b 100644 --- a/ydb/core/keyvalue/keyvalue_state.cpp +++ b/ydb/core/keyvalue/keyvalue_state.cpp @@ -563,7 +563,7 @@ void TKeyValueState::InitExecute(ui64 tabletId, TActorId keyValueActorId, ui32 e const TActorId nodeWarden = MakeBlobStorageNodeWardenID(ctx.SelfID.NodeId()); const TActorId proxy = MakeBlobStorageProxyID(group); ctx.ExecutorThread.Send(new IEventHandle(proxy, TActorId(), ev.Release(), - IEventHandle::FlagForwardOnNondelivery, 0, &nodeWarden)); + IEventHandle::FlagForwardOnNondelivery, (ui64)TKeyValueState::ECollectCookie::Hard, &nodeWarden)); } } @@ -628,7 +628,7 @@ void TKeyValueState::InitExecute(ui64 tabletId, TActorId keyValueActorId, ui32 e const TActorId nodeWarden = MakeBlobStorageNodeWardenID(ctx.SelfID.NodeId()); const TActorId proxy = MakeBlobStorageProxyID(group); ctx.ExecutorThread.Send(new IEventHandle(proxy, KeyValueActorId, ev.Release(), - IEventHandle::FlagForwardOnNondelivery, 0, &nodeWarden)); + IEventHandle::FlagForwardOnNondelivery, (ui64)TKeyValueState::ECollectCookie::SoftInitial, &nodeWarden)); } } @@ -698,25 +698,35 @@ void TKeyValueState::InitExecute(ui64 tabletId, TActorId keyValueActorId, ui32 e } THelpers::DbUpdateState(StoredState, db, ctx); + + IsCollectEventSent = true; // corner case, if no CollectGarbage events were sent if (InitialCollectsSent == 0) { SendCutHistory(ctx); - } - if (CollectOperation.Get()) { - // finish collect operation from local base - IsCollectEventSent = true; - StoreCollectComplete(ctx); - } else { - // initiate collection if trash was loaded from local base - PrepareCollectIfNeeded(ctx); + if (CollectOperation.Get()) { + // finish collect operation from local base + StoreCollectComplete(ctx); + } else { + // initiate collection if trash was loaded from local base + IsCollectEventSent = false; + PrepareCollectIfNeeded(ctx); + } } } void TKeyValueState::RegisterInitialCollectResult(const TActorContext &ctx) { - LOG_TRACE_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId + LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " InitialCollectsSent# " << InitialCollectsSent << " Marker# KV50"); if (--InitialCollectsSent == 0) { SendCutHistory(ctx); + if (CollectOperation.Get()) { + // finish collect operation from local base + StoreCollectComplete(ctx); + } else { + IsCollectEventSent = false; + // initiate collection if trash was loaded from local base + PrepareCollectIfNeeded(ctx); + } } } diff --git a/ydb/core/keyvalue/keyvalue_state.h b/ydb/core/keyvalue/keyvalue_state.h index d179e1dfc6..ffdc1f7250 100644 --- a/ydb/core/keyvalue/keyvalue_state.h +++ b/ydb/core/keyvalue/keyvalue_state.h @@ -232,6 +232,12 @@ public: } }; + enum class ECollectCookie { + Hard = 1, + SoftInitial = 2, + Soft = 3, + }; + ui32 GetGeneration() const { return StoredState.UserGeneration; } diff --git a/ydb/core/keyvalue/keyvalue_state_collect.cpp b/ydb/core/keyvalue/keyvalue_state_collect.cpp index 604c1e20ff..248d62fd3e 100644 --- a/ydb/core/keyvalue/keyvalue_state_collect.cpp +++ b/ydb/core/keyvalue/keyvalue_state_collect.cpp @@ -6,7 +6,7 @@ namespace NKeyValue { void TKeyValueState::PrepareCollectIfNeeded(const TActorContext &ctx) { LOG_TRACE_S(ctx, NKikimrServices::KEYVALUE, "PrepareCollectIfNeeded KeyValue# " << TabletId << " Marker# KV61"); - if (CollectOperation.Get()) { + if (CollectOperation.Get() || InitialCollectsSent) { // We already are trying to collect something, just pass this time. return; } diff --git a/ydb/core/keyvalue/keyvalue_ut.cpp b/ydb/core/keyvalue/keyvalue_ut.cpp index 093cd7d46c..a11c2f6b16 100644 --- a/ydb/core/keyvalue/keyvalue_ut.cpp +++ b/ydb/core/keyvalue/keyvalue_ut.cpp @@ -11,6 +11,13 @@ const bool ENABLE_DETAILED_KV_LOG = false; namespace NKikimr { namespace { +template <typename... TArgs> +void TestLog(TArgs&&... args) { + if constexpr (ENABLE_DETAILED_KV_LOG) { + Cerr << ((TStringBuilder() << ... << args) << Endl); + } +} + void SetupLogging(TTestActorRuntime& runtime) { NActors::NLog::EPriority priority = ENABLE_DETAILED_KV_LOG ? NLog::PRI_DEBUG : NLog::PRI_ERROR; NActors::NLog::EPriority otherPriority = NLog::PRI_ERROR; @@ -500,21 +507,53 @@ struct TKeyValuePair { }; template <typename TRequestEvent> -void ExecuteEvent(TDesiredPair<TRequestEvent> &dp, TTestContext &tc) { +void SendRequest(const decltype(std::declval<TRequestEvent>().Record) &record, TTestContext &tc) { + auto request = std::make_unique<TRequestEvent>(); + request->Record = record; + TestLog("Send event# ", TypeName(*request)); + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.release(), 0, GetPipeConfigWithRetries()); +} + +template <typename TRequestEvent> +auto ReceiveResponse(TTestContext &tc) -> decltype(std::declval<typename TRequestEvent::TResponse>().Record) { TAutoPtr<IEventHandle> handle; - std::unique_ptr<TRequestEvent> request; - typename TRequestEvent::TResponse *response; + typename TRequestEvent::TResponse *response = tc.Runtime->GrabEdgeEvent<typename TRequestEvent::TResponse>(handle); + TestLog("Received event# ", TypeName(*response)); + return response->Record; +} + +template <typename TRequestEvent> +void ExecuteEvent(TDesiredPair<TRequestEvent> &dp, TTestContext &tc) { DoWithRetry([&] { tc.Runtime->ResetScheduledCount(); - request = std::make_unique<TRequestEvent>(); - request->Record = dp.Request; - tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.release(), 0, GetPipeConfigWithRetries()); - response = tc.Runtime->GrabEdgeEvent<typename TRequestEvent::TResponse>(handle); - dp.Response = response->Record; + SendRequest<TRequestEvent>(dp.Request, tc); + dp.Response = ReceiveResponse<TRequestEvent>(tc); return true; }); } + +void SendWrite(TTestContext &tc, const TDeque<TKeyValuePair> &pairs, ui64 lockedGeneration, ui64 storageChannel, + NKikimrKeyValue::Priorities::Priority priority) +{ + NKikimrKeyValue::ExecuteTransactionRequest et; + + for (auto &[key, value] : pairs) { + NKikimrKeyValue::ExecuteTransactionRequest::Command *cmd = et.add_commands(); + NKikimrKeyValue::ExecuteTransactionRequest::Command::Write *write = cmd->mutable_write(); + + write->set_key(key); + write->set_value(value); + write->set_storage_channel(storageChannel); + write->set_priority(priority); + } + + et.set_tablet_id(tc.TabletId); + et.set_lock_generation(lockedGeneration); + + SendRequest<TEvKeyValue::TEvExecuteTransaction>(et, tc); +} + template <NKikimrKeyValue::Statuses::ReplyStatus ExpectedStatus = NKikimrKeyValue::Statuses::RSTATUS_OK> void ExecuteWrite(TTestContext &tc, const TDeque<TKeyValuePair> &pairs, ui64 lockedGeneration, ui64 storageChannel, NKikimrKeyValue::Priorities::Priority priority) @@ -547,6 +586,46 @@ enum class EBorderKind { Without }; + +void SendDeleteRange(TTestContext &tc, + const TString &from, EBorderKind fromKind, + const TString &to, EBorderKind toKind, + ui64 lock_generation) +{ + NKikimrKeyValue::ExecuteTransactionRequest record; + record.set_lock_generation(lock_generation); + record.set_tablet_id(tc.TabletId); + + NKikimrKeyValue::ExecuteTransactionRequest::Command *cmd = record.add_commands(); + NKikimrKeyValue::ExecuteTransactionRequest::Command::DeleteRange *deleteRange = cmd->mutable_delete_range(); + + auto *r = deleteRange->mutable_range(); + + switch (fromKind) { + case EBorderKind::Include: + r->set_from_key_inclusive(from); + break; + case EBorderKind::Exclude: + r->set_from_key_exclusive(from); + break; + case EBorderKind::Without: + break; + } + + switch (toKind) { + case EBorderKind::Include: + r->set_to_key_inclusive(to); + break; + case EBorderKind::Exclude: + r->set_to_key_exclusive(to); + break; + case EBorderKind::Without: + break; + } + + SendRequest<TEvKeyValue::TEvExecuteTransaction>(record, tc); +} + template <bool IsSuccess = true> void ExecuteDeleteRange(TTestContext &tc, const TString &from, EBorderKind fromKind, @@ -869,6 +948,141 @@ Y_UNIT_TEST(TestWriteReadDeleteWithRestartsThenResponseOk) { } +Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEvents) { + TTestContext tc; + TMaybe<TActorId> tabletActor; + bool firstCollect = true; + auto setup = [&] (TTestActorRuntime &runtime) { + runtime.SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { + if (tabletActor && *tabletActor == event->Recipient && event->GetTypeRewrite() == TEvBlobStorage::TEvCollectGarbageResult::EventType) { + TestLog("CollectGarbageResult!!! ", event->Sender, "->", event->Recipient, " Cookie# ", event->Cookie); + } + if (tabletActor && *tabletActor == event->Sender && event->GetTypeRewrite() == TEvBlobStorage::TEvCollectGarbage::EventType) { + TestLog("CollectGarbage!!! ", event->Sender, "->", event->Recipient, " Cookie# ", event->Cookie); + if (!firstCollect) { + TestLog("Drop!"); + return TTestActorRuntime::EEventAction::DROP; + } + } + if (tabletActor && *tabletActor == event->Recipient && event->GetTypeRewrite() == TEvKeyValue::TEvCollect::EventType) { + TestLog("Collect!!! ", event->Sender, "->", event->Recipient, " Cookie# ", event->Cookie); + if (firstCollect) { + TestLog("Drop!"); + runtime.Send(new IEventHandle(event->Recipient, event->Recipient, new TKikimrEvents::TEvPoisonPill)); + firstCollect = false; + return TTestActorRuntime::EEventAction::DROP; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }); + runtime.SetRegistrationObserverFunc([&](TTestActorRuntimeBase& runtime, const TActorId& /*parentId*/, const TActorId& actorId) { + if (TypeName(*runtime.FindActor(actorId)) == "NKikimr::NKeyValue::TKeyValueFlat") { + tabletActor = actorId; + TestLog("KV tablet was created ", actorId); + } + }); + }; + TFinalizer finalizer(tc); + bool activeZone = false; + tc.Prepare(INITIAL_TEST_DISPATCH_NAME, setup, activeZone); + ExecuteWrite(tc, {{"key", "value"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); + tc.Runtime->Send(new IEventHandle(*tabletActor, *tabletActor, new TKikimrEvents::TEvPoisonPill)); + ExecuteWrite(tc, {{"key1", "value1"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); + ExecuteWrite(tc, {{"key2", "value2"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); + ExecuteRead(tc, "key", "value", 0, 0, 0); + ExecuteDeleteRange(tc, "key", EBorderKind::Include, "key", EBorderKind::Include, 0); + TDispatchOptions options; + options.FinalEvents.push_back(TKikimrEvents::TEvPoisonPill::EventType); + tc.Runtime->DispatchEvents(options); + ExecuteRead<NKikimrKeyValue::Statuses::RSTATUS_NOT_FOUND>(tc, "key", "", 0, 0, 0); + ExecuteRead(tc, "key1", "value1", 0, 0, 0); +} + + +Y_UNIT_TEST(TestWriteReadDeleteWithRestartsAndCatchCollectGarbageEventsWithSlowInitialGC) { + TTestContext tc; + TMaybe<TActorId> tabletActor; + TMaybe<TActorId> collectorActor; + ui32 collectStep = 1; + TQueue<TAutoPtr<IEventHandle>> savedInitialEvents; + + auto setup = [&] (TTestActorRuntime &runtime) { + runtime.SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) { + //TestLog("Event ", (event && event->GetBase() ? TypeName(*event->GetBase()) : "unknown"), ' ', event->Sender, "->", event->Recipient); + if (tabletActor && *tabletActor == event->Recipient && event->GetTypeRewrite() == TEvBlobStorage::TEvCollectGarbageResult::EventType) { + if (collectStep == 2) { + savedInitialEvents.push(event); + return TTestActorRuntime::EEventAction::DROP; + } + } + if (tabletActor && *tabletActor == event->Sender && event->GetTypeRewrite() == TEvBlobStorage::TEvCollectGarbage::EventType) { + } + if (tabletActor && *tabletActor == event->Recipient && event->GetTypeRewrite() == TEvKeyValue::TEvCollect::EventType) { + switch (collectStep++) { + case 1: { + runtime.Send(new IEventHandle(event->Recipient, event->Recipient, new TKikimrEvents::TEvPoisonPill)); + return TTestActorRuntime::EEventAction::DROP; + } + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }); + runtime.SetRegistrationObserverFunc([&](TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) { + if (tabletActor && *tabletActor == parentId) { + TestLog("CreateActor by KV ", actorId, ' ', TypeName(*runtime.FindActor(actorId))); + } + if (TypeName(*runtime.FindActor(actorId)) == "NKikimr::NKeyValue::TKeyValueFlat") { + tabletActor = actorId; + } + if (tabletActor && *tabletActor == parentId && TypeName(*runtime.FindActor(actorId)) == "NKikimr::NKeyValue::TKeyValueCollector") { + collectorActor = actorId; + } + }); + }; + TFinalizer finalizer(tc); + bool activeZone = false; + tc.Prepare(INITIAL_TEST_DISPATCH_NAME, setup, activeZone); + ExecuteWrite(tc, {{"key", "value"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); + tc.Runtime->Send(new IEventHandle(*tabletActor, *tabletActor, new TKikimrEvents::TEvPoisonPill)); + ExecuteWrite(tc, {{"key1", "value1"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); + ExecuteWrite(tc, {{"key2", "value2"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); + ExecuteRead(tc, "key", "value", 0, 0, 0); + ExecuteDeleteRange(tc, "key", EBorderKind::Include, "key", EBorderKind::Include, 0); + TDispatchOptions options; + options.FinalEvents.push_back(TKikimrEvents::TEvPoisonPill::EventType); + tc.Runtime->DispatchEvents(options); + ExecuteWrite(tc, {{"key3", "value2"}}, 0, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); + + + TDispatchOptions options2; + options2.FinalEvents.push_back(TEvKeyValue::TEvCollect::EventType); + + bool onlyOneCollect = false; + try { + tc.Runtime->DispatchEvents(options2); + } catch (NActors::TSchedulingLimitReachedException) { + onlyOneCollect = true; + } + + while (savedInitialEvents.size()) { + tc.Runtime->Send(savedInitialEvents.front().Release()); + savedInitialEvents.pop(); + } + + TDispatchOptions options3; + options3.FinalEvents.push_back(TEvKeyValue::TEvEraseCollect::EventType); + + tc.Runtime->DispatchEvents(options3); + + try { + tc.Runtime->DispatchEvents(options3); + } catch (NActors::TSchedulingLimitReachedException) { + onlyOneCollect = true; + } + UNIT_ASSERT(onlyOneCollect); +} + + Y_UNIT_TEST(TestWriteReadDeleteWithRestartsThenResponseOkWithNewApi) { TTestContext tc; RunTestWithReboots(tc.TabletIds, [&]() { |