diff options
author | kruall <kruall@ydb.tech> | 2022-09-06 13:55:59 +0300 |
---|---|---|
committer | kruall <kruall@ydb.tech> | 2022-09-06 13:55:59 +0300 |
commit | 4d2299379e613653fa653e340e899dee02174f57 (patch) | |
tree | 92f586603e9ab73997ef10b4f2b8a77b3c203eb9 | |
parent | 2af9037faec537692efae1d34591f4f346498b85 (diff) | |
download | ydb-4d2299379e613653fa653e340e899dee02174f57.tar.gz |
Fix GC in kv tablet
-rw-r--r-- | util/folder/path_ut.pyx | 7 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_collector.cpp | 72 | ||||
-rw-r--r-- | ydb/core/keyvalue/keyvalue_ut.cpp | 25 | ||||
-rw-r--r-- | ydb/core/protos/services.proto | 2 |
4 files changed, 89 insertions, 17 deletions
diff --git a/util/folder/path_ut.pyx b/util/folder/path_ut.pyx index e2537683ee..e3b63f5c4c 100644 --- a/util/folder/path_ut.pyx +++ b/util/folder/path_ut.pyx @@ -356,14 +356,15 @@ class TestPath(unittest.TestCase): def test_real_path(self): cdef TFsPath path = TFsPath("test_real_path_a") path.Touch() - self.assertEquals(path.RealPath().GetPath(), os.path.join(yatest.common.work_path(), "test_real_path_a")) - self.assertEquals(path.RealLocation().GetPath(), os.path.join(yatest.common.work_path(), "test_real_path_a")) + real_work_path = os.path.join(os.path.realpath(yatest.common.work_path()), "test_real_path_a") + self.assertEquals(path.RealPath().GetPath(), real_work_path) + self.assertEquals(path.RealLocation().GetPath(), real_work_path) with self.assertRaises(RuntimeError): path.ReadLink() def test_cwd(self): cdef TFsPath path = TFsPath.Cwd() - self.assertEquals(path.GetPath(), yatest.common.work_path()) + self.assertEquals(path.GetPath(), os.path.realpath(yatest.common.work_path())) def test_swap(self): cdef TFsPath first = TFsPath("first") diff --git a/ydb/core/keyvalue/keyvalue_collector.cpp b/ydb/core/keyvalue/keyvalue_collector.cpp index 69b1e6fa1c..0cac380251 100644 --- a/ydb/core/keyvalue/keyvalue_collector.cpp +++ b/ydb/core/keyvalue/keyvalue_collector.cpp @@ -3,6 +3,7 @@ #include <ydb/core/base/counters.h> #include <ydb/core/blobstorage/dsproxy/blobstorage_backoff.h> +#include <ydb/core/util/stlog.h> #include <library/cpp/actors/core/actor_bootstrapped.h> namespace NKikimr { @@ -91,6 +92,10 @@ public: CollectedDoNotKeep.reserve(MaxCollectGarbageFlagsPerMessage); } + STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC04, "Start KeyValueCollector", + (TabletId, TabletInfo->TabletID), (IsMultiStepMode, IsMultiStepMode), + (Keep, CollectOperation->Keep.size()), (DoNotKeep, CollectOperation->DoNotKeep.size())); + Sort(CollectOperation->Keep); for (const auto &blob: CollectOperation->Keep) { ui32 groupId = 
TabletInfo->ChannelInfo(blob.Channel())->GroupForGeneration(blob.Generation()); @@ -112,12 +117,19 @@ public: Become(&TThis::StateWait); } - bool ChangeGroup(const TActorContext &ctx) { + bool ChangeChannel(const TActorContext &ctx) { if (CollectorForGroupForChannel.back().empty()) { while (CollectorForGroupForChannel.size() && CollectorForGroupForChannel.back().empty()) { + STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC09, "Empty channel, it's erased", + (TabletId, TabletInfo->TabletID), + (Channel, GetChannelIdxFromVecIdx(ChannelIdxInVector))); CollectorForGroupForChannel.pop_back(); } if (CollectorForGroupForChannel.empty()) { + STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC06, "Send TEvCompleteGC and die", + (TabletId, TabletInfo->TabletID), + (ErasedGroupId, CurrentChannelGroup->first), + (Channel, GetChannelIdxFromVecIdx(ChannelIdxInVector))); ctx.Send(KeyValueActorId, new TEvKeyValue::TEvCompleteGC()); Die(ctx); return true; @@ -127,8 +139,17 @@ public: } else { do { if (ChannelIdxInVector) { + STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC07, "Move to next channel", + (TabletId, TabletInfo->TabletID), + (ErasedGroupId, CurrentChannelGroup->first), + (Channel, GetChannelIdxFromVecIdx(ChannelIdxInVector))); ChannelIdxInVector--; } else { + STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC10, "End of round; Send PartitialCompleteGC", + (TabletId, TabletInfo->TabletID), + (ErasedGroupId, CurrentChannelGroup->first), + (Channel, GetChannelIdxFromVecIdx(ChannelIdxInVector)), + (CollectedDoNotKeep, CollectedDoNotKeep.size())); ChannelIdxInVector = CollectorForGroupForChannel.size() - 1; CurrentChannelGroup = CollectorForGroupForChannel[ChannelIdxInVector].begin(); SendPartitialCompleteGC(true); @@ -137,12 +158,15 @@ public: } while (CollectorForGroupForChannel[ChannelIdxInVector].empty()); CurrentChannelGroup = CollectorForGroupForChannel[ChannelIdxInVector].begin(); } + return false; } void 
Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr &ev, const TActorContext &ctx) { - NKikimrProto::EReplyStatus status = ev->Get()->Status; + STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC11, "Receive TEvCollectGarbageResult", + (TabletId, TabletInfo->TabletID), + (Status, status)); if (status == NKikimrProto::OK) { // Success @@ -153,12 +177,20 @@ public: isLastRequestInCollector = (collector.Step == collector.Keep.size() + collector.DoNotKeep.size()); } if (isLastRequestInCollector) { + STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC08, "Last group was empty, it's erased", + (TabletId, TabletInfo->TabletID), + (Status, status), + (ErasedGroupId, CurrentChannelGroup->first)); CurrentChannelGroup = CollectorForGroupForChannel[ChannelIdxInVector].erase(CurrentChannelGroup); } else { + STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC08, "Move to next group, it's erased", + (TabletId, TabletInfo->TabletID), + (Status, status), + (ErasedGroupId, CurrentChannelGroup->first)); CurrentChannelGroup++; } if (CurrentChannelGroup == CollectorForGroupForChannel[ChannelIdxInVector].end()) { - if (ChangeGroup(ctx)) { + if (ChangeChannel(ctx)) { return; } } @@ -171,7 +203,7 @@ public: CollectorErrors++; if (status == NKikimrProto::RACE || status == NKikimrProto::BLOCKED || status == NKikimrProto::NO_GROUP || CollectorErrors > CollectorMaxErrors) { - LOG_ERROR_S(ctx, NKikimrServices::KEYVALUE, "Tablet# " << TabletInfo->TabletID + LOG_ERROR_S(ctx, NKikimrServices::KEYVALUE_GC, "Tablet# " << TabletInfo->TabletID << " Collector got Status# " << NKikimrProto::EReplyStatus_Name(status) << " from Group# " << groupId << " Channel# " << channelIdx << " CollectorErrors# " << CollectorErrors @@ -188,7 +220,7 @@ public: const TDuration &timeout = TDuration::MilliSeconds(backoffMs); ctx.Schedule(timeout, new TEvents::TEvWakeup()); } else { - LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "Tablet# " << TabletInfo->TabletID + LOG_DEBUG_S(ctx, 
NKikimrServices::KEYVALUE_GC, "Tablet# " << TabletInfo->TabletID << " Collector got Status# " << NKikimrProto::EReplyStatus_Name(status) << " from Group# " << groupId << " Channel# " << channelIdx << " Retrying immediately. Marker# KVC02"); @@ -200,7 +232,7 @@ public: Y_UNUSED(ev); ui32 channelIdx = GetChannelIdxFromVecIdx(ChannelIdxInVector); ui32 groupId = CurrentChannelGroup->first; - LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "Tablet# " << TabletInfo->TabletID + LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE_GC, "Tablet# " << TabletInfo->TabletID << " Collector retrying with" << " Group# " << groupId << " Channel# " << channelIdx << " Marker# KVC03"); @@ -209,14 +241,16 @@ public: } void Handle(TEvents::TEvPoisonPill::TPtr &ev, const TActorContext &ctx) { + STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC12, "Poisoned", + (TabletId, TabletInfo->TabletID)); Y_UNUSED(ev); Die(ctx); return; } void Handle(TEvKeyValue::TEvContinueGC::TPtr &ev) { - LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::KEYVALUE, "Tablet# " << TabletInfo->TabletID - << " Collector continue GC Marker# KVC04"); + STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE, KVC13, "Collector continue GC", + (TabletId, TabletInfo->TabletID)); MinGenStepInCircle = {}; CollectedDoNotKeep = std::move(ev->Get()->Buffer); CollectedDoNotKeep.clear(); @@ -224,9 +258,8 @@ public: } void SendPartitialCompleteGC(bool endCircle) { - LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::KEYVALUE, "Tablet# " << TabletInfo->TabletID - << "end of round# " << (endCircle ? "yes" : "no") - << " Collector send PartitialCompleteGC Marker# KVC05"); + STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE, KVC14, "Collector send PartitialCompleteGC", + (TabletId, TabletInfo->TabletID), (EndOfRound, (endCircle ? 
"yes" : "no"))); auto ev = std::make_unique<TEvKeyValue::TEvPartitialCompleteGC>(); if (endCircle && MinGenStepInCircle) { ev->CollectedGenerationStep = std::move(MinGenStepInCircle); @@ -249,7 +282,12 @@ public: doNotKeepSize = 0; } - if (doNotKeepSize && CollectedDoNotKeep.size() + doNotKeepSize > MaxCollectGarbageFlagsPerMessage) { + if (CollectedDoNotKeep.size() && doNotKeepSize && CollectedDoNotKeep.size() + doNotKeepSize > MaxCollectGarbageFlagsPerMessage) { + STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE, KVC15, "CollectedDoNotKeep was oevrflow; Send PartitialCompleteGC", + (TabletId, TabletInfo->TabletID), + (doNotKeepSize, doNotKeepSize), + (CollectedDoNotKeep.size, CollectedDoNotKeep.size()), + (MaxCollectGarbageFlagsPerMessage, MaxCollectGarbageFlagsPerMessage)); SendPartitialCompleteGC(false); return; } @@ -299,6 +337,12 @@ public: ui32 collectStep = CollectOperation->Header.CollectStep; ui32 channelIdx = GetChannelIdxFromVecIdx(CollectorForGroupForChannel.size() - 1); ui32 groupId = CurrentChannelGroup->first; + + + STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE, KVC16, "Send GC request", + (TabletId, TabletInfo->TabletID), (CollectGeneration, collectGeneration), + (CollectStep, collectStep), (ChannelIdx, channelIdx), (GroupId, groupId), + (KeepSize, keepSize), (DoNotKeepSize, doNotKeepSize), (IsLast, isLast)); SendToBSProxy(ctx, groupId, new TEvBlobStorage::TEvCollectGarbage(TabletInfo->TabletID, RecordGeneration, PerGenerationCounter, channelIdx, isLast, collectGeneration, collectStep, @@ -318,8 +362,8 @@ public: }; IActor* CreateKeyValueCollector(const TActorId &keyValueActorId, TIntrusivePtr<TCollectOperation> &collectOperation, - const TTabletStorageInfo *tabletInfo, ui32 recordGeneration, ui32 perGenerationCounter, bool isSpringCleanup) { - return new TKeyValueCollector(keyValueActorId, collectOperation, tabletInfo, recordGeneration, + const TTabletStorageInfo *TabletInfo, ui32 recordGeneration, ui32 perGenerationCounter, bool 
isSpringCleanup) { + return new TKeyValueCollector(keyValueActorId, collectOperation, TabletInfo, recordGeneration, perGenerationCounter, isSpringCleanup); } diff --git a/ydb/core/keyvalue/keyvalue_ut.cpp b/ydb/core/keyvalue/keyvalue_ut.cpp index ca3089ac35..53e265aceb 100644 --- a/ydb/core/keyvalue/keyvalue_ut.cpp +++ b/ydb/core/keyvalue/keyvalue_ut.cpp @@ -2366,5 +2366,30 @@ Y_UNIT_TEST(TestObtainLockNewApi) { }); } + +Y_UNIT_TEST(TestLargeWriteAndDelete) { + TTestContext tc; + RunTestWithReboots(tc.TabletIds, [&]() { + return tc.InitialEventsFilter.Prepare(); + }, [&](const TString &dispatchName, std::function<void(TTestActorRuntime&)> setup, bool &activeZone) { + TFinalizer finalizer(tc); + tc.Prepare(dispatchName, setup, activeZone); + ExecuteObtainLock(tc, 1); + ui32 iteration = 0; + // for (ui32 iteration = 0; iteration < 10; ++iteration) { + TDeque<TKeyValuePair> keys; + for (ui32 idx = 0; idx < 15'000; ++idx) { + TString key = TStringBuilder() << iteration << ':' << idx; + keys.push_back({key, "value"}); + } + ExecuteWrite(tc, keys, 1, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); + + TString fromKey = TStringBuilder() << iteration << ':' << 1'000; + ExecuteDeleteRange(tc, fromKey, EBorderKind::Include, "", EBorderKind::Without, 1); + // } + ExecuteDeleteRange(tc, "", EBorderKind::Without, "", EBorderKind::Without, 1); + }); +} + } // TKeyValueTest } // NKikimr diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index 4580254df6..799648646c 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -156,8 +156,10 @@ enum EServiceKikimr { GRPC_PROXY_NO_CONNECT_ACCESS = 417; READ_TABLE_API = 414; // deprecated, use RPC_REQUEST RPC_REQUEST = 416; + // KEY VALUE section KEYVALUE = 420; + KEYVALUE_GC = 421; WILSON = 430; |