about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author kruall <kruall@ydb.tech> 2022-09-06 13:55:59 +0300
committer kruall <kruall@ydb.tech> 2022-09-06 13:55:59 +0300
commit 4d2299379e613653fa653e340e899dee02174f57 (patch)
tree 92f586603e9ab73997ef10b4f2b8a77b3c203eb9
parent 2af9037faec537692efae1d34591f4f346498b85 (diff)
download ydb-4d2299379e613653fa653e340e899dee02174f57.tar.gz
Fix GC in kv tablet
-rw-r--r-- util/folder/path_ut.pyx 7
-rw-r--r-- ydb/core/keyvalue/keyvalue_collector.cpp 72
-rw-r--r-- ydb/core/keyvalue/keyvalue_ut.cpp 25
-rw-r--r-- ydb/core/protos/services.proto 2
4 files changed, 89 insertions, 17 deletions
diff --git a/util/folder/path_ut.pyx b/util/folder/path_ut.pyx
index e2537683ee..e3b63f5c4c 100644
--- a/util/folder/path_ut.pyx
+++ b/util/folder/path_ut.pyx
@@ -356,14 +356,15 @@ class TestPath(unittest.TestCase):
def test_real_path(self):
cdef TFsPath path = TFsPath("test_real_path_a")
path.Touch()
- self.assertEquals(path.RealPath().GetPath(), os.path.join(yatest.common.work_path(), "test_real_path_a"))
- self.assertEquals(path.RealLocation().GetPath(), os.path.join(yatest.common.work_path(), "test_real_path_a"))
+ real_work_path = os.path.join(os.path.realpath(yatest.common.work_path()), "test_real_path_a")  # expected path is built from the realpath of the work dir (os.path.realpath resolves symlinks)
+ self.assertEquals(path.RealPath().GetPath(), real_work_path)  # NOTE(review): assertEquals is a deprecated alias of assertEqual in unittest
+ self.assertEquals(path.RealLocation().GetPath(), real_work_path)  # same expected value for RealLocation() as for RealPath()
with self.assertRaises(RuntimeError):
path.ReadLink()
def test_cwd(self):
cdef TFsPath path = TFsPath.Cwd()
- self.assertEquals(path.GetPath(), yatest.common.work_path())
+ self.assertEquals(path.GetPath(), os.path.realpath(yatest.common.work_path()))  # compare Cwd() against the symlink-resolved work dir rather than the raw work_path()
def test_swap(self):
cdef TFsPath first = TFsPath("first")
diff --git a/ydb/core/keyvalue/keyvalue_collector.cpp b/ydb/core/keyvalue/keyvalue_collector.cpp
index 69b1e6fa1c..0cac380251 100644
--- a/ydb/core/keyvalue/keyvalue_collector.cpp
+++ b/ydb/core/keyvalue/keyvalue_collector.cpp
@@ -3,6 +3,7 @@
#include <ydb/core/base/counters.h>
#include <ydb/core/blobstorage/dsproxy/blobstorage_backoff.h>
+#include <ydb/core/util/stlog.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
namespace NKikimr {
@@ -91,6 +92,10 @@ public:
CollectedDoNotKeep.reserve(MaxCollectGarbageFlagsPerMessage);
}
+ STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC04, "Start KeyValueCollector",
+ (TabletId, TabletInfo->TabletID), (IsMultiStepMode, IsMultiStepMode),
+ (Keep, CollectOperation->Keep.size()), (DoNotKeep, CollectOperation->DoNotKeep.size()));
+
Sort(CollectOperation->Keep);
for (const auto &blob: CollectOperation->Keep) {
ui32 groupId = TabletInfo->ChannelInfo(blob.Channel())->GroupForGeneration(blob.Generation());
@@ -112,12 +117,19 @@ public:
Become(&TThis::StateWait);
}
- bool ChangeGroup(const TActorContext &ctx) {
+ bool ChangeChannel(const TActorContext &ctx) {
if (CollectorForGroupForChannel.back().empty()) {
while (CollectorForGroupForChannel.size() && CollectorForGroupForChannel.back().empty()) {
+ STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC09, "Empty channel, it's erased",
+ (TabletId, TabletInfo->TabletID),
+ (Channel, GetChannelIdxFromVecIdx(ChannelIdxInVector)));
CollectorForGroupForChannel.pop_back();
}
if (CollectorForGroupForChannel.empty()) {
+ STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC06, "Send TEvCompleteGC and die",
+ (TabletId, TabletInfo->TabletID),
+ (ErasedGroupId, CurrentChannelGroup->first),
+ (Channel, GetChannelIdxFromVecIdx(ChannelIdxInVector)));
ctx.Send(KeyValueActorId, new TEvKeyValue::TEvCompleteGC());
Die(ctx);
return true;
@@ -127,8 +139,17 @@ public:
} else {
do {
if (ChannelIdxInVector) {
+ STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC07, "Move to next channel",
+ (TabletId, TabletInfo->TabletID),
+ (ErasedGroupId, CurrentChannelGroup->first),
+ (Channel, GetChannelIdxFromVecIdx(ChannelIdxInVector)));
ChannelIdxInVector--;
} else {
+ STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC10, "End of round; Send PartitialCompleteGC",
+ (TabletId, TabletInfo->TabletID),
+ (ErasedGroupId, CurrentChannelGroup->first),
+ (Channel, GetChannelIdxFromVecIdx(ChannelIdxInVector)),
+ (CollectedDoNotKeep, CollectedDoNotKeep.size()));
ChannelIdxInVector = CollectorForGroupForChannel.size() - 1;
CurrentChannelGroup = CollectorForGroupForChannel[ChannelIdxInVector].begin();
SendPartitialCompleteGC(true);
@@ -137,12 +158,15 @@ public:
} while (CollectorForGroupForChannel[ChannelIdxInVector].empty());
CurrentChannelGroup = CollectorForGroupForChannel[ChannelIdxInVector].begin();
}
+
return false;
}
void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr &ev, const TActorContext &ctx) {
-
NKikimrProto::EReplyStatus status = ev->Get()->Status;
+ STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC11, "Receive TEvCollectGarbageResult",
+ (TabletId, TabletInfo->TabletID),
+ (Status, status));
if (status == NKikimrProto::OK) {
// Success
@@ -153,12 +177,20 @@ public:
isLastRequestInCollector = (collector.Step == collector.Keep.size() + collector.DoNotKeep.size());
}
if (isLastRequestInCollector) {
+ STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC08, "Last group was empty, it's erased",
+ (TabletId, TabletInfo->TabletID),
+ (Status, status),
+ (ErasedGroupId, CurrentChannelGroup->first));
CurrentChannelGroup = CollectorForGroupForChannel[ChannelIdxInVector].erase(CurrentChannelGroup);
} else {
+ STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC08, "Move to next group, it's erased",
+ (TabletId, TabletInfo->TabletID),
+ (Status, status),
+ (ErasedGroupId, CurrentChannelGroup->first));
CurrentChannelGroup++;
}
if (CurrentChannelGroup == CollectorForGroupForChannel[ChannelIdxInVector].end()) {
- if (ChangeGroup(ctx)) {
+ if (ChangeChannel(ctx)) {
return;
}
}
@@ -171,7 +203,7 @@ public:
CollectorErrors++;
if (status == NKikimrProto::RACE || status == NKikimrProto::BLOCKED || status == NKikimrProto::NO_GROUP || CollectorErrors > CollectorMaxErrors) {
- LOG_ERROR_S(ctx, NKikimrServices::KEYVALUE, "Tablet# " << TabletInfo->TabletID
+ LOG_ERROR_S(ctx, NKikimrServices::KEYVALUE_GC, "Tablet# " << TabletInfo->TabletID
<< " Collector got Status# " << NKikimrProto::EReplyStatus_Name(status)
<< " from Group# " << groupId << " Channel# " << channelIdx
<< " CollectorErrors# " << CollectorErrors
@@ -188,7 +220,7 @@ public:
const TDuration &timeout = TDuration::MilliSeconds(backoffMs);
ctx.Schedule(timeout, new TEvents::TEvWakeup());
} else {
- LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "Tablet# " << TabletInfo->TabletID
+ LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE_GC, "Tablet# " << TabletInfo->TabletID
<< " Collector got Status# " << NKikimrProto::EReplyStatus_Name(status)
<< " from Group# " << groupId << " Channel# " << channelIdx
<< " Retrying immediately. Marker# KVC02");
@@ -200,7 +232,7 @@ public:
Y_UNUSED(ev);
ui32 channelIdx = GetChannelIdxFromVecIdx(ChannelIdxInVector);
ui32 groupId = CurrentChannelGroup->first;
- LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE, "Tablet# " << TabletInfo->TabletID
+ LOG_DEBUG_S(ctx, NKikimrServices::KEYVALUE_GC, "Tablet# " << TabletInfo->TabletID
<< " Collector retrying with"
<< " Group# " << groupId << " Channel# " << channelIdx
<< " Marker# KVC03");
@@ -209,14 +241,16 @@ public:
}
void Handle(TEvents::TEvPoisonPill::TPtr &ev, const TActorContext &ctx) {
+ STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KVC12, "Poisoned",
+ (TabletId, TabletInfo->TabletID)); // debug-level trace (marker KVC12) emitted before Die(ctx) below
Y_UNUSED(ev);
Die(ctx);
return;
}
void Handle(TEvKeyValue::TEvContinueGC::TPtr &ev) {
- LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::KEYVALUE, "Tablet# " << TabletInfo->TabletID
- << " Collector continue GC Marker# KVC04");
+ STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE, KVC13, "Collector continue GC",
+ (TabletId, TabletInfo->TabletID));
MinGenStepInCircle = {};
CollectedDoNotKeep = std::move(ev->Get()->Buffer);
CollectedDoNotKeep.clear();
@@ -224,9 +258,8 @@ public:
}
void SendPartitialCompleteGC(bool endCircle) {
- LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::KEYVALUE, "Tablet# " << TabletInfo->TabletID
- << "end of round# " << (endCircle ? "yes" : "no")
- << " Collector send PartitialCompleteGC Marker# KVC05");
+ STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE, KVC14, "Collector send PartitialCompleteGC",
+ (TabletId, TabletInfo->TabletID), (EndOfRound, (endCircle ? "yes" : "no")));
auto ev = std::make_unique<TEvKeyValue::TEvPartitialCompleteGC>();
if (endCircle && MinGenStepInCircle) {
ev->CollectedGenerationStep = std::move(MinGenStepInCircle);
@@ -249,7 +282,12 @@ public:
doNotKeepSize = 0;
}
- if (doNotKeepSize && CollectedDoNotKeep.size() + doNotKeepSize > MaxCollectGarbageFlagsPerMessage) {
+ if (CollectedDoNotKeep.size() && doNotKeepSize && CollectedDoNotKeep.size() + doNotKeepSize > MaxCollectGarbageFlagsPerMessage) {
+ STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE, KVC15, "CollectedDoNotKeep was oevrflow; Send PartitialCompleteGC",
+ (TabletId, TabletInfo->TabletID),
+ (doNotKeepSize, doNotKeepSize),
+ (CollectedDoNotKeep.size, CollectedDoNotKeep.size()),
+ (MaxCollectGarbageFlagsPerMessage, MaxCollectGarbageFlagsPerMessage));
SendPartitialCompleteGC(false);
return;
}
@@ -299,6 +337,12 @@ public:
ui32 collectStep = CollectOperation->Header.CollectStep;
ui32 channelIdx = GetChannelIdxFromVecIdx(CollectorForGroupForChannel.size() - 1);
ui32 groupId = CurrentChannelGroup->first;
+
+
+ STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE, KVC16, "Send GC request",
+ (TabletId, TabletInfo->TabletID), (CollectGeneration, collectGeneration),
+ (CollectStep, collectStep), (ChannelIdx, channelIdx), (GroupId, groupId),
+ (KeepSize, keepSize), (DoNotKeepSize, doNotKeepSize), (IsLast, isLast));
SendToBSProxy(ctx, groupId,
new TEvBlobStorage::TEvCollectGarbage(TabletInfo->TabletID, RecordGeneration, PerGenerationCounter,
channelIdx, isLast, collectGeneration, collectStep,
@@ -318,8 +362,8 @@ public:
};
IActor* CreateKeyValueCollector(const TActorId &keyValueActorId, TIntrusivePtr<TCollectOperation> &collectOperation,
- const TTabletStorageInfo *tabletInfo, ui32 recordGeneration, ui32 perGenerationCounter, bool isSpringCleanup) {
- return new TKeyValueCollector(keyValueActorId, collectOperation, tabletInfo, recordGeneration,
+ const TTabletStorageInfo *TabletInfo, ui32 recordGeneration, ui32 perGenerationCounter, bool isSpringCleanup) { // NOTE(review): parameter renamed tabletInfo -> TabletInfo, unlike its lowerCamel siblings; its only use is the forwarded argument below - confirm the rename is intended
+ return new TKeyValueCollector(keyValueActorId, collectOperation, TabletInfo, recordGeneration, // factory: forwards every argument unchanged to the TKeyValueCollector constructor
perGenerationCounter, isSpringCleanup);
}
diff --git a/ydb/core/keyvalue/keyvalue_ut.cpp b/ydb/core/keyvalue/keyvalue_ut.cpp
index ca3089ac35..53e265aceb 100644
--- a/ydb/core/keyvalue/keyvalue_ut.cpp
+++ b/ydb/core/keyvalue/keyvalue_ut.cpp
@@ -2366,5 +2366,30 @@ Y_UNIT_TEST(TestObtainLockNewApi) {
});
}
+
+Y_UNIT_TEST(TestLargeWriteAndDelete) { // Under RunTestWithReboots: one ExecuteWrite call with 15'000 pairs, then two ExecuteDeleteRange calls.
+ TTestContext tc;
+ RunTestWithReboots(tc.TabletIds, [&]() {
+ return tc.InitialEventsFilter.Prepare();
+ }, [&](const TString &dispatchName, std::function<void(TTestActorRuntime&)> setup, bool &activeZone) {
+ TFinalizer finalizer(tc);
+ tc.Prepare(dispatchName, setup, activeZone);
+ ExecuteObtainLock(tc, 1);
+ ui32 iteration = 0; // fixed at 0 while the loop below stays commented out
+ // for (ui32 iteration = 0; iteration < 10; ++iteration) { -- disabled: the body below runs once
+ TDeque<TKeyValuePair> keys;
+ for (ui32 idx = 0; idx < 15'000; ++idx) {
+ TString key = TStringBuilder() << iteration << ':' << idx; // key format: "<iteration>:<idx>"
+ keys.push_back({key, "value"}); // every key carries the same literal value
+ }
+ ExecuteWrite(tc, keys, 1, 2, NKikimrKeyValue::Priorities::PRIORITY_REALTIME); // all 15'000 pairs are passed in a single call
+
+ TString fromKey = TStringBuilder() << iteration << ':' << 1'000; // "0:1000" while iteration == 0
+ ExecuteDeleteRange(tc, fromKey, EBorderKind::Include, "", EBorderKind::Without, 1); // lower border fromKey inclusive, no upper border; NOTE(review): presumably keys compare as strings, so this is not "idx >= 1000" - confirm
+ // } -- closing brace of the disabled iteration loop
+ ExecuteDeleteRange(tc, "", EBorderKind::Without, "", EBorderKind::Without, 1); // no border on either side; presumably removes every remaining key - confirm
+ });
+}
+
} // TKeyValueTest
} // NKikimr
diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto
index 4580254df6..799648646c 100644
--- a/ydb/core/protos/services.proto
+++ b/ydb/core/protos/services.proto
@@ -156,8 +156,10 @@ enum EServiceKikimr {
GRPC_PROXY_NO_CONNECT_ACCESS = 417;
READ_TABLE_API = 414; // deprecated, use RPC_REQUEST
RPC_REQUEST = 416;
+
// KEY VALUE section
KEYVALUE = 420;
+ KEYVALUE_GC = 421;
WILSON = 430;